131
137
def test_fancy_rename(self):
132
138
# This should work everywhere
133
self.create_file('a', b'something in a\n')
139
self.create_file('a', 'something in a\n')
134
140
self._fancy_rename('a', 'b')
135
141
self.assertPathDoesNotExist('a')
136
142
self.assertPathExists('b')
137
self.check_file_contents('b', b'something in a\n')
143
self.check_file_contents('b', 'something in a\n')
139
self.create_file('a', b'new something in a\n')
145
self.create_file('a', 'new something in a\n')
140
146
self._fancy_rename('b', 'a')
142
self.check_file_contents('a', b'something in a\n')
148
self.check_file_contents('a', 'something in a\n')
144
150
def test_fancy_rename_fails_source_missing(self):
145
151
# An exception should be raised, and the target should be left in place
146
self.create_file('target', b'data in target\n')
152
self.create_file('target', 'data in target\n')
147
153
self.assertRaises((IOError, OSError), self._fancy_rename,
148
154
'missingsource', 'target')
149
155
self.assertPathExists('target')
150
self.check_file_contents('target', b'data in target\n')
156
self.check_file_contents('target', 'data in target\n')
152
158
def test_fancy_rename_fails_if_source_and_target_missing(self):
153
159
self.assertRaises((IOError, OSError), self._fancy_rename,
156
162
def test_rename(self):
157
163
# Rename should be semi-atomic on all platforms
158
self.create_file('a', b'something in a\n')
164
self.create_file('a', 'something in a\n')
159
165
osutils.rename('a', 'b')
160
166
self.assertPathDoesNotExist('a')
161
167
self.assertPathExists('b')
162
self.check_file_contents('b', b'something in a\n')
168
self.check_file_contents('b', 'something in a\n')
164
self.create_file('a', b'new something in a\n')
170
self.create_file('a', 'new something in a\n')
165
171
osutils.rename('b', 'a')
167
self.check_file_contents('a', b'something in a\n')
173
self.check_file_contents('a', 'something in a\n')
169
175
# TODO: test fancy_rename using a MemoryTransport
249
255
# Without it, we may end up re-reading content when we don't have
250
256
# to, but otherwise it doesn't effect correctness.
251
257
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
252
with open('test-file.txt', 'wb') as f:
253
f.write(b'some content\n')
255
self.assertEqualStat(osutils.fstat(f.fileno()),
256
osutils.lstat('test-file.txt'))
258
f = open('test-file.txt', 'wb')
259
self.addCleanup(f.close)
260
f.write('some content\n')
262
self.assertEqualStat(osutils.fstat(f.fileno()),
263
osutils.lstat('test-file.txt'))
259
266
class TestRmTree(tests.TestCaseInTempDir):
398
404
def test_format_date(self):
399
405
self.assertRaises(osutils.UnsupportedTimezoneFormat,
400
osutils.format_date, 0, timezone='foo')
406
osutils.format_date, 0, timezone='foo')
401
407
self.assertIsInstance(osutils.format_date(0), str)
402
self.assertIsInstance(osutils.format_local_date(0), str)
408
self.assertIsInstance(osutils.format_local_date(0), unicode)
403
409
# Testing for the actual value of the local weekday without
404
410
# duplicating the code from format_date is difficult.
405
411
# Instead blackbox.test_locale should check for localized
408
414
def test_format_date_with_offset_in_original_timezone(self):
409
415
self.assertEqual("Thu 1970-01-01 00:00:00 +0000",
410
osutils.format_date_with_offset_in_original_timezone(0))
416
osutils.format_date_with_offset_in_original_timezone(0))
411
417
self.assertEqual("Fri 1970-01-02 03:46:40 +0000",
412
osutils.format_date_with_offset_in_original_timezone(100000))
418
osutils.format_date_with_offset_in_original_timezone(100000))
413
419
self.assertEqual("Fri 1970-01-02 05:46:40 +0200",
414
osutils.format_date_with_offset_in_original_timezone(100000, 7200))
420
osutils.format_date_with_offset_in_original_timezone(100000, 7200))
416
422
def test_local_time_offset(self):
417
423
"""Test that local_time_offset() returns a sane value."""
612
619
# read (max // 2) bytes and verify read size wasn't affected
613
620
num_bytes_to_read = self.block_size // 2
614
osutils.pumpfile(from_file, to_file,
615
num_bytes_to_read, self.block_size)
621
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
616
622
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
617
623
self.assertEqual(from_file.get_read_count(), 1)
619
625
# read (max) bytes and verify read size wasn't affected
620
626
num_bytes_to_read = self.block_size
621
627
from_file.reset_read_count()
622
osutils.pumpfile(from_file, to_file,
623
num_bytes_to_read, self.block_size)
628
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
624
629
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
625
630
self.assertEqual(from_file.get_read_count(), 1)
627
632
# read (max + 1) bytes and verify read size was limited
628
633
num_bytes_to_read = self.block_size + 1
629
634
from_file.reset_read_count()
630
osutils.pumpfile(from_file, to_file,
631
num_bytes_to_read, self.block_size)
635
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
632
636
self.assertEqual(from_file.get_max_read_size(), self.block_size)
633
637
self.assertEqual(from_file.get_read_count(), 2)
635
639
# finish reading the rest of the data
636
640
num_bytes_to_read = self.test_data_len - to_file.tell()
637
osutils.pumpfile(from_file, to_file,
638
num_bytes_to_read, self.block_size)
641
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
640
643
# report error if the data wasn't equal (we only report the size due
641
644
# to the length of the data)
733
735
osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
734
736
report_activity=log_activity, direction='read')
736
[(500, 'read'), (500, 'read'), (28, 'read')], activity)
737
self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
739
741
class TestPumpStringFile(tests.TestCase):
790
792
self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
792
794
def test_from_utf8_string(self):
793
self.assertEqual(u'foo\xae', osutils.safe_unicode(b'foo\xc2\xae'))
795
self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
795
797
def test_bad_utf8_string(self):
796
798
self.assertRaises(errors.BzrBadParameterNotUnicode,
797
799
osutils.safe_unicode,
801
803
class TestSafeUtf8(tests.TestCase):
803
805
def test_from_ascii_string(self):
805
self.assertEqual(b'foobar', osutils.safe_utf8(f))
807
self.assertEqual('foobar', osutils.safe_utf8(f))
807
809
def test_from_unicode_string_ascii_contents(self):
808
self.assertEqual(b'bargam', osutils.safe_utf8(u'bargam'))
810
self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
810
812
def test_from_unicode_string_unicode_contents(self):
811
self.assertEqual(b'bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
813
self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
813
815
def test_from_utf8_string(self):
814
self.assertEqual(b'foo\xc2\xae', osutils.safe_utf8(b'foo\xc2\xae'))
816
self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
816
818
def test_bad_utf8_string(self):
817
819
self.assertRaises(errors.BzrBadParameterNotUnicode,
818
osutils.safe_utf8, b'\xbb\xbb')
820
osutils.safe_utf8, '\xbb\xbb')
823
class TestSafeRevisionId(tests.TestCase):
825
def test_from_ascii_string(self):
826
# this shouldn't give a warning because it's getting an ascii string
827
self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
829
def test_from_unicode_string_ascii_contents(self):
830
self.assertRaises(TypeError,
831
osutils.safe_revision_id, u'bargam')
833
def test_from_unicode_string_unicode_contents(self):
834
self.assertRaises(TypeError,
835
osutils.safe_revision_id, u'bargam\xae')
837
def test_from_utf8_string(self):
838
self.assertEqual('foo\xc2\xae',
839
osutils.safe_revision_id('foo\xc2\xae'))
842
"""Currently, None is a valid revision_id"""
843
self.assertEqual(None, osutils.safe_revision_id(None))
846
class TestSafeFileId(tests.TestCase):
848
def test_from_ascii_string(self):
849
self.assertEqual('foobar', osutils.safe_file_id('foobar'))
851
def test_from_unicode_string_ascii_contents(self):
852
self.assertRaises(TypeError, osutils.safe_file_id, u'bargam')
854
def test_from_unicode_string_unicode_contents(self):
855
self.assertRaises(TypeError,
856
osutils.safe_file_id, u'bargam\xae')
858
def test_from_utf8_string(self):
859
self.assertEqual('foo\xc2\xae',
860
osutils.safe_file_id('foo\xc2\xae'))
863
"""Currently, None is a valid revision_id"""
864
self.assertEqual(None, osutils.safe_file_id(None))
821
867
class TestSendAll(tests.TestCase):
942
986
def test_minimum_path_selection(self):
943
987
self.assertEqual(set(),
944
osutils.minimum_path_selection([]))
988
osutils.minimum_path_selection([]))
945
989
self.assertEqual({'a'},
946
osutils.minimum_path_selection(['a']))
990
osutils.minimum_path_selection(['a']))
947
991
self.assertEqual({'a', 'b'},
948
osutils.minimum_path_selection(['a', 'b']))
949
self.assertEqual({'a/', 'b'},
950
osutils.minimum_path_selection(['a/', 'b']))
951
self.assertEqual({'a/', 'b'},
952
osutils.minimum_path_selection(['a/c', 'a/', 'b']))
992
osutils.minimum_path_selection(['a', 'b']))
993
self.assertEqual({'a/', 'b'},
994
osutils.minimum_path_selection(['a/', 'b']))
995
self.assertEqual({'a/', 'b'},
996
osutils.minimum_path_selection(['a/c', 'a/', 'b']))
953
997
self.assertEqual({'a-b', 'a', 'a0b'},
954
osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
998
osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
956
1000
def test_mkdtemp(self):
957
1001
tmpdir = osutils._win32_mkdtemp(dir='.')
958
1002
self.assertFalse('\\' in tmpdir)
960
1004
def test_rename(self):
961
with open('a', 'wb') as a:
963
with open('b', 'wb') as b:
966
1012
osutils._win32_rename('b', 'a')
967
1013
self.assertPathExists('a')
968
1014
self.assertPathDoesNotExist('b')
969
self.assertFileEqual(b'baz\n', 'a')
1015
self.assertFileEqual('baz\n', 'a')
971
1017
def test_rename_missing_file(self):
972
with open('a', 'wb') as a:
976
1023
osutils._win32_rename('b', 'a')
977
1024
except (IOError, OSError) as e:
978
1025
self.assertEqual(errno.ENOENT, e.errno)
979
self.assertFileEqual(b'foo\n', 'a')
1026
self.assertFileEqual('foo\n', 'a')
981
1028
def test_rename_missing_dir(self):
1005
1052
check(['a', 'b'], 'a/b')
1006
1053
check(['a', 'b'], 'a/./b')
1007
1054
check(['a', '.b'], 'a/.b')
1008
if os.path.sep == '\\':
1009
check(['a', '.b'], 'a\\.b')
1011
check(['a\\.b'], 'a\\.b')
1055
check(['a', '.b'], 'a\\.b')
1013
1057
self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
1042
1086
class TestChunksToLines(tests.TestCase):
1044
1088
def test_smoketest(self):
1045
self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
1046
osutils.chunks_to_lines([b'foo\nbar', b'\nbaz\n']))
1047
self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
1048
osutils.chunks_to_lines([b'foo\n', b'bar\n', b'baz\n']))
1089
self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
1090
osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
1091
self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
1092
osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
1050
1094
def test_osutils_binding(self):
1051
1095
from . import test__chunks_to_lines
1088
1132
self.build_tree(tree)
1089
1133
expected_dirblocks = [
1091
[('0file', '0file', 'file'),
1092
('1dir', '1dir', 'directory'),
1093
('2file', '2file', 'file'),
1096
(('1dir', './1dir'),
1097
[('1dir/0file', '0file', 'file'),
1098
('1dir/1dir', '1dir', 'directory'),
1101
(('1dir/1dir', './1dir/1dir'),
1135
[('0file', '0file', 'file'),
1136
('1dir', '1dir', 'directory'),
1137
('2file', '2file', 'file'),
1140
(('1dir', './1dir'),
1141
[('1dir/0file', '0file', 'file'),
1142
('1dir/1dir', '1dir', 'directory'),
1145
(('1dir/1dir', './1dir/1dir'),
1107
1151
found_bzrdir = False
1136
1180
# (It would be ok if it happened earlier but at the moment it
1138
1182
e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1139
self.assertEqual('./test-unreadable', osutils.safe_unicode(e.filename))
1183
self.assertEqual('./test-unreadable', e.filename)
1140
1184
self.assertEqual(errno.EACCES, e.errno)
1141
1185
# Ensure the message contains the file name
1142
1186
self.assertContainsRe(str(e), "\\./test-unreadable")
1144
1189
def test_walkdirs_encoding_error(self):
1145
1190
# <https://bugs.launchpad.net/bzr/+bug/488519>
1146
1191
# walkdirs didn't raise a useful message when the filenames
1187
1232
self.build_tree(tree)
1188
1233
expected_dirblocks = [
1190
[('0file', '0file', 'file'),
1191
('1dir', '1dir', 'directory'),
1192
('2file', '2file', 'file'),
1195
(('1dir', './1dir'),
1196
[('1dir/0file', '0file', 'file'),
1197
('1dir/1dir', '1dir', 'directory'),
1200
(('1dir/1dir', './1dir/1dir'),
1235
[('0file', '0file', 'file'),
1236
('1dir', '1dir', 'directory'),
1237
('2file', '2file', 'file'),
1240
(('1dir', './1dir'),
1241
[('1dir/0file', '0file', 'file'),
1242
('1dir/1dir', '1dir', 'directory'),
1245
(('1dir/1dir', './1dir/1dir'),
1206
1251
found_bzrdir = False
1207
for dirdetail, dirblock in osutils._walkdirs_utf8(b'.'):
1208
if len(dirblock) and dirblock[0][1] == b'.bzr':
1252
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1253
if len(dirblock) and dirblock[0][1] == '.bzr':
1209
1254
# this tests the filtering of selected paths
1210
1255
found_bzrdir = True
1211
1256
del dirblock[0]
1212
dirdetail = (dirdetail[0].decode('utf-8'),
1213
osutils.safe_unicode(dirdetail[1]))
1215
(entry[0].decode('utf-8'), entry[1].decode('utf-8'), entry[2])
1216
for entry in dirblock]
1217
1257
result.append((dirdetail, dirblock))
1219
1259
self.assertTrue(found_bzrdir)
1238
1278
self.overrideAttr(osutils, '_fs_enc')
1239
1279
self.overrideAttr(osutils, '_selected_dir_reader')
1241
def assertDirReaderIs(self, expected, top):
1281
def assertDirReaderIs(self, expected):
1242
1282
"""Assert the right implementation for _walkdirs_utf8 is chosen."""
1243
1283
# Force it to redetect
1244
1284
osutils._selected_dir_reader = None
1245
1285
# Nothing to list, but should still trigger the selection logic
1246
self.assertEqual([((b'', top), [])], list(osutils._walkdirs_utf8('.')))
1286
self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
1247
1287
self.assertIsInstance(osutils._selected_dir_reader, expected)
1249
1289
def test_force_walkdirs_utf8_fs_utf8(self):
1250
1290
self.requireFeature(UTF8DirReaderFeature)
1251
1291
self._save_platform_info()
1252
1292
osutils._fs_enc = 'utf-8'
1253
self.assertDirReaderIs(UTF8DirReaderFeature.module.UTF8DirReader, b".")
1293
self.assertDirReaderIs(
1294
UTF8DirReaderFeature.module.UTF8DirReader)
1255
1296
def test_force_walkdirs_utf8_fs_ascii(self):
1256
1297
self.requireFeature(UTF8DirReaderFeature)
1257
1298
self._save_platform_info()
1258
1299
osutils._fs_enc = 'ascii'
1259
1300
self.assertDirReaderIs(
1260
UTF8DirReaderFeature.module.UTF8DirReader, b".")
1301
UTF8DirReaderFeature.module.UTF8DirReader)
1262
1303
def test_force_walkdirs_utf8_fs_latin1(self):
1263
1304
self._save_platform_info()
1264
1305
osutils._fs_enc = 'iso-8859-1'
1265
self.assertDirReaderIs(osutils.UnicodeDirReader, ".")
1306
self.assertDirReaderIs(osutils.UnicodeDirReader)
1267
1308
def test_force_walkdirs_utf8_nt(self):
1268
1309
# Disabled because the thunk of the whole walkdirs api is disabled.
1269
1310
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1270
1311
self._save_platform_info()
1271
1312
from .._walkdirs_win32 import Win32ReadDir
1272
self.assertDirReaderIs(Win32ReadDir, ".")
1313
self.assertDirReaderIs(Win32ReadDir)
1274
1315
def test_unicode_walkdirs(self):
1275
1316
"""Walkdirs should always return unicode paths."""
1287
1328
self.build_tree(tree)
1288
1329
expected_dirblocks = [
1290
[(name0, name0, 'file', './' + name0),
1291
(name1, name1, 'directory', './' + name1),
1292
(name2, name2, 'file', './' + name2),
1295
((name1, './' + name1),
1296
[(name1 + '/' + name0, name0, 'file', './' + name1
1298
(name1 + '/' + name1, name1, 'directory', './' + name1
1302
((name1 + '/' + name1, './' + name1 + '/' + name1),
1331
[(name0, name0, 'file', './' + name0),
1332
(name1, name1, 'directory', './' + name1),
1333
(name2, name2, 'file', './' + name2),
1336
((name1, './' + name1),
1337
[(name1 + '/' + name0, name0, 'file', './' + name1
1339
(name1 + '/' + name1, name1, 'directory', './' + name1
1343
((name1 + '/' + name1, './' + name1 + '/' + name1),
1307
1348
result = list(osutils.walkdirs('.'))
1308
1349
self._filter_out_stat(result)
1309
1350
self.assertEqual(expected_dirblocks, result)
1310
result = list(osutils.walkdirs(u'./' + name1, name1))
1351
result = list(osutils.walkdirs(u'./'+name1, name1))
1311
1352
self._filter_out_stat(result)
1312
1353
self.assertEqual(expected_dirblocks[1:], result)
1333
1374
name2 = name2.encode('utf8')
1335
1376
expected_dirblocks = [
1337
[(name0, name0, 'file', b'./' + name0),
1338
(name1, name1, 'directory', b'./' + name1),
1339
(name2, name2, 'file', b'./' + name2),
1342
((name1, b'./' + name1),
1343
[(name1 + b'/' + name0, name0, 'file', b'./' + name1
1345
(name1 + b'/' + name1, name1, 'directory', b'./' + name1
1349
((name1 + b'/' + name1, b'./' + name1 + b'/' + name1),
1378
[(name0, name0, 'file', './' + name0),
1379
(name1, name1, 'directory', './' + name1),
1380
(name2, name2, 'file', './' + name2),
1383
((name1, './' + name1),
1384
[(name1 + '/' + name0, name0, 'file', './' + name1
1386
(name1 + '/' + name1, name1, 'directory', './' + name1
1390
((name1 + '/' + name1, './' + name1 + '/' + name1),
1355
1396
# For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1356
1397
# all abspaths are Unicode, and encode them back into utf8.
1357
1398
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1358
self.assertIsInstance(dirdetail[0], bytes)
1359
if isinstance(dirdetail[1], str):
1399
self.assertIsInstance(dirdetail[0], str)
1400
if isinstance(dirdetail[1], unicode):
1360
1401
dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1361
1402
dirblock = [list(info) for info in dirblock]
1362
1403
for info in dirblock:
1363
self.assertIsInstance(info[4], str)
1404
self.assertIsInstance(info[4], unicode)
1364
1405
info[4] = info[4].encode('utf8')
1365
1406
new_dirblock = []
1366
1407
for info in dirblock:
1367
self.assertIsInstance(info[0], bytes)
1368
self.assertIsInstance(info[1], bytes)
1369
self.assertIsInstance(info[4], bytes)
1408
self.assertIsInstance(info[0], str)
1409
self.assertIsInstance(info[1], str)
1410
self.assertIsInstance(info[4], str)
1370
1411
# Remove the stat information
1371
1412
new_dirblock.append((info[0], info[1], info[2], info[4]))
1372
1413
result.append((dirdetail, new_dirblock))
1400
1441
# All of the abspaths should be in unicode, all of the relative paths
1401
1442
# should be in utf8
1402
1443
expected_dirblocks = [
1404
[(name0, name0, 'file', './' + name0u),
1405
(name1, name1, 'directory', './' + name1u),
1406
(name2, name2, 'file', './' + name2u),
1409
((name1, './' + name1u),
1410
[(name1 + b'/' + name0, name0, 'file', './' + name1u
1412
(name1 + b'/' + name1, name1, 'directory', './' + name1u
1416
((name1 + b'/' + name1, './' + name1u + '/' + name1u),
1445
[(name0, name0, 'file', './' + name0u),
1446
(name1, name1, 'directory', './' + name1u),
1447
(name2, name2, 'file', './' + name2u),
1450
((name1, './' + name1u),
1451
[(name1 + '/' + name0, name0, 'file', './' + name1u
1453
(name1 + '/' + name1, name1, 'directory', './' + name1u
1457
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1421
1462
result = list(osutils._walkdirs_utf8('.'))
1422
1463
self._filter_out_stat(result)
1446
1487
# All of the abspaths should be in unicode, all of the relative paths
1447
1488
# should be in utf8
1448
1489
expected_dirblocks = [
1450
[(name0, name0, 'file', './' + name0u),
1451
(name1, name1, 'directory', './' + name1u),
1452
(name2, name2, 'file', './' + name2u),
1455
((name1, './' + name1u),
1456
[(name1 + '/' + name0, name0, 'file', './' + name1u
1458
(name1 + '/' + name1, name1, 'directory', './' + name1u
1462
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1491
[(name0, name0, 'file', './' + name0u),
1492
(name1, name1, 'directory', './' + name1u),
1493
(name2, name2, 'file', './' + name2u),
1496
((name1, './' + name1u),
1497
[(name1 + '/' + name0, name0, 'file', './' + name1u
1499
(name1 + '/' + name1, name1, 'directory', './' + name1u
1503
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1467
1508
result = list(osutils._walkdirs_utf8(u'.'))
1468
1509
self._filter_out_stat(result)
1618
1662
def test_copy_tree_handlers(self):
1619
1663
processed_files = []
1620
1664
processed_links = []
1622
1665
def file_handler(from_path, to_path):
1623
1666
processed_files.append(('f', from_path, to_path))
1625
1667
def dir_handler(from_path, to_path):
1626
1668
processed_files.append(('d', from_path, to_path))
1628
1669
def link_handler(from_path, to_path):
1629
1670
processed_links.append((from_path, to_path))
1630
1671
handlers = {'file': file_handler,
1631
1672
'directory': dir_handler,
1632
1673
'symlink': link_handler,
1635
1676
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1636
1677
if osutils.has_symlinks():
1641
1682
('f', 'source/a', 'target/a'),
1642
1683
('d', 'source/b', 'target/b'),
1643
1684
('f', 'source/b/c', 'target/b/c'),
1645
1686
self.assertPathDoesNotExist('target')
1646
1687
if osutils.has_symlinks():
1647
1688
self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1695
1735
old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', None)
1696
1736
self.assertEqual('foo', old)
1697
1737
self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'))
1698
self.assertNotIn('BRZ_TEST_ENV_VAR', os.environ)
1738
self.assertFalse('BRZ_TEST_ENV_VAR' in os.environ)
1701
1741
class TestSizeShaFile(tests.TestCaseInTempDir):
1703
1743
def test_sha_empty(self):
1704
self.build_tree_contents([('foo', b'')])
1705
expected_sha = osutils.sha_string(b'')
1744
self.build_tree_contents([('foo', '')])
1745
expected_sha = osutils.sha_string('')
1706
1746
f = open('foo')
1707
1747
self.addCleanup(f.close)
1708
1748
size, sha = osutils.size_sha_file(f)
1723
1763
class TestShaFileByName(tests.TestCaseInTempDir):
1725
1765
def test_sha_empty(self):
1726
self.build_tree_contents([('foo', b'')])
1727
expected_sha = osutils.sha_string(b'')
1766
self.build_tree_contents([('foo', '')])
1767
expected_sha = osutils.sha_string('')
1728
1768
self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1730
1770
def test_sha_mixed_endings(self):
1731
text = b'test\r\nwith\nall\rpossible line endings\r\n'
1771
text = 'test\r\nwith\nall\rpossible line endings\r\n'
1732
1772
self.build_tree_contents([('foo', text)])
1733
1773
expected_sha = osutils.sha_string(text)
1734
1774
self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1774
1814
expected_dirblocks = [
1776
[(b'0file', b'0file', 'file', './0file'),
1777
(b'1dir', b'1dir', 'directory', './1dir'),
1778
(b'2file', b'2file', 'file', './2file'),
1781
((b'1dir', './1dir'),
1782
[(b'1dir/0file', b'0file', 'file', './1dir/0file'),
1783
(b'1dir/1dir', b'1dir', 'directory', './1dir/1dir'),
1786
((b'1dir/1dir', './1dir/1dir'),
1816
[('0file', '0file', 'file'),
1817
('1dir', '1dir', 'directory'),
1818
('2file', '2file', 'file'),
1821
(('1dir', './1dir'),
1822
[('1dir/0file', '0file', 'file'),
1823
('1dir/1dir', '1dir', 'directory'),
1826
(('1dir/1dir', './1dir/1dir'),
1791
1831
return tree, expected_dirblocks
1796
1836
result = list(osutils._walkdirs_utf8('.'))
1797
1837
# Filter out stat and abspath
1798
1838
self.assertEqual(expected_dirblocks,
1799
self._filter_out(result))
1839
[(dirinfo, [line[0:3] for line in block])
1840
for dirinfo, block in result])
1801
1842
def test_walk_sub_dir(self):
1802
1843
tree, expected_dirblocks = self._get_ascii_tree()
1803
1844
self.build_tree(tree)
1804
1845
# you can search a subdir only, with a supplied prefix.
1805
result = list(osutils._walkdirs_utf8(b'./1dir', b'1dir'))
1846
result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
1806
1847
# Filter out stat and abspath
1807
1848
self.assertEqual(expected_dirblocks[1:],
1808
self._filter_out(result))
1849
[(dirinfo, [line[0:3] for line in block])
1850
for dirinfo, block in result])
1810
1852
def _get_unicode_tree(self):
1811
1853
name0u = u'0file-\xb6'
1822
1864
name1 = name1u.encode('UTF-8')
1823
1865
name2 = name2u.encode('UTF-8')
1824
1866
expected_dirblocks = [
1826
[(name0, name0, 'file', './' + name0u),
1827
(name1, name1, 'directory', './' + name1u),
1828
(name2, name2, 'file', './' + name2u),
1831
((name1, './' + name1u),
1832
[(name1 + b'/' + name0, name0, 'file', './' + name1u
1834
(name1 + b'/' + name1, name1, 'directory', './' + name1u
1838
((name1 + b'/' + name1, './' + name1u + '/' + name1u),
1868
[(name0, name0, 'file', './' + name0u),
1869
(name1, name1, 'directory', './' + name1u),
1870
(name2, name2, 'file', './' + name2u),
1873
((name1, './' + name1u),
1874
[(name1 + '/' + name0, name0, 'file', './' + name1u
1876
(name1 + '/' + name1, name1, 'directory', './' + name1u
1880
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1843
1885
return tree, expected_dirblocks
1870
1911
target = u'target\N{Euro Sign}'
1871
1912
link_name = u'l\N{Euro Sign}nk'
1872
1913
os.symlink(target, link_name)
1914
target_utf8 = target.encode('UTF-8')
1873
1915
link_name_utf8 = link_name.encode('UTF-8')
1874
1916
expected_dirblocks = [
1876
[(link_name_utf8, link_name_utf8,
1877
'symlink', './' + link_name), ],
1918
[(link_name_utf8, link_name_utf8,
1919
'symlink', './' + link_name),],
1879
1921
result = list(osutils._walkdirs_utf8('.'))
1880
1922
self.assertEqual(expected_dirblocks, self._filter_out(result))
1899
1940
os.symlink(self.target, self.link)
1901
1942
def test_os_readlink_link_encoding(self):
1902
self.assertEqual(self.target, os.readlink(self.link))
1943
self.assertEqual(self.target, os.readlink(self.link))
1904
1945
def test_os_readlink_link_decoding(self):
1905
1946
self.assertEqual(self.target.encode(osutils._fs_enc),
1906
os.readlink(self.link.encode(osutils._fs_enc)))
1947
os.readlink(self.link.encode(osutils._fs_enc)))
1909
1950
class TestConcurrency(tests.TestCase):
1948
1989
def test_failure_to_load(self):
1949
1990
self._try_loading()
1950
1991
self.assertLength(1, osutils._extension_load_failures)
1952
osutils._extension_load_failures[0],
1953
"No module named 'breezy._fictional_extension_py'")
1992
self.assertEqual(osutils._extension_load_failures[0],
1993
"No module named _fictional_extension_py")
1955
1995
def test_report_extension_load_failures_no_warning(self):
1956
1996
self.assertTrue(self._try_loading())
1957
warnings, result = self.callCatchWarnings(
1958
osutils.report_extension_load_failures)
1997
warnings, result = self.callCatchWarnings(osutils.report_extension_load_failures)
1959
1998
# it used to give a Python warning; it no longer does
1960
1999
self.assertLength(0, warnings)
2093
2132
self.assertEqual(self.gid, s.st_gid)
2135
class TestPathFromEnviron(tests.TestCase):
2137
def test_is_unicode(self):
2138
self.overrideEnv('BRZ_TEST_PATH', './anywhere at all/')
2139
path = osutils.path_from_environ('BRZ_TEST_PATH')
2140
self.assertIsInstance(path, unicode)
2141
self.assertEqual(u'./anywhere at all/', path)
2143
def test_posix_path_env_ascii(self):
2144
self.overrideEnv('BRZ_TEST_PATH', '/tmp')
2145
home = osutils._posix_path_from_environ('BRZ_TEST_PATH')
2146
self.assertIsInstance(home, unicode)
2147
self.assertEqual(u'/tmp', home)
2149
def test_posix_path_env_unicode(self):
2150
self.requireFeature(features.ByteStringNamedFilesystem)
2151
self.overrideEnv('BRZ_TEST_PATH', '/home/\xa7test')
2152
self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
2153
self.assertEqual(u'/home/\xa7test',
2154
osutils._posix_path_from_environ('BRZ_TEST_PATH'))
2155
osutils._fs_enc = "iso8859-5"
2156
self.assertEqual(u'/home/\u0407test',
2157
osutils._posix_path_from_environ('BRZ_TEST_PATH'))
2158
osutils._fs_enc = "utf-8"
2159
self.assertRaises(errors.BadFilenameEncoding,
2160
osutils._posix_path_from_environ, 'BRZ_TEST_PATH')
2096
2163
class TestGetHomeDir(tests.TestCase):
2098
2165
def test_is_unicode(self):
2099
2166
home = osutils._get_home_dir()
2100
self.assertIsInstance(home, str)
2167
self.assertIsInstance(home, unicode)
2102
2169
def test_posix_homeless(self):
2103
2170
self.overrideEnv('HOME', None)
2104
2171
home = osutils._get_home_dir()
2105
self.assertIsInstance(home, str)
2172
self.assertIsInstance(home, unicode)
2107
2174
def test_posix_home_ascii(self):
2108
2175
self.overrideEnv('HOME', '/home/test')
2109
2176
home = osutils._posix_get_home_dir()
2110
self.assertIsInstance(home, str)
2177
self.assertIsInstance(home, unicode)
2111
2178
self.assertEqual(u'/home/test', home)
2113
2180
def test_posix_home_unicode(self):
2116
2183
self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
2117
2184
self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
2118
2185
osutils._fs_enc = "iso8859-5"
2119
# In python 3, os.environ returns unicode
2120
self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
2186
self.assertEqual(u'/home/\u0407test', osutils._posix_get_home_dir())
2187
osutils._fs_enc = "utf-8"
2188
self.assertRaises(errors.BadFilenameEncoding,
2189
osutils._posix_get_home_dir)
2123
2192
class TestGetuserUnicode(tests.TestCase):
2125
2194
def test_is_unicode(self):
2126
2195
user = osutils.getuser_unicode()
2127
self.assertIsInstance(user, str)
2196
self.assertIsInstance(user, unicode)
2129
2198
def envvar_to_override(self):
2130
2199
if sys.platform == "win32":
2131
2200
# Disable use of platform calls on windows so envvar is used
2132
2201
self.overrideAttr(win32utils, 'has_ctypes', False)
2133
return 'USERNAME' # only variable used on windows
2134
return 'LOGNAME' # first variable checked by getpass.getuser()
2202
return 'USERNAME' # only variable used on windows
2203
return 'LOGNAME' # first variable checked by getpass.getuser()
2136
2205
def test_ascii_user(self):
2137
2206
self.overrideEnv(self.envvar_to_override(), 'jrandom')
2194
2262
self.assertTrue(
2195
2263
osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
2196
2264
self.assertTrue(osutils.find_executable_on_path('file.txt') is None)
2198
2266
def test_windows_app_path(self):
2199
2267
if sys.platform != 'win32':
2200
2268
raise tests.TestSkipped('test requires win32')
2201
2269
# Override PATH env var so that exe can only be found on App Path
2202
2270
self.overrideEnv('PATH', '')
2203
2271
# Internt Explorer is always registered in the App Path
2204
self.assertTrue(osutils.find_executable_on_path(
2205
'iexplore') is not None)
2272
self.assertTrue(osutils.find_executable_on_path('iexplore') is not None)
2207
2274
def test_other(self):
2208
2275
if sys.platform == 'win32':
2212
2279
osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
2215
class SupportsExecutableTests(tests.TestCaseInTempDir):
2217
def test_returns_bool(self):
2218
self.assertIsInstance(osutils.supports_executable(self.test_dir), bool)
2221
class SupportsSymlinksTests(tests.TestCaseInTempDir):
2223
def test_returns_bool(self):
2224
self.assertIsInstance(osutils.supports_symlinks(self.test_dir), bool)
2227
class MtabReader(tests.TestCaseInTempDir):
2229
def test_read_mtab(self):
2230
self.build_tree_contents([('mtab', """\
2231
/dev/mapper/blah--vg-root / ext4 rw,relatime,errors=remount-ro 0 0
2232
/dev/mapper/blah--vg-home /home vfat rw,relatime 0 0
2238
list(osutils.read_mtab('mtab')),
2240
(b'/home', 'vfat')])
2243
class GetFsTypeTests(tests.TestCaseInTempDir):
2245
def test_returns_string_or_none(self):
2246
ret = osutils.get_fs_type(self.test_dir)
2247
self.assertTrue(isinstance(ret, str) or ret is None)
2249
def test_returns_most_specific(self):
2251
osutils, '_FILESYSTEM_FINDER',
2252
osutils.FilesystemFinder(
2253
[(b'/', 'ext4'), (b'/home', 'vfat'),
2254
(b'/home/jelmer', 'ext2')]))
2255
self.assertEqual(osutils.get_fs_type(b'/home/jelmer/blah'), 'ext2')
2256
self.assertEqual(osutils.get_fs_type('/home/jelmer/blah'), 'ext2')
2257
self.assertEqual(osutils.get_fs_type(b'/home/jelmer'), 'ext2')
2258
self.assertEqual(osutils.get_fs_type(b'/home/martin'), 'vfat')
2259
self.assertEqual(osutils.get_fs_type(b'/home'), 'vfat')
2260
self.assertEqual(osutils.get_fs_type(b'/other'), 'ext4')
2262
def test_returns_none(self):
2264
osutils, '_FILESYSTEM_FINDER',
2265
osutils.FilesystemFinder([]))
2266
self.assertIs(osutils.get_fs_type('/home/jelmer/blah'), None)
2267
self.assertIs(osutils.get_fs_type(b'/home/jelmer/blah'), None)
2268
self.assertIs(osutils.get_fs_type('/home/jelmer'), None)
2282
class TestEnvironmentErrors(tests.TestCase):
2283
"""Test handling of environmental errors"""
2285
def test_is_oserror(self):
2286
self.assertTrue(osutils.is_environment_error(
2287
OSError(errno.EINVAL, "Invalid parameter")))
2289
def test_is_ioerror(self):
2290
self.assertTrue(osutils.is_environment_error(
2291
IOError(errno.EINVAL, "Invalid parameter")))
2293
def test_is_socket_error(self):
2294
self.assertTrue(osutils.is_environment_error(
2295
socket.error(errno.EINVAL, "Invalid parameter")))
2297
def test_is_select_error(self):
2298
self.assertTrue(osutils.is_environment_error(
2299
select.error(errno.EINVAL, "Invalid parameter")))
2301
def test_is_pywintypes_error(self):
2302
self.requireFeature(features.pywintypes)
2304
self.assertTrue(osutils.is_environment_error(
2305
pywintypes.error(errno.EINVAL, "Invalid parameter", "Caller")))