577
596
self.fail("%r is an instance of %s rather than %s" % (
578
597
obj, obj.__class__, kls))
580
def callDeprecated(self, expected, callable, *args, **kwargs):
581
"""Assert that a callable is deprecated in a particular way.
599
def _capture_warnings(self, a_callable, *args, **kwargs):
600
"""A helper for callDeprecated and applyDeprecated.
583
:param expected: a list of the deprecation warnings expected, in order
584
:param callable: The callable to call
602
:param a_callable: A callable to call.
585
603
:param args: The positional arguments for the callable
586
604
:param kwargs: The keyword arguments for the callable
605
:return: A tuple (warnings, result). result is the result of calling
606
a_callable(*args, **kwargs).
588
608
local_warnings = []
589
609
def capture_warnings(msg, cls, stacklevel=None):
610
# we've hooked into a deprecation specific callpath,
611
# only deprecations should be getting sent via it.
590
612
self.assertEqual(cls, DeprecationWarning)
591
613
local_warnings.append(msg)
592
method = symbol_versioning.warn
614
original_warning_method = symbol_versioning.warn
593
615
symbol_versioning.set_warning_method(capture_warnings)
595
result = callable(*args, **kwargs)
617
result = a_callable(*args, **kwargs)
597
symbol_versioning.set_warning_method(method)
598
self.assertEqual(expected, local_warnings)
619
symbol_versioning.set_warning_method(original_warning_method)
620
return (local_warnings, result)
622
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
623
"""Call a deprecated callable without warning the user.
625
:param deprecation_format: The deprecation format that the callable
626
should have been deprecated with. This is the same type as the
627
parameter to deprecated_method/deprecated_function. If the
628
callable is not deprecated with this format, an assertion error
630
:param a_callable: A callable to call. This may be a bound method or
631
a regular function. It will be called with *args and **kwargs.
632
:param args: The positional arguments for the callable
633
:param kwargs: The keyword arguments for the callable
634
:return: The result of a_callable(*args, **kwargs)
636
call_warnings, result = self._capture_warnings(a_callable,
638
expected_first_warning = symbol_versioning.deprecation_string(
639
a_callable, deprecation_format)
640
if len(call_warnings) == 0:
641
self.fail("No assertion generated by call to %s" %
643
self.assertEqual(expected_first_warning, call_warnings[0])
646
def callDeprecated(self, expected, callable, *args, **kwargs):
647
"""Assert that a callable is deprecated in a particular way.
649
This is a very precise test for unusual requirements. The
650
applyDeprecated helper function is probably more suited for most tests
651
as it allows you to simply specify the deprecation format being used
652
and will ensure that that is issued for the function being called.
654
:param expected: a list of the deprecation warnings expected, in order
655
:param callable: The callable to call
656
:param args: The positional arguments for the callable
657
:param kwargs: The keyword arguments for the callable
659
call_warnings, result = self._capture_warnings(callable,
661
self.assertEqual(expected, call_warnings)
601
664
def _startLogFile(self):
612
675
def _finishLogFile(self):
613
676
"""Finished with the log file.
615
Read contents into memory, close, and delete.
678
Close the file and delete it, unless setKeepLogfile was called.
617
680
if self._log_file is None:
619
682
bzrlib.trace.disable_test_log(self._log_nonce)
620
self._log_file.seek(0)
621
self._log_contents = self._log_file.read()
622
683
self._log_file.close()
623
os.remove(self._log_file_name)
624
self._log_file = self._log_file_name = None
684
self._log_file = None
685
if not self._keep_log_file:
686
os.remove(self._log_file_name)
687
self._log_file_name = None
689
def setKeepLogfile(self):
    """Make the logfile not be deleted when _finishLogFile is called.

    This only sets the ``_keep_log_file`` flag on the test case; the flag
    is consulted later, when the log file is being finished.
    """
    self._keep_log_file = True
626
693
def addCleanup(self, callable):
627
694
"""Arrange to run a callable when this case is torn down.
641
708
'BZR_EMAIL': None,
642
709
'BZREMAIL': None, # may still be present in the environment
711
'BZR_PROGRESS_BAR': None,
645
713
self.__old_env = {}
646
714
self.addCleanup(self._restoreEnvironment)
647
715
for name, value in new_env.iteritems():
648
716
self._captureVar(name, value)
651
718
def _captureVar(self, name, newvalue):
652
"""Set an environment variable, preparing it to be reset when finished."""
653
self.__old_env[name] = os.environ.get(name, None)
655
if name in os.environ:
658
os.environ[name] = newvalue
661
def _restoreVar(name, value):
663
if name in os.environ:
666
os.environ[name] = value
719
"""Set an environment variable, and reset it when finished."""
720
self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
668
722
def _restoreEnvironment(self):
669
723
for name, value in self.__old_env.iteritems():
670
self._restoreVar(name, value)
724
osutils.set_or_unset_env(name, value)
672
726
def tearDown(self):
673
727
self._runCleanups()
708
762
def log(self, *args):
712
"""Return as a string the log for this test"""
713
if self._log_file_name:
714
return open(self._log_file_name).read()
765
def _get_log(self, keep_log_file=False):
766
"""Return as a string the log for this test. If the file is still
767
on disk and keep_log_file=False, delete the log file and store the
768
content in self._log_contents."""
769
# flush the log file, to get all content
771
bzrlib.trace._trace_file.flush()
772
if self._log_contents:
716
773
return self._log_contents
717
# TODO: Delete the log after it's been read in
774
if self._log_file_name is not None:
775
logfile = open(self._log_file_name)
777
log_contents = logfile.read()
780
if not keep_log_file:
781
self._log_contents = log_contents
782
os.remove(self._log_file_name)
785
return "DELETED log file to reduce memory footprint"
719
787
def capture(self, cmd, retcode=0):
    """Shortcut that splits cmd into words, runs, and returns stdout.

    :param cmd: The command line as a single string.  It is split on
        whitespace, so arguments that themselves contain whitespace
        cannot be expressed this way.
    :param retcode: Passed through unchanged to run_bzr_captured.
    :return: The first element of the result of run_bzr_captured.
    """
    argv = cmd.split()
    captured = self.run_bzr_captured(argv, retcode=retcode)
    return captured[0]
723
def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None):
791
def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
724
793
"""Invoke bzr and return (stdout, stderr).
726
795
Useful for code that wants to check the contents of the
843
923
profiled or debugged so easily.
845
925
:param retcode: The status code that is expected. Defaults to 0. If
846
None is supplied, the status code is not checked.
926
None is supplied, the status code is not checked.
927
:param env_changes: A dictionary which lists changes to environment
928
variables. A value of None will unset the env variable.
929
The values must be strings. The change will only occur in the
930
child, so you don't need to fix the environment after running.
931
:param universal_newlines: Convert CRLF => LF
933
env_changes = kwargs.get('env_changes', {})
934
working_dir = kwargs.get('working_dir', None)
935
process = self.start_bzr_subprocess(args, env_changes=env_changes,
936
working_dir=working_dir)
937
# We distinguish between retcode=None and retcode not passed.
938
supplied_retcode = kwargs.get('retcode', 0)
939
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
940
universal_newlines=kwargs.get('universal_newlines', False),
943
def start_bzr_subprocess(self, process_args, env_changes=None,
944
skip_if_plan_to_signal=False,
946
"""Start bzr in a subprocess for testing.
948
This starts a new Python interpreter and runs bzr in there.
949
This should only be used for tests that have a justifiable need for
950
this isolation: e.g. they are testing startup time, or signal
951
handling, or early startup code, etc. Subprocess code can't be
952
profiled or debugged so easily.
954
:param process_args: a list of arguments to pass to the bzr executable,
955
for example `['--version']`.
956
:param env_changes: A dictionary which lists changes to environment
957
variables. A value of None will unset the env variable.
958
The values must be strings. The change will only occur in the
959
child, so you don't need to fix the environment after running.
960
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
963
:returns: Popen object for the started process.
965
if skip_if_plan_to_signal:
966
if not getattr(os, 'kill', None):
967
raise TestSkipped("os.kill not available.")
969
if env_changes is None:
973
def cleanup_environment():
974
for env_var, value in env_changes.iteritems():
975
old_env[env_var] = osutils.set_or_unset_env(env_var, value)
977
def restore_environment():
978
for env_var, value in old_env.iteritems():
979
osutils.set_or_unset_env(env_var, value)
981
bzr_path = self.get_bzr_path()
984
if working_dir is not None:
985
cwd = osutils.getcwd()
986
os.chdir(working_dir)
989
# win32 subprocess doesn't support preexec_fn
990
# so we will avoid using it on all platforms, just to
991
# make sure the code path is used, and we don't break on win32
992
cleanup_environment()
993
process = Popen([sys.executable, bzr_path] + list(process_args),
994
stdin=PIPE, stdout=PIPE, stderr=PIPE)
996
restore_environment()
1002
def get_bzr_path(self):
1003
"""Return the path of the 'bzr' executable for this test suite."""
848
1004
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
850
process = Popen([sys.executable, bzr_path]+args, stdout=PIPE,
852
out = process.stdout.read()
853
err = process.stderr.read()
854
retcode = process.wait()
855
supplied_retcode = kwargs.get('retcode', 0)
856
if supplied_retcode is not None:
857
assert supplied_retcode == retcode
1005
if not os.path.isfile(bzr_path):
1006
# We are probably installed. Assume sys.argv is the right file
1007
bzr_path = sys.argv[0]
1010
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
                          universal_newlines=False, process_args=None):
    """Finish the execution of process.

    Optionally sends a signal to the process, then waits for it to finish
    while collecting its output, and finally checks its exit status.

    :param process: the Popen object returned from start_bzr_subprocess.
    :param retcode: The status code that is expected.  Defaults to 0.  If
        None is supplied, the status code is not checked.
    :param send_signal: an optional signal to send to the process.
    :param universal_newlines: Convert CRLF => LF
    :param process_args: The arguments the process was started with.  Only
        used to label the log entries and the failure message when the
        status code is not the expected one.
    :returns: a two-item list, [stdout, stderr]
    """
    if send_signal is not None:
        os.kill(process.pid, send_signal)
    out, err = process.communicate()

    if universal_newlines:
        out = out.replace('\r\n', '\n')
        err = err.replace('\r\n', '\n')

    if retcode is not None and retcode != process.returncode:
        if process_args is None:
            process_args = "(unknown args)"
        # Log everything the child produced before reporting the failure,
        # so the output is available when diagnosing the mismatch.
        mutter('Output of bzr %s:\n%s', process_args, out)
        mutter('Error for bzr %s:\n%s', process_args, err)
        self.fail('Command bzr %s failed with retcode %s != %s'
                  % (process_args, retcode, process.returncode))
    return [out, err]
860
1038
def check_inventory_shape(self, inv, shape):
1169
1359
def make_bzrdir(self, relpath, format=None):
1171
url = self.get_url(relpath)
1172
mutter('relpath %r => url %r', relpath, url)
1173
segments = url.split('/')
1174
if segments and segments[-1] not in ('', '.'):
1175
parent = '/'.join(segments[:-1])
1176
t = get_transport(parent)
1361
# might be a relative or absolute path
1362
maybe_a_url = self.get_url(relpath)
1363
segments = maybe_a_url.rsplit('/', 1)
1364
t = get_transport(maybe_a_url)
1365
if len(segments) > 1 and segments[-1] not in ('', '.'):
1178
t.mkdir(segments[-1])
1179
1368
except errors.FileExists:
1181
1370
if format is None:
1182
format=bzrlib.bzrdir.BzrDirFormat.get_default_format()
1183
# FIXME: make this use a single transport someday. RBC 20060418
1184
return format.initialize_on_transport(get_transport(relpath))
1371
format = bzrlib.bzrdir.BzrDirFormat.get_default_format()
1372
return format.initialize_on_transport(t)
1185
1373
except errors.UninitializableFormat:
1186
1374
raise TestSkipped("Format %s is not initializable." % format)
1190
1378
made_control = self.make_bzrdir(relpath, format=format)
1191
1379
return made_control.create_repository(shared=shared)
1381
def make_branch_and_memory_tree(self, relpath):
    """Create a branch on the default transport and a MemoryTree for it.

    :param relpath: Passed unchanged to make_branch to say where the
        branch should be created.
    :return: The result of memorytree.MemoryTree.create_on_branch for the
        newly made branch.
    """
    branch = self.make_branch(relpath)
    return memorytree.MemoryTree.create_on_branch(branch)
1193
1386
def make_branch_and_tree(self, relpath, format=None):
1194
1387
"""Create a branch on the transport and a tree locally.
1389
If the transport is not a LocalTransport, the Tree can't be created on
1390
the transport. In that case the working tree is created in the local
1391
directory, and the returned tree's branch and repository will also be
1394
This will fail if the original default transport for this test
1395
case wasn't backed by the working directory, as the branch won't
1396
be on disk for us to open it.
1398
:param format: The BzrDirFormat.
1399
:returns: the WorkingTree.
1198
1401
# TODO: always use the local disk path for the working tree,
1199
1402
# this obviously requires a format that supports branch references