180
213
self._formatTime(self._benchmarkTime),
181
214
self._elapsedTestTimeString())
183
return " %s" % self._elapsedTestTimeString()
216
return " %s" % self._elapsedTestTimeString()
185
218
def _formatTime(self, seconds):
186
219
"""Format seconds as milliseconds with leading spaces."""
187
return "%5dms" % (1000 * seconds)
220
# some benchmarks can take thousands of seconds to run, so we need 8
222
return "%8dms" % (1000 * seconds)
189
def _ellipsise_unimportant_words(self, a_string, final_width,
191
"""Add ellipses (sp?) for overly long strings.
193
:param keep_start: If true preserve the start of a_string rather
197
if len(a_string) > final_width:
198
result = a_string[:final_width-3] + '...'
202
if len(a_string) > final_width:
203
result = '...' + a_string[3-final_width:]
206
return result.ljust(final_width)
224
def _shortened_test_description(self, test):
226
what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
208
229
def startTest(self, test):
209
230
unittest.TestResult.startTest(self, test)
210
# In a short description, the important words are in
211
# the beginning, but in an id, the important words are
213
SHOW_DESCRIPTIONS = False
215
if not self.showAll and self.dots and self.pb is not None:
218
final_width = osutils.terminal_width()
219
final_width = final_width - 15 - 8
221
if SHOW_DESCRIPTIONS:
222
what = test.shortDescription()
224
what = self._ellipsise_unimportant_words(what, final_width, keep_start=True)
227
if what.startswith('bzrlib.tests.'):
229
what = self._ellipsise_unimportant_words(what, final_width)
231
self.stream.write(what)
232
elif self.dots and self.pb is not None:
233
self.pb.update(what, self.testsRun - 1, None)
231
self.report_test_start(test)
232
test.number = self.count
235
233
self._recordTestStartTime()
237
235
def _recordTestStartTime(self):
238
236
"""Record that a test has started."""
239
237
self._start_time = time.time()
239
def _cleanupLogFile(self, test):
240
# We can only do this if we have one of our TestCases, not if
242
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
243
if setKeepLogfile is not None:
241
246
def addError(self, test, err):
247
self.extractBenchmarkTime(test)
248
self._cleanupLogFile(test)
242
249
if isinstance(err[1], TestSkipped):
243
return self.addSkipped(test, err)
250
return self.addSkipped(test, err)
251
elif isinstance(err[1], UnavailableFeature):
252
return self.addNotSupported(test, err[1].args[0])
244
253
unittest.TestResult.addError(self, test, err)
245
# We can only do this if we have one of our TestCases, not if
247
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
248
if setKeepLogfile is not None:
250
self.extractBenchmarkTime(test)
252
self.stream.writeln("ERROR %s" % self._testTimeString())
253
elif self.dots and self.pb is None:
254
self.stream.write('E')
256
self.pb.update(self._ellipsise_unimportant_words('ERROR', 13), self.testsRun, None)
257
self.pb.note(self._ellipsise_unimportant_words(
258
test.id() + ': ERROR',
259
osutils.terminal_width()))
254
self.error_count += 1
255
self.report_error(test, err)
261
256
if self.stop_early:
264
259
def addFailure(self, test, err):
260
self._cleanupLogFile(test)
261
self.extractBenchmarkTime(test)
262
if isinstance(err[1], KnownFailure):
263
return self.addKnownFailure(test, err)
265
264
unittest.TestResult.addFailure(self, test, err)
266
# We can only do this if we have one of our TestCases, not if
268
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
269
if setKeepLogfile is not None:
271
self.extractBenchmarkTime(test)
273
self.stream.writeln(" FAIL %s" % self._testTimeString())
274
elif self.dots and self.pb is None:
275
self.stream.write('F')
277
self.pb.update(self._ellipsise_unimportant_words('FAIL', 13), self.testsRun, None)
278
self.pb.note(self._ellipsise_unimportant_words(
279
test.id() + ': FAIL',
280
osutils.terminal_width()))
265
self.failure_count += 1
266
self.report_failure(test, err)
282
267
if self.stop_early:
270
def addKnownFailure(self, test, err):
271
self.known_failure_count += 1
272
self.report_known_failure(test, err)
274
def addNotSupported(self, test, feature):
275
self.unsupported.setdefault(str(feature), 0)
276
self.unsupported[str(feature)] += 1
277
self.report_unsupported(test, feature)
285
279
def addSuccess(self, test):
286
280
self.extractBenchmarkTime(test)
287
281
if self._bench_history is not None:
336
316
self.stream.writeln(self.separator2)
337
317
self.stream.writeln("%s" % err)
322
def report_cleaning_up(self):
325
def report_success(self, test):
329
class TextTestResult(ExtendedTestResult):
330
"""Displays progress and results of tests in text form"""
332
def __init__(self, stream, descriptions, verbosity,
336
use_numbered_dirs=False,
338
ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
339
bench_history, num_tests, use_numbered_dirs)
341
self.pb = self.ui.nested_progress_bar()
342
self._supplied_pb = False
345
self._supplied_pb = True
346
self.pb.show_pct = False
347
self.pb.show_spinner = False
348
self.pb.show_eta = False,
349
self.pb.show_count = False
350
self.pb.show_bar = False
352
def report_starting(self):
353
self.pb.update('[test 0/%d] starting...' % (self.num_tests))
355
def _progress_prefix_text(self):
356
a = '[%d' % self.count
357
if self.num_tests is not None:
358
a +='/%d' % self.num_tests
359
a += ' in %ds' % (time.time() - self._overall_start_time)
361
a += ', %d errors' % self.error_count
362
if self.failure_count:
363
a += ', %d failed' % self.failure_count
364
if self.known_failure_count:
365
a += ', %d known failures' % self.known_failure_count
367
a += ', %d skipped' % self.skip_count
369
a += ', %d missing features' % len(self.unsupported)
373
def report_test_start(self, test):
376
self._progress_prefix_text()
378
+ self._shortened_test_description(test))
380
def _test_description(self, test):
381
if self.use_numbered_dirs:
382
return '#%d %s' % (self.count,
383
self._shortened_test_description(test))
385
return self._shortened_test_description(test)
387
def report_error(self, test, err):
388
self.pb.note('ERROR: %s\n %s\n',
389
self._test_description(test),
393
def report_failure(self, test, err):
394
self.pb.note('FAIL: %s\n %s\n',
395
self._test_description(test),
399
def report_known_failure(self, test, err):
400
self.pb.note('XFAIL: %s\n%s\n',
401
self._test_description(test), err[1])
403
def report_skip(self, test, skip_excinfo):
406
# at the moment these are mostly not things we can fix
407
# and so they just produce stipple; use the verbose reporter
410
# show test and reason for skip
411
self.pb.note('SKIP: %s\n %s\n',
412
self._shortened_test_description(test),
415
# since the class name was left behind in the still-visible
417
self.pb.note('SKIP: %s', skip_excinfo[1])
419
def report_unsupported(self, test, feature):
420
"""test cannot be run because feature is missing."""
422
def report_cleaning_up(self):
423
self.pb.update('cleaning up...')
426
if not self._supplied_pb:
430
class VerboseTestResult(ExtendedTestResult):
431
"""Produce long output, with one line per test run plus times"""
433
def _ellipsize_to_right(self, a_string, final_width):
434
"""Truncate and pad a string, keeping the right hand side"""
435
if len(a_string) > final_width:
436
result = '...' + a_string[3-final_width:]
439
return result.ljust(final_width)
441
def report_starting(self):
442
self.stream.write('running %d tests...\n' % self.num_tests)
444
def report_test_start(self, test):
446
name = self._shortened_test_description(test)
447
# width needs space for 6 char status, plus 1 for slash, plus 2 10-char
448
# numbers, plus a trailing blank
449
# when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
450
if self.use_numbered_dirs:
451
self.stream.write('%5d ' % self.count)
452
self.stream.write(self._ellipsize_to_right(name,
453
osutils.terminal_width()-36))
455
self.stream.write(self._ellipsize_to_right(name,
456
osutils.terminal_width()-30))
459
def _error_summary(self, err):
461
if self.use_numbered_dirs:
463
return '%s%s' % (indent, err[1])
465
def report_error(self, test, err):
466
self.stream.writeln('ERROR %s\n%s'
467
% (self._testTimeString(),
468
self._error_summary(err)))
470
def report_failure(self, test, err):
471
self.stream.writeln(' FAIL %s\n%s'
472
% (self._testTimeString(),
473
self._error_summary(err)))
475
def report_known_failure(self, test, err):
476
self.stream.writeln('XFAIL %s\n%s'
477
% (self._testTimeString(),
478
self._error_summary(err)))
480
def report_success(self, test):
481
self.stream.writeln(' OK %s' % self._testTimeString())
482
for bench_called, stats in getattr(test, '_benchcalls', []):
483
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
484
stats.pprint(file=self.stream)
485
# flush the stream so that we get smooth output. This verbose mode is
486
# used to show the output in PQM.
489
def report_skip(self, test, skip_excinfo):
491
self.stream.writeln(' SKIP %s\n%s'
492
% (self._testTimeString(),
493
self._error_summary(skip_excinfo)))
495
def report_unsupported(self, test, feature):
496
"""test cannot be run because feature is missing."""
497
self.stream.writeln("NODEP %s\n The feature '%s' is not available."
498
%(self._testTimeString(), feature))
340
502
class TextTestRunner(object):
341
503
stop_on_failure = False
387
561
if failed: self.stream.write(", ")
388
562
self.stream.write("errors=%d" % errored)
563
if result.known_failure_count:
564
if failed or errored: self.stream.write(", ")
565
self.stream.write("known_failure_count=%d" %
566
result.known_failure_count)
389
567
self.stream.writeln(")")
391
self.stream.writeln("OK")
392
if self.pb is not None:
393
self.pb.update('Cleaning up', 0, 1)
394
# This is still a little bogus,
395
# but only a little. Folk not using our testrunner will
396
# have to delete their temp directories themselves.
397
test_root = TestCaseInTempDir.TEST_ROOT
398
if result.wasSuccessful() or not self.keep_output:
399
if test_root is not None:
400
# If LANG=C we probably have created some bogus paths
401
# which rmtree(unicode) will fail to delete
402
# so make sure we are using rmtree(str) to delete everything
403
# except on win32, where rmtree(str) will fail
404
# since it doesn't have the property of byte-stream paths
405
# (they are either ascii or mbcs)
406
if sys.platform == 'win32':
407
# make sure we are using the unicode win32 api
408
test_root = unicode(test_root)
410
test_root = test_root.encode(
411
sys.getfilesystemencoding())
412
osutils.rmtree(test_root)
414
if self.pb is not None:
415
self.pb.note("Failed tests working directories are in '%s'\n",
569
if result.known_failure_count:
570
self.stream.writeln("OK (known_failures=%d)" %
571
result.known_failure_count)
419
"Failed tests working directories are in '%s'\n" %
421
TestCaseInTempDir.TEST_ROOT = None
422
if self.pb is not None:
573
self.stream.writeln("OK")
574
if result.skip_count > 0:
575
skipped = result.skip_count
576
self.stream.writeln('%d test%s skipped' %
577
(skipped, skipped != 1 and "s" or ""))
578
if result.unsupported:
579
for feature, count in sorted(result.unsupported.items()):
580
self.stream.writeln("Missing feature '%s' skipped %d tests." %
574
866
raise AssertionError("value(s) %r not present in container %r" %
575
867
(missing, superlist))
577
def assertIs(self, left, right):
869
def assertListRaises(self, excClass, func, *args, **kwargs):
870
"""Fail unless excClass is raised when the iterator from func is used.
872
Many functions can return generators this makes sure
873
to wrap them in a list() call to make sure the whole generator
874
is run, and that the proper exception is raised.
877
list(func(*args, **kwargs))
881
if getattr(excClass,'__name__', None) is not None:
882
excName = excClass.__name__
884
excName = str(excClass)
885
raise self.failureException, "%s not raised" % excName
887
def assertRaises(self, excClass, callableObj, *args, **kwargs):
888
"""Assert that a callable raises a particular exception.
890
:param excClass: As for the except statement, this may be either an
891
exception class, or a tuple of classes.
892
:param callableObj: A callable, will be passed ``*args`` and
895
Returns the exception so that you can examine it.
898
callableObj(*args, **kwargs)
902
if getattr(excClass,'__name__', None) is not None:
903
excName = excClass.__name__
906
excName = str(excClass)
907
raise self.failureException, "%s not raised" % excName
909
def assertIs(self, left, right, message=None):
578
910
if not (left is right):
579
raise AssertionError("%r is not %r." % (left, right))
911
if message is not None:
912
raise AssertionError(message)
914
raise AssertionError("%r is not %r." % (left, right))
916
def assertIsNot(self, left, right, message=None):
918
if message is not None:
919
raise AssertionError(message)
921
raise AssertionError("%r is %r." % (left, right))
581
923
def assertTransportMode(self, transport, path, mode):
582
924
"""Fail if a path does not have mode mode.
780
1208
if not keep_log_file:
781
1209
self._log_contents = log_contents
782
os.remove(self._log_file_name)
1211
os.remove(self._log_file_name)
1213
if sys.platform == 'win32' and e.errno == errno.EACCES:
1214
print >>sys.stderr, ('Unable to delete log file '
1215
' %r' % self._log_file_name)
783
1218
return log_contents
785
1220
return "DELETED log file to reduce memory footprint"
1222
@deprecated_method(zero_eighteen)
787
1223
def capture(self, cmd, retcode=0):
788
1224
"""Shortcut that splits cmd into words, runs, and returns stdout"""
789
1225
return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
1227
def requireFeature(self, feature):
1228
"""This test requires a specific feature is available.
1230
:raises UnavailableFeature: When feature is not available.
1232
if not feature.available():
1233
raise UnavailableFeature(feature)
1235
@deprecated_method(zero_eighteen)
791
1236
def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
792
1237
working_dir=None):
793
1238
"""Invoke bzr and return (stdout, stderr).
795
Useful for code that wants to check the contents of the
796
output, the way error messages are presented, etc.
798
This should be the main method for tests that want to exercise the
799
overall behavior of the bzr application (rather than a unit test
800
or a functional test of the library.)
802
Much of the old code runs bzr by forking a new copy of Python, but
803
that is slower, harder to debug, and generally not necessary.
805
This runs bzr through the interface that catches and reports
806
errors, and with logging set to something approximating the
807
default, so that error reporting can be checked.
809
:param argv: arguments to invoke bzr
810
:param retcode: expected return code, or None for don't-care.
811
:param encoding: encoding for sys.stdout and sys.stderr
1240
Don't call this method, just use run_bzr() which is equivalent.
1242
:param argv: Arguments to invoke bzr. This may be either a
1243
single string, in which case it is split by shlex into words,
1244
or a list of arguments.
1245
:param retcode: Expected return code, or None for don't-care.
1246
:param encoding: Encoding for sys.stdout and sys.stderr
812
1247
:param stdin: A string to be used as stdin for the command.
813
1248
:param working_dir: Change to this directory before running
1250
return self._run_bzr_autosplit(argv, retcode=retcode,
1251
encoding=encoding, stdin=stdin, working_dir=working_dir,
1254
def _run_bzr_autosplit(self, args, retcode, encoding, stdin,
1256
"""Run bazaar command line, splitting up a string command line."""
1257
if isinstance(args, basestring):
1258
args = list(shlex.split(args))
1259
return self._run_bzr_core(args, retcode=retcode,
1260
encoding=encoding, stdin=stdin, working_dir=working_dir,
1263
def _run_bzr_core(self, args, retcode, encoding, stdin,
815
1265
if encoding is None:
816
1266
encoding = bzrlib.user_encoding
817
if stdin is not None:
818
stdin = StringIO(stdin)
819
1267
stdout = StringIOWrapper()
820
1268
stderr = StringIOWrapper()
821
1269
stdout.encoding = encoding
822
1270
stderr.encoding = encoding
824
self.log('run bzr: %r', argv)
1272
self.log('run bzr: %r', args)
825
1273
# FIXME: don't call into logging here
826
1274
handler = logging.StreamHandler(stderr)
827
1275
handler.setLevel(logging.INFO)
828
1276
logger = logging.getLogger('')
829
1277
logger.addHandler(handler)
830
old_ui_factory = bzrlib.ui.ui_factory
831
bzrlib.ui.ui_factory = bzrlib.tests.blackbox.TestUIFactory(
834
bzrlib.ui.ui_factory.stdin = stdin
1278
old_ui_factory = ui.ui_factory
1279
ui.ui_factory = TestUIFactory(stdin=stdin, stdout=stdout, stderr=stderr)
837
1282
if working_dir is not None:
1087
1593
sys.stderr = real_stderr
1088
1594
sys.stdin = real_stdin
1090
@symbol_versioning.deprecated_method(symbol_versioning.zero_eleven)
1091
def merge(self, branch_from, wt_to):
1092
"""A helper for tests to do a ui-less merge.
1094
This should move to the main library when someone has time to integrate
1097
# minimal ui-less merge.
1098
wt_to.branch.fetch(branch_from)
1099
base_rev = common_ancestor(branch_from.last_revision(),
1100
wt_to.branch.last_revision(),
1101
wt_to.branch.repository)
1102
merge_inner(wt_to.branch, branch_from.basis_tree(),
1103
wt_to.branch.repository.revision_tree(base_rev),
1105
wt_to.add_parent_tree_id(branch_from.last_revision())
1108
BzrTestBase = TestCase
1596
def reduceLockdirTimeout(self):
1597
"""Reduce the default lock timeout for the duration of the test, so that
1598
if LockContention occurs during a test, it does so quickly.
1600
Tests that expect to provoke LockContention errors should call this.
1602
orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
1604
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
1605
self.addCleanup(resetTimeout)
1606
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
1609
class TestCaseWithMemoryTransport(TestCase):
1610
"""Common test class for tests that do not need disk resources.
1612
Tests that need disk resources should derive from TestCaseWithTransport.
1614
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
1616
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
1617
a directory which does not exist. This serves to help ensure test isolation
1618
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
1619
must exist. However, TestCaseWithMemoryTransport does not offer local
1620
file defaults for the transport in tests, nor does it obey the command line
1621
override, so tests that accidentally write to the common directory should
1624
:cvar TEST_ROOT: Directory containing all temporary directories, plus
1625
a .bzr directory that stops us ascending higher into the filesystem.
1631
def __init__(self, methodName='runTest'):
1632
# allow test parameterisation after test construction and before test
1633
# execution. Variables that the parameteriser sets need to be
1634
# ones that are not set by setUp, or setUp will trash them.
1635
super(TestCaseWithMemoryTransport, self).__init__(methodName)
1636
self.vfs_transport_factory = default_transport
1637
self.transport_server = None
1638
self.transport_readonly_server = None
1639
self.__vfs_server = None
1641
def get_transport(self):
1642
"""Return a writeable transport for the test scratch space"""
1643
t = get_transport(self.get_url())
1644
self.assertFalse(t.is_readonly())
1647
def get_readonly_transport(self):
1648
"""Return a readonly transport for the test scratch space
1650
This can be used to test that operations which should only need
1651
readonly access in fact do not try to write.
1653
t = get_transport(self.get_readonly_url())
1654
self.assertTrue(t.is_readonly())
1657
def create_transport_readonly_server(self):
1658
"""Create a transport server from class defined at init.
1660
This is mostly a hook for daughter classes.
1662
return self.transport_readonly_server()
1664
def get_readonly_server(self):
1665
"""Get the server instance for the readonly transport
1667
This is useful for some tests with specific servers to do diagnostics.
1669
if self.__readonly_server is None:
1670
if self.transport_readonly_server is None:
1671
# readonly decorator requested
1672
# bring up the server
1673
self.__readonly_server = ReadonlyServer()
1674
self.__readonly_server.setUp(self.get_vfs_only_server())
1676
self.__readonly_server = self.create_transport_readonly_server()
1677
self.__readonly_server.setUp(self.get_vfs_only_server())
1678
self.addCleanup(self.__readonly_server.tearDown)
1679
return self.__readonly_server
1681
def get_readonly_url(self, relpath=None):
1682
"""Get a URL for the readonly transport.
1684
This will either be backed by '.' or a decorator to the transport
1685
used by self.get_url()
1686
relpath provides for clients to get a path relative to the base url.
1687
These should only be downwards relative, not upwards.
1689
base = self.get_readonly_server().get_url()
1690
if relpath is not None:
1691
if not base.endswith('/'):
1693
base = base + relpath
1696
def get_vfs_only_server(self):
1697
"""Get the vfs only read/write server instance.
1699
This is useful for some tests with specific servers that need
1702
For TestCaseWithMemoryTransport this is always a MemoryServer, and there
1703
is no means to override it.
1705
if self.__vfs_server is None:
1706
self.__vfs_server = MemoryServer()
1707
self.__vfs_server.setUp()
1708
self.addCleanup(self.__vfs_server.tearDown)
1709
return self.__vfs_server
1711
def get_server(self):
1712
"""Get the read/write server instance.
1714
This is useful for some tests with specific servers that need
1717
This is built from the self.transport_server factory. If that is None,
1718
then the self.get_vfs_server is returned.
1720
if self.__server is None:
1721
if self.transport_server is None or self.transport_server is self.vfs_transport_factory:
1722
return self.get_vfs_only_server()
1724
# bring up a decorated means of access to the vfs only server.
1725
self.__server = self.transport_server()
1727
self.__server.setUp(self.get_vfs_only_server())
1728
except TypeError, e:
1729
# This should never happen; the try:Except here is to assist
1730
# developers having to update code rather than seeing an
1731
# uninformative TypeError.
1732
raise Exception, "Old server API in use: %s, %s" % (self.__server, e)
1733
self.addCleanup(self.__server.tearDown)
1734
return self.__server
1736
def _adjust_url(self, base, relpath):
1737
"""Get a URL (or maybe a path) for the readwrite transport.
1739
This will either be backed by '.' or to an equivalent non-file based
1741
relpath provides for clients to get a path relative to the base url.
1742
These should only be downwards relative, not upwards.
1744
if relpath is not None and relpath != '.':
1745
if not base.endswith('/'):
1747
# XXX: Really base should be a url; we did after all call
1748
# get_url()! But sometimes it's just a path (from
1749
# LocalAbspathServer), and it'd be wrong to append urlescaped data
1750
# to a non-escaped local path.
1751
if base.startswith('./') or base.startswith('/'):
1754
base += urlutils.escape(relpath)
1757
def get_url(self, relpath=None):
1758
"""Get a URL (or maybe a path) for the readwrite transport.
1760
This will either be backed by '.' or to an equivalent non-file based
1762
relpath provides for clients to get a path relative to the base url.
1763
These should only be downwards relative, not upwards.
1765
base = self.get_server().get_url()
1766
return self._adjust_url(base, relpath)
1768
def get_vfs_only_url(self, relpath=None):
1769
"""Get a URL (or maybe a path for the plain old vfs transport.
1771
This will never be a smart protocol. It always has all the
1772
capabilities of the local filesystem, but it might actually be a
1773
MemoryTransport or some other similar virtual filesystem.
1775
This is the backing transport (if any) of the server returned by
1776
get_url and get_readonly_url.
1778
:param relpath: provides for clients to get a path relative to the base
1779
url. These should only be downwards relative, not upwards.
1782
base = self.get_vfs_only_server().get_url()
1783
return self._adjust_url(base, relpath)
1785
def _make_test_root(self):
1786
if TestCaseWithMemoryTransport.TEST_ROOT is not None:
1788
root = tempfile.mkdtemp(prefix='testbzr-', suffix='.tmp')
1789
TestCaseWithMemoryTransport.TEST_ROOT = root
1791
# make a fake bzr directory there to prevent any tests propagating
1792
# up onto the source directory's real branch
1793
bzrdir.BzrDir.create_standalone_workingtree(root)
1795
# The same directory is used by all tests, and we're not specifically
1796
# told when all tests are finished. This will do.
1797
atexit.register(_rmtree_temp_dir, root)
1799
def makeAndChdirToTestDir(self):
1800
"""Create a temporary directories for this one test.
1802
This must set self.test_home_dir and self.test_dir and chdir to
1805
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
1807
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
1808
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
1809
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
1811
def make_branch(self, relpath, format=None):
1812
"""Create a branch on the transport at relpath."""
1813
repo = self.make_repository(relpath, format=format)
1814
return repo.bzrdir.create_branch()
1816
def make_bzrdir(self, relpath, format=None):
1818
# might be a relative or absolute path
1819
maybe_a_url = self.get_url(relpath)
1820
segments = maybe_a_url.rsplit('/', 1)
1821
t = get_transport(maybe_a_url)
1822
if len(segments) > 1 and segments[-1] not in ('', '.'):
1826
if isinstance(format, basestring):
1827
format = bzrdir.format_registry.make_bzrdir(format)
1828
return format.initialize_on_transport(t)
1829
except errors.UninitializableFormat:
1830
raise TestSkipped("Format %s is not initializable." % format)
1832
def make_repository(self, relpath, shared=False, format=None):
1833
"""Create a repository on our default transport at relpath.
1835
Note that relpath must be a relative path, not a full url.
1837
# FIXME: If you create a remoterepository this returns the underlying
1838
# real format, which is incorrect. Actually we should make sure that
1839
# RemoteBzrDir returns a RemoteRepository.
1840
# maybe mbp 20070410
1841
made_control = self.make_bzrdir(relpath, format=format)
1842
return made_control.create_repository(shared=shared)
1844
def make_branch_and_memory_tree(self, relpath, format=None):
1845
"""Create a branch on the default transport and a MemoryTree for it."""
1846
b = self.make_branch(relpath, format=format)
1847
return memorytree.MemoryTree.create_on_branch(b)
1849
def overrideEnvironmentForTesting(self):
1850
os.environ['HOME'] = self.test_home_dir
1851
os.environ['BZR_HOME'] = self.test_home_dir
1854
super(TestCaseWithMemoryTransport, self).setUp()
1855
self._make_test_root()
1856
_currentdir = os.getcwdu()
1857
def _leaveDirectory():
1858
os.chdir(_currentdir)
1859
self.addCleanup(_leaveDirectory)
1860
self.makeAndChdirToTestDir()
1861
self.overrideEnvironmentForTesting()
1862
self.__readonly_server = None
1863
self.__server = None
1864
self.reduceLockdirTimeout()
1111
class TestCaseInTempDir(TestCase):
1867
class TestCaseInTempDir(TestCaseWithMemoryTransport):
1112
1868
"""Derived class that runs a test within a temporary directory.
1114
1870
This is useful for tests that need to create a branch, etc.
1133
1895
self.log("actually: %r" % contents)
1134
1896
self.fail("contents of %s not as expected" % filename)
1136
def _make_test_root(self):
1137
if TestCaseInTempDir.TEST_ROOT is not None:
1141
root = u'test%04d.tmp' % i
1145
if e.errno == errno.EEXIST:
1150
# successfully created
1151
TestCaseInTempDir.TEST_ROOT = osutils.abspath(root)
1153
# make a fake bzr directory there to prevent any tests propagating
1154
# up onto the source directory's real branch
1155
bzrdir.BzrDir.create_standalone_workingtree(TestCaseInTempDir.TEST_ROOT)
1158
super(TestCaseInTempDir, self).setUp()
1159
self._make_test_root()
1160
_currentdir = os.getcwdu()
1161
# shorten the name, to avoid test failures due to path length
1162
short_id = self.id().replace('bzrlib.tests.', '') \
1163
.replace('__main__.', '')[-100:]
1164
# it's possible the same test class is run several times for
1165
# parameterized tests, so make sure the names don't collide.
1169
candidate_dir = '%s/%s.%d' % (self.TEST_ROOT, short_id, i)
1171
candidate_dir = '%s/%s' % (self.TEST_ROOT, short_id)
1172
if os.path.exists(candidate_dir):
1176
os.mkdir(candidate_dir)
1177
self.test_home_dir = candidate_dir + '/home'
1178
os.mkdir(self.test_home_dir)
1179
self.test_dir = candidate_dir + '/work'
1180
os.mkdir(self.test_dir)
1181
os.chdir(self.test_dir)
1183
os.environ['HOME'] = self.test_home_dir
1184
os.environ['APPDATA'] = self.test_home_dir
1185
def _leaveDirectory():
1186
os.chdir(_currentdir)
1187
self.addCleanup(_leaveDirectory)
1898
def makeAndChdirToTestDir(self):
1899
"""See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
1189
def build_tree(self, shape, line_endings='native', transport=None):
1901
For TestCaseInTempDir we create a temporary directory based on the test
1902
name and then create two subdirs - test and home under it.
1904
# create a directory within the top level test directory
1905
candidate_dir = tempfile.mkdtemp(dir=self.TEST_ROOT)
1906
# now create test and home directories within this dir
1907
self.test_base_dir = candidate_dir
1908
self.test_home_dir = self.test_base_dir + '/home'
1909
os.mkdir(self.test_home_dir)
1910
self.test_dir = self.test_base_dir + '/work'
1911
os.mkdir(self.test_dir)
1912
os.chdir(self.test_dir)
1913
# put name of test inside
1914
f = file(self.test_base_dir + '/name', 'w')
1919
self.addCleanup(self.deleteTestDir)
1921
def deleteTestDir(self):
1922
os.chdir(self.TEST_ROOT)
1923
_rmtree_temp_dir(self.test_base_dir)
1925
def build_tree(self, shape, line_endings='binary', transport=None):
1190
1926
"""Build a test tree according to a pattern.
1192
1928
shape is a sequence of file specifications. If the final
1217
1952
elif line_endings == 'native':
1218
1953
end = os.linesep
1220
raise errors.BzrError('Invalid line ending request %r' % (line_endings,))
1955
raise errors.BzrError(
1956
'Invalid line ending request %r' % line_endings)
1221
1957
content = "contents of %s%s" % (name.encode('utf-8'), end)
1222
# Technically 'put()' is the right command. However, put
1223
# uses an AtomicFile, which requires an extra rename into place
1224
# As long as the files didn't exist in the past, append() will
1225
# do the same thing as put()
1226
# On jam's machine, make_kernel_like_tree is:
1227
# put: 4.5-7.5s (averaging 6s)
1229
# put_non_atomic: 2.9-4.5s
1230
1958
transport.put_bytes_non_atomic(urlutils.escape(name), content)
1232
1960
def build_tree_contents(self, shape):
1233
1961
build_tree_contents(shape)
1963
def assertFileEqual(self, content, path):
1964
"""Fail if path does not contain 'content'."""
1965
self.failUnlessExists(path)
1966
f = file(path, 'rb')
1971
self.assertEqualDiff(content, s)
1235
1973
def failUnlessExists(self, path):
1236
"""Fail unless path, which may be abs or relative, exists."""
1237
self.failUnless(osutils.lexists(path))
1974
"""Fail unless path or paths, which may be abs or relative, exist."""
1975
if not isinstance(path, basestring):
1977
self.failUnlessExists(p)
1979
self.failUnless(osutils.lexists(path),path+" does not exist")
1239
1981
def failIfExists(self, path):
1240
"""Fail if path, which may be abs or relative, exists."""
1241
self.failIf(osutils.lexists(path))
1243
def assertFileEqual(self, content, path):
1244
"""Fail if path does not contain 'content'."""
1245
self.failUnless(osutils.lexists(path))
1246
# TODO: jam 20060427 Shouldn't this be 'rb'?
1247
self.assertEqualDiff(content, open(path, 'r').read())
1982
"""Fail if path or paths, which may be abs or relative, exist."""
1983
if not isinstance(path, basestring):
1985
self.failIfExists(p)
1987
self.failIf(osutils.lexists(path),path+" exists")
1989
def assertInWorkingTree(self,path,root_path='.',tree=None):
1990
"""Assert whether path or paths are in the WorkingTree"""
1992
tree = workingtree.WorkingTree.open(root_path)
1993
if not isinstance(path, basestring):
1995
self.assertInWorkingTree(p,tree=tree)
1997
self.assertIsNot(tree.path2id(path), None,
1998
path+' not in working tree.')
2000
def assertNotInWorkingTree(self,path,root_path='.',tree=None):
2001
"""Assert whether path or paths are not in the WorkingTree"""
2003
tree = workingtree.WorkingTree.open(root_path)
2004
if not isinstance(path, basestring):
2006
self.assertNotInWorkingTree(p,tree=tree)
2008
self.assertIs(tree.path2id(path), None, path+' in working tree.')
1250
2011
class TestCaseWithTransport(TestCaseInTempDir):
1261
2022
readwrite one must both define get_url() as resolving to os.getcwd().
1264
def __init__(self, methodName='testMethod'):
1265
super(TestCaseWithTransport, self).__init__(methodName)
1266
self.__readonly_server = None
1267
self.__server = None
1268
self.transport_server = default_transport
1269
self.transport_readonly_server = None
1271
def get_readonly_url(self, relpath=None):
1272
"""Get a URL for the readonly transport.
1274
This will either be backed by '.' or a decorator to the transport
1275
used by self.get_url()
1276
relpath provides for clients to get a path relative to the base url.
1277
These should only be downwards relative, not upwards.
1279
base = self.get_readonly_server().get_url()
1280
if relpath is not None:
1281
if not base.endswith('/'):
1283
base = base + relpath
1286
def get_readonly_server(self):
1287
"""Get the server instance for the readonly transport
1289
This is useful for some tests with specific servers to do diagnostics.
1291
if self.__readonly_server is None:
1292
if self.transport_readonly_server is None:
1293
# readonly decorator requested
1294
# bring up the server
1296
self.__readonly_server = ReadonlyServer()
1297
self.__readonly_server.setUp(self.__server)
1299
self.__readonly_server = self.transport_readonly_server()
1300
self.__readonly_server.setUp()
1301
self.addCleanup(self.__readonly_server.tearDown)
1302
return self.__readonly_server
1304
def get_server(self):
1305
"""Get the read/write server instance.
2025
def get_vfs_only_server(self):
2026
"""See TestCaseWithMemoryTransport.
1307
2028
This is useful for some tests with specific servers that need
1310
if self.__server is None:
1311
self.__server = self.transport_server()
1312
self.__server.setUp()
1313
self.addCleanup(self.__server.tearDown)
1314
return self.__server
1316
def get_url(self, relpath=None):
1317
"""Get a URL (or maybe a path) for the readwrite transport.
1319
This will either be backed by '.' or to an equivalent non-file based
1321
relpath provides for clients to get a path relative to the base url.
1322
These should only be downwards relative, not upwards.
1324
base = self.get_server().get_url()
1325
if relpath is not None and relpath != '.':
1326
if not base.endswith('/'):
1328
# XXX: Really base should be a url; we did after all call
1329
# get_url()! But sometimes it's just a path (from
1330
# LocalAbspathServer), and it'd be wrong to append urlescaped data
1331
# to a non-escaped local path.
1332
if base.startswith('./') or base.startswith('/'):
1335
base += urlutils.escape(relpath)
1338
def get_transport(self):
1339
"""Return a writeable transport for the test scratch space"""
1340
t = get_transport(self.get_url())
1341
self.assertFalse(t.is_readonly())
1344
def get_readonly_transport(self):
1345
"""Return a readonly transport for the test scratch space
1347
This can be used to test that operations which should only need
1348
readonly access in fact do not try to write.
1350
t = get_transport(self.get_readonly_url())
1351
self.assertTrue(t.is_readonly())
1354
def make_branch(self, relpath, format=None):
1355
"""Create a branch on the transport at relpath."""
1356
repo = self.make_repository(relpath, format=format)
1357
return repo.bzrdir.create_branch()
1359
def make_bzrdir(self, relpath, format=None):
1361
# might be a relative or absolute path
1362
maybe_a_url = self.get_url(relpath)
1363
segments = maybe_a_url.rsplit('/', 1)
1364
t = get_transport(maybe_a_url)
1365
if len(segments) > 1 and segments[-1] not in ('', '.'):
1368
except errors.FileExists:
1371
format = bzrlib.bzrdir.BzrDirFormat.get_default_format()
1372
return format.initialize_on_transport(t)
1373
except errors.UninitializableFormat:
1374
raise TestSkipped("Format %s is not initializable." % format)
1376
def make_repository(self, relpath, shared=False, format=None):
1377
"""Create a repository on our default transport at relpath."""
1378
made_control = self.make_bzrdir(relpath, format=format)
1379
return made_control.create_repository(shared=shared)
1381
def make_branch_and_memory_tree(self, relpath):
1382
"""Create a branch on the default transport and a MemoryTree for it."""
1383
b = self.make_branch(relpath)
1384
return memorytree.MemoryTree.create_on_branch(b)
2031
if self.__vfs_server is None:
2032
self.__vfs_server = self.vfs_transport_factory()
2033
self.__vfs_server.setUp()
2034
self.addCleanup(self.__vfs_server.tearDown)
2035
return self.__vfs_server
1386
2037
def make_branch_and_tree(self, relpath, format=None):
1387
2038
"""Create a branch on the transport and a tree locally.
1389
2040
If the transport is not a LocalTransport, the Tree can't be created on
1390
the transport. In that case the working tree is created in the local
1391
directory, and the returned tree's branch and repository will also be
1394
This will fail if the original default transport for this test
1395
case wasn't backed by the working directory, as the branch won't
1396
be on disk for us to open it.
2041
the transport. In that case if the vfs_transport_factory is
2042
LocalURLServer the working tree is created in the local
2043
directory backing the transport, and the returned tree's branch and
2044
repository will also be accessed locally. Otherwise a lightweight
2045
checkout is created and returned.
1398
2047
:param format: The BzrDirFormat.
1399
2048
:returns: the WorkingTree.
1446
2110
def setUp(self):
1447
2111
super(ChrootedTestCase, self).setUp()
1448
if not self.transport_server == bzrlib.transport.memory.MemoryServer:
1449
self.transport_readonly_server = bzrlib.transport.http.HttpServer
1452
def filter_suite_by_re(suite, pattern):
1453
result = TestUtil.TestSuite()
2112
if not self.vfs_transport_factory == MemoryServer:
2113
self.transport_readonly_server = HttpServer
2116
def filter_suite_by_re(suite, pattern, exclude_pattern=None,
2117
random_order=False):
2118
"""Create a test suite by filtering another one.
2120
:param suite: the source suite
2121
:param pattern: pattern that names must match
2122
:param exclude_pattern: pattern that names must not match, if any
2123
:param random_order: if True, tests in the new suite will be put in
2125
:returns: the newly created suite
2127
return sort_suite_by_re(suite, pattern, exclude_pattern,
2128
random_order, False)
2131
def sort_suite_by_re(suite, pattern, exclude_pattern=None,
2132
random_order=False, append_rest=True):
2133
"""Create a test suite by sorting another one.
2135
:param suite: the source suite
2136
:param pattern: pattern that names must match in order to go
2137
first in the new suite
2138
:param exclude_pattern: pattern that names must not match, if any
2139
:param random_order: if True, tests in the new suite will be put in
2141
:param append_rest: if False, pattern is a strict filter and not
2142
just an ordering directive
2143
:returns: the newly created suite
1454
2147
filter_re = re.compile(pattern)
2148
if exclude_pattern is not None:
2149
exclude_re = re.compile(exclude_pattern)
1455
2150
for test in iter_suite_tests(suite):
1456
if filter_re.search(test.id()):
1457
result.addTest(test)
2152
if exclude_pattern is None or not exclude_re.search(test_id):
2153
if filter_re.search(test_id):
2158
random.shuffle(first)
2159
random.shuffle(second)
2160
return TestUtil.TestSuite(first + second)
1461
2163
def run_suite(suite, name='test', verbose=False, pattern=".*",
1462
stop_on_failure=False, keep_output=False,
1463
transport=None, lsprof_timed=None, bench_history=None):
1464
TestCaseInTempDir._TEST_NAME = name
2164
stop_on_failure=False,
2165
transport=None, lsprof_timed=None, bench_history=None,
2166
matching_tests_first=None,
2170
exclude_pattern=None,
2172
use_numbered_dirs = bool(numbered_dirs)
1465
2174
TestCase._gather_lsprof_in_benchmarks = lsprof_timed
2175
if numbered_dirs is not None:
2176
TestCaseInTempDir.use_numbered_dirs = use_numbered_dirs
1471
pb = progress.ProgressBar()
1472
2181
runner = TextTestRunner(stream=sys.stdout,
1473
2182
descriptions=0,
1474
2183
verbosity=verbosity,
1475
keep_output=keep_output,
1477
bench_history=bench_history)
2184
bench_history=bench_history,
2185
use_numbered_dirs=use_numbered_dirs,
2186
list_only=list_only,
1478
2188
runner.stop_on_failure=stop_on_failure
1480
suite = filter_suite_by_re(suite, pattern)
2189
# Initialise the random number generator and display the seed used.
2190
# We convert the seed to a long to make it reuseable across invocations.
2191
random_order = False
2192
if random_seed is not None:
2194
if random_seed == "now":
2195
random_seed = long(time.time())
2197
# Convert the seed to a long if we can
2199
random_seed = long(random_seed)
2202
runner.stream.writeln("Randomizing test order using seed %s\n" %
2204
random.seed(random_seed)
2205
# Customise the list of tests if requested
2206
if pattern != '.*' or exclude_pattern is not None or random_order:
2207
if matching_tests_first:
2208
suite = sort_suite_by_re(suite, pattern, exclude_pattern,
2211
suite = filter_suite_by_re(suite, pattern, exclude_pattern,
1481
2213
result = runner.run(suite)
1482
2214
return result.wasSuccessful()
1485
2217
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
1487
2218
transport=None,
1488
2219
test_suite_factory=None,
1489
2220
lsprof_timed=None,
1490
bench_history=None):
2222
matching_tests_first=None,
2226
exclude_pattern=None):
1491
2227
"""Run the whole test suite under the enhanced runner"""
1492
2228
# XXX: Very ugly way to do this...
1493
2229
# Disable warning about old formats because we don't want it to disturb
1523
2264
testmod_names = [
1524
2265
'bzrlib.tests.test_ancestry',
2266
'bzrlib.tests.test_annotate',
1525
2267
'bzrlib.tests.test_api',
1526
2268
'bzrlib.tests.test_atomicfile',
1527
2269
'bzrlib.tests.test_bad_files',
1528
2270
'bzrlib.tests.test_branch',
2271
'bzrlib.tests.test_branchbuilder',
2272
'bzrlib.tests.test_bugtracker',
1529
2273
'bzrlib.tests.test_bundle',
1530
2274
'bzrlib.tests.test_bzrdir',
1531
2275
'bzrlib.tests.test_cache_utf8',
1532
'bzrlib.tests.test_command',
2276
'bzrlib.tests.test_commands',
1533
2277
'bzrlib.tests.test_commit',
1534
2278
'bzrlib.tests.test_commit_merge',
1535
2279
'bzrlib.tests.test_config',
1536
2280
'bzrlib.tests.test_conflicts',
2281
'bzrlib.tests.test_pack',
2282
'bzrlib.tests.test_counted_lock',
1537
2283
'bzrlib.tests.test_decorators',
2284
'bzrlib.tests.test_delta',
2285
'bzrlib.tests.test_deprecated_graph',
1538
2286
'bzrlib.tests.test_diff',
1539
'bzrlib.tests.test_doc_generate',
2287
'bzrlib.tests.test_dirstate',
1540
2288
'bzrlib.tests.test_errors',
1541
2289
'bzrlib.tests.test_escaped_store',
2290
'bzrlib.tests.test_extract',
1542
2291
'bzrlib.tests.test_fetch',
1543
2292
'bzrlib.tests.test_ftp_transport',
2293
'bzrlib.tests.test_generate_docs',
2294
'bzrlib.tests.test_generate_ids',
2295
'bzrlib.tests.test_globbing',
1544
2296
'bzrlib.tests.test_gpg',
1545
2297
'bzrlib.tests.test_graph',
1546
2298
'bzrlib.tests.test_hashcache',
2299
'bzrlib.tests.test_help',
2300
'bzrlib.tests.test_hooks',
1547
2301
'bzrlib.tests.test_http',
1548
2302
'bzrlib.tests.test_http_response',
2303
'bzrlib.tests.test_https_ca_bundle',
1549
2304
'bzrlib.tests.test_identitymap',
1550
2305
'bzrlib.tests.test_ignores',
2306
'bzrlib.tests.test_info',
1551
2307
'bzrlib.tests.test_inv',
1552
2308
'bzrlib.tests.test_knit',
1553
2309
'bzrlib.tests.test_lazy_import',
2310
'bzrlib.tests.test_lazy_regex',
1554
2311
'bzrlib.tests.test_lockdir',
1555
2312
'bzrlib.tests.test_lockable_files',
1556
2313
'bzrlib.tests.test_log',
2314
'bzrlib.tests.test_lsprof',
1557
2315
'bzrlib.tests.test_memorytree',
1558
2316
'bzrlib.tests.test_merge',
1559
2317
'bzrlib.tests.test_merge3',
1560
2318
'bzrlib.tests.test_merge_core',
2319
'bzrlib.tests.test_merge_directive',
1561
2320
'bzrlib.tests.test_missing',
1562
2321
'bzrlib.tests.test_msgeditor',
1563
2322
'bzrlib.tests.test_nonascii',
1564
2323
'bzrlib.tests.test_options',
1565
2324
'bzrlib.tests.test_osutils',
2325
'bzrlib.tests.test_osutils_encodings',
1566
2326
'bzrlib.tests.test_patch',
1567
2327
'bzrlib.tests.test_patches',
1568
2328
'bzrlib.tests.test_permissions',
1569
2329
'bzrlib.tests.test_plugins',
1570
2330
'bzrlib.tests.test_progress',
1571
2331
'bzrlib.tests.test_reconcile',
2332
'bzrlib.tests.test_registry',
2333
'bzrlib.tests.test_remote',
1572
2334
'bzrlib.tests.test_repository',
1573
2335
'bzrlib.tests.test_revert',
1574
2336
'bzrlib.tests.test_revision',
1633
2421
"""Adapt the modules in mods_list using adapter and add to suite."""
1634
2422
for test in iter_suite_tests(loader.loadTestsFromModuleNames(mods_list)):
1635
2423
suite.addTests(adapter.adapt(test))
2426
def _rmtree_temp_dir(dirname):
2427
# If LANG=C we probably have created some bogus paths
2428
# which rmtree(unicode) will fail to delete
2429
# so make sure we are using rmtree(str) to delete everything
2430
# except on win32, where rmtree(str) will fail
2431
# since it doesn't have the property of byte-stream paths
2432
# (they are either ascii or mbcs)
2433
if sys.platform == 'win32':
2434
# make sure we are using the unicode win32 api
2435
dirname = unicode(dirname)
2437
dirname = dirname.encode(sys.getfilesystemencoding())
2439
osutils.rmtree(dirname)
2441
if sys.platform == 'win32' and e.errno == errno.EACCES:
2442
print >>sys.stderr, ('Permission denied: '
2443
'unable to remove testing dir '
2444
'%s' % os.path.basename(dirname))
2449
def clean_selftest_output(root=None, quiet=False):
2450
"""Remove all selftest output directories from root directory.
2452
:param root: root directory for clean
2453
(if ommitted or None then clean current directory).
2454
:param quiet: suppress report about deleting directories
2457
re_dir = re.compile(r'''test\d\d\d\d\.tmp''')
2460
for i in os.listdir(root):
2461
if os.path.isdir(i) and re_dir.match(i):
2463
print 'delete directory:', i
2467
class Feature(object):
2468
"""An operating system Feature."""
2471
self._available = None
2473
def available(self):
2474
"""Is the feature available?
2476
:return: True if the feature is available.
2478
if self._available is None:
2479
self._available = self._probe()
2480
return self._available
2483
"""Implement this method in concrete features.
2485
:return: True if the feature is available.
2487
raise NotImplementedError
2490
if getattr(self, 'feature_name', None):
2491
return self.feature_name()
2492
return self.__class__.__name__
2495
class TestScenarioApplier(object):
2496
"""A tool to apply scenarios to tests."""
2498
def adapt(self, test):
2499
"""Return a TestSuite containing a copy of test for each scenario."""
2500
result = unittest.TestSuite()
2501
for scenario in self.scenarios:
2502
result.addTest(self.adapt_test_to_scenario(test, scenario))
2505
def adapt_test_to_scenario(self, test, scenario):
2506
"""Copy test and apply scenario to it.
2508
:param test: A test to adapt.
2509
:param scenario: A tuple describing the scenarion.
2510
The first element of the tuple is the new test id.
2511
The second element is a dict containing attributes to set on the
2513
:return: The adapted test.
2515
from copy import deepcopy
2516
new_test = deepcopy(test)
2517
for name, value in scenario[1].items():
2518
setattr(new_test, name, value)
2519
new_id = "%s(%s)" % (new_test.id(), scenario[0])
2520
new_test.id = lambda: new_id