184
197
"""Format seconds as milliseconds with leading spaces."""
185
198
return "%5dms" % (1000 * seconds)
187
def _ellipsise_unimportant_words(self, a_string, final_width,
189
"""Add ellipses (sp?) for overly long strings.
191
:param keep_start: If true preserve the start of a_string rather
195
if len(a_string) > final_width:
196
result = a_string[:final_width-3] + '...'
200
if len(a_string) > final_width:
201
result = '...' + a_string[3-final_width:]
204
return result.ljust(final_width)
200
def _shortened_test_description(self, test):
202
what = re.sub(r'^bzrlib\.(tests|benchmark)\.', '', what)
206
205
def startTest(self, test):
207
206
unittest.TestResult.startTest(self, test)
208
# In a short description, the important words are in
209
# the beginning, but in an id, the important words are
211
SHOW_DESCRIPTIONS = False
213
if not self.showAll and self.dots and self.pb is not None:
216
final_width = osutils.terminal_width()
217
final_width = final_width - 15 - 8
219
if SHOW_DESCRIPTIONS:
220
what = test.shortDescription()
222
what = self._ellipsise_unimportant_words(what, final_width, keep_start=True)
225
if what.startswith('bzrlib.tests.'):
227
what = self._ellipsise_unimportant_words(what, final_width)
229
self.stream.write(what)
230
elif self.dots and self.pb is not None:
231
self.pb.update(what, self.testsRun - 1, None)
207
self.report_test_start(test)
233
208
self._recordTestStartTime()
235
210
def _recordTestStartTime(self):
240
215
if isinstance(err[1], TestSkipped):
241
216
return self.addSkipped(test, err)
242
217
unittest.TestResult.addError(self, test, err)
218
# We can only do this if we have one of our TestCases, not if
220
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
221
if setKeepLogfile is not None:
243
223
self.extractBenchmarkTime(test)
245
self.stream.writeln("ERROR %s" % self._testTimeString())
246
elif self.dots and self.pb is None:
247
self.stream.write('E')
249
self.pb.update(self._ellipsise_unimportant_words('ERROR', 13), self.testsRun, None)
250
self.pb.note(self._ellipsise_unimportant_words(
251
test.id() + ': ERROR',
252
osutils.terminal_width()))
224
self.report_error(test, err)
254
225
if self.stop_early:
257
228
def addFailure(self, test, err):
258
229
unittest.TestResult.addFailure(self, test, err)
230
# We can only do this if we have one of our TestCases, not if
232
setKeepLogfile = getattr(test, 'setKeepLogfile', None)
233
if setKeepLogfile is not None:
259
235
self.extractBenchmarkTime(test)
261
self.stream.writeln(" FAIL %s" % self._testTimeString())
262
elif self.dots and self.pb is None:
263
self.stream.write('F')
265
self.pb.update(self._ellipsise_unimportant_words('FAIL', 13), self.testsRun, None)
266
self.pb.note(self._ellipsise_unimportant_words(
267
test.id() + ': FAIL',
268
osutils.terminal_width()))
236
self.report_failure(test, err)
270
237
if self.stop_early:
277
244
self._bench_history.write("%s %s\n" % (
278
245
self._formatTime(self._benchmarkTime),
281
self.stream.writeln(' OK %s' % self._testTimeString())
282
for bench_called, stats in getattr(test, '_benchcalls', []):
283
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
284
stats.pprint(file=self.stream)
285
elif self.dots and self.pb is None:
286
self.stream.write('~')
288
self.pb.update(self._ellipsise_unimportant_words('OK', 13), self.testsRun, None)
247
self.report_success(test)
290
248
unittest.TestResult.addSuccess(self, test)
292
250
def addSkipped(self, test, skip_excinfo):
293
251
self.extractBenchmarkTime(test)
295
print >>self.stream, ' SKIP %s' % self._testTimeString()
296
print >>self.stream, ' %s' % skip_excinfo[1]
297
elif self.dots and self.pb is None:
298
self.stream.write('S')
300
self.pb.update(self._ellipsise_unimportant_words('SKIP', 13), self.testsRun, None)
252
self.report_skip(test, skip_excinfo)
302
253
# seems best to treat this as success from point-of-view of unittest
303
254
# -- it actually does nothing so it barely matters :)
324
275
self.stream.writeln(self.separator2)
325
276
self.stream.writeln("%s" % err)
281
def report_cleaning_up(self):
284
def report_success(self, test):
288
class TextTestResult(ExtendedTestResult):
289
"""Displays progress and results of tests in text form"""
291
def __init__(self, *args, **kw):
292
ExtendedTestResult.__init__(self, *args, **kw)
293
self.pb = self.ui.nested_progress_bar()
294
self.pb.show_pct = False
295
self.pb.show_spinner = False
296
self.pb.show_eta = False,
297
self.pb.show_count = False
298
self.pb.show_bar = False
300
def report_starting(self):
301
self.pb.update('[test 0/%d] starting...' % (self.num_tests))
303
def _progress_prefix_text(self):
304
a = '[%d' % self.count
305
if self.num_tests is not None:
306
a +='/%d' % self.num_tests
307
a += ' in %ds' % (time.time() - self._overall_start_time)
309
a += ', %d errors' % self.error_count
310
if self.failure_count:
311
a += ', %d failed' % self.failure_count
313
a += ', %d skipped' % self.skip_count
317
def report_test_start(self, test):
320
self._progress_prefix_text()
322
+ self._shortened_test_description(test))
324
def report_error(self, test, err):
325
self.error_count += 1
326
self.pb.note('ERROR: %s\n %s\n' % (
327
self._shortened_test_description(test),
331
def report_failure(self, test, err):
332
self.failure_count += 1
333
self.pb.note('FAIL: %s\n %s\n' % (
334
self._shortened_test_description(test),
338
def report_skip(self, test, skip_excinfo):
341
# at the moment these are mostly not things we can fix
342
# and so they just produce stipple; use the verbose reporter
345
# show test and reason for skip
346
self.pb.note('SKIP: %s\n %s\n' % (
347
self._shortened_test_description(test),
350
# since the class name was left behind in the still-visible
352
self.pb.note('SKIP: %s' % (skip_excinfo[1]))
354
def report_cleaning_up(self):
355
self.pb.update('cleaning up...')
361
class VerboseTestResult(ExtendedTestResult):
362
"""Produce long output, with one line per test run plus times"""
364
def _ellipsize_to_right(self, a_string, final_width):
365
"""Truncate and pad a string, keeping the right hand side"""
366
if len(a_string) > final_width:
367
result = '...' + a_string[3-final_width:]
370
return result.ljust(final_width)
372
def report_starting(self):
373
self.stream.write('running %d tests...\n' % self.num_tests)
375
def report_test_start(self, test):
377
name = self._shortened_test_description(test)
378
self.stream.write(self._ellipsize_to_right(name, 60))
381
def report_error(self, test, err):
382
self.error_count += 1
383
self.stream.writeln('ERROR %s\n %s'
384
% (self._testTimeString(), err[1]))
386
def report_failure(self, test, err):
387
self.failure_count += 1
388
self.stream.writeln('FAIL %s\n %s'
389
% (self._testTimeString(), err[1]))
391
def report_success(self, test):
392
self.stream.writeln(' OK %s' % self._testTimeString())
393
for bench_called, stats in getattr(test, '_benchcalls', []):
394
self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
395
stats.pprint(file=self.stream)
398
def report_skip(self, test, skip_excinfo):
399
print >>self.stream, ' SKIP %s' % self._testTimeString()
400
print >>self.stream, ' %s' % skip_excinfo[1]
328
403
class TextTestRunner(object):
329
404
stop_on_failure = False
745
826
def log(self, *args):
749
"""Return as a string the log for this test"""
750
if self._log_file_name:
751
return open(self._log_file_name).read()
829
def _get_log(self, keep_log_file=False):
830
"""Return as a string the log for this test. If the file is still
831
on disk and keep_log_file=False, delete the log file and store the
832
content in self._log_contents."""
833
# flush the log file, to get all content
835
bzrlib.trace._trace_file.flush()
836
if self._log_contents:
753
837
return self._log_contents
754
# TODO: Delete the log after it's been read in
838
if self._log_file_name is not None:
839
logfile = open(self._log_file_name)
841
log_contents = logfile.read()
844
if not keep_log_file:
845
self._log_contents = log_contents
846
os.remove(self._log_file_name)
849
return "DELETED log file to reduce memory footprint"
756
851
def capture(self, cmd, retcode=0):
757
852
"""Shortcut that splits cmd into words, runs, and returns stdout"""
758
853
return self.run_bzr_captured(cmd.split(), retcode=retcode)[0]
760
def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None):
855
def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
761
857
"""Invoke bzr and return (stdout, stderr).
763
859
Useful for code that wants to check the contents of the
1054
1196
BzrTestBase = TestCase
1199
class TestCaseWithMemoryTransport(TestCase):
1200
"""Common test class for tests that do not need disk resources.
1202
Tests that need disk resources should derive from TestCaseWithTransport.
1204
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
1206
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
1207
a directory which does not exist. This serves to help ensure test isolation
1208
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
1209
must exist. However, TestCaseWithMemoryTransport does not offer local
1210
file defaults for the transport in tests, nor does it obey the command line
1211
override, so tests that accidentally write to the common directory should
1219
def __init__(self, methodName='runTest'):
1220
# allow test parameterisation after test construction and before test
1221
# execution. Variables that the parameteriser sets need to be
1222
# ones that are not set by setUp, or setUp will trash them.
1223
super(TestCaseWithMemoryTransport, self).__init__(methodName)
1224
self.transport_server = default_transport
1225
self.transport_readonly_server = None
1227
def failUnlessExists(self, path):
1228
"""Fail unless path, which may be abs or relative, exists."""
1229
self.failUnless(osutils.lexists(path))
1231
def failIfExists(self, path):
1232
"""Fail if path, which may be abs or relative, exists."""
1233
self.failIf(osutils.lexists(path))
1235
def get_transport(self):
1236
"""Return a writeable transport for the test scratch space"""
1237
t = get_transport(self.get_url())
1238
self.assertFalse(t.is_readonly())
1241
def get_readonly_transport(self):
1242
"""Return a readonly transport for the test scratch space
1244
This can be used to test that operations which should only need
1245
readonly access in fact do not try to write.
1247
t = get_transport(self.get_readonly_url())
1248
self.assertTrue(t.is_readonly())
1251
def get_readonly_server(self):
1252
"""Get the server instance for the readonly transport
1254
This is useful for some tests with specific servers to do diagnostics.
1256
if self.__readonly_server is None:
1257
if self.transport_readonly_server is None:
1258
# readonly decorator requested
1259
# bring up the server
1261
self.__readonly_server = ReadonlyServer()
1262
self.__readonly_server.setUp(self.__server)
1264
self.__readonly_server = self.transport_readonly_server()
1265
self.__readonly_server.setUp()
1266
self.addCleanup(self.__readonly_server.tearDown)
1267
return self.__readonly_server
1269
def get_readonly_url(self, relpath=None):
1270
"""Get a URL for the readonly transport.
1272
This will either be backed by '.' or a decorator to the transport
1273
used by self.get_url()
1274
relpath provides for clients to get a path relative to the base url.
1275
These should only be downwards relative, not upwards.
1277
base = self.get_readonly_server().get_url()
1278
if relpath is not None:
1279
if not base.endswith('/'):
1281
base = base + relpath
1284
def get_server(self):
1285
"""Get the read/write server instance.
1287
This is useful for some tests with specific servers that need
1290
For TestCaseWithMemoryTransport this is always a MemoryServer, and there
1291
is no means to override it.
1293
if self.__server is None:
1294
self.__server = MemoryServer()
1295
self.__server.setUp()
1296
self.addCleanup(self.__server.tearDown)
1297
return self.__server
1299
def get_url(self, relpath=None):
1300
"""Get a URL (or maybe a path) for the readwrite transport.
1302
This will either be backed by '.' or to an equivalent non-file based
1304
relpath provides for clients to get a path relative to the base url.
1305
These should only be downwards relative, not upwards.
1307
base = self.get_server().get_url()
1308
if relpath is not None and relpath != '.':
1309
if not base.endswith('/'):
1311
# XXX: Really base should be a url; we did after all call
1312
# get_url()! But sometimes it's just a path (from
1313
# LocalAbspathServer), and it'd be wrong to append urlescaped data
1314
# to a non-escaped local path.
1315
if base.startswith('./') or base.startswith('/'):
1318
base += urlutils.escape(relpath)
1321
def _make_test_root(self):
1322
if TestCaseWithMemoryTransport.TEST_ROOT is not None:
1326
root = u'test%04d.tmp' % i
1330
if e.errno == errno.EEXIST:
1335
# successfully created
1336
TestCaseWithMemoryTransport.TEST_ROOT = osutils.abspath(root)
1338
# make a fake bzr directory there to prevent any tests propagating
1339
# up onto the source directory's real branch
1340
bzrdir.BzrDir.create_standalone_workingtree(
1341
TestCaseWithMemoryTransport.TEST_ROOT)
1343
def makeAndChdirToTestDir(self):
1344
"""Create a temporary directories for this one test.
1346
This must set self.test_home_dir and self.test_dir and chdir to
1349
For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this test.
1351
os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
1352
self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
1353
self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
1355
def make_branch(self, relpath, format=None):
1356
"""Create a branch on the transport at relpath."""
1357
repo = self.make_repository(relpath, format=format)
1358
return repo.bzrdir.create_branch()
1360
def make_bzrdir(self, relpath, format=None):
1362
# might be a relative or absolute path
1363
maybe_a_url = self.get_url(relpath)
1364
segments = maybe_a_url.rsplit('/', 1)
1365
t = get_transport(maybe_a_url)
1366
if len(segments) > 1 and segments[-1] not in ('', '.'):
1369
except errors.FileExists:
1372
format = bzrlib.bzrdir.BzrDirFormat.get_default_format()
1373
return format.initialize_on_transport(t)
1374
except errors.UninitializableFormat:
1375
raise TestSkipped("Format %s is not initializable." % format)
1377
def make_repository(self, relpath, shared=False, format=None):
1378
"""Create a repository on our default transport at relpath."""
1379
made_control = self.make_bzrdir(relpath, format=format)
1380
return made_control.create_repository(shared=shared)
1382
def make_branch_and_memory_tree(self, relpath, format=None):
1383
"""Create a branch on the default transport and a MemoryTree for it."""
1384
b = self.make_branch(relpath, format=format)
1385
return memorytree.MemoryTree.create_on_branch(b)
1387
def overrideEnvironmentForTesting(self):
1388
os.environ['HOME'] = self.test_home_dir
1389
os.environ['APPDATA'] = self.test_home_dir
1392
super(TestCaseWithMemoryTransport, self).setUp()
1393
self._make_test_root()
1394
_currentdir = os.getcwdu()
1395
def _leaveDirectory():
1396
os.chdir(_currentdir)
1397
self.addCleanup(_leaveDirectory)
1398
self.makeAndChdirToTestDir()
1399
self.overrideEnvironmentForTesting()
1400
self.__readonly_server = None
1401
self.__server = None
1057
class TestCaseInTempDir(TestCase):
1404
class TestCaseInTempDir(TestCaseWithMemoryTransport):
1058
1405
"""Derived class that runs a test within a temporary directory.
1060
1407
This is useful for tests that need to create a branch, etc.
1207
1520
readwrite one must both define get_url() as resolving to os.getcwd().
1210
def __init__(self, methodName='testMethod'):
1211
super(TestCaseWithTransport, self).__init__(methodName)
1212
self.__readonly_server = None
1213
self.__server = None
1214
self.transport_server = default_transport
1215
self.transport_readonly_server = None
1217
def get_readonly_url(self, relpath=None):
1218
"""Get a URL for the readonly transport.
1220
This will either be backed by '.' or a decorator to the transport
1221
used by self.get_url()
1222
relpath provides for clients to get a path relative to the base url.
1223
These should only be downwards relative, not upwards.
1225
base = self.get_readonly_server().get_url()
1226
if relpath is not None:
1227
if not base.endswith('/'):
1229
base = base + relpath
1232
def get_readonly_server(self):
1233
"""Get the server instance for the readonly transport
1235
This is useful for some tests with specific servers to do diagnostics.
1237
if self.__readonly_server is None:
1238
if self.transport_readonly_server is None:
1239
# readonly decorator requested
1240
# bring up the server
1242
self.__readonly_server = ReadonlyServer()
1243
self.__readonly_server.setUp(self.__server)
1245
self.__readonly_server = self.transport_readonly_server()
1246
self.__readonly_server.setUp()
1247
self.addCleanup(self.__readonly_server.tearDown)
1248
return self.__readonly_server
1250
1523
def get_server(self):
1251
"""Get the read/write server instance.
1524
"""See TestCaseWithMemoryTransport.
1253
1526
This is useful for some tests with specific servers that need
1259
1532
self.addCleanup(self.__server.tearDown)
1260
1533
return self.__server
1262
def get_url(self, relpath=None):
1263
"""Get a URL (or maybe a path) for the readwrite transport.
1265
This will either be backed by '.' or to an equivalent non-file based
1267
relpath provides for clients to get a path relative to the base url.
1268
These should only be downwards relative, not upwards.
1270
base = self.get_server().get_url()
1271
if relpath is not None and relpath != '.':
1272
if not base.endswith('/'):
1274
# XXX: Really base should be a url; we did after all call
1275
# get_url()! But sometimes it's just a path (from
1276
# LocalAbspathServer), and it'd be wrong to append urlescaped data
1277
# to a non-escaped local path.
1278
if base.startswith('./') or base.startswith('/'):
1281
base += urlutils.escape(relpath)
1284
def get_transport(self):
1285
"""Return a writeable transport for the test scratch space"""
1286
t = get_transport(self.get_url())
1287
self.assertFalse(t.is_readonly())
1290
def get_readonly_transport(self):
1291
"""Return a readonly transport for the test scratch space
1293
This can be used to test that operations which should only need
1294
readonly access in fact do not try to write.
1296
t = get_transport(self.get_readonly_url())
1297
self.assertTrue(t.is_readonly())
1300
def make_branch(self, relpath, format=None):
1301
"""Create a branch on the transport at relpath."""
1302
repo = self.make_repository(relpath, format=format)
1303
return repo.bzrdir.create_branch()
1305
def make_bzrdir(self, relpath, format=None):
1307
# might be a relative or absolute path
1308
maybe_a_url = self.get_url(relpath)
1309
segments = maybe_a_url.rsplit('/', 1)
1310
t = get_transport(maybe_a_url)
1311
if len(segments) > 1 and segments[-1] not in ('', '.'):
1314
except errors.FileExists:
1317
format = bzrlib.bzrdir.BzrDirFormat.get_default_format()
1318
return format.initialize_on_transport(t)
1319
except errors.UninitializableFormat:
1320
raise TestSkipped("Format %s is not initializable." % format)
1322
def make_repository(self, relpath, shared=False, format=None):
1323
"""Create a repository on our default transport at relpath."""
1324
made_control = self.make_bzrdir(relpath, format=format)
1325
return made_control.create_repository(shared=shared)
1327
def make_branch_and_memory_tree(self, relpath):
1328
"""Create a branch on the default transport and a MemoryTree for it."""
1329
b = self.make_branch(relpath)
1330
return memorytree.MemoryTree.create_on_branch(b)
1332
1535
def make_branch_and_tree(self, relpath, format=None):
1333
1536
"""Create a branch on the transport and a tree locally.