/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_smart_transport.py

  • Committer: Jelmer Vernooij
  • Date: 2009-02-25 15:36:48 UTC
  • mfrom: (4048 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4050.
  • Revision ID: jelmer@samba.org-20090225153648-7r5mk20nr9dttqbf
Merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2006, 2007, 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
68
68
 
69
69
    def __init__(self, vendor):
70
70
        self.vendor = vendor
71
 
    
 
71
 
72
72
    def close(self):
73
73
        self.vendor.calls.append(('close', ))
74
 
        
 
74
 
75
75
    def get_filelike_channels(self):
76
76
        return self.vendor.read_from, self.vendor.write_to
77
77
 
78
78
 
79
79
class _InvalidHostnameFeature(tests.Feature):
80
80
    """Does 'non_existent.invalid' fail to resolve?
81
 
    
 
81
 
82
82
    RFC 2606 states that .invalid is reserved for invalid domain names, and
83
83
    also underscores are not a valid character in domain names.  Despite this,
84
84
    it's possible a badly misconfigured name server might decide to always
132
132
        t = threading.Thread(target=_receive_bytes_on_server)
133
133
        t.start()
134
134
        return t
135
 
    
 
135
 
136
136
    def test_construct_smart_simple_pipes_client_medium(self):
137
137
        # the SimplePipes client medium takes two pipes:
138
138
        # readable pipe, writeable pipe.
139
139
        # Constructing one should just save these and do nothing.
140
140
        # We test this by passing in None.
141
141
        client_medium = medium.SmartSimplePipesClientMedium(None, None, None)
142
 
        
 
142
 
143
143
    def test_simple_pipes_client_request_type(self):
144
144
        # SimplePipesClient should use SmartClientStreamMediumRequest's.
145
145
        client_medium = medium.SmartSimplePipesClientMedium(None, None, None)
148
148
 
149
149
    def test_simple_pipes_client_get_concurrent_requests(self):
150
150
        # the simple_pipes client does not support pipelined requests:
151
 
        # but it does support serial requests: we construct one after 
 
151
        # but it does support serial requests: we construct one after
152
152
        # another is finished. This is a smoke test testing the integration
153
153
        # of the SmartClientStreamMediumRequest and the SmartClientStreamMedium
154
154
        # classes - as the sibling classes share this logic, they do not have
170
170
            None, output, 'base')
171
171
        client_medium._accept_bytes('abc')
172
172
        self.assertEqual('abc', output.getvalue())
173
 
    
 
173
 
174
174
    def test_simple_pipes_client_disconnect_does_nothing(self):
175
175
        # calling disconnect does nothing.
176
176
        input = StringIO()
197
197
        self.assertFalse(input.closed)
198
198
        self.assertFalse(output.closed)
199
199
        self.assertEqual('abcabc', output.getvalue())
200
 
    
 
200
 
201
201
    def test_simple_pipes_client_ignores_disconnect_when_not_connected(self):
202
202
        # Doing a disconnect on a new (and thus unconnected) SimplePipes medium
203
203
        # does nothing.
212
212
        self.assertEqual('abc', client_medium.read_bytes(3))
213
213
        client_medium.disconnect()
214
214
        self.assertEqual('def', client_medium.read_bytes(3))
215
 
        
 
215
 
216
216
    def test_simple_pipes_client_supports__flush(self):
217
 
        # invoking _flush on a SimplePipesClient should flush the output 
 
217
        # invoking _flush on a SimplePipesClient should flush the output
218
218
        # pipe. We test this by creating an output pipe that records
219
219
        # flush calls made to it.
220
220
        from StringIO import StringIO # get regular StringIO
262
262
            'a hostname', 'a port',
263
263
            ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes'])],
264
264
            vendor.calls)
265
 
    
 
265
 
266
266
    def test_ssh_client_changes_command_when_BZR_REMOTE_PATH_is_set(self):
267
267
        # The only thing that initiates a connection from the medium is giving
268
268
        # it bytes.
283
283
            'a hostname', 'a port',
284
284
            ['fugly', 'serve', '--inet', '--directory=/', '--allow-writes'])],
285
285
            vendor.calls)
286
 
    
 
286
 
287
287
    def test_ssh_client_changes_command_when_bzr_remote_path_passed(self):
288
288
        # The only thing that initiates a connection from the medium is giving
289
289
        # it bytes.
350
350
            ('close', ),
351
351
            ],
352
352
            vendor.calls)
353
 
    
 
353
 
354
354
    def test_ssh_client_ignores_disconnect_when_not_connected(self):
355
355
        # Doing a disconnect on a new (and thus unconnected) SSH medium
356
356
        # does not fail.  It's ok to disconnect an unconnected medium.
369
369
                          1)
370
370
 
371
371
    def test_ssh_client_supports__flush(self):
372
 
        # invoking _flush on a SSHClientMedium should flush the output 
 
372
        # invoking _flush on a SSHClientMedium should flush the output
373
373
        # pipe. We test this by creating an output pipe that records
374
374
        # flush calls made to it.
375
375
        from StringIO import StringIO # get regular StringIO
387
387
        client_medium._flush()
388
388
        client_medium.disconnect()
389
389
        self.assertEqual(['flush'], flush_calls)
390
 
        
 
390
 
391
391
    def test_construct_smart_tcp_client_medium(self):
392
392
        # the TCP client medium takes a host and a port.  Constructing it won't
393
393
        # connect to anything.
408
408
        t.join()
409
409
        sock.close()
410
410
        self.assertEqual(['abc'], bytes)
411
 
    
 
411
 
412
412
    def test_tcp_client_disconnect_does_so(self):
413
413
        # calling disconnect on the client terminates the connection.
414
414
        # we test this by forcing a short read during a socket.MSG_WAITALL
425
425
        # really did disconnect.
426
426
        medium.disconnect()
427
427
 
428
 
    
 
428
 
429
429
    def test_tcp_client_ignores_disconnect_when_not_connected(self):
430
430
        # Doing a disconnect on a new (and thus unconnected) TCP medium
431
431
        # does not fail.  It's ok to disconnect an unconnected medium.
468
468
 
469
469
class TestSmartClientStreamMediumRequest(tests.TestCase):
470
470
    """Tests the for SmartClientStreamMediumRequest.
471
 
    
472
 
    SmartClientStreamMediumRequest is a helper for the three stream based 
 
471
 
 
472
    SmartClientStreamMediumRequest is a helper for the three stream based
473
473
    mediums: TCP, SSH, SimplePipes, so we only test it once, and then test that
474
474
    those three mediums implement the interface it expects.
475
475
    """
476
476
 
477
477
    def test_accept_bytes_after_finished_writing_errors(self):
478
 
        # calling accept_bytes after calling finished_writing raises 
 
478
        # calling accept_bytes after calling finished_writing raises
479
479
        # WritingCompleted to prevent bad assumptions on stream environments
480
480
        # breaking the needs of message-based environments.
481
481
        output = StringIO()
537
537
            None, None, 'base')
538
538
        request = medium.SmartClientStreamMediumRequest(client_medium)
539
539
        self.assertRaises(errors.WritingNotComplete, request.finished_reading)
540
 
        
 
540
 
541
541
    def test_read_bytes(self):
542
542
        # read bytes should invoke _read_bytes on the stream medium.
543
543
        # we test this by using the SimplePipes medium - the most trivial one
544
 
        # and checking that the data is supplied. Its possible that a 
 
544
        # and checking that the data is supplied. Its possible that a
545
545
        # faulty implementation could poke at the pipe variables them selves,
546
546
        # but we trust that this will be caught as it will break the integration
547
547
        # smoke tests.
566
566
        self.assertRaises(errors.WritingNotComplete, request.read_bytes, None)
567
567
 
568
568
    def test_read_bytes_after_finished_reading_errors(self):
569
 
        # calling read_bytes after calling finished_reading raises 
 
569
        # calling read_bytes after calling finished_reading raises
570
570
        # ReadingCompleted to prevent bad assumptions on stream environments
571
571
        # breaking the needs of message-based environments.
572
572
        output = StringIO()
604
604
 
605
605
 
606
606
class SampleRequest(object):
607
 
    
 
607
 
608
608
    def __init__(self, expected_bytes):
609
609
        self.accepted_bytes = ''
610
610
        self._finished_reading = False
632
632
 
633
633
    def portable_socket_pair(self):
634
634
        """Return a pair of TCP sockets connected to each other.
635
 
        
 
635
 
636
636
        Unlike socket.socketpair, this should work on Windows.
637
637
        """
638
638
        listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
643
643
        server_sock, addr = listen_sock.accept()
644
644
        listen_sock.close()
645
645
        return server_sock, client_sock
646
 
    
 
646
 
647
647
    def test_smart_query_version(self):
648
648
        """Feed a canned query version to a server"""
649
649
        # wire-to-wire, using the whole stack
723
723
        server = medium.SmartServerPipeStreamMedium(to_server, from_server, None)
724
724
        server._serve_one_request(SampleRequest('x'))
725
725
        self.assertTrue(server.finished)
726
 
        
 
726
 
727
727
    def test_socket_stream_shutdown_detection(self):
728
728
        server_sock, client_sock = self.portable_socket_pair()
729
729
        client_sock.close()
731
731
            server_sock, None)
732
732
        server._serve_one_request(SampleRequest('x'))
733
733
        self.assertTrue(server.finished)
734
 
        
 
734
 
735
735
    def test_socket_stream_incomplete_request(self):
736
736
        """The medium should still construct the right protocol version even if
737
737
        the initial read only reads part of the request.
815
815
        self.assertEqual('', from_server.getvalue())
816
816
        self.assertEqual(sample_request_bytes, second_protocol.accepted_bytes)
817
817
        self.assertFalse(server.finished)
818
 
        
 
818
 
819
819
    def test_socket_stream_with_two_requests(self):
820
820
        # If two requests are read in one go, then two calls to
821
821
        # _serve_one_request should still process both of them as if they had
856
856
        self.assertEqual('', from_server.getvalue())
857
857
        self.assertTrue(self.closed)
858
858
        self.assertTrue(server.finished)
859
 
        
 
859
 
860
860
    def test_socket_stream_error_handling(self):
861
861
        server_sock, client_sock = self.portable_socket_pair()
862
862
        server = medium.SmartServerSocketStreamMedium(
867
867
        # closed.
868
868
        self.assertEqual('', client_sock.recv(1))
869
869
        self.assertTrue(server.finished)
870
 
        
 
870
 
871
871
    def test_pipe_like_stream_keyboard_interrupt_handling(self):
872
872
        to_server = StringIO('')
873
873
        from_server = StringIO()
918
918
        # Any empty request (i.e. no bytes) is detected as protocol version one.
919
919
        server_protocol = self.build_protocol_pipe_like('')
920
920
        self.assertProtocolOne(server_protocol)
921
 
        
 
921
 
922
922
    def test_socket_like_build_protocol_empty_bytes(self):
923
923
        # Any empty request (i.e. no bytes) is detected as protocol version one.
924
924
        server_protocol = self.build_protocol_socket('')
959
959
        self.assertEqual(
960
960
            protocol.build_server_protocol_three, protocol_factory)
961
961
        self.assertEqual('extra bytes', remainder)
962
 
        
 
962
 
963
963
    def test_version_two(self):
964
964
        result = medium._get_protocol_factory_for_bytes(
965
965
            'bzr request 2\nextra bytes')
967
967
        self.assertEqual(
968
968
            protocol.SmartServerRequestProtocolTwo, protocol_factory)
969
969
        self.assertEqual('extra bytes', remainder)
970
 
        
 
970
 
971
971
    def test_version_one(self):
972
972
        """Version one requests have no version markers."""
973
973
        result = medium._get_protocol_factory_for_bytes('anything\n')
975
975
        self.assertEqual(
976
976
            protocol.SmartServerRequestProtocolOne, protocol_factory)
977
977
        self.assertEqual('anything\n', remainder)
978
 
        
 
978
 
979
979
 
980
980
class TestSmartTCPServer(tests.TestCase):
981
981
 
1229
1229
    Note: these tests are rudimentary versions of the command object tests in
1230
1230
    test_smart.py.
1231
1231
    """
1232
 
        
 
1232
 
1233
1233
    def test_hello(self):
1234
1234
        cmd = _mod_request.HelloRequest(None, '/')
1235
1235
        response = cmd.execute()
1236
1236
        self.assertEqual(('ok', '2'), response.args)
1237
1237
        self.assertEqual(None, response.body)
1238
 
        
 
1238
 
1239
1239
    def test_get_bundle(self):
1240
1240
        from bzrlib.bundle import serializer
1241
1241
        wt = self.make_branch_and_tree('.')
1242
1242
        self.build_tree_contents([('hello', 'hello world')])
1243
1243
        wt.add('hello')
1244
1244
        rev_id = wt.commit('add hello')
1245
 
        
 
1245
 
1246
1246
        cmd = _mod_request.GetBundleRequest(self.get_transport(), '/')
1247
1247
        response = cmd.execute('.', rev_id)
1248
1248
        bundle = serializer.read_bundle(StringIO(response.body))
1272
1272
        handler.dispatch_command('hello', ())
1273
1273
        self.assertEqual(('ok', '2'), handler.response.args)
1274
1274
        self.assertEqual(None, handler.response.body)
1275
 
        
 
1275
 
1276
1276
    def test_disable_vfs_handler_classes_via_environment(self):
1277
1277
        # VFS handler classes will raise an error from "execute" if
1278
1278
        # BZR_NO_SMART_VFS is set.
1316
1316
        self.assertTrue(handler.finished_reading)
1317
1317
        self.assertEqual(('ok', ), handler.response.args)
1318
1318
        self.assertEqual(None, handler.response.body)
1319
 
        
 
1319
 
1320
1320
    def test_readv_accept_body(self):
1321
1321
        """'readv' should set finished_reading after reading offsets."""
1322
1322
        self.build_tree(['a-file'])
1363
1363
 
1364
1364
 
1365
1365
class TestRemoteTransport(tests.TestCase):
1366
 
        
 
1366
 
1367
1367
    def test_use_connection_factory(self):
1368
1368
        # We want to be able to pass a client as a parameter to RemoteTransport.
1369
1369
        input = StringIO('ok\n3\nbardone\n')
1468
1468
    def assertOffsetSerialisation(self, expected_offsets, expected_serialised,
1469
1469
        requester):
1470
1470
        """Check that smart (de)serialises offsets as expected.
1471
 
        
 
1471
 
1472
1472
        We check both serialisation and deserialisation at the same time
1473
1473
        to ensure that the round tripping cannot skew: both directions should
1474
1474
        be as expected.
1475
 
        
 
1475
 
1476
1476
        :param expected_offsets: a readv offset list.
1477
1477
        :param expected_seralised: an expected serial form of the offsets.
1478
1478
        """
1489
1489
        smart_protocol._has_dispatched = True
1490
1490
        smart_protocol.request = _mod_request.SmartServerRequestHandler(
1491
1491
            None, _mod_request.request_handlers, '/')
1492
 
        class FakeCommand(object):
1493
 
            def do_body(cmd, body_bytes):
 
1492
        class FakeCommand(_mod_request.SmartServerRequest):
 
1493
            def do_body(self_cmd, body_bytes):
1494
1494
                self.end_received = True
1495
1495
                self.assertEqual('abcdefg', body_bytes)
1496
1496
                return _mod_request.SuccessfulSmartServerResponse(('ok', ))
1497
 
        smart_protocol.request._command = FakeCommand()
 
1497
        smart_protocol.request._command = FakeCommand(None)
1498
1498
        # Call accept_bytes to make sure that internal state like _body_decoder
1499
1499
        # is initialised.  This test should probably be given a clearer
1500
1500
        # interface to work with that will not cause this inconsistency.
1654
1654
 
1655
1655
    def test_query_version(self):
1656
1656
        """query_version on a SmartClientProtocolOne should return a number.
1657
 
        
 
1657
 
1658
1658
        The protocol provides the query_version because the domain level clients
1659
1659
        may all need to be able to probe for capabilities.
1660
1660
        """
1925
1925
 
1926
1926
    def test_query_version(self):
1927
1927
        """query_version on a SmartClientProtocolTwo should return a number.
1928
 
        
 
1928
 
1929
1929
        The protocol provides the query_version because the domain level clients
1930
1930
        may all need to be able to probe for capabilities.
1931
1931
        """
2270
2270
        self.assertEqual(4, smart_protocol.next_read_size())
2271
2271
 
2272
2272
 
2273
 
class NoOpRequest(_mod_request.SmartServerRequest):
2274
 
 
2275
 
    def do(self):
2276
 
        return _mod_request.SuccessfulSmartServerResponse(())
2277
 
 
2278
 
dummy_registry = {'ARG': NoOpRequest}
2279
 
 
2280
 
 
2281
2273
class LoggingMessageHandler(object):
2282
2274
 
2283
2275
    def __init__(self):
2358
2350
            '\0\0\0\x07' # length prefix
2359
2351
            'l3:ARGe' # ['ARG']
2360
2352
            )
2361
 
        self.assertEqual([('structure', ['ARG'])], event_log)
 
2353
        self.assertEqual([('structure', ('ARG',))], event_log)
2362
2354
 
2363
2355
    def test_decode_multiple_bytes(self):
2364
2356
        """The protocol can decode a multiple 'bytes' message parts."""
2375
2367
            [('bytes', 'first'), ('bytes', 'second')], event_log)
2376
2368
 
2377
2369
 
2378
 
class TestConventionalResponseHandler(tests.TestCase):
 
2370
class TestConventionalResponseHandlerBodyStream(tests.TestCase):
2379
2371
 
2380
2372
    def make_response_handler(self, response_bytes):
2381
2373
        from bzrlib.smart.message import ConventionalResponseHandler
2392
2384
            protocol_decoder, medium_request)
2393
2385
        return response_handler
2394
2386
 
2395
 
    def test_body_stream_interrupted_by_error(self):
 
2387
    def test_interrupted_by_error(self):
2396
2388
        interrupted_body_stream = (
2397
2389
            'oS' # successful response
2398
2390
            's\0\0\0\x02le' # empty args
2409
2401
        exc = self.assertRaises(errors.ErrorFromSmartServer, stream.next)
2410
2402
        self.assertEqual(('error', 'abc'), exc.error_tuple)
2411
2403
 
2412
 
    def test_body_stream_interrupted_by_connection_lost(self):
 
2404
    def test_interrupted_by_connection_lost(self):
2413
2405
        interrupted_body_stream = (
2414
2406
            'oS' # successful response
2415
2407
            's\0\0\0\x02le' # empty args
2427
2419
        self.assertRaises(
2428
2420
            errors.ConnectionReset, response_handler.read_body_bytes)
2429
2421
 
 
2422
    def test_multiple_bytes_parts(self):
 
2423
        multiple_bytes_parts = (
 
2424
            'oS' # successful response
 
2425
            's\0\0\0\x02le' # empty args
 
2426
            'b\0\0\0\x0bSome bytes\n' # some bytes
 
2427
            'b\0\0\0\x0aMore bytes' # more bytes
 
2428
            'e' # message end
 
2429
            )
 
2430
        response_handler = self.make_response_handler(multiple_bytes_parts)
 
2431
        self.assertEqual(
 
2432
            'Some bytes\nMore bytes', response_handler.read_body_bytes())
 
2433
        response_handler = self.make_response_handler(multiple_bytes_parts)
 
2434
        self.assertEqual(
 
2435
            ['Some bytes\n', 'More bytes'],
 
2436
            list(response_handler.read_streamed_body()))
 
2437
 
 
2438
 
 
2439
class FakeResponder(object):
 
2440
 
 
2441
    response_sent = False
 
2442
 
 
2443
    def send_error(self, exc):
 
2444
        raise exc
 
2445
 
 
2446
    def send_response(self, response):
 
2447
        pass
 
2448
 
 
2449
 
 
2450
class TestConventionalRequestHandlerBodyStream(tests.TestCase):
 
2451
    """Tests for ConventionalRequestHandler's handling of request bodies."""
 
2452
 
 
2453
    def make_request_handler(self, request_bytes):
 
2454
        """Make a ConventionalRequestHandler for the given bytes using test
 
2455
        doubles for the request_handler and the responder.
 
2456
        """
 
2457
        from bzrlib.smart.message import ConventionalRequestHandler
 
2458
        request_handler = InstrumentedRequestHandler()
 
2459
        request_handler.response = _mod_request.SuccessfulSmartServerResponse(('arg', 'arg'))
 
2460
        responder = FakeResponder()
 
2461
        message_handler = ConventionalRequestHandler(request_handler, responder)
 
2462
        protocol_decoder = protocol.ProtocolThreeDecoder(message_handler)
 
2463
        # put decoder in desired state (waiting for message parts)
 
2464
        protocol_decoder.state_accept = protocol_decoder._state_accept_expecting_message_part
 
2465
        protocol_decoder.accept_bytes(request_bytes)
 
2466
        return request_handler
 
2467
 
 
2468
    def test_multiple_bytes_parts(self):
 
2469
        """Each bytes part triggers a call to the request_handler's
 
2470
        accept_body method.
 
2471
        """
 
2472
        multiple_bytes_parts = (
 
2473
            's\0\0\0\x07l3:fooe' # args
 
2474
            'b\0\0\0\x0bSome bytes\n' # some bytes
 
2475
            'b\0\0\0\x0aMore bytes' # more bytes
 
2476
            'e' # message end
 
2477
            )
 
2478
        request_handler = self.make_request_handler(multiple_bytes_parts)
 
2479
        accept_body_calls = [
 
2480
            call_info[1] for call_info in request_handler.calls
 
2481
            if call_info[0] == 'accept_body']
 
2482
        self.assertEqual(
 
2483
            ['Some bytes\n', 'More bytes'], accept_body_calls)
 
2484
 
 
2485
    def test_error_flag_after_body(self):
 
2486
        body_then_error = (
 
2487
            's\0\0\0\x07l3:fooe' # request args
 
2488
            'b\0\0\0\x0bSome bytes\n' # some bytes
 
2489
            'b\0\0\0\x0aMore bytes' # more bytes
 
2490
            'oE' # error flag
 
2491
            's\0\0\0\x07l3:bare' # error args
 
2492
            'e' # message end
 
2493
            )
 
2494
        request_handler = self.make_request_handler(body_then_error)
 
2495
        self.assertEqual(
 
2496
            [('post_body_error_received', ('bar',)), ('end_received',)],
 
2497
            request_handler.calls[-2:])
 
2498
 
2430
2499
 
2431
2500
class TestMessageHandlerErrors(tests.TestCase):
2432
2501
    """Tests for v3 that unrecognised (but well-formed) requests/responses are
2476
2545
 
2477
2546
    def __init__(self):
2478
2547
        self.calls = []
2479
 
 
2480
 
    def body_chunk_received(self, chunk_bytes):
2481
 
        self.calls.append(('body_chunk_received', chunk_bytes))
 
2548
        self.finished_reading = False
2482
2549
 
2483
2550
    def no_body_received(self):
2484
2551
        self.calls.append(('no_body_received',))
2485
2552
 
2486
 
    def prefixed_body_received(self, body_bytes):
2487
 
        self.calls.append(('prefixed_body_received', body_bytes))
2488
 
 
2489
2553
    def end_received(self):
2490
2554
        self.calls.append(('end_received',))
 
2555
        self.finished_reading = True
 
2556
 
 
2557
    def dispatch_command(self, cmd, args):
 
2558
        self.calls.append(('dispatch_command', cmd, args))
 
2559
 
 
2560
    def accept_body(self, bytes):
 
2561
        self.calls.append(('accept_body', bytes))
 
2562
 
 
2563
    def end_of_body(self):
 
2564
        self.calls.append(('end_of_body',))
 
2565
        self.finished_reading = True
 
2566
 
 
2567
    def post_body_error_received(self, error_args):
 
2568
        self.calls.append(('post_body_error_received', error_args))
2491
2569
 
2492
2570
 
2493
2571
class StubRequest(object):
2529
2607
        # The message handler has been invoked with all the parts of the
2530
2608
        # trivial response: empty headers, status byte, no args, end.
2531
2609
        self.assertEqual(
2532
 
            [('headers', {}), ('byte', 'S'), ('structure', []), ('end',)],
 
2610
            [('headers', {}), ('byte', 'S'), ('structure', ()), ('end',)],
2533
2611
            response_handler.event_log)
2534
2612
 
2535
2613
    def test_incomplete_message(self):
2643
2721
        self.assertEqual(
2644
2722
            ['accept_bytes', 'finished_writing'], medium_request.calls)
2645
2723
 
 
2724
    def test_call_with_body_stream_smoke_test(self):
 
2725
        """A smoke test for ProtocolThreeRequester.call_with_body_stream.
 
2726
 
 
2727
        This test checks that a particular simple invocation of
 
2728
        call_with_body_stream emits the correct bytes for that invocation.
 
2729
        """
 
2730
        requester, output = self.make_client_encoder_and_output()
 
2731
        requester.set_headers({'header name': 'header value'})
 
2732
        stream = ['chunk 1', 'chunk two']
 
2733
        requester.call_with_body_stream(('one arg',), stream)
 
2734
        self.assertEquals(
 
2735
            'bzr message 3 (bzr 1.6)\n' # protocol version
 
2736
            '\x00\x00\x00\x1fd11:header name12:header valuee' # headers
 
2737
            's\x00\x00\x00\x0bl7:one arge' # args
 
2738
            'b\x00\x00\x00\x07chunk 1' # a prefixed body chunk
 
2739
            'b\x00\x00\x00\x09chunk two' # a prefixed body chunk
 
2740
            'e', # end
 
2741
            output.getvalue())
 
2742
 
 
2743
    def test_call_with_body_stream_empty_stream(self):
 
2744
        """call_with_body_stream with an empty stream."""
 
2745
        requester, output = self.make_client_encoder_and_output()
 
2746
        requester.set_headers({})
 
2747
        stream = []
 
2748
        requester.call_with_body_stream(('one arg',), stream)
 
2749
        self.assertEquals(
 
2750
            'bzr message 3 (bzr 1.6)\n' # protocol version
 
2751
            '\x00\x00\x00\x02de' # headers
 
2752
            's\x00\x00\x00\x0bl7:one arge' # args
 
2753
            # no body chunks
 
2754
            'e', # end
 
2755
            output.getvalue())
 
2756
 
 
2757
    def test_call_with_body_stream_error(self):
 
2758
        """call_with_body_stream will abort the streamed body with an
 
2759
        error if the stream raises an error during iteration.
 
2760
 
 
2761
        The resulting request will still be a complete message.
 
2762
        """
 
2763
        requester, output = self.make_client_encoder_and_output()
 
2764
        requester.set_headers({})
 
2765
        def stream_that_fails():
 
2766
            yield 'aaa'
 
2767
            yield 'bbb'
 
2768
            raise Exception('Boom!')
 
2769
        self.assertRaises(Exception, requester.call_with_body_stream,
 
2770
            ('one arg',), stream_that_fails())
 
2771
        self.assertEquals(
 
2772
            'bzr message 3 (bzr 1.6)\n' # protocol version
 
2773
            '\x00\x00\x00\x02de' # headers
 
2774
            's\x00\x00\x00\x0bl7:one arge' # args
 
2775
            'b\x00\x00\x00\x03aaa' # body
 
2776
            'b\x00\x00\x00\x03bbb' # more body
 
2777
            'oE' # error flag
 
2778
            's\x00\x00\x00\x09l5:errore' # error args: ('error',)
 
2779
            'e', # end
 
2780
            output.getvalue())
 
2781
 
2646
2782
 
2647
2783
class StubMediumRequest(object):
2648
2784
    """A stub medium request that tracks the number of times accept_bytes is
2697
2833
        self.assertEqual(
2698
2834
            expected_count, len(self.writes),
2699
2835
            "Too many writes: %r" % (self.writes,))
2700
 
        
 
2836
 
2701
2837
    def test_send_error_writes_just_once(self):
2702
2838
        """An error response is written to the medium all at once."""
2703
2839
        self.responder.send_error(Exception('An exception string.'))
2772
2908
 
2773
2909
class MockMedium(medium.SmartClientMedium):
2774
2910
    """A mock medium that can be used to test _SmartClient.
2775
 
    
 
2911
 
2776
2912
    It can be given a series of requests to expect (and responses it should
2777
2913
    return for them).  It can also be told when the client is expected to
2778
2914
    disconnect a medium.  Expectations must be satisfied in the order they are
2790
2926
        super(MockMedium, self).__init__('dummy base')
2791
2927
        self._mock_request = _MockMediumRequest(self)
2792
2928
        self._expected_events = []
2793
 
        
 
2929
 
2794
2930
    def expect_request(self, request_bytes, response_bytes,
2795
2931
                       allow_partial_read=False):
2796
2932
        """Expect 'request_bytes' to be sent, and reply with 'response_bytes'.
2799
2935
        called to send the request.  Similarly, no assumption is made about how
2800
2936
        many times read_bytes/read_line are called by protocol code to read a
2801
2937
        response.  e.g.::
2802
 
        
 
2938
 
2803
2939
            request.accept_bytes('ab')
2804
2940
            request.accept_bytes('cd')
2805
2941
            request.finished_writing()
2806
2942
 
2807
2943
        and::
2808
 
        
 
2944
 
2809
2945
            request.accept_bytes('abcd')
2810
2946
            request.finished_writing()
2811
2947
 
2996
3132
    def test_first_response_is_error(self):
2997
3133
        """If the server replies with an error, then the version detection
2998
3134
        should be complete.
2999
 
        
 
3135
 
3000
3136
        This test is very similar to test_version_two_server, but catches a bug
3001
3137
        we had in the case where the first reply was an error response.
3002
3138
        """
3042
3178
 
3043
3179
class LengthPrefixedBodyDecoder(tests.TestCase):
3044
3180
 
3045
 
    # XXX: TODO: make accept_reading_trailer invoke translate_response or 
 
3181
    # XXX: TODO: make accept_reading_trailer invoke translate_response or
3046
3182
    # something similar to the ProtocolBase method.
3047
3183
 
3048
3184
    def test_construct(self):
3084
3220
        self.assertEqual(1, decoder.next_read_size())
3085
3221
        self.assertEqual('', decoder.read_pending_data())
3086
3222
        self.assertEqual('blarg', decoder.unused_data)
3087
 
        
 
3223
 
3088
3224
    def test_accept_bytes_all_at_once_with_excess(self):
3089
3225
        decoder = protocol.LengthPrefixedBodyDecoder()
3090
3226
        decoder.accept_bytes('1\nadone\nunused')
3109
3245
 
3110
3246
class TestChunkedBodyDecoder(tests.TestCase):
3111
3247
    """Tests for ChunkedBodyDecoder.
3112
 
    
 
3248
 
3113
3249
    This is the body decoder used for protocol version two.
3114
3250
    """
3115
3251
 
3141
3277
        self.assertTrue(decoder.finished_reading)
3142
3278
        self.assertEqual(chunk_content, decoder.read_next_chunk())
3143
3279
        self.assertEqual('', decoder.unused_data)
3144
 
        
 
3280
 
3145
3281
    def test_incomplete_chunk(self):
3146
3282
        """When there are less bytes in the chunk than declared by the length,
3147
3283
        then we haven't finished reading yet.
3412
3548
        self.assertNotEquals(type(r), type(t))
3413
3549
 
3414
3550
 
3415
 
# TODO: Client feature that does get_bundle and then installs that into a
3416
 
# branch; this can be used in place of the regular pull/fetch operation when
3417
 
# coming from a smart server.
3418
 
#
3419
 
# TODO: Eventually, want to do a 'branch' command by fetching the whole
3420
 
# history as one big bundle.  How?  
3421
 
#
3422
 
# The branch command does 'br_from.sprout', which tries to preserve the same
3423
 
# format.  We don't necessarily even want that.  
3424
 
#
3425
 
# It might be simpler to handle cmd_pull first, which does a simpler fetch()
3426
 
# operation from one branch into another.  It already has some code for
3427
 
# pulling from a bundle, which it does by trying to see if the destination is
3428
 
# a bundle file.  So it seems the logic for pull ought to be:
3429
 
3430
 
#  - if it's a smart server, get a bundle from there and install that
3431
 
#  - if it's a bundle, install that
3432
 
#  - if it's a branch, pull from there
3433
 
#
3434
 
# Getting a bundle from a smart server is a bit different from reading a
3435
 
# bundle from a URL:
3436
 
#
3437
 
#  - we can reasonably remember the URL we last read from 
3438
 
#  - you can specify a revision number to pull, and we need to pass it across
3439
 
#    to the server as a limit on what will be requested
3440
 
#
3441
 
# TODO: Given a URL, determine whether it is a smart server or not (or perhaps
3442
 
# otherwise whether it's a bundle?)  Should this be a property or method of
3443
 
# the transport?  For the ssh protocol, we always know it's a smart server.
3444
 
# For http, we potentially need to probe.  But if we're explicitly given
3445
 
# bzr+http:// then we can skip that for now.