/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_osutils.py

  • Committer: Jelmer Vernooij
  • Date: 2018-02-18 21:42:57 UTC
  • mto: This revision was merged to the branch mainline in revision 6859.
  • Revision ID: jelmer@jelmer.uk-20180218214257-jpevutp1wa30tz3v
Update TODO to reference Breezy, not Bazaar.

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Tests for the osutils wrapper."""
18
18
 
 
19
from __future__ import absolute_import, division
 
20
 
19
21
import errno
20
 
from io import BytesIO
21
22
import os
 
23
import re
22
24
import select
23
25
import socket
24
26
import sys
27
29
 
28
30
from .. import (
29
31
    errors,
 
32
    lazy_regex,
30
33
    osutils,
 
34
    symbol_versioning,
31
35
    tests,
32
36
    trace,
33
37
    win32utils,
34
38
    )
 
39
from ..sixish import (
 
40
    BytesIO,
 
41
    )
35
42
from . import (
36
43
    features,
37
44
    file_utils,
51
58
        except ImportError:
52
59
            return False
53
60
 
54
 
 
55
61
UTF8DirReaderFeature = _UTF8DirReaderFeature('breezy._readdir_pyx')
56
62
 
57
63
term_ios_feature = features.ModuleAvailableFeature('termios')
130
136
 
131
137
    def test_fancy_rename(self):
132
138
        # This should work everywhere
133
 
        self.create_file('a', b'something in a\n')
 
139
        self.create_file('a', 'something in a\n')
134
140
        self._fancy_rename('a', 'b')
135
141
        self.assertPathDoesNotExist('a')
136
142
        self.assertPathExists('b')
137
 
        self.check_file_contents('b', b'something in a\n')
 
143
        self.check_file_contents('b', 'something in a\n')
138
144
 
139
 
        self.create_file('a', b'new something in a\n')
 
145
        self.create_file('a', 'new something in a\n')
140
146
        self._fancy_rename('b', 'a')
141
147
 
142
 
        self.check_file_contents('a', b'something in a\n')
 
148
        self.check_file_contents('a', 'something in a\n')
143
149
 
144
150
    def test_fancy_rename_fails_source_missing(self):
145
151
        # An exception should be raised, and the target should be left in place
146
 
        self.create_file('target', b'data in target\n')
 
152
        self.create_file('target', 'data in target\n')
147
153
        self.assertRaises((IOError, OSError), self._fancy_rename,
148
154
                          'missingsource', 'target')
149
155
        self.assertPathExists('target')
150
 
        self.check_file_contents('target', b'data in target\n')
 
156
        self.check_file_contents('target', 'data in target\n')
151
157
 
152
158
    def test_fancy_rename_fails_if_source_and_target_missing(self):
153
159
        self.assertRaises((IOError, OSError), self._fancy_rename,
155
161
 
156
162
    def test_rename(self):
157
163
        # Rename should be semi-atomic on all platforms
158
 
        self.create_file('a', b'something in a\n')
 
164
        self.create_file('a', 'something in a\n')
159
165
        osutils.rename('a', 'b')
160
166
        self.assertPathDoesNotExist('a')
161
167
        self.assertPathExists('b')
162
 
        self.check_file_contents('b', b'something in a\n')
 
168
        self.check_file_contents('b', 'something in a\n')
163
169
 
164
 
        self.create_file('a', b'new something in a\n')
 
170
        self.create_file('a', 'new something in a\n')
165
171
        osutils.rename('b', 'a')
166
172
 
167
 
        self.check_file_contents('a', b'something in a\n')
 
173
        self.check_file_contents('a', 'something in a\n')
168
174
 
169
175
    # TODO: test fancy_rename using a MemoryTransport
170
176
 
249
255
            # Without it, we may end up re-reading content when we don't have
250
256
            # to, but otherwise it doesn't effect correctness.
251
257
            self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
252
 
        with open('test-file.txt', 'wb') as f:
253
 
            f.write(b'some content\n')
254
 
            f.flush()
255
 
            self.assertEqualStat(osutils.fstat(f.fileno()),
256
 
                                 osutils.lstat('test-file.txt'))
 
258
        f = open('test-file.txt', 'wb')
 
259
        self.addCleanup(f.close)
 
260
        f.write('some content\n')
 
261
        f.flush()
 
262
        self.assertEqualStat(osutils.fstat(f.fileno()),
 
263
                             osutils.lstat('test-file.txt'))
257
264
 
258
265
 
259
266
class TestRmTree(tests.TestCaseInTempDir):
261
268
    def test_rmtree(self):
262
269
        # Check to remove tree with read-only files/dirs
263
270
        os.mkdir('dir')
264
 
        with open('dir/file', 'w') as f:
265
 
            f.write('spam')
 
271
        f = file('dir/file', 'w')
 
272
        f.write('spam')
 
273
        f.close()
266
274
        # would like to also try making the directory readonly, but at the
267
275
        # moment python shutil.rmtree doesn't handle that properly - it would
268
276
        # need to chmod the directory before removing things inside it - deferred
305
313
            if e.errno not in (errno.ENOENT,):
306
314
                raise
307
315
        else:
308
 
            self.assertEqual(
309
 
                'chardev',
310
 
                osutils.file_kind(os.path.realpath('/dev/null')))
 
316
            self.assertEqual('chardev', osutils.file_kind('/dev/null'))
311
317
 
312
318
        mkfifo = getattr(os, 'mkfifo', None)
313
319
        if mkfifo:
397
403
 
398
404
    def test_format_date(self):
399
405
        self.assertRaises(osutils.UnsupportedTimezoneFormat,
400
 
                          osutils.format_date, 0, timezone='foo')
 
406
            osutils.format_date, 0, timezone='foo')
401
407
        self.assertIsInstance(osutils.format_date(0), str)
402
 
        self.assertIsInstance(osutils.format_local_date(0), str)
 
408
        self.assertIsInstance(osutils.format_local_date(0), unicode)
403
409
        # Testing for the actual value of the local weekday without
404
410
        # duplicating the code from format_date is difficult.
405
411
        # Instead blackbox.test_locale should check for localized
407
413
 
408
414
    def test_format_date_with_offset_in_original_timezone(self):
409
415
        self.assertEqual("Thu 1970-01-01 00:00:00 +0000",
410
 
                         osutils.format_date_with_offset_in_original_timezone(0))
 
416
            osutils.format_date_with_offset_in_original_timezone(0))
411
417
        self.assertEqual("Fri 1970-01-02 03:46:40 +0000",
412
 
                         osutils.format_date_with_offset_in_original_timezone(100000))
 
418
            osutils.format_date_with_offset_in_original_timezone(100000))
413
419
        self.assertEqual("Fri 1970-01-02 05:46:40 +0200",
414
 
                         osutils.format_date_with_offset_in_original_timezone(100000, 7200))
 
420
            osutils.format_date_with_offset_in_original_timezone(100000, 7200))
415
421
 
416
422
    def test_local_time_offset(self):
417
423
        """Test that local_time_offset() returns a sane value."""
506
512
        self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
507
513
 
508
514
    def test_changing_access(self):
509
 
        with open('file', 'w') as f:
510
 
            f.write('monkey')
 
515
        f = file('file', 'w')
 
516
        f.write('monkey')
 
517
        f.close()
511
518
 
512
519
        # Make a file readonly
513
520
        osutils.make_readonly('file')
534
541
    _test_needs_features = [features.CaseInsCasePresFilenameFeature]
535
542
 
536
543
    def test_canonical_relpath_simple(self):
537
 
        f = open('MixedCaseName', 'w')
 
544
        f = file('MixedCaseName', 'w')
538
545
        f.close()
539
546
        actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
540
547
        self.assertEqual('work/MixedCaseName', actual)
611
618
 
612
619
        # read (max // 2) bytes and verify read size wasn't affected
613
620
        num_bytes_to_read = self.block_size // 2
614
 
        osutils.pumpfile(from_file, to_file,
615
 
                         num_bytes_to_read, self.block_size)
 
621
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
616
622
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
617
623
        self.assertEqual(from_file.get_read_count(), 1)
618
624
 
619
625
        # read (max) bytes and verify read size wasn't affected
620
626
        num_bytes_to_read = self.block_size
621
627
        from_file.reset_read_count()
622
 
        osutils.pumpfile(from_file, to_file,
623
 
                         num_bytes_to_read, self.block_size)
 
628
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
624
629
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
625
630
        self.assertEqual(from_file.get_read_count(), 1)
626
631
 
627
632
        # read (max + 1) bytes and verify read size was limited
628
633
        num_bytes_to_read = self.block_size + 1
629
634
        from_file.reset_read_count()
630
 
        osutils.pumpfile(from_file, to_file,
631
 
                         num_bytes_to_read, self.block_size)
 
635
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
632
636
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
633
637
        self.assertEqual(from_file.get_read_count(), 2)
634
638
 
635
639
        # finish reading the rest of the data
636
640
        num_bytes_to_read = self.test_data_len - to_file.tell()
637
 
        osutils.pumpfile(from_file, to_file,
638
 
                         num_bytes_to_read, self.block_size)
 
641
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
639
642
 
640
643
        # report error if the data wasn't equal (we only report the size due
641
644
        # to the length of the data)
708
711
 
709
712
    def test_report_activity(self):
710
713
        activity = []
711
 
 
712
714
        def log_activity(length, direction):
713
715
            activity.append((length, direction))
714
716
        from_file = BytesIO(self.test_data)
732
734
        del activity[:]
733
735
        osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
734
736
                         report_activity=log_activity, direction='read')
735
 
        self.assertEqual(
736
 
            [(500, 'read'), (500, 'read'), (28, 'read')], activity)
 
737
        self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
 
738
 
737
739
 
738
740
 
739
741
class TestPumpStringFile(tests.TestCase):
781
783
class TestSafeUnicode(tests.TestCase):
782
784
 
783
785
    def test_from_ascii_string(self):
784
 
        self.assertEqual(u'foobar', osutils.safe_unicode(b'foobar'))
 
786
        self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
785
787
 
786
788
    def test_from_unicode_string_ascii_contents(self):
787
789
        self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
790
792
        self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
791
793
 
792
794
    def test_from_utf8_string(self):
793
 
        self.assertEqual(u'foo\xae', osutils.safe_unicode(b'foo\xc2\xae'))
 
795
        self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
794
796
 
795
797
    def test_bad_utf8_string(self):
796
798
        self.assertRaises(errors.BzrBadParameterNotUnicode,
797
799
                          osutils.safe_unicode,
798
 
                          b'\xbb\xbb')
 
800
                          '\xbb\xbb')
799
801
 
800
802
 
801
803
class TestSafeUtf8(tests.TestCase):
802
804
 
803
805
    def test_from_ascii_string(self):
804
 
        f = b'foobar'
805
 
        self.assertEqual(b'foobar', osutils.safe_utf8(f))
 
806
        f = 'foobar'
 
807
        self.assertEqual('foobar', osutils.safe_utf8(f))
806
808
 
807
809
    def test_from_unicode_string_ascii_contents(self):
808
 
        self.assertEqual(b'bargam', osutils.safe_utf8(u'bargam'))
 
810
        self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
809
811
 
810
812
    def test_from_unicode_string_unicode_contents(self):
811
 
        self.assertEqual(b'bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
 
813
        self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
812
814
 
813
815
    def test_from_utf8_string(self):
814
 
        self.assertEqual(b'foo\xc2\xae', osutils.safe_utf8(b'foo\xc2\xae'))
 
816
        self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
815
817
 
816
818
    def test_bad_utf8_string(self):
817
819
        self.assertRaises(errors.BzrBadParameterNotUnicode,
818
 
                          osutils.safe_utf8, b'\xbb\xbb')
 
820
                          osutils.safe_utf8, '\xbb\xbb')
 
821
 
 
822
 
 
823
class TestSafeRevisionId(tests.TestCase):
 
824
 
 
825
    def test_from_ascii_string(self):
 
826
        # this shouldn't give a warning because it's getting an ascii string
 
827
        self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
 
828
 
 
829
    def test_from_unicode_string_ascii_contents(self):
 
830
        self.assertRaises(TypeError,
 
831
                          osutils.safe_revision_id, u'bargam')
 
832
 
 
833
    def test_from_unicode_string_unicode_contents(self):
 
834
        self.assertRaises(TypeError,
 
835
                         osutils.safe_revision_id, u'bargam\xae')
 
836
 
 
837
    def test_from_utf8_string(self):
 
838
        self.assertEqual('foo\xc2\xae',
 
839
                         osutils.safe_revision_id('foo\xc2\xae'))
 
840
 
 
841
    def test_none(self):
 
842
        """Currently, None is a valid revision_id"""
 
843
        self.assertEqual(None, osutils.safe_revision_id(None))
 
844
 
 
845
 
 
846
class TestSafeFileId(tests.TestCase):
 
847
 
 
848
    def test_from_ascii_string(self):
 
849
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))
 
850
 
 
851
    def test_from_unicode_string_ascii_contents(self):
 
852
        self.assertRaises(TypeError, osutils.safe_file_id, u'bargam')
 
853
 
 
854
    def test_from_unicode_string_unicode_contents(self):
 
855
        self.assertRaises(TypeError,
 
856
                          osutils.safe_file_id, u'bargam\xae')
 
857
 
 
858
    def test_from_utf8_string(self):
 
859
        self.assertEqual('foo\xc2\xae',
 
860
                         osutils.safe_file_id('foo\xc2\xae'))
 
861
 
 
862
    def test_none(self):
 
863
        """Currently, None is a valid revision_id"""
 
864
        self.assertEqual(None, osutils.safe_file_id(None))
819
865
 
820
866
 
821
867
class TestSendAll(tests.TestCase):
824
870
        class DisconnectedSocket(object):
825
871
            def __init__(self, err):
826
872
                self.err = err
827
 
 
828
873
            def send(self, content):
829
874
                raise self.err
830
 
 
831
875
            def close(self):
832
876
                pass
833
877
        # All of these should be treated as ConnectionReset
838
882
        for err in errs:
839
883
            sock = DisconnectedSocket(err)
840
884
            self.assertRaises(errors.ConnectionReset,
841
 
                              osutils.send_all, sock, b'some more content')
 
885
                osutils.send_all, sock, b'some more content')
842
886
 
843
887
    def test_send_with_no_progress(self):
844
888
        # See https://bugs.launchpad.net/bzr/+bug/1047309
847
891
        class NoSendingSocket(object):
848
892
            def __init__(self):
849
893
                self.call_count = 0
850
 
 
851
894
            def send(self, bytes):
852
895
                self.call_count += 1
853
896
                if self.call_count > 100:
866
909
 
867
910
    def test_normpath(self):
868
911
        self.assertEqual('/etc/shadow', osutils._posix_normpath('/etc/shadow'))
869
 
        self.assertEqual(
870
 
            '/etc/shadow', osutils._posix_normpath('//etc/shadow'))
871
 
        self.assertEqual(
872
 
            '/etc/shadow', osutils._posix_normpath('///etc/shadow'))
 
912
        self.assertEqual('/etc/shadow', osutils._posix_normpath('//etc/shadow'))
 
913
        self.assertEqual('/etc/shadow', osutils._posix_normpath('///etc/shadow'))
873
914
 
874
915
 
875
916
class TestWin32Funcs(tests.TestCase):
897
938
                         osutils._win32_pathjoin('path/to/', 'foo'))
898
939
 
899
940
    def test_pathjoin_late_bugfix(self):
900
 
        expected = 'C:/foo'
 
941
        if sys.version_info < (2, 7, 6):
 
942
            expected = '/foo'
 
943
        else:
 
944
            expected = 'C:/foo'
901
945
        self.assertEqual(expected,
902
946
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
903
947
        self.assertEqual(expected,
941
985
 
942
986
    def test_minimum_path_selection(self):
943
987
        self.assertEqual(set(),
944
 
                         osutils.minimum_path_selection([]))
 
988
            osutils.minimum_path_selection([]))
945
989
        self.assertEqual({'a'},
946
 
                         osutils.minimum_path_selection(['a']))
 
990
            osutils.minimum_path_selection(['a']))
947
991
        self.assertEqual({'a', 'b'},
948
 
                         osutils.minimum_path_selection(['a', 'b']))
949
 
        self.assertEqual({'a/', 'b'},
950
 
                         osutils.minimum_path_selection(['a/', 'b']))
951
 
        self.assertEqual({'a/', 'b'},
952
 
                         osutils.minimum_path_selection(['a/c', 'a/', 'b']))
 
992
            osutils.minimum_path_selection(['a', 'b']))
 
993
        self.assertEqual({'a/', 'b'},
 
994
            osutils.minimum_path_selection(['a/', 'b']))
 
995
        self.assertEqual({'a/', 'b'},
 
996
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
953
997
        self.assertEqual({'a-b', 'a', 'a0b'},
954
 
                         osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
 
998
            osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
955
999
 
956
1000
    def test_mkdtemp(self):
957
1001
        tmpdir = osutils._win32_mkdtemp(dir='.')
958
1002
        self.assertFalse('\\' in tmpdir)
959
1003
 
960
1004
    def test_rename(self):
961
 
        with open('a', 'wb') as a:
962
 
            a.write(b'foo\n')
963
 
        with open('b', 'wb') as b:
964
 
            b.write(b'baz\n')
 
1005
        a = open('a', 'wb')
 
1006
        a.write('foo\n')
 
1007
        a.close()
 
1008
        b = open('b', 'wb')
 
1009
        b.write('baz\n')
 
1010
        b.close()
965
1011
 
966
1012
        osutils._win32_rename('b', 'a')
967
1013
        self.assertPathExists('a')
968
1014
        self.assertPathDoesNotExist('b')
969
 
        self.assertFileEqual(b'baz\n', 'a')
 
1015
        self.assertFileEqual('baz\n', 'a')
970
1016
 
971
1017
    def test_rename_missing_file(self):
972
 
        with open('a', 'wb') as a:
973
 
            a.write(b'foo\n')
 
1018
        a = open('a', 'wb')
 
1019
        a.write('foo\n')
 
1020
        a.close()
974
1021
 
975
1022
        try:
976
1023
            osutils._win32_rename('b', 'a')
977
1024
        except (IOError, OSError) as e:
978
1025
            self.assertEqual(errno.ENOENT, e.errno)
979
 
        self.assertFileEqual(b'foo\n', 'a')
 
1026
        self.assertFileEqual('foo\n', 'a')
980
1027
 
981
1028
    def test_rename_missing_dir(self):
982
1029
        os.mkdir('a')
1005
1052
        check(['a', 'b'], 'a/b')
1006
1053
        check(['a', 'b'], 'a/./b')
1007
1054
        check(['a', '.b'], 'a/.b')
1008
 
        if os.path.sep == '\\':
1009
 
            check(['a', '.b'], 'a\\.b')
1010
 
        else:
1011
 
            check(['a\\.b'], 'a\\.b')
 
1055
        check(['a', '.b'], 'a\\.b')
1012
1056
 
1013
1057
        self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
1014
1058
 
1042
1086
class TestChunksToLines(tests.TestCase):
1043
1087
 
1044
1088
    def test_smoketest(self):
1045
 
        self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
1046
 
                         osutils.chunks_to_lines([b'foo\nbar', b'\nbaz\n']))
1047
 
        self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
1048
 
                         osutils.chunks_to_lines([b'foo\n', b'bar\n', b'baz\n']))
 
1089
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
 
1090
                         osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
 
1091
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
 
1092
                         osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
1049
1093
 
1050
1094
    def test_osutils_binding(self):
1051
1095
        from . import test__chunks_to_lines
1065
1109
                         osutils.split_lines(u'foo\nbar\xae\n'))
1066
1110
 
1067
1111
    def test_split_with_carriage_returns(self):
1068
 
        self.assertEqual([b'foo\rbar\n'],
1069
 
                         osutils.split_lines(b'foo\rbar\n'))
 
1112
        self.assertEqual(['foo\rbar\n'],
 
1113
                         osutils.split_lines('foo\rbar\n'))
1070
1114
 
1071
1115
 
1072
1116
class TestWalkDirs(tests.TestCaseInTempDir):
1087
1131
            ]
1088
1132
        self.build_tree(tree)
1089
1133
        expected_dirblocks = [
1090
 
            (('', '.'),
1091
 
             [('0file', '0file', 'file'),
1092
 
              ('1dir', '1dir', 'directory'),
1093
 
              ('2file', '2file', 'file'),
1094
 
              ]
1095
 
             ),
1096
 
            (('1dir', './1dir'),
1097
 
             [('1dir/0file', '0file', 'file'),
1098
 
              ('1dir/1dir', '1dir', 'directory'),
1099
 
              ]
1100
 
             ),
1101
 
            (('1dir/1dir', './1dir/1dir'),
1102
 
             [
1103
 
                ]
1104
 
             ),
 
1134
                (('', '.'),
 
1135
                 [('0file', '0file', 'file'),
 
1136
                  ('1dir', '1dir', 'directory'),
 
1137
                  ('2file', '2file', 'file'),
 
1138
                 ]
 
1139
                ),
 
1140
                (('1dir', './1dir'),
 
1141
                 [('1dir/0file', '0file', 'file'),
 
1142
                  ('1dir/1dir', '1dir', 'directory'),
 
1143
                 ]
 
1144
                ),
 
1145
                (('1dir/1dir', './1dir/1dir'),
 
1146
                 [
 
1147
                 ]
 
1148
                ),
1105
1149
            ]
1106
1150
        result = []
1107
1151
        found_bzrdir = False
1136
1180
        # (It would be ok if it happened earlier but at the moment it
1137
1181
        # doesn't.)
1138
1182
        e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1139
 
        self.assertEqual('./test-unreadable', osutils.safe_unicode(e.filename))
 
1183
        self.assertEqual('./test-unreadable', e.filename)
1140
1184
        self.assertEqual(errno.EACCES, e.errno)
1141
1185
        # Ensure the message contains the file name
1142
1186
        self.assertContainsRe(str(e), "\\./test-unreadable")
1143
1187
 
 
1188
 
1144
1189
    def test_walkdirs_encoding_error(self):
1145
1190
        # <https://bugs.launchpad.net/bzr/+bug/488519>
1146
1191
        # walkdirs didn't raise a useful message when the filenames
1161
1206
        self.build_tree(tree)
1162
1207
 
1163
1208
        # rename the 1file to a latin-1 filename
1164
 
        os.rename(b"./1file", b"\xe8file")
1165
 
        if b"\xe8file" not in os.listdir("."):
 
1209
        os.rename("./1file", "\xe8file")
 
1210
        if "\xe8file" not in os.listdir("."):
1166
1211
            self.skipTest("Lack filesystem that preserves arbitrary bytes")
1167
1212
 
1168
1213
        self._save_platform_info()
1170
1215
 
1171
1216
        # this should raise on error
1172
1217
        def attempt():
1173
 
            for dirdetail, dirblock in osutils.walkdirs(b'.'):
 
1218
            for dirdetail, dirblock in osutils.walkdirs('.'):
1174
1219
                pass
1175
1220
 
1176
1221
        self.assertRaises(errors.BadFilenameEncoding, attempt)
1186
1231
            ]
1187
1232
        self.build_tree(tree)
1188
1233
        expected_dirblocks = [
1189
 
            (('', '.'),
1190
 
             [('0file', '0file', 'file'),
1191
 
              ('1dir', '1dir', 'directory'),
1192
 
              ('2file', '2file', 'file'),
1193
 
              ]
1194
 
             ),
1195
 
            (('1dir', './1dir'),
1196
 
             [('1dir/0file', '0file', 'file'),
1197
 
              ('1dir/1dir', '1dir', 'directory'),
1198
 
              ]
1199
 
             ),
1200
 
            (('1dir/1dir', './1dir/1dir'),
1201
 
             [
1202
 
                ]
1203
 
             ),
 
1234
                (('', '.'),
 
1235
                 [('0file', '0file', 'file'),
 
1236
                  ('1dir', '1dir', 'directory'),
 
1237
                  ('2file', '2file', 'file'),
 
1238
                 ]
 
1239
                ),
 
1240
                (('1dir', './1dir'),
 
1241
                 [('1dir/0file', '0file', 'file'),
 
1242
                  ('1dir/1dir', '1dir', 'directory'),
 
1243
                 ]
 
1244
                ),
 
1245
                (('1dir/1dir', './1dir/1dir'),
 
1246
                 [
 
1247
                 ]
 
1248
                ),
1204
1249
            ]
1205
1250
        result = []
1206
1251
        found_bzrdir = False
1207
 
        for dirdetail, dirblock in osutils._walkdirs_utf8(b'.'):
1208
 
            if len(dirblock) and dirblock[0][1] == b'.bzr':
 
1252
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
 
1253
            if len(dirblock) and dirblock[0][1] == '.bzr':
1209
1254
                # this tests the filtering of selected paths
1210
1255
                found_bzrdir = True
1211
1256
                del dirblock[0]
1212
 
            dirdetail = (dirdetail[0].decode('utf-8'),
1213
 
                         osutils.safe_unicode(dirdetail[1]))
1214
 
            dirblock = [
1215
 
                (entry[0].decode('utf-8'), entry[1].decode('utf-8'), entry[2])
1216
 
                for entry in dirblock]
1217
1257
            result.append((dirdetail, dirblock))
1218
1258
 
1219
1259
        self.assertTrue(found_bzrdir)
1238
1278
        self.overrideAttr(osutils, '_fs_enc')
1239
1279
        self.overrideAttr(osutils, '_selected_dir_reader')
1240
1280
 
1241
 
    def assertDirReaderIs(self, expected, top):
 
1281
    def assertDirReaderIs(self, expected):
1242
1282
        """Assert the right implementation for _walkdirs_utf8 is chosen."""
1243
1283
        # Force it to redetect
1244
1284
        osutils._selected_dir_reader = None
1245
1285
        # Nothing to list, but should still trigger the selection logic
1246
 
        self.assertEqual([((b'', top), [])], list(osutils._walkdirs_utf8('.')))
 
1286
        self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
1247
1287
        self.assertIsInstance(osutils._selected_dir_reader, expected)
1248
1288
 
1249
1289
    def test_force_walkdirs_utf8_fs_utf8(self):
1250
1290
        self.requireFeature(UTF8DirReaderFeature)
1251
1291
        self._save_platform_info()
1252
1292
        osutils._fs_enc = 'utf-8'
1253
 
        self.assertDirReaderIs(UTF8DirReaderFeature.module.UTF8DirReader, b".")
 
1293
        self.assertDirReaderIs(
 
1294
            UTF8DirReaderFeature.module.UTF8DirReader)
1254
1295
 
1255
1296
    def test_force_walkdirs_utf8_fs_ascii(self):
1256
1297
        self.requireFeature(UTF8DirReaderFeature)
1257
1298
        self._save_platform_info()
1258
1299
        osutils._fs_enc = 'ascii'
1259
1300
        self.assertDirReaderIs(
1260
 
            UTF8DirReaderFeature.module.UTF8DirReader, b".")
 
1301
            UTF8DirReaderFeature.module.UTF8DirReader)
1261
1302
 
1262
1303
    def test_force_walkdirs_utf8_fs_latin1(self):
1263
1304
        self._save_platform_info()
1264
1305
        osutils._fs_enc = 'iso-8859-1'
1265
 
        self.assertDirReaderIs(osutils.UnicodeDirReader, ".")
 
1306
        self.assertDirReaderIs(osutils.UnicodeDirReader)
1266
1307
 
1267
1308
    def test_force_walkdirs_utf8_nt(self):
1268
1309
        # Disabled because the thunk of the whole walkdirs api is disabled.
1269
1310
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1270
1311
        self._save_platform_info()
1271
1312
        from .._walkdirs_win32 import Win32ReadDir
1272
 
        self.assertDirReaderIs(Win32ReadDir, ".")
 
1313
        self.assertDirReaderIs(Win32ReadDir)
1273
1314
 
1274
1315
    def test_unicode_walkdirs(self):
1275
1316
        """Walkdirs should always return unicode paths."""
1286
1327
            ]
1287
1328
        self.build_tree(tree)
1288
1329
        expected_dirblocks = [
1289
 
            ((u'', u'.'),
1290
 
             [(name0, name0, 'file', './' + name0),
1291
 
              (name1, name1, 'directory', './' + name1),
1292
 
              (name2, name2, 'file', './' + name2),
1293
 
              ]
1294
 
             ),
1295
 
            ((name1, './' + name1),
1296
 
             [(name1 + '/' + name0, name0, 'file', './' + name1
1297
 
               + '/' + name0),
1298
 
              (name1 + '/' + name1, name1, 'directory', './' + name1
1299
 
               + '/' + name1),
1300
 
              ]
1301
 
             ),
1302
 
            ((name1 + '/' + name1, './' + name1 + '/' + name1),
1303
 
             [
1304
 
                ]
1305
 
             ),
 
1330
                ((u'', u'.'),
 
1331
                 [(name0, name0, 'file', './' + name0),
 
1332
                  (name1, name1, 'directory', './' + name1),
 
1333
                  (name2, name2, 'file', './' + name2),
 
1334
                 ]
 
1335
                ),
 
1336
                ((name1, './' + name1),
 
1337
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
1338
                                                        + '/' + name0),
 
1339
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
1340
                                                            + '/' + name1),
 
1341
                 ]
 
1342
                ),
 
1343
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1344
                 [
 
1345
                 ]
 
1346
                ),
1306
1347
            ]
1307
1348
        result = list(osutils.walkdirs('.'))
1308
1349
        self._filter_out_stat(result)
1309
1350
        self.assertEqual(expected_dirblocks, result)
1310
 
        result = list(osutils.walkdirs(u'./' + name1, name1))
 
1351
        result = list(osutils.walkdirs(u'./'+name1, name1))
1311
1352
        self._filter_out_stat(result)
1312
1353
        self.assertEqual(expected_dirblocks[1:], result)
1313
1354
 
1333
1374
        name2 = name2.encode('utf8')
1334
1375
 
1335
1376
        expected_dirblocks = [
1336
 
            ((b'', b'.'),
1337
 
             [(name0, name0, 'file', b'./' + name0),
1338
 
              (name1, name1, 'directory', b'./' + name1),
1339
 
              (name2, name2, 'file', b'./' + name2),
1340
 
              ]
1341
 
             ),
1342
 
            ((name1, b'./' + name1),
1343
 
             [(name1 + b'/' + name0, name0, 'file', b'./' + name1
1344
 
               + b'/' + name0),
1345
 
              (name1 + b'/' + name1, name1, 'directory', b'./' + name1
1346
 
               + b'/' + name1),
1347
 
              ]
1348
 
             ),
1349
 
            ((name1 + b'/' + name1, b'./' + name1 + b'/' + name1),
1350
 
             [
1351
 
                ]
1352
 
             ),
 
1377
                (('', '.'),
 
1378
                 [(name0, name0, 'file', './' + name0),
 
1379
                  (name1, name1, 'directory', './' + name1),
 
1380
                  (name2, name2, 'file', './' + name2),
 
1381
                 ]
 
1382
                ),
 
1383
                ((name1, './' + name1),
 
1384
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
1385
                                                        + '/' + name0),
 
1386
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
1387
                                                            + '/' + name1),
 
1388
                 ]
 
1389
                ),
 
1390
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1391
                 [
 
1392
                 ]
 
1393
                ),
1353
1394
            ]
1354
1395
        result = []
1355
1396
        # For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1356
1397
        # all abspaths are Unicode, and encode them back into utf8.
1357
1398
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1358
 
            self.assertIsInstance(dirdetail[0], bytes)
1359
 
            if isinstance(dirdetail[1], str):
 
1399
            self.assertIsInstance(dirdetail[0], str)
 
1400
            if isinstance(dirdetail[1], unicode):
1360
1401
                dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1361
1402
                dirblock = [list(info) for info in dirblock]
1362
1403
                for info in dirblock:
1363
 
                    self.assertIsInstance(info[4], str)
 
1404
                    self.assertIsInstance(info[4], unicode)
1364
1405
                    info[4] = info[4].encode('utf8')
1365
1406
            new_dirblock = []
1366
1407
            for info in dirblock:
1367
 
                self.assertIsInstance(info[0], bytes)
1368
 
                self.assertIsInstance(info[1], bytes)
1369
 
                self.assertIsInstance(info[4], bytes)
 
1408
                self.assertIsInstance(info[0], str)
 
1409
                self.assertIsInstance(info[1], str)
 
1410
                self.assertIsInstance(info[4], str)
1370
1411
                # Remove the stat information
1371
1412
                new_dirblock.append((info[0], info[1], info[2], info[4]))
1372
1413
            result.append((dirdetail, new_dirblock))
1400
1441
        # All of the abspaths should be in unicode, all of the relative paths
1401
1442
        # should be in utf8
1402
1443
        expected_dirblocks = [
1403
 
            ((b'', '.'),
1404
 
             [(name0, name0, 'file', './' + name0u),
1405
 
              (name1, name1, 'directory', './' + name1u),
1406
 
              (name2, name2, 'file', './' + name2u),
1407
 
              ]
1408
 
             ),
1409
 
            ((name1, './' + name1u),
1410
 
             [(name1 + b'/' + name0, name0, 'file', './' + name1u
1411
 
               + '/' + name0u),
1412
 
              (name1 + b'/' + name1, name1, 'directory', './' + name1u
1413
 
               + '/' + name1u),
1414
 
              ]
1415
 
             ),
1416
 
            ((name1 + b'/' + name1, './' + name1u + '/' + name1u),
1417
 
             [
1418
 
                ]
1419
 
             ),
 
1444
                (('', '.'),
 
1445
                 [(name0, name0, 'file', './' + name0u),
 
1446
                  (name1, name1, 'directory', './' + name1u),
 
1447
                  (name2, name2, 'file', './' + name2u),
 
1448
                 ]
 
1449
                ),
 
1450
                ((name1, './' + name1u),
 
1451
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1452
                                                        + '/' + name0u),
 
1453
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1454
                                                            + '/' + name1u),
 
1455
                 ]
 
1456
                ),
 
1457
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1458
                 [
 
1459
                 ]
 
1460
                ),
1420
1461
            ]
1421
1462
        result = list(osutils._walkdirs_utf8('.'))
1422
1463
        self._filter_out_stat(result)
1446
1487
        # All of the abspaths should be in unicode, all of the relative paths
1447
1488
        # should be in utf8
1448
1489
        expected_dirblocks = [
1449
 
            (('', '.'),
1450
 
             [(name0, name0, 'file', './' + name0u),
1451
 
              (name1, name1, 'directory', './' + name1u),
1452
 
              (name2, name2, 'file', './' + name2u),
1453
 
              ]
1454
 
             ),
1455
 
            ((name1, './' + name1u),
1456
 
             [(name1 + '/' + name0, name0, 'file', './' + name1u
1457
 
               + '/' + name0u),
1458
 
              (name1 + '/' + name1, name1, 'directory', './' + name1u
1459
 
               + '/' + name1u),
1460
 
              ]
1461
 
             ),
1462
 
            ((name1 + '/' + name1, './' + name1u + '/' + name1u),
1463
 
             [
1464
 
                ]
1465
 
             ),
 
1490
                (('', '.'),
 
1491
                 [(name0, name0, 'file', './' + name0u),
 
1492
                  (name1, name1, 'directory', './' + name1u),
 
1493
                  (name2, name2, 'file', './' + name2u),
 
1494
                 ]
 
1495
                ),
 
1496
                ((name1, './' + name1u),
 
1497
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1498
                                                        + '/' + name0u),
 
1499
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1500
                                                            + '/' + name1u),
 
1501
                 ]
 
1502
                ),
 
1503
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1504
                 [
 
1505
                 ]
 
1506
                ),
1466
1507
            ]
1467
1508
        result = list(osutils._walkdirs_utf8(u'.'))
1468
1509
        self._filter_out_stat(result)
1489
1530
        # I hate to sleep() here, but I'm trying to make the ctime different
1490
1531
        # from the mtime
1491
1532
        time.sleep(2)
1492
 
        with open(name0u, 'ab') as f:
1493
 
            f.write(b'just a small update')
 
1533
        f = open(name0u, 'ab')
 
1534
        try:
 
1535
            f.write('just a small update')
 
1536
        finally:
 
1537
            f.close()
1494
1538
 
1495
1539
        result = Win32ReadDir().read_dir('', u'.')
1496
1540
        entry = result[0]
1589
1633
        # using the comparison routine shoudl work too:
1590
1634
        self.assertEqual(
1591
1635
            dir_sorted_paths,
1592
 
            sorted(original_paths, key=osutils.path_prefix_key))
 
1636
            sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
1593
1637
 
1594
1638
 
1595
1639
class TestCopyTree(tests.TestCaseInTempDir):
1618
1662
    def test_copy_tree_handlers(self):
1619
1663
        processed_files = []
1620
1664
        processed_links = []
1621
 
 
1622
1665
        def file_handler(from_path, to_path):
1623
1666
            processed_files.append(('f', from_path, to_path))
1624
 
 
1625
1667
        def dir_handler(from_path, to_path):
1626
1668
            processed_files.append(('d', from_path, to_path))
1627
 
 
1628
1669
        def link_handler(from_path, to_path):
1629
1670
            processed_links.append((from_path, to_path))
1630
1671
        handlers = {'file': file_handler,
1631
1672
                    'directory': dir_handler,
1632
1673
                    'symlink': link_handler,
1633
 
                    }
 
1674
                   }
1634
1675
 
1635
1676
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1636
1677
        if osutils.has_symlinks():
1641
1682
                          ('f', 'source/a', 'target/a'),
1642
1683
                          ('d', 'source/b', 'target/b'),
1643
1684
                          ('f', 'source/b/c', 'target/b/c'),
1644
 
                          ], processed_files)
 
1685
                         ], processed_files)
1645
1686
        self.assertPathDoesNotExist('target')
1646
1687
        if osutils.has_symlinks():
1647
1688
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1656
1697
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'),
1657
1698
                         'Environment was not cleaned up properly.'
1658
1699
                         ' Variable BRZ_TEST_ENV_VAR should not exist.')
1659
 
 
1660
1700
        def cleanup():
1661
1701
            if 'BRZ_TEST_ENV_VAR' in os.environ:
1662
1702
                del os.environ['BRZ_TEST_ENV_VAR']
1686
1726
                'Cannot find a unicode character that works in encoding %s'
1687
1727
                % (osutils.get_user_encoding(),))
1688
1728
 
1689
 
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', uni_val)
1690
 
        self.assertEqual(uni_val, os.environ.get('BRZ_TEST_ENV_VAR'))
 
1729
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', uni_val)
 
1730
        self.assertEqual(env_val, os.environ.get('BRZ_TEST_ENV_VAR'))
1691
1731
 
1692
1732
    def test_unset(self):
1693
1733
        """Test that passing None will remove the env var"""
1695
1735
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', None)
1696
1736
        self.assertEqual('foo', old)
1697
1737
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'))
1698
 
        self.assertNotIn('BRZ_TEST_ENV_VAR', os.environ)
 
1738
        self.assertFalse('BRZ_TEST_ENV_VAR' in os.environ)
1699
1739
 
1700
1740
 
1701
1741
class TestSizeShaFile(tests.TestCaseInTempDir):
1702
1742
 
1703
1743
    def test_sha_empty(self):
1704
 
        self.build_tree_contents([('foo', b'')])
1705
 
        expected_sha = osutils.sha_string(b'')
 
1744
        self.build_tree_contents([('foo', '')])
 
1745
        expected_sha = osutils.sha_string('')
1706
1746
        f = open('foo')
1707
1747
        self.addCleanup(f.close)
1708
1748
        size, sha = osutils.size_sha_file(f)
1710
1750
        self.assertEqual(expected_sha, sha)
1711
1751
 
1712
1752
    def test_sha_mixed_endings(self):
1713
 
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
 
1753
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
1714
1754
        self.build_tree_contents([('foo', text)])
1715
1755
        expected_sha = osutils.sha_string(text)
1716
1756
        f = open('foo', 'rb')
1723
1763
class TestShaFileByName(tests.TestCaseInTempDir):
1724
1764
 
1725
1765
    def test_sha_empty(self):
1726
 
        self.build_tree_contents([('foo', b'')])
1727
 
        expected_sha = osutils.sha_string(b'')
 
1766
        self.build_tree_contents([('foo', '')])
 
1767
        expected_sha = osutils.sha_string('')
1728
1768
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1729
1769
 
1730
1770
    def test_sha_mixed_endings(self):
1731
 
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
 
1771
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
1732
1772
        self.build_tree_contents([('foo', text)])
1733
1773
        expected_sha = osutils.sha_string(text)
1734
1774
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1745
1785
        self.assertContainsRe(text, "class TextUIFactory")
1746
1786
        # test unsupported package
1747
1787
        self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1748
 
                          'yyy.xx')
 
1788
            'yyy.xx')
1749
1789
        # test unknown resource
1750
1790
        self.assertRaises(IOError, osutils.resource_string, 'breezy', 'yyy.xx')
1751
1791
 
1772
1812
            '2file'
1773
1813
            ]
1774
1814
        expected_dirblocks = [
1775
 
            ((b'', '.'),
1776
 
             [(b'0file', b'0file', 'file', './0file'),
1777
 
              (b'1dir', b'1dir', 'directory', './1dir'),
1778
 
              (b'2file', b'2file', 'file', './2file'),
1779
 
              ]
1780
 
             ),
1781
 
            ((b'1dir', './1dir'),
1782
 
             [(b'1dir/0file', b'0file', 'file', './1dir/0file'),
1783
 
              (b'1dir/1dir', b'1dir', 'directory', './1dir/1dir'),
1784
 
              ]
1785
 
             ),
1786
 
            ((b'1dir/1dir', './1dir/1dir'),
1787
 
             [
1788
 
                ]
1789
 
             ),
 
1815
                (('', '.'),
 
1816
                 [('0file', '0file', 'file'),
 
1817
                  ('1dir', '1dir', 'directory'),
 
1818
                  ('2file', '2file', 'file'),
 
1819
                 ]
 
1820
                ),
 
1821
                (('1dir', './1dir'),
 
1822
                 [('1dir/0file', '0file', 'file'),
 
1823
                  ('1dir/1dir', '1dir', 'directory'),
 
1824
                 ]
 
1825
                ),
 
1826
                (('1dir/1dir', './1dir/1dir'),
 
1827
                 [
 
1828
                 ]
 
1829
                ),
1790
1830
            ]
1791
1831
        return tree, expected_dirblocks
1792
1832
 
1796
1836
        result = list(osutils._walkdirs_utf8('.'))
1797
1837
        # Filter out stat and abspath
1798
1838
        self.assertEqual(expected_dirblocks,
1799
 
                         self._filter_out(result))
 
1839
                         [(dirinfo, [line[0:3] for line in block])
 
1840
                          for dirinfo, block in result])
1800
1841
 
1801
1842
    def test_walk_sub_dir(self):
1802
1843
        tree, expected_dirblocks = self._get_ascii_tree()
1803
1844
        self.build_tree(tree)
1804
1845
        # you can search a subdir only, with a supplied prefix.
1805
 
        result = list(osutils._walkdirs_utf8(b'./1dir', b'1dir'))
 
1846
        result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
1806
1847
        # Filter out stat and abspath
1807
1848
        self.assertEqual(expected_dirblocks[1:],
1808
 
                         self._filter_out(result))
 
1849
                         [(dirinfo, [line[0:3] for line in block])
 
1850
                          for dirinfo, block in result])
1809
1851
 
1810
1852
    def _get_unicode_tree(self):
1811
1853
        name0u = u'0file-\xb6'
1822
1864
        name1 = name1u.encode('UTF-8')
1823
1865
        name2 = name2u.encode('UTF-8')
1824
1866
        expected_dirblocks = [
1825
 
            ((b'', '.'),
1826
 
             [(name0, name0, 'file', './' + name0u),
1827
 
              (name1, name1, 'directory', './' + name1u),
1828
 
              (name2, name2, 'file', './' + name2u),
1829
 
              ]
1830
 
             ),
1831
 
            ((name1, './' + name1u),
1832
 
             [(name1 + b'/' + name0, name0, 'file', './' + name1u
1833
 
               + '/' + name0u),
1834
 
              (name1 + b'/' + name1, name1, 'directory', './' + name1u
1835
 
               + '/' + name1u),
1836
 
              ]
1837
 
             ),
1838
 
            ((name1 + b'/' + name1, './' + name1u + '/' + name1u),
1839
 
             [
1840
 
                ]
1841
 
             ),
 
1867
                (('', '.'),
 
1868
                 [(name0, name0, 'file', './' + name0u),
 
1869
                  (name1, name1, 'directory', './' + name1u),
 
1870
                  (name2, name2, 'file', './' + name2u),
 
1871
                 ]
 
1872
                ),
 
1873
                ((name1, './' + name1u),
 
1874
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1875
                                                        + '/' + name0u),
 
1876
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1877
                                                            + '/' + name1u),
 
1878
                 ]
 
1879
                ),
 
1880
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1881
                 [
 
1882
                 ]
 
1883
                ),
1842
1884
            ]
1843
1885
        return tree, expected_dirblocks
1844
1886
 
1852
1894
            dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
1853
1895
            details = []
1854
1896
            for line in block:
1855
 
                details.append(
1856
 
                    line[0:3] + (self._native_to_unicode(line[4]), ))
 
1897
                details.append(line[0:3] + (self._native_to_unicode(line[4]), ))
1857
1898
            filtered_dirblocks.append((dirinfo, details))
1858
1899
        return filtered_dirblocks
1859
1900
 
1870
1911
        target = u'target\N{Euro Sign}'
1871
1912
        link_name = u'l\N{Euro Sign}nk'
1872
1913
        os.symlink(target, link_name)
 
1914
        target_utf8 = target.encode('UTF-8')
1873
1915
        link_name_utf8 = link_name.encode('UTF-8')
1874
1916
        expected_dirblocks = [
1875
 
            ((b'', '.'),
1876
 
             [(link_name_utf8, link_name_utf8,
1877
 
               'symlink', './' + link_name), ],
1878
 
             )]
 
1917
                (('', '.'),
 
1918
                 [(link_name_utf8, link_name_utf8,
 
1919
                   'symlink', './' + link_name),],
 
1920
                 )]
1879
1921
        result = list(osutils._walkdirs_utf8('.'))
1880
1922
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1881
1923
 
1889
1931
    But prior python versions failed to properly encode the passed unicode
1890
1932
    string.
1891
1933
    """
1892
 
    _test_needs_features = [features.SymlinkFeature,
1893
 
                            features.UnicodeFilenameFeature]
 
1934
    _test_needs_features = [features.SymlinkFeature, features.UnicodeFilenameFeature]
1894
1935
 
1895
1936
    def setUp(self):
1896
1937
        super(tests.TestCaseInTempDir, self).setUp()
1899
1940
        os.symlink(self.target, self.link)
1900
1941
 
1901
1942
    def test_os_readlink_link_encoding(self):
1902
 
        self.assertEqual(self.target, os.readlink(self.link))
 
1943
        self.assertEqual(self.target,  os.readlink(self.link))
1903
1944
 
1904
1945
    def test_os_readlink_link_decoding(self):
1905
1946
        self.assertEqual(self.target.encode(osutils._fs_enc),
1906
 
                         os.readlink(self.link.encode(osutils._fs_enc)))
 
1947
                          os.readlink(self.link.encode(osutils._fs_enc)))
1907
1948
 
1908
1949
 
1909
1950
class TestConcurrency(tests.TestCase):
1936
1977
 
1937
1978
    def _try_loading(self):
1938
1979
        try:
1939
 
            import breezy._fictional_extension_py  # noqa: F401
 
1980
            import breezy._fictional_extension_py
1940
1981
        except ImportError as e:
1941
1982
            osutils.failed_to_load_extension(e)
1942
1983
            return True
1948
1989
    def test_failure_to_load(self):
1949
1990
        self._try_loading()
1950
1991
        self.assertLength(1, osutils._extension_load_failures)
1951
 
        self.assertEqual(
1952
 
            osutils._extension_load_failures[0],
1953
 
            "No module named 'breezy._fictional_extension_py'")
 
1992
        self.assertEqual(osutils._extension_load_failures[0],
 
1993
            "No module named _fictional_extension_py")
1954
1994
 
1955
1995
    def test_report_extension_load_failures_no_warning(self):
1956
1996
        self.assertTrue(self._try_loading())
1957
 
        warnings, result = self.callCatchWarnings(
1958
 
            osutils.report_extension_load_failures)
 
1997
        warnings, result = self.callCatchWarnings(osutils.report_extension_load_failures)
1959
1998
        # it used to give a Python warning; it no longer does
1960
1999
        self.assertLength(0, warnings)
1961
2000
 
1966
2005
        osutils.report_extension_load_failures()
1967
2006
        self.assertContainsRe(
1968
2007
            log.getvalue(),
1969
 
            br"brz: warning: some compiled extensions could not be loaded; "
1970
 
            b"see ``brz help missing-extensions``\n"
 
2008
            r"brz: warning: some compiled extensions could not be loaded; "
 
2009
            "see ``brz help missing-extensions``\n"
1971
2010
            )
1972
2011
 
1973
2012
 
2044
2083
        termios = term_ios_feature.module
2045
2084
        # bug 63539 is about a termios without TIOCGWINSZ attribute
2046
2085
        try:
2047
 
            termios.TIOCGWINSZ
 
2086
            orig = termios.TIOCGWINSZ
2048
2087
        except AttributeError:
2049
2088
            # We won't remove TIOCGWINSZ, because it doesn't exist anyway :)
2050
2089
            pass
2073
2112
    def test_copy_ownership_from_path(self):
2074
2113
        """copy_ownership_from_path test with specified src."""
2075
2114
        ownsrc = '/'
2076
 
        open('test_file', 'wt').close()
 
2115
        f = open('test_file', 'wt')
2077
2116
        osutils.copy_ownership_from_path('test_file', ownsrc)
2078
2117
 
2079
2118
        s = os.stat(ownsrc)
2083
2122
 
2084
2123
    def test_copy_ownership_nonesrc(self):
2085
2124
        """copy_ownership_from_path test with src=None."""
2086
 
        open('test_file', 'wt').close()
 
2125
        f = open('test_file', 'wt')
2087
2126
        # should use parent dir for permissions
2088
2127
        osutils.copy_ownership_from_path('test_file')
2089
2128
 
2093
2132
        self.assertEqual(self.gid, s.st_gid)
2094
2133
 
2095
2134
 
 
2135
class TestPathFromEnviron(tests.TestCase):
 
2136
 
 
2137
    def test_is_unicode(self):
 
2138
        self.overrideEnv('BRZ_TEST_PATH', './anywhere at all/')
 
2139
        path = osutils.path_from_environ('BRZ_TEST_PATH')
 
2140
        self.assertIsInstance(path, unicode)
 
2141
        self.assertEqual(u'./anywhere at all/', path)
 
2142
 
 
2143
    def test_posix_path_env_ascii(self):
 
2144
        self.overrideEnv('BRZ_TEST_PATH', '/tmp')
 
2145
        home = osutils._posix_path_from_environ('BRZ_TEST_PATH')
 
2146
        self.assertIsInstance(home, unicode)
 
2147
        self.assertEqual(u'/tmp', home)
 
2148
 
 
2149
    def test_posix_path_env_unicode(self):
 
2150
        self.requireFeature(features.ByteStringNamedFilesystem)
 
2151
        self.overrideEnv('BRZ_TEST_PATH', '/home/\xa7test')
 
2152
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
 
2153
        self.assertEqual(u'/home/\xa7test',
 
2154
            osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2155
        osutils._fs_enc = "iso8859-5"
 
2156
        self.assertEqual(u'/home/\u0407test',
 
2157
            osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2158
        osutils._fs_enc = "utf-8"
 
2159
        self.assertRaises(errors.BadFilenameEncoding,
 
2160
            osutils._posix_path_from_environ, 'BRZ_TEST_PATH')
 
2161
 
 
2162
 
2096
2163
class TestGetHomeDir(tests.TestCase):
2097
2164
 
2098
2165
    def test_is_unicode(self):
2099
2166
        home = osutils._get_home_dir()
2100
 
        self.assertIsInstance(home, str)
 
2167
        self.assertIsInstance(home, unicode)
2101
2168
 
2102
2169
    def test_posix_homeless(self):
2103
2170
        self.overrideEnv('HOME', None)
2104
2171
        home = osutils._get_home_dir()
2105
 
        self.assertIsInstance(home, str)
 
2172
        self.assertIsInstance(home, unicode)
2106
2173
 
2107
2174
    def test_posix_home_ascii(self):
2108
2175
        self.overrideEnv('HOME', '/home/test')
2109
2176
        home = osutils._posix_get_home_dir()
2110
 
        self.assertIsInstance(home, str)
 
2177
        self.assertIsInstance(home, unicode)
2111
2178
        self.assertEqual(u'/home/test', home)
2112
2179
 
2113
2180
    def test_posix_home_unicode(self):
2116
2183
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
2117
2184
        self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
2118
2185
        osutils._fs_enc = "iso8859-5"
2119
 
        # In python 3, os.environ returns unicode
2120
 
        self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
 
2186
        self.assertEqual(u'/home/\u0407test', osutils._posix_get_home_dir())
 
2187
        osutils._fs_enc = "utf-8"
 
2188
        self.assertRaises(errors.BadFilenameEncoding,
 
2189
            osutils._posix_get_home_dir)
2121
2190
 
2122
2191
 
2123
2192
class TestGetuserUnicode(tests.TestCase):
2124
2193
 
2125
2194
    def test_is_unicode(self):
2126
2195
        user = osutils.getuser_unicode()
2127
 
        self.assertIsInstance(user, str)
 
2196
        self.assertIsInstance(user, unicode)
2128
2197
 
2129
2198
    def envvar_to_override(self):
2130
2199
        if sys.platform == "win32":
2131
2200
            # Disable use of platform calls on windows so envvar is used
2132
2201
            self.overrideAttr(win32utils, 'has_ctypes', False)
2133
 
            return 'USERNAME'  # only variable used on windows
2134
 
        return 'LOGNAME'  # first variable checked by getpass.getuser()
 
2202
            return 'USERNAME' # only variable used on windows
 
2203
        return 'LOGNAME' # first variable checked by getpass.getuser()
2135
2204
 
2136
2205
    def test_ascii_user(self):
2137
2206
        self.overrideEnv(self.envvar_to_override(), 'jrandom')
2146
2215
                % (osutils.get_user_encoding(),))
2147
2216
        uni_username = u'jrandom' + uni_val
2148
2217
        encoded_username = uni_username.encode(ue)
2149
 
        self.overrideEnv(self.envvar_to_override(), uni_username)
 
2218
        self.overrideEnv(self.envvar_to_override(), encoded_username)
2150
2219
        self.assertEqual(uni_username, osutils.getuser_unicode())
2151
2220
 
2152
2221
 
2185
2254
    def test_windows(self):
2186
2255
        if sys.platform != 'win32':
2187
2256
            raise tests.TestSkipped('test requires win32')
2188
 
        self.assertTrue(osutils.find_executable_on_path(
2189
 
            'explorer') is not None)
 
2257
        self.assertTrue(osutils.find_executable_on_path('explorer') is not None)
2190
2258
        self.assertTrue(
2191
2259
            osutils.find_executable_on_path('explorer.exe') is not None)
2192
2260
        self.assertTrue(
2194
2262
        self.assertTrue(
2195
2263
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
2196
2264
        self.assertTrue(osutils.find_executable_on_path('file.txt') is None)
2197
 
 
 
2265
        
2198
2266
    def test_windows_app_path(self):
2199
2267
        if sys.platform != 'win32':
2200
2268
            raise tests.TestSkipped('test requires win32')
2201
2269
        # Override PATH env var so that exe can only be found on App Path
2202
2270
        self.overrideEnv('PATH', '')
2203
2271
        # Internt Explorer is always registered in the App Path
2204
 
        self.assertTrue(osutils.find_executable_on_path(
2205
 
            'iexplore') is not None)
 
2272
        self.assertTrue(osutils.find_executable_on_path('iexplore') is not None)
2206
2273
 
2207
2274
    def test_other(self):
2208
2275
        if sys.platform == 'win32':
2212
2279
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
2213
2280
 
2214
2281
 
2215
 
class SupportsExecutableTests(tests.TestCaseInTempDir):
2216
 
 
2217
 
    def test_returns_bool(self):
2218
 
        self.assertIsInstance(osutils.supports_executable(self.test_dir), bool)
2219
 
 
2220
 
 
2221
 
class SupportsSymlinksTests(tests.TestCaseInTempDir):
2222
 
 
2223
 
    def test_returns_bool(self):
2224
 
        self.assertIsInstance(osutils.supports_symlinks(self.test_dir), bool)
2225
 
 
2226
 
 
2227
 
class MtabReader(tests.TestCaseInTempDir):
2228
 
 
2229
 
    def test_read_mtab(self):
2230
 
        self.build_tree_contents([('mtab', """\
2231
 
/dev/mapper/blah--vg-root / ext4 rw,relatime,errors=remount-ro 0 0
2232
 
/dev/mapper/blah--vg-home /home vfat rw,relatime 0 0
2233
 
# comment
2234
 
 
2235
 
iminvalid
2236
 
""")])
2237
 
        self.assertEqual(
2238
 
            list(osutils.read_mtab('mtab')),
2239
 
            [(b'/', 'ext4'),
2240
 
             (b'/home', 'vfat')])
2241
 
 
2242
 
 
2243
 
class GetFsTypeTests(tests.TestCaseInTempDir):
2244
 
 
2245
 
    def test_returns_string_or_none(self):
2246
 
        ret = osutils.get_fs_type(self.test_dir)
2247
 
        self.assertTrue(isinstance(ret, str) or ret is None)
2248
 
 
2249
 
    def test_returns_most_specific(self):
2250
 
        self.overrideAttr(
2251
 
            osutils, '_FILESYSTEM_FINDER',
2252
 
            osutils.FilesystemFinder(
2253
 
                [(b'/', 'ext4'), (b'/home', 'vfat'),
2254
 
                 (b'/home/jelmer', 'ext2')]))
2255
 
        self.assertEqual(osutils.get_fs_type(b'/home/jelmer/blah'), 'ext2')
2256
 
        self.assertEqual(osutils.get_fs_type('/home/jelmer/blah'), 'ext2')
2257
 
        self.assertEqual(osutils.get_fs_type(b'/home/jelmer'), 'ext2')
2258
 
        self.assertEqual(osutils.get_fs_type(b'/home/martin'), 'vfat')
2259
 
        self.assertEqual(osutils.get_fs_type(b'/home'), 'vfat')
2260
 
        self.assertEqual(osutils.get_fs_type(b'/other'), 'ext4')
2261
 
 
2262
 
    def test_returns_none(self):
2263
 
        self.overrideAttr(
2264
 
            osutils, '_FILESYSTEM_FINDER',
2265
 
            osutils.FilesystemFinder([]))
2266
 
        self.assertIs(osutils.get_fs_type('/home/jelmer/blah'), None)
2267
 
        self.assertIs(osutils.get_fs_type(b'/home/jelmer/blah'), None)
2268
 
        self.assertIs(osutils.get_fs_type('/home/jelmer'), None)
 
2282
class TestEnvironmentErrors(tests.TestCase):
 
2283
    """Test handling of environmental errors"""
 
2284
 
 
2285
    def test_is_oserror(self):
 
2286
        self.assertTrue(osutils.is_environment_error(
 
2287
            OSError(errno.EINVAL, "Invalid parameter")))
 
2288
 
 
2289
    def test_is_ioerror(self):
 
2290
        self.assertTrue(osutils.is_environment_error(
 
2291
            IOError(errno.EINVAL, "Invalid parameter")))
 
2292
 
 
2293
    def test_is_socket_error(self):
 
2294
        self.assertTrue(osutils.is_environment_error(
 
2295
            socket.error(errno.EINVAL, "Invalid parameter")))
 
2296
 
 
2297
    def test_is_select_error(self):
 
2298
        self.assertTrue(osutils.is_environment_error(
 
2299
            select.error(errno.EINVAL, "Invalid parameter")))
 
2300
 
 
2301
    def test_is_pywintypes_error(self):
 
2302
        self.requireFeature(features.pywintypes)
 
2303
        import pywintypes
 
2304
        self.assertTrue(osutils.is_environment_error(
 
2305
            pywintypes.error(errno.EINVAL, "Invalid parameter", "Caller")))