73
71
transport._clear_protocol_handlers()
74
72
transport.register_transport_proto('foo')
75
73
transport.register_lazy_transport('foo',
76
'breezy.tests.test_transport',
77
'TestTransport.SampleHandler')
74
'breezy.tests.test_transport',
75
'TestTransport.SampleHandler')
78
76
transport.register_transport_proto('bar')
79
77
transport.register_lazy_transport('bar',
80
'breezy.tests.test_transport',
81
'TestTransport.SampleHandler')
78
'breezy.tests.test_transport',
79
'TestTransport.SampleHandler')
82
80
self.assertEqual([SampleHandler.__module__,
83
'breezy.transport.chroot',
84
'breezy.transport.pathfilter'],
85
transport._get_transport_modules())
81
'breezy.transport.chroot',
82
'breezy.transport.pathfilter'],
83
transport._get_transport_modules())
87
85
def test_transport_dependency(self):
88
86
"""Transport with missing dependency causes no error"""
123
120
transport.get_transport_from_url('ssh://fooserver/foo')
124
121
except errors.UnsupportedProtocol as e:
126
self.assertEqual('Unsupported protocol'
127
' for url "ssh://fooserver/foo":'
128
' bzr supports bzr+ssh to operate over ssh,'
129
' use "bzr+ssh://fooserver/foo".',
123
'Unsupported protocol'
124
' for url "ssh://fooserver/foo":'
125
' Use bzr+ssh for Bazaar operations over SSH, '
126
'e.g. "bzr+ssh://fooserver/foo". Use git+ssh '
127
'for Git operations over SSH, e.g. "git+ssh://fooserver/foo".',
132
130
self.fail('Did not raise UnsupportedProtocol')
180
178
def test_coalesce_overlapped(self):
181
179
self.assertRaises(ValueError,
182
self.check, [(0, 15, [(0, 10), (5, 10)])],
180
self.check, [(0, 15, [(0, 10), (5, 10)])],
185
183
def test_coalesce_limit(self):
186
184
self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
187
185
(30, 10), (40, 10)]),
188
186
(60, 50, [(0, 10), (10, 10), (20, 10),
189
187
(30, 10), (40, 10)]),
190
], [(10, 10), (20, 10), (30, 10), (40, 10),
191
(50, 10), (60, 10), (70, 10), (80, 10),
192
(90, 10), (100, 10)],
188
], [(10, 10), (20, 10), (30, 10), (40, 10),
189
(50, 10), (60, 10), (70, 10), (80, 10),
190
(90, 10), (100, 10)],
195
193
def test_coalesce_no_limit(self):
196
194
self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
197
195
(30, 10), (40, 10), (50, 10),
198
196
(60, 10), (70, 10), (80, 10),
200
], [(10, 10), (20, 10), (30, 10), (40, 10),
201
(50, 10), (60, 10), (70, 10), (80, 10),
202
(90, 10), (100, 10)])
198
], [(10, 10), (20, 10), (30, 10), (40, 10),
199
(50, 10), (60, 10), (70, 10), (80, 10),
200
(90, 10), (100, 10)])
204
202
def test_coalesce_fudge(self):
205
203
self.check([(10, 30, [(0, 10), (20, 10)]),
206
204
(100, 10, [(0, 10)]),
207
], [(10, 10), (30, 10), (100, 10)],
205
], [(10, 10), (30, 10), (100, 10)],
210
208
def test_coalesce_max_size(self):
212
210
(30, 50, [(0, 50)]),
213
211
# If one range is above max_size, it gets its own coalesced
215
(100, 80, [(0, 80)]),],
213
(100, 80, [(0, 80)]), ],
216
214
[(10, 10), (20, 10), (30, 50), (100, 80)],
219
217
def test_coalesce_no_max_size(self):
220
218
self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
221
219
[(10, 10), (20, 10), (30, 50), (80, 100)],
224
222
def test_coalesce_default_limit(self):
225
223
# By default we use a 100MB max size.
226
224
ten_mb = 10 * 1024 * 1024
227
self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
228
(10*ten_mb, ten_mb, [(0, ten_mb)])],
229
[(i*ten_mb, ten_mb) for i in range(11)])
230
self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
231
[(i * ten_mb, ten_mb) for i in range(11)],
232
max_size=1*1024*1024*1024)
226
[(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
227
(10 * ten_mb, ten_mb, [(0, ten_mb)])],
228
[(i * ten_mb, ten_mb) for i in range(11)])
230
[(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
231
[(i * ten_mb, ten_mb) for i in range(11)],
232
max_size=1 * 1024 * 1024 * 1024)
235
235
class TestMemoryServer(tests.TestCase):
457
457
"""Check all expected transport hook points are set up"""
458
458
hookpoint = transport.TransportHooks()
459
459
self.assertTrue("post_connect" in hookpoint,
460
"post_connect not in %s" % (hookpoint,))
460
"post_connect not in %s" % (hookpoint,))
462
462
def test_post_connect(self):
463
463
"""Ensure the post_connect hook is called when _set_transport is"""
465
465
transport.Transport.hooks.install_named_hook("post_connect",
467
467
t = self._get_connected_transport()
468
468
self.assertLength(0, calls)
469
469
t._set_connection("connection", "auth")
923
926
def test_reuse_same_transport(self):
924
927
possible_transports = []
925
t1 = transport.get_transport_from_url('http://foo/',
926
possible_transports=possible_transports)
928
t1 = transport.get_transport_from_url(
929
'http://foo/', possible_transports=possible_transports)
927
930
self.assertEqual([t1], possible_transports)
928
931
t2 = transport.get_transport_from_url('http://foo/',
929
possible_transports=[t1])
932
possible_transports=[t1])
930
933
self.assertIs(t1, t2)
932
935
# Also check that final '/' are handled correctly
933
936
t3 = transport.get_transport_from_url('http://foo/path/')
934
937
t4 = transport.get_transport_from_url('http://foo/path',
935
possible_transports=[t3])
938
possible_transports=[t3])
936
939
self.assertIs(t3, t4)
938
941
t5 = transport.get_transport_from_url('http://foo/path')
939
942
t6 = transport.get_transport_from_url('http://foo/path/',
940
possible_transports=[t5])
943
possible_transports=[t5])
941
944
self.assertIs(t5, t6)
943
946
def test_don_t_reuse_different_transport(self):
944
947
t1 = transport.get_transport_from_url('http://foo/path')
945
948
t2 = transport.get_transport_from_url('http://bar/path',
946
possible_transports=[t1])
949
possible_transports=[t1])
947
950
self.assertIsNot(t1, t2)
1103
1107
result = http.unhtml_roughly(fake_html)
1104
1108
self.assertEqual(len(result), 1000)
1105
1109
self.assertStartsWith(result, " something!")
1108
class SomeDirectory(object):
1110
def look_up(self, name, url):
1114
class TestLocationToUrl(tests.TestCase):
1116
def get_base_location(self):
1117
path = osutils.abspath('/foo/bar')
1118
if path.startswith('/'):
1119
url = 'file://%s' % (path,)
1121
# On Windows, abspaths start with the drive letter, so we have to
1122
# add in the extra '/'
1123
url = 'file:///%s' % (path,)
1126
def test_regular_url(self):
1127
self.assertEqual("file://foo", location_to_url("file://foo"))
1129
def test_directory(self):
1130
directories.register("bar:", SomeDirectory, "Dummy directory")
1131
self.addCleanup(directories.remove, "bar:")
1132
self.assertEqual("http://bar", location_to_url("bar:"))
1134
def test_unicode_url(self):
1135
self.assertRaises(urlutils.InvalidURL, location_to_url,
1136
b"http://fo/\xc3\xaf".decode("utf-8"))
1138
def test_unicode_path(self):
1139
path, url = self.get_base_location()
1140
location = path + b"\xc3\xaf".decode("utf-8")
1142
self.assertEqual(url, location_to_url(location))
1144
def test_path(self):
1145
path, url = self.get_base_location()
1146
self.assertEqual(url, location_to_url(path))
1148
def test_relative_file_url(self):
1149
self.assertEqual(urlutils.local_path_to_url(".") + "/bar",
1150
location_to_url("file:bar"))
1152
def test_absolute_file_url(self):
1153
self.assertEqual("file:///bar", location_to_url("file:/bar"))