128
209
os.remove('socket')
211
def test_kind_marker(self):
212
self.assertEqual(osutils.kind_marker('file'), '')
213
self.assertEqual(osutils.kind_marker('directory'), '/')
214
self.assertEqual(osutils.kind_marker('symlink'), '@')
215
self.assertEqual(osutils.kind_marker('tree-reference'), '+')
217
def test_get_umask(self):
218
if sys.platform == 'win32':
219
# umask always returns '0', no way to set it
220
self.assertEqual(0, osutils.get_umask())
223
orig_umask = osutils.get_umask()
226
self.assertEqual(0222, osutils.get_umask())
228
self.assertEqual(0022, osutils.get_umask())
230
self.assertEqual(0002, osutils.get_umask())
232
self.assertEqual(0027, osutils.get_umask())
236
def assertFormatedDelta(self, expected, seconds):
237
"""Assert osutils.format_delta formats as expected"""
238
actual = osutils.format_delta(seconds)
239
self.assertEqual(expected, actual)
241
def test_format_delta(self):
242
self.assertFormatedDelta('0 seconds ago', 0)
243
self.assertFormatedDelta('1 second ago', 1)
244
self.assertFormatedDelta('10 seconds ago', 10)
245
self.assertFormatedDelta('59 seconds ago', 59)
246
self.assertFormatedDelta('89 seconds ago', 89)
247
self.assertFormatedDelta('1 minute, 30 seconds ago', 90)
248
self.assertFormatedDelta('3 minutes, 0 seconds ago', 180)
249
self.assertFormatedDelta('3 minutes, 1 second ago', 181)
250
self.assertFormatedDelta('10 minutes, 15 seconds ago', 615)
251
self.assertFormatedDelta('30 minutes, 59 seconds ago', 1859)
252
self.assertFormatedDelta('31 minutes, 0 seconds ago', 1860)
253
self.assertFormatedDelta('60 minutes, 0 seconds ago', 3600)
254
self.assertFormatedDelta('89 minutes, 59 seconds ago', 5399)
255
self.assertFormatedDelta('1 hour, 30 minutes ago', 5400)
256
self.assertFormatedDelta('2 hours, 30 minutes ago', 9017)
257
self.assertFormatedDelta('10 hours, 0 minutes ago', 36000)
258
self.assertFormatedDelta('24 hours, 0 minutes ago', 86400)
259
self.assertFormatedDelta('35 hours, 59 minutes ago', 129599)
260
self.assertFormatedDelta('36 hours, 0 minutes ago', 129600)
261
self.assertFormatedDelta('36 hours, 0 minutes ago', 129601)
262
self.assertFormatedDelta('36 hours, 1 minute ago', 129660)
263
self.assertFormatedDelta('36 hours, 1 minute ago', 129661)
264
self.assertFormatedDelta('84 hours, 10 minutes ago', 303002)
266
# We handle when time steps the wrong direction because computers
267
# don't have synchronized clocks.
268
self.assertFormatedDelta('84 hours, 10 minutes in the future', -303002)
269
self.assertFormatedDelta('1 second in the future', -1)
270
self.assertFormatedDelta('2 seconds in the future', -2)
272
def test_format_date(self):
273
self.assertRaises(errors.UnsupportedTimezoneFormat,
274
osutils.format_date, 0, timezone='foo')
276
def test_dereference_path(self):
277
self.requireFeature(SymlinkFeature)
278
cwd = osutils.realpath('.')
280
bar_path = osutils.pathjoin(cwd, 'bar')
281
# Using './' to avoid bug #1213894 (first path component not
282
# dereferenced) in Python 2.4.1 and earlier
283
self.assertEqual(bar_path, osutils.realpath('./bar'))
284
os.symlink('bar', 'foo')
285
self.assertEqual(bar_path, osutils.realpath('./foo'))
287
# Does not dereference terminal symlinks
288
foo_path = osutils.pathjoin(cwd, 'foo')
289
self.assertEqual(foo_path, osutils.dereference_path('./foo'))
291
# Dereferences parent symlinks
293
baz_path = osutils.pathjoin(bar_path, 'baz')
294
self.assertEqual(baz_path, osutils.dereference_path('./foo/baz'))
296
# Dereferences parent symlinks that are the first path element
297
self.assertEqual(baz_path, osutils.dereference_path('foo/baz'))
299
# Dereferences parent symlinks in absolute paths
300
foo_baz_path = osutils.pathjoin(foo_path, 'baz')
301
self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
303
def test_changing_access(self):
304
f = file('file', 'w')
308
# Make a file readonly
309
osutils.make_readonly('file')
310
mode = os.lstat('file').st_mode
311
self.assertEqual(mode, mode & 0777555)
313
# Make a file writable
314
osutils.make_writable('file')
315
mode = os.lstat('file').st_mode
316
self.assertEqual(mode, mode | 0200)
318
if osutils.has_symlinks():
319
# should not error when handed a symlink
320
os.symlink('nonexistent', 'dangling')
321
osutils.make_readonly('dangling')
322
osutils.make_writable('dangling')
324
def test_kind_marker(self):
325
self.assertEqual("", osutils.kind_marker("file"))
326
self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
327
self.assertEqual("@", osutils.kind_marker("symlink"))
328
self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
330
def test_host_os_dereferences_symlinks(self):
331
osutils.host_os_dereferences_symlinks()
334
class TestPumpFile(TestCase):
335
"""Test pumpfile method."""
337
# create a test datablock
338
self.block_size = 512
339
pattern = '0123456789ABCDEF'
340
self.test_data = pattern * (3 * self.block_size / len(pattern))
341
self.test_data_len = len(self.test_data)
343
def test_bracket_block_size(self):
344
"""Read data in blocks with the requested read size bracketing the
346
# make sure test data is larger than max read size
347
self.assertTrue(self.test_data_len > self.block_size)
349
from_file = FakeReadFile(self.test_data)
352
# read (max / 2) bytes and verify read size wasn't affected
353
num_bytes_to_read = self.block_size / 2
354
pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
355
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
356
self.assertEqual(from_file.get_read_count(), 1)
358
# read (max) bytes and verify read size wasn't affected
359
num_bytes_to_read = self.block_size
360
from_file.reset_read_count()
361
pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
362
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
363
self.assertEqual(from_file.get_read_count(), 1)
365
# read (max + 1) bytes and verify read size was limited
366
num_bytes_to_read = self.block_size + 1
367
from_file.reset_read_count()
368
pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
369
self.assertEqual(from_file.get_max_read_size(), self.block_size)
370
self.assertEqual(from_file.get_read_count(), 2)
372
# finish reading the rest of the data
373
num_bytes_to_read = self.test_data_len - to_file.tell()
374
pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
376
# report error if the data wasn't equal (we only report the size due
377
# to the length of the data)
378
response_data = to_file.getvalue()
379
if response_data != self.test_data:
380
message = "Data not equal. Expected %d bytes, received %d."
381
self.fail(message % (len(response_data), self.test_data_len))
383
def test_specified_size(self):
384
"""Request a transfer larger than the maximum block size and verify
385
that the maximum read doesn't exceed the block_size."""
386
# make sure test data is larger than max read size
387
self.assertTrue(self.test_data_len > self.block_size)
389
# retrieve data in blocks
390
from_file = FakeReadFile(self.test_data)
392
pumpfile(from_file, to_file, self.test_data_len, self.block_size)
394
# verify read size was equal to the maximum read size
395
self.assertTrue(from_file.get_max_read_size() > 0)
396
self.assertEqual(from_file.get_max_read_size(), self.block_size)
397
self.assertEqual(from_file.get_read_count(), 3)
399
# report error if the data wasn't equal (we only report the size due
400
# to the length of the data)
401
response_data = to_file.getvalue()
402
if response_data != self.test_data:
403
message = "Data not equal. Expected %d bytes, received %d."
404
self.fail(message % (len(response_data), self.test_data_len))
406
def test_to_eof(self):
407
"""Read to end-of-file and verify that the reads are not larger than
408
the maximum read size."""
409
# make sure test data is larger than max read size
410
self.assertTrue(self.test_data_len > self.block_size)
412
# retrieve data to EOF
413
from_file = FakeReadFile(self.test_data)
415
pumpfile(from_file, to_file, -1, self.block_size)
417
# verify read size was equal to the maximum read size
418
self.assertEqual(from_file.get_max_read_size(), self.block_size)
419
self.assertEqual(from_file.get_read_count(), 4)
421
# report error if the data wasn't equal (we only report the size due
422
# to the length of the data)
423
response_data = to_file.getvalue()
424
if response_data != self.test_data:
425
message = "Data not equal. Expected %d bytes, received %d."
426
self.fail(message % (len(response_data), self.test_data_len))
428
def test_defaults(self):
429
"""Verifies that the default arguments will read to EOF -- this
430
test verifies that any existing usages of pumpfile will not be broken
431
with this new version."""
432
# retrieve data using default (old) pumpfile method
433
from_file = FakeReadFile(self.test_data)
435
pumpfile(from_file, to_file)
437
# report error if the data wasn't equal (we only report the size due
438
# to the length of the data)
439
response_data = to_file.getvalue()
440
if response_data != self.test_data:
441
message = "Data not equal. Expected %d bytes, received %d."
442
self.fail(message % (len(response_data), self.test_data_len))
131
444
class TestSafeUnicode(TestCase):
281
742
expected = zip(expected_names, expected_kind)
282
743
self.assertEqual(expected, sorted(read_result))
745
def test_walkdirs(self):
754
self.build_tree(tree)
755
expected_dirblocks = [
757
[('0file', '0file', 'file'),
758
('1dir', '1dir', 'directory'),
759
('2file', '2file', 'file'),
763
[('1dir/0file', '0file', 'file'),
764
('1dir/1dir', '1dir', 'directory'),
767
(('1dir/1dir', './1dir/1dir'),
774
for dirdetail, dirblock in osutils.walkdirs('.'):
775
if len(dirblock) and dirblock[0][1] == '.bzr':
776
# this tests the filtering of selected paths
779
result.append((dirdetail, dirblock))
781
self.assertTrue(found_bzrdir)
782
self.assertEqual(expected_dirblocks,
783
[(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
784
# you can search a subdir only, with a supplied prefix.
786
for dirblock in osutils.walkdirs('./1dir', '1dir'):
787
result.append(dirblock)
788
self.assertEqual(expected_dirblocks[1:],
789
[(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
791
def test__walkdirs_utf8(self):
800
self.build_tree(tree)
801
expected_dirblocks = [
803
[('0file', '0file', 'file'),
804
('1dir', '1dir', 'directory'),
805
('2file', '2file', 'file'),
809
[('1dir/0file', '0file', 'file'),
810
('1dir/1dir', '1dir', 'directory'),
813
(('1dir/1dir', './1dir/1dir'),
820
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
821
if len(dirblock) and dirblock[0][1] == '.bzr':
822
# this tests the filtering of selected paths
825
result.append((dirdetail, dirblock))
827
self.assertTrue(found_bzrdir)
828
self.assertEqual(expected_dirblocks,
829
[(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
830
# you can search a subdir only, with a supplied prefix.
832
for dirblock in osutils.walkdirs('./1dir', '1dir'):
833
result.append(dirblock)
834
self.assertEqual(expected_dirblocks[1:],
835
[(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
837
def _filter_out_stat(self, result):
838
"""Filter out the stat value from the walkdirs result"""
839
for dirdetail, dirblock in result:
841
for info in dirblock:
842
# Ignore info[3] which is the stat
843
new_dirblock.append((info[0], info[1], info[2], info[4]))
844
dirblock[:] = new_dirblock
846
def test__walkdirs_utf8_selection(self):
847
# Just trigger the function once, to make sure it has selected a real
849
list(osutils._walkdirs_utf8('.'))
850
if WalkdirsWin32Feature.available():
851
# If the compiled form is available, make sure it is used
852
from bzrlib._walkdirs_win32 import _walkdirs_utf8_win32_find_file
853
self.assertIs(_walkdirs_utf8_win32_find_file,
854
osutils._real_walkdirs_utf8)
855
elif sys.platform == 'win32':
856
self.assertIs(osutils._walkdirs_unicode_to_utf8,
857
osutils._real_walkdirs_utf8)
858
elif osutils._fs_enc.upper() in ('UTF-8', 'ASCII', 'ANSI_X3.4-1968'): # ascii
859
self.assertIs(osutils._walkdirs_fs_utf8,
860
osutils._real_walkdirs_utf8)
862
self.assertIs(osutils._walkdirs_unicode_to_utf8,
863
osutils._real_walkdirs_utf8)
865
def _save_platform_info(self):
866
cur_winver = win32utils.winver
867
cur_fs_enc = osutils._fs_enc
868
cur_real_walkdirs_utf8 = osutils._real_walkdirs_utf8
870
win32utils.winver = cur_winver
871
osutils._fs_enc = cur_fs_enc
872
osutils._real_walkdirs_utf8 = cur_real_walkdirs_utf8
873
self.addCleanup(restore)
875
def assertWalkdirsUtf8Is(self, expected):
876
"""Assert the right implementation for _walkdirs_utf8 is chosen."""
877
# Force it to redetect
878
osutils._real_walkdirs_utf8 = None
879
# Nothing to list, but should still trigger the selection logic
880
self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
881
self.assertIs(expected, osutils._real_walkdirs_utf8)
883
def test_force_walkdirs_utf8_fs_utf8(self):
884
self._save_platform_info()
885
win32utils.winver = None # Avoid the win32 detection code
886
osutils._fs_enc = 'UTF-8'
887
self.assertWalkdirsUtf8Is(osutils._walkdirs_fs_utf8)
889
def test_force_walkdirs_utf8_fs_ascii(self):
890
self._save_platform_info()
891
win32utils.winver = None # Avoid the win32 detection code
892
osutils._fs_enc = 'US-ASCII'
893
self.assertWalkdirsUtf8Is(osutils._walkdirs_fs_utf8)
895
def test_force_walkdirs_utf8_fs_ANSI(self):
896
self._save_platform_info()
897
win32utils.winver = None # Avoid the win32 detection code
898
osutils._fs_enc = 'ANSI_X3.4-1968'
899
self.assertWalkdirsUtf8Is(osutils._walkdirs_fs_utf8)
901
def test_force_walkdirs_utf8_fs_latin1(self):
902
self._save_platform_info()
903
win32utils.winver = None # Avoid the win32 detection code
904
osutils._fs_enc = 'latin1'
905
self.assertWalkdirsUtf8Is(osutils._walkdirs_unicode_to_utf8)
907
def test_force_walkdirs_utf8_nt(self):
908
self.requireFeature(WalkdirsWin32Feature)
909
self._save_platform_info()
910
win32utils.winver = 'Windows NT'
911
from bzrlib._walkdirs_win32 import _walkdirs_utf8_win32_find_file
912
self.assertWalkdirsUtf8Is(_walkdirs_utf8_win32_find_file)
914
def test_force_walkdirs_utf8_nt(self):
915
self.requireFeature(WalkdirsWin32Feature)
916
self._save_platform_info()
917
win32utils.winver = 'Windows 98'
918
self.assertWalkdirsUtf8Is(osutils._walkdirs_unicode_to_utf8)
920
def test_unicode_walkdirs(self):
921
"""Walkdirs should always return unicode paths."""
922
name0 = u'0file-\xb6'
923
name1 = u'1dir-\u062c\u0648'
924
name2 = u'2file-\u0633'
929
name1 + '/' + name1 + '/',
933
self.build_tree(tree)
935
raise TestSkipped('Could not represent Unicode chars'
936
' in current encoding.')
937
expected_dirblocks = [
939
[(name0, name0, 'file', './' + name0),
940
(name1, name1, 'directory', './' + name1),
941
(name2, name2, 'file', './' + name2),
944
((name1, './' + name1),
945
[(name1 + '/' + name0, name0, 'file', './' + name1
947
(name1 + '/' + name1, name1, 'directory', './' + name1
951
((name1 + '/' + name1, './' + name1 + '/' + name1),
956
result = list(osutils.walkdirs('.'))
957
self._filter_out_stat(result)
958
self.assertEqual(expected_dirblocks, result)
959
result = list(osutils.walkdirs(u'./'+name1, name1))
960
self._filter_out_stat(result)
961
self.assertEqual(expected_dirblocks[1:], result)
963
def test_unicode__walkdirs_utf8(self):
964
"""Walkdirs_utf8 should always return utf8 paths.
966
The abspath portion might be in unicode or utf-8
968
name0 = u'0file-\xb6'
969
name1 = u'1dir-\u062c\u0648'
970
name2 = u'2file-\u0633'
975
name1 + '/' + name1 + '/',
979
self.build_tree(tree)
981
raise TestSkipped('Could not represent Unicode chars'
982
' in current encoding.')
983
name0 = name0.encode('utf8')
984
name1 = name1.encode('utf8')
985
name2 = name2.encode('utf8')
987
expected_dirblocks = [
989
[(name0, name0, 'file', './' + name0),
990
(name1, name1, 'directory', './' + name1),
991
(name2, name2, 'file', './' + name2),
994
((name1, './' + name1),
995
[(name1 + '/' + name0, name0, 'file', './' + name1
997
(name1 + '/' + name1, name1, 'directory', './' + name1
1001
((name1 + '/' + name1, './' + name1 + '/' + name1),
1007
# For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1008
# all abspaths are Unicode, and encode them back into utf8.
1009
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1010
self.assertIsInstance(dirdetail[0], str)
1011
if isinstance(dirdetail[1], unicode):
1012
dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1013
dirblock = [list(info) for info in dirblock]
1014
for info in dirblock:
1015
self.assertIsInstance(info[4], unicode)
1016
info[4] = info[4].encode('utf8')
1018
for info in dirblock:
1019
self.assertIsInstance(info[0], str)
1020
self.assertIsInstance(info[1], str)
1021
self.assertIsInstance(info[4], str)
1022
# Remove the stat information
1023
new_dirblock.append((info[0], info[1], info[2], info[4]))
1024
result.append((dirdetail, new_dirblock))
1025
self.assertEqual(expected_dirblocks, result)
1027
def test_unicode__walkdirs_unicode_to_utf8(self):
1028
"""walkdirs_unicode_to_utf8 should be a safe fallback everywhere
1030
The abspath portion should be in unicode
1032
name0u = u'0file-\xb6'
1033
name1u = u'1dir-\u062c\u0648'
1034
name2u = u'2file-\u0633'
1038
name1u + '/' + name0u,
1039
name1u + '/' + name1u + '/',
1043
self.build_tree(tree)
1044
except UnicodeError:
1045
raise TestSkipped('Could not represent Unicode chars'
1046
' in current encoding.')
1047
name0 = name0u.encode('utf8')
1048
name1 = name1u.encode('utf8')
1049
name2 = name2u.encode('utf8')
1051
# All of the abspaths should be in unicode, all of the relative paths
1053
expected_dirblocks = [
1055
[(name0, name0, 'file', './' + name0u),
1056
(name1, name1, 'directory', './' + name1u),
1057
(name2, name2, 'file', './' + name2u),
1060
((name1, './' + name1u),
1061
[(name1 + '/' + name0, name0, 'file', './' + name1u
1063
(name1 + '/' + name1, name1, 'directory', './' + name1u
1067
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1072
result = list(osutils._walkdirs_unicode_to_utf8('.'))
1073
self._filter_out_stat(result)
1074
self.assertEqual(expected_dirblocks, result)
1076
def test__walkdirs_utf_win32_find_file(self):
1077
self.requireFeature(WalkdirsWin32Feature)
1078
self.requireFeature(tests.UnicodeFilenameFeature)
1079
from bzrlib._walkdirs_win32 import _walkdirs_utf8_win32_find_file
1080
name0u = u'0file-\xb6'
1081
name1u = u'1dir-\u062c\u0648'
1082
name2u = u'2file-\u0633'
1086
name1u + '/' + name0u,
1087
name1u + '/' + name1u + '/',
1090
self.build_tree(tree)
1091
name0 = name0u.encode('utf8')
1092
name1 = name1u.encode('utf8')
1093
name2 = name2u.encode('utf8')
1095
# All of the abspaths should be in unicode, all of the relative paths
1097
expected_dirblocks = [
1099
[(name0, name0, 'file', './' + name0u),
1100
(name1, name1, 'directory', './' + name1u),
1101
(name2, name2, 'file', './' + name2u),
1104
((name1, './' + name1u),
1105
[(name1 + '/' + name0, name0, 'file', './' + name1u
1107
(name1 + '/' + name1, name1, 'directory', './' + name1u
1111
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1116
result = list(_walkdirs_utf8_win32_find_file(u'.'))
1117
self._filter_out_stat(result)
1118
self.assertEqual(expected_dirblocks, result)
1120
def assertStatIsCorrect(self, path, win32stat):
1121
os_stat = os.stat(path)
1122
self.assertEqual(os_stat.st_size, win32stat.st_size)
1123
self.assertAlmostEqual(os_stat.st_mtime, win32stat.st_mtime, places=4)
1124
self.assertAlmostEqual(os_stat.st_ctime, win32stat.st_ctime, places=4)
1125
self.assertAlmostEqual(os_stat.st_atime, win32stat.st_atime, places=4)
1126
self.assertEqual(os_stat.st_dev, win32stat.st_dev)
1127
self.assertEqual(os_stat.st_ino, win32stat.st_ino)
1128
self.assertEqual(os_stat.st_mode, win32stat.st_mode)
1130
def test__walkdirs_utf_win32_find_file_stat_file(self):
1131
"""make sure our Stat values are valid"""
1132
self.requireFeature(WalkdirsWin32Feature)
1133
self.requireFeature(tests.UnicodeFilenameFeature)
1134
from bzrlib._walkdirs_win32 import _walkdirs_utf8_win32_find_file
1135
name0u = u'0file-\xb6'
1136
name0 = name0u.encode('utf8')
1137
self.build_tree([name0u])
1138
# I hate to sleep() here, but I'm trying to make the ctime different
1141
f = open(name0u, 'ab')
1143
f.write('just a small update')
1147
result = list(_walkdirs_utf8_win32_find_file(u'.'))
1148
entry = result[0][1][0]
1149
self.assertEqual((name0, name0, 'file'), entry[:3])
1150
self.assertEqual(u'./' + name0u, entry[4])
1151
self.assertStatIsCorrect(entry[4], entry[3])
1152
self.assertNotEqual(entry[3].st_mtime, entry[3].st_ctime)
1154
def test__walkdirs_utf_win32_find_file_stat_directory(self):
1155
"""make sure our Stat values are valid"""
1156
self.requireFeature(WalkdirsWin32Feature)
1157
self.requireFeature(tests.UnicodeFilenameFeature)
1158
from bzrlib._walkdirs_win32 import _walkdirs_utf8_win32_find_file
1159
name0u = u'0dir-\u062c\u0648'
1160
name0 = name0u.encode('utf8')
1161
self.build_tree([name0u + '/'])
1163
result = list(_walkdirs_utf8_win32_find_file(u'.'))
1164
entry = result[0][1][0]
1165
self.assertEqual((name0, name0, 'directory'), entry[:3])
1166
self.assertEqual(u'./' + name0u, entry[4])
1167
self.assertStatIsCorrect(entry[4], entry[3])
1169
def assertPathCompare(self, path_less, path_greater):
1170
"""check that path_less and path_greater compare correctly."""
1171
self.assertEqual(0, osutils.compare_paths_prefix_order(
1172
path_less, path_less))
1173
self.assertEqual(0, osutils.compare_paths_prefix_order(
1174
path_greater, path_greater))
1175
self.assertEqual(-1, osutils.compare_paths_prefix_order(
1176
path_less, path_greater))
1177
self.assertEqual(1, osutils.compare_paths_prefix_order(
1178
path_greater, path_less))
1180
def test_compare_paths_prefix_order(self):
1181
# root before all else
1182
self.assertPathCompare("/", "/a")
1183
# alpha within a dir
1184
self.assertPathCompare("/a", "/b")
1185
self.assertPathCompare("/b", "/z")
1186
# high dirs before lower.
1187
self.assertPathCompare("/z", "/a/a")
1188
# except if the deeper dir should be output first
1189
self.assertPathCompare("/a/b/c", "/d/g")
1190
# lexical betwen dirs of the same height
1191
self.assertPathCompare("/a/z", "/z/z")
1192
self.assertPathCompare("/a/c/z", "/a/d/e")
1194
# this should also be consistent for no leading / paths
1195
# root before all else
1196
self.assertPathCompare("", "a")
1197
# alpha within a dir
1198
self.assertPathCompare("a", "b")
1199
self.assertPathCompare("b", "z")
1200
# high dirs before lower.
1201
self.assertPathCompare("z", "a/a")
1202
# except if the deeper dir should be output first
1203
self.assertPathCompare("a/b/c", "d/g")
1204
# lexical betwen dirs of the same height
1205
self.assertPathCompare("a/z", "z/z")
1206
self.assertPathCompare("a/c/z", "a/d/e")
1208
def test_path_prefix_sorting(self):
1209
"""Doing a sort on path prefix should match our sample data."""
1224
dir_sorted_paths = [
1240
sorted(original_paths, key=osutils.path_prefix_key))
1241
# using the comparison routine shoudl work too:
1244
sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
1247
class TestCopyTree(TestCaseInTempDir):
1249
def test_copy_basic_tree(self):
1250
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1251
osutils.copy_tree('source', 'target')
1252
self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
1253
self.assertEqual(['c'], os.listdir('target/b'))
1255
def test_copy_tree_target_exists(self):
1256
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
1258
osutils.copy_tree('source', 'target')
1259
self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
1260
self.assertEqual(['c'], os.listdir('target/b'))
1262
def test_copy_tree_symlinks(self):
1263
self.requireFeature(SymlinkFeature)
1264
self.build_tree(['source/'])
1265
os.symlink('a/generic/path', 'source/lnk')
1266
osutils.copy_tree('source', 'target')
1267
self.assertEqual(['lnk'], os.listdir('target'))
1268
self.assertEqual('a/generic/path', os.readlink('target/lnk'))
1270
def test_copy_tree_handlers(self):
1271
processed_files = []
1272
processed_links = []
1273
def file_handler(from_path, to_path):
1274
processed_files.append(('f', from_path, to_path))
1275
def dir_handler(from_path, to_path):
1276
processed_files.append(('d', from_path, to_path))
1277
def link_handler(from_path, to_path):
1278
processed_links.append((from_path, to_path))
1279
handlers = {'file':file_handler,
1280
'directory':dir_handler,
1281
'symlink':link_handler,
1284
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1285
if osutils.has_symlinks():
1286
os.symlink('a/generic/path', 'source/lnk')
1287
osutils.copy_tree('source', 'target', handlers=handlers)
1289
self.assertEqual([('d', 'source', 'target'),
1290
('f', 'source/a', 'target/a'),
1291
('d', 'source/b', 'target/b'),
1292
('f', 'source/b/c', 'target/b/c'),
1294
self.failIfExists('target')
1295
if osutils.has_symlinks():
1296
self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1299
#class TestTerminalEncoding has been moved to test_osutils_encodings.py
1300
# [bialix] 2006/12/26
1303
class TestSetUnsetEnv(TestCase):
1304
"""Test updating the environment"""
1307
super(TestSetUnsetEnv, self).setUp()
1309
self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
1310
'Environment was not cleaned up properly.'
1311
' Variable BZR_TEST_ENV_VAR should not exist.')
1313
if 'BZR_TEST_ENV_VAR' in os.environ:
1314
del os.environ['BZR_TEST_ENV_VAR']
1316
self.addCleanup(cleanup)
1319
"""Test that we can set an env variable"""
1320
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1321
self.assertEqual(None, old)
1322
self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
1324
def test_double_set(self):
1325
"""Test that we get the old value out"""
1326
osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1327
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
1328
self.assertEqual('foo', old)
1329
self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
1331
def test_unicode(self):
1332
"""Environment can only contain plain strings
1334
So Unicode strings must be encoded.
1336
uni_val, env_val = probe_unicode_in_user_encoding()
1338
raise TestSkipped('Cannot find a unicode character that works in'
1339
' encoding %s' % (bzrlib.user_encoding,))
1341
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1342
self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
1344
def test_unset(self):
1345
"""Test that passing None will remove the env var"""
1346
osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1347
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
1348
self.assertEqual('foo', old)
1349
self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
1350
self.failIf('BZR_TEST_ENV_VAR' in os.environ)
1353
class TestLocalTimeOffset(TestCase):
1355
def test_local_time_offset(self):
1356
"""Test that local_time_offset() returns a sane value."""
1357
offset = osutils.local_time_offset()
1358
self.assertTrue(isinstance(offset, int))
1359
# Test that the offset is no more than a eighteen hours in
1361
# Time zone handling is system specific, so it is difficult to
1362
# do more specific tests, but a value outside of this range is
1364
eighteen_hours = 18 * 3600
1365
self.assertTrue(-eighteen_hours < offset < eighteen_hours)
1367
def test_local_time_offset_with_timestamp(self):
1368
"""Test that local_time_offset() works with a timestamp."""
1369
offset = osutils.local_time_offset(1000000000.1234567)
1370
self.assertTrue(isinstance(offset, int))
1371
eighteen_hours = 18 * 3600
1372
self.assertTrue(-eighteen_hours < offset < eighteen_hours)
1375
class TestShaFileByName(TestCaseInTempDir):
1377
def test_sha_empty(self):
1378
self.build_tree_contents([('foo', '')])
1379
expected_sha = osutils.sha_string('')
1380
self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1382
def test_sha_mixed_endings(self):
1383
text = 'test\r\nwith\nall\rpossible line endings\r\n'
1384
self.build_tree_contents([('foo', text)])
1385
expected_sha = osutils.sha_string(text)
1386
self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1390
r'''# Copyright (C) 2005, 2006 Canonical Ltd
1392
# This program is free software; you can redistribute it and/or modify
1393
# it under the terms of the GNU General Public License as published by
1394
# the Free Software Foundation; either version 2 of the License, or
1395
# (at your option) any later version.
1397
# This program is distributed in the hope that it will be useful,
1398
# but WITHOUT ANY WARRANTY; without even the implied warranty of
1399
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1400
# GNU General Public License for more details.
1402
# You should have received a copy of the GNU General Public License
1403
# along with this program; if not, write to the Free Software
1404
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
1407
# NOTE: If update these, please also update the help for global-options in
1408
# bzrlib/help_topics/__init__.py
1411
"""Set of flags that enable different debug behaviour.
1413
These are set with eg ``-Dlock`` on the bzr command line.
1417
* auth - show authentication sections used
1418
* error - show stack traces for all top level exceptions
1419
* evil - capture call sites that do expensive or badly-scaling operations.
1420
* fetch - trace history copying between repositories
1421
* graph - trace graph traversal information
1422
* hashcache - log every time a working file is read to determine its hash
1423
* hooks - trace hook execution
1424
* hpss - trace smart protocol requests and responses
1425
* http - trace http connections, requests and responses
1426
* index - trace major index operations
1427
* knit - trace knit operations
1428
* lock - trace when lockdir locks are taken or released
1429
* merge - emit information for debugging merges
1430
* pack - emit information about pack operations
1436
class TestResourceLoading(TestCaseInTempDir):
1438
def test_resource_string(self):
1439
# test resource in bzrlib
1440
text = osutils.resource_string('bzrlib', 'debug.py')
1441
self.assertEquals(_debug_text, text)
1442
# test resource under bzrlib
1443
text = osutils.resource_string('bzrlib.ui', 'text.py')
1444
self.assertContainsRe(text, "class TextUIFactory")
1445
# test unsupported package
1446
self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1448
# test unknown resource
1449
self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')