69
69
def __init__(self, vendor):
70
70
self.vendor = vendor
73
73
self.vendor.calls.append(('close', ))
75
75
def get_filelike_channels(self):
    """Return the vendor's ``(read_from, write_to)`` pair.

    Both objects are taken straight from ``self.vendor``; nothing is
    copied or wrapped here.

    :return: a 2-tuple ``(self.vendor.read_from, self.vendor.write_to)``.
    """
    vendor = self.vendor
    return vendor.read_from, vendor.write_to
79
79
class _InvalidHostnameFeature(tests.Feature):
80
80
"""Does 'non_existent.invalid' fail to resolve?
82
82
RFC 2606 states that .invalid is reserved for invalid domain names, and
83
83
also underscores are not a valid character in domain names. Despite this,
84
84
it's possible a badly misconfigured name server might decide to always
132
132
t = threading.Thread(target=_receive_bytes_on_server)
136
136
def test_construct_smart_simple_pipes_client_medium(self):
    """Constructing a SimplePipes client medium must not use its pipes."""
    # The SimplePipes client medium takes a readable pipe and a writeable
    # pipe.  Construction should only store them, so we pass None for every
    # argument: any attempt to use the pipes while constructing would fail.
    # The instance itself is not needed afterwards, so it is not bound.
    medium.SmartSimplePipesClientMedium(None, None, None)
143
143
def test_simple_pipes_client_request_type(self):
144
144
# SimplePipesClient should use SmartClientStreamMediumRequest's.
145
145
client_medium = medium.SmartSimplePipesClientMedium(None, None, None)
149
149
def test_simple_pipes_client_get_concurrent_requests(self):
150
150
# the simple_pipes client does not support pipelined requests:
151
# but it does support serial requests: we construct one after
151
# but it does support serial requests: we construct one after
152
152
# another is finished. This is a smoke test testing the integration
153
153
# of the SmartClientStreamMediumRequest and the SmartClientStreamMedium
154
154
# classes - as the sibling classes share this logic, they do not have
212
212
self.assertEqual('abc', client_medium.read_bytes(3))
213
213
client_medium.disconnect()
214
214
self.assertEqual('def', client_medium.read_bytes(3))
216
216
def test_simple_pipes_client_supports__flush(self):
217
# invoking _flush on a SimplePipesClient should flush the output
217
# invoking _flush on a SimplePipesClient should flush the output
218
218
# pipe. We test this by creating an output pipe that records
219
219
# flush calls made to it.
220
220
from StringIO import StringIO # get regular StringIO
262
262
'a hostname', 'a port',
263
263
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes'])],
266
def test_ssh_client_changes_command_when_BZR_REMOTE_PATH_is_set(self):
267
# The only thing that initiates a connection from the medium is giving
270
vendor = StringIOSSHVendor(StringIO(), output)
271
orig_bzr_remote_path = os.environ.get('BZR_REMOTE_PATH')
272
def cleanup_environ():
273
osutils.set_or_unset_env('BZR_REMOTE_PATH', orig_bzr_remote_path)
274
self.addCleanup(cleanup_environ)
275
os.environ['BZR_REMOTE_PATH'] = 'fugly'
276
client_medium = self.callDeprecated(
277
['bzr_remote_path is required as of bzr 0.92'],
278
medium.SmartSSHClientMedium, 'a hostname', 'a port', 'a username',
279
'a password', 'base', vendor)
280
client_medium._accept_bytes('abc')
281
self.assertEqual('abc', output.getvalue())
282
self.assertEqual([('connect_ssh', 'a username', 'a password',
283
'a hostname', 'a port',
284
['fugly', 'serve', '--inet', '--directory=/', '--allow-writes'])],
287
266
def test_ssh_client_changes_command_when_bzr_remote_path_passed(self):
288
267
# The only thing that initiates a connection from the medium is giving
469
448
class TestSmartClientStreamMediumRequest(tests.TestCase):
470
449
"""Tests for the SmartClientStreamMediumRequest.
472
SmartClientStreamMediumRequest is a helper for the three stream based
451
SmartClientStreamMediumRequest is a helper for the three stream based
473
452
mediums: TCP, SSH, SimplePipes, so we only test it once, and then test that
474
453
those three mediums implement the interface it expects.
477
456
def test_accept_bytes_after_finished_writing_errors(self):
478
# calling accept_bytes after calling finished_writing raises
457
# calling accept_bytes after calling finished_writing raises
479
458
# WritingCompleted to prevent bad assumptions on stream environments
480
459
# breaking the needs of message-based environments.
481
460
output = StringIO()
537
516
None, None, 'base')
538
517
request = medium.SmartClientStreamMediumRequest(client_medium)
539
518
self.assertRaises(errors.WritingNotComplete, request.finished_reading)
541
520
def test_read_bytes(self):
542
521
# read bytes should invoke _read_bytes on the stream medium.
543
522
# we test this by using the SimplePipes medium - the most trivial one
544
# and checking that the data is supplied. It's possible that a
523
# and checking that the data is supplied. It's possible that a
545
524
# faulty implementation could poke at the pipe variables themselves,
546
525
# but we trust that this will be caught as it will break the integration
1229
1208
Note: these tests are rudimentary versions of the command object tests in
1233
1212
def test_hello(self):
    """HelloRequest responds with ('ok', '2') and carries no body."""
    hello_response = _mod_request.HelloRequest(None, '/').execute()
    self.assertEqual(('ok', '2'), hello_response.args)
    self.assertEqual(None, hello_response.body)
1239
1218
def test_get_bundle(self):
1240
1219
from bzrlib.bundle import serializer
1241
1220
wt = self.make_branch_and_tree('.')
1242
1221
self.build_tree_contents([('hello', 'hello world')])
1243
1222
wt.add('hello')
1244
1223
rev_id = wt.commit('add hello')
1246
1225
cmd = _mod_request.GetBundleRequest(self.get_transport(), '/')
1247
1226
response = cmd.execute('.', rev_id)
1248
1227
bundle = serializer.read_bundle(StringIO(response.body))
1272
1251
handler.dispatch_command('hello', ())
1273
1252
self.assertEqual(('ok', '2'), handler.response.args)
1274
1253
self.assertEqual(None, handler.response.body)
1276
1255
def test_disable_vfs_handler_classes_via_environment(self):
1277
1256
# VFS handler classes will raise an error from "execute" if
1278
1257
# BZR_NO_SMART_VFS is set.
1468
1447
def assertOffsetSerialisation(self, expected_offsets, expected_serialised,
1470
1449
"""Check that smart (de)serialises offsets as expected.
1472
1451
We check both serialisation and deserialisation at the same time
1473
1452
to ensure that the round tripping cannot skew: both directions should
1474
1453
be as expected.
1476
1455
:param expected_offsets: a readv offset list.
1477
1456
:param expected_serialised: an expected serial form of the offsets.
1489
1468
smart_protocol._has_dispatched = True
1490
1469
smart_protocol.request = _mod_request.SmartServerRequestHandler(
1491
1470
None, _mod_request.request_handlers, '/')
1492
class FakeCommand(object):
1493
def do_body(cmd, body_bytes):
1471
class FakeCommand(_mod_request.SmartServerRequest):
1472
def do_body(self_cmd, body_bytes):
1494
1473
self.end_received = True
1495
1474
self.assertEqual('abcdefg', body_bytes)
1496
1475
return _mod_request.SuccessfulSmartServerResponse(('ok', ))
1497
smart_protocol.request._command = FakeCommand()
1476
smart_protocol.request._command = FakeCommand(None)
1498
1477
# Call accept_bytes to make sure that internal state like _body_decoder
1499
1478
# is initialised. This test should probably be given a clearer
1500
1479
# interface to work with that will not cause this inconsistency.
1528
1507
ex = self.assertRaises(errors.ConnectionReset,
1529
1508
response_handler.read_response_tuple)
1530
1509
self.assertEqual("Connection closed: "
1531
"please check connectivity and permissions "
1532
"(and try -Dhpss if further diagnosis is required)", str(ex))
1510
"please check connectivity and permissions ",
1534
1513
def test_server_offset_serialisation(self):
1535
1514
"""The Smart protocol serialises offsets as a comma and \n string.
2392
2363
protocol_decoder, medium_request)
2393
2364
return response_handler
2395
def test_body_stream_interrupted_by_error(self):
2396
interrupted_body_stream = (
2397
'oS' # successful response
2398
's\0\0\0\x02le' # empty args
2399
'b\0\0\0\x09chunk one' # first chunk
2400
'b\0\0\0\x09chunk two' # second chunk
2402
's\0\0\0\x0el5:error3:abce' # bencoded error
2366
def test_interrupted_by_error(self):
2405
2367
response_handler = self.make_response_handler(interrupted_body_stream)
2406
2368
stream = response_handler.read_streamed_body()
2407
self.assertEqual('chunk one', stream.next())
2408
self.assertEqual('chunk two', stream.next())
2369
self.assertEqual('aaa', stream.next())
2370
self.assertEqual('bbb', stream.next())
2409
2371
exc = self.assertRaises(errors.ErrorFromSmartServer, stream.next)
2410
self.assertEqual(('error', 'abc'), exc.error_tuple)
2372
self.assertEqual(('error', 'Boom!'), exc.error_tuple)
2412
def test_body_stream_interrupted_by_connection_lost(self):
2374
def test_interrupted_by_connection_lost(self):
2413
2375
interrupted_body_stream = (
2414
2376
'oS' # successful response
2415
2377
's\0\0\0\x02le' # empty args
2427
2389
self.assertRaises(
2428
2390
errors.ConnectionReset, response_handler.read_body_bytes)
2392
def test_multiple_bytes_parts(self):
2393
multiple_bytes_parts = (
2394
'oS' # successful response
2395
's\0\0\0\x02le' # empty args
2396
'b\0\0\0\x0bSome bytes\n' # some bytes
2397
'b\0\0\0\x0aMore bytes' # more bytes
2400
response_handler = self.make_response_handler(multiple_bytes_parts)
2402
'Some bytes\nMore bytes', response_handler.read_body_bytes())
2403
response_handler = self.make_response_handler(multiple_bytes_parts)
2405
['Some bytes\n', 'More bytes'],
2406
list(response_handler.read_streamed_body()))
2409
class FakeResponder(object):
2411
response_sent = False
2413
def send_error(self, exc):
2416
def send_response(self, response):
2420
class TestConventionalRequestHandlerBodyStream(tests.TestCase):
2421
"""Tests for ConventionalRequestHandler's handling of request bodies."""
2423
def make_request_handler(self, request_bytes):
    """Make a ConventionalRequestHandler for the given bytes using test
    doubles for the request_handler and the responder.

    :param request_bytes: the bytes fed to the protocol decoder.
    :return: the InstrumentedRequestHandler test double, after the decoder
        has accepted ``request_bytes``.
    """
    from bzrlib.smart.message import ConventionalRequestHandler
    request_handler = InstrumentedRequestHandler()
    request_handler.response = _mod_request.SuccessfulSmartServerResponse(
        ('arg', 'arg'))
    message_handler = ConventionalRequestHandler(
        request_handler, FakeResponder())
    protocol_decoder = protocol.ProtocolThreeDecoder(message_handler)
    # Put the decoder in the desired state (waiting for message parts)
    # before handing it any bytes.
    protocol_decoder.state_accept = (
        protocol_decoder._state_accept_expecting_message_part)
    protocol_decoder.accept_bytes(request_bytes)
    return request_handler
2438
def test_multiple_bytes_parts(self):
2439
"""Each bytes part triggers a call to the request_handler's
2442
multiple_bytes_parts = (
2443
's\0\0\0\x07l3:fooe' # args
2444
'b\0\0\0\x0bSome bytes\n' # some bytes
2445
'b\0\0\0\x0aMore bytes' # more bytes
2448
request_handler = self.make_request_handler(multiple_bytes_parts)
2449
accept_body_calls = [
2450
call_info[1] for call_info in request_handler.calls
2451
if call_info[0] == 'accept_body']
2453
['Some bytes\n', 'More bytes'], accept_body_calls)
2455
def test_error_flag_after_body(self):
2457
's\0\0\0\x07l3:fooe' # request args
2458
'b\0\0\0\x0bSome bytes\n' # some bytes
2459
'b\0\0\0\x0aMore bytes' # more bytes
2461
's\0\0\0\x07l3:bare' # error args
2464
request_handler = self.make_request_handler(body_then_error)
2466
[('post_body_error_received', ('bar',)), ('end_received',)],
2467
request_handler.calls[-2:])
2431
2470
class TestMessageHandlerErrors(tests.TestCase):
2432
2471
"""Tests for v3 that unrecognised (but well-formed) requests/responses are
2477
2516
def __init__(self):
2478
2517
self.calls = []
2480
def body_chunk_received(self, chunk_bytes):
2481
self.calls.append(('body_chunk_received', chunk_bytes))
2518
self.finished_reading = False
2483
2520
def no_body_received(self):
    """Record a ``('no_body_received',)`` entry in ``self.calls``."""
    event = ('no_body_received',)
    self.calls.append(event)
2486
def prefixed_body_received(self, body_bytes):
2487
self.calls.append(('prefixed_body_received', body_bytes))
2489
2523
def end_received(self):
    """Record an ``('end_received',)`` call and mark reading as finished.

    Appends the entry to ``self.calls`` and sets ``self.finished_reading``
    to True.
    """
    event = ('end_received',)
    self.calls.append(event)
    self.finished_reading = True
2527
def dispatch_command(self, cmd, args):
    """Record a ``('dispatch_command', cmd, args)`` entry.

    :param cmd: the command passed by the caller, stored as given.
    :param args: the arguments passed by the caller, stored as given.
    """
    event = ('dispatch_command', cmd, args)
    self.calls.append(event)
2530
def accept_body(self, bytes):
    """Record an ``('accept_body', bytes)`` entry in ``self.calls``.

    :param bytes: the body data passed by the caller, stored as given.
        The name shadows the builtin but is kept so keyword callers
        continue to work.
    """
    event = ('accept_body', bytes)
    self.calls.append(event)
2533
def end_of_body(self):
    """Record an ``('end_of_body',)`` call and mark reading as finished.

    Appends the entry to ``self.calls`` and sets ``self.finished_reading``
    to True.
    """
    event = ('end_of_body',)
    self.calls.append(event)
    self.finished_reading = True
2537
def post_body_error_received(self, error_args):
    """Record a ``('post_body_error_received', error_args)`` entry.

    :param error_args: the error arguments passed by the caller, stored
        as given.
    """
    event = ('post_body_error_received', error_args)
    self.calls.append(event)
2493
2541
class StubRequest(object):
2529
2577
# The message handler has been invoked with all the parts of the
2530
2578
# trivial response: empty headers, status byte, no args, end.
2531
2579
self.assertEqual(
2532
[('headers', {}), ('byte', 'S'), ('structure', []), ('end',)],
2580
[('headers', {}), ('byte', 'S'), ('structure', ()), ('end',)],
2533
2581
response_handler.event_log)
2535
2583
def test_incomplete_message(self):
2643
2691
self.assertEqual(
2644
2692
['accept_bytes', 'finished_writing'], medium_request.calls)
2694
def test_call_with_body_stream_smoke_test(self):
2695
"""A smoke test for ProtocolThreeRequester.call_with_body_stream.
2697
This test checks that a particular simple invocation of
2698
call_with_body_stream emits the correct bytes for that invocation.
2700
requester, output = self.make_client_encoder_and_output()
2701
requester.set_headers({'header name': 'header value'})
2702
stream = ['chunk 1', 'chunk two']
2703
requester.call_with_body_stream(('one arg',), stream)
2705
'bzr message 3 (bzr 1.6)\n' # protocol version
2706
'\x00\x00\x00\x1fd11:header name12:header valuee' # headers
2707
's\x00\x00\x00\x0bl7:one arge' # args
2708
'b\x00\x00\x00\x07chunk 1' # a prefixed body chunk
2709
'b\x00\x00\x00\x09chunk two' # a prefixed body chunk
2713
def test_call_with_body_stream_empty_stream(self):
2714
"""call_with_body_stream with an empty stream."""
2715
requester, output = self.make_client_encoder_and_output()
2716
requester.set_headers({})
2718
requester.call_with_body_stream(('one arg',), stream)
2720
'bzr message 3 (bzr 1.6)\n' # protocol version
2721
'\x00\x00\x00\x02de' # headers
2722
's\x00\x00\x00\x0bl7:one arge' # args
2727
def test_call_with_body_stream_error(self):
2728
"""call_with_body_stream will abort the streamed body with an
2729
error if the stream raises an error during iteration.
2731
The resulting request will still be a complete message.
2733
requester, output = self.make_client_encoder_and_output()
2734
requester.set_headers({})
2735
def stream_that_fails():
2738
raise Exception('Boom!')
2739
self.assertRaises(Exception, requester.call_with_body_stream,
2740
('one arg',), stream_that_fails())
2742
'bzr message 3 (bzr 1.6)\n' # protocol version
2743
'\x00\x00\x00\x02de' # headers
2744
's\x00\x00\x00\x0bl7:one arge' # args
2745
'b\x00\x00\x00\x03aaa' # body
2746
'b\x00\x00\x00\x03bbb' # more body
2748
's\x00\x00\x00\x09l5:errore' # error args: ('error',)
2647
2753
class StubMediumRequest(object):
2648
2754
"""A stub medium request that tracks the number of times accept_bytes is
2660
2766
self.calls.append('finished_writing')
2769
interrupted_body_stream = (
2770
'oS' # status flag (success)
2771
's\x00\x00\x00\x08l4:argse' # args struct ('args',)
2772
'b\x00\x00\x00\x03aaa' # body part ('aaa')
2773
'b\x00\x00\x00\x03bbb' # body part ('bbb')
2774
'oE' # status flag (error)
2775
's\x00\x00\x00\x10l5:error5:Boom!e' # err struct ('error', 'Boom!')
2663
2780
class TestResponseEncodingProtocolThree(tests.TestCase):
2665
2782
def make_response_encoder(self):
2681
2798
# end of message
2801
def test_send_broken_body_stream(self):
2802
encoder, out_stream = self.make_response_encoder()
2803
encoder._headers = {}
2804
def stream_that_fails():
2807
raise Exception('Boom!')
2808
response = _mod_request.SuccessfulSmartServerResponse(
2809
('args',), body_stream=stream_that_fails())
2810
encoder.send_response(response)
2811
expected_response = (
2812
'bzr message 3 (bzr 1.6)\n' # protocol marker
2813
'\x00\x00\x00\x02de' # headers dict (empty)
2814
+ interrupted_body_stream)
2815
self.assertEqual(expected_response, out_stream.getvalue())
2685
2818
class TestResponseEncoderBufferingProtocolThree(tests.TestCase):
2686
2819
"""Tests for buffering of responses.
2719
2852
self.responder.send_response(response)
2720
2853
self.assertWriteCount(1)
2722
def test_send_response_with_body_stream_writes_once_per_chunk(self):
2723
"""A normal response with a stream body is written to the medium
2724
writes to the medium once per chunk.
2855
def test_send_response_with_body_stream_buffers_writes(self):
2856
"""A normal response with a stream body writes to the medium once."""
2726
2857
# Construct a response with stream with 2 chunks in it.
2727
2858
response = _mod_request.SuccessfulSmartServerResponse(
2728
2859
('arg', 'arg'), body_stream=['chunk1', 'chunk2'])
2729
2860
self.responder.send_response(response)
2730
# We will write 3 times: exactly once for each chunk, plus a final
2731
# write to end the response.
2732
self.assertWriteCount(3)
2861
# We will write just once, despite the multiple chunks, due to
2863
self.assertWriteCount(1)
2865
def test_send_response_with_body_stream_flushes_buffers_sometimes(self):
    """A body stream causing more than 100 buffered writes is flushed in
    several writes rather than being buffered indefinitely.
    """
    # Construct a response whose stream has 40 chunks in it.  Every chunk
    # triggers 3 buffered writes, so we expect more than 100 buffered
    # writes (40 * 3 = 120), but fewer than 200.
    body_stream = ['chunk %d' % count for count in range(40)]
    response = _mod_request.SuccessfulSmartServerResponse(
        ('arg', 'arg'), body_stream=body_stream)
    self.responder.send_response(response)
    # The write buffer is flushed every 100 buffered writes, so we expect
    # 2 writes to the medium.
    self.assertWriteCount(2)
2735
2881
class TestSmartClientUnicode(tests.TestCase):
2790
2936
super(MockMedium, self).__init__('dummy base')
2791
2937
self._mock_request = _MockMediumRequest(self)
2792
2938
self._expected_events = []
2794
2940
def expect_request(self, request_bytes, response_bytes,
2795
2941
allow_partial_read=False):
2796
2942
"""Expect 'request_bytes' to be sent, and reply with 'response_bytes'.
3412
3558
self.assertNotEquals(type(r), type(t))
3415
# TODO: Client feature that does get_bundle and then installs that into a
3416
# branch; this can be used in place of the regular pull/fetch operation when
3417
# coming from a smart server.
3419
# TODO: Eventually, want to do a 'branch' command by fetching the whole
3420
# history as one big bundle. How?
3422
# The branch command does 'br_from.sprout', which tries to preserve the same
3423
# format. We don't necessarily even want that.
3425
# It might be simpler to handle cmd_pull first, which does a simpler fetch()
3426
# operation from one branch into another. It already has some code for
3427
# pulling from a bundle, which it does by trying to see if the destination is
3428
# a bundle file. So it seems the logic for pull ought to be:
3430
# - if it's a smart server, get a bundle from there and install that
3431
# - if it's a bundle, install that
3432
# - if it's a branch, pull from there
3434
# Getting a bundle from a smart server is a bit different from reading a
3435
# bundle from a URL:
3437
# - we can reasonably remember the URL we last read from
3438
# - you can specify a revision number to pull, and we need to pass it across
3439
# to the server as a limit on what will be requested
3441
# TODO: Given a URL, determine whether it is a smart server or not (or perhaps
3442
# otherwise whether it's a bundle?) Should this be a property or method of
3443
# the transport? For the ssh protocol, we always know it's a smart server.
3444
# For http, we potentially need to probe. But if we're explicitly given
3445
# bzr+http:// then we can skip that for now.