137
131
def test_fancy_rename(self):
138
132
# This should work everywhere
139
self.create_file('a', 'something in a\n')
133
self.create_file('a', b'something in a\n')
140
134
self._fancy_rename('a', 'b')
141
135
self.assertPathDoesNotExist('a')
142
136
self.assertPathExists('b')
143
self.check_file_contents('b', 'something in a\n')
137
self.check_file_contents('b', b'something in a\n')
145
self.create_file('a', 'new something in a\n')
139
self.create_file('a', b'new something in a\n')
146
140
self._fancy_rename('b', 'a')
148
self.check_file_contents('a', 'something in a\n')
142
self.check_file_contents('a', b'something in a\n')
150
144
def test_fancy_rename_fails_source_missing(self):
151
145
# An exception should be raised, and the target should be left in place
152
self.create_file('target', 'data in target\n')
146
self.create_file('target', b'data in target\n')
153
147
self.assertRaises((IOError, OSError), self._fancy_rename,
154
148
'missingsource', 'target')
155
149
self.assertPathExists('target')
156
self.check_file_contents('target', 'data in target\n')
150
self.check_file_contents('target', b'data in target\n')
158
152
def test_fancy_rename_fails_if_source_and_target_missing(self):
159
153
self.assertRaises((IOError, OSError), self._fancy_rename,
162
156
def test_rename(self):
163
157
# Rename should be semi-atomic on all platforms
164
self.create_file('a', 'something in a\n')
158
self.create_file('a', b'something in a\n')
165
159
osutils.rename('a', 'b')
166
160
self.assertPathDoesNotExist('a')
167
161
self.assertPathExists('b')
168
self.check_file_contents('b', 'something in a\n')
162
self.check_file_contents('b', b'something in a\n')
170
self.create_file('a', 'new something in a\n')
164
self.create_file('a', b'new something in a\n')
171
165
osutils.rename('b', 'a')
173
self.check_file_contents('a', 'something in a\n')
167
self.check_file_contents('a', b'something in a\n')
175
169
# TODO: test fancy_rename using a MemoryTransport
255
249
# Without it, we may end up re-reading content when we don't have
256
250
# to, but otherwise it doesn't effect correctness.
257
251
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
258
f = open('test-file.txt', 'wb')
259
self.addCleanup(f.close)
260
f.write('some content\n')
262
self.assertEqualStat(osutils.fstat(f.fileno()),
263
osutils.lstat('test-file.txt'))
252
with open('test-file.txt', 'wb') as f:
253
f.write(b'some content\n')
255
self.assertEqualStat(osutils.fstat(f.fileno()),
256
osutils.lstat('test-file.txt'))
266
259
class TestRmTree(tests.TestCaseInTempDir):
404
398
def test_format_date(self):
405
399
self.assertRaises(osutils.UnsupportedTimezoneFormat,
406
osutils.format_date, 0, timezone='foo')
400
osutils.format_date, 0, timezone='foo')
407
401
self.assertIsInstance(osutils.format_date(0), str)
408
self.assertIsInstance(osutils.format_local_date(0), unicode)
402
self.assertIsInstance(osutils.format_local_date(0), str)
409
403
# Testing for the actual value of the local weekday without
410
404
# duplicating the code from format_date is difficult.
411
405
# Instead blackbox.test_locale should check for localized
414
408
def test_format_date_with_offset_in_original_timezone(self):
415
409
self.assertEqual("Thu 1970-01-01 00:00:00 +0000",
416
osutils.format_date_with_offset_in_original_timezone(0))
410
osutils.format_date_with_offset_in_original_timezone(0))
417
411
self.assertEqual("Fri 1970-01-02 03:46:40 +0000",
418
osutils.format_date_with_offset_in_original_timezone(100000))
412
osutils.format_date_with_offset_in_original_timezone(100000))
419
413
self.assertEqual("Fri 1970-01-02 05:46:40 +0200",
420
osutils.format_date_with_offset_in_original_timezone(100000, 7200))
414
osutils.format_date_with_offset_in_original_timezone(100000, 7200))
422
416
def test_local_time_offset(self):
423
417
"""Test that local_time_offset() returns a sane value."""
619
612
# read (max // 2) bytes and verify read size wasn't affected
620
613
num_bytes_to_read = self.block_size // 2
621
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
614
osutils.pumpfile(from_file, to_file,
615
num_bytes_to_read, self.block_size)
622
616
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
623
617
self.assertEqual(from_file.get_read_count(), 1)
625
619
# read (max) bytes and verify read size wasn't affected
626
620
num_bytes_to_read = self.block_size
627
621
from_file.reset_read_count()
628
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
622
osutils.pumpfile(from_file, to_file,
623
num_bytes_to_read, self.block_size)
629
624
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
630
625
self.assertEqual(from_file.get_read_count(), 1)
632
627
# read (max + 1) bytes and verify read size was limited
633
628
num_bytes_to_read = self.block_size + 1
634
629
from_file.reset_read_count()
635
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
630
osutils.pumpfile(from_file, to_file,
631
num_bytes_to_read, self.block_size)
636
632
self.assertEqual(from_file.get_max_read_size(), self.block_size)
637
633
self.assertEqual(from_file.get_read_count(), 2)
639
635
# finish reading the rest of the data
640
636
num_bytes_to_read = self.test_data_len - to_file.tell()
641
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
637
osutils.pumpfile(from_file, to_file,
638
num_bytes_to_read, self.block_size)
643
640
# report error if the data wasn't equal (we only report the size due
644
641
# to the length of the data)
735
733
osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
736
734
report_activity=log_activity, direction='read')
737
self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
736
[(500, 'read'), (500, 'read'), (28, 'read')], activity)
741
739
class TestPumpStringFile(tests.TestCase):
792
790
self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
794
792
def test_from_utf8_string(self):
795
self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
793
self.assertEqual(u'foo\xae', osutils.safe_unicode(b'foo\xc2\xae'))
797
795
def test_bad_utf8_string(self):
798
796
self.assertRaises(errors.BzrBadParameterNotUnicode,
799
797
osutils.safe_unicode,
803
801
class TestSafeUtf8(tests.TestCase):
805
803
def test_from_ascii_string(self):
807
self.assertEqual('foobar', osutils.safe_utf8(f))
805
self.assertEqual(b'foobar', osutils.safe_utf8(f))
809
807
def test_from_unicode_string_ascii_contents(self):
810
self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
808
self.assertEqual(b'bargam', osutils.safe_utf8(u'bargam'))
812
810
def test_from_unicode_string_unicode_contents(self):
813
self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
811
self.assertEqual(b'bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
815
813
def test_from_utf8_string(self):
816
self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
814
self.assertEqual(b'foo\xc2\xae', osutils.safe_utf8(b'foo\xc2\xae'))
818
816
def test_bad_utf8_string(self):
819
817
self.assertRaises(errors.BzrBadParameterNotUnicode,
820
osutils.safe_utf8, '\xbb\xbb')
818
osutils.safe_utf8, b'\xbb\xbb')
823
821
class TestSafeRevisionId(tests.TestCase):
825
823
def test_from_ascii_string(self):
826
824
# this shouldn't give a warning because it's getting an ascii string
827
self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
825
self.assertEqual(b'foobar', osutils.safe_revision_id(b'foobar'))
829
827
def test_from_unicode_string_ascii_contents(self):
830
828
self.assertRaises(TypeError,
833
831
def test_from_unicode_string_unicode_contents(self):
834
832
self.assertRaises(TypeError,
835
osutils.safe_revision_id, u'bargam\xae')
833
osutils.safe_revision_id, u'bargam\xae')
837
835
def test_from_utf8_string(self):
838
self.assertEqual('foo\xc2\xae',
839
osutils.safe_revision_id('foo\xc2\xae'))
836
self.assertEqual(b'foo\xc2\xae',
837
osutils.safe_revision_id(b'foo\xc2\xae'))
841
839
def test_none(self):
842
840
"""Currently, None is a valid revision_id"""
986
986
def test_minimum_path_selection(self):
987
987
self.assertEqual(set(),
988
osutils.minimum_path_selection([]))
988
osutils.minimum_path_selection([]))
989
989
self.assertEqual({'a'},
990
osutils.minimum_path_selection(['a']))
990
osutils.minimum_path_selection(['a']))
991
991
self.assertEqual({'a', 'b'},
992
osutils.minimum_path_selection(['a', 'b']))
993
self.assertEqual({'a/', 'b'},
994
osutils.minimum_path_selection(['a/', 'b']))
995
self.assertEqual({'a/', 'b'},
996
osutils.minimum_path_selection(['a/c', 'a/', 'b']))
992
osutils.minimum_path_selection(['a', 'b']))
993
self.assertEqual({'a/', 'b'},
994
osutils.minimum_path_selection(['a/', 'b']))
995
self.assertEqual({'a/', 'b'},
996
osutils.minimum_path_selection(['a/c', 'a/', 'b']))
997
997
self.assertEqual({'a-b', 'a', 'a0b'},
998
osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
998
osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
1000
1000
def test_mkdtemp(self):
1001
1001
tmpdir = osutils._win32_mkdtemp(dir='.')
1002
1002
self.assertFalse('\\' in tmpdir)
1004
1004
def test_rename(self):
1005
with open('a', 'wb') as a:
1007
with open('b', 'wb') as b:
1012
1010
osutils._win32_rename('b', 'a')
1013
1011
self.assertPathExists('a')
1014
1012
self.assertPathDoesNotExist('b')
1015
self.assertFileEqual('baz\n', 'a')
1013
self.assertFileEqual(b'baz\n', 'a')
1017
1015
def test_rename_missing_file(self):
1016
with open('a', 'wb') as a:
1023
1020
osutils._win32_rename('b', 'a')
1024
1021
except (IOError, OSError) as e:
1025
1022
self.assertEqual(errno.ENOENT, e.errno)
1026
self.assertFileEqual('foo\n', 'a')
1023
self.assertFileEqual(b'foo\n', 'a')
1028
1025
def test_rename_missing_dir(self):
1052
1049
check(['a', 'b'], 'a/b')
1053
1050
check(['a', 'b'], 'a/./b')
1054
1051
check(['a', '.b'], 'a/.b')
1055
check(['a', '.b'], 'a\\.b')
1052
if os.path.sep == '\\':
1053
check(['a', '.b'], 'a\\.b')
1055
check(['a\\.b'], 'a\\.b')
1057
1057
self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
1086
1086
class TestChunksToLines(tests.TestCase):
1088
1088
def test_smoketest(self):
1089
self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
1090
osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
1091
self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
1092
osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
1089
self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
1090
osutils.chunks_to_lines([b'foo\nbar', b'\nbaz\n']))
1091
self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
1092
osutils.chunks_to_lines([b'foo\n', b'bar\n', b'baz\n']))
1094
1094
def test_osutils_binding(self):
1095
1095
from . import test__chunks_to_lines
1132
1132
self.build_tree(tree)
1133
1133
expected_dirblocks = [
1135
[('0file', '0file', 'file'),
1136
('1dir', '1dir', 'directory'),
1137
('2file', '2file', 'file'),
1140
(('1dir', './1dir'),
1141
[('1dir/0file', '0file', 'file'),
1142
('1dir/1dir', '1dir', 'directory'),
1145
(('1dir/1dir', './1dir/1dir'),
1135
[('0file', '0file', 'file'),
1136
('1dir', '1dir', 'directory'),
1137
('2file', '2file', 'file'),
1140
(('1dir', './1dir'),
1141
[('1dir/0file', '0file', 'file'),
1142
('1dir/1dir', '1dir', 'directory'),
1145
(('1dir/1dir', './1dir/1dir'),
1151
1151
found_bzrdir = False
1180
1180
# (It would be ok if it happened earlier but at the moment it
1182
1182
e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1183
self.assertEqual('./test-unreadable', e.filename)
1183
self.assertEqual('./test-unreadable', osutils.safe_unicode(e.filename))
1184
1184
self.assertEqual(errno.EACCES, e.errno)
1185
1185
# Ensure the message contains the file name
1186
1186
self.assertContainsRe(str(e), "\\./test-unreadable")
1189
1188
def test_walkdirs_encoding_error(self):
1190
1189
# <https://bugs.launchpad.net/bzr/+bug/488519>
1191
1190
# walkdirs didn't raise a useful message when the filenames
1232
1231
self.build_tree(tree)
1233
1232
expected_dirblocks = [
1235
[('0file', '0file', 'file'),
1236
('1dir', '1dir', 'directory'),
1237
('2file', '2file', 'file'),
1240
(('1dir', './1dir'),
1241
[('1dir/0file', '0file', 'file'),
1242
('1dir/1dir', '1dir', 'directory'),
1245
(('1dir/1dir', './1dir/1dir'),
1234
[('0file', '0file', 'file'),
1235
('1dir', '1dir', 'directory'),
1236
('2file', '2file', 'file'),
1239
(('1dir', './1dir'),
1240
[('1dir/0file', '0file', 'file'),
1241
('1dir/1dir', '1dir', 'directory'),
1244
(('1dir/1dir', './1dir/1dir'),
1251
1250
found_bzrdir = False
1252
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1253
if len(dirblock) and dirblock[0][1] == '.bzr':
1251
for dirdetail, dirblock in osutils._walkdirs_utf8(b'.'):
1252
if len(dirblock) and dirblock[0][1] == b'.bzr':
1254
1253
# this tests the filtering of selected paths
1255
1254
found_bzrdir = True
1256
1255
del dirblock[0]
1256
dirdetail = (dirdetail[0].decode('utf-8'),
1257
osutils.safe_unicode(dirdetail[1]))
1259
(entry[0].decode('utf-8'), entry[1].decode('utf-8'), entry[2])
1260
for entry in dirblock]
1257
1261
result.append((dirdetail, dirblock))
1259
1263
self.assertTrue(found_bzrdir)
1278
1282
self.overrideAttr(osutils, '_fs_enc')
1279
1283
self.overrideAttr(osutils, '_selected_dir_reader')
1281
def assertDirReaderIs(self, expected):
1285
def assertDirReaderIs(self, expected, top):
1282
1286
"""Assert the right implementation for _walkdirs_utf8 is chosen."""
1283
1287
# Force it to redetect
1284
1288
osutils._selected_dir_reader = None
1285
1289
# Nothing to list, but should still trigger the selection logic
1286
self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
1290
self.assertEqual([((b'', top), [])], list(osutils._walkdirs_utf8('.')))
1287
1291
self.assertIsInstance(osutils._selected_dir_reader, expected)
1289
1293
def test_force_walkdirs_utf8_fs_utf8(self):
1290
1294
self.requireFeature(UTF8DirReaderFeature)
1291
1295
self._save_platform_info()
1292
1296
osutils._fs_enc = 'utf-8'
1293
self.assertDirReaderIs(
1294
UTF8DirReaderFeature.module.UTF8DirReader)
1297
self.assertDirReaderIs(UTF8DirReaderFeature.module.UTF8DirReader, b".")
1296
1299
def test_force_walkdirs_utf8_fs_ascii(self):
1297
1300
self.requireFeature(UTF8DirReaderFeature)
1298
1301
self._save_platform_info()
1299
1302
osutils._fs_enc = 'ascii'
1300
1303
self.assertDirReaderIs(
1301
UTF8DirReaderFeature.module.UTF8DirReader)
1304
UTF8DirReaderFeature.module.UTF8DirReader, b".")
1303
1306
def test_force_walkdirs_utf8_fs_latin1(self):
1304
1307
self._save_platform_info()
1305
1308
osutils._fs_enc = 'iso-8859-1'
1306
self.assertDirReaderIs(osutils.UnicodeDirReader)
1309
self.assertDirReaderIs(osutils.UnicodeDirReader, ".")
1308
1311
def test_force_walkdirs_utf8_nt(self):
1309
1312
# Disabled because the thunk of the whole walkdirs api is disabled.
1310
1313
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1311
1314
self._save_platform_info()
1312
1315
from .._walkdirs_win32 import Win32ReadDir
1313
self.assertDirReaderIs(Win32ReadDir)
1316
self.assertDirReaderIs(Win32ReadDir, ".")
1315
1318
def test_unicode_walkdirs(self):
1316
1319
"""Walkdirs should always return unicode paths."""
1328
1331
self.build_tree(tree)
1329
1332
expected_dirblocks = [
1331
[(name0, name0, 'file', './' + name0),
1332
(name1, name1, 'directory', './' + name1),
1333
(name2, name2, 'file', './' + name2),
1336
((name1, './' + name1),
1337
[(name1 + '/' + name0, name0, 'file', './' + name1
1339
(name1 + '/' + name1, name1, 'directory', './' + name1
1343
((name1 + '/' + name1, './' + name1 + '/' + name1),
1334
[(name0, name0, 'file', './' + name0),
1335
(name1, name1, 'directory', './' + name1),
1336
(name2, name2, 'file', './' + name2),
1339
((name1, './' + name1),
1340
[(name1 + '/' + name0, name0, 'file', './' + name1
1342
(name1 + '/' + name1, name1, 'directory', './' + name1
1346
((name1 + '/' + name1, './' + name1 + '/' + name1),
1348
1351
result = list(osutils.walkdirs('.'))
1349
1352
self._filter_out_stat(result)
1350
1353
self.assertEqual(expected_dirblocks, result)
1351
result = list(osutils.walkdirs(u'./'+name1, name1))
1354
result = list(osutils.walkdirs(u'./' + name1, name1))
1352
1355
self._filter_out_stat(result)
1353
1356
self.assertEqual(expected_dirblocks[1:], result)
1374
1377
name2 = name2.encode('utf8')
1376
1379
expected_dirblocks = [
1378
[(name0, name0, 'file', './' + name0),
1379
(name1, name1, 'directory', './' + name1),
1380
(name2, name2, 'file', './' + name2),
1383
((name1, './' + name1),
1384
[(name1 + '/' + name0, name0, 'file', './' + name1
1386
(name1 + '/' + name1, name1, 'directory', './' + name1
1390
((name1 + '/' + name1, './' + name1 + '/' + name1),
1381
[(name0, name0, 'file', b'./' + name0),
1382
(name1, name1, 'directory', b'./' + name1),
1383
(name2, name2, 'file', b'./' + name2),
1386
((name1, b'./' + name1),
1387
[(name1 + b'/' + name0, name0, 'file', b'./' + name1
1389
(name1 + b'/' + name1, name1, 'directory', b'./' + name1
1393
((name1 + b'/' + name1, b'./' + name1 + b'/' + name1),
1396
1399
# For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1397
1400
# all abspaths are Unicode, and encode them back into utf8.
1398
1401
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1399
self.assertIsInstance(dirdetail[0], str)
1400
if isinstance(dirdetail[1], unicode):
1402
self.assertIsInstance(dirdetail[0], bytes)
1403
if isinstance(dirdetail[1], str):
1401
1404
dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1402
1405
dirblock = [list(info) for info in dirblock]
1403
1406
for info in dirblock:
1404
self.assertIsInstance(info[4], unicode)
1407
self.assertIsInstance(info[4], str)
1405
1408
info[4] = info[4].encode('utf8')
1406
1409
new_dirblock = []
1407
1410
for info in dirblock:
1408
self.assertIsInstance(info[0], str)
1409
self.assertIsInstance(info[1], str)
1410
self.assertIsInstance(info[4], str)
1411
self.assertIsInstance(info[0], bytes)
1412
self.assertIsInstance(info[1], bytes)
1413
self.assertIsInstance(info[4], bytes)
1411
1414
# Remove the stat information
1412
1415
new_dirblock.append((info[0], info[1], info[2], info[4]))
1413
1416
result.append((dirdetail, new_dirblock))
1441
1444
# All of the abspaths should be in unicode, all of the relative paths
1442
1445
# should be in utf8
1443
1446
expected_dirblocks = [
1445
[(name0, name0, 'file', './' + name0u),
1446
(name1, name1, 'directory', './' + name1u),
1447
(name2, name2, 'file', './' + name2u),
1450
((name1, './' + name1u),
1451
[(name1 + '/' + name0, name0, 'file', './' + name1u
1453
(name1 + '/' + name1, name1, 'directory', './' + name1u
1457
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1448
[(name0, name0, 'file', './' + name0u),
1449
(name1, name1, 'directory', './' + name1u),
1450
(name2, name2, 'file', './' + name2u),
1453
((name1, './' + name1u),
1454
[(name1 + b'/' + name0, name0, 'file', './' + name1u
1456
(name1 + b'/' + name1, name1, 'directory', './' + name1u
1460
((name1 + b'/' + name1, './' + name1u + '/' + name1u),
1462
1465
result = list(osutils._walkdirs_utf8('.'))
1463
1466
self._filter_out_stat(result)
1487
1490
# All of the abspaths should be in unicode, all of the relative paths
1488
1491
# should be in utf8
1489
1492
expected_dirblocks = [
1491
[(name0, name0, 'file', './' + name0u),
1492
(name1, name1, 'directory', './' + name1u),
1493
(name2, name2, 'file', './' + name2u),
1496
((name1, './' + name1u),
1497
[(name1 + '/' + name0, name0, 'file', './' + name1u
1499
(name1 + '/' + name1, name1, 'directory', './' + name1u
1503
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1494
[(name0, name0, 'file', './' + name0u),
1495
(name1, name1, 'directory', './' + name1u),
1496
(name2, name2, 'file', './' + name2u),
1499
((name1, './' + name1u),
1500
[(name1 + '/' + name0, name0, 'file', './' + name1u
1502
(name1 + '/' + name1, name1, 'directory', './' + name1u
1506
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1508
1511
result = list(osutils._walkdirs_utf8(u'.'))
1509
1512
self._filter_out_stat(result)
1662
1662
def test_copy_tree_handlers(self):
1663
1663
processed_files = []
1664
1664
processed_links = []
1665
1666
def file_handler(from_path, to_path):
1666
1667
processed_files.append(('f', from_path, to_path))
1667
1669
def dir_handler(from_path, to_path):
1668
1670
processed_files.append(('d', from_path, to_path))
1669
1672
def link_handler(from_path, to_path):
1670
1673
processed_links.append((from_path, to_path))
1671
1674
handlers = {'file': file_handler,
1672
1675
'directory': dir_handler,
1673
1676
'symlink': link_handler,
1676
1679
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1677
1680
if osutils.has_symlinks():
1682
1685
('f', 'source/a', 'target/a'),
1683
1686
('d', 'source/b', 'target/b'),
1684
1687
('f', 'source/b/c', 'target/b/c'),
1686
1689
self.assertPathDoesNotExist('target')
1687
1690
if osutils.has_symlinks():
1688
1691
self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1726
1730
'Cannot find a unicode character that works in encoding %s'
1727
1731
% (osutils.get_user_encoding(),))
1729
old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', uni_val)
1730
self.assertEqual(env_val, os.environ.get('BRZ_TEST_ENV_VAR'))
1733
osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', uni_val)
1734
self.assertEqual(uni_val, os.environ.get('BRZ_TEST_ENV_VAR'))
1732
1736
def test_unset(self):
1733
1737
"""Test that passing None will remove the env var"""
1735
1739
old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', None)
1736
1740
self.assertEqual('foo', old)
1737
1741
self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'))
1738
self.assertFalse('BRZ_TEST_ENV_VAR' in os.environ)
1742
self.assertNotIn('BRZ_TEST_ENV_VAR', os.environ)
1741
1745
class TestSizeShaFile(tests.TestCaseInTempDir):
1743
1747
def test_sha_empty(self):
1744
self.build_tree_contents([('foo', '')])
1745
expected_sha = osutils.sha_string('')
1748
self.build_tree_contents([('foo', b'')])
1749
expected_sha = osutils.sha_string(b'')
1746
1750
f = open('foo')
1747
1751
self.addCleanup(f.close)
1748
1752
size, sha = osutils.size_sha_file(f)
1763
1767
class TestShaFileByName(tests.TestCaseInTempDir):
1765
1769
def test_sha_empty(self):
1766
self.build_tree_contents([('foo', '')])
1767
expected_sha = osutils.sha_string('')
1770
self.build_tree_contents([('foo', b'')])
1771
expected_sha = osutils.sha_string(b'')
1768
1772
self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1770
1774
def test_sha_mixed_endings(self):
1771
text = 'test\r\nwith\nall\rpossible line endings\r\n'
1775
text = b'test\r\nwith\nall\rpossible line endings\r\n'
1772
1776
self.build_tree_contents([('foo', text)])
1773
1777
expected_sha = osutils.sha_string(text)
1774
1778
self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1814
1818
expected_dirblocks = [
1816
[('0file', '0file', 'file'),
1817
('1dir', '1dir', 'directory'),
1818
('2file', '2file', 'file'),
1821
(('1dir', './1dir'),
1822
[('1dir/0file', '0file', 'file'),
1823
('1dir/1dir', '1dir', 'directory'),
1826
(('1dir/1dir', './1dir/1dir'),
1820
[(b'0file', b'0file', 'file', './0file'),
1821
(b'1dir', b'1dir', 'directory', './1dir'),
1822
(b'2file', b'2file', 'file', './2file'),
1825
((b'1dir', './1dir'),
1826
[(b'1dir/0file', b'0file', 'file', './1dir/0file'),
1827
(b'1dir/1dir', b'1dir', 'directory', './1dir/1dir'),
1830
((b'1dir/1dir', './1dir/1dir'),
1831
1835
return tree, expected_dirblocks
1836
1840
result = list(osutils._walkdirs_utf8('.'))
1837
1841
# Filter out stat and abspath
1838
1842
self.assertEqual(expected_dirblocks,
1839
[(dirinfo, [line[0:3] for line in block])
1840
for dirinfo, block in result])
1843
self._filter_out(result))
1842
1845
def test_walk_sub_dir(self):
1843
1846
tree, expected_dirblocks = self._get_ascii_tree()
1844
1847
self.build_tree(tree)
1845
1848
# you can search a subdir only, with a supplied prefix.
1846
result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
1849
result = list(osutils._walkdirs_utf8(b'./1dir', b'1dir'))
1847
1850
# Filter out stat and abspath
1848
1851
self.assertEqual(expected_dirblocks[1:],
1849
[(dirinfo, [line[0:3] for line in block])
1850
for dirinfo, block in result])
1852
self._filter_out(result))
1852
1854
def _get_unicode_tree(self):
1853
1855
name0u = u'0file-\xb6'
1864
1866
name1 = name1u.encode('UTF-8')
1865
1867
name2 = name2u.encode('UTF-8')
1866
1868
expected_dirblocks = [
1868
[(name0, name0, 'file', './' + name0u),
1869
(name1, name1, 'directory', './' + name1u),
1870
(name2, name2, 'file', './' + name2u),
1873
((name1, './' + name1u),
1874
[(name1 + '/' + name0, name0, 'file', './' + name1u
1876
(name1 + '/' + name1, name1, 'directory', './' + name1u
1880
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1870
[(name0, name0, 'file', './' + name0u),
1871
(name1, name1, 'directory', './' + name1u),
1872
(name2, name2, 'file', './' + name2u),
1875
((name1, './' + name1u),
1876
[(name1 + b'/' + name0, name0, 'file', './' + name1u
1878
(name1 + b'/' + name1, name1, 'directory', './' + name1u
1882
((name1 + b'/' + name1, './' + name1u + '/' + name1u),
1885
1887
return tree, expected_dirblocks
1911
1914
target = u'target\N{Euro Sign}'
1912
1915
link_name = u'l\N{Euro Sign}nk'
1913
1916
os.symlink(target, link_name)
1914
target_utf8 = target.encode('UTF-8')
1915
1917
link_name_utf8 = link_name.encode('UTF-8')
1916
1918
expected_dirblocks = [
1918
[(link_name_utf8, link_name_utf8,
1919
'symlink', './' + link_name),],
1920
[(link_name_utf8, link_name_utf8,
1921
'symlink', './' + link_name), ],
1921
1923
result = list(osutils._walkdirs_utf8('.'))
1922
1924
self.assertEqual(expected_dirblocks, self._filter_out(result))
1940
1943
os.symlink(self.target, self.link)
1942
1945
def test_os_readlink_link_encoding(self):
1943
self.assertEqual(self.target, os.readlink(self.link))
1946
self.assertEqual(self.target, os.readlink(self.link))
1945
1948
def test_os_readlink_link_decoding(self):
1946
1949
self.assertEqual(self.target.encode(osutils._fs_enc),
1947
os.readlink(self.link.encode(osutils._fs_enc)))
1950
os.readlink(self.link.encode(osutils._fs_enc)))
1950
1953
class TestConcurrency(tests.TestCase):
1989
1992
def test_failure_to_load(self):
1990
1993
self._try_loading()
1991
1994
self.assertLength(1, osutils._extension_load_failures)
1992
self.assertEqual(osutils._extension_load_failures[0],
1993
"No module named _fictional_extension_py")
1996
osutils._extension_load_failures[0],
1997
"No module named 'breezy._fictional_extension_py'")
1995
1999
def test_report_extension_load_failures_no_warning(self):
1996
2000
self.assertTrue(self._try_loading())
1997
warnings, result = self.callCatchWarnings(osutils.report_extension_load_failures)
2001
warnings, result = self.callCatchWarnings(
2002
osutils.report_extension_load_failures)
1998
2003
# it used to give a Python warning; it no longer does
1999
2004
self.assertLength(0, warnings)
2137
2142
def test_is_unicode(self):
2138
2143
self.overrideEnv('BRZ_TEST_PATH', './anywhere at all/')
2139
2144
path = osutils.path_from_environ('BRZ_TEST_PATH')
2140
self.assertIsInstance(path, unicode)
2145
self.assertIsInstance(path, str)
2141
2146
self.assertEqual(u'./anywhere at all/', path)
2143
2148
def test_posix_path_env_ascii(self):
2144
2149
self.overrideEnv('BRZ_TEST_PATH', '/tmp')
2145
2150
home = osutils._posix_path_from_environ('BRZ_TEST_PATH')
2146
self.assertIsInstance(home, unicode)
2151
self.assertIsInstance(home, str)
2147
2152
self.assertEqual(u'/tmp', home)
2149
2154
def test_posix_path_env_unicode(self):
2151
2156
self.overrideEnv('BRZ_TEST_PATH', '/home/\xa7test')
2152
2157
self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
2153
2158
self.assertEqual(u'/home/\xa7test',
2154
osutils._posix_path_from_environ('BRZ_TEST_PATH'))
2159
osutils._posix_path_from_environ('BRZ_TEST_PATH'))
2155
2160
osutils._fs_enc = "iso8859-5"
2156
self.assertEqual(u'/home/\u0407test',
2157
osutils._posix_path_from_environ('BRZ_TEST_PATH'))
2158
osutils._fs_enc = "utf-8"
2159
self.assertRaises(errors.BadFilenameEncoding,
2160
osutils._posix_path_from_environ, 'BRZ_TEST_PATH')
2161
# In Python 3, os.environ returns unicode.
2162
self.assertEqual(u'/home/\xa7test',
2163
osutils._posix_path_from_environ('BRZ_TEST_PATH'))
2163
2166
class TestGetHomeDir(tests.TestCase):
2165
2168
def test_is_unicode(self):
2166
2169
home = osutils._get_home_dir()
2167
self.assertIsInstance(home, unicode)
2170
self.assertIsInstance(home, str)
2169
2172
def test_posix_homeless(self):
2170
2173
self.overrideEnv('HOME', None)
2171
2174
home = osutils._get_home_dir()
2172
self.assertIsInstance(home, unicode)
2175
self.assertIsInstance(home, str)
2174
2177
def test_posix_home_ascii(self):
2175
2178
self.overrideEnv('HOME', '/home/test')
2176
2179
home = osutils._posix_get_home_dir()
2177
self.assertIsInstance(home, unicode)
2180
self.assertIsInstance(home, str)
2178
2181
self.assertEqual(u'/home/test', home)
2180
2183
def test_posix_home_unicode(self):
2183
2186
self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
2184
2187
self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
2185
2188
osutils._fs_enc = "iso8859-5"
2186
self.assertEqual(u'/home/\u0407test', osutils._posix_get_home_dir())
2187
osutils._fs_enc = "utf-8"
2188
self.assertRaises(errors.BadFilenameEncoding,
2189
osutils._posix_get_home_dir)
2189
# In python 3, os.environ returns unicode
2190
self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
2192
2193
class TestGetuserUnicode(tests.TestCase):
2194
2195
def test_is_unicode(self):
2195
2196
user = osutils.getuser_unicode()
2196
self.assertIsInstance(user, unicode)
2197
self.assertIsInstance(user, str)
2198
2199
def envvar_to_override(self):
2199
2200
if sys.platform == "win32":
2200
2201
# Disable use of platform calls on windows so envvar is used
2201
2202
self.overrideAttr(win32utils, 'has_ctypes', False)
2202
return 'USERNAME' # only variable used on windows
2203
return 'LOGNAME' # first variable checked by getpass.getuser()
2203
return 'USERNAME' # only variable used on windows
2204
return 'LOGNAME' # first variable checked by getpass.getuser()
2205
2206
def test_ascii_user(self):
2206
2207
self.overrideEnv(self.envvar_to_override(), 'jrandom')
2262
2264
self.assertTrue(
2263
2265
osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
2264
2266
self.assertTrue(osutils.find_executable_on_path('file.txt') is None)
2266
2268
def test_windows_app_path(self):
2267
2269
if sys.platform != 'win32':
2268
2270
raise tests.TestSkipped('test requires win32')
2269
2271
# Override PATH env var so that exe can only be found on App Path
2270
2272
self.overrideEnv('PATH', '')
2271
2273
# Internt Explorer is always registered in the App Path
2272
self.assertTrue(osutils.find_executable_on_path('iexplore') is not None)
2274
self.assertTrue(osutils.find_executable_on_path(
2275
'iexplore') is not None)
2274
2277
def test_other(self):
2275
2278
if sys.platform == 'win32':
2303
2306
import pywintypes
2304
2307
self.assertTrue(osutils.is_environment_error(
2305
2308
pywintypes.error(errno.EINVAL, "Invalid parameter", "Caller")))
2311
class SupportsExecutableTests(tests.TestCaseInTempDir):
2313
def test_returns_bool(self):
2314
self.assertIsInstance(osutils.supports_executable(self.test_dir), bool)
2317
class SupportsSymlinksTests(tests.TestCaseInTempDir):
2319
def test_returns_bool(self):
2320
self.assertIsInstance(osutils.supports_symlinks(self.test_dir), bool)
2323
class MtabReader(tests.TestCaseInTempDir):
2325
def test_read_mtab(self):
2326
self.build_tree_contents([('mtab', """\
2327
/dev/mapper/blah--vg-root / ext4 rw,relatime,errors=remount-ro 0 0
2328
/dev/mapper/blah--vg-home /home vfat rw,relatime 0 0
2334
list(osutils.read_mtab('mtab')),
2336
(b'/home', 'vfat')])
2339
class GetFsTypeTests(tests.TestCaseInTempDir):
2341
def test_returns_string_or_none(self):
2342
ret = osutils.get_fs_type(self.test_dir)
2343
self.assertTrue(isinstance(ret, str) or ret is None)
2345
def test_returns_most_specific(self):
2347
osutils, '_FILESYSTEM_FINDER',
2348
osutils.FilesystemFinder(
2349
[(b'/', 'ext4'), (b'/home', 'vfat'),
2350
(b'/home/jelmer', 'ext2')]))
2351
self.assertEqual(osutils.get_fs_type(b'/home/jelmer/blah'), 'ext2')
2352
self.assertEqual(osutils.get_fs_type('/home/jelmer/blah'), 'ext2')
2353
self.assertEqual(osutils.get_fs_type(b'/home/jelmer'), 'ext2')
2354
self.assertEqual(osutils.get_fs_type(b'/home/martin'), 'vfat')
2355
self.assertEqual(osutils.get_fs_type(b'/home'), 'vfat')
2356
self.assertEqual(osutils.get_fs_type(b'/other'), 'ext4')
2358
def test_returns_none(self):
2360
osutils, '_FILESYSTEM_FINDER',
2361
osutils.FilesystemFinder([]))
2362
self.assertIs(osutils.get_fs_type('/home/jelmer/blah'), None)
2363
self.assertIs(osutils.get_fs_type(b'/home/jelmer/blah'), None)
2364
self.assertIs(osutils.get_fs_type('/home/jelmer'), None)