87
89
if self._chroot_server is None:
88
90
backing_transport = tests.TestCaseWithTransport.get_transport(self)
89
91
self._chroot_server = chroot.ChrootServer(backing_transport)
90
self._chroot_server.setUp()
91
self.addCleanup(self._chroot_server.tearDown)
92
self.start_server(self._chroot_server)
92
93
t = get_transport(self._chroot_server.get_url())
93
94
if relpath is not None:
94
95
t = t.clone(relpath)
98
class TestCaseWithSmartMedium(tests.TestCaseWithTransport):
99
class TestCaseWithSmartMedium(tests.TestCaseWithMemoryTransport):
101
102
super(TestCaseWithSmartMedium, self).setUp()
113
114
return self.get_transport().get_smart_medium()
117
class TestByteStreamToStream(tests.TestCase):
    # Exercises the round trip between a record stream and the byte-stream
    # form produced by smart.repository._stream_to_byte_stream.

    def test_repeated_substreams_same_kind_are_one_stream(self):
        # Make a stream - an iterable of bytestrings.
        # Two back-to-back substreams of the same kind ('text'), one record
        # each.
        stream = [('text', [versionedfile.FulltextContentFactory(('k1',), None,
            None, 'foo')]),('text', [
            versionedfile.FulltextContentFactory(('k2',), None, None, 'bar')])]
        fmt = bzrdir.format_registry.get('pack-0.92')().repository_format
        # Named byte_stream rather than ``bytes`` so the builtin is not
        # shadowed inside the test.
        byte_stream = smart.repository._stream_to_byte_stream(stream, fmt)
        # Iterate the resulting iterable; checking that we get only one stream
        # out.
        fmt, stream = smart.repository._byte_stream_to_stream(byte_stream)
        # Each substream is materialised while it is current; the collection
        # is built here so the name is always bound before the assertions.
        streams = [(kind, list(substream)) for kind, substream in stream]
        # One substream comes back, carrying both records.
        self.assertLength(1, streams)
        self.assertLength(2, streams[0][1])
116
136
class TestSmartServerResponse(tests.TestCase):
118
138
def test__eq__(self):
394
414
'False', 'False', 'False', '', '', '', '', 'False')
417
class TestSmartServerRequestOpenBzrDir(tests.TestCaseWithMemoryTransport):
    # Every case here expects SmartServerRequestOpenBzrDir to answer ('no',).

    def test_no_directory(self):
        # The requested path was never created on the backing transport.
        transport = self.get_transport()
        open_request = smart.bzrdir.SmartServerRequestOpenBzrDir(transport)
        expected = SmartServerResponse(('no', ))
        self.assertEqual(expected, open_request.execute('does-not-exist'))

    def test_empty_directory(self):
        # The directory exists but nothing else has been created inside it.
        transport = self.get_transport()
        transport.mkdir('empty')
        open_request = smart.bzrdir.SmartServerRequestOpenBzrDir(transport)
        expected = SmartServerResponse(('no', ))
        self.assertEqual(expected, open_request.execute('empty'))

    def test_outside_root_client_path(self):
        # The request is built with root_client_path='root' and then asked
        # about a path that does not start with that root.
        transport = self.get_transport()
        open_request = smart.bzrdir.SmartServerRequestOpenBzrDir(
            transport, root_client_path='root')
        expected = SmartServerResponse(('no', ))
        self.assertEqual(expected, open_request.execute('not-root'))
440
class TestSmartServerRequestOpenBzrDir_2_1(tests.TestCaseWithMemoryTransport):
442
def test_no_directory(self):
    # A path that was never created must be answered with ('no',) by the
    # 2.1 flavour of the open-bzrdir request.
    transport = self.get_transport()
    open_request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(transport)
    expected = SmartServerResponse(('no', ))
    self.assertEqual(expected, open_request.execute('does-not-exist'))
448
def test_empty_directory(self):
    # The directory is created on the backing transport but left empty;
    # the expected answer is still ('no',).
    transport = self.get_transport()
    transport.mkdir('empty')
    open_request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(transport)
    expected = SmartServerResponse(('no', ))
    self.assertEqual(expected, open_request.execute('empty'))
455
def test_present_without_workingtree(self):
456
backing = self.get_transport()
457
request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(backing)
458
self.make_bzrdir('.')
459
self.assertEqual(SmartServerResponse(('yes', 'no')),
462
def test_outside_root_client_path(self):
    # The request is built with root_client_path='root' and then asked
    # about 'not-root'; the expected answer is ('no',).
    transport = self.get_transport()
    open_request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(
        transport, root_client_path='root')
    expected = SmartServerResponse(('no',))
    self.assertEqual(expected, open_request.execute('not-root'))
470
class TestSmartServerRequestOpenBzrDir_2_1_disk(TestCaseWithChrootedTransport):
472
def test_present_with_workingtree(self):
473
self.vfs_transport_factory = local.LocalURLServer
474
backing = self.get_transport()
475
request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(backing)
476
bd = self.make_bzrdir('.')
477
bd.create_repository()
479
bd.create_workingtree()
480
self.assertEqual(SmartServerResponse(('yes', 'yes')),
397
484
class TestSmartServerRequestOpenBranch(TestCaseWithChrootedTransport):
399
486
def test_no_branch(self):