414
418
# Calling _remember_remote_is_before again with a lower value works.
415
419
client_medium._remember_remote_is_before((1, 5))
416
420
self.assertTrue(client_medium._is_remote_before((1, 5)))
417
# You cannot call _remember_remote_is_before with a larger value.
419
AssertionError, client_medium._remember_remote_is_before, (1, 9))
421
# If you call _remember_remote_is_before with a higher value it logs a
422
# warning, and continues to remember the lower value.
423
self.assertNotContainsRe(self.get_log(), '_remember_remote_is_before')
424
client_medium._remember_remote_is_before((1, 9))
425
self.assertContainsRe(self.get_log(), '_remember_remote_is_before')
426
self.assertTrue(client_medium._is_remote_before((1, 5)))
422
429
class TestBzrDirCloningMetaDir(TestRemote):
474
481
self.assertFinished(client)
484
class TestBzrDirOpen(TestRemote):
    """Tests for constructing a RemoteBzrDir against a scripted FakeClient."""

    def make_fake_client_and_transport(self, path='quack'):
        """Build a FakeClient and a transport that both point at ``path``.

        ``path`` is created inside a fresh MemoryTransport; the returned
        transport is a clone positioned at that directory and the client is
        constructed from the clone's base URL.

        :return: a ``(client, transport)`` tuple.
        """
        root = MemoryTransport()
        root.mkdir(path)
        transport = root.clone(path)
        return FakeClient(transport.base), transport

    def test_absent(self):
        """A ('no',) reply to BzrDir.open_2.1 raises NotBranchError."""
        client, transport = self.make_fake_client_and_transport()
        client.add_expected_call(
            'BzrDir.open_2.1', ('quack/',), 'success', ('no',))
        bzrdir_format = remote.RemoteBzrDirFormat()
        self.assertRaises(
            errors.NotBranchError, RemoteBzrDir, transport, bzrdir_format,
            _client=client, _force_probe=True)
        self.assertFinished(client)

    def test_present_without_workingtree(self):
        """A ('yes', 'no') reply gives a bzrdir that has no working tree."""
        client, transport = self.make_fake_client_and_transport()
        client.add_expected_call(
            'BzrDir.open_2.1', ('quack/',), 'success', ('yes', 'no'))
        bzrdir_format = remote.RemoteBzrDirFormat()
        remote_dir = RemoteBzrDir(
            transport, bzrdir_format, _client=client, _force_probe=True)
        self.assertIsInstance(remote_dir, RemoteBzrDir)
        self.assertFalse(remote_dir.has_workingtree())
        self.assertRaises(errors.NoWorkingTree, remote_dir.open_workingtree)
        self.assertFinished(client)

    def test_present_with_workingtree(self):
        """A ('yes', 'yes') reply reports a tree that cannot be opened here.

        has_workingtree() is true, but open_workingtree() raises
        NotLocalUrl.
        """
        client, transport = self.make_fake_client_and_transport()
        client.add_expected_call(
            'BzrDir.open_2.1', ('quack/',), 'success', ('yes', 'yes'))
        bzrdir_format = remote.RemoteBzrDirFormat()
        remote_dir = RemoteBzrDir(
            transport, bzrdir_format, _client=client, _force_probe=True)
        self.assertIsInstance(remote_dir, RemoteBzrDir)
        self.assertTrue(remote_dir.has_workingtree())
        self.assertRaises(errors.NotLocalUrl, remote_dir.open_workingtree)
        self.assertFinished(client)

    def test_backwards_compat(self):
        """An 'unknown' reply to BzrDir.open_2.1 is followed by BzrDir.open."""
        client, transport = self.make_fake_client_and_transport()
        client.add_expected_call(
            'BzrDir.open_2.1', ('quack/',), 'unknown', ('BzrDir.open_2.1',))
        client.add_expected_call(
            'BzrDir.open', ('quack/',), 'success', ('yes',))
        bzrdir_format = remote.RemoteBzrDirFormat()
        remote_dir = RemoteBzrDir(
            transport, bzrdir_format, _client=client, _force_probe=True)
        self.assertIsInstance(remote_dir, RemoteBzrDir)
        self.assertFinished(client)

    def test_backwards_compat_hpss_v2(self):
        """Same fallback as test_backwards_compat, with a simulated v2 server.

        The first RPC marks the medium as protocol version 2 and records
        that the remote is before (1, 6) before the call is checked.
        """
        client, transport = self.make_fake_client_and_transport()
        # Monkey-patch the fake client to simulate real-world behaviour with
        # a v2 server: the protocol version is detected on the first RPC
        # call, and because that version is 2,
        # _remember_remote_is_before((1, 6)) is also called before the RPC
        # continues.
        real_check_call = client._check_call

        def first_check_call(method, args):
            medium = client._medium
            medium._protocol_version = 2
            medium._remember_remote_is_before((1, 6))
            # One-shot patch: put the real checker back, then delegate to it.
            client._check_call = real_check_call
            real_check_call(method, args)

        client._check_call = first_check_call
        client.add_expected_call(
            'BzrDir.open_2.1', ('quack/',), 'unknown', ('BzrDir.open_2.1',))
        client.add_expected_call(
            'BzrDir.open', ('quack/',), 'success', ('yes',))
        bzrdir_format = remote.RemoteBzrDirFormat()
        remote_dir = RemoteBzrDir(
            transport, bzrdir_format, _client=client, _force_probe=True)
        self.assertIsInstance(remote_dir, RemoteBzrDir)
        self.assertFinished(client)
477
557
class TestBzrDirOpenBranch(TestRemote):
479
559
def test_backwards_compat(self):
1168
1248
len(branch.repository._real_repository._fallback_repositories))
1170
1250
def test_get_stacked_on_real_branch(self):
1171
base_branch = self.make_branch('base', format='1.6')
1172
stacked_branch = self.make_branch('stacked', format='1.6')
1251
base_branch = self.make_branch('base')
1252
stacked_branch = self.make_branch('stacked')
1173
1253
stacked_branch.set_stacked_on_url('../base')
1174
1254
reference_format = self.get_repo_format()
1175
1255
network_name = reference_format.network_name()
1176
1256
client = FakeClient(self.get_url())
1177
1257
branch_network_name = self.get_branch_format().network_name()
1178
1258
client.add_expected_call(
1179
'BzrDir.open_branchV2', ('stacked/',),
1259
'BzrDir.open_branchV3', ('stacked/',),
1180
1260
'success', ('branch', branch_network_name))
1181
1261
client.add_expected_call(
1182
1262
'BzrDir.find_repositoryV3', ('stacked/',),
1183
'success', ('ok', '', 'no', 'no', 'yes', network_name))
1263
'success', ('ok', '', 'yes', 'no', 'yes', network_name))
1184
1264
# called twice, once from constructor and then again by us
1185
1265
client.add_expected_call(
1186
1266
'Branch.get_stacked_on_url', ('stacked/',),
1572
1652
branch.unlock()
1573
1653
self.assertFinished(client)
1655
def test_set_option_with_dict(self):
1656
client = FakeClient()
1657
client.add_expected_call(
1658
'Branch.get_stacked_on_url', ('memory:///',),
1659
'error', ('NotStacked',),)
1660
client.add_expected_call(
1661
'Branch.lock_write', ('memory:///', '', ''),
1662
'success', ('ok', 'branch token', 'repo token'))
1663
encoded_dict_value = 'd5:ascii1:a11:unicode \xe2\x8c\x9a3:\xe2\x80\xbde'
1664
client.add_expected_call(
1665
'Branch.set_config_option_dict', ('memory:///', 'branch token',
1666
'repo token', encoded_dict_value, 'foo', ''),
1668
client.add_expected_call(
1669
'Branch.unlock', ('memory:///', 'branch token', 'repo token'),
1671
transport = MemoryTransport()
1672
branch = self.make_remote_branch(transport, client)
1674
config = branch._get_config()
1676
{'ascii': 'a', u'unicode \N{WATCH}': u'\N{INTERROBANG}'},
1679
self.assertFinished(client)
1575
1681
def test_backwards_compat_set_option(self):
1576
1682
self.setup_smart_server_with_call_log()
1577
1683
branch = self.make_branch('.')
1584
1690
self.assertLength(10, self.hpss_calls)
1585
1691
self.assertEqual('value', branch._get_config().get_option('name'))
1693
def test_backwards_compat_set_option_with_dict(self):
1694
self.setup_smart_server_with_call_log()
1695
branch = self.make_branch('.')
1696
verb = 'Branch.set_config_option_dict'
1697
self.disable_verb(verb)
1699
self.addCleanup(branch.unlock)
1700
self.reset_smart_call_log()
1701
config = branch._get_config()
1702
value_dict = {'ascii': 'a', u'unicode \N{WATCH}': u'\N{INTERROBANG}'}
1703
config.set_option(value_dict, 'name')
1704
self.assertLength(10, self.hpss_calls)
1705
self.assertEqual(value_dict, branch._get_config().get_option('name'))
1588
1708
class TestBranchLockWrite(RemoteBranchTestCase):
2150
2289
repo.get_rev_id_for_revno, 5, (42, 'rev-foo'))
2151
2290
self.assertFinished(client)
2292
def test_branch_fallback_locking(self):
2293
"""RemoteBranch.get_rev_id takes a read lock, and tries to call the
2294
get_rev_id_for_revno verb. If the verb is unknown the VFS fallback
2295
will be invoked, which will fail if the repo is unlocked.
2297
self.setup_smart_server_with_call_log()
2298
tree = self.make_branch_and_memory_tree('.')
2301
rev1 = tree.commit('First')
2302
rev2 = tree.commit('Second')
2304
branch = tree.branch
2305
self.assertFalse(branch.is_locked())
2306
self.reset_smart_call_log()
2307
verb = 'Repository.get_rev_id_for_revno'
2308
self.disable_verb(verb)
2309
self.assertEqual(rev1, branch.get_rev_id(1))
2310
self.assertLength(1, [call for call in self.hpss_calls if
2311
call.call.method == verb])
2154
2314
class TestRepositoryIsShared(TestRemoteRepository):