1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for smart transport"""
19
# all of this deals with byte strings so this is safe
20
from cStringIO import StringIO
33
from bzrlib.transport import (
39
from bzrlib.transport.http import (
41
SmartClientHTTPMediumRequest,
46
class StringIOSSHVendor(object):
47
"""A SSH vendor that uses StringIO to buffer writes and answer reads."""
49
def __init__(self, read_from, write_to):
50
self.read_from = read_from
51
self.write_to = write_to
54
def connect_ssh(self, username, password, host, port, command):
55
self.calls.append(('connect_ssh', username, password, host, port,
57
return StringIOSSHConnection(self)
60
class StringIOSSHConnection(object):
61
"""A SSH connection that uses StringIO to buffer writes and answer reads."""
63
def __init__(self, vendor):
67
self.vendor.calls.append(('close', ))
69
def get_filelike_channels(self):
70
return self.vendor.read_from, self.vendor.write_to
74
class SmartClientMediumTests(tests.TestCase):
75
"""Tests for SmartClientMedium.
77
We should create a test scenario for this: we need a server module that
78
construct the test-servers (like make_loopsocket_and_medium), and the list
79
of SmartClientMedium classes to test.
82
def make_loopsocket_and_medium(self):
83
"""Create a loopback socket for testing, and a medium aimed at it."""
84
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
85
sock.bind(('127.0.0.1', 0))
87
port = sock.getsockname()[1]
88
medium = smart.SmartTCPClientMedium('127.0.0.1', port)
91
def receive_bytes_on_server(self, sock, bytes):
92
"""Accept a connection on sock and read 3 bytes.
94
The bytes are appended to the list bytes.
96
:return: a Thread which is running to do the accept and recv.
98
def _receive_bytes_on_server():
99
connection, address = sock.accept()
100
bytes.append(connection.recv(3, socket.MSG_WAITALL))
102
t = threading.Thread(target=_receive_bytes_on_server)
106
def test_construct_smart_stream_medium_client(self):
107
# make a new instance of the common base for Stream-like Mediums.
108
# this just ensures that the constructor stays parameter-free which
109
# is important for reuse : some subclasses will dynamically connect,
110
# others are always on, etc.
111
medium = smart.SmartClientStreamMedium()
113
def test_construct_smart_client_medium(self):
114
# the base client medium takes no parameters
115
medium = smart.SmartClientMedium()
117
def test_construct_smart_simple_pipes_client_medium(self):
118
# the SimplePipes client medium takes two pipes:
119
# readable pipe, writeable pipe.
120
# Constructing one should just save these and do nothing.
121
# We test this by passing in None.
122
medium = smart.SmartSimplePipesClientMedium(None, None)
124
def test_simple_pipes_client_request_type(self):
125
# SimplePipesClient should use SmartClientStreamMediumRequest's.
126
medium = smart.SmartSimplePipesClientMedium(None, None)
127
request = medium.get_request()
128
self.assertIsInstance(request, smart.SmartClientStreamMediumRequest)
130
def test_simple_pipes_client_get_concurrent_requests(self):
131
# the simple_pipes client does not support pipelined requests:
132
# but it does support serial requests: we construct one after
133
# another is finished. This is a smoke test testing the integration
134
# of the SmartClientStreamMediumRequest and the SmartClientStreamMedium
135
# classes - as the sibling classes share this logic, they do not have
136
# explicit tests for this.
138
medium = smart.SmartSimplePipesClientMedium(None, output)
139
request = medium.get_request()
140
request.finished_writing()
141
request.finished_reading()
142
request2 = medium.get_request()
143
request2.finished_writing()
144
request2.finished_reading()
146
def test_simple_pipes_client__accept_bytes_writes_to_writable(self):
147
# accept_bytes writes to the writeable pipe.
149
medium = smart.SmartSimplePipesClientMedium(None, output)
150
medium._accept_bytes('abc')
151
self.assertEqual('abc', output.getvalue())
153
def test_simple_pipes_client_disconnect_does_nothing(self):
154
# calling disconnect does nothing.
157
medium = smart.SmartSimplePipesClientMedium(input, output)
158
# send some bytes to ensure disconnecting after activity still does not
160
medium._accept_bytes('abc')
162
self.assertFalse(input.closed)
163
self.assertFalse(output.closed)
165
def test_simple_pipes_client_accept_bytes_after_disconnect(self):
166
# calling disconnect on the client does not alter the pipe that
167
# accept_bytes writes to.
170
medium = smart.SmartSimplePipesClientMedium(input, output)
171
medium._accept_bytes('abc')
173
medium._accept_bytes('abc')
174
self.assertFalse(input.closed)
175
self.assertFalse(output.closed)
176
self.assertEqual('abcabc', output.getvalue())
178
def test_simple_pipes_client_ignores_disconnect_when_not_connected(self):
179
# Doing a disconnect on a new (and thus unconnected) SimplePipes medium
181
medium = smart.SmartSimplePipesClientMedium(None, None)
184
def test_simple_pipes_client_can_always_read(self):
185
# SmartSimplePipesClientMedium is never disconnected, so read_bytes
186
# always tries to read from the underlying pipe.
187
input = StringIO('abcdef')
188
medium = smart.SmartSimplePipesClientMedium(input, None)
189
self.assertEqual('abc', medium.read_bytes(3))
191
self.assertEqual('def', medium.read_bytes(3))
193
def test_simple_pipes_client_supports__flush(self):
194
# invoking _flush on a SimplePipesClient should flush the output
195
# pipe. We test this by creating an output pipe that records
196
# flush calls made to it.
197
from StringIO import StringIO # get regular StringIO
201
def logging_flush(): flush_calls.append('flush')
202
output.flush = logging_flush
203
medium = smart.SmartSimplePipesClientMedium(input, output)
204
# this call is here to ensure we only flush once, not on every
205
# _accept_bytes call.
206
medium._accept_bytes('abc')
209
self.assertEqual(['flush'], flush_calls)
211
def test_construct_smart_ssh_client_medium(self):
212
# the SSH client medium takes:
213
# host, port, username, password, vendor
214
# Constructing one should just save these and do nothing.
215
# we test this by creating a empty bound socket and constructing
217
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
218
sock.bind(('127.0.0.1', 0))
219
unopened_port = sock.getsockname()[1]
220
# having vendor be invalid means that if it tries to connect via the
221
# vendor it will blow up.
222
medium = smart.SmartSSHClientMedium('127.0.0.1', unopened_port,
223
username=None, password=None, vendor="not a vendor")
226
def test_ssh_client_connects_on_first_use(self):
227
# The only thing that initiates a connection from the medium is giving
230
vendor = StringIOSSHVendor(StringIO(), output)
231
medium = smart.SmartSSHClientMedium('a hostname', 'a port', 'a username',
232
'a password', vendor)
233
medium._accept_bytes('abc')
234
self.assertEqual('abc', output.getvalue())
235
self.assertEqual([('connect_ssh', 'a username', 'a password',
236
'a hostname', 'a port',
237
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes'])],
240
def test_ssh_client_changes_command_when_BZR_REMOTE_PATH_is_set(self):
241
# The only thing that initiates a connection from the medium is giving
244
vendor = StringIOSSHVendor(StringIO(), output)
245
orig_bzr_remote_path = os.environ.get('BZR_REMOTE_PATH')
246
def cleanup_environ():
247
osutils.set_or_unset_env('BZR_REMOTE_PATH', orig_bzr_remote_path)
248
self.addCleanup(cleanup_environ)
249
os.environ['BZR_REMOTE_PATH'] = 'fugly'
250
medium = smart.SmartSSHClientMedium('a hostname', 'a port', 'a username',
251
'a password', vendor)
252
medium._accept_bytes('abc')
253
self.assertEqual('abc', output.getvalue())
254
self.assertEqual([('connect_ssh', 'a username', 'a password',
255
'a hostname', 'a port',
256
['fugly', 'serve', '--inet', '--directory=/', '--allow-writes'])],
259
def test_ssh_client_disconnect_does_so(self):
260
# calling disconnect should disconnect both the read_from and write_to
261
# file-like object it from the ssh connection.
264
vendor = StringIOSSHVendor(input, output)
265
medium = smart.SmartSSHClientMedium('a hostname', vendor=vendor)
266
medium._accept_bytes('abc')
268
self.assertTrue(input.closed)
269
self.assertTrue(output.closed)
271
('connect_ssh', None, None, 'a hostname', None,
272
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
277
def test_ssh_client_disconnect_allows_reconnection(self):
278
# calling disconnect on the client terminates the connection, but should
279
# not prevent additional connections occuring.
280
# we test this by initiating a second connection after doing a
284
vendor = StringIOSSHVendor(input, output)
285
medium = smart.SmartSSHClientMedium('a hostname', vendor=vendor)
286
medium._accept_bytes('abc')
288
# the disconnect has closed output, so we need a new output for the
289
# new connection to write to.
292
vendor.read_from = input2
293
vendor.write_to = output2
294
medium._accept_bytes('abc')
296
self.assertTrue(input.closed)
297
self.assertTrue(output.closed)
298
self.assertTrue(input2.closed)
299
self.assertTrue(output2.closed)
301
('connect_ssh', None, None, 'a hostname', None,
302
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
304
('connect_ssh', None, None, 'a hostname', None,
305
['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
310
def test_ssh_client_ignores_disconnect_when_not_connected(self):
311
# Doing a disconnect on a new (and thus unconnected) SSH medium
312
# does not fail. It's ok to disconnect an unconnected medium.
313
medium = smart.SmartSSHClientMedium(None)
316
def test_ssh_client_raises_on_read_when_not_connected(self):
317
# Doing a read on a new (and thus unconnected) SSH medium raises
318
# MediumNotConnected.
319
medium = smart.SmartSSHClientMedium(None)
320
self.assertRaises(errors.MediumNotConnected, medium.read_bytes, 0)
321
self.assertRaises(errors.MediumNotConnected, medium.read_bytes, 1)
323
def test_ssh_client_supports__flush(self):
324
# invoking _flush on a SSHClientMedium should flush the output
325
# pipe. We test this by creating an output pipe that records
326
# flush calls made to it.
327
from StringIO import StringIO # get regular StringIO
331
def logging_flush(): flush_calls.append('flush')
332
output.flush = logging_flush
333
vendor = StringIOSSHVendor(input, output)
334
medium = smart.SmartSSHClientMedium('a hostname', vendor=vendor)
335
# this call is here to ensure we only flush once, not on every
336
# _accept_bytes call.
337
medium._accept_bytes('abc')
340
self.assertEqual(['flush'], flush_calls)
342
def test_construct_smart_tcp_client_medium(self):
343
# the TCP client medium takes a host and a port. Constructing it won't
344
# connect to anything.
345
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
346
sock.bind(('127.0.0.1', 0))
347
unopened_port = sock.getsockname()[1]
348
medium = smart.SmartTCPClientMedium('127.0.0.1', unopened_port)
351
def test_tcp_client_connects_on_first_use(self):
352
# The only thing that initiates a connection from the medium is giving
354
sock, medium = self.make_loopsocket_and_medium()
356
t = self.receive_bytes_on_server(sock, bytes)
357
medium.accept_bytes('abc')
360
self.assertEqual(['abc'], bytes)
362
def test_tcp_client_disconnect_does_so(self):
363
# calling disconnect on the client terminates the connection.
364
# we test this by forcing a short read during a socket.MSG_WAITALL
365
# call: write 2 bytes, try to read 3, and then the client disconnects.
366
sock, medium = self.make_loopsocket_and_medium()
368
t = self.receive_bytes_on_server(sock, bytes)
369
medium.accept_bytes('ab')
373
self.assertEqual(['ab'], bytes)
374
# now disconnect again: this should not do anything, if disconnection
375
# really did disconnect.
378
def test_tcp_client_ignores_disconnect_when_not_connected(self):
379
# Doing a disconnect on a new (and thus unconnected) TCP medium
380
# does not fail. It's ok to disconnect an unconnected medium.
381
medium = smart.SmartTCPClientMedium(None, None)
384
def test_tcp_client_raises_on_read_when_not_connected(self):
385
# Doing a read on a new (and thus unconnected) TCP medium raises
386
# MediumNotConnected.
387
medium = smart.SmartTCPClientMedium(None, None)
388
self.assertRaises(errors.MediumNotConnected, medium.read_bytes, 0)
389
self.assertRaises(errors.MediumNotConnected, medium.read_bytes, 1)
391
def test_tcp_client_supports__flush(self):
392
# invoking _flush on a TCPClientMedium should do something useful.
393
# RBC 20060922 not sure how to test/tell in this case.
394
sock, medium = self.make_loopsocket_and_medium()
396
t = self.receive_bytes_on_server(sock, bytes)
397
# try with nothing buffered
399
medium._accept_bytes('ab')
400
# and with something sent.
405
self.assertEqual(['ab'], bytes)
406
# now disconnect again : this should not do anything, if disconnection
407
# really did disconnect.
411
class TestSmartClientStreamMediumRequest(tests.TestCase):
412
"""Tests the for SmartClientStreamMediumRequest.
414
SmartClientStreamMediumRequest is a helper for the three stream based
415
mediums: TCP, SSH, SimplePipes, so we only test it once, and then test that
416
those three mediums implement the interface it expects.
419
def test_accept_bytes_after_finished_writing_errors(self):
420
# calling accept_bytes after calling finished_writing raises
421
# WritingCompleted to prevent bad assumptions on stream environments
422
# breaking the needs of message-based environments.
424
medium = smart.SmartSimplePipesClientMedium(None, output)
425
request = smart.SmartClientStreamMediumRequest(medium)
426
request.finished_writing()
427
self.assertRaises(errors.WritingCompleted, request.accept_bytes, None)
429
def test_accept_bytes(self):
430
# accept bytes should invoke _accept_bytes on the stream medium.
431
# we test this by using the SimplePipes medium - the most trivial one
432
# and checking that the pipes get the data.
435
medium = smart.SmartSimplePipesClientMedium(input, output)
436
request = smart.SmartClientStreamMediumRequest(medium)
437
request.accept_bytes('123')
438
request.finished_writing()
439
request.finished_reading()
440
self.assertEqual('', input.getvalue())
441
self.assertEqual('123', output.getvalue())
443
def test_construct_sets_stream_request(self):
444
# constructing a SmartClientStreamMediumRequest on a StreamMedium sets
445
# the current request to the new SmartClientStreamMediumRequest
447
medium = smart.SmartSimplePipesClientMedium(None, output)
448
request = smart.SmartClientStreamMediumRequest(medium)
449
self.assertIs(medium._current_request, request)
451
def test_construct_while_another_request_active_throws(self):
452
# constructing a SmartClientStreamMediumRequest on a StreamMedium with
453
# a non-None _current_request raises TooManyConcurrentRequests.
455
medium = smart.SmartSimplePipesClientMedium(None, output)
456
medium._current_request = "a"
457
self.assertRaises(errors.TooManyConcurrentRequests,
458
smart.SmartClientStreamMediumRequest, medium)
460
def test_finished_read_clears_current_request(self):
461
# calling finished_reading clears the current request from the requests
464
medium = smart.SmartSimplePipesClientMedium(None, output)
465
request = smart.SmartClientStreamMediumRequest(medium)
466
request.finished_writing()
467
request.finished_reading()
468
self.assertEqual(None, medium._current_request)
470
def test_finished_read_before_finished_write_errors(self):
471
# calling finished_reading before calling finished_writing triggers a
472
# WritingNotComplete error.
473
medium = smart.SmartSimplePipesClientMedium(None, None)
474
request = smart.SmartClientStreamMediumRequest(medium)
475
self.assertRaises(errors.WritingNotComplete, request.finished_reading)
477
def test_read_bytes(self):
478
# read bytes should invoke _read_bytes on the stream medium.
479
# we test this by using the SimplePipes medium - the most trivial one
480
# and checking that the data is supplied. Its possible that a
481
# faulty implementation could poke at the pipe variables them selves,
482
# but we trust that this will be caught as it will break the integration
484
input = StringIO('321')
486
medium = smart.SmartSimplePipesClientMedium(input, output)
487
request = smart.SmartClientStreamMediumRequest(medium)
488
request.finished_writing()
489
self.assertEqual('321', request.read_bytes(3))
490
request.finished_reading()
491
self.assertEqual('', input.read())
492
self.assertEqual('', output.getvalue())
494
def test_read_bytes_before_finished_write_errors(self):
495
# calling read_bytes before calling finished_writing triggers a
496
# WritingNotComplete error because the Smart protocol is designed to be
497
# compatible with strict message based protocols like HTTP where the
498
# request cannot be submitted until the writing has completed.
499
medium = smart.SmartSimplePipesClientMedium(None, None)
500
request = smart.SmartClientStreamMediumRequest(medium)
501
self.assertRaises(errors.WritingNotComplete, request.read_bytes, None)
503
def test_read_bytes_after_finished_reading_errors(self):
504
# calling read_bytes after calling finished_reading raises
505
# ReadingCompleted to prevent bad assumptions on stream environments
506
# breaking the needs of message-based environments.
508
medium = smart.SmartSimplePipesClientMedium(None, output)
509
request = smart.SmartClientStreamMediumRequest(medium)
510
request.finished_writing()
511
request.finished_reading()
512
self.assertRaises(errors.ReadingCompleted, request.read_bytes, None)
515
class RemoteTransportTests(tests.TestCaseWithTransport):
518
super(RemoteTransportTests, self).setUp()
519
# We're allowed to set the transport class here, so that we don't use
520
# the default or a parameterized class, but rather use the
521
# TestCaseWithTransport infrastructure to set up a smart server and
523
self.transport_server = smart.SmartTCPServer_for_testing
525
def test_plausible_url(self):
526
self.assert_(self.get_url().startswith('bzr://'))
528
def test_probe_transport(self):
529
t = self.get_transport()
530
self.assertIsInstance(t, smart.SmartTransport)
532
def test_get_medium_from_transport(self):
533
"""Remote transport has a medium always, which it can return."""
534
t = self.get_transport()
535
medium = t.get_smart_medium()
536
self.assertIsInstance(medium, smart.SmartClientMedium)
539
class ErrorRaisingProtocol(object):
541
def __init__(self, exception):
542
self.exception = exception
544
def next_read_size(self):
548
class SampleRequest(object):
550
def __init__(self, expected_bytes):
551
self.accepted_bytes = ''
552
self._finished_reading = False
553
self.expected_bytes = expected_bytes
554
self.excess_buffer = ''
556
def accept_bytes(self, bytes):
557
self.accepted_bytes += bytes
558
if self.accepted_bytes.startswith(self.expected_bytes):
559
self._finished_reading = True
560
self.excess_buffer = self.accepted_bytes[len(self.expected_bytes):]
562
def next_read_size(self):
563
if self._finished_reading:
569
class TestSmartServerStreamMedium(tests.TestCase):
571
def portable_socket_pair(self):
572
"""Return a pair of TCP sockets connected to each other.
574
Unlike socket.socketpair, this should work on Windows.
576
listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
577
listen_sock.bind(('127.0.0.1', 0))
578
listen_sock.listen(1)
579
client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
580
client_sock.connect(listen_sock.getsockname())
581
server_sock, addr = listen_sock.accept()
583
return server_sock, client_sock
585
def test_smart_query_version(self):
586
"""Feed a canned query version to a server"""
587
# wire-to-wire, using the whole stack
588
to_server = StringIO('hello\n')
589
from_server = StringIO()
590
transport = local.LocalTransport(urlutils.local_path_to_url('/'))
591
server = smart.SmartServerPipeStreamMedium(
592
to_server, from_server, transport)
593
protocol = smart.SmartServerRequestProtocolOne(transport,
595
server._serve_one_request(protocol)
596
self.assertEqual('ok\0011\n',
597
from_server.getvalue())
599
def test_canned_get_response(self):
600
transport = memory.MemoryTransport('memory:///')
601
transport.put_bytes('testfile', 'contents\nof\nfile\n')
602
to_server = StringIO('get\001./testfile\n')
603
from_server = StringIO()
604
server = smart.SmartServerPipeStreamMedium(
605
to_server, from_server, transport)
606
protocol = smart.SmartServerRequestProtocolOne(transport,
608
server._serve_one_request(protocol)
609
self.assertEqual('ok\n'
611
'contents\nof\nfile\n'
613
from_server.getvalue())
615
def test_pipe_like_stream_with_bulk_data(self):
616
sample_request_bytes = 'command\n9\nbulk datadone\n'
617
to_server = StringIO(sample_request_bytes)
618
from_server = StringIO()
619
server = smart.SmartServerPipeStreamMedium(to_server, from_server, None)
620
sample_protocol = SampleRequest(expected_bytes=sample_request_bytes)
621
server._serve_one_request(sample_protocol)
622
self.assertEqual('', from_server.getvalue())
623
self.assertEqual(sample_request_bytes, sample_protocol.accepted_bytes)
624
self.assertFalse(server.finished)
626
def test_socket_stream_with_bulk_data(self):
627
sample_request_bytes = 'command\n9\nbulk datadone\n'
628
server_sock, client_sock = self.portable_socket_pair()
629
server = smart.SmartServerSocketStreamMedium(
631
sample_protocol = SampleRequest(expected_bytes=sample_request_bytes)
632
client_sock.sendall(sample_request_bytes)
633
server._serve_one_request(sample_protocol)
635
self.assertEqual('', client_sock.recv(1))
636
self.assertEqual(sample_request_bytes, sample_protocol.accepted_bytes)
637
self.assertFalse(server.finished)
639
def test_pipe_like_stream_shutdown_detection(self):
640
to_server = StringIO('')
641
from_server = StringIO()
642
server = smart.SmartServerPipeStreamMedium(to_server, from_server, None)
643
server._serve_one_request(SampleRequest('x'))
644
self.assertTrue(server.finished)
646
def test_socket_stream_shutdown_detection(self):
647
server_sock, client_sock = self.portable_socket_pair()
649
server = smart.SmartServerSocketStreamMedium(
651
server._serve_one_request(SampleRequest('x'))
652
self.assertTrue(server.finished)
654
def test_pipe_like_stream_with_two_requests(self):
655
# If two requests are read in one go, then two calls to
656
# _serve_one_request should still process both of them as if they had
657
# been received seperately.
658
sample_request_bytes = 'command\n'
659
to_server = StringIO(sample_request_bytes * 2)
660
from_server = StringIO()
661
server = smart.SmartServerPipeStreamMedium(to_server, from_server, None)
662
first_protocol = SampleRequest(expected_bytes=sample_request_bytes)
663
server._serve_one_request(first_protocol)
664
self.assertEqual(0, first_protocol.next_read_size())
665
self.assertEqual('', from_server.getvalue())
666
self.assertFalse(server.finished)
667
# Make a new protocol, call _serve_one_request with it to collect the
669
second_protocol = SampleRequest(expected_bytes=sample_request_bytes)
670
server._serve_one_request(second_protocol)
671
self.assertEqual('', from_server.getvalue())
672
self.assertEqual(sample_request_bytes, second_protocol.accepted_bytes)
673
self.assertFalse(server.finished)
675
def test_socket_stream_with_two_requests(self):
676
# If two requests are read in one go, then two calls to
677
# _serve_one_request should still process both of them as if they had
678
# been received seperately.
679
sample_request_bytes = 'command\n'
680
server_sock, client_sock = self.portable_socket_pair()
681
server = smart.SmartServerSocketStreamMedium(
683
first_protocol = SampleRequest(expected_bytes=sample_request_bytes)
684
# Put two whole requests on the wire.
685
client_sock.sendall(sample_request_bytes * 2)
686
server._serve_one_request(first_protocol)
687
self.assertEqual(0, first_protocol.next_read_size())
688
self.assertFalse(server.finished)
689
# Make a new protocol, call _serve_one_request with it to collect the
691
second_protocol = SampleRequest(expected_bytes=sample_request_bytes)
692
stream_still_open = server._serve_one_request(second_protocol)
693
self.assertEqual(sample_request_bytes, second_protocol.accepted_bytes)
694
self.assertFalse(server.finished)
696
self.assertEqual('', client_sock.recv(1))
698
def test_pipe_like_stream_error_handling(self):
699
# Use plain python StringIO so we can monkey-patch the close method to
700
# not discard the contents.
701
from StringIO import StringIO
702
to_server = StringIO('')
703
from_server = StringIO()
707
from_server.close = close
708
server = smart.SmartServerPipeStreamMedium(to_server, from_server, None)
709
fake_protocol = ErrorRaisingProtocol(Exception('boom'))
710
server._serve_one_request(fake_protocol)
711
self.assertEqual('', from_server.getvalue())
712
self.assertTrue(self.closed)
713
self.assertTrue(server.finished)
715
def test_socket_stream_error_handling(self):
716
# Use plain python StringIO so we can monkey-patch the close method to
717
# not discard the contents.
718
from StringIO import StringIO
719
server_sock, client_sock = self.portable_socket_pair()
720
server = smart.SmartServerSocketStreamMedium(
722
fake_protocol = ErrorRaisingProtocol(Exception('boom'))
723
server._serve_one_request(fake_protocol)
724
# recv should not block, because the other end of the socket has been
726
self.assertEqual('', client_sock.recv(1))
727
self.assertTrue(server.finished)
729
def test_pipe_like_stream_keyboard_interrupt_handling(self):
730
# Use plain python StringIO so we can monkey-patch the close method to
731
# not discard the contents.
732
to_server = StringIO('')
733
from_server = StringIO()
734
server = smart.SmartServerPipeStreamMedium(to_server, from_server, None)
735
fake_protocol = ErrorRaisingProtocol(KeyboardInterrupt('boom'))
737
KeyboardInterrupt, server._serve_one_request, fake_protocol)
738
self.assertEqual('', from_server.getvalue())
740
def test_socket_stream_keyboard_interrupt_handling(self):
741
server_sock, client_sock = self.portable_socket_pair()
742
server = smart.SmartServerSocketStreamMedium(
744
fake_protocol = ErrorRaisingProtocol(KeyboardInterrupt('boom'))
746
KeyboardInterrupt, server._serve_one_request, fake_protocol)
748
self.assertEqual('', client_sock.recv(1))
751
class TestSmartTCPServer(tests.TestCase):
753
def test_get_error_unexpected(self):
754
"""Error reported by server with no specific representation"""
755
class FlakyTransport(object):
756
def get_bytes(self, path):
757
raise Exception("some random exception from inside server")
758
server = smart.SmartTCPServer(backing_transport=FlakyTransport())
759
server.start_background_thread()
761
transport = smart.SmartTCPTransport(server.get_url())
763
transport.get('something')
764
except errors.TransportError, e:
765
self.assertContainsRe(str(e), 'some random exception')
767
self.fail("get did not raise expected error")
769
server.stop_background_thread()
772
class SmartTCPTests(tests.TestCase):
773
"""Tests for connection/end to end behaviour using the TCP server.
775
All of these tests are run with a server running on another thread serving
776
a MemoryTransport, and a connection to it already open.
778
the server is obtained by calling self.setUpServer(readonly=False).
781
def setUpServer(self, readonly=False):
784
:param readonly: Create a readonly server.
786
self.backing_transport = memory.MemoryTransport()
788
self.real_backing_transport = self.backing_transport
789
self.backing_transport = get_transport("readonly+" + self.backing_transport.abspath('.'))
790
self.server = smart.SmartTCPServer(self.backing_transport)
791
self.server.start_background_thread()
792
self.transport = smart.SmartTCPTransport(self.server.get_url())
795
if getattr(self, 'transport', None):
796
self.transport.disconnect()
797
if getattr(self, 'server', None):
798
self.server.stop_background_thread()
799
super(SmartTCPTests, self).tearDown()
802
class WritableEndToEndTests(SmartTCPTests):
803
"""Client to server tests that require a writable transport."""
806
super(WritableEndToEndTests, self).setUp()
809
def test_start_tcp_server(self):
810
url = self.server.get_url()
811
self.assertContainsRe(url, r'^bzr://127\.0\.0\.1:[0-9]{2,}/')
813
def test_smart_transport_has(self):
814
"""Checking for file existence over smart."""
815
self.backing_transport.put_bytes("foo", "contents of foo\n")
816
self.assertTrue(self.transport.has("foo"))
817
self.assertFalse(self.transport.has("non-foo"))
819
def test_smart_transport_get(self):
820
"""Read back a file over smart."""
821
self.backing_transport.put_bytes("foo", "contents\nof\nfoo\n")
822
fp = self.transport.get("foo")
823
self.assertEqual('contents\nof\nfoo\n', fp.read())
825
def test_get_error_enoent(self):
826
"""Error reported from server getting nonexistent file."""
827
# The path in a raised NoSuchFile exception should be the precise path
828
# asked for by the client. This gives meaningful and unsurprising errors
831
self.transport.get('not%20a%20file')
832
except errors.NoSuchFile, e:
833
self.assertEqual('not%20a%20file', e.path)
835
self.fail("get did not raise expected error")
837
def test_simple_clone_conn(self):
838
"""Test that cloning reuses the same connection."""
839
# we create a real connection not a loopback one, but it will use the
840
# same server and pipes
841
conn2 = self.transport.clone('.')
842
self.assertIs(self.transport._medium, conn2._medium)
844
def test__remote_path(self):
845
self.assertEquals('/foo/bar',
846
self.transport._remote_path('foo/bar'))
848
def test_clone_changes_base(self):
849
"""Cloning transport produces one with a new base location"""
850
conn2 = self.transport.clone('subdir')
851
self.assertEquals(self.transport.base + 'subdir/',
854
def test_open_dir(self):
855
"""Test changing directory"""
856
transport = self.transport
857
self.backing_transport.mkdir('toffee')
858
self.backing_transport.mkdir('toffee/apple')
859
self.assertEquals('/toffee', transport._remote_path('toffee'))
860
toffee_trans = transport.clone('toffee')
861
# Check that each transport has only the contents of its directory
862
# directly visible. If state was being held in the wrong object, it's
863
# conceivable that cloning a transport would alter the state of the
864
# cloned-from transport.
865
self.assertTrue(transport.has('toffee'))
866
self.assertFalse(toffee_trans.has('toffee'))
867
self.assertFalse(transport.has('apple'))
868
self.assertTrue(toffee_trans.has('apple'))
870
def test_open_bzrdir(self):
871
"""Open an existing bzrdir over smart transport"""
872
transport = self.transport
873
t = self.backing_transport
874
bzrdir.BzrDirFormat.get_default_format().initialize_on_transport(t)
875
result_dir = bzrdir.BzrDir.open_containing_from_transport(transport)
878
class ReadOnlyEndToEndTests(SmartTCPTests):
879
"""Tests from the client to the server using a readonly backing transport."""
881
def test_mkdir_error_readonly(self):
882
"""TransportNotPossible should be preserved from the backing transport."""
883
self.setUpServer(readonly=True)
884
self.assertRaises(errors.TransportNotPossible, self.transport.mkdir,
888
class SmartServerRequestHandlerTests(tests.TestCaseWithTransport):
889
"""Test that call directly into the handler logic, bypassing the network."""
891
def test_construct_request_handler(self):
892
"""Constructing a request handler should be easy and set defaults."""
893
handler = smart.SmartServerRequestHandler(None)
894
self.assertFalse(handler.finished_reading)
896
def test_hello(self):
897
handler = smart.SmartServerRequestHandler(None)
898
handler.dispatch_command('hello', ())
899
self.assertEqual(('ok', '1'), handler.response.args)
900
self.assertEqual(None, handler.response.body)
902
def test_get_bundle(self):
903
from bzrlib.bundle import serializer
904
wt = self.make_branch_and_tree('.')
905
self.build_tree_contents([('hello', 'hello world')])
907
rev_id = wt.commit('add hello')
909
handler = smart.SmartServerRequestHandler(self.get_transport())
910
handler.dispatch_command('get_bundle', ('.', rev_id))
911
bundle = serializer.read_bundle(StringIO(handler.response.body))
912
self.assertEqual((), handler.response.args)
914
def test_readonly_exception_becomes_transport_not_possible(self):
915
"""The response for a read-only error is ('ReadOnlyError')."""
916
handler = smart.SmartServerRequestHandler(self.get_readonly_transport())
917
# send a mkdir for foo, with no explicit mode - should fail.
918
handler.dispatch_command('mkdir', ('foo', ''))
919
# and the failure should be an explicit ReadOnlyError
920
self.assertEqual(("ReadOnlyError", ), handler.response.args)
921
# XXX: TODO: test that other TransportNotPossible errors are
922
# presented as TransportNotPossible - not possible to do that
923
# until I figure out how to trigger that relatively cleanly via
924
# the api. RBC 20060918
926
def test_hello_has_finished_body_on_dispatch(self):
927
"""The 'hello' command should set finished_reading."""
928
handler = smart.SmartServerRequestHandler(None)
929
handler.dispatch_command('hello', ())
930
self.assertTrue(handler.finished_reading)
931
self.assertNotEqual(None, handler.response)
933
def test_put_bytes_non_atomic(self):
934
"""'put_...' should set finished_reading after reading the bytes."""
935
handler = smart.SmartServerRequestHandler(self.get_transport())
936
handler.dispatch_command('put_non_atomic', ('a-file', '', 'F', ''))
937
self.assertFalse(handler.finished_reading)
938
handler.accept_body('1234')
939
self.assertFalse(handler.finished_reading)
940
handler.accept_body('5678')
941
handler.end_of_body()
942
self.assertTrue(handler.finished_reading)
943
self.assertEqual(('ok', ), handler.response.args)
944
self.assertEqual(None, handler.response.body)
946
def test_readv_accept_body(self):
947
"""'readv' should set finished_reading after reading offsets."""
948
self.build_tree(['a-file'])
949
handler = smart.SmartServerRequestHandler(self.get_readonly_transport())
950
handler.dispatch_command('readv', ('a-file', ))
951
self.assertFalse(handler.finished_reading)
952
handler.accept_body('2,')
953
self.assertFalse(handler.finished_reading)
954
handler.accept_body('3')
955
handler.end_of_body()
956
self.assertTrue(handler.finished_reading)
957
self.assertEqual(('readv', ), handler.response.args)
958
# co - nte - nt of a-file is the file contents we are extracting from.
959
self.assertEqual('nte', handler.response.body)
961
def test_readv_short_read_response_contents(self):
962
"""'readv' when a short read occurs sets the response appropriately."""
963
self.build_tree(['a-file'])
964
handler = smart.SmartServerRequestHandler(self.get_readonly_transport())
965
handler.dispatch_command('readv', ('a-file', ))
966
# read beyond the end of the file.
967
handler.accept_body('100,1')
968
handler.end_of_body()
969
self.assertTrue(handler.finished_reading)
970
self.assertEqual(('ShortReadvError', 'a-file', '100', '1', '0'),
971
handler.response.args)
972
self.assertEqual(None, handler.response.body)
975
class SmartTransportRegistration(tests.TestCase):
977
def test_registration(self):
978
t = get_transport('bzr+ssh://example.com/path')
979
self.assertIsInstance(t, smart.SmartSSHTransport)
980
self.assertEqual('example.com', t._host)
983
class TestSmartTransport(tests.TestCase):
985
def test_use_connection_factory(self):
986
# We want to be able to pass a client as a parameter to SmartTransport.
987
input = StringIO("ok\n3\nbardone\n")
989
medium = smart.SmartSimplePipesClientMedium(input, output)
990
transport = smart.SmartTransport('bzr://localhost/', medium=medium)
992
# We want to make sure the client is used when the first remote
993
# method is called. No data should have been sent, or read.
994
self.assertEqual(0, input.tell())
995
self.assertEqual('', output.getvalue())
997
# Now call a method that should result in a single request : as the
998
# transport makes its own protocol instances, we check on the wire.
999
# XXX: TODO: give the transport a protocol factory, which can make
1000
# an instrumented protocol for us.
1001
self.assertEqual('bar', transport.get_bytes('foo'))
1002
# only the needed data should have been sent/received.
1003
self.assertEqual(13, input.tell())
1004
self.assertEqual('get\x01/foo\n', output.getvalue())
1006
def test__translate_error_readonly(self):
1007
"""Sending a ReadOnlyError to _translate_error raises TransportNotPossible."""
1008
medium = smart.SmartClientMedium()
1009
transport = smart.SmartTransport('bzr://localhost/', medium=medium)
1010
self.assertRaises(errors.TransportNotPossible,
1011
transport._translate_error, ("ReadOnlyError", ))
1014
class InstrumentedServerProtocol(smart.SmartServerStreamMedium):
1015
"""A smart server which is backed by memory and saves its write requests."""
1017
def __init__(self, write_output_list):
1018
smart.SmartServerStreamMedium.__init__(self, memory.MemoryTransport())
1019
self._write_output_list = write_output_list
1022
class TestSmartProtocol(tests.TestCase):
1023
"""Tests for the smart protocol.
1025
Each test case gets a smart_server and smart_client created during setUp().
1027
It is planned that the client can be called with self.call_client() giving
1028
it an expected server response, which will be fed into it when it tries to
1029
read. Likewise, self.call_server will call a servers method with a canned
1030
serialised client request. Output done by the client or server for these
1031
calls will be captured to self.to_server and self.to_client. Each element
1032
in the list is a write call from the client or server respectively.
1036
super(TestSmartProtocol, self).setUp()
1037
self.server_to_client = []
1038
self.to_server = StringIO()
1039
self.to_client = StringIO()
1040
self.client_medium = smart.SmartSimplePipesClientMedium(self.to_client,
1042
self.client_protocol = smart.SmartClientRequestProtocolOne(
1044
self.smart_server = InstrumentedServerProtocol(self.server_to_client)
1045
self.smart_server_request = smart.SmartServerRequestHandler(None)
1047
def assertOffsetSerialisation(self, expected_offsets, expected_serialised,
1048
client, smart_server_request):
1049
"""Check that smart (de)serialises offsets as expected.
1051
We check both serialisation and deserialisation at the same time
1052
to ensure that the round tripping cannot skew: both directions should
1055
:param expected_offsets: a readv offset list.
1056
:param expected_seralised: an expected serial form of the offsets.
1057
:param smart_server_request: a SmartServerRequestHandler instance.
1059
# XXX: 'smart_server_request' should be a SmartServerRequestProtocol in
1061
offsets = smart_server_request._deserialise_offsets(expected_serialised)
1062
self.assertEqual(expected_offsets, offsets)
1063
serialised = client._serialise_offsets(offsets)
1064
self.assertEqual(expected_serialised, serialised)
1066
def build_protocol_waiting_for_body(self):
1067
out_stream = StringIO()
1068
protocol = smart.SmartServerRequestProtocolOne(None, out_stream.write)
1069
protocol.has_dispatched = True
1070
protocol.request = smart.SmartServerRequestHandler(None)
1071
def handle_end_of_bytes():
1072
self.end_received = True
1073
self.assertEqual('abcdefg', protocol.request._body_bytes)
1074
protocol.request.response = smart.SmartServerResponse(('ok', ))
1075
protocol.request._end_of_body_handler = handle_end_of_bytes
1076
# Call accept_bytes to make sure that internal state like _body_decoder
1077
# is initialised. This test should probably be given a clearer
1078
# interface to work with that will not cause this inconsistency.
1079
# -- Andrew Bennetts, 2006-09-28
1080
protocol.accept_bytes('')
1083
def test_construct_version_one_server_protocol(self):
1084
protocol = smart.SmartServerRequestProtocolOne(None, None)
1085
self.assertEqual('', protocol.excess_buffer)
1086
self.assertEqual('', protocol.in_buffer)
1087
self.assertFalse(protocol.has_dispatched)
1088
self.assertEqual(1, protocol.next_read_size())
1090
def test_construct_version_one_client_protocol(self):
1091
# we can construct a client protocol from a client medium request
1093
medium = smart.SmartSimplePipesClientMedium(None, output)
1094
request = medium.get_request()
1095
client_protocol = smart.SmartClientRequestProtocolOne(request)
1097
def test_server_offset_serialisation(self):
1098
"""The Smart protocol serialises offsets as a comma and \n string.
1100
We check a number of boundary cases are as expected: empty, one offset,
1101
one with the order of reads not increasing (an out of order read), and
1102
one that should coalesce.
1104
self.assertOffsetSerialisation([], '',
1105
self.client_protocol, self.smart_server_request)
1106
self.assertOffsetSerialisation([(1,2)], '1,2',
1107
self.client_protocol, self.smart_server_request)
1108
self.assertOffsetSerialisation([(10,40), (0,5)], '10,40\n0,5',
1109
self.client_protocol, self.smart_server_request)
1110
self.assertOffsetSerialisation([(1,2), (3,4), (100, 200)],
1111
'1,2\n3,4\n100,200', self.client_protocol, self.smart_server_request)
1113
def test_accept_bytes_to_protocol(self):
1114
out_stream = StringIO()
1115
protocol = smart.SmartServerRequestProtocolOne(None, out_stream.write)
1116
protocol.accept_bytes('abc')
1117
self.assertEqual('abc', protocol.in_buffer)
1118
protocol.accept_bytes('\n')
1119
self.assertEqual("error\x01Generic bzr smart protocol error: bad request"
1120
" u'abc'\n", out_stream.getvalue())
1121
self.assertTrue(protocol.has_dispatched)
1122
self.assertEqual(1, protocol.next_read_size())
1124
def test_accept_bytes_with_invalid_utf8_to_protocol(self):
1125
out_stream = StringIO()
1126
protocol = smart.SmartServerRequestProtocolOne(None, out_stream.write)
1127
# the byte 0xdd is not a valid UTF-8 string.
1128
protocol.accept_bytes('\xdd\n')
1130
"error\x01Generic bzr smart protocol error: "
1131
"one or more arguments of request '\\xdd\\n' are not valid UTF-8\n",
1132
out_stream.getvalue())
1133
self.assertTrue(protocol.has_dispatched)
1134
self.assertEqual(1, protocol.next_read_size())
1136
def test_accept_body_bytes_to_protocol(self):
1137
protocol = self.build_protocol_waiting_for_body()
1138
self.assertEqual(6, protocol.next_read_size())
1139
protocol.accept_bytes('7\nabc')
1140
self.assertEqual(9, protocol.next_read_size())
1141
protocol.accept_bytes('defgd')
1142
protocol.accept_bytes('one\n')
1143
self.assertEqual(0, protocol.next_read_size())
1144
self.assertTrue(self.end_received)
1146
def test_accept_request_and_body_all_at_once(self):
1147
mem_transport = memory.MemoryTransport()
1148
mem_transport.put_bytes('foo', 'abcdefghij')
1149
out_stream = StringIO()
1150
protocol = smart.SmartServerRequestProtocolOne(mem_transport,
1152
protocol.accept_bytes('readv\x01foo\n3\n3,3done\n')
1153
self.assertEqual(0, protocol.next_read_size())
1154
self.assertEqual('readv\n3\ndefdone\n', out_stream.getvalue())
1155
self.assertEqual('', protocol.excess_buffer)
1156
self.assertEqual('', protocol.in_buffer)
1158
def test_accept_excess_bytes_are_preserved(self):
1159
out_stream = StringIO()
1160
protocol = smart.SmartServerRequestProtocolOne(None, out_stream.write)
1161
protocol.accept_bytes('hello\nhello\n')
1162
self.assertEqual("ok\x011\n", out_stream.getvalue())
1163
self.assertEqual("hello\n", protocol.excess_buffer)
1164
self.assertEqual("", protocol.in_buffer)
1166
def test_accept_excess_bytes_after_body(self):
1167
protocol = self.build_protocol_waiting_for_body()
1168
protocol.accept_bytes('7\nabcdefgdone\nX')
1169
self.assertTrue(self.end_received)
1170
self.assertEqual("X", protocol.excess_buffer)
1171
self.assertEqual("", protocol.in_buffer)
1172
protocol.accept_bytes('Y')
1173
self.assertEqual("XY", protocol.excess_buffer)
1174
self.assertEqual("", protocol.in_buffer)
1176
def test_accept_excess_bytes_after_dispatch(self):
1177
out_stream = StringIO()
1178
protocol = smart.SmartServerRequestProtocolOne(None, out_stream.write)
1179
protocol.accept_bytes('hello\n')
1180
self.assertEqual("ok\x011\n", out_stream.getvalue())
1181
protocol.accept_bytes('hel')
1182
self.assertEqual("hel", protocol.excess_buffer)
1183
protocol.accept_bytes('lo\n')
1184
self.assertEqual("hello\n", protocol.excess_buffer)
1185
self.assertEqual("", protocol.in_buffer)
1187
def test_sync_with_request_sets_finished_reading(self):
1188
protocol = smart.SmartServerRequestProtocolOne(None, None)
1189
request = smart.SmartServerRequestHandler(None)
1190
protocol.sync_with_request(request)
1191
self.assertEqual(1, protocol.next_read_size())
1192
request.finished_reading = True
1193
protocol.sync_with_request(request)
1194
self.assertEqual(0, protocol.next_read_size())
1196
def test_query_version(self):
1197
"""query_version on a SmartClientProtocolOne should return a number.
1199
The protocol provides the query_version because the domain level clients
1200
may all need to be able to probe for capabilities.
1202
# What we really want to test here is that SmartClientProtocolOne calls
1203
# accept_bytes(tuple_based_encoding_of_hello) and reads and parses the
1204
# response of tuple-encoded (ok, 1). Also, seperately we should test
1205
# the error if the response is a non-understood version.
1206
input = StringIO('ok\x011\n')
1208
medium = smart.SmartSimplePipesClientMedium(input, output)
1209
protocol = smart.SmartClientRequestProtocolOne(medium.get_request())
1210
self.assertEqual(1, protocol.query_version())
1212
def assertServerToClientEncoding(self, expected_bytes, expected_tuple,
1214
"""Assert that each input_tuple serialises as expected_bytes, and the
1215
bytes deserialise as expected_tuple.
1217
# check the encoding of the server for all input_tuples matches
1219
for input_tuple in input_tuples:
1220
server_output = StringIO()
1221
server_protocol = smart.SmartServerRequestProtocolOne(
1222
None, server_output.write)
1223
server_protocol._send_response(input_tuple)
1224
self.assertEqual(expected_bytes, server_output.getvalue())
1225
# check the decoding of the client protocol from expected_bytes:
1226
input = StringIO(expected_bytes)
1228
medium = smart.SmartSimplePipesClientMedium(input, output)
1229
protocol = smart.SmartClientRequestProtocolOne(medium.get_request())
1230
protocol.call('foo')
1231
self.assertEqual(expected_tuple, protocol.read_response_tuple())
1233
def test_client_call_empty_response(self):
1234
# protocol.call() can get back an empty tuple as a response. This occurs
1235
# when the parsed line is an empty line, and results in a tuple with
1236
# one element - an empty string.
1237
self.assertServerToClientEncoding('\n', ('', ), [(), ('', )])
1239
def test_client_call_three_element_response(self):
1240
# protocol.call() can get back tuples of other lengths. A three element
1241
# tuple should be unpacked as three strings.
1242
self.assertServerToClientEncoding('a\x01b\x0134\n', ('a', 'b', '34'),
1245
def test_client_call_with_body_bytes_uploads(self):
1246
# protocol.call_with_upload should length-prefix the bytes onto the
1248
expected_bytes = "foo\n7\nabcdefgdone\n"
1249
input = StringIO("\n")
1251
medium = smart.SmartSimplePipesClientMedium(input, output)
1252
protocol = smart.SmartClientRequestProtocolOne(medium.get_request())
1253
protocol.call_with_body_bytes(('foo', ), "abcdefg")
1254
self.assertEqual(expected_bytes, output.getvalue())
1256
def test_client_call_with_body_readv_array(self):
1257
# protocol.call_with_upload should encode the readv array and then
1258
# length-prefix the bytes onto the wire.
1259
expected_bytes = "foo\n7\n1,2\n5,6done\n"
1260
input = StringIO("\n")
1262
medium = smart.SmartSimplePipesClientMedium(input, output)
1263
protocol = smart.SmartClientRequestProtocolOne(medium.get_request())
1264
protocol.call_with_body_readv_array(('foo', ), [(1,2),(5,6)])
1265
self.assertEqual(expected_bytes, output.getvalue())
1267
def test_client_read_body_bytes_all(self):
1268
# read_body_bytes should decode the body bytes from the wire into
1270
expected_bytes = "1234567"
1271
server_bytes = "ok\n7\n1234567done\n"
1272
input = StringIO(server_bytes)
1274
medium = smart.SmartSimplePipesClientMedium(input, output)
1275
protocol = smart.SmartClientRequestProtocolOne(medium.get_request())
1276
protocol.call('foo')
1277
protocol.read_response_tuple(True)
1278
self.assertEqual(expected_bytes, protocol.read_body_bytes())
1280
def test_client_read_body_bytes_incremental(self):
1281
# test reading a few bytes at a time from the body
1282
# XXX: possibly we should test dribbling the bytes into the stringio
1283
# to make the state machine work harder: however, as we use the
1284
# LengthPrefixedBodyDecoder that is already well tested - we can skip
1286
expected_bytes = "1234567"
1287
server_bytes = "ok\n7\n1234567done\n"
1288
input = StringIO(server_bytes)
1290
medium = smart.SmartSimplePipesClientMedium(input, output)
1291
protocol = smart.SmartClientRequestProtocolOne(medium.get_request())
1292
protocol.call('foo')
1293
protocol.read_response_tuple(True)
1294
self.assertEqual(expected_bytes[0:2], protocol.read_body_bytes(2))
1295
self.assertEqual(expected_bytes[2:4], protocol.read_body_bytes(2))
1296
self.assertEqual(expected_bytes[4:6], protocol.read_body_bytes(2))
1297
self.assertEqual(expected_bytes[6], protocol.read_body_bytes())
1299
def test_client_cancel_read_body_does_not_eat_body_bytes(self):
1300
# cancelling the expected body needs to finish the request, but not
1301
# read any more bytes.
1302
expected_bytes = "1234567"
1303
server_bytes = "ok\n7\n1234567done\n"
1304
input = StringIO(server_bytes)
1306
medium = smart.SmartSimplePipesClientMedium(input, output)
1307
protocol = smart.SmartClientRequestProtocolOne(medium.get_request())
1308
protocol.call('foo')
1309
protocol.read_response_tuple(True)
1310
protocol.cancel_read_body()
1311
self.assertEqual(3, input.tell())
1312
self.assertRaises(errors.ReadingCompleted, protocol.read_body_bytes)
1315
class LengthPrefixedBodyDecoder(tests.TestCase):
1317
# XXX: TODO: make accept_reading_trailer invoke translate_response or
1318
# something similar to the ProtocolBase method.
1320
def test_construct(self):
1321
decoder = smart.LengthPrefixedBodyDecoder()
1322
self.assertFalse(decoder.finished_reading)
1323
self.assertEqual(6, decoder.next_read_size())
1324
self.assertEqual('', decoder.read_pending_data())
1325
self.assertEqual('', decoder.unused_data)
1327
def test_accept_bytes(self):
1328
decoder = smart.LengthPrefixedBodyDecoder()
1329
decoder.accept_bytes('')
1330
self.assertFalse(decoder.finished_reading)
1331
self.assertEqual(6, decoder.next_read_size())
1332
self.assertEqual('', decoder.read_pending_data())
1333
self.assertEqual('', decoder.unused_data)
1334
decoder.accept_bytes('7')
1335
self.assertFalse(decoder.finished_reading)
1336
self.assertEqual(6, decoder.next_read_size())
1337
self.assertEqual('', decoder.read_pending_data())
1338
self.assertEqual('', decoder.unused_data)
1339
decoder.accept_bytes('\na')
1340
self.assertFalse(decoder.finished_reading)
1341
self.assertEqual(11, decoder.next_read_size())
1342
self.assertEqual('a', decoder.read_pending_data())
1343
self.assertEqual('', decoder.unused_data)
1344
decoder.accept_bytes('bcdefgd')
1345
self.assertFalse(decoder.finished_reading)
1346
self.assertEqual(4, decoder.next_read_size())
1347
self.assertEqual('bcdefg', decoder.read_pending_data())
1348
self.assertEqual('', decoder.unused_data)
1349
decoder.accept_bytes('one')
1350
self.assertFalse(decoder.finished_reading)
1351
self.assertEqual(1, decoder.next_read_size())
1352
self.assertEqual('', decoder.read_pending_data())
1353
self.assertEqual('', decoder.unused_data)
1354
decoder.accept_bytes('\nblarg')
1355
self.assertTrue(decoder.finished_reading)
1356
self.assertEqual(1, decoder.next_read_size())
1357
self.assertEqual('', decoder.read_pending_data())
1358
self.assertEqual('blarg', decoder.unused_data)
1360
def test_accept_bytes_all_at_once_with_excess(self):
1361
decoder = smart.LengthPrefixedBodyDecoder()
1362
decoder.accept_bytes('1\nadone\nunused')
1363
self.assertTrue(decoder.finished_reading)
1364
self.assertEqual(1, decoder.next_read_size())
1365
self.assertEqual('a', decoder.read_pending_data())
1366
self.assertEqual('unused', decoder.unused_data)
1368
def test_accept_bytes_exact_end_of_body(self):
1369
decoder = smart.LengthPrefixedBodyDecoder()
1370
decoder.accept_bytes('1\na')
1371
self.assertFalse(decoder.finished_reading)
1372
self.assertEqual(5, decoder.next_read_size())
1373
self.assertEqual('a', decoder.read_pending_data())
1374
self.assertEqual('', decoder.unused_data)
1375
decoder.accept_bytes('done\n')
1376
self.assertTrue(decoder.finished_reading)
1377
self.assertEqual(1, decoder.next_read_size())
1378
self.assertEqual('', decoder.read_pending_data())
1379
self.assertEqual('', decoder.unused_data)
1382
class FakeHTTPMedium(object):
1384
self.written_request = None
1385
self._current_request = None
1386
def send_http_smart_request(self, bytes):
1387
self.written_request = bytes
1391
class HTTPTunnellingSmokeTest(tests.TestCaseWithTransport):
1393
def _test_bulk_data(self, url_protocol):
1394
# We should be able to send and receive bulk data in a single message.
1395
# The 'readv' command in the smart protocol both sends and receives bulk
1396
# data, so we use that.
1397
self.build_tree(['data-file'])
1398
http_server = HTTPServerWithSmarts()
1399
http_server._url_protocol = url_protocol
1401
self.addCleanup(http_server.tearDown)
1403
http_transport = get_transport(http_server.get_url())
1405
medium = http_transport.get_smart_medium()
1406
#remote_transport = RemoteTransport('fake_url', medium)
1407
remote_transport = smart.SmartTransport('/', medium=medium)
1409
[(0, "c")], list(remote_transport.readv("data-file", [(0,1)])))
1411
def test_bulk_data_pycurl(self):
1413
self._test_bulk_data('http+pycurl')
1414
except errors.UnsupportedProtocol, e:
1415
raise tests.TestSkipped(str(e))
1417
def test_bulk_data_urllib(self):
1418
self._test_bulk_data('http+urllib')
1420
def test_smart_http_medium_request_accept_bytes(self):
1421
medium = FakeHTTPMedium()
1422
request = SmartClientHTTPMediumRequest(medium)
1423
request.accept_bytes('abc')
1424
request.accept_bytes('def')
1425
self.assertEqual(None, medium.written_request)
1426
request.finished_writing()
1427
self.assertEqual('abcdef', medium.written_request)
1429
def _test_http_send_smart_request(self, url_protocol):
1430
http_server = HTTPServerWithSmarts()
1431
http_server._url_protocol = url_protocol
1433
self.addCleanup(http_server.tearDown)
1435
post_body = 'hello\n'
1436
expected_reply_body = 'ok\x011\n'
1438
http_transport = get_transport(http_server.get_url())
1439
medium = http_transport.get_smart_medium()
1440
response = medium.send_http_smart_request(post_body)
1441
reply_body = response.read()
1442
self.assertEqual(expected_reply_body, reply_body)
1444
def test_http_send_smart_request_pycurl(self):
1446
self._test_http_send_smart_request('http+pycurl')
1447
except errors.UnsupportedProtocol, e:
1448
raise tests.TestSkipped(str(e))
1450
def test_http_send_smart_request_urllib(self):
1451
self._test_http_send_smart_request('http+urllib')
1453
def test_http_server_with_smarts(self):
1454
http_server = HTTPServerWithSmarts()
1456
self.addCleanup(http_server.tearDown)
1458
post_body = 'hello\n'
1459
expected_reply_body = 'ok\x011\n'
1461
smart_server_url = http_server.get_url() + '.bzr/smart'
1462
reply = urllib2.urlopen(smart_server_url, post_body).read()
1464
self.assertEqual(expected_reply_body, reply)
1466
def test_smart_http_server_post_request_handler(self):
1467
http_server = HTTPServerWithSmarts()
1469
self.addCleanup(http_server.tearDown)
1470
httpd = http_server._get_httpd()
1472
socket = SampleSocket(
1473
'POST /.bzr/smart HTTP/1.0\r\n'
1474
# HTTP/1.0 posts must have a Content-Length.
1475
'Content-Length: 6\r\n'
1478
request_handler = SmartRequestHandler(
1479
socket, ('localhost', 80), httpd)
1480
response = socket.writefile.getvalue()
1481
self.assertStartsWith(response, 'HTTP/1.0 200 ')
1482
# This includes the end of the HTTP headers, and all the body.
1483
expected_end_of_response = '\r\n\r\nok\x011\n'
1484
self.assertEndsWith(response, expected_end_of_response)
1487
class SampleSocket(object):
1488
"""A socket-like object for use in testing the HTTP request handler."""
1490
def __init__(self, socket_read_content):
1491
"""Constructs a sample socket.
1493
:param socket_read_content: a byte sequence
1495
# Use plain python StringIO so we can monkey-patch the close method to
1496
# not discard the contents.
1497
from StringIO import StringIO
1498
self.readfile = StringIO(socket_read_content)
1499
self.writefile = StringIO()
1500
self.writefile.close = lambda: None
1502
def makefile(self, mode='r', bufsize=None):
1504
return self.readfile
1506
return self.writefile
1509
# TODO: Client feature that does get_bundle and then installs that into a
1510
# branch; this can be used in place of the regular pull/fetch operation when
1511
# coming from a smart server.
1513
# TODO: Eventually, want to do a 'branch' command by fetching the whole
1514
# history as one big bundle. How?
1516
# The branch command does 'br_from.sprout', which tries to preserve the same
1517
# format. We don't necessarily even want that.
1519
# It might be simpler to handle cmd_pull first, which does a simpler fetch()
1520
# operation from one branch into another. It already has some code for
1521
# pulling from a bundle, which it does by trying to see if the destination is
1522
# a bundle file. So it seems the logic for pull ought to be:
1524
# - if it's a smart server, get a bundle from there and install that
1525
# - if it's a bundle, install that
1526
# - if it's a branch, pull from there
1528
# Getting a bundle from a smart server is a bit different from reading a
1529
# bundle from a URL:
1531
# - we can reasonably remember the URL we last read from
1532
# - you can specify a revision number to pull, and we need to pass it across
1533
# to the server as a limit on what will be requested
1535
# TODO: Given a URL, determine whether it is a smart server or not (or perhaps
1536
# otherwise whether it's a bundle?) Should this be a property or method of
1537
# the transport? For the ssh protocol, we always know it's a smart server.
1538
# For http, we potentially need to probe. But if we're explicitly given
1539
# bzr+http:// then we can skip that for now.