1
# Copyright (C) 2006, 2008-2012, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the Registry classes"""
29
from ..sixish import viewitems
32
class TestRegistry(tests.TestCase):
34
def register_stuff(self, a_registry):
35
a_registry.register('one', 1)
36
a_registry.register('two', 2)
37
a_registry.register('four', 4)
38
a_registry.register('five', 5)
40
def test_registry(self):
41
a_registry = registry.Registry()
42
self.register_stuff(a_registry)
44
self.assertTrue(a_registry.default_key is None)
46
# test get() (self.default_key is None)
47
self.assertRaises(KeyError, a_registry.get)
48
self.assertRaises(KeyError, a_registry.get, None)
49
self.assertEqual(2, a_registry.get('two'))
50
self.assertRaises(KeyError, a_registry.get, 'three')
52
# test _set_default_key
53
a_registry.default_key = 'five'
54
self.assertTrue(a_registry.default_key == 'five')
55
self.assertEqual(5, a_registry.get())
56
self.assertEqual(5, a_registry.get(None))
57
# If they ask for a specific entry, they should get KeyError
58
# not the default value. They can always pass None if they prefer
59
self.assertRaises(KeyError, a_registry.get, 'six')
60
self.assertRaises(KeyError, a_registry._set_default_key, 'six')
63
self.assertEqual(['five', 'four', 'one', 'two'], a_registry.keys())
65
def test_registry_funcs(self):
66
a_registry = registry.Registry()
67
self.register_stuff(a_registry)
69
self.assertTrue('one' in a_registry)
70
a_registry.remove('one')
71
self.assertFalse('one' in a_registry)
72
self.assertRaises(KeyError, a_registry.get, 'one')
74
a_registry.register('one', 'one')
76
self.assertEqual(['five', 'four', 'one', 'two'],
77
sorted(a_registry.keys()))
78
self.assertEqual([('five', 5), ('four', 4),
79
('one', 'one'), ('two', 2)],
80
sorted(a_registry.iteritems()))
82
def test_register_override(self):
83
a_registry = registry.Registry()
84
a_registry.register('one', 'one')
85
self.assertRaises(KeyError, a_registry.register, 'one', 'two')
86
self.assertRaises(KeyError, a_registry.register, 'one', 'two',
87
override_existing=False)
89
a_registry.register('one', 'two', override_existing=True)
90
self.assertEqual('two', a_registry.get('one'))
92
self.assertRaises(KeyError, a_registry.register_lazy,
93
'one', 'three', 'four')
95
a_registry.register_lazy('one', 'module', 'member',
96
override_existing=True)
98
def test_registry_help(self):
99
a_registry = registry.Registry()
100
a_registry.register('one', 1, help='help text for one')
101
# We should not have to import the module to return the help
103
a_registry.register_lazy('two', 'nonexistent_module', 'member',
104
help='help text for two')
106
# We should be able to handle a callable to get information
109
def generic_help(reg, key):
110
help_calls.append(key)
111
return 'generic help for %s' % (key,)
112
a_registry.register('three', 3, help=generic_help)
113
a_registry.register_lazy('four', 'nonexistent_module', 'member2',
115
a_registry.register('five', 5)
117
def help_from_object(reg, key):
121
class SimpleObj(object):
123
return 'this is my help'
124
a_registry.register('six', SimpleObj(), help=help_from_object)
126
self.assertEqual('help text for one', a_registry.get_help('one'))
127
self.assertEqual('help text for two', a_registry.get_help('two'))
128
self.assertEqual('generic help for three',
129
a_registry.get_help('three'))
130
self.assertEqual(['three'], help_calls)
131
self.assertEqual('generic help for four',
132
a_registry.get_help('four'))
133
self.assertEqual(['three', 'four'], help_calls)
134
self.assertEqual(None, a_registry.get_help('five'))
135
self.assertEqual('this is my help', a_registry.get_help('six'))
137
self.assertRaises(KeyError, a_registry.get_help, None)
138
self.assertRaises(KeyError, a_registry.get_help, 'seven')
140
a_registry.default_key = 'one'
141
self.assertEqual('help text for one', a_registry.get_help(None))
142
self.assertRaises(KeyError, a_registry.get_help, 'seven')
144
self.assertEqual([('five', None),
145
('four', 'generic help for four'),
146
('one', 'help text for one'),
147
('six', 'this is my help'),
148
('three', 'generic help for three'),
149
('two', 'help text for two'),
150
], sorted((key, a_registry.get_help(key))
151
for key in a_registry.keys()))
153
# We don't know what order it was called in, but we should get
154
# 2 more calls to three and four
155
self.assertEqual(['four', 'four', 'three', 'three'],
158
def test_registry_info(self):
159
a_registry = registry.Registry()
160
a_registry.register('one', 1, info='string info')
161
# We should not have to import the module to return the info
162
a_registry.register_lazy('two', 'nonexistent_module', 'member',
165
# We should be able to handle a callable to get information
166
a_registry.register('three', 3, info=['a', 'list'])
168
a_registry.register_lazy('four', 'nonexistent_module', 'member2',
170
a_registry.register('five', 5)
172
self.assertEqual('string info', a_registry.get_info('one'))
173
self.assertEqual(2, a_registry.get_info('two'))
174
self.assertEqual(['a', 'list'], a_registry.get_info('three'))
175
self.assertIs(obj, a_registry.get_info('four'))
176
self.assertIs(None, a_registry.get_info('five'))
178
self.assertRaises(KeyError, a_registry.get_info, None)
179
self.assertRaises(KeyError, a_registry.get_info, 'six')
181
a_registry.default_key = 'one'
182
self.assertEqual('string info', a_registry.get_info(None))
183
self.assertRaises(KeyError, a_registry.get_info, 'six')
185
self.assertEqual([('five', None),
187
('one', 'string info'),
188
('three', ['a', 'list']),
190
], sorted((key, a_registry.get_info(key))
191
for key in a_registry.keys()))
193
def test_get_prefix(self):
194
my_registry = registry.Registry()
195
http_object = object()
196
sftp_object = object()
197
my_registry.register('http:', http_object)
198
my_registry.register('sftp:', sftp_object)
199
found_object, suffix = my_registry.get_prefix('http://foo/bar')
200
self.assertEqual('//foo/bar', suffix)
201
self.assertIs(http_object, found_object)
202
self.assertIsNot(sftp_object, found_object)
203
found_object, suffix = my_registry.get_prefix('sftp://baz/qux')
204
self.assertEqual('//baz/qux', suffix)
205
self.assertIs(sftp_object, found_object)
207
def test_registry_alias(self):
208
a_registry = registry.Registry()
209
a_registry.register('one', 1, info='string info')
210
a_registry.register_alias('two', 'one')
211
a_registry.register_alias('three', 'one', info='own info')
212
self.assertEqual(a_registry.get('one'), a_registry.get('two'))
213
self.assertEqual(a_registry.get_help('one'),
214
a_registry.get_help('two'))
215
self.assertEqual(a_registry.get_info('one'),
216
a_registry.get_info('two'))
217
self.assertEqual('own info', a_registry.get_info('three'))
218
self.assertEqual({'two': 'one', 'three': 'one'}, a_registry.aliases())
220
{'one': ['three', 'two']},
221
{k: sorted(v) for (k, v) in viewitems(a_registry.alias_map())})
223
def test_registry_alias_exists(self):
224
a_registry = registry.Registry()
225
a_registry.register('one', 1, info='string info')
226
a_registry.register('two', 2)
227
self.assertRaises(KeyError, a_registry.register_alias, 'one', 'one')
229
def test_registry_alias_targetmissing(self):
230
a_registry = registry.Registry()
231
self.assertRaises(KeyError, a_registry.register_alias, 'one', 'two')
234
class TestRegistryIter(tests.TestCase):
235
"""Test registry iteration behaviors.
237
There are dark corner cases here when the registered objects trigger
238
addition in the iterated registry.
242
super(TestRegistryIter, self).setUp()
244
# We create a registry with "official" objects and "hidden"
245
# objects. The later represent the side effects that led to bug #277048
247
_registry = registry.Registry()
250
_registry.register('hidden', None)
252
# Avoid closing over self by binding local variable
253
self.registry = _registry
254
self.registry.register('passive', None)
255
self.registry.register('active', register_more)
256
self.registry.register('passive-too', None)
258
class InvasiveGetter(registry._ObjectGetter):
260
def get_obj(inner_self):
261
# Surprise ! Getting a registered object (think lazy loaded
262
# module) register yet another object !
263
_registry.register('more hidden', None)
264
return inner_self._obj
266
self.registry.register('hacky', None)
267
# We peek under the covers because the alternative is to use lazy
268
# registration and create a module that can reference our test registry
269
# it's too much work for such a corner case -- vila 090916
270
self.registry._dict['hacky'] = InvasiveGetter(None)
272
def _iter_them(self, iter_func_name):
273
iter_func = getattr(self.registry, iter_func_name, None)
274
self.assertIsNot(None, iter_func)
276
for name, func in iter_func():
278
self.assertFalse(name in ('hidden', 'more hidden'))
280
# Using an object register another one as a side effect
282
self.assertEqual(4, count)
284
def test_iteritems(self):
285
# the dict is modified during the iteration
286
self.assertRaises(RuntimeError, self._iter_them, 'iteritems')
288
def test_items(self):
289
# we should be able to iterate even if one item modify the dict
290
self._iter_them('items')
293
class TestRegistryWithDirs(tests.TestCaseInTempDir):
294
"""Registry tests that require temporary dirs"""
296
def create_plugin_file(self, contents):
297
"""Create a file to be used as a plugin.
299
This is created in a temporary directory, so that we
300
are sure that it doesn't start in the plugin path.
303
plugin_name = 'bzr_plugin_a_%s' % (osutils.rand_chars(4),)
304
with open('tmp/' + plugin_name + '.py', 'wb') as f:
308
def create_simple_plugin(self):
309
return self.create_plugin_file(
312
b'def function(a,b,c):\n'
315
b'class MyClass(object):\n'
316
b' def __init__(self, a):\n'
321
def test_lazy_import_registry_foo(self):
322
a_registry = registry.Registry()
323
a_registry.register_lazy('foo', 'breezy.branch', 'Branch')
324
a_registry.register_lazy('bar', 'breezy.branch', 'Branch.hooks')
325
self.assertEqual(branch.Branch, a_registry.get('foo'))
326
self.assertEqual(branch.Branch.hooks, a_registry.get('bar'))
328
def test_lazy_import_registry(self):
329
plugin_name = self.create_simple_plugin()
330
a_registry = registry.Registry()
331
a_registry.register_lazy('obj', plugin_name, 'object1')
332
a_registry.register_lazy('function', plugin_name, 'function')
333
a_registry.register_lazy('klass', plugin_name, 'MyClass')
334
a_registry.register_lazy('module', plugin_name, None)
336
self.assertEqual(['function', 'klass', 'module', 'obj'],
337
sorted(a_registry.keys()))
338
# The plugin should not be loaded until we grab the first object
339
self.assertFalse(plugin_name in sys.modules)
341
# By default the plugin won't be in the search path
342
self.assertRaises(ImportError, a_registry.get, 'obj')
344
plugin_path = self.test_dir + '/tmp'
345
sys.path.append(plugin_path)
347
obj = a_registry.get('obj')
348
self.assertEqual('foo', obj)
349
self.assertTrue(plugin_name in sys.modules)
351
# Now grab another object
352
func = a_registry.get('function')
353
self.assertEqual(plugin_name, func.__module__)
354
self.assertEqual('function', func.__name__)
355
self.assertEqual((1, [], '3'), func(1, [], '3'))
357
# And finally a class
358
klass = a_registry.get('klass')
359
self.assertEqual(plugin_name, klass.__module__)
360
self.assertEqual('MyClass', klass.__name__)
363
self.assertIsInstance(inst, klass)
364
self.assertEqual(1, inst.a)
366
module = a_registry.get('module')
367
self.assertIs(obj, module.object1)
368
self.assertIs(func, module.function)
369
self.assertIs(klass, module.MyClass)
371
sys.path.remove(plugin_path)
373
def test_lazy_import_get_module(self):
374
a_registry = registry.Registry()
375
a_registry.register_lazy('obj', "breezy.tests.test_registry",
377
self.assertEqual("breezy.tests.test_registry",
378
a_registry._get_module("obj"))
380
def test_normal_get_module(self):
381
class AThing(object):
383
a_registry = registry.Registry()
384
a_registry.register("obj", AThing())
385
self.assertEqual("breezy.tests.test_registry",
386
a_registry._get_module("obj"))