1022
1033
except NotImplementedError:
1023
1034
raise TestSkipped("Transport %s has no bogus URL support." %
1024
1035
self._server.__class__)
1025
# This should be: but SSH still connects on construction. No COOKIE!
1026
# self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1028
t = bzrlib.transport.get_transport(url)
1029
t.get('.bzr/branch')
1030
except (ConnectionError, NoSuchFile), e:
1032
except (Exception), e:
1033
self.fail('Wrong exception thrown (%s.%s): %s'
1034
% (e.__class__.__module__, e.__class__.__name__, e))
1036
self.fail('Did not get the expected ConnectionError or NoSuchFile.')
1036
t = get_transport(url)
1037
self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1038
1039
def test_stat(self):
1039
1040
# TODO: Test stat, just try once, and if it throws, stop testing
1129
1130
self.assertEqual(['%25'], names)
1130
1131
self.assertIsInstance(names[0], str)
1133
def test_clone_preserve_info(self):
    """Check that clone() keeps the connection-related attributes.

    The ``_scheme``, ``_user``, ``_password``, ``_host`` and ``_port``
    attributes of a transport are compared with those of its clone at
    'subdir'.

    :raises TestSkipped: if the transport under test is not a
        ConnectedTransport.
    """
    t1 = self.get_transport()
    if not isinstance(t1, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    t2 = t1.clone('subdir')
    # assertEqual is used instead of the deprecated assertEquals alias, in
    # line with the other assertions in this file.
    self.assertEqual(t1._scheme, t2._scheme)
    self.assertEqual(t1._user, t2._user)
    self.assertEqual(t1._password, t2._password)
    self.assertEqual(t1._host, t2._host)
    self.assertEqual(t1._port, t2._port)
1145
def test__reuse_for(self):
    """Check which url components stop _reuse_for() from returning ``t``.

    A url differing in scheme, user, host or port must not give back the
    transport itself; a url differing only in password must.

    :raises TestSkipped: if the transport under test is not a
        ConnectedTransport.
    """
    t = self.get_transport()
    if not isinstance(t, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    def new_url(scheme=None, user=None, password=None,
                host=None, port=None, path=None):
        """Build a new url from t.base changing only parts of it.

        Only the parameters different from None will be changed.
        """
        if scheme is None:
            scheme = t._scheme
        if user is None:
            user = t._user
        if password is None:
            password = t._password
        if host is None:
            host = t._host
        if port is None:
            port = t._port
        if path is None:
            path = t._path
        return t._unsplit_url(scheme, user, password, host, port, path)

    self.assertIsNot(t, t._reuse_for(new_url(scheme='foo')))

    # Pick a user name that is guaranteed to differ from the transport's.
    if t._user == 'me':
        user = 'you'
    else:
        user = 'me'
    self.assertIsNot(t, t._reuse_for(new_url(user=user)))

    # passwords are not taken into account because:
    # - it makes no sense to have two different valid passwords for the
    #   same user
    # - _password in ConnectedTransport is intended to collect what the
    #   user specified from the command-line and there are cases where the
    #   new url can contain no password (if the url was built from an
    #   existing transport.base for example)
    # - passwords are considered part of the credentials provided at
    #   connection creation time and as such may not be present in the url
    #   (they may be typed by the user when prompted for example)
    self.assertIs(t, t._reuse_for(new_url(password='from space')))

    # We will not connect, so we can use an invalid host
    self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))

    # Pick a port that is guaranteed to differ from the transport's.
    if t._port == 1234:
        port = 4321
    else:
        port = 1234
    self.assertIsNot(t, t._reuse_for(new_url(port=port)))
1190
def test_connection_sharing(self):
    """A transport and its clone must hand back the same connection object.

    This is checked once after forcing a connection, and again after a
    replacement connection object has been set on the original transport.

    :raises TestSkipped: if the transport under test is not a
        ConnectedTransport.
    """
    original = self.get_transport()
    if not isinstance(original, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    sibling = original.clone('subdir')
    # has() is called only for its side effect: transports that connect
    # lazily need an operation before a connection exists.
    original.has('surely_not')
    from_original = original._get_connection()
    from_sibling = sibling._get_connection()
    self.assertIs(from_original, from_sibling)

    # Stand-in for a connection re-created after a temporary failure.
    replacement = object()
    original._set_connection(replacement)
    # Both the original and its clone must now report the replacement.
    for holder in (original, sibling):
        self.assertIs(replacement, holder._get_connection())
1207
def test_reuse_connection_for_various_paths(self):
    """Transports obtained through _reuse_for() share one connection.

    A transport for a deep path below the base, and one for 'home' derived
    from that deep transport, are both compared with the original.

    :raises TestSkipped: if the transport under test is not a
        ConnectedTransport.
    """
    root = self.get_transport()
    if not isinstance(root, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    # Force connection: the result of has() itself is not of interest.
    root.has('surely_not')
    self.assertIsNot(None, root._get_connection())

    deep = root._reuse_for(root.base + 'whatever/but/deep/down/the/path')
    # A distinct transport object is expected, on the same connection.
    self.assertIsNot(root, deep)
    self.assertIs(root._get_connection(), deep._get_connection())

    at_home = deep._reuse_for(root.base + 'home')
    self.assertIs(root._get_connection(), at_home._get_connection())
    self.assertIs(deep._get_connection(), at_home._get_connection())
1132
1223
def test_clone(self):
1133
1224
# TODO: Test that clone moves up and down the filesystem
1134
1225
t1 = self.get_transport()
1353
1444
self.check_transport_contents(contents, t, urlutils.escape(fname))
1355
1446
def test_connect_twice_is_same_content(self):
    """Check that independently obtained transports see the same content.

    A file is written through one transport and then read back through a
    second transport from self.get_transport(), through a transport built
    from the first one's base url, and through the parent of a transport
    opened on a freshly created subdirectory.

    Nothing is checked for read-only transports, since the file cannot be
    written in the first place.
    """
    # check that our server (whatever it is) is accessible reliably
    # via get_transport and multiple connections share content.
    transport = self.get_transport()
    if transport.is_readonly():
        # Writing is impossible, so there is no content to compare.
        return
    transport.put_bytes('foo', 'bar')
    transport3 = self.get_transport()
    self.check_transport_contents('bar', transport3, 'foo')
    # its base should be usable.
    transport4 = get_transport(transport.base)
    self.check_transport_contents('bar', transport4, 'foo')

    # now opening at a relative url should give us a sane result:
    transport.mkdir('newdir')
    transport5 = get_transport(transport.base + "newdir")
    # Going back up from 'newdir' must land where 'foo' was written.
    transport6 = transport5.clone('..')
    self.check_transport_contents('bar', transport6, 'foo')
1374
1465
def test_lock_write(self):
1375
1466
"""Test transport-level write locks.