40
47
from .scenarios import load_tests_apply_scenarios
43
class _UTF8DirReaderFeature(features.ModuleAvailableFeature):
50
class _UTF8DirReaderFeature(features.Feature):
47
54
from .. import _readdir_pyx
48
self._module = _readdir_pyx
49
55
self.reader = _readdir_pyx.UTF8DirReader
51
57
except ImportError:
60
def feature_name(self):
61
return 'breezy._readdir_pyx'
55
UTF8DirReaderFeature = _UTF8DirReaderFeature('breezy._readdir_pyx')
63
UTF8DirReaderFeature = features.ModuleAvailableFeature('breezy._readdir_pyx')
57
65
term_ios_feature = features.ModuleAvailableFeature('termios')
131
139
def test_fancy_rename(self):
132
140
# This should work everywhere
133
self.create_file('a', b'something in a\n')
141
self.create_file('a', 'something in a\n')
134
142
self._fancy_rename('a', 'b')
135
143
self.assertPathDoesNotExist('a')
136
144
self.assertPathExists('b')
137
self.check_file_contents('b', b'something in a\n')
145
self.check_file_contents('b', 'something in a\n')
139
self.create_file('a', b'new something in a\n')
147
self.create_file('a', 'new something in a\n')
140
148
self._fancy_rename('b', 'a')
142
self.check_file_contents('a', b'something in a\n')
150
self.check_file_contents('a', 'something in a\n')
144
152
def test_fancy_rename_fails_source_missing(self):
145
153
# An exception should be raised, and the target should be left in place
146
self.create_file('target', b'data in target\n')
154
self.create_file('target', 'data in target\n')
147
155
self.assertRaises((IOError, OSError), self._fancy_rename,
148
156
'missingsource', 'target')
149
157
self.assertPathExists('target')
150
self.check_file_contents('target', b'data in target\n')
158
self.check_file_contents('target', 'data in target\n')
152
160
def test_fancy_rename_fails_if_source_and_target_missing(self):
153
161
self.assertRaises((IOError, OSError), self._fancy_rename,
156
164
def test_rename(self):
157
165
# Rename should be semi-atomic on all platforms
158
self.create_file('a', b'something in a\n')
166
self.create_file('a', 'something in a\n')
159
167
osutils.rename('a', 'b')
160
168
self.assertPathDoesNotExist('a')
161
169
self.assertPathExists('b')
162
self.check_file_contents('b', b'something in a\n')
170
self.check_file_contents('b', 'something in a\n')
164
self.create_file('a', b'new something in a\n')
172
self.create_file('a', 'new something in a\n')
165
173
osutils.rename('b', 'a')
167
self.check_file_contents('a', b'something in a\n')
175
self.check_file_contents('a', 'something in a\n')
169
177
# TODO: test fancy_rename using a MemoryTransport
249
257
# Without it, we may end up re-reading content when we don't have
250
258
# to, but otherwise it doesn't effect correctness.
251
259
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
252
with open('test-file.txt', 'wb') as f:
253
f.write(b'some content\n')
255
self.assertEqualStat(osutils.fstat(f.fileno()),
256
osutils.lstat('test-file.txt'))
260
f = open('test-file.txt', 'wb')
261
self.addCleanup(f.close)
262
f.write('some content\n')
264
self.assertEqualStat(osutils.fstat(f.fileno()),
265
osutils.lstat('test-file.txt'))
259
268
class TestRmTree(tests.TestCaseInTempDir):
396
404
self.assertFormatedDelta('2 seconds in the future', -2)
398
406
def test_format_date(self):
399
self.assertRaises(osutils.UnsupportedTimezoneFormat,
400
osutils.format_date, 0, timezone='foo')
407
self.assertRaises(errors.UnsupportedTimezoneFormat,
408
osutils.format_date, 0, timezone='foo')
401
409
self.assertIsInstance(osutils.format_date(0), str)
402
self.assertIsInstance(osutils.format_local_date(0), str)
410
self.assertIsInstance(osutils.format_local_date(0), unicode)
403
411
# Testing for the actual value of the local weekday without
404
412
# duplicating the code from format_date is difficult.
405
413
# Instead blackbox.test_locale should check for localized
408
416
def test_format_date_with_offset_in_original_timezone(self):
409
417
self.assertEqual("Thu 1970-01-01 00:00:00 +0000",
410
osutils.format_date_with_offset_in_original_timezone(0))
418
osutils.format_date_with_offset_in_original_timezone(0))
411
419
self.assertEqual("Fri 1970-01-02 03:46:40 +0000",
412
osutils.format_date_with_offset_in_original_timezone(100000))
420
osutils.format_date_with_offset_in_original_timezone(100000))
413
421
self.assertEqual("Fri 1970-01-02 05:46:40 +0200",
414
osutils.format_date_with_offset_in_original_timezone(100000, 7200))
422
osutils.format_date_with_offset_in_original_timezone(100000, 7200))
416
424
def test_local_time_offset(self):
417
425
"""Test that local_time_offset() returns a sane value."""
612
621
# read (max // 2) bytes and verify read size wasn't affected
613
622
num_bytes_to_read = self.block_size // 2
614
osutils.pumpfile(from_file, to_file,
615
num_bytes_to_read, self.block_size)
623
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
616
624
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
617
625
self.assertEqual(from_file.get_read_count(), 1)
619
627
# read (max) bytes and verify read size wasn't affected
620
628
num_bytes_to_read = self.block_size
621
629
from_file.reset_read_count()
622
osutils.pumpfile(from_file, to_file,
623
num_bytes_to_read, self.block_size)
630
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
624
631
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
625
632
self.assertEqual(from_file.get_read_count(), 1)
627
634
# read (max + 1) bytes and verify read size was limited
628
635
num_bytes_to_read = self.block_size + 1
629
636
from_file.reset_read_count()
630
osutils.pumpfile(from_file, to_file,
631
num_bytes_to_read, self.block_size)
637
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
632
638
self.assertEqual(from_file.get_max_read_size(), self.block_size)
633
639
self.assertEqual(from_file.get_read_count(), 2)
635
641
# finish reading the rest of the data
636
642
num_bytes_to_read = self.test_data_len - to_file.tell()
637
osutils.pumpfile(from_file, to_file,
638
num_bytes_to_read, self.block_size)
643
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
640
645
# report error if the data wasn't equal (we only report the size due
641
646
# to the length of the data)
733
737
osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
734
738
report_activity=log_activity, direction='read')
736
[(500, 'read'), (500, 'read'), (28, 'read')], activity)
739
self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
739
743
class TestPumpStringFile(tests.TestCase):
790
794
self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
792
796
def test_from_utf8_string(self):
793
self.assertEqual(u'foo\xae', osutils.safe_unicode(b'foo\xc2\xae'))
797
self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
795
799
def test_bad_utf8_string(self):
796
800
self.assertRaises(errors.BzrBadParameterNotUnicode,
797
801
osutils.safe_unicode,
801
805
class TestSafeUtf8(tests.TestCase):
803
807
def test_from_ascii_string(self):
805
self.assertEqual(b'foobar', osutils.safe_utf8(f))
809
self.assertEqual('foobar', osutils.safe_utf8(f))
807
811
def test_from_unicode_string_ascii_contents(self):
808
self.assertEqual(b'bargam', osutils.safe_utf8(u'bargam'))
812
self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
810
814
def test_from_unicode_string_unicode_contents(self):
811
self.assertEqual(b'bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
815
self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
813
817
def test_from_utf8_string(self):
814
self.assertEqual(b'foo\xc2\xae', osutils.safe_utf8(b'foo\xc2\xae'))
818
self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
816
820
def test_bad_utf8_string(self):
817
821
self.assertRaises(errors.BzrBadParameterNotUnicode,
818
osutils.safe_utf8, b'\xbb\xbb')
822
osutils.safe_utf8, '\xbb\xbb')
825
class TestSafeRevisionId(tests.TestCase):
827
def test_from_ascii_string(self):
828
# this shouldn't give a warning because it's getting an ascii string
829
self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
831
def test_from_unicode_string_ascii_contents(self):
832
self.assertRaises(TypeError,
833
osutils.safe_revision_id, u'bargam')
835
def test_from_unicode_string_unicode_contents(self):
836
self.assertRaises(TypeError,
837
osutils.safe_revision_id, u'bargam\xae')
839
def test_from_utf8_string(self):
840
self.assertEqual('foo\xc2\xae',
841
osutils.safe_revision_id('foo\xc2\xae'))
844
"""Currently, None is a valid revision_id"""
845
self.assertEqual(None, osutils.safe_revision_id(None))
848
class TestSafeFileId(tests.TestCase):
850
def test_from_ascii_string(self):
851
self.assertEqual('foobar', osutils.safe_file_id('foobar'))
853
def test_from_unicode_string_ascii_contents(self):
854
self.assertRaises(TypeError, osutils.safe_file_id, u'bargam')
856
def test_from_unicode_string_unicode_contents(self):
857
self.assertRaises(TypeError,
858
osutils.safe_file_id, u'bargam\xae')
860
def test_from_utf8_string(self):
861
self.assertEqual('foo\xc2\xae',
862
osutils.safe_file_id('foo\xc2\xae'))
865
"""Currently, None is a valid revision_id"""
866
self.assertEqual(None, osutils.safe_file_id(None))
821
869
class TestSendAll(tests.TestCase):
942
988
def test_minimum_path_selection(self):
943
989
self.assertEqual(set(),
944
osutils.minimum_path_selection([]))
990
osutils.minimum_path_selection([]))
945
991
self.assertEqual({'a'},
946
osutils.minimum_path_selection(['a']))
992
osutils.minimum_path_selection(['a']))
947
993
self.assertEqual({'a', 'b'},
948
osutils.minimum_path_selection(['a', 'b']))
949
self.assertEqual({'a/', 'b'},
950
osutils.minimum_path_selection(['a/', 'b']))
951
self.assertEqual({'a/', 'b'},
952
osutils.minimum_path_selection(['a/c', 'a/', 'b']))
994
osutils.minimum_path_selection(['a', 'b']))
995
self.assertEqual({'a/', 'b'},
996
osutils.minimum_path_selection(['a/', 'b']))
997
self.assertEqual({'a/', 'b'},
998
osutils.minimum_path_selection(['a/c', 'a/', 'b']))
953
999
self.assertEqual({'a-b', 'a', 'a0b'},
954
osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
1000
osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
956
1002
def test_mkdtemp(self):
957
1003
tmpdir = osutils._win32_mkdtemp(dir='.')
958
1004
self.assertFalse('\\' in tmpdir)
960
1006
def test_rename(self):
961
with open('a', 'wb') as a:
963
with open('b', 'wb') as b:
966
1014
osutils._win32_rename('b', 'a')
967
1015
self.assertPathExists('a')
968
1016
self.assertPathDoesNotExist('b')
969
self.assertFileEqual(b'baz\n', 'a')
1017
self.assertFileEqual('baz\n', 'a')
971
1019
def test_rename_missing_file(self):
972
with open('a', 'wb') as a:
976
1025
osutils._win32_rename('b', 'a')
977
1026
except (IOError, OSError) as e:
978
1027
self.assertEqual(errno.ENOENT, e.errno)
979
self.assertFileEqual(b'foo\n', 'a')
1028
self.assertFileEqual('foo\n', 'a')
981
1030
def test_rename_missing_dir(self):
1005
1054
check(['a', 'b'], 'a/b')
1006
1055
check(['a', 'b'], 'a/./b')
1007
1056
check(['a', '.b'], 'a/.b')
1008
if os.path.sep == '\\':
1009
check(['a', '.b'], 'a\\.b')
1011
check(['a\\.b'], 'a\\.b')
1057
check(['a', '.b'], 'a\\.b')
1013
1059
self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
1042
1088
class TestChunksToLines(tests.TestCase):
1044
1090
def test_smoketest(self):
1045
self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
1046
osutils.chunks_to_lines([b'foo\nbar', b'\nbaz\n']))
1047
self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
1048
osutils.chunks_to_lines([b'foo\n', b'bar\n', b'baz\n']))
1091
self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
1092
osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
1093
self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
1094
osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
1050
1096
def test_osutils_binding(self):
1051
1097
from . import test__chunks_to_lines
1088
1134
self.build_tree(tree)
1089
1135
expected_dirblocks = [
1091
[('0file', '0file', 'file'),
1092
('1dir', '1dir', 'directory'),
1093
('2file', '2file', 'file'),
1096
(('1dir', './1dir'),
1097
[('1dir/0file', '0file', 'file'),
1098
('1dir/1dir', '1dir', 'directory'),
1101
(('1dir/1dir', './1dir/1dir'),
1137
[('0file', '0file', 'file'),
1138
('1dir', '1dir', 'directory'),
1139
('2file', '2file', 'file'),
1142
(('1dir', './1dir'),
1143
[('1dir/0file', '0file', 'file'),
1144
('1dir/1dir', '1dir', 'directory'),
1147
(('1dir/1dir', './1dir/1dir'),
1107
1153
found_bzrdir = False
1136
1182
# (It would be ok if it happened earlier but at the moment it
1138
1184
e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1139
self.assertEqual('./test-unreadable', osutils.safe_unicode(e.filename))
1185
self.assertEqual('./test-unreadable', e.filename)
1140
1186
self.assertEqual(errno.EACCES, e.errno)
1141
1187
# Ensure the message contains the file name
1142
self.assertContainsRe(str(e), "\\./test-unreadable")
1188
self.assertContainsRe(str(e), "\./test-unreadable")
1144
1191
def test_walkdirs_encoding_error(self):
1145
1192
# <https://bugs.launchpad.net/bzr/+bug/488519>
1187
1234
self.build_tree(tree)
1188
1235
expected_dirblocks = [
1190
[('0file', '0file', 'file'),
1191
('1dir', '1dir', 'directory'),
1192
('2file', '2file', 'file'),
1195
(('1dir', './1dir'),
1196
[('1dir/0file', '0file', 'file'),
1197
('1dir/1dir', '1dir', 'directory'),
1200
(('1dir/1dir', './1dir/1dir'),
1237
[('0file', '0file', 'file'),
1238
('1dir', '1dir', 'directory'),
1239
('2file', '2file', 'file'),
1242
(('1dir', './1dir'),
1243
[('1dir/0file', '0file', 'file'),
1244
('1dir/1dir', '1dir', 'directory'),
1247
(('1dir/1dir', './1dir/1dir'),
1206
1253
found_bzrdir = False
1207
for dirdetail, dirblock in osutils._walkdirs_utf8(b'.'):
1208
if len(dirblock) and dirblock[0][1] == b'.bzr':
1254
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1255
if len(dirblock) and dirblock[0][1] == '.bzr':
1209
1256
# this tests the filtering of selected paths
1210
1257
found_bzrdir = True
1211
1258
del dirblock[0]
1212
dirdetail = (dirdetail[0].decode('utf-8'),
1213
osutils.safe_unicode(dirdetail[1]))
1215
(entry[0].decode('utf-8'), entry[1].decode('utf-8'), entry[2])
1216
for entry in dirblock]
1217
1259
result.append((dirdetail, dirblock))
1219
1261
self.assertTrue(found_bzrdir)
1238
1280
self.overrideAttr(osutils, '_fs_enc')
1239
1281
self.overrideAttr(osutils, '_selected_dir_reader')
1241
def assertDirReaderIs(self, expected, top):
1283
def assertDirReaderIs(self, expected):
1242
1284
"""Assert the right implementation for _walkdirs_utf8 is chosen."""
1243
1285
# Force it to redetect
1244
1286
osutils._selected_dir_reader = None
1245
1287
# Nothing to list, but should still trigger the selection logic
1246
self.assertEqual([((b'', top), [])], list(osutils._walkdirs_utf8('.')))
1288
self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
1247
1289
self.assertIsInstance(osutils._selected_dir_reader, expected)
1249
1291
def test_force_walkdirs_utf8_fs_utf8(self):
1250
1292
self.requireFeature(UTF8DirReaderFeature)
1251
1293
self._save_platform_info()
1252
1294
osutils._fs_enc = 'utf-8'
1253
self.assertDirReaderIs(UTF8DirReaderFeature.module.UTF8DirReader, b".")
1295
self.assertDirReaderIs(
1296
UTF8DirReaderFeature.module.UTF8DirReader)
1255
1298
def test_force_walkdirs_utf8_fs_ascii(self):
1256
1299
self.requireFeature(UTF8DirReaderFeature)
1257
1300
self._save_platform_info()
1258
1301
osutils._fs_enc = 'ascii'
1259
1302
self.assertDirReaderIs(
1260
UTF8DirReaderFeature.module.UTF8DirReader, b".")
1303
UTF8DirReaderFeature.module.UTF8DirReader)
1262
1305
def test_force_walkdirs_utf8_fs_latin1(self):
1263
1306
self._save_platform_info()
1264
1307
osutils._fs_enc = 'iso-8859-1'
1265
self.assertDirReaderIs(osutils.UnicodeDirReader, ".")
1308
self.assertDirReaderIs(osutils.UnicodeDirReader)
1267
1310
def test_force_walkdirs_utf8_nt(self):
1268
1311
# Disabled because the thunk of the whole walkdirs api is disabled.
1269
1312
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1270
1313
self._save_platform_info()
1271
1314
from .._walkdirs_win32 import Win32ReadDir
1272
self.assertDirReaderIs(Win32ReadDir, ".")
1315
self.assertDirReaderIs(Win32ReadDir)
1274
1317
def test_unicode_walkdirs(self):
1275
1318
"""Walkdirs should always return unicode paths."""
1287
1330
self.build_tree(tree)
1288
1331
expected_dirblocks = [
1290
[(name0, name0, 'file', './' + name0),
1291
(name1, name1, 'directory', './' + name1),
1292
(name2, name2, 'file', './' + name2),
1295
((name1, './' + name1),
1296
[(name1 + '/' + name0, name0, 'file', './' + name1
1298
(name1 + '/' + name1, name1, 'directory', './' + name1
1302
((name1 + '/' + name1, './' + name1 + '/' + name1),
1333
[(name0, name0, 'file', './' + name0),
1334
(name1, name1, 'directory', './' + name1),
1335
(name2, name2, 'file', './' + name2),
1338
((name1, './' + name1),
1339
[(name1 + '/' + name0, name0, 'file', './' + name1
1341
(name1 + '/' + name1, name1, 'directory', './' + name1
1345
((name1 + '/' + name1, './' + name1 + '/' + name1),
1307
1350
result = list(osutils.walkdirs('.'))
1308
1351
self._filter_out_stat(result)
1309
1352
self.assertEqual(expected_dirblocks, result)
1310
result = list(osutils.walkdirs(u'./' + name1, name1))
1353
result = list(osutils.walkdirs(u'./'+name1, name1))
1311
1354
self._filter_out_stat(result)
1312
1355
self.assertEqual(expected_dirblocks[1:], result)
1333
1376
name2 = name2.encode('utf8')
1335
1378
expected_dirblocks = [
1337
[(name0, name0, 'file', b'./' + name0),
1338
(name1, name1, 'directory', b'./' + name1),
1339
(name2, name2, 'file', b'./' + name2),
1342
((name1, b'./' + name1),
1343
[(name1 + b'/' + name0, name0, 'file', b'./' + name1
1345
(name1 + b'/' + name1, name1, 'directory', b'./' + name1
1349
((name1 + b'/' + name1, b'./' + name1 + b'/' + name1),
1380
[(name0, name0, 'file', './' + name0),
1381
(name1, name1, 'directory', './' + name1),
1382
(name2, name2, 'file', './' + name2),
1385
((name1, './' + name1),
1386
[(name1 + '/' + name0, name0, 'file', './' + name1
1388
(name1 + '/' + name1, name1, 'directory', './' + name1
1392
((name1 + '/' + name1, './' + name1 + '/' + name1),
1355
1398
# For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1356
1399
# all abspaths are Unicode, and encode them back into utf8.
1357
1400
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1358
self.assertIsInstance(dirdetail[0], bytes)
1359
if isinstance(dirdetail[1], str):
1401
self.assertIsInstance(dirdetail[0], str)
1402
if isinstance(dirdetail[1], unicode):
1360
1403
dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1361
1404
dirblock = [list(info) for info in dirblock]
1362
1405
for info in dirblock:
1363
self.assertIsInstance(info[4], str)
1406
self.assertIsInstance(info[4], unicode)
1364
1407
info[4] = info[4].encode('utf8')
1365
1408
new_dirblock = []
1366
1409
for info in dirblock:
1367
self.assertIsInstance(info[0], bytes)
1368
self.assertIsInstance(info[1], bytes)
1369
self.assertIsInstance(info[4], bytes)
1410
self.assertIsInstance(info[0], str)
1411
self.assertIsInstance(info[1], str)
1412
self.assertIsInstance(info[4], str)
1370
1413
# Remove the stat information
1371
1414
new_dirblock.append((info[0], info[1], info[2], info[4]))
1372
1415
result.append((dirdetail, new_dirblock))
1400
1443
# All of the abspaths should be in unicode, all of the relative paths
1401
1444
# should be in utf8
1402
1445
expected_dirblocks = [
1404
[(name0, name0, 'file', './' + name0u),
1405
(name1, name1, 'directory', './' + name1u),
1406
(name2, name2, 'file', './' + name2u),
1409
((name1, './' + name1u),
1410
[(name1 + b'/' + name0, name0, 'file', './' + name1u
1412
(name1 + b'/' + name1, name1, 'directory', './' + name1u
1416
((name1 + b'/' + name1, './' + name1u + '/' + name1u),
1447
[(name0, name0, 'file', './' + name0u),
1448
(name1, name1, 'directory', './' + name1u),
1449
(name2, name2, 'file', './' + name2u),
1452
((name1, './' + name1u),
1453
[(name1 + '/' + name0, name0, 'file', './' + name1u
1455
(name1 + '/' + name1, name1, 'directory', './' + name1u
1459
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1421
1464
result = list(osutils._walkdirs_utf8('.'))
1422
1465
self._filter_out_stat(result)
1446
1489
# All of the abspaths should be in unicode, all of the relative paths
1447
1490
# should be in utf8
1448
1491
expected_dirblocks = [
1450
[(name0, name0, 'file', './' + name0u),
1451
(name1, name1, 'directory', './' + name1u),
1452
(name2, name2, 'file', './' + name2u),
1455
((name1, './' + name1u),
1456
[(name1 + '/' + name0, name0, 'file', './' + name1u
1458
(name1 + '/' + name1, name1, 'directory', './' + name1u
1462
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1493
[(name0, name0, 'file', './' + name0u),
1494
(name1, name1, 'directory', './' + name1u),
1495
(name2, name2, 'file', './' + name2u),
1498
((name1, './' + name1u),
1499
[(name1 + '/' + name0, name0, 'file', './' + name1u
1501
(name1 + '/' + name1, name1, 'directory', './' + name1u
1505
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1467
1510
result = list(osutils._walkdirs_utf8(u'.'))
1468
1511
self._filter_out_stat(result)
1618
1664
def test_copy_tree_handlers(self):
1619
1665
processed_files = []
1620
1666
processed_links = []
1622
1667
def file_handler(from_path, to_path):
1623
1668
processed_files.append(('f', from_path, to_path))
1625
1669
def dir_handler(from_path, to_path):
1626
1670
processed_files.append(('d', from_path, to_path))
1628
1671
def link_handler(from_path, to_path):
1629
1672
processed_links.append((from_path, to_path))
1630
handlers = {'file': file_handler,
1631
'directory': dir_handler,
1632
'symlink': link_handler,
1673
handlers = {'file':file_handler,
1674
'directory':dir_handler,
1675
'symlink':link_handler,
1635
1678
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1636
1679
if osutils.has_symlinks():
1641
1684
('f', 'source/a', 'target/a'),
1642
1685
('d', 'source/b', 'target/b'),
1643
1686
('f', 'source/b/c', 'target/b/c'),
1645
1688
self.assertPathDoesNotExist('target')
1646
1689
if osutils.has_symlinks():
1647
1690
self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1695
1737
old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', None)
1696
1738
self.assertEqual('foo', old)
1697
1739
self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'))
1698
self.assertNotIn('BRZ_TEST_ENV_VAR', os.environ)
1740
self.assertFalse('BRZ_TEST_ENV_VAR' in os.environ)
1701
1743
class TestSizeShaFile(tests.TestCaseInTempDir):
1703
1745
def test_sha_empty(self):
1704
self.build_tree_contents([('foo', b'')])
1705
expected_sha = osutils.sha_string(b'')
1746
self.build_tree_contents([('foo', '')])
1747
expected_sha = osutils.sha_string('')
1706
1748
f = open('foo')
1707
1749
self.addCleanup(f.close)
1708
1750
size, sha = osutils.size_sha_file(f)
1723
1765
class TestShaFileByName(tests.TestCaseInTempDir):
1725
1767
def test_sha_empty(self):
1726
self.build_tree_contents([('foo', b'')])
1727
expected_sha = osutils.sha_string(b'')
1768
self.build_tree_contents([('foo', '')])
1769
expected_sha = osutils.sha_string('')
1728
1770
self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1730
1772
def test_sha_mixed_endings(self):
1731
text = b'test\r\nwith\nall\rpossible line endings\r\n'
1773
text = 'test\r\nwith\nall\rpossible line endings\r\n'
1732
1774
self.build_tree_contents([('foo', text)])
1733
1775
expected_sha = osutils.sha_string(text)
1734
1776
self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1774
1816
expected_dirblocks = [
1776
[(b'0file', b'0file', 'file', './0file'),
1777
(b'1dir', b'1dir', 'directory', './1dir'),
1778
(b'2file', b'2file', 'file', './2file'),
1781
((b'1dir', './1dir'),
1782
[(b'1dir/0file', b'0file', 'file', './1dir/0file'),
1783
(b'1dir/1dir', b'1dir', 'directory', './1dir/1dir'),
1786
((b'1dir/1dir', './1dir/1dir'),
1818
[('0file', '0file', 'file'),
1819
('1dir', '1dir', 'directory'),
1820
('2file', '2file', 'file'),
1823
(('1dir', './1dir'),
1824
[('1dir/0file', '0file', 'file'),
1825
('1dir/1dir', '1dir', 'directory'),
1828
(('1dir/1dir', './1dir/1dir'),
1791
1833
return tree, expected_dirblocks
1796
1838
result = list(osutils._walkdirs_utf8('.'))
1797
1839
# Filter out stat and abspath
1798
1840
self.assertEqual(expected_dirblocks,
1799
self._filter_out(result))
1841
[(dirinfo, [line[0:3] for line in block])
1842
for dirinfo, block in result])
1801
1844
def test_walk_sub_dir(self):
1802
1845
tree, expected_dirblocks = self._get_ascii_tree()
1803
1846
self.build_tree(tree)
1804
1847
# you can search a subdir only, with a supplied prefix.
1805
result = list(osutils._walkdirs_utf8(b'./1dir', b'1dir'))
1848
result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
1806
1849
# Filter out stat and abspath
1807
1850
self.assertEqual(expected_dirblocks[1:],
1808
self._filter_out(result))
1851
[(dirinfo, [line[0:3] for line in block])
1852
for dirinfo, block in result])
1810
1854
def _get_unicode_tree(self):
1811
1855
name0u = u'0file-\xb6'
1822
1866
name1 = name1u.encode('UTF-8')
1823
1867
name2 = name2u.encode('UTF-8')
1824
1868
expected_dirblocks = [
1826
[(name0, name0, 'file', './' + name0u),
1827
(name1, name1, 'directory', './' + name1u),
1828
(name2, name2, 'file', './' + name2u),
1831
((name1, './' + name1u),
1832
[(name1 + b'/' + name0, name0, 'file', './' + name1u
1834
(name1 + b'/' + name1, name1, 'directory', './' + name1u
1838
((name1 + b'/' + name1, './' + name1u + '/' + name1u),
1870
[(name0, name0, 'file', './' + name0u),
1871
(name1, name1, 'directory', './' + name1u),
1872
(name2, name2, 'file', './' + name2u),
1875
((name1, './' + name1u),
1876
[(name1 + '/' + name0, name0, 'file', './' + name1u
1878
(name1 + '/' + name1, name1, 'directory', './' + name1u
1882
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1843
1887
return tree, expected_dirblocks
1870
1913
target = u'target\N{Euro Sign}'
1871
1914
link_name = u'l\N{Euro Sign}nk'
1872
1915
os.symlink(target, link_name)
1916
target_utf8 = target.encode('UTF-8')
1873
1917
link_name_utf8 = link_name.encode('UTF-8')
1874
1918
expected_dirblocks = [
1876
[(link_name_utf8, link_name_utf8,
1877
'symlink', './' + link_name), ],
1920
[(link_name_utf8, link_name_utf8,
1921
'symlink', './' + link_name),],
1879
1923
result = list(osutils._walkdirs_utf8('.'))
1880
1924
self.assertEqual(expected_dirblocks, self._filter_out(result))
1899
1942
os.symlink(self.target, self.link)
1901
1944
def test_os_readlink_link_encoding(self):
1902
self.assertEqual(self.target, os.readlink(self.link))
1945
self.assertEqual(self.target, os.readlink(self.link))
1904
1947
def test_os_readlink_link_decoding(self):
1905
1948
self.assertEqual(self.target.encode(osutils._fs_enc),
1906
os.readlink(self.link.encode(osutils._fs_enc)))
1949
os.readlink(self.link.encode(osutils._fs_enc)))
1909
1952
class TestConcurrency(tests.TestCase):
1948
1991
def test_failure_to_load(self):
1949
1992
self._try_loading()
1950
1993
self.assertLength(1, osutils._extension_load_failures)
1952
osutils._extension_load_failures[0],
1953
"No module named 'breezy._fictional_extension_py'")
1994
self.assertEqual(osutils._extension_load_failures[0],
1995
"No module named _fictional_extension_py")
1955
1997
def test_report_extension_load_failures_no_warning(self):
1956
1998
self.assertTrue(self._try_loading())
1957
warnings, result = self.callCatchWarnings(
1958
osutils.report_extension_load_failures)
1999
warnings, result = self.callCatchWarnings(osutils.report_extension_load_failures)
1959
2000
# it used to give a Python warning; it no longer does
1960
2001
self.assertLength(0, warnings)
2093
2134
self.assertEqual(self.gid, s.st_gid)
2137
class TestPathFromEnviron(tests.TestCase):
2139
def test_is_unicode(self):
2140
self.overrideEnv('BRZ_TEST_PATH', './anywhere at all/')
2141
path = osutils.path_from_environ('BRZ_TEST_PATH')
2142
self.assertIsInstance(path, unicode)
2143
self.assertEqual(u'./anywhere at all/', path)
2145
def test_posix_path_env_ascii(self):
2146
self.overrideEnv('BRZ_TEST_PATH', '/tmp')
2147
home = osutils._posix_path_from_environ('BRZ_TEST_PATH')
2148
self.assertIsInstance(home, unicode)
2149
self.assertEqual(u'/tmp', home)
2151
def test_posix_path_env_unicode(self):
2152
self.requireFeature(features.ByteStringNamedFilesystem)
2153
self.overrideEnv('BRZ_TEST_PATH', '/home/\xa7test')
2154
self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
2155
self.assertEqual(u'/home/\xa7test',
2156
osutils._posix_path_from_environ('BRZ_TEST_PATH'))
2157
osutils._fs_enc = "iso8859-5"
2158
self.assertEqual(u'/home/\u0407test',
2159
osutils._posix_path_from_environ('BRZ_TEST_PATH'))
2160
osutils._fs_enc = "utf-8"
2161
self.assertRaises(errors.BadFilenameEncoding,
2162
osutils._posix_path_from_environ, 'BRZ_TEST_PATH')
2096
2165
class TestGetHomeDir(tests.TestCase):
2098
2167
def test_is_unicode(self):
2099
2168
home = osutils._get_home_dir()
2100
self.assertIsInstance(home, str)
2169
self.assertIsInstance(home, unicode)
2102
2171
def test_posix_homeless(self):
2103
2172
self.overrideEnv('HOME', None)
2104
2173
home = osutils._get_home_dir()
2105
self.assertIsInstance(home, str)
2174
self.assertIsInstance(home, unicode)
2107
2176
def test_posix_home_ascii(self):
2108
2177
self.overrideEnv('HOME', '/home/test')
2109
2178
home = osutils._posix_get_home_dir()
2110
self.assertIsInstance(home, str)
2179
self.assertIsInstance(home, unicode)
2111
2180
self.assertEqual(u'/home/test', home)
2113
2182
def test_posix_home_unicode(self):
2116
2185
self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
2117
2186
self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
2118
2187
osutils._fs_enc = "iso8859-5"
2119
# In python 3, os.environ returns unicode
2120
self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
2188
self.assertEqual(u'/home/\u0407test', osutils._posix_get_home_dir())
2189
osutils._fs_enc = "utf-8"
2190
self.assertRaises(errors.BadFilenameEncoding,
2191
osutils._posix_get_home_dir)
2123
2194
class TestGetuserUnicode(tests.TestCase):
2125
2196
def test_is_unicode(self):
2126
2197
user = osutils.getuser_unicode()
2127
self.assertIsInstance(user, str)
2198
self.assertIsInstance(user, unicode)
2129
2200
def envvar_to_override(self):
2130
2201
if sys.platform == "win32":
2131
2202
# Disable use of platform calls on windows so envvar is used
2132
2203
self.overrideAttr(win32utils, 'has_ctypes', False)
2133
return 'USERNAME' # only variable used on windows
2134
return 'LOGNAME' # first variable checked by getpass.getuser()
2204
return 'USERNAME' # only variable used on windows
2205
return 'LOGNAME' # first variable checked by getpass.getuser()
2136
2207
def test_ascii_user(self):
2137
2208
self.overrideEnv(self.envvar_to_override(), 'jrandom')
2194
2264
self.assertTrue(
2195
2265
osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
2196
2266
self.assertTrue(osutils.find_executable_on_path('file.txt') is None)
2198
2268
def test_windows_app_path(self):
2199
2269
if sys.platform != 'win32':
2200
2270
raise tests.TestSkipped('test requires win32')
2201
2271
# Override PATH env var so that exe can only be found on App Path
2202
2272
self.overrideEnv('PATH', '')
2203
2273
# Internt Explorer is always registered in the App Path
2204
self.assertTrue(osutils.find_executable_on_path(
2205
'iexplore') is not None)
2274
self.assertTrue(osutils.find_executable_on_path('iexplore') is not None)
2207
2276
def test_other(self):
2208
2277
if sys.platform == 'win32':
2212
2281
osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
2215
class SupportsExecutableTests(tests.TestCaseInTempDir):
2217
def test_returns_bool(self):
2218
self.assertIsInstance(osutils.supports_executable(self.test_dir), bool)
2221
class SupportsSymlinksTests(tests.TestCaseInTempDir):
2223
def test_returns_bool(self):
2224
self.assertIsInstance(osutils.supports_symlinks(self.test_dir), bool)
2227
class MtabReader(tests.TestCaseInTempDir):
2229
def test_read_mtab(self):
2230
self.build_tree_contents([('mtab', """\
2231
/dev/mapper/blah--vg-root / ext4 rw,relatime,errors=remount-ro 0 0
2232
/dev/mapper/blah--vg-home /home vfat rw,relatime 0 0
2238
list(osutils.read_mtab('mtab')),
2240
(b'/home', 'vfat')])
2243
class GetFsTypeTests(tests.TestCaseInTempDir):
2245
def test_returns_string_or_none(self):
2246
ret = osutils.get_fs_type(self.test_dir)
2247
self.assertTrue(isinstance(ret, str) or ret is None)
2249
def test_returns_most_specific(self):
2251
osutils, '_FILESYSTEM_FINDER',
2252
osutils.FilesystemFinder(
2253
[(b'/', 'ext4'), (b'/home', 'vfat'),
2254
(b'/home/jelmer', 'ext2')]))
2255
self.assertEqual(osutils.get_fs_type(b'/home/jelmer/blah'), 'ext2')
2256
self.assertEqual(osutils.get_fs_type('/home/jelmer/blah'), 'ext2')
2257
self.assertEqual(osutils.get_fs_type(b'/home/jelmer'), 'ext2')
2258
self.assertEqual(osutils.get_fs_type(b'/home/martin'), 'vfat')
2259
self.assertEqual(osutils.get_fs_type(b'/home'), 'vfat')
2260
self.assertEqual(osutils.get_fs_type(b'/other'), 'ext4')
2262
def test_returns_none(self):
2264
osutils, '_FILESYSTEM_FINDER',
2265
osutils.FilesystemFinder([]))
2266
self.assertIs(osutils.get_fs_type('/home/jelmer/blah'), None)
2267
self.assertIs(osutils.get_fs_type(b'/home/jelmer/blah'), None)
2268
self.assertIs(osutils.get_fs_type('/home/jelmer'), None)
2284
class TestEnvironmentErrors(tests.TestCase):
2285
"""Test handling of environmental errors"""
2287
def test_is_oserror(self):
2288
self.assertTrue(osutils.is_environment_error(
2289
OSError(errno.EINVAL, "Invalid parameter")))
2291
def test_is_ioerror(self):
2292
self.assertTrue(osutils.is_environment_error(
2293
IOError(errno.EINVAL, "Invalid parameter")))
2295
def test_is_socket_error(self):
2296
self.assertTrue(osutils.is_environment_error(
2297
socket.error(errno.EINVAL, "Invalid parameter")))
2299
def test_is_select_error(self):
2300
self.assertTrue(osutils.is_environment_error(
2301
select.error(errno.EINVAL, "Invalid parameter")))
2303
def test_is_pywintypes_error(self):
2304
self.requireFeature(features.pywintypes)
2306
self.assertTrue(osutils.is_environment_error(
2307
pywintypes.error(errno.EINVAL, "Invalid parameter", "Caller")))