230
230
# having vendor be invalid means that if it tries to connect via the
231
231
# vendor it will blow up.
232
232
client_medium = medium.SmartSSHClientMedium('127.0.0.1', unopened_port,
233
username=None, password=None, vendor="not a vendor")
233
username=None, password=None, vendor="not a vendor",
234
bzr_remote_path='bzr')
236
237
def test_ssh_client_connects_on_first_use(self):
239
240
output = StringIO()
240
241
vendor = StringIOSSHVendor(StringIO(), output)
241
242
client_medium = medium.SmartSSHClientMedium(
242
'a hostname', 'a port', 'a username', 'a password', vendor)
243
'a hostname', 'a port', 'a username', 'a password', vendor, 'bzr')
243
244
client_medium._accept_bytes('abc')
244
245
self.assertEqual('abc', output.getvalue())
245
246
self.assertEqual([('connect_ssh', 'a username', 'a password',
257
258
osutils.set_or_unset_env('BZR_REMOTE_PATH', orig_bzr_remote_path)
258
259
self.addCleanup(cleanup_environ)
259
260
os.environ['BZR_REMOTE_PATH'] = 'fugly'
260
client_medium = medium.SmartSSHClientMedium('a hostname', 'a port', 'a username',
261
client_medium = self.callDeprecated(
262
['bzr_remote_path is required as of bzr 0.92'],
263
medium.SmartSSHClientMedium, 'a hostname', 'a port', 'a username',
261
264
'a password', vendor)
262
265
client_medium._accept_bytes('abc')
263
266
self.assertEqual('abc', output.getvalue())
266
269
['fugly', 'serve', '--inet', '--directory=/', '--allow-writes'])],
272
def test_ssh_client_changes_command_when_bzr_remote_path_passed(self):
273
# The only thing that initiates a connection from the medium is giving
276
vendor = StringIOSSHVendor(StringIO(), output)
277
client_medium = medium.SmartSSHClientMedium('a hostname', 'a port',
278
'a username', 'a password', vendor, bzr_remote_path='fugly')
279
client_medium._accept_bytes('abc')
280
self.assertEqual('abc', output.getvalue())
281
self.assertEqual([('connect_ssh', 'a username', 'a password',
282
'a hostname', 'a port',
283
['fugly', 'serve', '--inet', '--directory=/', '--allow-writes'])],
269
286
def test_ssh_client_disconnect_does_so(self):
270
287
# calling disconnect should disconnect both the read_from and write_to
271
288
# file-like object it from the ssh connection.
272
289
input = StringIO()
273
290
output = StringIO()
274
291
vendor = StringIOSSHVendor(input, output)
275
client_medium = medium.SmartSSHClientMedium('a hostname', vendor=vendor)
292
client_medium = medium.SmartSSHClientMedium('a hostname',
294
bzr_remote_path='bzr')
276
295
client_medium._accept_bytes('abc')
277
296
client_medium.disconnect()
278
297
self.assertTrue(input.closed)
292
311
input = StringIO()
293
312
output = StringIO()
294
313
vendor = StringIOSSHVendor(input, output)
295
client_medium = medium.SmartSSHClientMedium('a hostname', vendor=vendor)
314
client_medium = medium.SmartSSHClientMedium('a hostname',
315
vendor=vendor, bzr_remote_path='bzr')
296
316
client_medium._accept_bytes('abc')
297
317
client_medium.disconnect()
298
318
# the disconnect has closed output, so we need a new output for the
320
340
def test_ssh_client_ignores_disconnect_when_not_connected(self):
321
341
# Doing a disconnect on a new (and thus unconnected) SSH medium
322
342
# does not fail. It's ok to disconnect an unconnected medium.
323
client_medium = medium.SmartSSHClientMedium(None)
343
client_medium = medium.SmartSSHClientMedium(None,
344
bzr_remote_path='bzr')
324
345
client_medium.disconnect()
326
347
def test_ssh_client_raises_on_read_when_not_connected(self):
327
348
# Doing a read on a new (and thus unconnected) SSH medium raises
328
349
# MediumNotConnected.
329
client_medium = medium.SmartSSHClientMedium(None)
330
self.assertRaises(errors.MediumNotConnected, client_medium.read_bytes, 0)
331
self.assertRaises(errors.MediumNotConnected, client_medium.read_bytes, 1)
350
client_medium = medium.SmartSSHClientMedium(None,
351
bzr_remote_path='bzr')
352
self.assertRaises(errors.MediumNotConnected, client_medium.read_bytes,
354
self.assertRaises(errors.MediumNotConnected, client_medium.read_bytes,
333
357
def test_ssh_client_supports__flush(self):
334
358
# invoking _flush on a SSHClientMedium should flush the output
341
365
def logging_flush(): flush_calls.append('flush')
342
366
output.flush = logging_flush
343
367
vendor = StringIOSSHVendor(input, output)
344
client_medium = medium.SmartSSHClientMedium('a hostname', vendor=vendor)
368
client_medium = medium.SmartSSHClientMedium('a hostname',
370
bzr_remote_path='bzr')
345
371
# this call is here to ensure we only flush once, not on every
346
372
# _accept_bytes call.
347
373
client_medium._accept_bytes('abc')
1208
1234
self.assertIsInstance(t, remote.RemoteSSHTransport)
1209
1235
self.assertEqual('example.com', t._host)
1237
def test_bzr_https(self):
1238
# https://bugs.launchpad.net/bzr/+bug/128456
1239
t = get_transport('bzr+https://example.com/path')
1240
self.assertIsInstance(t, remote.RemoteHTTPTransport)
1241
self.assertStartsWith(
1242
t._http_transport.base,
1212
1246
class TestRemoteTransport(tests.TestCase):