13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
"""Tests for smart transport"""
69
69
def __init__(self, vendor):
70
70
self.vendor = vendor
73
73
self.vendor.calls.append(('close', ))
75
75
def get_filelike_channels(self):
76
76
return self.vendor.read_from, self.vendor.write_to
79
79
class _InvalidHostnameFeature(tests.Feature):
80
80
"""Does 'non_existent.invalid' fail to resolve?
82
82
RFC 2606 states that .invalid is reserved for invalid domain names, and
83
83
also underscores are not a valid character in domain names. Despite this,
84
84
it's possible a badly misconfigured name server might decide to always
132
132
t = threading.Thread(target=_receive_bytes_on_server)
136
136
def test_construct_smart_simple_pipes_client_medium(self):
137
137
# the SimplePipes client medium takes two pipes:
138
138
# readable pipe, writeable pipe.
139
139
# Constructing one should just save these and do nothing.
140
140
# We test this by passing in None.
141
141
client_medium = medium.SmartSimplePipesClientMedium(None, None, None)
143
143
def test_simple_pipes_client_request_type(self):
144
144
# SimplePipesClient should use SmartClientStreamMediumRequest's.
145
145
client_medium = medium.SmartSimplePipesClientMedium(None, None, None)
212
212
self.assertEqual('abc', client_medium.read_bytes(3))
213
213
client_medium.disconnect()
214
214
self.assertEqual('def', client_medium.read_bytes(3))
216
216
def test_simple_pipes_client_supports__flush(self):
217
# invoking _flush on a SimplePipesClient should flush the output
217
# invoking _flush on a SimplePipesClient should flush the output
218
218
# pipe. We test this by creating an output pipe that records
219
219
# flush calls made to it.
220
220
from StringIO import StringIO # get regular StringIO
262
262
'a hostname', 'a port',
263
263
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes'])],
266
def test_ssh_client_changes_command_when_BZR_REMOTE_PATH_is_set(self):
267
# The only thing that initiates a connection from the medium is giving
270
vendor = StringIOSSHVendor(StringIO(), output)
271
orig_bzr_remote_path = os.environ.get('BZR_REMOTE_PATH')
272
def cleanup_environ():
273
osutils.set_or_unset_env('BZR_REMOTE_PATH', orig_bzr_remote_path)
274
self.addCleanup(cleanup_environ)
275
os.environ['BZR_REMOTE_PATH'] = 'fugly'
276
client_medium = self.callDeprecated(
277
['bzr_remote_path is required as of bzr 0.92'],
278
medium.SmartSSHClientMedium, 'a hostname', 'a port', 'a username',
279
'a password', 'base', vendor)
280
client_medium._accept_bytes('abc')
281
self.assertEqual('abc', output.getvalue())
282
self.assertEqual([('connect_ssh', 'a username', 'a password',
283
'a hostname', 'a port',
284
['fugly', 'serve', '--inet', '--directory=/', '--allow-writes'])],
287
266
def test_ssh_client_changes_command_when_bzr_remote_path_passed(self):
288
267
# The only thing that initiates a connection from the medium is giving
469
448
class TestSmartClientStreamMediumRequest(tests.TestCase):
470
449
"""Tests the for SmartClientStreamMediumRequest.
472
SmartClientStreamMediumRequest is a helper for the three stream based
451
SmartClientStreamMediumRequest is a helper for the three stream based
473
452
mediums: TCP, SSH, SimplePipes, so we only test it once, and then test that
474
453
those three mediums implement the interface it expects.
477
456
def test_accept_bytes_after_finished_writing_errors(self):
478
# calling accept_bytes after calling finished_writing raises
457
# calling accept_bytes after calling finished_writing raises
479
458
# WritingCompleted to prevent bad assumptions on stream environments
480
459
# breaking the needs of message-based environments.
481
460
output = StringIO()
537
516
None, None, 'base')
538
517
request = medium.SmartClientStreamMediumRequest(client_medium)
539
518
self.assertRaises(errors.WritingNotComplete, request.finished_reading)
541
520
def test_read_bytes(self):
542
521
# read bytes should invoke _read_bytes on the stream medium.
543
522
# we test this by using the SimplePipes medium - the most trivial one
544
# and checking that the data is supplied. Its possible that a
523
# and checking that the data is supplied. Its possible that a
545
524
# faulty implementation could poke at the pipe variables them selves,
546
525
# but we trust that this will be caught as it will break the integration
678
657
# wire-to-wire, using the whole stack, with a UTF-8 filename.
679
658
transport = memory.MemoryTransport('memory:///')
680
659
utf8_filename = u'testfile\N{INTERROBANG}'.encode('utf-8')
660
# VFS requests use filenames, not raw UTF-8.
661
hpss_path = urlutils.escape(utf8_filename)
681
662
transport.put_bytes(utf8_filename, 'contents\nof\nfile\n')
682
to_server = StringIO('get\001' + utf8_filename + '\n')
663
to_server = StringIO('get\001' + hpss_path + '\n')
683
664
from_server = StringIO()
684
665
server = medium.SmartServerPipeStreamMedium(
685
666
to_server, from_server, transport)
815
796
self.assertEqual('', from_server.getvalue())
816
797
self.assertEqual(sample_request_bytes, second_protocol.accepted_bytes)
817
798
self.assertFalse(server.finished)
819
800
def test_socket_stream_with_two_requests(self):
820
801
# If two requests are read in one go, then two calls to
821
802
# _serve_one_request should still process both of them as if they had
822
# been received seperately.
803
# been received separately.
823
804
sample_request_bytes = 'command\n'
824
805
server_sock, client_sock = self.portable_socket_pair()
825
806
server = medium.SmartServerSocketStreamMedium(
992
973
smart_server.start_background_thread('-' + self.id())
994
975
transport = remote.RemoteTCPTransport(smart_server.get_url())
996
transport.get('something')
997
except errors.TransportError, e:
998
self.assertContainsRe(str(e), 'some random exception')
1000
self.fail("get did not raise expected error")
976
err = self.assertRaises(errors.UnknownErrorFromSmartServer,
977
transport.get, 'something')
978
self.assertContainsRe(str(err), 'some random exception')
1001
979
transport.disconnect()
1003
981
smart_server.stop_background_thread()
1009
987
All of these tests are run with a server running on another thread serving
1010
988
a MemoryTransport, and a connection to it already open.
1012
the server is obtained by calling self.setUpServer(readonly=False).
990
the server is obtained by calling self.start_server(readonly=False).
1015
def setUpServer(self, readonly=False, backing_transport=None):
993
def start_server(self, readonly=False, backing_transport=None):
1016
994
"""Setup the server.
1018
996
:param readonly: Create a readonly server.
998
# NB: Tests using this fall into two categories: tests of the server,
999
# tests wanting a server. The latter should be updated to use
1000
# self.vfs_transport_factory etc.
1020
1001
if not backing_transport:
1021
self.backing_transport = memory.MemoryTransport()
1002
mem_server = memory.MemoryServer()
1003
mem_server.start_server()
1004
self.addCleanup(mem_server.stop_server)
1005
self.permit_url(mem_server.get_url())
1006
self.backing_transport = transport.get_transport(
1007
mem_server.get_url())
1023
1009
self.backing_transport = backing_transport
1025
1011
self.real_backing_transport = self.backing_transport
1026
self.backing_transport = get_transport("readonly+" + self.backing_transport.abspath('.'))
1012
self.backing_transport = transport.get_transport(
1013
"readonly+" + self.backing_transport.abspath('.'))
1027
1014
self.server = server.SmartTCPServer(self.backing_transport)
1028
1015
self.server.start_background_thread('-' + self.id())
1029
1016
self.transport = remote.RemoteTCPTransport(self.server.get_url())
1030
1017
self.addCleanup(self.tearDownServer)
1018
self.permit_url(self.server.get_url())
1032
1020
def tearDownServer(self):
1033
1021
if getattr(self, 'transport', None):
1091
1080
# asked for by the client. This gives meaningful and unsurprising errors
1093
1082
self._captureVar('BZR_NO_SMART_VFS', None)
1095
self.transport.get('not%20a%20file')
1096
except errors.NoSuchFile, e:
1097
self.assertEqual('not%20a%20file', e.path)
1099
self.fail("get did not raise expected error")
1083
err = self.assertRaises(
1084
errors.NoSuchFile, self.transport.get, 'not%20a%20file')
1085
self.assertSubset([err.path], ['not%20a%20file', './not%20a%20file'])
1101
1087
def test_simple_clone_conn(self):
1102
1088
"""Test that cloning reuses the same connection."""
1235
1221
Note: these tests are rudimentary versions of the command object tests in
1239
1225
def test_hello(self):
1240
1226
cmd = _mod_request.HelloRequest(None, '/')
1241
1227
response = cmd.execute()
1242
1228
self.assertEqual(('ok', '2'), response.args)
1243
1229
self.assertEqual(None, response.body)
1245
1231
def test_get_bundle(self):
1246
1232
from bzrlib.bundle import serializer
1247
1233
wt = self.make_branch_and_tree('.')
1248
1234
self.build_tree_contents([('hello', 'hello world')])
1249
1235
wt.add('hello')
1250
1236
rev_id = wt.commit('add hello')
1252
1238
cmd = _mod_request.GetBundleRequest(self.get_transport(), '/')
1253
1239
response = cmd.execute('.', rev_id)
1254
1240
bundle = serializer.read_bundle(StringIO(response.body))
1276
1262
def test_hello(self):
1277
1263
handler = self.build_handler(None)
1278
handler.dispatch_command('hello', ())
1264
handler.args_received(('hello',))
1279
1265
self.assertEqual(('ok', '2'), handler.response.args)
1280
1266
self.assertEqual(None, handler.response.body)
1282
1268
def test_disable_vfs_handler_classes_via_environment(self):
1283
1269
# VFS handler classes will raise an error from "execute" if
1284
1270
# BZR_NO_SMART_VFS is set.
1295
1281
"""The response for a read-only error is ('ReadOnlyError')."""
1296
1282
handler = self.build_handler(self.get_readonly_transport())
1297
1283
# send a mkdir for foo, with no explicit mode - should fail.
1298
handler.dispatch_command('mkdir', ('foo', ''))
1284
handler.args_received(('mkdir', 'foo', ''))
1299
1285
# and the failure should be an explicit ReadOnlyError
1300
1286
self.assertEqual(("ReadOnlyError", ), handler.response.args)
1301
1287
# XXX: TODO: test that other TransportNotPossible errors are
1306
1292
def test_hello_has_finished_body_on_dispatch(self):
1307
1293
"""The 'hello' command should set finished_reading."""
1308
1294
handler = self.build_handler(None)
1309
handler.dispatch_command('hello', ())
1295
handler.args_received(('hello',))
1310
1296
self.assertTrue(handler.finished_reading)
1311
1297
self.assertNotEqual(None, handler.response)
1313
1299
def test_put_bytes_non_atomic(self):
1314
1300
"""'put_...' should set finished_reading after reading the bytes."""
1315
1301
handler = self.build_handler(self.get_transport())
1316
handler.dispatch_command('put_non_atomic', ('a-file', '', 'F', ''))
1302
handler.args_received(('put_non_atomic', 'a-file', '', 'F', ''))
1317
1303
self.assertFalse(handler.finished_reading)
1318
1304
handler.accept_body('1234')
1319
1305
self.assertFalse(handler.finished_reading)
1322
1308
self.assertTrue(handler.finished_reading)
1323
1309
self.assertEqual(('ok', ), handler.response.args)
1324
1310
self.assertEqual(None, handler.response.body)
1326
1312
def test_readv_accept_body(self):
1327
1313
"""'readv' should set finished_reading after reading offsets."""
1328
1314
self.build_tree(['a-file'])
1329
1315
handler = self.build_handler(self.get_readonly_transport())
1330
handler.dispatch_command('readv', ('a-file', ))
1316
handler.args_received(('readv', 'a-file'))
1331
1317
self.assertFalse(handler.finished_reading)
1332
1318
handler.accept_body('2,')
1333
1319
self.assertFalse(handler.finished_reading)
1355
1341
class RemoteTransportRegistration(tests.TestCase):
1357
1343
def test_registration(self):
1358
t = get_transport('bzr+ssh://example.com/path')
1344
t = transport.get_transport('bzr+ssh://example.com/path')
1359
1345
self.assertIsInstance(t, remote.RemoteSSHTransport)
1360
1346
self.assertEqual('example.com', t._host)
1362
1348
def test_bzr_https(self):
1363
1349
# https://bugs.launchpad.net/bzr/+bug/128456
1364
t = get_transport('bzr+https://example.com/path')
1350
t = transport.get_transport('bzr+https://example.com/path')
1365
1351
self.assertIsInstance(t, remote.RemoteHTTPTransport)
1366
1352
self.assertStartsWith(
1367
1353
t._http_transport.base,
1400
1386
client_medium = medium.SmartSimplePipesClientMedium(None, None, 'base')
1401
1387
transport = remote.RemoteTransport(
1402
1388
'bzr://localhost/', medium=client_medium)
1389
err = errors.ErrorFromSmartServer(("ReadOnlyError", ))
1403
1390
self.assertRaises(errors.TransportNotPossible,
1404
transport._translate_error, ("ReadOnlyError", ))
1391
transport._translate_error, err)
1407
1394
class TestSmartProtocol(tests.TestCase):
1494
1481
smart_protocol._has_dispatched = True
1495
1482
smart_protocol.request = _mod_request.SmartServerRequestHandler(
1496
1483
None, _mod_request.request_handlers, '/')
1497
class FakeCommand(object):
1498
def do_body(cmd, body_bytes):
1484
class FakeCommand(_mod_request.SmartServerRequest):
1485
def do_body(self_cmd, body_bytes):
1499
1486
self.end_received = True
1500
1487
self.assertEqual('abcdefg', body_bytes)
1501
1488
return _mod_request.SuccessfulSmartServerResponse(('ok', ))
1502
smart_protocol.request._command = FakeCommand()
1489
smart_protocol.request._command = FakeCommand(None)
1503
1490
# Call accept_bytes to make sure that internal state like _body_decoder
1504
1491
# is initialised. This test should probably be given a clearer
1505
1492
# interface to work with that will not cause this inconsistency.
1533
1520
ex = self.assertRaises(errors.ConnectionReset,
1534
1521
response_handler.read_response_tuple)
1535
1522
self.assertEqual("Connection closed: "
1536
"please check connectivity and permissions "
1537
"(and try -Dhpss if further diagnosis is required)", str(ex))
1523
"Unexpected end of message. Please check connectivity "
1524
"and permissions, and report a bug if problems persist. ",
1539
1527
def test_server_offset_serialisation(self):
1540
1528
"""The Smart protocol serialises offsets as a comma and \n string.
1660
1648
def test_query_version(self):
1661
1649
"""query_version on a SmartClientProtocolOne should return a number.
1663
1651
The protocol provides the query_version because the domain level clients
1664
1652
may all need to be able to probe for capabilities.
1666
1654
# What we really want to test here is that SmartClientProtocolOne calls
1667
1655
# accept_bytes(tuple_based_encoding_of_hello) and reads and parses the
1668
# response of tuple-encoded (ok, 1). Also, seperately we should test
1656
# response of tuple-encoded (ok, 1). Also, separately we should test
1669
1657
# the error if the response is a non-understood version.
1670
1658
input = StringIO('ok\x012\n')
1671
1659
output = StringIO()
1931
1919
def test_query_version(self):
1932
1920
"""query_version on a SmartClientProtocolTwo should return a number.
1934
1922
The protocol provides the query_version because the domain level clients
1935
1923
may all need to be able to probe for capabilities.
1937
1925
# What we really want to test here is that SmartClientProtocolTwo calls
1938
1926
# accept_bytes(tuple_based_encoding_of_hello) and reads and parses the
1939
# response of tuple-encoded (ok, 1). Also, seperately we should test
1927
# response of tuple-encoded (ok, 1). Also, separately we should test
1940
1928
# the error if the response is a non-understood version.
1941
1929
input = StringIO(self.response_marker + 'success\nok\x012\n')
1942
1930
output = StringIO()
2330
2310
self.assertEqual(0, smart_protocol.next_read_size())
2331
2311
self.assertEqual('', smart_protocol.unused_data)
2313
def test_repeated_excess(self):
2314
"""Repeated calls to accept_bytes after the message end has been parsed
2315
accumlates the bytes in the unused_data attribute.
2318
headers = '\0\0\0\x02de' # length-prefixed, bencoded empty dict
2320
request_bytes = headers + end
2321
smart_protocol = self.server_protocol_class(LoggingMessageHandler())
2322
smart_protocol.accept_bytes(request_bytes)
2323
self.assertEqual('', smart_protocol.unused_data)
2324
smart_protocol.accept_bytes('aaa')
2325
self.assertEqual('aaa', smart_protocol.unused_data)
2326
smart_protocol.accept_bytes('bbb')
2327
self.assertEqual('aaabbb', smart_protocol.unused_data)
2328
self.assertEqual(0, smart_protocol.next_read_size())
2333
2330
def make_protocol_expecting_message_part(self):
2334
2331
headers = '\0\0\0\x02de' # length-prefixed, bencoded empty dict
2335
2332
message_handler = LoggingMessageHandler()
2397
2394
protocol_decoder, medium_request)
2398
2395
return response_handler
2400
def test_body_stream_interrupted_by_error(self):
2401
interrupted_body_stream = (
2402
'oS' # successful response
2403
's\0\0\0\x02le' # empty args
2404
'b\0\0\0\x09chunk one' # first chunk
2405
'b\0\0\0\x09chunk two' # second chunk
2407
's\0\0\0\x0el5:error3:abce' # bencoded error
2397
def test_interrupted_by_error(self):
2410
2398
response_handler = self.make_response_handler(interrupted_body_stream)
2411
2399
stream = response_handler.read_streamed_body()
2412
self.assertEqual('chunk one', stream.next())
2413
self.assertEqual('chunk two', stream.next())
2400
self.assertEqual('aaa', stream.next())
2401
self.assertEqual('bbb', stream.next())
2414
2402
exc = self.assertRaises(errors.ErrorFromSmartServer, stream.next)
2415
self.assertEqual(('error', 'abc'), exc.error_tuple)
2403
self.assertEqual(('error', 'Boom!'), exc.error_tuple)
2417
def test_body_stream_interrupted_by_connection_lost(self):
2405
def test_interrupted_by_connection_lost(self):
2418
2406
interrupted_body_stream = (
2419
2407
'oS' # successful response
2420
2408
's\0\0\0\x02le' # empty args
2432
2420
self.assertRaises(
2433
2421
errors.ConnectionReset, response_handler.read_body_bytes)
2423
def test_multiple_bytes_parts(self):
2424
multiple_bytes_parts = (
2425
'oS' # successful response
2426
's\0\0\0\x02le' # empty args
2427
'b\0\0\0\x0bSome bytes\n' # some bytes
2428
'b\0\0\0\x0aMore bytes' # more bytes
2431
response_handler = self.make_response_handler(multiple_bytes_parts)
2433
'Some bytes\nMore bytes', response_handler.read_body_bytes())
2434
response_handler = self.make_response_handler(multiple_bytes_parts)
2436
['Some bytes\n', 'More bytes'],
2437
list(response_handler.read_streamed_body()))
2440
class FakeResponder(object):
2442
response_sent = False
2444
def send_error(self, exc):
2447
def send_response(self, response):
2451
class TestConventionalRequestHandlerBodyStream(tests.TestCase):
2452
"""Tests for ConventionalRequestHandler's handling of request bodies."""
2454
def make_request_handler(self, request_bytes):
2455
"""Make a ConventionalRequestHandler for the given bytes using test
2456
doubles for the request_handler and the responder.
2458
from bzrlib.smart.message import ConventionalRequestHandler
2459
request_handler = InstrumentedRequestHandler()
2460
request_handler.response = _mod_request.SuccessfulSmartServerResponse(('arg', 'arg'))
2461
responder = FakeResponder()
2462
message_handler = ConventionalRequestHandler(request_handler, responder)
2463
protocol_decoder = protocol.ProtocolThreeDecoder(message_handler)
2464
# put decoder in desired state (waiting for message parts)
2465
protocol_decoder.state_accept = protocol_decoder._state_accept_expecting_message_part
2466
protocol_decoder.accept_bytes(request_bytes)
2467
return request_handler
2469
def test_multiple_bytes_parts(self):
2470
"""Each bytes part triggers a call to the request_handler's
2473
multiple_bytes_parts = (
2474
's\0\0\0\x07l3:fooe' # args
2475
'b\0\0\0\x0bSome bytes\n' # some bytes
2476
'b\0\0\0\x0aMore bytes' # more bytes
2479
request_handler = self.make_request_handler(multiple_bytes_parts)
2480
accept_body_calls = [
2481
call_info[1] for call_info in request_handler.calls
2482
if call_info[0] == 'accept_body']
2484
['Some bytes\n', 'More bytes'], accept_body_calls)
2486
def test_error_flag_after_body(self):
2488
's\0\0\0\x07l3:fooe' # request args
2489
'b\0\0\0\x0bSome bytes\n' # some bytes
2490
'b\0\0\0\x0aMore bytes' # more bytes
2492
's\0\0\0\x07l3:bare' # error args
2495
request_handler = self.make_request_handler(body_then_error)
2497
[('post_body_error_received', ('bar',)), ('end_received',)],
2498
request_handler.calls[-2:])
2436
2501
class TestMessageHandlerErrors(tests.TestCase):
2437
2502
"""Tests for v3 that unrecognised (but well-formed) requests/responses are
2482
2547
def __init__(self):
2483
2548
self.calls = []
2485
def body_chunk_received(self, chunk_bytes):
2486
self.calls.append(('body_chunk_received', chunk_bytes))
2549
self.finished_reading = False
2488
2551
def no_body_received(self):
2489
2552
self.calls.append(('no_body_received',))
2491
def prefixed_body_received(self, body_bytes):
2492
self.calls.append(('prefixed_body_received', body_bytes))
2494
2554
def end_received(self):
2495
2555
self.calls.append(('end_received',))
2556
self.finished_reading = True
2558
def args_received(self, args):
2559
self.calls.append(('args_received', args))
2561
def accept_body(self, bytes):
2562
self.calls.append(('accept_body', bytes))
2564
def end_of_body(self):
2565
self.calls.append(('end_of_body',))
2566
self.finished_reading = True
2568
def post_body_error_received(self, error_args):
2569
self.calls.append(('post_body_error_received', error_args))
2498
2572
class StubRequest(object):
2534
2608
# The message handler has been invoked with all the parts of the
2535
2609
# trivial response: empty headers, status byte, no args, end.
2536
2610
self.assertEqual(
2537
[('headers', {}), ('byte', 'S'), ('structure', []), ('end',)],
2611
[('headers', {}), ('byte', 'S'), ('structure', ()), ('end',)],
2538
2612
response_handler.event_log)
2540
2614
def test_incomplete_message(self):
2648
2722
self.assertEqual(
2649
2723
['accept_bytes', 'finished_writing'], medium_request.calls)
2725
def test_call_with_body_stream_smoke_test(self):
2726
"""A smoke test for ProtocolThreeRequester.call_with_body_stream.
2728
This test checks that a particular simple invocation of
2729
call_with_body_stream emits the correct bytes for that invocation.
2731
requester, output = self.make_client_encoder_and_output()
2732
requester.set_headers({'header name': 'header value'})
2733
stream = ['chunk 1', 'chunk two']
2734
requester.call_with_body_stream(('one arg',), stream)
2736
'bzr message 3 (bzr 1.6)\n' # protocol version
2737
'\x00\x00\x00\x1fd11:header name12:header valuee' # headers
2738
's\x00\x00\x00\x0bl7:one arge' # args
2739
'b\x00\x00\x00\x07chunk 1' # a prefixed body chunk
2740
'b\x00\x00\x00\x09chunk two' # a prefixed body chunk
2744
def test_call_with_body_stream_empty_stream(self):
2745
"""call_with_body_stream with an empty stream."""
2746
requester, output = self.make_client_encoder_and_output()
2747
requester.set_headers({})
2749
requester.call_with_body_stream(('one arg',), stream)
2751
'bzr message 3 (bzr 1.6)\n' # protocol version
2752
'\x00\x00\x00\x02de' # headers
2753
's\x00\x00\x00\x0bl7:one arge' # args
2758
def test_call_with_body_stream_error(self):
2759
"""call_with_body_stream will abort the streamed body with an
2760
error if the stream raises an error during iteration.
2762
The resulting request will still be a complete message.
2764
requester, output = self.make_client_encoder_and_output()
2765
requester.set_headers({})
2766
def stream_that_fails():
2769
raise Exception('Boom!')
2770
self.assertRaises(Exception, requester.call_with_body_stream,
2771
('one arg',), stream_that_fails())
2773
'bzr message 3 (bzr 1.6)\n' # protocol version
2774
'\x00\x00\x00\x02de' # headers
2775
's\x00\x00\x00\x0bl7:one arge' # args
2776
'b\x00\x00\x00\x03aaa' # body
2777
'b\x00\x00\x00\x03bbb' # more body
2779
's\x00\x00\x00\x09l5:errore' # error args: ('error',)
2652
2784
class StubMediumRequest(object):
2653
2785
"""A stub medium request that tracks the number of times accept_bytes is
2665
2797
self.calls.append('finished_writing')
2800
interrupted_body_stream = (
2801
'oS' # status flag (success)
2802
's\x00\x00\x00\x08l4:argse' # args struct ('args,')
2803
'b\x00\x00\x00\x03aaa' # body part ('aaa')
2804
'b\x00\x00\x00\x03bbb' # body part ('bbb')
2805
'oE' # status flag (error)
2806
's\x00\x00\x00\x10l5:error5:Boom!e' # err struct ('error', 'Boom!')
2668
2811
class TestResponseEncodingProtocolThree(tests.TestCase):
2670
2813
def make_response_encoder(self):
2686
2829
# end of message
2832
def test_send_broken_body_stream(self):
2833
encoder, out_stream = self.make_response_encoder()
2834
encoder._headers = {}
2835
def stream_that_fails():
2838
raise Exception('Boom!')
2839
response = _mod_request.SuccessfulSmartServerResponse(
2840
('args',), body_stream=stream_that_fails())
2841
encoder.send_response(response)
2842
expected_response = (
2843
'bzr message 3 (bzr 1.6)\n' # protocol marker
2844
'\x00\x00\x00\x02de' # headers dict (empty)
2845
+ interrupted_body_stream)
2846
self.assertEqual(expected_response, out_stream.getvalue())
2690
2849
class TestResponseEncoderBufferingProtocolThree(tests.TestCase):
2691
2850
"""Tests for buffering of responses.
2724
2884
self.responder.send_response(response)
2725
2885
self.assertWriteCount(1)
2727
def test_send_response_with_body_stream_writes_once_per_chunk(self):
2728
"""A normal response with a stream body is written to the medium
2729
writes to the medium once per chunk.
2887
def test_send_response_with_body_stream_buffers_writes(self):
2888
"""A normal response with a stream body writes to the medium once."""
2731
2889
# Construct a response with stream with 2 chunks in it.
2732
2890
response = _mod_request.SuccessfulSmartServerResponse(
2733
2891
('arg', 'arg'), body_stream=['chunk1', 'chunk2'])
2734
2892
self.responder.send_response(response)
2735
# We will write 3 times: exactly once for each chunk, plus a final
2736
# write to end the response.
2737
self.assertWriteCount(3)
2893
# We will write just once, despite the multiple chunks, due to
2895
self.assertWriteCount(1)
2897
def test_send_response_with_body_stream_flushes_buffers_sometimes(self):
2898
"""When there are many bytes (>1MB), multiple writes will occur rather
2899
than buffering indefinitely.
2901
# Construct a response with stream with ~1.5MB in it. This should
2902
# trigger 2 writes, but not 3
2903
onekib = '12345678' * 128
2904
body_stream = [onekib] * (1024 + 512)
2905
response = _mod_request.SuccessfulSmartServerResponse(
2906
('arg', 'arg'), body_stream=body_stream)
2907
self.responder.send_response(response)
2908
self.assertWriteCount(2)
2740
2911
class TestSmartClientUnicode(tests.TestCase):
3362
3533
# requests for child URLs of that to the original URL. i.e., we want to
3363
3534
# POST to "bzr+http://host/foo/.bzr/smart" and never something like
3364
3535
# "bzr+http://host/foo/.bzr/branch/.bzr/smart". So, a cloned
3365
# RemoteHTTPTransport remembers the initial URL, and adjusts the relpaths
3366
# it sends in smart requests accordingly.
3536
# RemoteHTTPTransport remembers the initial URL, and adjusts the
3537
# relpaths it sends in smart requests accordingly.
3367
3538
base_transport = remote.RemoteHTTPTransport('bzr+http://host/path')
3368
3539
new_transport = base_transport.clone('child_dir')
3369
3540
self.assertEqual(base_transport._http_transport,
3384
3555
# still work correctly.
3385
3556
base_transport = remote.RemoteHTTPTransport('bzr+http://host/%7Ea/b')
3386
3557
new_transport = base_transport.clone('c')
3387
self.assertEqual('bzr+http://host/%7Ea/b/c/', new_transport.base)
3558
self.assertEqual('bzr+http://host/~a/b/c/', new_transport.base)
3388
3559
self.assertEqual(
3390
3561
new_transport._client.remote_path_from_transport(new_transport))
3393
# TODO: Client feature that does get_bundle and then installs that into a
3394
# branch; this can be used in place of the regular pull/fetch operation when
3395
# coming from a smart server.
3397
# TODO: Eventually, want to do a 'branch' command by fetching the whole
3398
# history as one big bundle. How?
3400
# The branch command does 'br_from.sprout', which tries to preserve the same
3401
# format. We don't necessarily even want that.
3403
# It might be simpler to handle cmd_pull first, which does a simpler fetch()
3404
# operation from one branch into another. It already has some code for
3405
# pulling from a bundle, which it does by trying to see if the destination is
3406
# a bundle file. So it seems the logic for pull ought to be:
3408
# - if it's a smart server, get a bundle from there and install that
3409
# - if it's a bundle, install that
3410
# - if it's a branch, pull from there
3412
# Getting a bundle from a smart server is a bit different from reading a
3413
# bundle from a URL:
3415
# - we can reasonably remember the URL we last read from
3416
# - you can specify a revision number to pull, and we need to pass it across
3417
# to the server as a limit on what will be requested
3419
# TODO: Given a URL, determine whether it is a smart server or not (or perhaps
3420
# otherwise whether it's a bundle?) Should this be a property or method of
3421
# the transport? For the ssh protocol, we always know it's a smart server.
3422
# For http, we potentially need to probe. But if we're explicitly given
3423
# bzr+http:// then we can skip that for now.
3563
def test__redirect_to(self):
3564
t = remote.RemoteHTTPTransport('bzr+http://www.example.com/foo')
3565
r = t._redirected_to('http://www.example.com/foo',
3566
'http://www.example.com/bar')
3567
self.assertEquals(type(r), type(t))
3569
def test__redirect_sibling_protocol(self):
3570
t = remote.RemoteHTTPTransport('bzr+http://www.example.com/foo')
3571
r = t._redirected_to('http://www.example.com/foo',
3572
'https://www.example.com/bar')
3573
self.assertEquals(type(r), type(t))
3574
self.assertStartsWith(r.base, 'bzr+https')
3576
def test__redirect_to_with_user(self):
3577
t = remote.RemoteHTTPTransport('bzr+http://joe@www.example.com/foo')
3578
r = t._redirected_to('http://www.example.com/foo',
3579
'http://www.example.com/bar')
3580
self.assertEquals(type(r), type(t))
3581
self.assertEquals('joe', t._user)
3582
self.assertEquals(t._user, r._user)
3584
def test_redirected_to_same_host_different_protocol(self):
3585
t = remote.RemoteHTTPTransport('bzr+http://joe@www.example.com/foo')
3586
r = t._redirected_to('http://www.example.com/foo',
3587
'ftp://www.example.com/foo')
3588
self.assertNotEquals(type(r), type(t))