/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_config.py

  • Committer: Vincent Ladeuil
  • Date: 2011-06-27 15:42:09 UTC
  • mfrom: (5993 +trunk)
  • mto: (5993.1.1 trunk)
  • mto: This revision was merged to the branch mainline in revision 5994.
  • Revision ID: v.ladeuil+lp@free.fr-20110627154209-azubuhbuxsz109hq
Merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
37
37
    ui,
38
38
    urlutils,
39
39
    registry,
 
40
    remote,
40
41
    tests,
41
42
    trace,
42
43
    transport,
43
44
    )
 
45
from bzrlib.symbol_versioning import (
 
46
    deprecated_in,
 
47
    deprecated_method,
 
48
    )
 
49
from bzrlib.transport import remote as transport_remote
44
50
from bzrlib.tests import (
45
51
    features,
46
 
    TestSkipped,
47
52
    scenarios,
 
53
    test_server,
48
54
    )
49
55
from bzrlib.util.configobj import configobj
50
56
 
63
69
 
64
70
load_tests = scenarios.load_tests_apply_scenarios
65
71
 
66
 
# We need adapters that can build a config store in a test context. Test
67
 
# classes, based on TestCaseWithTransport, can use the registry to parametrize
68
 
# themselves. The builder will receive a test instance and should return a
69
 
# ready-to-use store.  Plugins that defines new stores can also register
70
 
# themselves here to be tested against the tests defined below.
71
 
 
72
 
# FIXME: plugins should *not* need to import test_config to register their
73
 
# helpers (or selftest -s xxx will be broken), the following registry should be
74
 
# moved to bzrlib.config instead so that selftest -s bt.test_config also runs
75
 
# the plugin specific tests (selftest -s bp.xxx won't, that would be against
76
 
# the spirit of '-s') -- vila 20110503
77
 
test_store_builder_registry = registry.Registry()
78
 
test_store_builder_registry.register(
 
72
# Register helpers to build stores
 
73
config.test_store_builder_registry.register(
79
74
    'configobj', lambda test: config.IniFileStore(test.get_transport(),
80
75
                                                  'configobj.conf'))
81
 
test_store_builder_registry.register(
 
76
config.test_store_builder_registry.register(
82
77
    'bazaar', lambda test: config.GlobalStore())
83
 
test_store_builder_registry.register(
 
78
config.test_store_builder_registry.register(
84
79
    'location', lambda test: config.LocationStore())
85
 
test_store_builder_registry.register(
86
 
    'branch', lambda test: config.BranchStore(test.branch))
87
 
 
88
 
# FIXME: Same remark as above for the following registry -- vila 20110503
89
 
test_stack_builder_registry = registry.Registry()
90
 
test_stack_builder_registry.register(
 
80
 
 
81
 
 
82
def build_backing_branch(test, relpath,
 
83
                         transport_class=None, server_class=None):
 
84
    """Test helper to create a backing branch only once.
 
85
 
 
86
    Some tests need multiple stores/stacks to check concurrent update
 
87
    behaviours. As such, they need to build different branch *objects* even if
 
88
    they share the branch on disk.
 
89
 
 
90
    :param relpath: The relative path to the branch. (Note that the helper
 
91
        should always specify the same relpath).
 
92
 
 
93
    :param transport_class: The Transport class the test needs to use.
 
94
 
 
95
    :param server_class: The server associated with the ``transport_class``
 
96
        above.
 
97
 
 
98
    Either both or neither of ``transport_class`` and ``server_class`` should
 
99
    be specified.
 
100
    """
 
101
    if transport_class is not None and server_class is not None:
 
102
        test.transport_class = transport_class
 
103
        test.transport_server = server_class
 
104
    elif not (transport_class is None and server_class is None):
 
105
        raise AssertionError('Specify both ``transport_class`` and '
 
106
                             '``server_class`` or neither of them')
 
107
    if getattr(test, 'backing_branch', None) is None:
 
108
        # First call, let's build the branch on disk
 
109
        test.backing_branch = test.make_branch(relpath)
 
110
 
 
111
 
 
112
def build_branch_store(test):
 
113
    build_backing_branch(test, 'branch')
 
114
    b = branch.Branch.open('branch')
 
115
    return config.BranchStore(b)
 
116
config.test_store_builder_registry.register('branch', build_branch_store)
 
117
 
 
118
 
 
119
def build_remote_branch_store(test):
 
120
    # There is only one permutation (but we won't be able to handle more with
 
121
    # this design anyway)
 
122
    (transport_class,
 
123
     server_class) = transport_remote.get_test_permutations()[0]
 
124
    build_backing_branch(test, 'branch', transport_class, server_class)
 
125
    b = branch.Branch.open(test.get_url('branch'))
 
126
    return config.BranchStore(b)
 
127
config.test_store_builder_registry.register('remote_branch',
 
128
                                            build_remote_branch_store)
 
129
 
 
130
 
 
131
config.test_stack_builder_registry.register(
91
132
    'bazaar', lambda test: config.GlobalStack())
92
 
test_stack_builder_registry.register(
 
133
config.test_stack_builder_registry.register(
93
134
    'location', lambda test: config.LocationStack('.'))
94
 
test_stack_builder_registry.register(
95
 
    'branch', lambda test: config.BranchStack(test.branch))
 
135
 
 
136
 
 
137
def build_branch_stack(test):
 
138
    build_backing_branch(test, 'branch')
 
139
    b = branch.Branch.open('branch')
 
140
    return config.BranchStack(b)
 
141
config.test_stack_builder_registry.register('branch', build_branch_stack)
 
142
 
 
143
 
 
144
def build_remote_branch_stack(test):
 
145
    # There is only one permutation (but we won't be able to handle more with
 
146
    # this design anyway)
 
147
    (transport_class,
 
148
     server_class) = transport_remote.get_test_permutations()[0]
 
149
    build_backing_branch(test, 'branch', transport_class, server_class)
 
150
    b = branch.Branch.open(test.get_url('branch'))
 
151
    return config.BranchStack(b)
 
152
config.test_stack_builder_registry.register('remote_branch',
 
153
                                            build_remote_branch_stack)
96
154
 
97
155
 
98
156
sample_long_alias="log -r-15..-1 --line"
403
461
        config.Config()
404
462
 
405
463
    def test_no_default_editor(self):
406
 
        self.assertRaises(NotImplementedError, config.Config().get_editor)
 
464
        self.assertRaises(
 
465
            NotImplementedError,
 
466
            self.applyDeprecated, deprecated_in((2, 4, 0)),
 
467
            config.Config().get_editor)
407
468
 
408
469
    def test_user_email(self):
409
470
        my_config = InstrumentedConfig()
638
699
    def test_default_is_True(self):
639
700
        self.config = self.get_config(True)
640
701
        self.assertExpandIs(True)
641
 
        
 
702
 
642
703
    def test_default_is_False(self):
643
704
        self.config = self.get_config(False)
644
705
        self.assertExpandIs(False)
645
 
        
 
706
 
646
707
 
647
708
class TestIniConfigOptionExpansion(tests.TestCase):
648
709
    """Test option expansion from the IniConfig level.
1117
1178
 
1118
1179
    def test_configured_editor(self):
1119
1180
        my_config = config.GlobalConfig.from_string(sample_config_text)
1120
 
        self.assertEqual("vim", my_config.get_editor())
 
1181
        editor = self.applyDeprecated(
 
1182
            deprecated_in((2, 4, 0)), my_config.get_editor)
 
1183
        self.assertEqual('vim', editor)
1121
1184
 
1122
1185
    def test_signatures_always(self):
1123
1186
        my_config = config.GlobalConfig.from_string(sample_always_signatures)
1808
1871
 
1809
1872
class TestTransportConfig(tests.TestCaseWithTransport):
1810
1873
 
 
1874
    def test_load_utf8(self):
 
1875
        """Ensure we can load an utf8-encoded file."""
 
1876
        t = self.get_transport()
 
1877
        unicode_user = u'b\N{Euro Sign}ar'
 
1878
        unicode_content = u'user=%s' % (unicode_user,)
 
1879
        utf8_content = unicode_content.encode('utf8')
 
1880
        # Store the raw content in the config file
 
1881
        t.put_bytes('foo.conf', utf8_content)
 
1882
        conf = config.TransportConfig(t, 'foo.conf')
 
1883
        self.assertEquals(unicode_user, conf.get_option('user'))
 
1884
 
 
1885
    def test_load_non_ascii(self):
 
1886
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
 
1887
        t = self.get_transport()
 
1888
        t.put_bytes('foo.conf', 'user=foo\n#\xff\n')
 
1889
        conf = config.TransportConfig(t, 'foo.conf')
 
1890
        self.assertRaises(errors.ConfigContentError, conf._get_configobj)
 
1891
 
 
1892
    def test_load_erroneous_content(self):
 
1893
        """Ensure we display a proper error on content that can't be parsed."""
 
1894
        t = self.get_transport()
 
1895
        t.put_bytes('foo.conf', '[open_section\n')
 
1896
        conf = config.TransportConfig(t, 'foo.conf')
 
1897
        self.assertRaises(errors.ParseConfigError, conf._get_configobj)
 
1898
 
1811
1899
    def test_get_value(self):
1812
1900
        """Test that retrieving a value from a section is possible"""
1813
 
        bzrdir_config = config.TransportConfig(transport.get_transport('.'),
 
1901
        bzrdir_config = config.TransportConfig(self.get_transport('.'),
1814
1902
                                               'control.conf')
1815
1903
        bzrdir_config.set_option('value', 'key', 'SECTION')
1816
1904
        bzrdir_config.set_option('value2', 'key2')
1846
1934
        self.assertIs(None, bzrdir_config.get_default_stack_on())
1847
1935
 
1848
1936
 
 
1937
class TestOldConfigHooks(tests.TestCaseWithTransport):
 
1938
 
 
1939
    def setUp(self):
 
1940
        super(TestOldConfigHooks, self).setUp()
 
1941
        create_configs_with_file_option(self)
 
1942
 
 
1943
    def assertGetHook(self, conf, name, value):
 
1944
        calls = []
 
1945
        def hook(*args):
 
1946
            calls.append(args)
 
1947
        config.OldConfigHooks.install_named_hook('get', hook, None)
 
1948
        self.addCleanup(
 
1949
            config.OldConfigHooks.uninstall_named_hook, 'get', None)
 
1950
        self.assertLength(0, calls)
 
1951
        actual_value = conf.get_user_option(name)
 
1952
        self.assertEquals(value, actual_value)
 
1953
        self.assertLength(1, calls)
 
1954
        self.assertEquals((conf, name, value), calls[0])
 
1955
 
 
1956
    def test_get_hook_bazaar(self):
 
1957
        self.assertGetHook(self.bazaar_config, 'file', 'bazaar')
 
1958
 
 
1959
    def test_get_hook_locations(self):
 
1960
        self.assertGetHook(self.locations_config, 'file', 'locations')
 
1961
 
 
1962
    def test_get_hook_branch(self):
 
1963
        # Since locations masks branch, we define a different option
 
1964
        self.branch_config.set_user_option('file2', 'branch')
 
1965
        self.assertGetHook(self.branch_config, 'file2', 'branch')
 
1966
 
 
1967
    def assertSetHook(self, conf, name, value):
 
1968
        calls = []
 
1969
        def hook(*args):
 
1970
            calls.append(args)
 
1971
        config.OldConfigHooks.install_named_hook('set', hook, None)
 
1972
        self.addCleanup(
 
1973
            config.OldConfigHooks.uninstall_named_hook, 'set', None)
 
1974
        self.assertLength(0, calls)
 
1975
        conf.set_user_option(name, value)
 
1976
        self.assertLength(1, calls)
 
1977
        # We can't assert the conf object below as different configs use
 
1978
        # different means to implement set_user_option and we care only about
 
1979
        # coverage here.
 
1980
        self.assertEquals((name, value), calls[0][1:])
 
1981
 
 
1982
    def test_set_hook_bazaar(self):
 
1983
        self.assertSetHook(self.bazaar_config, 'foo', 'bazaar')
 
1984
 
 
1985
    def test_set_hook_locations(self):
 
1986
        self.assertSetHook(self.locations_config, 'foo', 'locations')
 
1987
 
 
1988
    def test_set_hook_branch(self):
 
1989
        self.assertSetHook(self.branch_config, 'foo', 'branch')
 
1990
 
 
1991
    def assertRemoveHook(self, conf, name, section_name=None):
 
1992
        calls = []
 
1993
        def hook(*args):
 
1994
            calls.append(args)
 
1995
        config.OldConfigHooks.install_named_hook('remove', hook, None)
 
1996
        self.addCleanup(
 
1997
            config.OldConfigHooks.uninstall_named_hook, 'remove', None)
 
1998
        self.assertLength(0, calls)
 
1999
        conf.remove_user_option(name, section_name)
 
2000
        self.assertLength(1, calls)
 
2001
        # We can't assert the conf object below as different configs use
 
2002
        # different means to implement remove_user_option and we care only about
 
2003
        # coverage here.
 
2004
        self.assertEquals((name,), calls[0][1:])
 
2005
 
 
2006
    def test_remove_hook_bazaar(self):
 
2007
        self.assertRemoveHook(self.bazaar_config, 'file')
 
2008
 
 
2009
    def test_remove_hook_locations(self):
 
2010
        self.assertRemoveHook(self.locations_config, 'file',
 
2011
                              self.locations_config.location)
 
2012
 
 
2013
    def test_remove_hook_branch(self):
 
2014
        self.assertRemoveHook(self.branch_config, 'file')
 
2015
 
 
2016
    def assertLoadHook(self, name, conf_class, *conf_args):
 
2017
        calls = []
 
2018
        def hook(*args):
 
2019
            calls.append(args)
 
2020
        config.OldConfigHooks.install_named_hook('load', hook, None)
 
2021
        self.addCleanup(
 
2022
            config.OldConfigHooks.uninstall_named_hook, 'load', None)
 
2023
        self.assertLength(0, calls)
 
2024
        # Build a config
 
2025
        conf = conf_class(*conf_args)
 
2026
        # Access an option to trigger a load
 
2027
        conf.get_user_option(name)
 
2028
        self.assertLength(1, calls)
 
2029
        # Since we can't assert about conf, we just use the number of calls ;-/
 
2030
 
 
2031
    def test_load_hook_bazaar(self):
 
2032
        self.assertLoadHook('file', config.GlobalConfig)
 
2033
 
 
2034
    def test_load_hook_locations(self):
 
2035
        self.assertLoadHook('file', config.LocationConfig, self.tree.basedir)
 
2036
 
 
2037
    def test_load_hook_branch(self):
 
2038
        self.assertLoadHook('file', config.BranchConfig, self.tree.branch)
 
2039
 
 
2040
    def assertSaveHook(self, conf):
 
2041
        calls = []
 
2042
        def hook(*args):
 
2043
            calls.append(args)
 
2044
        config.OldConfigHooks.install_named_hook('save', hook, None)
 
2045
        self.addCleanup(
 
2046
            config.OldConfigHooks.uninstall_named_hook, 'save', None)
 
2047
        self.assertLength(0, calls)
 
2048
        # Setting an option triggers a save
 
2049
        conf.set_user_option('foo', 'bar')
 
2050
        self.assertLength(1, calls)
 
2051
        # Since we can't assert about conf, we just use the number of calls ;-/
 
2052
 
 
2053
    def test_save_hook_bazaar(self):
 
2054
        self.assertSaveHook(self.bazaar_config)
 
2055
 
 
2056
    def test_save_hook_locations(self):
 
2057
        self.assertSaveHook(self.locations_config)
 
2058
 
 
2059
    def test_save_hook_branch(self):
 
2060
        self.assertSaveHook(self.branch_config)
 
2061
 
 
2062
 
 
2063
class TestOldConfigHooksForRemote(tests.TestCaseWithTransport):
 
2064
    """Tests config hooks for remote configs.
 
2065
 
 
2066
    No tests for the remove hook as this is not implemented there.
 
2067
    """
 
2068
 
 
2069
    def setUp(self):
 
2070
        super(TestOldConfigHooksForRemote, self).setUp()
 
2071
        self.transport_server = test_server.SmartTCPServer_for_testing
 
2072
        create_configs_with_file_option(self)
 
2073
 
 
2074
    def assertGetHook(self, conf, name, value):
 
2075
        calls = []
 
2076
        def hook(*args):
 
2077
            calls.append(args)
 
2078
        config.OldConfigHooks.install_named_hook('get', hook, None)
 
2079
        self.addCleanup(
 
2080
            config.OldConfigHooks.uninstall_named_hook, 'get', None)
 
2081
        self.assertLength(0, calls)
 
2082
        actual_value = conf.get_option(name)
 
2083
        self.assertEquals(value, actual_value)
 
2084
        self.assertLength(1, calls)
 
2085
        self.assertEquals((conf, name, value), calls[0])
 
2086
 
 
2087
    def test_get_hook_remote_branch(self):
 
2088
        remote_branch = branch.Branch.open(self.get_url('tree'))
 
2089
        self.assertGetHook(remote_branch._get_config(), 'file', 'branch')
 
2090
 
 
2091
    def test_get_hook_remote_bzrdir(self):
 
2092
        remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
 
2093
        conf = remote_bzrdir._get_config()
 
2094
        conf.set_option('remotedir', 'file')
 
2095
        self.assertGetHook(conf, 'file', 'remotedir')
 
2096
 
 
2097
    def assertSetHook(self, conf, name, value):
 
2098
        calls = []
 
2099
        def hook(*args):
 
2100
            calls.append(args)
 
2101
        config.OldConfigHooks.install_named_hook('set', hook, None)
 
2102
        self.addCleanup(
 
2103
            config.OldConfigHooks.uninstall_named_hook, 'set', None)
 
2104
        self.assertLength(0, calls)
 
2105
        conf.set_option(value, name)
 
2106
        self.assertLength(1, calls)
 
2107
        # We can't assert the conf object below as different configs use
 
2108
        # different means to implement set_user_option and we care only about
 
2109
        # coverage here.
 
2110
        self.assertEquals((name, value), calls[0][1:])
 
2111
 
 
2112
    def test_set_hook_remote_branch(self):
 
2113
        remote_branch = branch.Branch.open(self.get_url('tree'))
 
2114
        self.addCleanup(remote_branch.lock_write().unlock)
 
2115
        self.assertSetHook(remote_branch._get_config(), 'file', 'remote')
 
2116
 
 
2117
    def test_set_hook_remote_bzrdir(self):
 
2118
        remote_branch = branch.Branch.open(self.get_url('tree'))
 
2119
        self.addCleanup(remote_branch.lock_write().unlock)
 
2120
        remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
 
2121
        self.assertSetHook(remote_bzrdir._get_config(), 'file', 'remotedir')
 
2122
 
 
2123
    def assertLoadHook(self, expected_nb_calls, name, conf_class, *conf_args):
 
2124
        calls = []
 
2125
        def hook(*args):
 
2126
            calls.append(args)
 
2127
        config.OldConfigHooks.install_named_hook('load', hook, None)
 
2128
        self.addCleanup(
 
2129
            config.OldConfigHooks.uninstall_named_hook, 'load', None)
 
2130
        self.assertLength(0, calls)
 
2131
        # Build a config
 
2132
        conf = conf_class(*conf_args)
 
2133
        # Access an option to trigger a load
 
2134
        conf.get_option(name)
 
2135
        self.assertLength(expected_nb_calls, calls)
 
2136
        # Since we can't assert about conf, we just use the number of calls ;-/
 
2137
 
 
2138
    def test_load_hook_remote_branch(self):
 
2139
        remote_branch = branch.Branch.open(self.get_url('tree'))
 
2140
        self.assertLoadHook(1, 'file', remote.RemoteBranchConfig, remote_branch)
 
2141
 
 
2142
    def test_load_hook_remote_bzrdir(self):
 
2143
        remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
 
2144
        # The config file doesn't exist, set an option to force its creation
 
2145
        conf = remote_bzrdir._get_config()
 
2146
        conf.set_option('remotedir', 'file')
 
2147
        # We get one call for the server and one call for the client, this is
 
2148
        # caused by the differences in implementations between
 
2149
        # SmartServerBzrDirRequestConfigFile (in smart/bzrdir.py) and
 
2150
        # SmartServerBranchGetConfigFile (in smart/branch.py)
 
2151
        self.assertLoadHook(2 ,'file', remote.RemoteBzrDirConfig, remote_bzrdir)
 
2152
 
 
2153
    def assertSaveHook(self, conf):
 
2154
        calls = []
 
2155
        def hook(*args):
 
2156
            calls.append(args)
 
2157
        config.OldConfigHooks.install_named_hook('save', hook, None)
 
2158
        self.addCleanup(
 
2159
            config.OldConfigHooks.uninstall_named_hook, 'save', None)
 
2160
        self.assertLength(0, calls)
 
2161
        # Setting an option triggers a save
 
2162
        conf.set_option('foo', 'bar')
 
2163
        self.assertLength(1, calls)
 
2164
        # Since we can't assert about conf, we just use the number of calls ;-/
 
2165
 
 
2166
    def test_save_hook_remote_branch(self):
 
2167
        remote_branch = branch.Branch.open(self.get_url('tree'))
 
2168
        self.addCleanup(remote_branch.lock_write().unlock)
 
2169
        self.assertSaveHook(remote_branch._get_config())
 
2170
 
 
2171
    def test_save_hook_remote_bzrdir(self):
 
2172
        remote_branch = branch.Branch.open(self.get_url('tree'))
 
2173
        self.addCleanup(remote_branch.lock_write().unlock)
 
2174
        remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
 
2175
        self.assertSaveHook(remote_bzrdir._get_config())
 
2176
 
 
2177
 
 
2178
class TestOption(tests.TestCase):
 
2179
 
 
2180
    def test_default_value(self):
 
2181
        opt = config.Option('foo', default='bar')
 
2182
        self.assertEquals('bar', opt.get_default())
 
2183
 
 
2184
 
 
2185
class TestOptionRegistry(tests.TestCase):
 
2186
 
 
2187
    def setUp(self):
 
2188
        super(TestOptionRegistry, self).setUp()
 
2189
        # Always start with an empty registry
 
2190
        self.overrideAttr(config, 'option_registry', registry.Registry())
 
2191
        self.registry = config.option_registry
 
2192
 
 
2193
    def test_register(self):
 
2194
        opt = config.Option('foo')
 
2195
        self.registry.register('foo', opt)
 
2196
        self.assertIs(opt, self.registry.get('foo'))
 
2197
 
 
2198
    lazy_option = config.Option('lazy_foo')
 
2199
 
 
2200
    def test_register_lazy(self):
 
2201
        self.registry.register_lazy('foo', self.__module__,
 
2202
                                    'TestOptionRegistry.lazy_option')
 
2203
        self.assertIs(self.lazy_option, self.registry.get('foo'))
 
2204
 
 
2205
    def test_registered_help(self):
 
2206
        opt = config.Option('foo')
 
2207
        self.registry.register('foo', opt, help='A simple option')
 
2208
        self.assertEquals('A simple option', self.registry.get_help('foo'))
 
2209
 
 
2210
 
 
2211
class TestRegisteredOptions(tests.TestCase):
 
2212
    """All registered options should verify some constraints."""
 
2213
 
 
2214
    scenarios = [(key, {'option_name': key, 'option': option}) for key, option
 
2215
                 in config.option_registry.iteritems()]
 
2216
 
 
2217
    def setUp(self):
 
2218
        super(TestRegisteredOptions, self).setUp()
 
2219
        self.registry = config.option_registry
 
2220
 
 
2221
    def test_proper_name(self):
 
2222
        # An option should be registered under its own name, this can't be
 
2223
        # checked at registration time for the lazy ones.
 
2224
        self.assertEquals(self.option_name, self.option.name)
 
2225
 
 
2226
    def test_help_is_set(self):
 
2227
        option_help = self.registry.get_help(self.option_name)
 
2228
        self.assertNotEquals(None, option_help)
 
2229
        # Come on, think about the user, he really wants to know what the
 
2230
        # option is about
 
2231
        self.assertNotEquals('', option_help)
 
2232
 
 
2233
 
1849
2234
class TestSection(tests.TestCase):
1850
2235
 
1851
2236
    # FIXME: Parametrize so that all sections produced by Stores run these
1931
2316
 
1932
2317
class TestReadonlyStore(TestStore):
1933
2318
 
1934
 
    scenarios = [(key, {'get_store': builder})
1935
 
                 for key, builder in test_store_builder_registry.iteritems()]
 
2319
    scenarios = [(key, {'get_store': builder}) for key, builder
 
2320
                 in config.test_store_builder_registry.iteritems()]
1936
2321
 
1937
2322
    def setUp(self):
1938
2323
        super(TestReadonlyStore, self).setUp()
1939
 
        self.branch = self.make_branch('branch')
1940
2324
 
1941
2325
    def test_building_delays_load(self):
1942
2326
        store = self.get_store(self)
1969
2353
        self.assertRaises(AssertionError, store._load_from_string, 'bar=baz')
1970
2354
 
1971
2355
 
 
2356
class TestIniFileStoreContent(tests.TestCaseWithTransport):
 
2357
    """Simulate loading a config store without content of various encodings.
 
2358
 
 
2359
    All files produced by bzr are in utf8 content.
 
2360
 
 
2361
    Users may modify them manually and end up with a file that can't be
 
2362
    loaded. We need to issue proper error messages in this case.
 
2363
    """
 
2364
 
 
2365
    invalid_utf8_char = '\xff'
 
2366
 
 
2367
    def test_load_utf8(self):
 
2368
        """Ensure we can load an utf8-encoded file."""
 
2369
        t = self.get_transport()
 
2370
        # From http://pad.lv/799212
 
2371
        unicode_user = u'b\N{Euro Sign}ar'
 
2372
        unicode_content = u'user=%s' % (unicode_user,)
 
2373
        utf8_content = unicode_content.encode('utf8')
 
2374
        # Store the raw content in the config file
 
2375
        t.put_bytes('foo.conf', utf8_content)
 
2376
        store = config.IniFileStore(t, 'foo.conf')
 
2377
        store.load()
 
2378
        stack = config.Stack([store.get_sections], store)
 
2379
        self.assertEquals(unicode_user, stack.get('user'))
 
2380
 
 
2381
    def test_load_non_ascii(self):
 
2382
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
 
2383
        t = self.get_transport()
 
2384
        t.put_bytes('foo.conf', 'user=foo\n#%s\n' % (self.invalid_utf8_char,))
 
2385
        store = config.IniFileStore(t, 'foo.conf')
 
2386
        self.assertRaises(errors.ConfigContentError, store.load)
 
2387
 
 
2388
    def test_load_erroneous_content(self):
 
2389
        """Ensure we display a proper error on content that can't be parsed."""
 
2390
        t = self.get_transport()
 
2391
        t.put_bytes('foo.conf', '[open_section\n')
 
2392
        store = config.IniFileStore(t, 'foo.conf')
 
2393
        self.assertRaises(errors.ParseConfigError, store.load)
 
2394
 
 
2395
 
 
2396
class TestIniConfigContent(tests.TestCaseWithTransport):
 
2397
    """Simulate loading a IniBasedConfig without content of various encodings.
 
2398
 
 
2399
    All files produced by bzr are in utf8 content.
 
2400
 
 
2401
    Users may modify them manually and end up with a file that can't be
 
2402
    loaded. We need to issue proper error messages in this case.
 
2403
    """
 
2404
 
 
2405
    invalid_utf8_char = '\xff'
 
2406
 
 
2407
    def test_load_utf8(self):
 
2408
        """Ensure we can load an utf8-encoded file."""
 
2409
        # From http://pad.lv/799212
 
2410
        unicode_user = u'b\N{Euro Sign}ar'
 
2411
        unicode_content = u'user=%s' % (unicode_user,)
 
2412
        utf8_content = unicode_content.encode('utf8')
 
2413
        # Store the raw content in the config file
 
2414
        with open('foo.conf', 'wb') as f:
 
2415
            f.write(utf8_content)
 
2416
        conf = config.IniBasedConfig(file_name='foo.conf')
 
2417
        self.assertEquals(unicode_user, conf.get_user_option('user'))
 
2418
 
 
2419
    def test_load_badly_encoded_content(self):
 
2420
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
 
2421
        with open('foo.conf', 'wb') as f:
 
2422
            f.write('user=foo\n#%s\n' % (self.invalid_utf8_char,))
 
2423
        conf = config.IniBasedConfig(file_name='foo.conf')
 
2424
        self.assertRaises(errors.ConfigContentError, conf._get_parser)
 
2425
 
 
2426
    def test_load_erroneous_content(self):
 
2427
        """Ensure we display a proper error on content that can't be parsed."""
 
2428
        with open('foo.conf', 'wb') as f:
 
2429
            f.write('[open_section\n')
 
2430
        conf = config.IniBasedConfig(file_name='foo.conf')
 
2431
        self.assertRaises(errors.ParseConfigError, conf._get_parser)
 
2432
 
 
2433
 
1972
2434
class TestMutableStore(TestStore):
1973
2435
 
1974
 
    scenarios = [(key, {'store_id': key, 'get_store': builder})
1975
 
                 for key, builder in test_store_builder_registry.iteritems()]
 
2436
    scenarios = [(key, {'store_id': key, 'get_store': builder}) for key, builder
 
2437
                 in config.test_store_builder_registry.iteritems()]
1976
2438
 
1977
2439
    def setUp(self):
1978
2440
        super(TestMutableStore, self).setUp()
1979
2441
        self.transport = self.get_transport()
1980
 
        self.branch = self.make_branch('branch')
1981
2442
 
1982
2443
    def has_store(self, store):
1983
2444
        store_basename = urlutils.relative_url(self.transport.external_url(),
1985
2446
        return self.transport.has(store_basename)
1986
2447
 
1987
2448
    def test_save_empty_creates_no_file(self):
1988
 
        if self.store_id == 'branch':
 
2449
        # FIXME: There should be a better way than relying on the test
 
2450
        # parametrization to identify branch.conf -- vila 2011-05-26
 
2451
        if self.store_id in ('branch', 'remote_branch'):
1989
2452
            raise tests.TestNotApplicable(
1990
2453
                'branch.conf is *always* created when a branch is initialized')
1991
2454
        store = self.get_store(self)
2004
2467
        self.assertLength(0, sections)
2005
2468
 
2006
2469
    def test_save_with_content_succeeds(self):
2007
 
        if self.store_id == 'branch':
 
2470
        # FIXME: There should be a better way than relying on the test
 
2471
        # parametrization to identify branch.conf -- vila 2011-05-26
 
2472
        if self.store_id in ('branch', 'remote_branch'):
2008
2473
            raise tests.TestNotApplicable(
2009
2474
                'branch.conf is *always* created when a branch is initialized')
2010
2475
        store = self.get_store(self)
2049
2514
        self.assertLength(1, sections)
2050
2515
        self.assertSectionContent(('baz', {'foo': 'bar'}), sections[0])
2051
2516
 
 
2517
    def test_load_hook(self):
 
2518
        # We first need to ensure that the store exists
 
2519
        store = self.get_store(self)
 
2520
        section = store.get_mutable_section('baz')
 
2521
        section.set('foo', 'bar')
 
2522
        store.save()
 
2523
        # Now we can try to load it
 
2524
        store = self.get_store(self)
 
2525
        calls = []
 
2526
        def hook(*args):
 
2527
            calls.append(args)
 
2528
        config.ConfigHooks.install_named_hook('load', hook, None)
 
2529
        self.assertLength(0, calls)
 
2530
        store.load()
 
2531
        self.assertLength(1, calls)
 
2532
        self.assertEquals((store,), calls[0])
 
2533
 
 
2534
    def test_save_hook(self):
 
2535
        calls = []
 
2536
        def hook(*args):
 
2537
            calls.append(args)
 
2538
        config.ConfigHooks.install_named_hook('save', hook, None)
 
2539
        self.assertLength(0, calls)
 
2540
        store = self.get_store(self)
 
2541
        section = store.get_mutable_section('baz')
 
2542
        section.set('foo', 'bar')
 
2543
        store.save()
 
2544
        self.assertLength(1, calls)
 
2545
        self.assertEquals((store,), calls[0])
 
2546
 
2052
2547
 
2053
2548
class TestIniFileStore(TestStore):
2054
2549
 
2103
2598
class TestLockableIniFileStore(TestStore):
2104
2599
 
2105
2600
    def test_create_store_in_created_dir(self):
 
2601
        self.assertPathDoesNotExist('dir')
2106
2602
        t = self.get_transport('dir/subdir')
2107
2603
        store = config.LockableIniFileStore(t, 'foo.conf')
2108
2604
        store.get_mutable_section(None).set('foo', 'bar')
2109
2605
        store.save()
2110
 
 
2111
 
    # FIXME: We should adapt the tests in TestLockableConfig about concurrent
2112
 
    # writes. Since this requires a clearer rewrite, I'll just rely on using
2113
 
    # the same code in LockableIniFileStore (copied from LockableConfig, but
2114
 
    # trivial enough, the main difference is that we add @needs_write_lock on
2115
 
    # save() instead of set_user_option() and remove_user_option()). The intent
2116
 
    # is to ensure that we always get a valid content for the store even when
2117
 
    # concurrent accesses occur, read/write, write/write. It may be worth
2118
 
    # looking into removing the lock dir when it's not needed anymore and look
2119
 
    # at possible fallouts for concurrent lockers -- vila 20110-04-06
 
2606
        self.assertPathExists('dir/subdir')
 
2607
 
 
2608
 
 
2609
class TestConcurrentStoreUpdates(TestStore):
 
2610
    """Test that Stores properly handle concurrent updates.
 
2611
 
 
2612
    New Store implementations may fail some of these tests but until such
 
2613
    implementations exist it's hard to properly filter them from the scenarios
 
2614
    applied here. If you encounter such a case, contact the bzr devs.
 
2615
    """
 
2616
 
 
2617
    scenarios = [(key, {'get_stack': builder}) for key, builder
 
2618
                 in config.test_stack_builder_registry.iteritems()]
 
2619
 
 
2620
    def setUp(self):
 
2621
        super(TestConcurrentStoreUpdates, self).setUp()
 
2622
        self._content = 'one=1\ntwo=2\n'
 
2623
        self.stack = self.get_stack(self)
 
2624
        if not isinstance(self.stack, config._CompatibleStack):
 
2625
            raise tests.TestNotApplicable(
 
2626
                '%s is not meant to be compatible with the old config design'
 
2627
                % (self.stack,))
 
2628
        self.stack.store._load_from_string(self._content)
 
2629
        # Flush the store
 
2630
        self.stack.store.save()
 
2631
 
 
2632
    def test_simple_read_access(self):
 
2633
        self.assertEquals('1', self.stack.get('one'))
 
2634
 
 
2635
    def test_simple_write_access(self):
 
2636
        self.stack.set('one', 'one')
 
2637
        self.assertEquals('one', self.stack.get('one'))
 
2638
 
 
2639
    def test_listen_to_the_last_speaker(self):
 
2640
        c1 = self.stack
 
2641
        c2 = self.get_stack(self)
 
2642
        c1.set('one', 'ONE')
 
2643
        c2.set('two', 'TWO')
 
2644
        self.assertEquals('ONE', c1.get('one'))
 
2645
        self.assertEquals('TWO', c2.get('two'))
 
2646
        # The second update respects the first one
 
2647
        self.assertEquals('ONE', c2.get('one'))
 
2648
 
 
2649
    def test_last_speaker_wins(self):
 
2650
        # If the same config is not shared, the same variable modified twice
 
2651
        # can only see a single result.
 
2652
        c1 = self.stack
 
2653
        c2 = self.get_stack(self)
 
2654
        c1.set('one', 'c1')
 
2655
        c2.set('one', 'c2')
 
2656
        self.assertEquals('c2', c2.get('one'))
 
2657
        # The first modification is still available until another refresh
 
2658
        # occurs
 
2659
        self.assertEquals('c1', c1.get('one'))
 
2660
        c1.set('two', 'done')
 
2661
        self.assertEquals('c2', c1.get('one'))
 
2662
 
 
2663
    def test_writes_are_serialized(self):
 
2664
        c1 = self.stack
 
2665
        c2 = self.get_stack(self)
 
2666
 
 
2667
        # We spawn a thread that will pause *during* the config saving.
 
2668
        before_writing = threading.Event()
 
2669
        after_writing = threading.Event()
 
2670
        writing_done = threading.Event()
 
2671
        c1_save_without_locking_orig = c1.store.save_without_locking
 
2672
        def c1_save_without_locking():
 
2673
            before_writing.set()
 
2674
            c1_save_without_locking_orig()
 
2675
            # The lock is held. We wait for the main thread to decide when to
 
2676
            # continue
 
2677
            after_writing.wait()
 
2678
        c1.store.save_without_locking = c1_save_without_locking
 
2679
        def c1_set():
 
2680
            c1.set('one', 'c1')
 
2681
            writing_done.set()
 
2682
        t1 = threading.Thread(target=c1_set)
 
2683
        # Collect the thread after the test
 
2684
        self.addCleanup(t1.join)
 
2685
        # Be ready to unblock the thread if the test goes wrong
 
2686
        self.addCleanup(after_writing.set)
 
2687
        t1.start()
 
2688
        before_writing.wait()
 
2689
        self.assertRaises(errors.LockContention,
 
2690
                          c2.set, 'one', 'c2')
 
2691
        self.assertEquals('c1', c1.get('one'))
 
2692
        # Let the lock be released
 
2693
        after_writing.set()
 
2694
        writing_done.wait()
 
2695
        c2.set('one', 'c2')
 
2696
        self.assertEquals('c2', c2.get('one'))
 
2697
 
 
2698
    def test_read_while_writing(self):
 
2699
       c1 = self.stack
 
2700
       # We spawn a thread that will pause *during* the write
 
2701
       ready_to_write = threading.Event()
 
2702
       do_writing = threading.Event()
 
2703
       writing_done = threading.Event()
 
2704
       # We override save_without_locking so we know the store is locked
 
2705
       c1_save_without_locking_orig = c1.store.save_without_locking
 
2706
       def c1_save_without_locking():
 
2707
           ready_to_write.set()
 
2708
           # The lock is held. We wait for the main thread to decide when to
 
2709
           # continue
 
2710
           do_writing.wait()
 
2711
           c1_save_without_locking_orig()
 
2712
           writing_done.set()
 
2713
       c1.store.save_without_locking = c1_save_without_locking
 
2714
       def c1_set():
 
2715
           c1.set('one', 'c1')
 
2716
       t1 = threading.Thread(target=c1_set)
 
2717
       # Collect the thread after the test
 
2718
       self.addCleanup(t1.join)
 
2719
       # Be ready to unblock the thread if the test goes wrong
 
2720
       self.addCleanup(do_writing.set)
 
2721
       t1.start()
 
2722
       # Ensure the thread is ready to write
 
2723
       ready_to_write.wait()
 
2724
       self.assertEquals('c1', c1.get('one'))
 
2725
       # If we read during the write, we get the old value
 
2726
       c2 = self.get_stack(self)
 
2727
       self.assertEquals('1', c2.get('one'))
 
2728
       # Let the writing occur and ensure it occurred
 
2729
       do_writing.set()
 
2730
       writing_done.wait()
 
2731
       # Now we get the updated value
 
2732
       c3 = self.get_stack(self)
 
2733
       self.assertEquals('c1', c3.get('one'))
 
2734
 
 
2735
    # FIXME: It may be worth looking into removing the lock dir when it's not
 
2736
    # needed anymore and look at possible fallouts for concurrent lockers. This
 
2737
    # will matter if/when we use config files outside of bazaar directories
 
2738
    # (.bazaar or .bzr) -- vila 20110-04-11
2120
2739
 
2121
2740
 
2122
2741
class TestSectionMatcher(TestStore):
2220
2839
        conf_stack = config.Stack([conf])
2221
2840
        self.assertEquals('bar', conf_stack.get('foo'))
2222
2841
 
 
2842
    def test_get_with_registered_default_value(self):
 
2843
        conf_stack = config.Stack([dict()])
 
2844
        opt = config.Option('foo', default='bar')
 
2845
        self.overrideAttr(config, 'option_registry', registry.Registry())
 
2846
        config.option_registry.register('foo', opt)
 
2847
        self.assertEquals('bar', conf_stack.get('foo'))
 
2848
 
 
2849
    def test_get_without_registered_default_value(self):
 
2850
        conf_stack = config.Stack([dict()])
 
2851
        opt = config.Option('foo')
 
2852
        self.overrideAttr(config, 'option_registry', registry.Registry())
 
2853
        config.option_registry.register('foo', opt)
 
2854
        self.assertEquals(None, conf_stack.get('foo'))
 
2855
 
 
2856
    def test_get_without_default_value_for_not_registered(self):
 
2857
        conf_stack = config.Stack([dict()])
 
2858
        opt = config.Option('foo')
 
2859
        self.overrideAttr(config, 'option_registry', registry.Registry())
 
2860
        self.assertEquals(None, conf_stack.get('foo'))
 
2861
 
2223
2862
    def test_get_first_definition(self):
2224
2863
        conf1 = dict(foo='bar')
2225
2864
        conf2 = dict(foo='baz')
2232
2871
        conf_stack = config.Stack([conf1, conf2])
2233
2872
        self.assertEquals('baz', conf_stack.get('foo'))
2234
2873
 
2235
 
    def test_get_for_empty_stack(self):
2236
 
        conf_stack = config.Stack([])
2237
 
        self.assertEquals(None, conf_stack.get('foo'))
2238
 
 
2239
2874
    def test_get_for_empty_section_callable(self):
2240
2875
        conf_stack = config.Stack([lambda : []])
2241
2876
        self.assertEquals(None, conf_stack.get('foo'))
2248
2883
 
2249
2884
class TestStackWithTransport(tests.TestCaseWithTransport):
2250
2885
 
2251
 
    def setUp(self):
2252
 
        super(TestStackWithTransport, self).setUp()
2253
 
        # FIXME: A more elaborate builder for the stack would avoid building a
2254
 
        # branch even for tests that don't need it.
2255
 
        self.branch = self.make_branch('branch')
 
2886
    scenarios = [(key, {'get_stack': builder}) for key, builder
 
2887
                 in config.test_stack_builder_registry.iteritems()]
 
2888
 
 
2889
 
 
2890
class TestConcreteStacks(TestStackWithTransport):
 
2891
 
 
2892
    def test_build_stack(self):
 
2893
        # Just a smoke test to help debug builders
 
2894
        stack = self.get_stack(self)
 
2895
 
 
2896
 
 
2897
class TestStackGet(TestStackWithTransport):
 
2898
 
 
2899
    def test_get_for_empty_stack(self):
 
2900
        conf = self.get_stack(self)
 
2901
        self.assertEquals(None, conf.get('foo'))
 
2902
 
 
2903
    def test_get_hook(self):
 
2904
        conf = self.get_stack(self)
 
2905
        conf.store._load_from_string('foo=bar')
 
2906
        calls = []
 
2907
        def hook(*args):
 
2908
            calls.append(args)
 
2909
        config.ConfigHooks.install_named_hook('get', hook, None)
 
2910
        self.assertLength(0, calls)
 
2911
        value = conf.get('foo')
 
2912
        self.assertEquals('bar', value)
 
2913
        self.assertLength(1, calls)
 
2914
        self.assertEquals((conf, 'foo', 'bar'), calls[0])
2256
2915
 
2257
2916
 
2258
2917
class TestStackSet(TestStackWithTransport):
2259
2918
 
2260
 
    scenarios = [(key, {'get_stack': builder})
2261
 
                 for key, builder in test_stack_builder_registry.iteritems()]
2262
 
 
2263
2919
    def test_simple_set(self):
2264
2920
        conf = self.get_stack(self)
2265
2921
        conf.store._load_from_string('foo=bar')
2273
2929
        conf.set('foo', 'baz')
2274
2930
        self.assertEquals, 'baz', conf.get('foo')
2275
2931
 
 
2932
    def test_set_hook(self):
 
2933
        calls = []
 
2934
        def hook(*args):
 
2935
            calls.append(args)
 
2936
        config.ConfigHooks.install_named_hook('set', hook, None)
 
2937
        self.assertLength(0, calls)
 
2938
        conf = self.get_stack(self)
 
2939
        conf.set('foo', 'bar')
 
2940
        self.assertLength(1, calls)
 
2941
        self.assertEquals((conf, 'foo', 'bar'), calls[0])
 
2942
 
2276
2943
 
2277
2944
class TestStackRemove(TestStackWithTransport):
2278
2945
 
2279
 
    scenarios = [(key, {'get_stack': builder})
2280
 
                 for key, builder in test_stack_builder_registry.iteritems()]
2281
 
 
2282
2946
    def test_remove_existing(self):
2283
2947
        conf = self.get_stack(self)
2284
2948
        conf.store._load_from_string('foo=bar')
2291
2955
        conf = self.get_stack(self)
2292
2956
        self.assertRaises(KeyError, conf.remove, 'I_do_not_exist')
2293
2957
 
2294
 
 
2295
 
class TestConcreteStacks(TestStackWithTransport):
2296
 
 
2297
 
    scenarios = [(key, {'get_stack': builder})
2298
 
                 for key, builder in test_stack_builder_registry.iteritems()]
2299
 
 
2300
 
    def test_build_stack(self):
2301
 
        stack = self.get_stack(self)
 
2958
    def test_remove_hook(self):
 
2959
        calls = []
 
2960
        def hook(*args):
 
2961
            calls.append(args)
 
2962
        config.ConfigHooks.install_named_hook('remove', hook, None)
 
2963
        self.assertLength(0, calls)
 
2964
        conf = self.get_stack(self)
 
2965
        conf.store._load_from_string('foo=bar')
 
2966
        conf.remove('foo')
 
2967
        self.assertLength(1, calls)
 
2968
        self.assertEquals((conf, 'foo'), calls[0])
2302
2969
 
2303
2970
 
2304
2971
class TestConfigGetOptions(tests.TestCaseWithTransport, TestOptionsMixin):
2472
3139
        self.assertEquals({}, conf._get_config())
2473
3140
        self._got_user_passwd(None, None, conf, 'http', 'foo.net')
2474
3141
 
 
3142
    def test_non_utf8_config(self):
 
3143
        conf = config.AuthenticationConfig(_file=StringIO(
 
3144
                'foo = bar\xff'))
 
3145
        self.assertRaises(errors.ConfigContentError, conf._get_config)
 
3146
        
2475
3147
    def test_missing_auth_section_header(self):
2476
3148
        conf = config.AuthenticationConfig(_file=StringIO('foo = bar'))
2477
3149
        self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
2965
3637
        to be able to choose a user name with no configuration.
2966
3638
        """
2967
3639
        if sys.platform == 'win32':
2968
 
            raise TestSkipped("User name inference not implemented on win32")
 
3640
            raise tests.TestSkipped(
 
3641
                "User name inference not implemented on win32")
2969
3642
        realname, address = config._auto_user_id()
2970
3643
        if os.path.exists('/etc/mailname'):
2971
3644
            self.assertIsNot(None, realname)