1
# Copyright (C) 2006-2010 Canonical Ltd
1
# Copyright (C) 2006-2011 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
45
45
from bzrlib.branch import Branch
46
from bzrlib.bzrdir import BzrDir, BzrDirFormat
46
from bzrlib.bzrdir import (
47
51
from bzrlib.remote import (
49
53
RemoteBranchFormat,
53
56
RemoteRepositoryFormat,
58
61
from bzrlib.smart.client import _SmartClient
59
62
from bzrlib.smart.repository import SmartServerRepositoryGetParentMap
60
63
from bzrlib.tests import (
62
split_suite_by_condition,
66
from bzrlib.transport import get_transport
66
from bzrlib.tests.scenarios import load_tests_apply_scenarios
67
67
from bzrlib.transport.memory import MemoryTransport
68
68
from bzrlib.transport.remote import (
70
70
RemoteSSHTransport,
71
71
RemoteTCPTransport,
74
def load_tests(standard_tests, module, loader):
75
to_adapt, result = split_suite_by_condition(
76
standard_tests, condition_isinstance(BasicRemoteObjectTests))
77
smart_server_version_scenarios = [
75
load_tests = load_tests_apply_scenarios
78
class BasicRemoteObjectTests(tests.TestCaseWithTransport):
79
{'transport_server': test_server.SmartTCPServer_for_testing_v2_only}),
82
{'transport_server': test_server.SmartTCPServer_for_testing_v2_only}),
81
{'transport_server': test_server.SmartTCPServer_for_testing})]
82
return multiply_tests(to_adapt, smart_server_version_scenarios, result)
85
class BasicRemoteObjectTests(tests.TestCaseWithTransport):
84
{'transport_server': test_server.SmartTCPServer_for_testing})]
88
88
super(BasicRemoteObjectTests, self).setUp()
89
89
self.transport = self.get_transport()
90
90
# make a branch that can be opened over the smart transport
91
91
self.local_wt = BzrDir.create_standalone_workingtree('.')
94
self.transport.disconnect()
95
tests.TestCaseWithTransport.tearDown(self)
92
self.addCleanup(self.transport.disconnect)
97
94
def test_create_remote_bzrdir(self):
98
95
b = remote.RemoteBzrDir(self.transport, remote.RemoteBzrDirFormat())
122
119
def test_find_correct_format(self):
123
120
"""Should open a RemoteBzrDir over a RemoteTransport"""
124
121
fmt = BzrDirFormat.find_format(self.transport)
125
self.assertTrue(RemoteBzrDirFormat
126
in BzrDirFormat._control_server_formats)
122
self.assertTrue(bzrdir.RemoteBzrProber
123
in controldir.ControlDirFormat._server_probers)
127
124
self.assertIsInstance(fmt, remote.RemoteBzrDirFormat)
129
126
def test_open_detected_smart_format(self):
359
356
a given client_base and transport_base.
361
358
client_medium = medium.SmartClientMedium(client_base)
362
transport = get_transport(transport_base)
363
result = client_medium.remote_path_from_transport(transport)
359
t = transport.get_transport(transport_base)
360
result = client_medium.remote_path_from_transport(t)
364
361
self.assertEqual(expected, result)
366
363
def test_remote_path_from_transport(self):
377
374
a given transport_base and relpath of that transport. (Note that
378
375
HttpTransportBase is a subclass of SmartClientMedium)
380
base_transport = get_transport(transport_base)
377
base_transport = transport.get_transport(transport_base)
381
378
client_medium = base_transport.get_smart_medium()
382
379
cloned_transport = base_transport.clone(relpath)
383
380
result = client_medium.remote_path_from_transport(cloned_transport)
609
606
# _get_tree_branch is a form of open_branch, but it should only ask for
610
607
# branch opening, not any other network requests.
609
def open_branch(name=None):
613
610
calls.append("Called")
614
611
return "a-branch"
615
612
transport = MemoryTransport()
688
685
self.assertRaises(errors.NotBranchError,
689
RemoteBzrDirFormat.probe_transport, OldServerTransport())
686
RemoteBzrProber.probe_transport, OldServerTransport())
692
689
class TestBzrDirCreateBranch(TestRemote):
1618
1615
def test_get_multi_line_branch_conf(self):
1619
1616
# Make sure that multiple-line branch.conf files are supported
1621
# https://bugs.edge.launchpad.net/bzr/+bug/354075
1618
# https://bugs.launchpad.net/bzr/+bug/354075
1622
1619
client = FakeClient()
1623
1620
client.add_expected_call(
1624
1621
'Branch.get_stacked_on_url', ('memory:///',),
1652
1649
branch.unlock()
1653
1650
self.assertFinished(client)
1652
def test_set_option_with_dict(self):
1653
client = FakeClient()
1654
client.add_expected_call(
1655
'Branch.get_stacked_on_url', ('memory:///',),
1656
'error', ('NotStacked',),)
1657
client.add_expected_call(
1658
'Branch.lock_write', ('memory:///', '', ''),
1659
'success', ('ok', 'branch token', 'repo token'))
1660
encoded_dict_value = 'd5:ascii1:a11:unicode \xe2\x8c\x9a3:\xe2\x80\xbde'
1661
client.add_expected_call(
1662
'Branch.set_config_option_dict', ('memory:///', 'branch token',
1663
'repo token', encoded_dict_value, 'foo', ''),
1665
client.add_expected_call(
1666
'Branch.unlock', ('memory:///', 'branch token', 'repo token'),
1668
transport = MemoryTransport()
1669
branch = self.make_remote_branch(transport, client)
1671
config = branch._get_config()
1673
{'ascii': 'a', u'unicode \N{WATCH}': u'\N{INTERROBANG}'},
1676
self.assertFinished(client)
1655
1678
def test_backwards_compat_set_option(self):
1656
1679
self.setup_smart_server_with_call_log()
1657
1680
branch = self.make_branch('.')
1664
1687
self.assertLength(10, self.hpss_calls)
1665
1688
self.assertEqual('value', branch._get_config().get_option('name'))
1690
def test_backwards_compat_set_option_with_dict(self):
1691
self.setup_smart_server_with_call_log()
1692
branch = self.make_branch('.')
1693
verb = 'Branch.set_config_option_dict'
1694
self.disable_verb(verb)
1696
self.addCleanup(branch.unlock)
1697
self.reset_smart_call_log()
1698
config = branch._get_config()
1699
value_dict = {'ascii': 'a', u'unicode \N{WATCH}': u'\N{INTERROBANG}'}
1700
config.set_option(value_dict, 'name')
1701
self.assertLength(10, self.hpss_calls)
1702
self.assertEqual(value_dict, branch._get_config().get_option('name'))
1668
1705
class TestBranchLockWrite(RemoteBranchTestCase):
1828
1865
class TestRepositoryFormat(TestRemoteRepository):
1830
1867
def test_fast_delta(self):
1831
true_name = groupcompress_repo.RepositoryFormatCHK1().network_name()
1868
true_name = groupcompress_repo.RepositoryFormat2a().network_name()
1832
1869
true_format = RemoteRepositoryFormat()
1833
1870
true_format._network_name = true_name
1834
1871
self.assertEqual(True, true_format.fast_deltas)
2301
2339
transport_path = 'quack'
2302
2340
repo, client = self.setup_fake_client_and_repository(transport_path)
2303
2341
client.add_success_response('ok', 'a token')
2304
result = repo.lock_write()
2342
token = repo.lock_write().repository_token
2305
2343
self.assertEqual(
2306
2344
[('call', 'Repository.lock_write', ('quack/', ''))],
2308
self.assertEqual('a token', result)
2346
self.assertEqual('a token', token)
2310
2348
def test_lock_write_already_locked(self):
2311
2349
transport_path = 'quack'