69
73
transport._clear_protocol_handlers()
70
74
transport.register_transport_proto('foo')
71
75
transport.register_lazy_transport('foo',
72
'breezy.tests.test_transport',
73
'TestTransport.SampleHandler')
76
'breezy.tests.test_transport',
77
'TestTransport.SampleHandler')
74
78
transport.register_transport_proto('bar')
75
79
transport.register_lazy_transport('bar',
76
'breezy.tests.test_transport',
77
'TestTransport.SampleHandler')
80
'breezy.tests.test_transport',
81
'TestTransport.SampleHandler')
78
82
self.assertEqual([SampleHandler.__module__,
79
'breezy.transport.chroot',
80
'breezy.transport.pathfilter'],
81
transport._get_transport_modules())
83
'breezy.transport.chroot',
84
'breezy.transport.pathfilter'],
85
transport._get_transport_modules())
83
87
def test_transport_dependency(self):
84
88
"""Transport with missing dependency causes no error"""
118
123
transport.get_transport_from_url('ssh://fooserver/foo')
119
124
except errors.UnsupportedProtocol as e:
121
'Unsupported protocol'
122
' for url "ssh://fooserver/foo":'
123
' Use bzr+ssh for Bazaar operations over SSH, '
124
'e.g. "bzr+ssh://fooserver/foo". Use git+ssh '
125
'for Git operations over SSH, e.g. "git+ssh://fooserver/foo".',
126
self.assertEqual('Unsupported protocol'
127
' for url "ssh://fooserver/foo":'
128
' bzr supports bzr+ssh to operate over ssh,'
129
' use "bzr+ssh://fooserver/foo".',
128
132
self.fail('Did not raise UnsupportedProtocol')
176
180
def test_coalesce_overlapped(self):
177
181
self.assertRaises(ValueError,
178
self.check, [(0, 15, [(0, 10), (5, 10)])],
182
self.check, [(0, 15, [(0, 10), (5, 10)])],
181
185
def test_coalesce_limit(self):
182
186
self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
183
187
(30, 10), (40, 10)]),
184
188
(60, 50, [(0, 10), (10, 10), (20, 10),
185
189
(30, 10), (40, 10)]),
186
], [(10, 10), (20, 10), (30, 10), (40, 10),
187
(50, 10), (60, 10), (70, 10), (80, 10),
188
(90, 10), (100, 10)],
190
], [(10, 10), (20, 10), (30, 10), (40, 10),
191
(50, 10), (60, 10), (70, 10), (80, 10),
192
(90, 10), (100, 10)],
191
195
def test_coalesce_no_limit(self):
192
196
self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
193
197
(30, 10), (40, 10), (50, 10),
194
198
(60, 10), (70, 10), (80, 10),
196
], [(10, 10), (20, 10), (30, 10), (40, 10),
197
(50, 10), (60, 10), (70, 10), (80, 10),
198
(90, 10), (100, 10)])
200
], [(10, 10), (20, 10), (30, 10), (40, 10),
201
(50, 10), (60, 10), (70, 10), (80, 10),
202
(90, 10), (100, 10)])
200
204
def test_coalesce_fudge(self):
201
205
self.check([(10, 30, [(0, 10), (20, 10)]),
202
206
(100, 10, [(0, 10)]),
203
], [(10, 10), (30, 10), (100, 10)],
207
], [(10, 10), (30, 10), (100, 10)],
206
210
def test_coalesce_max_size(self):
208
212
(30, 50, [(0, 50)]),
209
213
# If one range is above max_size, it gets its own coalesced
211
(100, 80, [(0, 80)]), ],
215
(100, 80, [(0, 80)]),],
212
216
[(10, 10), (20, 10), (30, 50), (100, 80)],
215
219
def test_coalesce_no_max_size(self):
216
220
self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
217
221
[(10, 10), (20, 10), (30, 50), (80, 100)],
220
224
def test_coalesce_default_limit(self):
221
225
# By default we use a 100MB max size.
222
226
ten_mb = 10 * 1024 * 1024
224
[(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
225
(10 * ten_mb, ten_mb, [(0, ten_mb)])],
226
[(i * ten_mb, ten_mb) for i in range(11)])
228
[(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
229
[(i * ten_mb, ten_mb) for i in range(11)],
230
max_size=1 * 1024 * 1024 * 1024)
227
self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
228
(10*ten_mb, ten_mb, [(0, ten_mb)])],
229
[(i*ten_mb, ten_mb) for i in range(11)])
230
self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
231
[(i * ten_mb, ten_mb) for i in range(11)],
232
max_size=1*1024*1024*1024)
233
235
class TestMemoryServer(tests.TestCase):
271
273
def test_append_and_get(self):
272
274
t = memory.MemoryTransport()
273
t.append_bytes('path', b'content')
274
self.assertEqual(t.get('path').read(), b'content')
275
t.append_bytes('path', 'content')
276
self.assertEqual(t.get('path').read(), 'content')
275
277
t.append_file('path', BytesIO(b'content'))
276
with t.get('path') as f:
277
self.assertEqual(f.read(), b'contentcontent')
278
self.assertEqual(t.get('path').read(), 'contentcontent')
279
280
def test_put_and_get(self):
280
281
t = memory.MemoryTransport()
281
282
t.put_file('path', BytesIO(b'content'))
282
self.assertEqual(t.get('path').read(), b'content')
283
t.put_bytes('path', b'content')
284
self.assertEqual(t.get('path').read(), b'content')
283
self.assertEqual(t.get('path').read(), 'content')
284
t.put_bytes('path', 'content')
285
self.assertEqual(t.get('path').read(), 'content')
286
287
def test_append_without_dir_fails(self):
287
288
t = memory.MemoryTransport()
288
289
self.assertRaises(errors.NoSuchFile,
289
t.append_bytes, 'dir/path', b'content')
290
t.append_bytes, 'dir/path', 'content')
291
292
def test_put_without_dir_fails(self):
292
293
t = memory.MemoryTransport()
304
305
def test_has_present(self):
305
306
t = memory.MemoryTransport()
306
t.append_bytes('foo', b'content')
307
t.append_bytes('foo', 'content')
307
308
self.assertEqual(True, t.has('foo'))
309
310
def test_list_dir(self):
310
311
t = memory.MemoryTransport()
311
t.put_bytes('foo', b'content')
312
t.put_bytes('foo', 'content')
313
t.put_bytes('dir/subfoo', b'content')
314
t.put_bytes('dirlike', b'content')
314
t.put_bytes('dir/subfoo', 'content')
315
t.put_bytes('dirlike', 'content')
316
317
self.assertEqual(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
317
318
self.assertEqual(['subfoo'], sorted(t.list_dir('dir')))
340
340
def test_iter_files_recursive(self):
341
341
t = memory.MemoryTransport()
343
t.put_bytes('dir/foo', b'content')
344
t.put_bytes('dir/bar', b'content')
345
t.put_bytes('bar', b'content')
343
t.put_bytes('dir/foo', 'content')
344
t.put_bytes('dir/bar', 'content')
345
t.put_bytes('bar', 'content')
346
346
paths = set(t.iter_files_recursive())
347
347
self.assertEqual({'dir/foo', 'dir/bar', 'bar'}, paths)
349
349
def test_stat(self):
350
350
t = memory.MemoryTransport()
351
t.put_bytes('foo', b'content')
352
t.put_bytes('bar', b'phowar')
351
t.put_bytes('foo', 'content')
352
t.put_bytes('bar', 'phowar')
353
353
self.assertEqual(7, t.stat('foo').st_size)
354
354
self.assertEqual(6, t.stat('bar').st_size)
455
455
"""Check all expected transport hook points are set up"""
456
456
hookpoint = transport.TransportHooks()
457
457
self.assertTrue("post_connect" in hookpoint,
458
"post_connect not in %s" % (hookpoint,))
458
"post_connect not in %s" % (hookpoint,))
460
460
def test_post_connect(self):
461
461
"""Ensure the post_connect hook is called when _set_transport is"""
463
463
transport.Transport.hooks.install_named_hook("post_connect",
465
465
t = self._get_connected_transport()
466
466
self.assertLength(0, calls)
467
467
t._set_connection("connection", "auth")
708
706
t = transport.get_transport_from_path(self.test_dir)
709
707
self.assertIsInstance(t, local.LocalTransport)
710
708
self.assertEqual(t.base.rstrip("/"),
711
urlutils.local_path_to_url(self.test_dir))
709
urlutils.local_path_to_url(self.test_dir))
713
711
def test_with_url(self):
714
712
t = transport.get_transport_from_path("file:")
715
713
self.assertIsInstance(t, local.LocalTransport)
714
self.assertEqual(t.base.rstrip("/"),
718
715
urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
721
718
class TestTransportFromUrl(tests.TestCaseInTempDir):
723
720
def test_with_path(self):
724
self.assertRaises(urlutils.InvalidURL, transport.get_transport_from_url,
721
self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
727
724
def test_with_url(self):
728
725
url = urlutils.local_path_to_url(self.test_dir)
924
921
def test_reuse_same_transport(self):
925
922
possible_transports = []
926
t1 = transport.get_transport_from_url(
927
'http://foo/', possible_transports=possible_transports)
923
t1 = transport.get_transport_from_url('http://foo/',
924
possible_transports=possible_transports)
928
925
self.assertEqual([t1], possible_transports)
929
926
t2 = transport.get_transport_from_url('http://foo/',
930
possible_transports=[t1])
927
possible_transports=[t1])
931
928
self.assertIs(t1, t2)
933
930
# Also check that final '/' are handled correctly
934
931
t3 = transport.get_transport_from_url('http://foo/path/')
935
932
t4 = transport.get_transport_from_url('http://foo/path',
936
possible_transports=[t3])
933
possible_transports=[t3])
937
934
self.assertIs(t3, t4)
939
936
t5 = transport.get_transport_from_url('http://foo/path')
940
937
t6 = transport.get_transport_from_url('http://foo/path/',
941
possible_transports=[t5])
938
possible_transports=[t5])
942
939
self.assertIs(t5, t6)
944
941
def test_don_t_reuse_different_transport(self):
945
942
t1 = transport.get_transport_from_url('http://foo/path')
946
943
t2 = transport.get_transport_from_url('http://bar/path',
947
possible_transports=[t1])
944
possible_transports=[t1])
948
945
self.assertIsNot(t1, t2)
1105
1100
result = http.unhtml_roughly(fake_html)
1106
1101
self.assertEqual(len(result), 1000)
1107
1102
self.assertStartsWith(result, " something!")
1105
class SomeDirectory(object):
1107
def look_up(self, name, url):
1111
class TestLocationToUrl(tests.TestCase):
1113
def get_base_location(self):
1114
path = osutils.abspath('/foo/bar')
1115
if path.startswith('/'):
1116
url = 'file://%s' % (path,)
1118
# On Windows, abspaths start with the drive letter, so we have to
1119
# add in the extra '/'
1120
url = 'file:///%s' % (path,)
1123
def test_regular_url(self):
1124
self.assertEqual("file://foo", location_to_url("file://foo"))
1126
def test_directory(self):
1127
directories.register("bar:", SomeDirectory, "Dummy directory")
1128
self.addCleanup(directories.remove, "bar:")
1129
self.assertEqual("http://bar", location_to_url("bar:"))
1131
def test_unicode_url(self):
1132
self.assertRaises(errors.InvalidURL, location_to_url,
1133
"http://fo/\xc3\xaf".decode("utf-8"))
1135
def test_unicode_path(self):
1136
path, url = self.get_base_location()
1137
location = path + "\xc3\xaf".decode("utf-8")
1139
self.assertEqual(url, location_to_url(location))
1141
def test_path(self):
1142
path, url = self.get_base_location()
1143
self.assertEqual(url, location_to_url(path))
1145
def test_relative_file_url(self):
1146
self.assertEqual(urlutils.local_path_to_url(".") + "/bar",
1147
location_to_url("file:bar"))
1149
def test_absolute_file_url(self):
1150
self.assertEqual("file:///bar", location_to_url("file:/bar"))