1
# Copyright (C) 2006, 2008-2012, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the Registry classes"""
29
from ..sixish import viewitems
32
class TestRegistry(tests.TestCase):
34
def register_stuff(self, a_registry):
35
a_registry.register('one', 1)
36
a_registry.register('two', 2)
37
a_registry.register('four', 4)
38
a_registry.register('five', 5)
40
def test_registry(self):
41
a_registry = registry.Registry()
42
self.register_stuff(a_registry)
44
self.assertTrue(a_registry.default_key is None)
46
# test get() (self.default_key is None)
47
self.assertRaises(KeyError, a_registry.get)
48
self.assertRaises(KeyError, a_registry.get, None)
49
self.assertEqual(2, a_registry.get('two'))
50
self.assertRaises(KeyError, a_registry.get, 'three')
52
# test _set_default_key
53
a_registry.default_key = 'five'
54
self.assertTrue(a_registry.default_key == 'five')
55
self.assertEqual(5, a_registry.get())
56
self.assertEqual(5, a_registry.get(None))
57
# If they ask for a specific entry, they should get KeyError
58
# not the default value. They can always pass None if they prefer
59
self.assertRaises(KeyError, a_registry.get, 'six')
60
self.assertRaises(KeyError, a_registry._set_default_key, 'six')
63
self.assertEqual(['five', 'four', 'one', 'two'], a_registry.keys())
65
def test_registry_funcs(self):
66
a_registry = registry.Registry()
67
self.register_stuff(a_registry)
69
self.assertTrue('one' in a_registry)
70
a_registry.remove('one')
71
self.assertFalse('one' in a_registry)
72
self.assertRaises(KeyError, a_registry.get, 'one')
74
a_registry.register('one', 'one')
76
self.assertEqual(['five', 'four', 'one', 'two'],
77
sorted(a_registry.keys()))
78
self.assertEqual([('five', 5), ('four', 4),
79
('one', 'one'), ('two', 2)],
80
sorted(a_registry.iteritems()))
82
def test_register_override(self):
83
a_registry = registry.Registry()
84
a_registry.register('one', 'one')
85
self.assertRaises(KeyError, a_registry.register, 'one', 'two')
86
self.assertRaises(KeyError, a_registry.register, 'one', 'two',
87
override_existing=False)
89
a_registry.register('one', 'two', override_existing=True)
90
self.assertEqual('two', a_registry.get('one'))
92
self.assertRaises(KeyError, a_registry.register_lazy,
93
'one', 'three', 'four')
95
a_registry.register_lazy('one', 'module', 'member',
96
override_existing=True)
98
def test_registry_help(self):
99
a_registry = registry.Registry()
100
a_registry.register('one', 1, help='help text for one')
101
# We should not have to import the module to return the help
103
a_registry.register_lazy('two', 'nonexistent_module', 'member',
104
help='help text for two')
106
# We should be able to handle a callable to get information
108
def generic_help(reg, key):
109
help_calls.append(key)
110
return 'generic help for %s' % (key,)
111
a_registry.register('three', 3, help=generic_help)
112
a_registry.register_lazy('four', 'nonexistent_module', 'member2',
114
a_registry.register('five', 5)
116
def help_from_object(reg, key):
120
class SimpleObj(object):
122
return 'this is my help'
123
a_registry.register('six', SimpleObj(), help=help_from_object)
125
self.assertEqual('help text for one', a_registry.get_help('one'))
126
self.assertEqual('help text for two', a_registry.get_help('two'))
127
self.assertEqual('generic help for three',
128
a_registry.get_help('three'))
129
self.assertEqual(['three'], help_calls)
130
self.assertEqual('generic help for four',
131
a_registry.get_help('four'))
132
self.assertEqual(['three', 'four'], help_calls)
133
self.assertEqual(None, a_registry.get_help('five'))
134
self.assertEqual('this is my help', a_registry.get_help('six'))
136
self.assertRaises(KeyError, a_registry.get_help, None)
137
self.assertRaises(KeyError, a_registry.get_help, 'seven')
139
a_registry.default_key = 'one'
140
self.assertEqual('help text for one', a_registry.get_help(None))
141
self.assertRaises(KeyError, a_registry.get_help, 'seven')
143
self.assertEqual([('five', None),
144
('four', 'generic help for four'),
145
('one', 'help text for one'),
146
('six', 'this is my help'),
147
('three', 'generic help for three'),
148
('two', 'help text for two'),
149
], sorted((key, a_registry.get_help(key))
150
for key in a_registry.keys()))
152
# We don't know what order it was called in, but we should get
153
# 2 more calls to three and four
154
self.assertEqual(['four', 'four', 'three', 'three'],
157
def test_registry_info(self):
158
a_registry = registry.Registry()
159
a_registry.register('one', 1, info='string info')
160
# We should not have to import the module to return the info
161
a_registry.register_lazy('two', 'nonexistent_module', 'member',
164
# We should be able to handle a callable to get information
165
a_registry.register('three', 3, info=['a', 'list'])
167
a_registry.register_lazy('four', 'nonexistent_module', 'member2',
169
a_registry.register('five', 5)
171
self.assertEqual('string info', a_registry.get_info('one'))
172
self.assertEqual(2, a_registry.get_info('two'))
173
self.assertEqual(['a', 'list'], a_registry.get_info('three'))
174
self.assertIs(obj, a_registry.get_info('four'))
175
self.assertIs(None, a_registry.get_info('five'))
177
self.assertRaises(KeyError, a_registry.get_info, None)
178
self.assertRaises(KeyError, a_registry.get_info, 'six')
180
a_registry.default_key = 'one'
181
self.assertEqual('string info', a_registry.get_info(None))
182
self.assertRaises(KeyError, a_registry.get_info, 'six')
184
self.assertEqual([('five', None),
186
('one', 'string info'),
187
('three', ['a', 'list']),
189
], sorted((key, a_registry.get_info(key))
190
for key in a_registry.keys()))
192
def test_get_prefix(self):
193
my_registry = registry.Registry()
194
http_object = object()
195
sftp_object = object()
196
my_registry.register('http:', http_object)
197
my_registry.register('sftp:', sftp_object)
198
found_object, suffix = my_registry.get_prefix('http://foo/bar')
199
self.assertEqual('//foo/bar', suffix)
200
self.assertIs(http_object, found_object)
201
self.assertIsNot(sftp_object, found_object)
202
found_object, suffix = my_registry.get_prefix('sftp://baz/qux')
203
self.assertEqual('//baz/qux', suffix)
204
self.assertIs(sftp_object, found_object)
206
def test_registry_alias(self):
207
a_registry = registry.Registry()
208
a_registry.register('one', 1, info='string info')
209
a_registry.register_alias('two', 'one')
210
a_registry.register_alias('three', 'one', info='own info')
211
self.assertEqual(a_registry.get('one'), a_registry.get('two'))
212
self.assertEqual(a_registry.get_help('one'), a_registry.get_help('two'))
213
self.assertEqual(a_registry.get_info('one'), a_registry.get_info('two'))
214
self.assertEqual('own info', a_registry.get_info('three'))
215
self.assertEqual({'two': 'one', 'three': 'one'}, a_registry.aliases())
216
self.assertEqual({'one': ['three', 'two']},
217
{k: sorted(v) for (k, v) in viewitems(a_registry.alias_map())})
219
def test_registry_alias_exists(self):
220
a_registry = registry.Registry()
221
a_registry.register('one', 1, info='string info')
222
a_registry.register('two', 2)
223
self.assertRaises(KeyError, a_registry.register_alias, 'one', 'one')
225
def test_registry_alias_targetmissing(self):
226
a_registry = registry.Registry()
227
self.assertRaises(KeyError, a_registry.register_alias, 'one', 'two')
230
class TestRegistryIter(tests.TestCase):
231
"""Test registry iteration behaviors.
233
There are dark corner cases here when the registered objects trigger
234
addition in the iterated registry.
238
super(TestRegistryIter, self).setUp()
240
# We create a registry with "official" objects and "hidden"
241
# objects. The later represent the side effects that led to bug #277048
243
_registry = registry.Registry()
246
_registry.register('hidden', None)
248
# Avoid closing over self by binding local variable
249
self.registry = _registry
250
self.registry.register('passive', None)
251
self.registry.register('active', register_more)
252
self.registry.register('passive-too', None)
254
class InvasiveGetter(registry._ObjectGetter):
256
def get_obj(inner_self):
257
# Surprise ! Getting a registered object (think lazy loaded
258
# module) register yet another object !
259
_registry.register('more hidden', None)
260
return inner_self._obj
262
self.registry.register('hacky', None)
263
# We peek under the covers because the alternative is to use lazy
264
# registration and create a module that can reference our test registry
265
# it's too much work for such a corner case -- vila 090916
266
self.registry._dict['hacky'] = InvasiveGetter(None)
268
def _iter_them(self, iter_func_name):
269
iter_func = getattr(self.registry, iter_func_name, None)
270
self.assertIsNot(None, iter_func)
272
for name, func in iter_func():
274
self.assertFalse(name in ('hidden', 'more hidden'))
276
# Using an object register another one as a side effect
278
self.assertEqual(4, count)
280
def test_iteritems(self):
281
# the dict is modified during the iteration
282
self.assertRaises(RuntimeError, self._iter_them, 'iteritems')
284
def test_items(self):
285
# we should be able to iterate even if one item modify the dict
286
self._iter_them('items')
289
class TestRegistryWithDirs(tests.TestCaseInTempDir):
290
"""Registry tests that require temporary dirs"""
292
def create_plugin_file(self, contents):
293
"""Create a file to be used as a plugin.
295
This is created in a temporary directory, so that we
296
are sure that it doesn't start in the plugin path.
299
plugin_name = 'bzr_plugin_a_%s' % (osutils.rand_chars(4),)
300
with open('tmp/'+plugin_name+'.py', 'wb') as f: f.write(contents)
303
def create_simple_plugin(self):
304
return self.create_plugin_file(
307
b'def function(a,b,c):\n'
310
b'class MyClass(object):\n'
311
b' def __init__(self, a):\n'
316
def test_lazy_import_registry_foo(self):
317
a_registry = registry.Registry()
318
a_registry.register_lazy('foo', 'breezy.branch', 'Branch')
319
a_registry.register_lazy('bar', 'breezy.branch', 'Branch.hooks')
320
self.assertEqual(branch.Branch, a_registry.get('foo'))
321
self.assertEqual(branch.Branch.hooks, a_registry.get('bar'))
323
def test_lazy_import_registry(self):
324
plugin_name = self.create_simple_plugin()
325
a_registry = registry.Registry()
326
a_registry.register_lazy('obj', plugin_name, 'object1')
327
a_registry.register_lazy('function', plugin_name, 'function')
328
a_registry.register_lazy('klass', plugin_name, 'MyClass')
329
a_registry.register_lazy('module', plugin_name, None)
331
self.assertEqual(['function', 'klass', 'module', 'obj'],
332
sorted(a_registry.keys()))
333
# The plugin should not be loaded until we grab the first object
334
self.assertFalse(plugin_name in sys.modules)
336
# By default the plugin won't be in the search path
337
self.assertRaises(ImportError, a_registry.get, 'obj')
339
plugin_path = self.test_dir + '/tmp'
340
sys.path.append(plugin_path)
342
obj = a_registry.get('obj')
343
self.assertEqual('foo', obj)
344
self.assertTrue(plugin_name in sys.modules)
346
# Now grab another object
347
func = a_registry.get('function')
348
self.assertEqual(plugin_name, func.__module__)
349
self.assertEqual('function', func.__name__)
350
self.assertEqual((1, [], '3'), func(1, [], '3'))
352
# And finally a class
353
klass = a_registry.get('klass')
354
self.assertEqual(plugin_name, klass.__module__)
355
self.assertEqual('MyClass', klass.__name__)
358
self.assertIsInstance(inst, klass)
359
self.assertEqual(1, inst.a)
361
module = a_registry.get('module')
362
self.assertIs(obj, module.object1)
363
self.assertIs(func, module.function)
364
self.assertIs(klass, module.MyClass)
366
sys.path.remove(plugin_path)
368
def test_lazy_import_get_module(self):
369
a_registry = registry.Registry()
370
a_registry.register_lazy('obj', "breezy.tests.test_registry",
372
self.assertEqual("breezy.tests.test_registry",
373
a_registry._get_module("obj"))
375
def test_normal_get_module(self):
376
class AThing(object):
378
a_registry = registry.Registry()
379
a_registry.register("obj", AThing())
380
self.assertEqual("breezy.tests.test_registry",
381
a_registry._get_module("obj"))