72
209
self.assertContainsRe(result, r'^[a-z0-9]{100}$')
75
class TestSafeUnicode(TestCase):
212
class TestIsInside(tests.TestCase):
214
def test_is_inside(self):
    """Check osutils.is_inside against a table of (dir, path, expected)."""
    cases = [
        ('src', 'src/foo.c', True),
        ('src', 'srccontrol', False),
        ('src', 'src/a/a/a/foo.c', True),
        ('foo.c', 'foo.c', True),
        ('foo.c', '', False),
        ('', 'foo.c', True),
    ]
    for directory, path, expected in cases:
        if expected:
            self.assertTrue(osutils.is_inside(directory, path))
        else:
            self.assertFalse(osutils.is_inside(directory, path))
223
def test_is_inside_any(self):
224
SRC_FOO_C = osutils.pathjoin('src', 'foo.c')
225
for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
226
(['src'], SRC_FOO_C),
229
self.assertTrue(osutils.is_inside_any(dirs, fn))
230
for dirs, fn in [(['src'], 'srccontrol'),
231
(['src'], 'srccontrol/foo')]:
232
self.assertFalse(osutils.is_inside_any(dirs, fn))
234
def test_is_inside_or_parent_of_any(self):
235
for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
236
(['src'], 'src/foo.c'),
237
(['src/bar.c'], 'src'),
238
(['src/bar.c', 'bla/foo.c'], 'src'),
241
self.assertTrue(osutils.is_inside_or_parent_of_any(dirs, fn))
243
for dirs, fn in [(['src'], 'srccontrol'),
244
(['srccontrol/foo.c'], 'src'),
245
(['src'], 'srccontrol/foo')]:
246
self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
249
class TestLstat(tests.TestCaseInTempDir):
251
def test_lstat_matches_fstat(self):
252
# On Windows, lstat and fstat don't always agree, primarily in the
253
# 'st_ino' and 'st_dev' fields. So we force them to be '0' in our
254
# custom implementation.
255
if sys.platform == 'win32':
256
# We only have special lstat/fstat if we have the extension.
257
# Without it, we may end up re-reading content when we don't have
258
# to, but otherwise it doesn't affect correctness.
259
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
260
with open('test-file.txt', 'wb') as f:
261
f.write(b'some content\n')
263
self.assertEqualStat(osutils.fstat(f.fileno()),
264
osutils.lstat('test-file.txt'))
267
class TestRmTree(tests.TestCaseInTempDir):
269
def test_rmtree(self):
270
# Check to remove tree with read-only files/dirs
272
with open('dir/file', 'w') as f:
274
# would like to also try making the directory readonly, but at the
275
# moment python shutil.rmtree doesn't handle that properly - it would
276
# need to chmod the directory before removing things inside it - deferred
277
# for now -- mbp 20060505
278
# osutils.make_readonly('dir')
279
osutils.make_readonly('dir/file')
281
osutils.rmtree('dir')
283
self.assertPathDoesNotExist('dir/file')
284
self.assertPathDoesNotExist('dir')
287
class TestDeleteAny(tests.TestCaseInTempDir):
289
def test_delete_any_readonly(self):
290
# from <https://bugs.launchpad.net/bzr/+bug/218206>
291
self.build_tree(['d/', 'f'])
292
osutils.make_readonly('d')
293
osutils.make_readonly('f')
295
osutils.delete_any('f')
296
osutils.delete_any('d')
299
class TestKind(tests.TestCaseInTempDir):
301
def test_file_kind(self):
302
self.build_tree(['file', 'dir/'])
303
self.assertEqual('file', osutils.file_kind('file'))
304
self.assertEqual('directory', osutils.file_kind('dir/'))
305
if osutils.has_symlinks():
306
os.symlink('symlink', 'symlink')
307
self.assertEqual('symlink', osutils.file_kind('symlink'))
309
# TODO: jam 20060529 Test a block device
311
os.lstat('/dev/null')
313
if e.errno not in (errno.ENOENT,):
318
osutils.file_kind(os.path.realpath('/dev/null')))
320
mkfifo = getattr(os, 'mkfifo', None)
324
self.assertEqual('fifo', osutils.file_kind('fifo'))
328
AF_UNIX = getattr(socket, 'AF_UNIX', None)
330
s = socket.socket(AF_UNIX)
333
self.assertEqual('socket', osutils.file_kind('socket'))
337
def test_kind_marker(self):
    """Check the marker string osutils.kind_marker returns for each kind."""
    expected_markers = [
        ("file", ""),
        ('directory', "/"),
        (osutils._directory_kind, "/"),
        ("symlink", "@"),
        ("tree-reference", "+"),
        ("fifo", ""),
        ("socket", ""),
        ("unknown", ""),
    ]
    for kind, marker in expected_markers:
        self.assertEqual(marker, osutils.kind_marker(kind))
348
class TestUmask(tests.TestCaseInTempDir):
350
def test_get_umask(self):
351
if sys.platform == 'win32':
352
# umask always returns '0', no way to set it
353
self.assertEqual(0, osutils.get_umask())
356
orig_umask = osutils.get_umask()
357
self.addCleanup(os.umask, orig_umask)
359
self.assertEqual(0o222, osutils.get_umask())
361
self.assertEqual(0o022, osutils.get_umask())
363
self.assertEqual(0o002, osutils.get_umask())
365
self.assertEqual(0o027, osutils.get_umask())
368
class TestDateTime(tests.TestCase):
    """Tests for the delta, date and local-offset helpers in osutils."""

    def assertFormatedDelta(self, expected, seconds):
        """Assert osutils.format_delta formats as expected"""
        actual = osutils.format_delta(seconds)
        self.assertEqual(expected, actual)

    def test_format_delta(self):
        """format_delta output for past deltas, unit boundaries and futures."""
        self.assertFormatedDelta('0 seconds ago', 0)
        self.assertFormatedDelta('1 second ago', 1)
        self.assertFormatedDelta('10 seconds ago', 10)
        self.assertFormatedDelta('59 seconds ago', 59)
        self.assertFormatedDelta('89 seconds ago', 89)
        self.assertFormatedDelta('1 minute, 30 seconds ago', 90)
        self.assertFormatedDelta('3 minutes, 0 seconds ago', 180)
        self.assertFormatedDelta('3 minutes, 1 second ago', 181)
        self.assertFormatedDelta('10 minutes, 15 seconds ago', 615)
        self.assertFormatedDelta('30 minutes, 59 seconds ago', 1859)
        self.assertFormatedDelta('31 minutes, 0 seconds ago', 1860)
        self.assertFormatedDelta('60 minutes, 0 seconds ago', 3600)
        self.assertFormatedDelta('89 minutes, 59 seconds ago', 5399)
        self.assertFormatedDelta('1 hour, 30 minutes ago', 5400)
        self.assertFormatedDelta('2 hours, 30 minutes ago', 9017)
        self.assertFormatedDelta('10 hours, 0 minutes ago', 36000)
        self.assertFormatedDelta('24 hours, 0 minutes ago', 86400)
        self.assertFormatedDelta('35 hours, 59 minutes ago', 129599)
        self.assertFormatedDelta('36 hours, 0 minutes ago', 129600)
        self.assertFormatedDelta('36 hours, 0 minutes ago', 129601)
        self.assertFormatedDelta('36 hours, 1 minute ago', 129660)
        self.assertFormatedDelta('36 hours, 1 minute ago', 129661)
        self.assertFormatedDelta('84 hours, 10 minutes ago', 303002)

        # We handle when time steps the wrong direction because computers
        # don't have synchronized clocks.
        self.assertFormatedDelta('84 hours, 10 minutes in the future', -303002)
        self.assertFormatedDelta('1 second in the future', -1)
        self.assertFormatedDelta('2 seconds in the future', -2)

    def test_format_date(self):
        """format_date rejects an unknown timezone name and returns text."""
        self.assertRaises(osutils.UnsupportedTimezoneFormat,
                          osutils.format_date, 0, timezone='foo')
        self.assertIsInstance(osutils.format_date(0), str)
        self.assertIsInstance(osutils.format_local_date(0), text_type)
        # Testing for the actual value of the local weekday without
        # duplicating the code from format_date is difficult.
        # Instead blackbox.test_locale should check for localized
        # dates once they do occur in output strings.

    def test_format_date_with_offset_in_original_timezone(self):
        """Dates are rendered using the offset that is passed in."""
        self.assertEqual("Thu 1970-01-01 00:00:00 +0000",
                         osutils.format_date_with_offset_in_original_timezone(0))
        self.assertEqual("Fri 1970-01-02 03:46:40 +0000",
                         osutils.format_date_with_offset_in_original_timezone(100000))
        self.assertEqual("Fri 1970-01-02 05:46:40 +0200",
                         osutils.format_date_with_offset_in_original_timezone(100000, 7200))

    def test_local_time_offset(self):
        """Test that local_time_offset() returns a sane value."""
        offset = osutils.local_time_offset()
        self.assertIsInstance(offset, int)
        # The offset must be strictly less than eighteen hours in either
        # direction.  Time zone handling is system specific, so it is
        # difficult to do more specific tests than this range check.
        eighteen_hours = 18 * 3600
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)

    def test_local_time_offset_with_timestamp(self):
        """Test that local_time_offset() works with a timestamp."""
        offset = osutils.local_time_offset(1000000000.1234567)
        self.assertIsInstance(offset, int)
        eighteen_hours = 18 * 3600
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
444
class TestFdatasync(tests.TestCaseInTempDir):
446
def do_fdatasync(self):
447
f = tempfile.NamedTemporaryFile()
448
osutils.fdatasync(f.fileno())
452
def raise_eopnotsupp(*args, **kwargs):
453
raise IOError(errno.EOPNOTSUPP, os.strerror(errno.EOPNOTSUPP))
456
def raise_enotsup(*args, **kwargs):
457
raise IOError(errno.ENOTSUP, os.strerror(errno.ENOTSUP))
459
def test_fdatasync_handles_system_function(self):
460
self.overrideAttr(os, "fdatasync")
463
def test_fdatasync_handles_no_fdatasync_no_fsync(self):
464
self.overrideAttr(os, "fdatasync")
465
self.overrideAttr(os, "fsync")
468
def test_fdatasync_handles_no_EOPNOTSUPP(self):
469
self.overrideAttr(errno, "EOPNOTSUPP")
472
def test_fdatasync_catches_ENOTSUP(self):
473
enotsup = getattr(errno, "ENOTSUP", None)
475
raise tests.TestNotApplicable("No ENOTSUP on this platform")
476
self.overrideAttr(os, "fdatasync", self.raise_enotsup)
479
def test_fdatasync_catches_EOPNOTSUPP(self):
480
enotsup = getattr(errno, "EOPNOTSUPP", None)
482
raise tests.TestNotApplicable("No EOPNOTSUPP on this platform")
483
self.overrideAttr(os, "fdatasync", self.raise_eopnotsupp)
487
class TestLinks(tests.TestCaseInTempDir):
489
def test_dereference_path(self):
490
self.requireFeature(features.SymlinkFeature)
491
cwd = osutils.realpath('.')
493
bar_path = osutils.pathjoin(cwd, 'bar')
494
# Using './' to avoid bug #1213894 (first path component not
495
# dereferenced) in Python 2.4.1 and earlier
496
self.assertEqual(bar_path, osutils.realpath('./bar'))
497
os.symlink('bar', 'foo')
498
self.assertEqual(bar_path, osutils.realpath('./foo'))
500
# Does not dereference terminal symlinks
501
foo_path = osutils.pathjoin(cwd, 'foo')
502
self.assertEqual(foo_path, osutils.dereference_path('./foo'))
504
# Dereferences parent symlinks
506
baz_path = osutils.pathjoin(bar_path, 'baz')
507
self.assertEqual(baz_path, osutils.dereference_path('./foo/baz'))
509
# Dereferences parent symlinks that are the first path element
510
self.assertEqual(baz_path, osutils.dereference_path('foo/baz'))
512
# Dereferences parent symlinks in absolute paths
513
foo_baz_path = osutils.pathjoin(foo_path, 'baz')
514
self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
516
def test_changing_access(self):
517
with open('file', 'w') as f:
520
# Make a file readonly
521
osutils.make_readonly('file')
522
mode = os.lstat('file').st_mode
523
self.assertEqual(mode, mode & 0o777555)
525
# Make a file writable
526
osutils.make_writable('file')
527
mode = os.lstat('file').st_mode
528
self.assertEqual(mode, mode | 0o200)
530
if osutils.has_symlinks():
531
# should not error when handed a symlink
532
os.symlink('nonexistent', 'dangling')
533
osutils.make_readonly('dangling')
534
osutils.make_writable('dangling')
536
def test_host_os_dereferences_symlinks(self):
    """Smoke test: the call only has to complete without raising."""
    # The result is not inspected by this test.
    osutils.host_os_dereferences_symlinks()
540
class TestCanonicalRelPath(tests.TestCaseInTempDir):
542
_test_needs_features = [features.CaseInsCasePresFilenameFeature]
544
def test_canonical_relpath_simple(self):
545
f = open('MixedCaseName', 'w')
547
actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
548
self.assertEqual('work/MixedCaseName', actual)
550
def test_canonical_relpath_missing_tail(self):
    """The existing parent gets its on-disk case; the missing tail is kept."""
    os.mkdir('MixedCaseParent')
    requested = 'mixedcaseparent/nochild'
    canonical = osutils.canonical_relpath(self.test_base_dir, requested)
    self.assertEqual('work/MixedCaseParent/nochild', canonical)
557
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
559
def assertRelpath(self, expected, base, path):
    """Assert that _cicp_canonical_relpath(base, path) equals expected."""
    self.assertEqual(expected, osutils._cicp_canonical_relpath(base, path))
563
def test_simple(self):
    """A differently-cased request yields the name as it was created."""
    self.build_tree(['MixedCaseName'])
    local_root = self.get_transport('.').local_abspath('.')
    tree_base = osutils.realpath(local_root)
    self.assertRelpath('MixedCaseName', tree_base, 'mixedcAsename')
568
def test_subdir_missing_tail(self):
    """Existing children are case-corrected; a missing child is kept as-is."""
    self.build_tree(['MixedCaseParent/', 'MixedCaseParent/a_child'])
    local_root = self.get_transport('.').local_abspath('.')
    tree_base = osutils.realpath(local_root)
    # (expected result, requested path)
    cases = [
        ('MixedCaseParent/a_child', 'MixedCaseParent/a_child'),
        ('MixedCaseParent/a_child', 'MixedCaseParent/A_Child'),
        ('MixedCaseParent/not_child', 'MixedCaseParent/not_child'),
    ]
    for expected, requested in cases:
        self.assertRelpath(expected, tree_base, requested)
578
def test_at_root_slash(self):
    """A path directly below '/' is made relative to the root."""
    # We can't test this on Windows, because of its 'MIN_ABS_PATHLENGTH';
    # skip wherever that minimum is more than a single character.
    if not osutils.MIN_ABS_PATHLENGTH <= 1:
        reason = 'relpath requires %d chars' % osutils.MIN_ABS_PATHLENGTH
        raise tests.TestSkipped(reason)
    self.assertRelpath('foo', '/', '/foo')
586
def test_at_root_drive(self):
587
if sys.platform != 'win32':
588
raise tests.TestNotApplicable('we can only test drive-letter relative'
589
' paths on Windows where we have drive'
592
# The specific issue is that when at the root of a drive, 'abspath'
593
# returns "C:/" or just "/". However, the code assumes that abspath
594
# always returns something like "C:/foo" or "/foo" (no trailing slash).
595
self.assertRelpath('foo', 'C:/', 'C:/foo')
596
self.assertRelpath('foo', 'X:/', 'X:/foo')
597
self.assertRelpath('foo', 'X:/', 'X://foo')
600
class TestPumpFile(tests.TestCase):
601
"""Test pumpfile method."""
604
super(TestPumpFile, self).setUp()
605
# create a test datablock
606
self.block_size = 512
607
pattern = b'0123456789ABCDEF'
608
self.test_data = pattern * (3 * self.block_size // len(pattern))
609
self.test_data_len = len(self.test_data)
611
def test_bracket_block_size(self):
612
"""Read data in blocks with the requested read size bracketing the
614
# make sure test data is larger than max read size
615
self.assertTrue(self.test_data_len > self.block_size)
617
from_file = file_utils.FakeReadFile(self.test_data)
620
# read (max // 2) bytes and verify read size wasn't affected
621
num_bytes_to_read = self.block_size // 2
622
osutils.pumpfile(from_file, to_file,
623
num_bytes_to_read, self.block_size)
624
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
625
self.assertEqual(from_file.get_read_count(), 1)
627
# read (max) bytes and verify read size wasn't affected
628
num_bytes_to_read = self.block_size
629
from_file.reset_read_count()
630
osutils.pumpfile(from_file, to_file,
631
num_bytes_to_read, self.block_size)
632
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
633
self.assertEqual(from_file.get_read_count(), 1)
635
# read (max + 1) bytes and verify read size was limited
636
num_bytes_to_read = self.block_size + 1
637
from_file.reset_read_count()
638
osutils.pumpfile(from_file, to_file,
639
num_bytes_to_read, self.block_size)
640
self.assertEqual(from_file.get_max_read_size(), self.block_size)
641
self.assertEqual(from_file.get_read_count(), 2)
643
# finish reading the rest of the data
644
num_bytes_to_read = self.test_data_len - to_file.tell()
645
osutils.pumpfile(from_file, to_file,
646
num_bytes_to_read, self.block_size)
648
# report error if the data wasn't equal (we only report the size due
649
# to the length of the data)
650
response_data = to_file.getvalue()
651
if response_data != self.test_data:
652
message = "Data not equal. Expected %d bytes, received %d."
653
self.fail(message % (len(response_data), self.test_data_len))
655
def test_specified_size(self):
656
"""Request a transfer larger than the maximum block size and verify
657
that the maximum read doesn't exceed the block_size."""
658
# make sure test data is larger than max read size
659
self.assertTrue(self.test_data_len > self.block_size)
661
# retrieve data in blocks
662
from_file = file_utils.FakeReadFile(self.test_data)
664
osutils.pumpfile(from_file, to_file, self.test_data_len,
667
# verify read size was equal to the maximum read size
668
self.assertTrue(from_file.get_max_read_size() > 0)
669
self.assertEqual(from_file.get_max_read_size(), self.block_size)
670
self.assertEqual(from_file.get_read_count(), 3)
672
# report error if the data wasn't equal (we only report the size due
673
# to the length of the data)
674
response_data = to_file.getvalue()
675
if response_data != self.test_data:
676
message = "Data not equal. Expected %d bytes, received %d."
677
self.fail(message % (len(response_data), self.test_data_len))
679
def test_to_eof(self):
680
"""Read to end-of-file and verify that the reads are not larger than
681
the maximum read size."""
682
# make sure test data is larger than max read size
683
self.assertTrue(self.test_data_len > self.block_size)
685
# retrieve data to EOF
686
from_file = file_utils.FakeReadFile(self.test_data)
688
osutils.pumpfile(from_file, to_file, -1, self.block_size)
690
# verify read size was equal to the maximum read size
691
self.assertEqual(from_file.get_max_read_size(), self.block_size)
692
self.assertEqual(from_file.get_read_count(), 4)
694
# report error if the data wasn't equal (we only report the size due
695
# to the length of the data)
696
response_data = to_file.getvalue()
697
if response_data != self.test_data:
698
message = "Data not equal. Expected %d bytes, received %d."
699
self.fail(message % (len(response_data), self.test_data_len))
701
def test_defaults(self):
702
"""Verifies that the default arguments will read to EOF -- this
703
test verifies that any existing usages of pumpfile will not be broken
704
with this new version."""
705
# retrieve data using default (old) pumpfile method
706
from_file = file_utils.FakeReadFile(self.test_data)
708
osutils.pumpfile(from_file, to_file)
710
# report error if the data wasn't equal (we only report the size due
711
# to the length of the data)
712
response_data = to_file.getvalue()
713
if response_data != self.test_data:
714
message = "Data not equal. Expected %d bytes, received %d."
715
self.fail(message % (len(response_data), self.test_data_len))
717
def test_report_activity(self):
720
def log_activity(length, direction):
721
activity.append((length, direction))
722
from_file = BytesIO(self.test_data)
724
osutils.pumpfile(from_file, to_file, buff_size=500,
725
report_activity=log_activity, direction='read')
726
self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
727
(36, 'read')], activity)
729
from_file = BytesIO(self.test_data)
732
osutils.pumpfile(from_file, to_file, buff_size=500,
733
report_activity=log_activity, direction='write')
734
self.assertEqual([(500, 'write'), (500, 'write'), (500, 'write'),
735
(36, 'write')], activity)
737
# And with a limited amount of data
738
from_file = BytesIO(self.test_data)
741
osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
742
report_activity=log_activity, direction='read')
744
[(500, 'read'), (500, 'read'), (28, 'read')], activity)
747
class TestPumpStringFile(tests.TestCase):
749
def test_empty(self):
751
osutils.pump_string_file(b"", output)
752
self.assertEqual(b"", output.getvalue())
754
def test_more_than_segment_size(self):
756
osutils.pump_string_file(b"123456789", output, 2)
757
self.assertEqual(b"123456789", output.getvalue())
759
def test_segment_size(self):
761
osutils.pump_string_file(b"12", output, 2)
762
self.assertEqual(b"12", output.getvalue())
764
def test_segment_size_multiple(self):
766
osutils.pump_string_file(b"1234", output, 2)
767
self.assertEqual(b"1234", output.getvalue())
770
class TestRelpath(tests.TestCase):
    """Tests for osutils.relpath()."""

    def test_simple_relpath(self):
        """A direct child of the base is returned as its bare name."""
        base = osutils.getcwd()
        child = base + '/subdir'
        self.assertEqual('subdir', osutils.relpath(base, child))

    def test_deep_relpath(self):
        """A nested descendant keeps its intermediate components."""
        base = osutils.getcwd()
        descendant = base + '/sub/subsubdir'
        self.assertEqual('sub/subsubdir', osutils.relpath(base, descendant))

    def test_not_relative(self):
        """A path on another drive raises PathNotChild."""
        for base in ('C:/path', 'C:/'):
            self.assertRaises(errors.PathNotChild,
                              osutils.relpath, base, 'H:/path')
789
class TestSafeUnicode(tests.TestCase):
77
791
def test_from_ascii_string(self):
78
self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
792
self.assertEqual(u'foobar', osutils.safe_unicode(b'foobar'))
80
794
def test_from_unicode_string_ascii_contents(self):
81
795
self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
84
798
self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
86
800
def test_from_utf8_string(self):
87
self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
801
self.assertEqual(u'foo\xae', osutils.safe_unicode(b'foo\xc2\xae'))
89
803
def test_bad_utf8_string(self):
90
self.assertRaises(BzrBadParameterNotUnicode,
804
self.assertRaises(errors.BzrBadParameterNotUnicode,
91
805
osutils.safe_unicode,
809
class TestSafeUtf8(tests.TestCase):
811
def test_from_ascii_string(self):
813
self.assertEqual(b'foobar', osutils.safe_utf8(f))
815
def test_from_unicode_string_ascii_contents(self):
    """An ASCII-only unicode string comes back as the same bytes."""
    encoded = osutils.safe_utf8(u'bargam')
    self.assertEqual(b'bargam', encoded)
818
def test_from_unicode_string_unicode_contents(self):
    """A non-ASCII unicode string comes back as its UTF-8 bytes."""
    encoded = osutils.safe_utf8(u'bargam\xae')
    self.assertEqual(b'bargam\xc2\xae', encoded)
821
def test_from_utf8_string(self):
    """Bytes that are already valid UTF-8 are returned unchanged."""
    utf8_bytes = b'foo\xc2\xae'
    self.assertEqual(utf8_bytes, osutils.safe_utf8(utf8_bytes))
824
def test_bad_utf8_string(self):
    """Bytes that are not valid UTF-8 raise BzrBadParameterNotUnicode."""
    invalid = b'\xbb\xbb'
    self.assertRaises(
        errors.BzrBadParameterNotUnicode, osutils.safe_utf8, invalid)
829
class TestSafeRevisionId(tests.TestCase):
831
def test_from_ascii_string(self):
    """An ASCII byte string is returned unchanged."""
    # this shouldn't give a warning because it's getting an ascii string
    revision_id = osutils.safe_revision_id(b'foobar')
    self.assertEqual(b'foobar', revision_id)
835
def test_from_unicode_string_ascii_contents(self):
    """A unicode string is rejected with TypeError, even if ASCII-only."""
    self.assertRaises(TypeError, osutils.safe_revision_id, u'bargam')
839
def test_from_unicode_string_unicode_contents(self):
    """A non-ASCII unicode string is rejected with TypeError."""
    self.assertRaises(TypeError, osutils.safe_revision_id, u'bargam\xae')
843
def test_from_utf8_string(self):
    """UTF-8 encoded bytes are returned unchanged."""
    utf8_bytes = b'foo\xc2\xae'
    self.assertEqual(utf8_bytes, osutils.safe_revision_id(utf8_bytes))
848
"""Currently, None is a valid revision_id"""
849
self.assertEqual(None, osutils.safe_revision_id(None))
852
class TestSafeFileId(tests.TestCase):
854
def test_from_ascii_string(self):
    """An ASCII byte string is returned unchanged."""
    file_id = osutils.safe_file_id(b'foobar')
    self.assertEqual(b'foobar', file_id)
857
def test_from_unicode_string_ascii_contents(self):
    """A unicode string is rejected with TypeError, even if ASCII-only."""
    ascii_text = u'bargam'
    self.assertRaises(TypeError, osutils.safe_file_id, ascii_text)
860
def test_from_unicode_string_unicode_contents(self):
    """A non-ASCII unicode string is rejected with TypeError."""
    self.assertRaises(TypeError, osutils.safe_file_id, u'bargam\xae')
864
def test_from_utf8_string(self):
    """UTF-8 encoded bytes are returned unchanged."""
    utf8_bytes = b'foo\xc2\xae'
    self.assertEqual(utf8_bytes, osutils.safe_file_id(utf8_bytes))
869
"""Currently, None is a valid revision_id"""
870
self.assertEqual(None, osutils.safe_file_id(None))
873
class TestSendAll(tests.TestCase):
875
def test_send_with_disconnected_socket(self):
876
class DisconnectedSocket(object):
877
def __init__(self, err):
880
def send(self, content):
885
# All of these should be treated as ConnectionReset
887
for err_cls in (IOError, socket.error):
888
for errnum in osutils._end_of_stream_errors:
889
errs.append(err_cls(errnum))
891
sock = DisconnectedSocket(err)
892
self.assertRaises(errors.ConnectionReset,
893
osutils.send_all, sock, b'some more content')
895
def test_send_with_no_progress(self):
896
# See https://bugs.launchpad.net/bzr/+bug/1047309
897
# It seems that paramiko can get into a state where it doesn't error,
898
# but it returns 0 bytes sent for requests over and over again.
899
class NoSendingSocket(object):
903
def send(self, bytes):
905
if self.call_count > 100:
906
# Prevent the test suite from hanging
907
raise RuntimeError('too many calls')
909
sock = NoSendingSocket()
910
self.assertRaises(errors.ConnectionReset,
911
osutils.send_all, sock, b'content')
912
self.assertEqual(1, sock.call_count)
915
class TestPosixFuncs(tests.TestCase):
916
"""Test that the posix version of normpath returns an appropriate path
917
when used with 2 leading slashes."""
919
def test_normpath(self):
920
self.assertEqual('/etc/shadow', osutils._posix_normpath('/etc/shadow'))
922
'/etc/shadow', osutils._posix_normpath('//etc/shadow'))
924
'/etc/shadow', osutils._posix_normpath('///etc/shadow'))
927
class TestWin32Funcs(tests.TestCase):
928
"""Test that _win32 versions of os utilities return appropriate paths."""
930
def test_abspath(self):
    """_win32_abspath returns forward-slash paths for drive and UNC input."""
    self.requireFeature(features.win32_feature)
    # (expected result, input path)
    cases = [
        ('C:/foo', 'C:\\foo'),
        ('C:/foo', 'C:/foo'),
        ('//HOST/path', r'\\HOST\path'),
        ('//HOST/path', '//HOST/path'),
    ]
    for expected, given in cases:
        self.assertEqual(expected, osutils._win32_abspath(given))
937
def test_realpath(self):
    """_win32_realpath returns the forward-slash form for both spellings."""
    for given in ('C:\\foo', 'C:/foo'):
        self.assertEqual('C:/foo', osutils._win32_realpath(given))
941
def test_pathjoin(self):
    """_win32_pathjoin joins with '/' and restarts at a drive-rooted part."""
    # (expected result, components to join)
    cases = [
        ('path/to/foo', ('path', 'to', 'foo')),
        ('C:/foo', ('path\\to', 'C:\\foo')),
        ('C:/foo', ('path/to', 'C:/foo')),
        ('path/to/foo', ('path/to/', 'foo')),
    ]
    for expected, parts in cases:
        self.assertEqual(expected, osutils._win32_pathjoin(*parts))
951
def test_pathjoin_late_bugfix(self):
952
if sys.version_info < (2, 7, 6):
956
self.assertEqual(expected,
957
osutils._win32_pathjoin('C:/path/to/', '/foo'))
958
self.assertEqual(expected,
959
osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
961
def test_normpath(self):
    """_win32_normpath collapses '..', '.' and doubled separators."""
    messy_paths = (
        r'path\\from\..\to\.\foo',
        'path//from/../to/./foo',
    )
    for messy in messy_paths:
        self.assertEqual('path/to/foo', osutils._win32_normpath(messy))
967
def test_getcwd(self):
    """_win32_getcwd matches _getcwd apart from separators and drive case."""
    win32_cwd = osutils._win32_getcwd()
    raw_cwd = osutils._getcwd()
    # Past the first character the two must agree once backslashes are
    # turned into forward slashes.
    raw_tail = raw_cwd[1:].replace('\\', '/')
    self.assertEqual(raw_tail, win32_cwd[1:])
    # win32 is inconsistent whether it returns lower or upper case
    # and even if it was consistent the user might type the other
    # so we force it to uppercase
    # running python.exe under cmd.exe return capital C:\\
    # running win32 python inside a cygwin shell returns lowercase
    raw_first_upper = raw_cwd[0].upper()
    self.assertEqual(raw_first_upper, win32_cwd[0])
978
def test_fixdrive(self):
    """_win32_fixdrive upper-cases the drive letter and keeps the rest."""
    # (expected result, input path)
    cases = [
        ('H:/foo', 'h:/foo'),
        ('H:/foo', 'H:/foo'),
        ('C:\\foo', 'c:\\foo'),
    ]
    for expected, given in cases:
        self.assertEqual(expected, osutils._win32_fixdrive(given))
984
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
985
"""Test win32 functions that create files."""
987
def test_getcwd(self):
988
self.requireFeature(features.UnicodeFilenameFeature)
991
# TODO: jam 20060427 This will probably fail on Mac OSX because
992
# it will change the normalization of B\xe5gfors
993
# Consider using a different unicode character, or make
994
# osutils.getcwd() renormalize the path.
995
self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
997
def test_minimum_path_selection(self):
    """Paths that lie inside another selected path are dropped."""
    # (expected set, input paths)
    cases = [
        (set(), []),
        ({'a'}, ['a']),
        ({'a', 'b'}, ['a', 'b']),
        ({'a/', 'b'}, ['a/', 'b']),
        ({'a/', 'b'}, ['a/c', 'a/', 'b']),
        ({'a-b', 'a', 'a0b'}, ['a-b', 'a/b', 'a0b', 'a']),
    ]
    for expected, paths in cases:
        self.assertEqual(expected, osutils.minimum_path_selection(paths))
1011
def test_mkdtemp(self):
    """The path returned by _win32_mkdtemp contains no backslashes."""
    tmpdir = osutils._win32_mkdtemp(dir='.')
    # assertNotIn reports the offending path on failure, which a bare
    # assertFalse on the membership test does not.
    self.assertNotIn('\\', tmpdir)
1015
def test_rename(self):
1016
with open('a', 'wb') as a:
1018
with open('b', 'wb') as b:
1021
osutils._win32_rename('b', 'a')
1022
self.assertPathExists('a')
1023
self.assertPathDoesNotExist('b')
1024
self.assertFileEqual(b'baz\n', 'a')
1026
def test_rename_missing_file(self):
1027
with open('a', 'wb') as a:
1031
osutils._win32_rename('b', 'a')
1032
except (IOError, OSError) as e:
1033
self.assertEqual(errno.ENOENT, e.errno)
1034
self.assertFileEqual(b'foo\n', 'a')
1036
def test_rename_missing_dir(self):
1039
osutils._win32_rename('b', 'a')
1040
except (IOError, OSError) as e:
1041
self.assertEqual(errno.ENOENT, e.errno)
1043
def test_rename_current_dir(self):
1046
# You can't rename the working directory
1047
# doing rename non-existent . usually
1048
# just raises ENOENT, since non-existent
1051
osutils._win32_rename('b', '.')
1052
except (IOError, OSError) as e:
1053
self.assertEqual(errno.ENOENT, e.errno)
1055
def test_splitpath(self):
1056
def check(expected, path):
1057
self.assertEqual(expected, osutils.splitpath(path))
1060
check(['a', 'b'], 'a/b')
1061
check(['a', 'b'], 'a/./b')
1062
check(['a', '.b'], 'a/.b')
1063
if os.path.sep == '\\':
1064
check(['a', '.b'], 'a\\.b')
1066
check(['a\\.b'], 'a\\.b')
1068
self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
1071
class TestParentDirectories(tests.TestCaseInTempDir):
    """Test osutils.parent_directories()"""

    def test_parent_directories(self):
        """Parents are listed nearest first; a bare name has none."""
        # (expected parents, input path)
        cases = [
            ([], 'a'),
            (['a'], 'a/b'),
            (['a/b', 'a'], 'a/b/c'),
        ]
        for expected, path in cases:
            self.assertEqual(expected, osutils.parent_directories(path))
1080
class TestMacFuncsDirs(tests.TestCaseInTempDir):
    """Test mac special functions that require directories."""

    def _enter_and_check_cwd(self, dirname):
        """Create and enter dirname, then check what _mac_getcwd reports."""
        os.mkdir(dirname)
        os.chdir(dirname)
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')

    def test_getcwd(self):
        """A composed directory name is reported in the same form."""
        self.requireFeature(features.UnicodeFilenameFeature)
        self._enter_and_check_cwd(u'B\xe5gfors')

    def test_getcwd_nonnorm(self):
        """A decomposed directory name is reported in composed form."""
        self.requireFeature(features.UnicodeFilenameFeature)
        # Test that _mac_getcwd() will normalize this path
        self._enter_and_check_cwd(u'Ba\u030agfors')
1097
class TestChunksToLines(tests.TestCase):
1099
def test_smoketest(self):
    """Chunks split off-line and chunks already split give the same lines."""
    chunk_lists = (
        [b'foo\nbar', b'\nbaz\n'],
        [b'foo\n', b'bar\n', b'baz\n'],
    )
    for chunks in chunk_lists:
        self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
                         osutils.chunks_to_lines(chunks))
1105
def test_osutils_binding(self):
1106
from . import test__chunks_to_lines
1107
if test__chunks_to_lines.compiled_chunkstolines_feature.available():
1108
from .._chunks_to_lines_pyx import chunks_to_lines
1110
from .._chunks_to_lines_py import chunks_to_lines
1111
self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
1114
class TestSplitLines(tests.TestCase):
    """Tests for osutils.split_lines()."""

    def test_split_unicode(self):
        """Unicode text splits after each newline, with or without a final one."""
        # (expected lines, input text)
        cases = [
            ([u'foo\n', u'bar\xae'], u'foo\nbar\xae'),
            ([u'foo\n', u'bar\xae\n'], u'foo\nbar\xae\n'),
        ]
        for expected, text in cases:
            self.assertEqual(expected, osutils.split_lines(text))

    def test_split_with_carriage_returns(self):
        """A lone carriage return does not end a line."""
        data = b'foo\rbar\n'
        self.assertEqual([data], osutils.split_lines(data))
1127
class TestWalkDirs(tests.TestCaseInTempDir):
1129
def assertExpectedBlocks(self, expected, result):
    """Compare expected with result after cutting each entry to 3 fields."""
    trimmed = []
    for dirinfo, block in result:
        # Only the first three items of every entry take part in the check.
        entries = [line[0:3] for line in block]
        trimmed.append((dirinfo, entries))
    self.assertEqual(expected, trimmed)
1134
def test_walkdirs(self):
1143
self.build_tree(tree)
1144
expected_dirblocks = [
1146
[('0file', '0file', 'file'),
1147
('1dir', '1dir', 'directory'),
1148
('2file', '2file', 'file'),
1151
(('1dir', './1dir'),
1152
[('1dir/0file', '0file', 'file'),
1153
('1dir/1dir', '1dir', 'directory'),
1156
(('1dir/1dir', './1dir/1dir'),
1162
found_bzrdir = False
1163
for dirdetail, dirblock in osutils.walkdirs('.'):
1164
if len(dirblock) and dirblock[0][1] == '.bzr':
1165
# this tests the filtering of selected paths
1168
result.append((dirdetail, dirblock))
1170
self.assertTrue(found_bzrdir)
1171
self.assertExpectedBlocks(expected_dirblocks, result)
1172
# you can search a subdir only, with a supplied prefix.
1174
for dirblock in osutils.walkdirs('./1dir', '1dir'):
1175
result.append(dirblock)
1176
self.assertExpectedBlocks(expected_dirblocks[1:], result)
1178
def test_walkdirs_os_error(self):
    """An unreadable directory is reported as an OSError naming the path.

    See <https://bugs.launchpad.net/bzr/+bug/338653>: Pyrex readdir didn't
    raise useful messages if it had an error reading the directory.
    """
    if sys.platform == 'win32':
        raise tests.TestNotApplicable(
            "readdir IOError not tested on win32")
    self.requireFeature(features.not_running_as_root)
    unreadable = "test-unreadable"
    os.mkdir(unreadable)
    os.chmod(unreadable, 0o000)
    # The permissions have to be restored so the directory can be removed.
    self.addCleanup(os.chmod, unreadable, 0o700)
    # The error is not raised until the generator is actually evaluated,
    # which is why list() is the callable handed to assertRaises.
    walker = osutils._walkdirs_utf8(".")
    e = self.assertRaises(OSError, list, walker)
    self.assertEqual('./test-unreadable', osutils.safe_unicode(e.filename))
    self.assertEqual(errno.EACCES, e.errno)
    # The message itself must mention the file name.
    self.assertContainsRe(str(e), "\\./test-unreadable")
1199
def test_walkdirs_encoding_error(self):
1200
# <https://bugs.launchpad.net/bzr/+bug/488519>
1201
# walkdirs didn't raise a useful message when the filenames
1202
# are not using the filesystem's encoding
1204
# require a bytestring based filesystem
1205
self.requireFeature(features.ByteStringNamedFilesystem)
1216
self.build_tree(tree)
1218
# rename the 1file to a latin-1 filename
1219
os.rename(b"./1file", b"\xe8file")
1220
if b"\xe8file" not in os.listdir("."):
1221
self.skipTest("Lack filesystem that preserves arbitrary bytes")
1223
self._save_platform_info()
1224
osutils._fs_enc = 'UTF-8'
1226
# this should raise on error
1228
for dirdetail, dirblock in osutils.walkdirs(b'.'):
1231
self.assertRaises(errors.BadFilenameEncoding, attempt)
1233
def test__walkdirs_utf8(self):
1242
self.build_tree(tree)
1243
expected_dirblocks = [
1245
[('0file', '0file', 'file'),
1246
('1dir', '1dir', 'directory'),
1247
('2file', '2file', 'file'),
1250
(('1dir', './1dir'),
1251
[('1dir/0file', '0file', 'file'),
1252
('1dir/1dir', '1dir', 'directory'),
1255
(('1dir/1dir', './1dir/1dir'),
1261
found_bzrdir = False
1262
for dirdetail, dirblock in osutils._walkdirs_utf8(b'.'):
1263
if len(dirblock) and dirblock[0][1] == b'.bzr':
1264
# this tests the filtering of selected paths
1267
dirdetail = (dirdetail[0].decode('utf-8'),
1268
osutils.safe_unicode(dirdetail[1]))
1270
(entry[0].decode('utf-8'), entry[1].decode('utf-8'), entry[2])
1271
for entry in dirblock]
1272
result.append((dirdetail, dirblock))
1274
self.assertTrue(found_bzrdir)
1275
self.assertExpectedBlocks(expected_dirblocks, result)
1277
# you can search a subdir only, with a supplied prefix.
1279
for dirblock in osutils.walkdirs('./1dir', '1dir'):
1280
result.append(dirblock)
1281
self.assertExpectedBlocks(expected_dirblocks[1:], result)
1283
def _filter_out_stat(self, result):
1284
"""Filter out the stat value from the walkdirs result"""
1285
for dirdetail, dirblock in result:
1287
for info in dirblock:
1288
# Ignore info[3] which is the stat
1289
new_dirblock.append((info[0], info[1], info[2], info[4]))
1290
dirblock[:] = new_dirblock
1292
def _save_platform_info(self):
    """Register osutils._fs_enc and osutils._selected_dir_reader as overridden.

    Called before a test reassigns those globals. Restoring them is
    presumably handled by overrideAttr, which is defined elsewhere.
    """
    for attribute in ('_fs_enc', '_selected_dir_reader'):
        self.overrideAttr(osutils, attribute)
1296
def assertDirReaderIs(self, expected, top):
    """Assert the right implementation for _walkdirs_utf8 is chosen.

    :param expected: class the selected directory reader must be an
        instance of.
    :param top: value expected as the second element of the root
        directory's info tuple.
    """
    # Clear the cached reader so the walk below has to redetect it.
    osutils._selected_dir_reader = None
    # The directory has nothing to list, but walking it still runs the
    # selection logic.
    walked = list(osutils._walkdirs_utf8('.'))
    self.assertEqual([((b'', top), [])], walked)
    chosen = osutils._selected_dir_reader
    self.assertIsInstance(chosen, expected)
1304
def test_force_walkdirs_utf8_fs_utf8(self):
    """With _fs_enc set to 'utf-8' the UTF8DirReader is selected."""
    self.requireFeature(UTF8DirReaderFeature)
    self._save_platform_info()
    osutils._fs_enc = 'utf-8'
    reader_class = UTF8DirReaderFeature.module.UTF8DirReader
    self.assertDirReaderIs(reader_class, b".")
1310
def test_force_walkdirs_utf8_fs_ascii(self):
    """With _fs_enc set to 'ascii' the UTF8DirReader is still selected."""
    self.requireFeature(UTF8DirReaderFeature)
    self._save_platform_info()
    osutils._fs_enc = 'ascii'
    reader_class = UTF8DirReaderFeature.module.UTF8DirReader
    self.assertDirReaderIs(reader_class, b".")
1317
def test_force_walkdirs_utf8_fs_latin1(self):
    """With _fs_enc set to 'iso-8859-1' the UnicodeDirReader is selected."""
    self._save_platform_info()
    osutils._fs_enc = 'iso-8859-1'
    reader_class = osutils.UnicodeDirReader
    self.assertDirReaderIs(reader_class, ".")
1322
def test_force_walkdirs_utf8_nt(self):
    """Win32ReadDir is selected when the win32 readdir feature exists."""
    # Disabled because the thunk of the whole walkdirs api is disabled.
    self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
    self._save_platform_info()
    from .._walkdirs_win32 import Win32ReadDir
    reader_class = Win32ReadDir
    self.assertDirReaderIs(reader_class, ".")
1329
def test_unicode_walkdirs(self):
1330
"""Walkdirs should always return unicode paths."""
1331
self.requireFeature(features.UnicodeFilenameFeature)
1332
name0 = u'0file-\xb6'
1333
name1 = u'1dir-\u062c\u0648'
1334
name2 = u'2file-\u0633'
1338
name1 + '/' + name0,
1339
name1 + '/' + name1 + '/',
1342
self.build_tree(tree)
1343
expected_dirblocks = [
1345
[(name0, name0, 'file', './' + name0),
1346
(name1, name1, 'directory', './' + name1),
1347
(name2, name2, 'file', './' + name2),
1350
((name1, './' + name1),
1351
[(name1 + '/' + name0, name0, 'file', './' + name1
1353
(name1 + '/' + name1, name1, 'directory', './' + name1
1357
((name1 + '/' + name1, './' + name1 + '/' + name1),
1362
result = list(osutils.walkdirs('.'))
1363
self._filter_out_stat(result)
1364
self.assertEqual(expected_dirblocks, result)
1365
result = list(osutils.walkdirs(u'./' + name1, name1))
1366
self._filter_out_stat(result)
1367
self.assertEqual(expected_dirblocks[1:], result)
1369
def test_unicode__walkdirs_utf8(self):
1370
"""Walkdirs_utf8 should always return utf8 paths.
1372
The abspath portion might be in unicode or utf-8
1374
self.requireFeature(features.UnicodeFilenameFeature)
1375
name0 = u'0file-\xb6'
1376
name1 = u'1dir-\u062c\u0648'
1377
name2 = u'2file-\u0633'
1381
name1 + '/' + name0,
1382
name1 + '/' + name1 + '/',
1385
self.build_tree(tree)
1386
name0 = name0.encode('utf8')
1387
name1 = name1.encode('utf8')
1388
name2 = name2.encode('utf8')
1390
expected_dirblocks = [
1392
[(name0, name0, 'file', b'./' + name0),
1393
(name1, name1, 'directory', b'./' + name1),
1394
(name2, name2, 'file', b'./' + name2),
1397
((name1, b'./' + name1),
1398
[(name1 + b'/' + name0, name0, 'file', b'./' + name1
1400
(name1 + b'/' + name1, name1, 'directory', b'./' + name1
1404
((name1 + b'/' + name1, b'./' + name1 + b'/' + name1),
1410
# For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1411
# all abspaths are Unicode, and encode them back into utf8.
1412
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1413
self.assertIsInstance(dirdetail[0], bytes)
1414
if isinstance(dirdetail[1], text_type):
1415
dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1416
dirblock = [list(info) for info in dirblock]
1417
for info in dirblock:
1418
self.assertIsInstance(info[4], text_type)
1419
info[4] = info[4].encode('utf8')
1421
for info in dirblock:
1422
self.assertIsInstance(info[0], bytes)
1423
self.assertIsInstance(info[1], bytes)
1424
self.assertIsInstance(info[4], bytes)
1425
# Remove the stat information
1426
new_dirblock.append((info[0], info[1], info[2], info[4]))
1427
result.append((dirdetail, new_dirblock))
1428
self.assertEqual(expected_dirblocks, result)
1430
def test__walkdirs_utf8_with_unicode_fs(self):
1431
"""UnicodeDirReader should be a safe fallback everywhere
1433
The abspath portion should be in unicode
1435
self.requireFeature(features.UnicodeFilenameFeature)
1436
# Use the unicode reader. TODO: split into driver-and-driven unit
1438
self._save_platform_info()
1439
osutils._selected_dir_reader = osutils.UnicodeDirReader()
1440
name0u = u'0file-\xb6'
1441
name1u = u'1dir-\u062c\u0648'
1442
name2u = u'2file-\u0633'
1446
name1u + '/' + name0u,
1447
name1u + '/' + name1u + '/',
1450
self.build_tree(tree)
1451
name0 = name0u.encode('utf8')
1452
name1 = name1u.encode('utf8')
1453
name2 = name2u.encode('utf8')
1455
# All of the abspaths should be in unicode, all of the relative paths
1457
expected_dirblocks = [
1459
[(name0, name0, 'file', './' + name0u),
1460
(name1, name1, 'directory', './' + name1u),
1461
(name2, name2, 'file', './' + name2u),
1464
((name1, './' + name1u),
1465
[(name1 + b'/' + name0, name0, 'file', './' + name1u
1467
(name1 + b'/' + name1, name1, 'directory', './' + name1u
1471
((name1 + b'/' + name1, './' + name1u + '/' + name1u),
1476
result = list(osutils._walkdirs_utf8('.'))
1477
self._filter_out_stat(result)
1478
self.assertEqual(expected_dirblocks, result)
1480
def test__walkdirs_utf8_win32readdir(self):
1481
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1482
self.requireFeature(features.UnicodeFilenameFeature)
1483
from .._walkdirs_win32 import Win32ReadDir
1484
self._save_platform_info()
1485
osutils._selected_dir_reader = Win32ReadDir()
1486
name0u = u'0file-\xb6'
1487
name1u = u'1dir-\u062c\u0648'
1488
name2u = u'2file-\u0633'
1492
name1u + '/' + name0u,
1493
name1u + '/' + name1u + '/',
1496
self.build_tree(tree)
1497
name0 = name0u.encode('utf8')
1498
name1 = name1u.encode('utf8')
1499
name2 = name2u.encode('utf8')
1501
# All of the abspaths should be in unicode, all of the relative paths
1503
expected_dirblocks = [
1505
[(name0, name0, 'file', './' + name0u),
1506
(name1, name1, 'directory', './' + name1u),
1507
(name2, name2, 'file', './' + name2u),
1510
((name1, './' + name1u),
1511
[(name1 + '/' + name0, name0, 'file', './' + name1u
1513
(name1 + '/' + name1, name1, 'directory', './' + name1u
1517
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1522
result = list(osutils._walkdirs_utf8(u'.'))
1523
self._filter_out_stat(result)
1524
self.assertEqual(expected_dirblocks, result)
1526
def assertStatIsCorrect(self, path, win32stat):
    """Check win32stat against a fresh os.stat() of path.

    Size, device, inode and mode must be equal. The three timestamps only
    have to agree to four decimal places.

    :param path: path handed to os.stat for the reference values.
    :param win32stat: stat-like object exposing the same st_* attributes.
    """
    os_stat = os.stat(path)
    self.assertEqual(os_stat.st_size, win32stat.st_size)
    for field in ('st_mtime', 'st_ctime', 'st_atime'):
        self.assertAlmostEqual(getattr(os_stat, field),
                               getattr(win32stat, field), places=4)
    for field in ('st_dev', 'st_ino', 'st_mode'):
        self.assertEqual(getattr(os_stat, field), getattr(win32stat, field))
1536
def test__walkdirs_utf_win32_find_file_stat_file(self):
1537
"""make sure our Stat values are valid"""
1538
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1539
self.requireFeature(features.UnicodeFilenameFeature)
1540
from .._walkdirs_win32 import Win32ReadDir
1541
name0u = u'0file-\xb6'
1542
name0 = name0u.encode('utf8')
1543
self.build_tree([name0u])
1544
# I hate to sleep() here, but I'm trying to make the ctime different
1547
with open(name0u, 'ab') as f:
1548
f.write(b'just a small update')
1550
result = Win32ReadDir().read_dir('', u'.')
1552
self.assertEqual((name0, name0, 'file'), entry[:3])
1553
self.assertEqual(u'./' + name0u, entry[4])
1554
self.assertStatIsCorrect(entry[4], entry[3])
1555
self.assertNotEqual(entry[3].st_mtime, entry[3].st_ctime)
1557
def test__walkdirs_utf_win32_find_file_stat_directory(self):
1558
"""make sure our Stat values are valid"""
1559
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1560
self.requireFeature(features.UnicodeFilenameFeature)
1561
from .._walkdirs_win32 import Win32ReadDir
1562
name0u = u'0dir-\u062c\u0648'
1563
name0 = name0u.encode('utf8')
1564
self.build_tree([name0u + '/'])
1566
result = Win32ReadDir().read_dir('', u'.')
1568
self.assertEqual((name0, name0, 'directory'), entry[:3])
1569
self.assertEqual(u'./' + name0u, entry[4])
1570
self.assertStatIsCorrect(entry[4], entry[3])
1572
def assertPathCompare(self, path_less, path_greater):
    """Check that path_less and path_greater compare correctly.

    Each path must compare equal to itself, path_less must compare as -1
    against path_greater, and the reverse comparison must give 1.
    """
    compare = osutils.compare_paths_prefix_order
    self.assertEqual(0, compare(path_less, path_less))
    self.assertEqual(0, compare(path_greater, path_greater))
    self.assertEqual(-1, compare(path_less, path_greater))
    self.assertEqual(1, compare(path_greater, path_less))
1583
def test_compare_paths_prefix_order(self):
1584
# root before all else
1585
self.assertPathCompare("/", "/a")
1586
# alpha within a dir
1587
self.assertPathCompare("/a", "/b")
1588
self.assertPathCompare("/b", "/z")
1589
# high dirs before lower.
1590
self.assertPathCompare("/z", "/a/a")
1591
# except if the deeper dir should be output first
1592
self.assertPathCompare("/a/b/c", "/d/g")
1593
# lexical betwen dirs of the same height
1594
self.assertPathCompare("/a/z", "/z/z")
1595
self.assertPathCompare("/a/c/z", "/a/d/e")
1597
# this should also be consistent for no leading / paths
1598
# root before all else
1599
self.assertPathCompare("", "a")
1600
# alpha within a dir
1601
self.assertPathCompare("a", "b")
1602
self.assertPathCompare("b", "z")
1603
# high dirs before lower.
1604
self.assertPathCompare("z", "a/a")
1605
# except if the deeper dir should be output first
1606
self.assertPathCompare("a/b/c", "d/g")
1607
# lexical betwen dirs of the same height
1608
self.assertPathCompare("a/z", "z/z")
1609
self.assertPathCompare("a/c/z", "a/d/e")
1611
def test_path_prefix_sorting(self):
1612
"""Doing a sort on path prefix should match our sample data."""
1627
dir_sorted_paths = [
1643
sorted(original_paths, key=osutils.path_prefix_key))
1644
# using the comparison routine should work too:
1647
sorted(original_paths, key=osutils.path_prefix_key))
1650
class TestCopyTree(tests.TestCaseInTempDir):
1652
def test_copy_basic_tree(self):
    """copy_tree reproduces files and a nested directory under 'target'."""
    layout = ['source/', 'source/a', 'source/b/', 'source/b/c']
    self.build_tree(layout)
    osutils.copy_tree('source', 'target')
    top_level = sorted(os.listdir('target'))
    self.assertEqual(['a', 'b'], top_level)
    nested = os.listdir('target/b')
    self.assertEqual(['c'], nested)
1658
def test_copy_tree_target_exists(self):
1659
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
1661
osutils.copy_tree('source', 'target')
1662
self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
1663
self.assertEqual(['c'], os.listdir('target/b'))
1665
def test_copy_tree_symlinks(self):
    """A symlink in the source is copied as a symlink to the same target."""
    self.requireFeature(features.SymlinkFeature)
    self.build_tree(['source/'])
    os.symlink('a/generic/path', 'source/lnk')
    osutils.copy_tree('source', 'target')
    copied_names = os.listdir('target')
    self.assertEqual(['lnk'], copied_names)
    copied_link_target = os.readlink('target/lnk')
    self.assertEqual('a/generic/path', copied_link_target)
1673
def test_copy_tree_handlers(self):
1674
processed_files = []
1675
processed_links = []
1677
def file_handler(from_path, to_path):
1678
processed_files.append(('f', from_path, to_path))
1680
def dir_handler(from_path, to_path):
1681
processed_files.append(('d', from_path, to_path))
1683
def link_handler(from_path, to_path):
1684
processed_links.append((from_path, to_path))
1685
handlers = {'file': file_handler,
1686
'directory': dir_handler,
1687
'symlink': link_handler,
1690
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1691
if osutils.has_symlinks():
1692
os.symlink('a/generic/path', 'source/lnk')
1693
osutils.copy_tree('source', 'target', handlers=handlers)
1695
self.assertEqual([('d', 'source', 'target'),
1696
('f', 'source/a', 'target/a'),
1697
('d', 'source/b', 'target/b'),
1698
('f', 'source/b/c', 'target/b/c'),
1700
self.assertPathDoesNotExist('target')
1701
if osutils.has_symlinks():
1702
self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1705
class TestSetUnsetEnv(tests.TestCase):
1706
"""Test updating the environment"""
1709
super(TestSetUnsetEnv, self).setUp()
1711
self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'),
1712
'Environment was not cleaned up properly.'
1713
' Variable BRZ_TEST_ENV_VAR should not exist.')
1716
if 'BRZ_TEST_ENV_VAR' in os.environ:
1717
del os.environ['BRZ_TEST_ENV_VAR']
1718
self.addCleanup(cleanup)
1721
"""Test that we can set an env variable"""
1722
old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
1723
self.assertEqual(None, old)
1724
self.assertEqual('foo', os.environ.get('BRZ_TEST_ENV_VAR'))
1726
def test_double_set(self):
    """Test that we get the old value out"""
    osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
    previous = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'bar')
    self.assertEqual('foo', previous)
    current = os.environ.get('BRZ_TEST_ENV_VAR')
    self.assertEqual('bar', current)
1733
def test_unicode(self):
1734
"""Environment can only contain plain strings
1736
So Unicode strings must be encoded.
1738
uni_val, env_val = tests.probe_unicode_in_user_encoding()
1740
raise tests.TestSkipped(
1741
'Cannot find a unicode character that works in encoding %s'
1742
% (osutils.get_user_encoding(),))
1744
osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', uni_val)
1746
self.assertEqual(uni_val, os.environ.get('BRZ_TEST_ENV_VAR'))
1748
self.assertEqual(env_val, os.environ.get('BRZ_TEST_ENV_VAR'))
1750
def test_unset(self):
    """Test that passing None will remove the env var"""
    osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
    previous = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', None)
    self.assertEqual('foo', previous)
    # Check both the lookup result and the membership test.
    current = os.environ.get('BRZ_TEST_ENV_VAR')
    self.assertEqual(None, current)
    self.assertNotIn('BRZ_TEST_ENV_VAR', os.environ)
1759
class TestSizeShaFile(tests.TestCaseInTempDir):
1761
def test_sha_empty(self):
1762
self.build_tree_contents([('foo', b'')])
1763
expected_sha = osutils.sha_string(b'')
1765
self.addCleanup(f.close)
1766
size, sha = osutils.size_sha_file(f)
1767
self.assertEqual(0, size)
1768
self.assertEqual(expected_sha, sha)
1770
def test_sha_mixed_endings(self):
    """size_sha_file reports the byte count and sha of mixed line endings.

    The file is opened in binary mode and the expected sha is computed
    from the same bytes that were written.
    """
    text = b'test\r\nwith\nall\rpossible line endings\r\n'
    self.build_tree_contents([('foo', text)])
    expected_sha = osutils.sha_string(text)
    stream = open('foo', 'rb')
    self.addCleanup(stream.close)
    actual_size, actual_sha = osutils.size_sha_file(stream)
    self.assertEqual(38, actual_size)
    self.assertEqual(expected_sha, actual_sha)
1781
class TestShaFileByName(tests.TestCaseInTempDir):
    """sha_file_by_name must agree with sha_string on the same bytes."""

    def test_sha_empty(self):
        """An empty file hashes like the empty byte string."""
        self.build_tree_contents([('foo', b'')])
        expected_sha = osutils.sha_string(b'')
        actual_sha = osutils.sha_file_by_name('foo')
        self.assertEqual(expected_sha, actual_sha)

    def test_sha_mixed_endings(self):
        """Content with mixed line endings hashes like the same bytes."""
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
        self.build_tree_contents([('foo', text)])
        expected_sha = osutils.sha_string(text)
        actual_sha = osutils.sha_file_by_name('foo')
        self.assertEqual(expected_sha, actual_sha)
1795
class TestResourceLoading(tests.TestCaseInTempDir):
1797
def test_resource_string(self):
1798
# test resource in breezy
1799
text = osutils.resource_string('breezy', 'debug.py')
1800
self.assertContainsRe(text, "debug_flags = set()")
1801
# test resource under breezy
1802
text = osutils.resource_string('breezy.ui', 'text.py')
1803
self.assertContainsRe(text, "class TextUIFactory")
1804
# test unsupported package
1805
self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1807
# test unknown resource
1808
self.assertRaises(IOError, osutils.resource_string, 'breezy', 'yyy.xx')
1811
class TestDirReader(tests.TestCaseInTempDir):
1813
scenarios = dir_reader_scenarios()
1816
_dir_reader_class = None
1817
_native_to_unicode = None
1820
super(TestDirReader, self).setUp()
1821
self.overrideAttr(osutils,
1822
'_selected_dir_reader', self._dir_reader_class())
1824
def _get_ascii_tree(self):
1832
expected_dirblocks = [
1834
[(b'0file', b'0file', 'file', './0file'),
1835
(b'1dir', b'1dir', 'directory', './1dir'),
1836
(b'2file', b'2file', 'file', './2file'),
1839
((b'1dir', './1dir'),
1840
[(b'1dir/0file', b'0file', 'file', './1dir/0file'),
1841
(b'1dir/1dir', b'1dir', 'directory', './1dir/1dir'),
1844
((b'1dir/1dir', './1dir/1dir'),
1849
return tree, expected_dirblocks
1851
def test_walk_cur_dir(self):
    """Walking '.' yields every block of the ascii tree."""
    tree, expected_dirblocks = self._get_ascii_tree()
    self.build_tree(tree)
    raw_blocks = list(osutils._walkdirs_utf8('.'))
    # _filter_out strips the stat field before the comparison.
    filtered = self._filter_out(raw_blocks)
    self.assertEqual(expected_dirblocks, filtered)
1859
def test_walk_sub_dir(self):
    """Walking a subdirectory with a prefix yields all but the root block."""
    tree, expected_dirblocks = self._get_ascii_tree()
    self.build_tree(tree)
    # you can search a subdir only, with a supplied prefix.
    raw_blocks = list(osutils._walkdirs_utf8(b'./1dir', b'1dir'))
    # _filter_out strips the stat field before the comparison.
    filtered = self._filter_out(raw_blocks)
    self.assertEqual(expected_dirblocks[1:], filtered)
1868
def _get_unicode_tree(self):
1869
name0u = u'0file-\xb6'
1870
name1u = u'1dir-\u062c\u0648'
1871
name2u = u'2file-\u0633'
1875
name1u + '/' + name0u,
1876
name1u + '/' + name1u + '/',
1879
name0 = name0u.encode('UTF-8')
1880
name1 = name1u.encode('UTF-8')
1881
name2 = name2u.encode('UTF-8')
1882
expected_dirblocks = [
1884
[(name0, name0, 'file', './' + name0u),
1885
(name1, name1, 'directory', './' + name1u),
1886
(name2, name2, 'file', './' + name2u),
1889
((name1, './' + name1u),
1890
[(name1 + b'/' + name0, name0, 'file', './' + name1u
1892
(name1 + b'/' + name1, name1, 'directory', './' + name1u
1896
((name1 + b'/' + name1, './' + name1u + '/' + name1u),
1901
return tree, expected_dirblocks
1903
def _filter_out(self, raw_dirblocks):
1904
"""Filter out a walkdirs_utf8 result.
1906
stat field is removed, all native paths are converted to unicode
1908
filtered_dirblocks = []
1909
for dirinfo, block in raw_dirblocks:
1910
dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
1914
line[0:3] + (self._native_to_unicode(line[4]), ))
1915
filtered_dirblocks.append((dirinfo, details))
1916
return filtered_dirblocks
1918
def test_walk_unicode_tree(self):
    """Walking '.' yields every block of the unicode-named tree."""
    self.requireFeature(features.UnicodeFilenameFeature)
    tree, expected_dirblocks = self._get_unicode_tree()
    self.build_tree(tree)
    raw_blocks = list(osutils._walkdirs_utf8('.'))
    filtered = self._filter_out(raw_blocks)
    self.assertEqual(expected_dirblocks, filtered)
1925
def test_symlink(self):
1926
self.requireFeature(features.SymlinkFeature)
1927
self.requireFeature(features.UnicodeFilenameFeature)
1928
target = u'target\N{Euro Sign}'
1929
link_name = u'l\N{Euro Sign}nk'
1930
os.symlink(target, link_name)
1931
link_name_utf8 = link_name.encode('UTF-8')
1932
expected_dirblocks = [
1934
[(link_name_utf8, link_name_utf8,
1935
'symlink', './' + link_name), ],
1937
result = list(osutils._walkdirs_utf8('.'))
1938
self.assertEqual(expected_dirblocks, self._filter_out(result))
1941
class TestReadLink(tests.TestCaseInTempDir):
1942
"""Exposes os.readlink() problems and the osutils solution.
1944
The only guarantee offered by os.readlink(), starting with 2.6, is that a
1945
unicode string will be returned if a unicode string is passed.
1947
But prior python versions failed to properly encode the passed unicode
1950
_test_needs_features = [features.SymlinkFeature,
1951
features.UnicodeFilenameFeature]
1954
super(tests.TestCaseInTempDir, self).setUp()
1955
self.link = u'l\N{Euro Sign}ink'
1956
self.target = u'targe\N{Euro Sign}t'
1957
os.symlink(self.target, self.link)
1959
def test_os_readlink_link_encoding(self):
1960
self.assertEqual(self.target, os.readlink(self.link))
1962
def test_os_readlink_link_decoding(self):
    """os.readlink on the encoded link name returns the encoded target."""
    expected = self.target.encode(osutils._fs_enc)
    encoded_link = self.link.encode(osutils._fs_enc)
    self.assertEqual(expected, os.readlink(encoded_link))
1967
class TestConcurrency(tests.TestCase):
1970
super(TestConcurrency, self).setUp()
1971
self.overrideAttr(osutils, '_cached_local_concurrency')
1973
def test_local_concurrency(self):
    """local_concurrency() returns an int."""
    detected = osutils.local_concurrency()
    self.assertIsInstance(detected, int)
1977
def test_local_concurrency_environment_variable(self):
    """BRZ_CONCURRENCY drives local_concurrency when the cache is bypassed.

    Numeric strings are returned as ints; the non-numeric value 'foo'
    gives 1.
    """
    cases = (('2', 2), ('3', 3), ('foo', 1))
    for raw_value, expected in cases:
        self.overrideEnv('BRZ_CONCURRENCY', raw_value)
        self.assertEqual(expected,
                         osutils.local_concurrency(use_cache=False))
1985
def test_option_concurrency(self):
    """The --concurrency option replaces the BRZ_CONCURRENCY setting."""
    self.overrideEnv('BRZ_CONCURRENCY', '1')
    self.run_bzr('rocks --concurrency 42')
    # Command line overrides environment variable
    env_value = os.environ['BRZ_CONCURRENCY']
    self.assertEqual('42', env_value)
    detected = osutils.local_concurrency(use_cache=False)
    self.assertEqual(42, detected)
1993
class TestFailedToLoadExtension(tests.TestCase):
1995
def _try_loading(self):
1997
import breezy._fictional_extension_py # noqa: F401
1998
except ImportError as e:
1999
osutils.failed_to_load_extension(e)
2003
super(TestFailedToLoadExtension, self).setUp()
2004
self.overrideAttr(osutils, '_extension_load_failures', [])
2006
def test_failure_to_load(self):
2008
self.assertLength(1, osutils._extension_load_failures)
2011
osutils._extension_load_failures[0],
2012
"No module named 'breezy._fictional_extension_py'")
2014
self.assertEqual(osutils._extension_load_failures[0],
2015
"No module named _fictional_extension_py")
2017
def test_report_extension_load_failures_no_warning(self):
    """Reporting extension load failures emits no Python warning."""
    loaded = self._try_loading()
    self.assertTrue(loaded)
    reporter = osutils.report_extension_load_failures
    warnings, result = self.callCatchWarnings(reporter)
    # it used to give a Python warning; it no longer does
    self.assertLength(0, warnings)
2024
def test_report_extension_load_failures_message(self):
2026
trace.push_log_file(log)
2027
self.assertTrue(self._try_loading())
2028
osutils.report_extension_load_failures()
2029
self.assertContainsRe(
2031
br"brz: warning: some compiled extensions could not be loaded; "
2032
b"see ``brz help missing-extensions``\n"
2036
class TestTerminalWidth(tests.TestCase):
2039
super(TestTerminalWidth, self).setUp()
2040
self._orig_terminal_size_state = osutils._terminal_size_state
2041
self._orig_first_terminal_size = osutils._first_terminal_size
2042
self.addCleanup(self.restore_osutils_globals)
2043
osutils._terminal_size_state = 'no_data'
2044
osutils._first_terminal_size = None
2046
def restore_osutils_globals(self):
    """Copy the saved _orig_* values back onto the osutils globals."""
    for name in ('_terminal_size_state', '_first_terminal_size'):
        # The saved copy lives on the test instance under '_orig' + name.
        setattr(osutils, name, getattr(self, '_orig' + name))
2050
def replace_stdout(self, new):
    """Override sys.stdout with ``new`` through overrideAttr."""
    target_module = sys
    self.overrideAttr(target_module, 'stdout', new)
2053
def replace__terminal_size(self, new):
    """Override osutils._terminal_size with ``new`` through overrideAttr."""
    target_module = osutils
    self.overrideAttr(target_module, '_terminal_size', new)
2056
def set_fake_tty(self):
2058
class I_am_a_tty(object):
2062
self.replace_stdout(I_am_a_tty())
2064
def test_default_values(self):
    """The default terminal width is 80."""
    default_width = osutils.default_terminal_width
    self.assertEqual(80, default_width)
2067
def test_defaults_to_BRZ_COLUMNS(self):
    """terminal_width() reports the value placed in BRZ_COLUMNS."""
    # BRZ_COLUMNS is set by the test framework; make sure it does not
    # already hold the value this test is about to use.
    inherited = os.environ['BRZ_COLUMNS']
    self.assertNotEqual('12', inherited)
    self.overrideEnv('BRZ_COLUMNS', '12')
    self.assertEqual(12, osutils.terminal_width())
2073
def test_BRZ_COLUMNS_0_no_limit(self):
    """BRZ_COLUMNS='0' makes terminal_width() return None."""
    self.overrideEnv('BRZ_COLUMNS', '0')
    width = osutils.terminal_width()
    self.assertEqual(None, width)
2077
def test_falls_back_to_COLUMNS(self):
2078
self.overrideEnv('BRZ_COLUMNS', None)
2079
self.assertNotEqual('42', os.environ['COLUMNS'])
2081
self.overrideEnv('COLUMNS', '42')
2082
self.assertEqual(42, osutils.terminal_width())
2084
def test_tty_default_without_columns(self):
2085
self.overrideEnv('BRZ_COLUMNS', None)
2086
self.overrideEnv('COLUMNS', None)
2088
def terminal_size(w, h):
2092
# We need to override the osutils definition as it depends on the
2093
# running environment that we can't control (PQM running without a
2094
# controlling terminal is one example).
2095
self.replace__terminal_size(terminal_size)
2096
self.assertEqual(42, osutils.terminal_width())
2098
def test_non_tty_default_without_columns(self):
    """terminal_width() is None with no column variables and no stdout."""
    for variable in ('BRZ_COLUMNS', 'COLUMNS'):
        self.overrideEnv(variable, None)
    self.replace_stdout(None)
    width = osutils.terminal_width()
    self.assertEqual(None, width)
2104
def test_no_TIOCGWINSZ(self):
2105
self.requireFeature(term_ios_feature)
2106
termios = term_ios_feature.module
2107
# bug 63539 is about a termios without TIOCGWINSZ attribute
2110
except AttributeError:
2111
# We won't remove TIOCGWINSZ, because it doesn't exist anyway :)
2114
self.overrideAttr(termios, 'TIOCGWINSZ')
2115
del termios.TIOCGWINSZ
2116
self.overrideEnv('BRZ_COLUMNS', None)
2117
self.overrideEnv('COLUMNS', None)
2118
# Whatever the result is, if we don't raise an exception, it's ok.
2119
osutils.terminal_width()
2122
class TestCreationOps(tests.TestCaseInTempDir):
2123
_test_needs_features = [features.chown_feature]
2126
super(TestCreationOps, self).setUp()
2127
self.overrideAttr(os, 'chown', self._dummy_chown)
2129
# params set by call to _dummy_chown
2130
self.path = self.uid = self.gid = None
2132
def _dummy_chown(self, path, uid, gid):
2133
self.path, self.uid, self.gid = path, uid, gid
2135
def test_copy_ownership_from_path(self):
2136
"""copy_ownership_from_path test with specified src."""
2138
open('test_file', 'wt').close()
2139
osutils.copy_ownership_from_path('test_file', ownsrc)
2142
self.assertEqual(self.path, 'test_file')
2143
self.assertEqual(self.uid, s.st_uid)
2144
self.assertEqual(self.gid, s.st_gid)
2146
def test_copy_ownership_nonesrc(self):
2147
"""copy_ownership_from_path test with src=None."""
2148
open('test_file', 'wt').close()
2149
# should use parent dir for permissions
2150
osutils.copy_ownership_from_path('test_file')
2153
self.assertEqual(self.path, 'test_file')
2154
self.assertEqual(self.uid, s.st_uid)
2155
self.assertEqual(self.gid, s.st_gid)
2158
class TestPathFromEnviron(tests.TestCase):
2160
def test_is_unicode(self):
    """path_from_environ returns the variable's value as text."""
    self.overrideEnv('BRZ_TEST_PATH', './anywhere at all/')
    found = osutils.path_from_environ('BRZ_TEST_PATH')
    self.assertIsInstance(found, text_type)
    self.assertEqual(u'./anywhere at all/', found)
2166
def test_posix_path_env_ascii(self):
    """_posix_path_from_environ returns an ascii value as text."""
    self.overrideEnv('BRZ_TEST_PATH', '/tmp')
    found = osutils._posix_path_from_environ('BRZ_TEST_PATH')
    self.assertIsInstance(found, text_type)
    self.assertEqual(u'/tmp', found)
2172
def test_posix_path_env_unicode(self):
2173
self.requireFeature(features.ByteStringNamedFilesystem)
2174
self.overrideEnv('BRZ_TEST_PATH', '/home/\xa7test')
2175
self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
2176
self.assertEqual(u'/home/\xa7test',
2177
osutils._posix_path_from_environ('BRZ_TEST_PATH'))
2178
osutils._fs_enc = "iso8859-5"
2180
# In Python 3, os.environ returns unicode.
2181
self.assertEqual(u'/home/\xa7test',
2182
osutils._posix_path_from_environ('BRZ_TEST_PATH'))
2184
self.assertEqual(u'/home/\u0407test',
2185
osutils._posix_path_from_environ('BRZ_TEST_PATH'))
2186
osutils._fs_enc = "utf-8"
2188
errors.BadFilenameEncoding,
2189
osutils._posix_path_from_environ, 'BRZ_TEST_PATH')
2192
class TestGetHomeDir(tests.TestCase):
2194
def test_is_unicode(self):
    """_get_home_dir returns text."""
    home_dir = osutils._get_home_dir()
    self.assertIsInstance(home_dir, text_type)
2198
def test_posix_homeless(self):
    """_get_home_dir still returns text with HOME overridden to None."""
    self.overrideEnv('HOME', None)
    home_dir = osutils._get_home_dir()
    self.assertIsInstance(home_dir, text_type)
2203
def test_posix_home_ascii(self):
    """_posix_get_home_dir returns an ascii HOME as text."""
    self.overrideEnv('HOME', '/home/test')
    home_dir = osutils._posix_get_home_dir()
    self.assertIsInstance(home_dir, text_type)
    self.assertEqual(u'/home/test', home_dir)
2209
def test_posix_home_unicode(self):
2210
self.requireFeature(features.ByteStringNamedFilesystem)
2211
self.overrideEnv('HOME', '/home/\xa7test')
2212
self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
2213
self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
2214
osutils._fs_enc = "iso8859-5"
2216
# In python 3, os.environ returns unicode
2217
self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
2219
self.assertEqual(u'/home/\u0407test',
2220
osutils._posix_get_home_dir())
2221
osutils._fs_enc = "utf-8"
2222
self.assertRaises(errors.BadFilenameEncoding,
2223
osutils._posix_get_home_dir)
2226
class TestGetuserUnicode(tests.TestCase):
2228
def test_is_unicode(self):
    """getuser_unicode returns text."""
    user_name = osutils.getuser_unicode()
    self.assertIsInstance(user_name, text_type)
2232
def envvar_to_override(self):
    """Return the environment variable name the user-name tests override.

    On win32 this also overrides win32utils.has_ctypes to False.

    :return: 'USERNAME' on win32, 'LOGNAME' everywhere else.
    """
    if sys.platform != "win32":
        return 'LOGNAME'  # first variable checked by getpass.getuser()
    # Disable use of platform calls on windows so envvar is used
    self.overrideAttr(win32utils, 'has_ctypes', False)
    return 'USERNAME'  # only variable used on windows
2239
def test_ascii_user(self):
2240
self.overrideEnv(self.envvar_to_override(), 'jrandom')
2241
self.assertEqual(u'jrandom', osutils.getuser_unicode())
2243
def test_unicode_user(self):
2244
ue = osutils.get_user_encoding()
2245
uni_val, env_val = tests.probe_unicode_in_user_encoding()
2247
raise tests.TestSkipped(
2248
'Cannot find a unicode character that works in encoding %s'
2249
% (osutils.get_user_encoding(),))
2250
uni_username = u'jrandom' + uni_val
2251
encoded_username = uni_username.encode(ue)
2253
self.overrideEnv(self.envvar_to_override(), uni_username)
2255
self.overrideEnv(self.envvar_to_override(), encoded_username)
2256
self.assertEqual(uni_username, osutils.getuser_unicode())
2259
class TestBackupNames(tests.TestCase):
    """Tests for osutils.available_backup_name.

    Names handed out so far are kept in self.backups, which also serves as
    the existence check passed to the function under test.
    """

    def setUp(self):
        super(TestBackupNames, self).setUp()
        # Backup names already handed out during the current test.
        self.backups = []

    def backup_exists(self, name):
        """Tell whether name has already been handed out."""
        return name in self.backups

    def available_backup_name(self, name):
        """Ask osutils for a backup name, record it and return it."""
        backup_name = osutils.available_backup_name(name, self.backup_exists)
        self.backups.append(backup_name)
        # assertBackupName() compares this value, so it must be returned.
        return backup_name

    def assertBackupName(self, expected, name):
        """Assert that the next backup name for name equals expected."""
        self.assertEqual(expected, self.available_backup_name(name))

    def test_empty(self):
        self.assertBackupName('file.~1~', 'file')

    def test_existing(self):
        self.available_backup_name('file')
        self.available_backup_name('file')
        self.assertBackupName('file.~3~', 'file')
        # Empty slots are found, this is not a strict requirement and may be
        # revisited if we test against all implementations.
        self.backups.remove('file.~2~')
        self.assertBackupName('file.~2~', 'file')
2289
class TestFindExecutableInPath(tests.TestCase):
    """Tests for osutils.find_executable_on_path."""

    def test_windows(self):
        """On win32, 'explorer' is found with or without extension, any case."""
        if sys.platform != 'win32':
            raise tests.TestSkipped('test requires win32')
        self.assertIsNotNone(osutils.find_executable_on_path('explorer'))
        self.assertIsNotNone(osutils.find_executable_on_path('explorer.exe'))
        self.assertIsNotNone(osutils.find_executable_on_path('EXPLORER.EXE'))
        self.assertIsNone(
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST'))
        self.assertIsNone(osutils.find_executable_on_path('file.txt'))

    def test_windows_app_path(self):
        """On win32, 'iexplore' is still found when PATH is empty."""
        if sys.platform != 'win32':
            raise tests.TestSkipped('test requires win32')
        # Override PATH env var so that exe can only be found on App Path
        self.overrideEnv('PATH', '')
        # Internet Explorer is always registered in the App Path
        self.assertIsNotNone(osutils.find_executable_on_path('iexplore'))

    def test_other(self):
        """Off win32, 'sh' is found and a nonsense name is not."""
        if sys.platform == 'win32':
            raise tests.TestSkipped('test requires non-win32')
        self.assertIsNotNone(osutils.find_executable_on_path('sh'))
        self.assertIsNone(
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST'))
2321
class TestEnvironmentErrors(tests.TestCase):
    """Test handling of environmental errors"""

    def test_is_oserror(self):
        """An OSError instance counts as an environment error."""
        self.assertTrue(osutils.is_environment_error(
            OSError(errno.EINVAL, "Invalid parameter")))

    def test_is_ioerror(self):
        """An IOError instance counts as an environment error."""
        self.assertTrue(osutils.is_environment_error(
            IOError(errno.EINVAL, "Invalid parameter")))

    def test_is_socket_error(self):
        """A socket.error instance counts as an environment error."""
        self.assertTrue(osutils.is_environment_error(
            socket.error(errno.EINVAL, "Invalid parameter")))

    def test_is_select_error(self):
        """A select.error instance counts as an environment error."""
        self.assertTrue(osutils.is_environment_error(
            select.error(errno.EINVAL, "Invalid parameter")))

    def test_is_pywintypes_error(self):
        """A pywintypes.error instance counts as an environment error."""
        self.requireFeature(features.pywintypes)
        # Imported here, after the feature check, so the name is bound
        # before it is used below.
        import pywintypes
        self.assertTrue(osutils.is_environment_error(
            pywintypes.error(errno.EINVAL, "Invalid parameter", "Caller")))
2347
class SupportsExecutableTests(tests.TestCaseInTempDir):
    """Type check for osutils.supports_executable."""

    def test_returns_bool(self):
        """The answer for the test directory is a bool."""
        answer = osutils.supports_executable(self.test_dir)
        self.assertIsInstance(answer, bool)
2353
class SupportsSymlinksTests(tests.TestCaseInTempDir):
    """Type check for osutils.supports_symlinks."""

    def test_returns_bool(self):
        """The answer for the test directory is a bool."""
        answer = osutils.supports_symlinks(self.test_dir)
        self.assertIsInstance(answer, bool)
2359
class GetFsTypeTests(tests.TestCaseInTempDir):
2361
def test_returns_string(self):
2362
self.requireFeature(psutil_feature)
2363
self.assertIsInstance(osutils.get_fs_type(self.test_dir), str)