73
69
transport._clear_protocol_handlers()
74
70
transport.register_transport_proto('foo')
75
71
transport.register_lazy_transport('foo',
76
'breezy.tests.test_transport',
77
'TestTransport.SampleHandler')
72
'breezy.tests.test_transport',
73
'TestTransport.SampleHandler')
78
74
transport.register_transport_proto('bar')
79
75
transport.register_lazy_transport('bar',
80
'breezy.tests.test_transport',
81
'TestTransport.SampleHandler')
76
'breezy.tests.test_transport',
77
'TestTransport.SampleHandler')
82
78
self.assertEqual([SampleHandler.__module__,
83
'breezy.transport.chroot',
84
'breezy.transport.pathfilter'],
85
transport._get_transport_modules())
79
'breezy.transport.chroot',
80
'breezy.transport.pathfilter'],
81
transport._get_transport_modules())
87
83
def test_transport_dependency(self):
88
84
"""Transport with missing dependency causes no error"""
123
118
transport.get_transport_from_url('ssh://fooserver/foo')
124
119
except errors.UnsupportedProtocol as e:
126
self.assertEqual('Unsupported protocol'
127
' for url "ssh://fooserver/foo":'
128
' bzr supports bzr+ssh to operate over ssh,'
129
' use "bzr+ssh://fooserver/foo".',
121
'Unsupported protocol'
122
' for url "ssh://fooserver/foo":'
123
' Use bzr+ssh for Bazaar operations over SSH, '
124
'e.g. "bzr+ssh://fooserver/foo". Use git+ssh '
125
'for Git operations over SSH, e.g. "git+ssh://fooserver/foo".',
132
128
self.fail('Did not raise UnsupportedProtocol')
180
176
def test_coalesce_overlapped(self):
181
177
self.assertRaises(ValueError,
182
self.check, [(0, 15, [(0, 10), (5, 10)])],
178
self.check, [(0, 15, [(0, 10), (5, 10)])],
185
181
def test_coalesce_limit(self):
186
182
self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
187
183
(30, 10), (40, 10)]),
188
184
(60, 50, [(0, 10), (10, 10), (20, 10),
189
185
(30, 10), (40, 10)]),
190
], [(10, 10), (20, 10), (30, 10), (40, 10),
191
(50, 10), (60, 10), (70, 10), (80, 10),
192
(90, 10), (100, 10)],
186
], [(10, 10), (20, 10), (30, 10), (40, 10),
187
(50, 10), (60, 10), (70, 10), (80, 10),
188
(90, 10), (100, 10)],
195
191
def test_coalesce_no_limit(self):
196
192
self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
197
193
(30, 10), (40, 10), (50, 10),
198
194
(60, 10), (70, 10), (80, 10),
200
], [(10, 10), (20, 10), (30, 10), (40, 10),
201
(50, 10), (60, 10), (70, 10), (80, 10),
202
(90, 10), (100, 10)])
196
], [(10, 10), (20, 10), (30, 10), (40, 10),
197
(50, 10), (60, 10), (70, 10), (80, 10),
198
(90, 10), (100, 10)])
204
200
def test_coalesce_fudge(self):
205
201
self.check([(10, 30, [(0, 10), (20, 10)]),
206
202
(100, 10, [(0, 10)]),
207
], [(10, 10), (30, 10), (100, 10)],
203
], [(10, 10), (30, 10), (100, 10)],
210
206
def test_coalesce_max_size(self):
212
208
(30, 50, [(0, 50)]),
213
209
# If one range is above max_size, it gets its own coalesced
215
(100, 80, [(0, 80)]),],
211
(100, 80, [(0, 80)]), ],
216
212
[(10, 10), (20, 10), (30, 50), (100, 80)],
219
215
def test_coalesce_no_max_size(self):
220
216
self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
221
217
[(10, 10), (20, 10), (30, 50), (80, 100)],
224
220
def test_coalesce_default_limit(self):
    """Check coalescing with the default max size and with a 1GB max size.

    Eleven adjacent 10MB offsets are expected to come back as two
    coalesced entries by default, and as a single entry once max_size is
    raised explicitly.
    """
    # By default we use a 100MB max size.
    ten_mb = 10 * 1024 * 1024

    def adjacent(count):
        # Fresh list of `count` back-to-back (start, length) pairs; built
        # anew for every use so no list object is shared between calls.
        return [(i * ten_mb, ten_mb) for i in range(count)]

    expected_default = [
        (0, 10 * ten_mb, adjacent(10)),
        (10 * ten_mb, ten_mb, [(0, ten_mb)]),
    ]
    self.check(expected_default, adjacent(11))

    expected_large = [(0, 11 * ten_mb, adjacent(11))]
    self.check(expected_large, adjacent(11),
               max_size=1 * 1024 * 1024 * 1024)
235
233
class TestMemoryServer(tests.TestCase):
273
271
def test_append_and_get(self):
    """Appended bytes accumulate and are returned by get().

    Content is written once with append_bytes and once with append_file;
    after each write the whole file is read back and compared.
    """
    t = memory.MemoryTransport()
    # Payloads are bytes literals: the transport stores bytes, so a text
    # literal would never compare equal to what read() returns.
    t.append_bytes('path', b'content')
    # Read through a context manager so the handle is always closed.
    with t.get('path') as f:
        self.assertEqual(f.read(), b'content')
    t.append_file('path', BytesIO(b'content'))
    with t.get('path') as f:
        self.assertEqual(f.read(), b'contentcontent')
280
279
def test_put_and_get(self):
    """Content stored with put_file or put_bytes is returned by get()."""
    t = memory.MemoryTransport()
    t.put_file('path', BytesIO(b'content'))
    # Expected values are bytes, matching what the transport stores, and
    # each read uses a context manager so the handle is closed.
    with t.get('path') as f:
        self.assertEqual(f.read(), b'content')
    t.put_bytes('path', b'content')
    with t.get('path') as f:
        self.assertEqual(f.read(), b'content')
287
286
def test_append_without_dir_fails(self):
    """append_bytes under a directory that was never created raises NoSuchFile."""
    t = memory.MemoryTransport()
    # The payload is bytes so that the only reason the call can fail is
    # the missing 'dir' directory, not the type of the content.
    self.assertRaises(errors.NoSuchFile,
                      t.append_bytes, 'dir/path', b'content')
292
291
def test_put_without_dir_fails(self):
293
292
t = memory.MemoryTransport()
305
304
def test_has_present(self):
    """has() reports True for a path that has been written to."""
    t = memory.MemoryTransport()
    # Content must be bytes; the test is about presence, not payload type.
    t.append_bytes('foo', b'content')
    self.assertEqual(True, t.has('foo'))
310
309
def test_list_dir(self):
311
310
t = memory.MemoryTransport()
312
t.put_bytes('foo', 'content')
311
t.put_bytes('foo', b'content')
314
t.put_bytes('dir/subfoo', 'content')
315
t.put_bytes('dirlike', 'content')
313
t.put_bytes('dir/subfoo', b'content')
314
t.put_bytes('dirlike', b'content')
317
316
self.assertEqual(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
318
317
self.assertEqual(['subfoo'], sorted(t.list_dir('dir')))
340
340
def test_iter_files_recursive(self):
341
341
t = memory.MemoryTransport()
343
t.put_bytes('dir/foo', 'content')
344
t.put_bytes('dir/bar', 'content')
345
t.put_bytes('bar', 'content')
343
t.put_bytes('dir/foo', b'content')
344
t.put_bytes('dir/bar', b'content')
345
t.put_bytes('bar', b'content')
346
346
paths = set(t.iter_files_recursive())
347
347
self.assertEqual({'dir/foo', 'dir/bar', 'bar'}, paths)
349
349
def test_stat(self):
    """stat() reports st_size equal to the number of bytes stored."""
    t = memory.MemoryTransport()
    # Bytes payloads: 7 bytes for 'foo', 6 bytes for 'bar'.
    t.put_bytes('foo', b'content')
    t.put_bytes('bar', b'phowar')
    self.assertEqual(7, t.stat('foo').st_size)
    self.assertEqual(6, t.stat('bar').st_size)
455
455
"""Check all expected transport hook points are set up"""
456
456
hookpoint = transport.TransportHooks()
457
457
self.assertTrue("post_connect" in hookpoint,
458
"post_connect not in %s" % (hookpoint,))
458
"post_connect not in %s" % (hookpoint,))
460
460
def test_post_connect(self):
461
461
"""Ensure the post_connect hook is called when _set_transport is"""
463
463
transport.Transport.hooks.install_named_hook("post_connect",
465
465
t = self._get_connected_transport()
466
466
self.assertLength(0, calls)
467
467
t._set_connection("connection", "auth")
921
924
def test_reuse_same_transport(self):
    """A transport offered in possible_transports is reused for the same URL."""
    get_transport = transport.get_transport_from_url

    candidates = []
    t1 = get_transport('http://foo/', possible_transports=candidates)
    # The newly created transport is recorded in the list that was passed.
    self.assertEqual([t1], candidates)
    t2 = get_transport('http://foo/', possible_transports=[t1])
    self.assertIs(t1, t2)

    # Also check that final '/' are handled correctly
    t3 = get_transport('http://foo/path/')
    t4 = get_transport('http://foo/path', possible_transports=[t3])
    self.assertIs(t3, t4)

    t5 = get_transport('http://foo/path')
    t6 = get_transport('http://foo/path/', possible_transports=[t5])
    self.assertIs(t5, t6)
941
944
def test_don_t_reuse_different_transport(self):
    """A candidate transport for another host is not returned."""
    existing = transport.get_transport_from_url('http://foo/path')
    other = transport.get_transport_from_url(
        'http://bar/path', possible_transports=[existing])
    self.assertIsNot(existing, other)
1100
1105
result = http.unhtml_roughly(fake_html)
1101
1106
self.assertEqual(len(result), 1000)
1102
1107
self.assertStartsWith(result, " something!")
1105
class SomeDirectory(object):
1107
def look_up(self, name, url):
1111
class TestLocationToUrl(tests.TestCase):
1113
def get_base_location(self):
1114
path = osutils.abspath('/foo/bar')
1115
if path.startswith('/'):
1116
url = 'file://%s' % (path,)
1118
# On Windows, abspaths start with the drive letter, so we have to
1119
# add in the extra '/'
1120
url = 'file:///%s' % (path,)
1123
def test_regular_url(self):
    """A location that is already a URL comes back unchanged."""
    location = "file://foo"
    self.assertEqual("file://foo", location_to_url(location))
1126
def test_directory(self):
    """A location with a registered directory prefix resolves to http://bar."""
    prefix = "bar:"
    directories.register(prefix, SomeDirectory, "Dummy directory")
    # Unregister again once the test finishes so other tests are unaffected.
    self.addCleanup(directories.remove, prefix)
    resolved = location_to_url("bar:")
    self.assertEqual("http://bar", resolved)
1131
def test_unicode_url(self):
    """A URL containing a non-ASCII character raises InvalidURL."""
    # Spell the text with a unicode literal (U+00EF) instead of decoding a
    # byte-escaped literal: str has no decode() on Python 3, so the old
    # form failed with AttributeError before location_to_url was called.
    self.assertRaises(urlutils.InvalidURL, location_to_url,
                      u"http://fo/\xef")
1135
def test_unicode_path(self):
1136
path, url = self.get_base_location()
1137
location = path + "\xc3\xaf".decode("utf-8")
1139
self.assertEqual(url, location_to_url(location))
1141
def test_path(self):
    """The base path converts to the URL paired with it by get_base_location."""
    base_path, expected_url = self.get_base_location()
    actual_url = location_to_url(base_path)
    self.assertEqual(expected_url, actual_url)
1145
def test_relative_file_url(self):
    """"file:bar" resolves to the URL of the current directory plus "/bar"."""
    expected = urlutils.local_path_to_url(".") + "/bar"
    self.assertEqual(expected, location_to_url("file:bar"))
1149
def test_absolute_file_url(self):
    """"file:/bar" is expanded to the three-slash form "file:///bar"."""
    converted = location_to_url("file:/bar")
    self.assertEqual("file:///bar", converted)