/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_osutils.py

  • Committer: Jelmer Vernooij
  • Date: 2017-06-08 23:30:31 UTC
  • mto: This revision was merged to the branch mainline in revision 6690.
  • Revision ID: jelmer@jelmer.uk-20170608233031-3qavls2o7a1pqllj
Update imports.

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Tests for the osutils wrapper."""
18
18
 
 
19
from __future__ import absolute_import, division
 
20
 
19
21
import errno
20
 
from io import BytesIO
21
22
import os
 
23
import re
22
24
import select
23
25
import socket
24
26
import sys
27
29
 
28
30
from .. import (
29
31
    errors,
 
32
    lazy_regex,
30
33
    osutils,
 
34
    symbol_versioning,
31
35
    tests,
32
36
    trace,
33
37
    win32utils,
34
38
    )
 
39
from ..sixish import (
 
40
    BytesIO,
 
41
    )
35
42
from . import (
36
43
    features,
37
44
    file_utils,
40
47
from .scenarios import load_tests_apply_scenarios
41
48
 
42
49
 
43
 
class _UTF8DirReaderFeature(features.ModuleAvailableFeature):
 
50
class _UTF8DirReaderFeature(features.Feature):
44
51
 
45
52
    def _probe(self):
46
53
        try:
47
54
            from .. import _readdir_pyx
48
 
            self._module = _readdir_pyx
49
55
            self.reader = _readdir_pyx.UTF8DirReader
50
56
            return True
51
57
        except ImportError:
52
58
            return False
53
59
 
 
60
    def feature_name(self):
 
61
        return 'breezy._readdir_pyx'
54
62
 
55
 
UTF8DirReaderFeature = _UTF8DirReaderFeature('breezy._readdir_pyx')
 
63
UTF8DirReaderFeature = features.ModuleAvailableFeature('breezy._readdir_pyx')
56
64
 
57
65
term_ios_feature = features.ModuleAvailableFeature('termios')
58
66
 
130
138
 
131
139
    def test_fancy_rename(self):
132
140
        # This should work everywhere
133
 
        self.create_file('a', b'something in a\n')
 
141
        self.create_file('a', 'something in a\n')
134
142
        self._fancy_rename('a', 'b')
135
143
        self.assertPathDoesNotExist('a')
136
144
        self.assertPathExists('b')
137
 
        self.check_file_contents('b', b'something in a\n')
 
145
        self.check_file_contents('b', 'something in a\n')
138
146
 
139
 
        self.create_file('a', b'new something in a\n')
 
147
        self.create_file('a', 'new something in a\n')
140
148
        self._fancy_rename('b', 'a')
141
149
 
142
 
        self.check_file_contents('a', b'something in a\n')
 
150
        self.check_file_contents('a', 'something in a\n')
143
151
 
144
152
    def test_fancy_rename_fails_source_missing(self):
145
153
        # An exception should be raised, and the target should be left in place
146
 
        self.create_file('target', b'data in target\n')
 
154
        self.create_file('target', 'data in target\n')
147
155
        self.assertRaises((IOError, OSError), self._fancy_rename,
148
156
                          'missingsource', 'target')
149
157
        self.assertPathExists('target')
150
 
        self.check_file_contents('target', b'data in target\n')
 
158
        self.check_file_contents('target', 'data in target\n')
151
159
 
152
160
    def test_fancy_rename_fails_if_source_and_target_missing(self):
153
161
        self.assertRaises((IOError, OSError), self._fancy_rename,
155
163
 
156
164
    def test_rename(self):
157
165
        # Rename should be semi-atomic on all platforms
158
 
        self.create_file('a', b'something in a\n')
 
166
        self.create_file('a', 'something in a\n')
159
167
        osutils.rename('a', 'b')
160
168
        self.assertPathDoesNotExist('a')
161
169
        self.assertPathExists('b')
162
 
        self.check_file_contents('b', b'something in a\n')
 
170
        self.check_file_contents('b', 'something in a\n')
163
171
 
164
 
        self.create_file('a', b'new something in a\n')
 
172
        self.create_file('a', 'new something in a\n')
165
173
        osutils.rename('b', 'a')
166
174
 
167
 
        self.check_file_contents('a', b'something in a\n')
 
175
        self.check_file_contents('a', 'something in a\n')
168
176
 
169
177
    # TODO: test fancy_rename using a MemoryTransport
170
178
 
249
257
            # Without it, we may end up re-reading content when we don't have
250
258
            # to, but otherwise it doesn't affect correctness.
251
259
            self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
252
 
        with open('test-file.txt', 'wb') as f:
253
 
            f.write(b'some content\n')
254
 
            f.flush()
255
 
            self.assertEqualStat(osutils.fstat(f.fileno()),
256
 
                                 osutils.lstat('test-file.txt'))
 
260
        f = open('test-file.txt', 'wb')
 
261
        self.addCleanup(f.close)
 
262
        f.write('some content\n')
 
263
        f.flush()
 
264
        self.assertEqualStat(osutils.fstat(f.fileno()),
 
265
                             osutils.lstat('test-file.txt'))
257
266
 
258
267
 
259
268
class TestRmTree(tests.TestCaseInTempDir):
261
270
    def test_rmtree(self):
262
271
        # Check to remove tree with read-only files/dirs
263
272
        os.mkdir('dir')
264
 
        with open('dir/file', 'w') as f:
265
 
            f.write('spam')
 
273
        f = file('dir/file', 'w')
 
274
        f.write('spam')
 
275
        f.close()
266
276
        # would like to also try making the directory readonly, but at the
267
277
        # moment python shutil.rmtree doesn't handle that properly - it would
268
278
        # need to chmod the directory before removing things inside it - deferred
305
315
            if e.errno not in (errno.ENOENT,):
306
316
                raise
307
317
        else:
308
 
            self.assertEqual(
309
 
                'chardev',
310
 
                osutils.file_kind(os.path.realpath('/dev/null')))
 
318
            self.assertEqual('chardev', osutils.file_kind('/dev/null'))
311
319
 
312
320
        mkfifo = getattr(os, 'mkfifo', None)
313
321
        if mkfifo:
396
404
        self.assertFormatedDelta('2 seconds in the future', -2)
397
405
 
398
406
    def test_format_date(self):
399
 
        self.assertRaises(osutils.UnsupportedTimezoneFormat,
400
 
                          osutils.format_date, 0, timezone='foo')
 
407
        self.assertRaises(errors.UnsupportedTimezoneFormat,
 
408
            osutils.format_date, 0, timezone='foo')
401
409
        self.assertIsInstance(osutils.format_date(0), str)
402
 
        self.assertIsInstance(osutils.format_local_date(0), str)
 
410
        self.assertIsInstance(osutils.format_local_date(0), unicode)
403
411
        # Testing for the actual value of the local weekday without
404
412
        # duplicating the code from format_date is difficult.
405
413
        # Instead blackbox.test_locale should check for localized
407
415
 
408
416
    def test_format_date_with_offset_in_original_timezone(self):
409
417
        self.assertEqual("Thu 1970-01-01 00:00:00 +0000",
410
 
                         osutils.format_date_with_offset_in_original_timezone(0))
 
418
            osutils.format_date_with_offset_in_original_timezone(0))
411
419
        self.assertEqual("Fri 1970-01-02 03:46:40 +0000",
412
 
                         osutils.format_date_with_offset_in_original_timezone(100000))
 
420
            osutils.format_date_with_offset_in_original_timezone(100000))
413
421
        self.assertEqual("Fri 1970-01-02 05:46:40 +0200",
414
 
                         osutils.format_date_with_offset_in_original_timezone(100000, 7200))
 
422
            osutils.format_date_with_offset_in_original_timezone(100000, 7200))
415
423
 
416
424
    def test_local_time_offset(self):
417
425
        """Test that local_time_offset() returns a sane value."""
506
514
        self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
507
515
 
508
516
    def test_changing_access(self):
509
 
        with open('file', 'w') as f:
510
 
            f.write('monkey')
 
517
        f = file('file', 'w')
 
518
        f.write('monkey')
 
519
        f.close()
511
520
 
512
521
        # Make a file readonly
513
522
        osutils.make_readonly('file')
534
543
    _test_needs_features = [features.CaseInsCasePresFilenameFeature]
535
544
 
536
545
    def test_canonical_relpath_simple(self):
537
 
        f = open('MixedCaseName', 'w')
 
546
        f = file('MixedCaseName', 'w')
538
547
        f.close()
539
548
        actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
540
549
        self.assertEqual('work/MixedCaseName', actual)
611
620
 
612
621
        # read (max // 2) bytes and verify read size wasn't affected
613
622
        num_bytes_to_read = self.block_size // 2
614
 
        osutils.pumpfile(from_file, to_file,
615
 
                         num_bytes_to_read, self.block_size)
 
623
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
616
624
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
617
625
        self.assertEqual(from_file.get_read_count(), 1)
618
626
 
619
627
        # read (max) bytes and verify read size wasn't affected
620
628
        num_bytes_to_read = self.block_size
621
629
        from_file.reset_read_count()
622
 
        osutils.pumpfile(from_file, to_file,
623
 
                         num_bytes_to_read, self.block_size)
 
630
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
624
631
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
625
632
        self.assertEqual(from_file.get_read_count(), 1)
626
633
 
627
634
        # read (max + 1) bytes and verify read size was limited
628
635
        num_bytes_to_read = self.block_size + 1
629
636
        from_file.reset_read_count()
630
 
        osutils.pumpfile(from_file, to_file,
631
 
                         num_bytes_to_read, self.block_size)
 
637
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
632
638
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
633
639
        self.assertEqual(from_file.get_read_count(), 2)
634
640
 
635
641
        # finish reading the rest of the data
636
642
        num_bytes_to_read = self.test_data_len - to_file.tell()
637
 
        osutils.pumpfile(from_file, to_file,
638
 
                         num_bytes_to_read, self.block_size)
 
643
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
639
644
 
640
645
        # report error if the data wasn't equal (we only report the size due
641
646
        # to the length of the data)
708
713
 
709
714
    def test_report_activity(self):
710
715
        activity = []
711
 
 
712
716
        def log_activity(length, direction):
713
717
            activity.append((length, direction))
714
718
        from_file = BytesIO(self.test_data)
732
736
        del activity[:]
733
737
        osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
734
738
                         report_activity=log_activity, direction='read')
735
 
        self.assertEqual(
736
 
            [(500, 'read'), (500, 'read'), (28, 'read')], activity)
 
739
        self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
 
740
 
737
741
 
738
742
 
739
743
class TestPumpStringFile(tests.TestCase):
781
785
class TestSafeUnicode(tests.TestCase):
782
786
 
783
787
    def test_from_ascii_string(self):
784
 
        self.assertEqual(u'foobar', osutils.safe_unicode(b'foobar'))
 
788
        self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
785
789
 
786
790
    def test_from_unicode_string_ascii_contents(self):
787
791
        self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
790
794
        self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
791
795
 
792
796
    def test_from_utf8_string(self):
793
 
        self.assertEqual(u'foo\xae', osutils.safe_unicode(b'foo\xc2\xae'))
 
797
        self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
794
798
 
795
799
    def test_bad_utf8_string(self):
796
800
        self.assertRaises(errors.BzrBadParameterNotUnicode,
797
801
                          osutils.safe_unicode,
798
 
                          b'\xbb\xbb')
 
802
                          '\xbb\xbb')
799
803
 
800
804
 
801
805
class TestSafeUtf8(tests.TestCase):
802
806
 
803
807
    def test_from_ascii_string(self):
804
 
        f = b'foobar'
805
 
        self.assertEqual(b'foobar', osutils.safe_utf8(f))
 
808
        f = 'foobar'
 
809
        self.assertEqual('foobar', osutils.safe_utf8(f))
806
810
 
807
811
    def test_from_unicode_string_ascii_contents(self):
808
 
        self.assertEqual(b'bargam', osutils.safe_utf8(u'bargam'))
 
812
        self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
809
813
 
810
814
    def test_from_unicode_string_unicode_contents(self):
811
 
        self.assertEqual(b'bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
 
815
        self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
812
816
 
813
817
    def test_from_utf8_string(self):
814
 
        self.assertEqual(b'foo\xc2\xae', osutils.safe_utf8(b'foo\xc2\xae'))
 
818
        self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
815
819
 
816
820
    def test_bad_utf8_string(self):
817
821
        self.assertRaises(errors.BzrBadParameterNotUnicode,
818
 
                          osutils.safe_utf8, b'\xbb\xbb')
 
822
                          osutils.safe_utf8, '\xbb\xbb')
 
823
 
 
824
 
 
825
class TestSafeRevisionId(tests.TestCase):
 
826
 
 
827
    def test_from_ascii_string(self):
 
828
        # this shouldn't give a warning because it's getting an ascii string
 
829
        self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
 
830
 
 
831
    def test_from_unicode_string_ascii_contents(self):
 
832
        self.assertRaises(TypeError,
 
833
                          osutils.safe_revision_id, u'bargam')
 
834
 
 
835
    def test_from_unicode_string_unicode_contents(self):
 
836
        self.assertRaises(TypeError,
 
837
                         osutils.safe_revision_id, u'bargam\xae')
 
838
 
 
839
    def test_from_utf8_string(self):
 
840
        self.assertEqual('foo\xc2\xae',
 
841
                         osutils.safe_revision_id('foo\xc2\xae'))
 
842
 
 
843
    def test_none(self):
 
844
        """Currently, None is a valid revision_id"""
 
845
        self.assertEqual(None, osutils.safe_revision_id(None))
 
846
 
 
847
 
 
848
class TestSafeFileId(tests.TestCase):
 
849
 
 
850
    def test_from_ascii_string(self):
 
851
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))
 
852
 
 
853
    def test_from_unicode_string_ascii_contents(self):
 
854
        self.assertRaises(TypeError, osutils.safe_file_id, u'bargam')
 
855
 
 
856
    def test_from_unicode_string_unicode_contents(self):
 
857
        self.assertRaises(TypeError,
 
858
                          osutils.safe_file_id, u'bargam\xae')
 
859
 
 
860
    def test_from_utf8_string(self):
 
861
        self.assertEqual('foo\xc2\xae',
 
862
                         osutils.safe_file_id('foo\xc2\xae'))
 
863
 
 
864
    def test_none(self):
 
865
        """Currently, None is a valid file_id"""
 
866
        self.assertEqual(None, osutils.safe_file_id(None))
819
867
 
820
868
 
821
869
class TestSendAll(tests.TestCase):
824
872
        class DisconnectedSocket(object):
825
873
            def __init__(self, err):
826
874
                self.err = err
827
 
 
828
875
            def send(self, content):
829
876
                raise self.err
830
 
 
831
877
            def close(self):
832
878
                pass
833
879
        # All of these should be treated as ConnectionReset
838
884
        for err in errs:
839
885
            sock = DisconnectedSocket(err)
840
886
            self.assertRaises(errors.ConnectionReset,
841
 
                              osutils.send_all, sock, b'some more content')
 
887
                osutils.send_all, sock, b'some more content')
842
888
 
843
889
    def test_send_with_no_progress(self):
844
890
        # See https://bugs.launchpad.net/bzr/+bug/1047309
847
893
        class NoSendingSocket(object):
848
894
            def __init__(self):
849
895
                self.call_count = 0
850
 
 
851
896
            def send(self, bytes):
852
897
                self.call_count += 1
853
898
                if self.call_count > 100:
866
911
 
867
912
    def test_normpath(self):
868
913
        self.assertEqual('/etc/shadow', osutils._posix_normpath('/etc/shadow'))
869
 
        self.assertEqual(
870
 
            '/etc/shadow', osutils._posix_normpath('//etc/shadow'))
871
 
        self.assertEqual(
872
 
            '/etc/shadow', osutils._posix_normpath('///etc/shadow'))
 
914
        self.assertEqual('/etc/shadow', osutils._posix_normpath('//etc/shadow'))
 
915
        self.assertEqual('/etc/shadow', osutils._posix_normpath('///etc/shadow'))
873
916
 
874
917
 
875
918
class TestWin32Funcs(tests.TestCase):
897
940
                         osutils._win32_pathjoin('path/to/', 'foo'))
898
941
 
899
942
    def test_pathjoin_late_bugfix(self):
900
 
        expected = 'C:/foo'
 
943
        if sys.version_info < (2, 7, 6):
 
944
            expected = '/foo'
 
945
        else:
 
946
            expected = 'C:/foo'
901
947
        self.assertEqual(expected,
902
948
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
903
949
        self.assertEqual(expected,
941
987
 
942
988
    def test_minimum_path_selection(self):
943
989
        self.assertEqual(set(),
944
 
                         osutils.minimum_path_selection([]))
 
990
            osutils.minimum_path_selection([]))
945
991
        self.assertEqual({'a'},
946
 
                         osutils.minimum_path_selection(['a']))
 
992
            osutils.minimum_path_selection(['a']))
947
993
        self.assertEqual({'a', 'b'},
948
 
                         osutils.minimum_path_selection(['a', 'b']))
949
 
        self.assertEqual({'a/', 'b'},
950
 
                         osutils.minimum_path_selection(['a/', 'b']))
951
 
        self.assertEqual({'a/', 'b'},
952
 
                         osutils.minimum_path_selection(['a/c', 'a/', 'b']))
 
994
            osutils.minimum_path_selection(['a', 'b']))
 
995
        self.assertEqual({'a/', 'b'},
 
996
            osutils.minimum_path_selection(['a/', 'b']))
 
997
        self.assertEqual({'a/', 'b'},
 
998
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
953
999
        self.assertEqual({'a-b', 'a', 'a0b'},
954
 
                         osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
 
1000
            osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
955
1001
 
956
1002
    def test_mkdtemp(self):
957
1003
        tmpdir = osutils._win32_mkdtemp(dir='.')
958
1004
        self.assertFalse('\\' in tmpdir)
959
1005
 
960
1006
    def test_rename(self):
961
 
        with open('a', 'wb') as a:
962
 
            a.write(b'foo\n')
963
 
        with open('b', 'wb') as b:
964
 
            b.write(b'baz\n')
 
1007
        a = open('a', 'wb')
 
1008
        a.write('foo\n')
 
1009
        a.close()
 
1010
        b = open('b', 'wb')
 
1011
        b.write('baz\n')
 
1012
        b.close()
965
1013
 
966
1014
        osutils._win32_rename('b', 'a')
967
1015
        self.assertPathExists('a')
968
1016
        self.assertPathDoesNotExist('b')
969
 
        self.assertFileEqual(b'baz\n', 'a')
 
1017
        self.assertFileEqual('baz\n', 'a')
970
1018
 
971
1019
    def test_rename_missing_file(self):
972
 
        with open('a', 'wb') as a:
973
 
            a.write(b'foo\n')
 
1020
        a = open('a', 'wb')
 
1021
        a.write('foo\n')
 
1022
        a.close()
974
1023
 
975
1024
        try:
976
1025
            osutils._win32_rename('b', 'a')
977
1026
        except (IOError, OSError) as e:
978
1027
            self.assertEqual(errno.ENOENT, e.errno)
979
 
        self.assertFileEqual(b'foo\n', 'a')
 
1028
        self.assertFileEqual('foo\n', 'a')
980
1029
 
981
1030
    def test_rename_missing_dir(self):
982
1031
        os.mkdir('a')
1005
1054
        check(['a', 'b'], 'a/b')
1006
1055
        check(['a', 'b'], 'a/./b')
1007
1056
        check(['a', '.b'], 'a/.b')
1008
 
        if os.path.sep == '\\':
1009
 
            check(['a', '.b'], 'a\\.b')
1010
 
        else:
1011
 
            check(['a\\.b'], 'a\\.b')
 
1057
        check(['a', '.b'], 'a\\.b')
1012
1058
 
1013
1059
        self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
1014
1060
 
1042
1088
class TestChunksToLines(tests.TestCase):
1043
1089
 
1044
1090
    def test_smoketest(self):
1045
 
        self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
1046
 
                         osutils.chunks_to_lines([b'foo\nbar', b'\nbaz\n']))
1047
 
        self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
1048
 
                         osutils.chunks_to_lines([b'foo\n', b'bar\n', b'baz\n']))
 
1091
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
 
1092
                         osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
 
1093
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
 
1094
                         osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
1049
1095
 
1050
1096
    def test_osutils_binding(self):
1051
1097
        from . import test__chunks_to_lines
1065
1111
                         osutils.split_lines(u'foo\nbar\xae\n'))
1066
1112
 
1067
1113
    def test_split_with_carriage_returns(self):
1068
 
        self.assertEqual([b'foo\rbar\n'],
1069
 
                         osutils.split_lines(b'foo\rbar\n'))
 
1114
        self.assertEqual(['foo\rbar\n'],
 
1115
                         osutils.split_lines('foo\rbar\n'))
1070
1116
 
1071
1117
 
1072
1118
class TestWalkDirs(tests.TestCaseInTempDir):
1087
1133
            ]
1088
1134
        self.build_tree(tree)
1089
1135
        expected_dirblocks = [
1090
 
            (('', '.'),
1091
 
             [('0file', '0file', 'file'),
1092
 
              ('1dir', '1dir', 'directory'),
1093
 
              ('2file', '2file', 'file'),
1094
 
              ]
1095
 
             ),
1096
 
            (('1dir', './1dir'),
1097
 
             [('1dir/0file', '0file', 'file'),
1098
 
              ('1dir/1dir', '1dir', 'directory'),
1099
 
              ]
1100
 
             ),
1101
 
            (('1dir/1dir', './1dir/1dir'),
1102
 
             [
1103
 
                ]
1104
 
             ),
 
1136
                (('', '.'),
 
1137
                 [('0file', '0file', 'file'),
 
1138
                  ('1dir', '1dir', 'directory'),
 
1139
                  ('2file', '2file', 'file'),
 
1140
                 ]
 
1141
                ),
 
1142
                (('1dir', './1dir'),
 
1143
                 [('1dir/0file', '0file', 'file'),
 
1144
                  ('1dir/1dir', '1dir', 'directory'),
 
1145
                 ]
 
1146
                ),
 
1147
                (('1dir/1dir', './1dir/1dir'),
 
1148
                 [
 
1149
                 ]
 
1150
                ),
1105
1151
            ]
1106
1152
        result = []
1107
1153
        found_bzrdir = False
1136
1182
        # (It would be ok if it happened earlier but at the moment it
1137
1183
        # doesn't.)
1138
1184
        e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1139
 
        self.assertEqual('./test-unreadable', osutils.safe_unicode(e.filename))
 
1185
        self.assertEqual('./test-unreadable', e.filename)
1140
1186
        self.assertEqual(errno.EACCES, e.errno)
1141
1187
        # Ensure the message contains the file name
1142
 
        self.assertContainsRe(str(e), "\\./test-unreadable")
 
1188
        self.assertContainsRe(str(e), "\./test-unreadable")
 
1189
 
1143
1190
 
1144
1191
    def test_walkdirs_encoding_error(self):
1145
1192
        # <https://bugs.launchpad.net/bzr/+bug/488519>
1161
1208
        self.build_tree(tree)
1162
1209
 
1163
1210
        # rename the 1file to a latin-1 filename
1164
 
        os.rename(b"./1file", b"\xe8file")
1165
 
        if b"\xe8file" not in os.listdir("."):
 
1211
        os.rename("./1file", "\xe8file")
 
1212
        if "\xe8file" not in os.listdir("."):
1166
1213
            self.skipTest("Lack filesystem that preserves arbitrary bytes")
1167
1214
 
1168
1215
        self._save_platform_info()
1170
1217
 
1171
1218
        # this should raise on error
1172
1219
        def attempt():
1173
 
            for dirdetail, dirblock in osutils.walkdirs(b'.'):
 
1220
            for dirdetail, dirblock in osutils.walkdirs('.'):
1174
1221
                pass
1175
1222
 
1176
1223
        self.assertRaises(errors.BadFilenameEncoding, attempt)
1186
1233
            ]
1187
1234
        self.build_tree(tree)
1188
1235
        expected_dirblocks = [
1189
 
            (('', '.'),
1190
 
             [('0file', '0file', 'file'),
1191
 
              ('1dir', '1dir', 'directory'),
1192
 
              ('2file', '2file', 'file'),
1193
 
              ]
1194
 
             ),
1195
 
            (('1dir', './1dir'),
1196
 
             [('1dir/0file', '0file', 'file'),
1197
 
              ('1dir/1dir', '1dir', 'directory'),
1198
 
              ]
1199
 
             ),
1200
 
            (('1dir/1dir', './1dir/1dir'),
1201
 
             [
1202
 
                ]
1203
 
             ),
 
1236
                (('', '.'),
 
1237
                 [('0file', '0file', 'file'),
 
1238
                  ('1dir', '1dir', 'directory'),
 
1239
                  ('2file', '2file', 'file'),
 
1240
                 ]
 
1241
                ),
 
1242
                (('1dir', './1dir'),
 
1243
                 [('1dir/0file', '0file', 'file'),
 
1244
                  ('1dir/1dir', '1dir', 'directory'),
 
1245
                 ]
 
1246
                ),
 
1247
                (('1dir/1dir', './1dir/1dir'),
 
1248
                 [
 
1249
                 ]
 
1250
                ),
1204
1251
            ]
1205
1252
        result = []
1206
1253
        found_bzrdir = False
1207
 
        for dirdetail, dirblock in osutils._walkdirs_utf8(b'.'):
1208
 
            if len(dirblock) and dirblock[0][1] == b'.bzr':
 
1254
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
 
1255
            if len(dirblock) and dirblock[0][1] == '.bzr':
1209
1256
                # this tests the filtering of selected paths
1210
1257
                found_bzrdir = True
1211
1258
                del dirblock[0]
1212
 
            dirdetail = (dirdetail[0].decode('utf-8'),
1213
 
                         osutils.safe_unicode(dirdetail[1]))
1214
 
            dirblock = [
1215
 
                (entry[0].decode('utf-8'), entry[1].decode('utf-8'), entry[2])
1216
 
                for entry in dirblock]
1217
1259
            result.append((dirdetail, dirblock))
1218
1260
 
1219
1261
        self.assertTrue(found_bzrdir)
1238
1280
        self.overrideAttr(osutils, '_fs_enc')
1239
1281
        self.overrideAttr(osutils, '_selected_dir_reader')
1240
1282
 
1241
 
    def assertDirReaderIs(self, expected, top):
 
1283
    def assertDirReaderIs(self, expected):
1242
1284
        """Assert the right implementation for _walkdirs_utf8 is chosen."""
1243
1285
        # Force it to redetect
1244
1286
        osutils._selected_dir_reader = None
1245
1287
        # Nothing to list, but should still trigger the selection logic
1246
 
        self.assertEqual([((b'', top), [])], list(osutils._walkdirs_utf8('.')))
 
1288
        self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
1247
1289
        self.assertIsInstance(osutils._selected_dir_reader, expected)
1248
1290
 
1249
1291
    def test_force_walkdirs_utf8_fs_utf8(self):
1250
1292
        self.requireFeature(UTF8DirReaderFeature)
1251
1293
        self._save_platform_info()
1252
1294
        osutils._fs_enc = 'utf-8'
1253
 
        self.assertDirReaderIs(UTF8DirReaderFeature.module.UTF8DirReader, b".")
 
1295
        self.assertDirReaderIs(
 
1296
            UTF8DirReaderFeature.module.UTF8DirReader)
1254
1297
 
1255
1298
    def test_force_walkdirs_utf8_fs_ascii(self):
1256
1299
        self.requireFeature(UTF8DirReaderFeature)
1257
1300
        self._save_platform_info()
1258
1301
        osutils._fs_enc = 'ascii'
1259
1302
        self.assertDirReaderIs(
1260
 
            UTF8DirReaderFeature.module.UTF8DirReader, b".")
 
1303
            UTF8DirReaderFeature.module.UTF8DirReader)
1261
1304
 
1262
1305
    def test_force_walkdirs_utf8_fs_latin1(self):
1263
1306
        self._save_platform_info()
1264
1307
        osutils._fs_enc = 'iso-8859-1'
1265
 
        self.assertDirReaderIs(osutils.UnicodeDirReader, ".")
 
1308
        self.assertDirReaderIs(osutils.UnicodeDirReader)
1266
1309
 
1267
1310
    def test_force_walkdirs_utf8_nt(self):
1268
1311
        # Disabled because the thunk of the whole walkdirs api is disabled.
1269
1312
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1270
1313
        self._save_platform_info()
1271
1314
        from .._walkdirs_win32 import Win32ReadDir
1272
 
        self.assertDirReaderIs(Win32ReadDir, ".")
 
1315
        self.assertDirReaderIs(Win32ReadDir)
1273
1316
 
1274
1317
    def test_unicode_walkdirs(self):
1275
1318
        """Walkdirs should always return unicode paths."""
1286
1329
            ]
1287
1330
        self.build_tree(tree)
1288
1331
        expected_dirblocks = [
1289
 
            ((u'', u'.'),
1290
 
             [(name0, name0, 'file', './' + name0),
1291
 
              (name1, name1, 'directory', './' + name1),
1292
 
              (name2, name2, 'file', './' + name2),
1293
 
              ]
1294
 
             ),
1295
 
            ((name1, './' + name1),
1296
 
             [(name1 + '/' + name0, name0, 'file', './' + name1
1297
 
               + '/' + name0),
1298
 
              (name1 + '/' + name1, name1, 'directory', './' + name1
1299
 
               + '/' + name1),
1300
 
              ]
1301
 
             ),
1302
 
            ((name1 + '/' + name1, './' + name1 + '/' + name1),
1303
 
             [
1304
 
                ]
1305
 
             ),
 
1332
                ((u'', u'.'),
 
1333
                 [(name0, name0, 'file', './' + name0),
 
1334
                  (name1, name1, 'directory', './' + name1),
 
1335
                  (name2, name2, 'file', './' + name2),
 
1336
                 ]
 
1337
                ),
 
1338
                ((name1, './' + name1),
 
1339
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
1340
                                                        + '/' + name0),
 
1341
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
1342
                                                            + '/' + name1),
 
1343
                 ]
 
1344
                ),
 
1345
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1346
                 [
 
1347
                 ]
 
1348
                ),
1306
1349
            ]
1307
1350
        result = list(osutils.walkdirs('.'))
1308
1351
        self._filter_out_stat(result)
1309
1352
        self.assertEqual(expected_dirblocks, result)
1310
 
        result = list(osutils.walkdirs(u'./' + name1, name1))
 
1353
        result = list(osutils.walkdirs(u'./'+name1, name1))
1311
1354
        self._filter_out_stat(result)
1312
1355
        self.assertEqual(expected_dirblocks[1:], result)
1313
1356
 
1333
1376
        name2 = name2.encode('utf8')
1334
1377
 
1335
1378
        expected_dirblocks = [
1336
 
            ((b'', b'.'),
1337
 
             [(name0, name0, 'file', b'./' + name0),
1338
 
              (name1, name1, 'directory', b'./' + name1),
1339
 
              (name2, name2, 'file', b'./' + name2),
1340
 
              ]
1341
 
             ),
1342
 
            ((name1, b'./' + name1),
1343
 
             [(name1 + b'/' + name0, name0, 'file', b'./' + name1
1344
 
               + b'/' + name0),
1345
 
              (name1 + b'/' + name1, name1, 'directory', b'./' + name1
1346
 
               + b'/' + name1),
1347
 
              ]
1348
 
             ),
1349
 
            ((name1 + b'/' + name1, b'./' + name1 + b'/' + name1),
1350
 
             [
1351
 
                ]
1352
 
             ),
 
1379
                (('', '.'),
 
1380
                 [(name0, name0, 'file', './' + name0),
 
1381
                  (name1, name1, 'directory', './' + name1),
 
1382
                  (name2, name2, 'file', './' + name2),
 
1383
                 ]
 
1384
                ),
 
1385
                ((name1, './' + name1),
 
1386
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
1387
                                                        + '/' + name0),
 
1388
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
1389
                                                            + '/' + name1),
 
1390
                 ]
 
1391
                ),
 
1392
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1393
                 [
 
1394
                 ]
 
1395
                ),
1353
1396
            ]
1354
1397
        result = []
1355
1398
        # For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1356
1399
        # all abspaths are Unicode, and encode them back into utf8.
1357
1400
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1358
 
            self.assertIsInstance(dirdetail[0], bytes)
1359
 
            if isinstance(dirdetail[1], str):
 
1401
            self.assertIsInstance(dirdetail[0], str)
 
1402
            if isinstance(dirdetail[1], unicode):
1360
1403
                dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1361
1404
                dirblock = [list(info) for info in dirblock]
1362
1405
                for info in dirblock:
1363
 
                    self.assertIsInstance(info[4], str)
 
1406
                    self.assertIsInstance(info[4], unicode)
1364
1407
                    info[4] = info[4].encode('utf8')
1365
1408
            new_dirblock = []
1366
1409
            for info in dirblock:
1367
 
                self.assertIsInstance(info[0], bytes)
1368
 
                self.assertIsInstance(info[1], bytes)
1369
 
                self.assertIsInstance(info[4], bytes)
 
1410
                self.assertIsInstance(info[0], str)
 
1411
                self.assertIsInstance(info[1], str)
 
1412
                self.assertIsInstance(info[4], str)
1370
1413
                # Remove the stat information
1371
1414
                new_dirblock.append((info[0], info[1], info[2], info[4]))
1372
1415
            result.append((dirdetail, new_dirblock))
1400
1443
        # All of the abspaths should be in unicode, all of the relative paths
1401
1444
        # should be in utf8
1402
1445
        expected_dirblocks = [
1403
 
            ((b'', '.'),
1404
 
             [(name0, name0, 'file', './' + name0u),
1405
 
              (name1, name1, 'directory', './' + name1u),
1406
 
              (name2, name2, 'file', './' + name2u),
1407
 
              ]
1408
 
             ),
1409
 
            ((name1, './' + name1u),
1410
 
             [(name1 + b'/' + name0, name0, 'file', './' + name1u
1411
 
               + '/' + name0u),
1412
 
              (name1 + b'/' + name1, name1, 'directory', './' + name1u
1413
 
               + '/' + name1u),
1414
 
              ]
1415
 
             ),
1416
 
            ((name1 + b'/' + name1, './' + name1u + '/' + name1u),
1417
 
             [
1418
 
                ]
1419
 
             ),
 
1446
                (('', '.'),
 
1447
                 [(name0, name0, 'file', './' + name0u),
 
1448
                  (name1, name1, 'directory', './' + name1u),
 
1449
                  (name2, name2, 'file', './' + name2u),
 
1450
                 ]
 
1451
                ),
 
1452
                ((name1, './' + name1u),
 
1453
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1454
                                                        + '/' + name0u),
 
1455
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1456
                                                            + '/' + name1u),
 
1457
                 ]
 
1458
                ),
 
1459
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1460
                 [
 
1461
                 ]
 
1462
                ),
1420
1463
            ]
1421
1464
        result = list(osutils._walkdirs_utf8('.'))
1422
1465
        self._filter_out_stat(result)
1446
1489
        # All of the abspaths should be in unicode, all of the relative paths
1447
1490
        # should be in utf8
1448
1491
        expected_dirblocks = [
1449
 
            (('', '.'),
1450
 
             [(name0, name0, 'file', './' + name0u),
1451
 
              (name1, name1, 'directory', './' + name1u),
1452
 
              (name2, name2, 'file', './' + name2u),
1453
 
              ]
1454
 
             ),
1455
 
            ((name1, './' + name1u),
1456
 
             [(name1 + '/' + name0, name0, 'file', './' + name1u
1457
 
               + '/' + name0u),
1458
 
              (name1 + '/' + name1, name1, 'directory', './' + name1u
1459
 
               + '/' + name1u),
1460
 
              ]
1461
 
             ),
1462
 
            ((name1 + '/' + name1, './' + name1u + '/' + name1u),
1463
 
             [
1464
 
                ]
1465
 
             ),
 
1492
                (('', '.'),
 
1493
                 [(name0, name0, 'file', './' + name0u),
 
1494
                  (name1, name1, 'directory', './' + name1u),
 
1495
                  (name2, name2, 'file', './' + name2u),
 
1496
                 ]
 
1497
                ),
 
1498
                ((name1, './' + name1u),
 
1499
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1500
                                                        + '/' + name0u),
 
1501
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1502
                                                            + '/' + name1u),
 
1503
                 ]
 
1504
                ),
 
1505
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1506
                 [
 
1507
                 ]
 
1508
                ),
1466
1509
            ]
1467
1510
        result = list(osutils._walkdirs_utf8(u'.'))
1468
1511
        self._filter_out_stat(result)
1489
1532
        # I hate to sleep() here, but I'm trying to make the ctime different
1490
1533
        # from the mtime
1491
1534
        time.sleep(2)
1492
 
        with open(name0u, 'ab') as f:
1493
 
            f.write(b'just a small update')
 
1535
        f = open(name0u, 'ab')
 
1536
        try:
 
1537
            f.write('just a small update')
 
1538
        finally:
 
1539
            f.close()
1494
1540
 
1495
1541
        result = Win32ReadDir().read_dir('', u'.')
1496
1542
        entry = result[0]
1589
1635
        # using the comparison routine should work too:
1590
1636
        self.assertEqual(
1591
1637
            dir_sorted_paths,
1592
 
            sorted(original_paths, key=osutils.path_prefix_key))
 
1638
            sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
1593
1639
 
1594
1640
 
1595
1641
class TestCopyTree(tests.TestCaseInTempDir):
1618
1664
    def test_copy_tree_handlers(self):
1619
1665
        processed_files = []
1620
1666
        processed_links = []
1621
 
 
1622
1667
        def file_handler(from_path, to_path):
1623
1668
            processed_files.append(('f', from_path, to_path))
1624
 
 
1625
1669
        def dir_handler(from_path, to_path):
1626
1670
            processed_files.append(('d', from_path, to_path))
1627
 
 
1628
1671
        def link_handler(from_path, to_path):
1629
1672
            processed_links.append((from_path, to_path))
1630
 
        handlers = {'file': file_handler,
1631
 
                    'directory': dir_handler,
1632
 
                    'symlink': link_handler,
1633
 
                    }
 
1673
        handlers = {'file':file_handler,
 
1674
                    'directory':dir_handler,
 
1675
                    'symlink':link_handler,
 
1676
                   }
1634
1677
 
1635
1678
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1636
1679
        if osutils.has_symlinks():
1641
1684
                          ('f', 'source/a', 'target/a'),
1642
1685
                          ('d', 'source/b', 'target/b'),
1643
1686
                          ('f', 'source/b/c', 'target/b/c'),
1644
 
                          ], processed_files)
 
1687
                         ], processed_files)
1645
1688
        self.assertPathDoesNotExist('target')
1646
1689
        if osutils.has_symlinks():
1647
1690
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1656
1699
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'),
1657
1700
                         'Environment was not cleaned up properly.'
1658
1701
                         ' Variable BRZ_TEST_ENV_VAR should not exist.')
1659
 
 
1660
1702
        def cleanup():
1661
1703
            if 'BRZ_TEST_ENV_VAR' in os.environ:
1662
1704
                del os.environ['BRZ_TEST_ENV_VAR']
1686
1728
                'Cannot find a unicode character that works in encoding %s'
1687
1729
                % (osutils.get_user_encoding(),))
1688
1730
 
1689
 
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', uni_val)
1690
 
        self.assertEqual(uni_val, os.environ.get('BRZ_TEST_ENV_VAR'))
 
1731
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', uni_val)
 
1732
        self.assertEqual(env_val, os.environ.get('BRZ_TEST_ENV_VAR'))
1691
1733
 
1692
1734
    def test_unset(self):
1693
1735
        """Test that passing None will remove the env var"""
1695
1737
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', None)
1696
1738
        self.assertEqual('foo', old)
1697
1739
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'))
1698
 
        self.assertNotIn('BRZ_TEST_ENV_VAR', os.environ)
 
1740
        self.assertFalse('BRZ_TEST_ENV_VAR' in os.environ)
1699
1741
 
1700
1742
 
1701
1743
class TestSizeShaFile(tests.TestCaseInTempDir):
1702
1744
 
1703
1745
    def test_sha_empty(self):
1704
 
        self.build_tree_contents([('foo', b'')])
1705
 
        expected_sha = osutils.sha_string(b'')
 
1746
        self.build_tree_contents([('foo', '')])
 
1747
        expected_sha = osutils.sha_string('')
1706
1748
        f = open('foo')
1707
1749
        self.addCleanup(f.close)
1708
1750
        size, sha = osutils.size_sha_file(f)
1710
1752
        self.assertEqual(expected_sha, sha)
1711
1753
 
1712
1754
    def test_sha_mixed_endings(self):
1713
 
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
 
1755
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
1714
1756
        self.build_tree_contents([('foo', text)])
1715
1757
        expected_sha = osutils.sha_string(text)
1716
1758
        f = open('foo', 'rb')
1723
1765
class TestShaFileByName(tests.TestCaseInTempDir):
1724
1766
 
1725
1767
    def test_sha_empty(self):
1726
 
        self.build_tree_contents([('foo', b'')])
1727
 
        expected_sha = osutils.sha_string(b'')
 
1768
        self.build_tree_contents([('foo', '')])
 
1769
        expected_sha = osutils.sha_string('')
1728
1770
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1729
1771
 
1730
1772
    def test_sha_mixed_endings(self):
1731
 
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
 
1773
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
1732
1774
        self.build_tree_contents([('foo', text)])
1733
1775
        expected_sha = osutils.sha_string(text)
1734
1776
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1745
1787
        self.assertContainsRe(text, "class TextUIFactory")
1746
1788
        # test unsupported package
1747
1789
        self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1748
 
                          'yyy.xx')
 
1790
            'yyy.xx')
1749
1791
        # test unknown resource
1750
1792
        self.assertRaises(IOError, osutils.resource_string, 'breezy', 'yyy.xx')
1751
1793
 
1772
1814
            '2file'
1773
1815
            ]
1774
1816
        expected_dirblocks = [
1775
 
            ((b'', '.'),
1776
 
             [(b'0file', b'0file', 'file', './0file'),
1777
 
              (b'1dir', b'1dir', 'directory', './1dir'),
1778
 
              (b'2file', b'2file', 'file', './2file'),
1779
 
              ]
1780
 
             ),
1781
 
            ((b'1dir', './1dir'),
1782
 
             [(b'1dir/0file', b'0file', 'file', './1dir/0file'),
1783
 
              (b'1dir/1dir', b'1dir', 'directory', './1dir/1dir'),
1784
 
              ]
1785
 
             ),
1786
 
            ((b'1dir/1dir', './1dir/1dir'),
1787
 
             [
1788
 
                ]
1789
 
             ),
 
1817
                (('', '.'),
 
1818
                 [('0file', '0file', 'file'),
 
1819
                  ('1dir', '1dir', 'directory'),
 
1820
                  ('2file', '2file', 'file'),
 
1821
                 ]
 
1822
                ),
 
1823
                (('1dir', './1dir'),
 
1824
                 [('1dir/0file', '0file', 'file'),
 
1825
                  ('1dir/1dir', '1dir', 'directory'),
 
1826
                 ]
 
1827
                ),
 
1828
                (('1dir/1dir', './1dir/1dir'),
 
1829
                 [
 
1830
                 ]
 
1831
                ),
1790
1832
            ]
1791
1833
        return tree, expected_dirblocks
1792
1834
 
1796
1838
        result = list(osutils._walkdirs_utf8('.'))
1797
1839
        # Filter out stat and abspath
1798
1840
        self.assertEqual(expected_dirblocks,
1799
 
                         self._filter_out(result))
 
1841
                         [(dirinfo, [line[0:3] for line in block])
 
1842
                          for dirinfo, block in result])
1800
1843
 
1801
1844
    def test_walk_sub_dir(self):
1802
1845
        tree, expected_dirblocks = self._get_ascii_tree()
1803
1846
        self.build_tree(tree)
1804
1847
        # you can search a subdir only, with a supplied prefix.
1805
 
        result = list(osutils._walkdirs_utf8(b'./1dir', b'1dir'))
 
1848
        result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
1806
1849
        # Filter out stat and abspath
1807
1850
        self.assertEqual(expected_dirblocks[1:],
1808
 
                         self._filter_out(result))
 
1851
                         [(dirinfo, [line[0:3] for line in block])
 
1852
                          for dirinfo, block in result])
1809
1853
 
1810
1854
    def _get_unicode_tree(self):
1811
1855
        name0u = u'0file-\xb6'
1822
1866
        name1 = name1u.encode('UTF-8')
1823
1867
        name2 = name2u.encode('UTF-8')
1824
1868
        expected_dirblocks = [
1825
 
            ((b'', '.'),
1826
 
             [(name0, name0, 'file', './' + name0u),
1827
 
              (name1, name1, 'directory', './' + name1u),
1828
 
              (name2, name2, 'file', './' + name2u),
1829
 
              ]
1830
 
             ),
1831
 
            ((name1, './' + name1u),
1832
 
             [(name1 + b'/' + name0, name0, 'file', './' + name1u
1833
 
               + '/' + name0u),
1834
 
              (name1 + b'/' + name1, name1, 'directory', './' + name1u
1835
 
               + '/' + name1u),
1836
 
              ]
1837
 
             ),
1838
 
            ((name1 + b'/' + name1, './' + name1u + '/' + name1u),
1839
 
             [
1840
 
                ]
1841
 
             ),
 
1869
                (('', '.'),
 
1870
                 [(name0, name0, 'file', './' + name0u),
 
1871
                  (name1, name1, 'directory', './' + name1u),
 
1872
                  (name2, name2, 'file', './' + name2u),
 
1873
                 ]
 
1874
                ),
 
1875
                ((name1, './' + name1u),
 
1876
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1877
                                                        + '/' + name0u),
 
1878
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1879
                                                            + '/' + name1u),
 
1880
                 ]
 
1881
                ),
 
1882
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1883
                 [
 
1884
                 ]
 
1885
                ),
1842
1886
            ]
1843
1887
        return tree, expected_dirblocks
1844
1888
 
1852
1896
            dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
1853
1897
            details = []
1854
1898
            for line in block:
1855
 
                details.append(
1856
 
                    line[0:3] + (self._native_to_unicode(line[4]), ))
 
1899
                details.append(line[0:3] + (self._native_to_unicode(line[4]), ))
1857
1900
            filtered_dirblocks.append((dirinfo, details))
1858
1901
        return filtered_dirblocks
1859
1902
 
1870
1913
        target = u'target\N{Euro Sign}'
1871
1914
        link_name = u'l\N{Euro Sign}nk'
1872
1915
        os.symlink(target, link_name)
 
1916
        target_utf8 = target.encode('UTF-8')
1873
1917
        link_name_utf8 = link_name.encode('UTF-8')
1874
1918
        expected_dirblocks = [
1875
 
            ((b'', '.'),
1876
 
             [(link_name_utf8, link_name_utf8,
1877
 
               'symlink', './' + link_name), ],
1878
 
             )]
 
1919
                (('', '.'),
 
1920
                 [(link_name_utf8, link_name_utf8,
 
1921
                   'symlink', './' + link_name),],
 
1922
                 )]
1879
1923
        result = list(osutils._walkdirs_utf8('.'))
1880
1924
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1881
1925
 
1889
1933
    But prior python versions failed to properly encode the passed unicode
1890
1934
    string.
1891
1935
    """
1892
 
    _test_needs_features = [features.SymlinkFeature,
1893
 
                            features.UnicodeFilenameFeature]
 
1936
    _test_needs_features = [features.SymlinkFeature, features.UnicodeFilenameFeature]
1894
1937
 
1895
1938
    def setUp(self):
1896
1939
        super(tests.TestCaseInTempDir, self).setUp()
1899
1942
        os.symlink(self.target, self.link)
1900
1943
 
1901
1944
    def test_os_readlink_link_encoding(self):
1902
 
        self.assertEqual(self.target, os.readlink(self.link))
 
1945
        self.assertEqual(self.target,  os.readlink(self.link))
1903
1946
 
1904
1947
    def test_os_readlink_link_decoding(self):
1905
1948
        self.assertEqual(self.target.encode(osutils._fs_enc),
1906
 
                         os.readlink(self.link.encode(osutils._fs_enc)))
 
1949
                          os.readlink(self.link.encode(osutils._fs_enc)))
1907
1950
 
1908
1951
 
1909
1952
class TestConcurrency(tests.TestCase):
1936
1979
 
1937
1980
    def _try_loading(self):
1938
1981
        try:
1939
 
            import breezy._fictional_extension_py  # noqa: F401
 
1982
            import breezy._fictional_extension_py
1940
1983
        except ImportError as e:
1941
1984
            osutils.failed_to_load_extension(e)
1942
1985
            return True
1948
1991
    def test_failure_to_load(self):
1949
1992
        self._try_loading()
1950
1993
        self.assertLength(1, osutils._extension_load_failures)
1951
 
        self.assertEqual(
1952
 
            osutils._extension_load_failures[0],
1953
 
            "No module named 'breezy._fictional_extension_py'")
 
1994
        self.assertEqual(osutils._extension_load_failures[0],
 
1995
            "No module named _fictional_extension_py")
1954
1996
 
1955
1997
    def test_report_extension_load_failures_no_warning(self):
1956
1998
        self.assertTrue(self._try_loading())
1957
 
        warnings, result = self.callCatchWarnings(
1958
 
            osutils.report_extension_load_failures)
 
1999
        warnings, result = self.callCatchWarnings(osutils.report_extension_load_failures)
1959
2000
        # it used to give a Python warning; it no longer does
1960
2001
        self.assertLength(0, warnings)
1961
2002
 
1966
2007
        osutils.report_extension_load_failures()
1967
2008
        self.assertContainsRe(
1968
2009
            log.getvalue(),
1969
 
            br"brz: warning: some compiled extensions could not be loaded; "
1970
 
            b"see ``brz help missing-extensions``\n"
 
2010
            r"brz: warning: some compiled extensions could not be loaded; "
 
2011
            "see <https://answers\.launchpad\.net/bzr/\+faq/703>\n"
1971
2012
            )
1972
2013
 
1973
2014
 
2044
2085
        termios = term_ios_feature.module
2045
2086
        # bug 63539 is about a termios without TIOCGWINSZ attribute
2046
2087
        try:
2047
 
            termios.TIOCGWINSZ
 
2088
            orig = termios.TIOCGWINSZ
2048
2089
        except AttributeError:
2049
2090
            # We won't remove TIOCGWINSZ, because it doesn't exist anyway :)
2050
2091
            pass
2073
2114
    def test_copy_ownership_from_path(self):
2074
2115
        """copy_ownership_from_path test with specified src."""
2075
2116
        ownsrc = '/'
2076
 
        open('test_file', 'wt').close()
 
2117
        f = open('test_file', 'wt')
2077
2118
        osutils.copy_ownership_from_path('test_file', ownsrc)
2078
2119
 
2079
2120
        s = os.stat(ownsrc)
2083
2124
 
2084
2125
    def test_copy_ownership_nonesrc(self):
2085
2126
        """copy_ownership_from_path test with src=None."""
2086
 
        open('test_file', 'wt').close()
 
2127
        f = open('test_file', 'wt')
2087
2128
        # should use parent dir for permissions
2088
2129
        osutils.copy_ownership_from_path('test_file')
2089
2130
 
2093
2134
        self.assertEqual(self.gid, s.st_gid)
2094
2135
 
2095
2136
 
 
2137
class TestPathFromEnviron(tests.TestCase):
 
2138
 
 
2139
    def test_is_unicode(self):
 
2140
        self.overrideEnv('BRZ_TEST_PATH', './anywhere at all/')
 
2141
        path = osutils.path_from_environ('BRZ_TEST_PATH')
 
2142
        self.assertIsInstance(path, unicode)
 
2143
        self.assertEqual(u'./anywhere at all/', path)
 
2144
 
 
2145
    def test_posix_path_env_ascii(self):
 
2146
        self.overrideEnv('BRZ_TEST_PATH', '/tmp')
 
2147
        home = osutils._posix_path_from_environ('BRZ_TEST_PATH')
 
2148
        self.assertIsInstance(home, unicode)
 
2149
        self.assertEqual(u'/tmp', home)
 
2150
 
 
2151
    def test_posix_path_env_unicode(self):
 
2152
        self.requireFeature(features.ByteStringNamedFilesystem)
 
2153
        self.overrideEnv('BRZ_TEST_PATH', '/home/\xa7test')
 
2154
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
 
2155
        self.assertEqual(u'/home/\xa7test',
 
2156
            osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2157
        osutils._fs_enc = "iso8859-5"
 
2158
        self.assertEqual(u'/home/\u0407test',
 
2159
            osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2160
        osutils._fs_enc = "utf-8"
 
2161
        self.assertRaises(errors.BadFilenameEncoding,
 
2162
            osutils._posix_path_from_environ, 'BRZ_TEST_PATH')
 
2163
 
 
2164
 
2096
2165
class TestGetHomeDir(tests.TestCase):
2097
2166
 
2098
2167
    def test_is_unicode(self):
2099
2168
        home = osutils._get_home_dir()
2100
 
        self.assertIsInstance(home, str)
 
2169
        self.assertIsInstance(home, unicode)
2101
2170
 
2102
2171
    def test_posix_homeless(self):
2103
2172
        self.overrideEnv('HOME', None)
2104
2173
        home = osutils._get_home_dir()
2105
 
        self.assertIsInstance(home, str)
 
2174
        self.assertIsInstance(home, unicode)
2106
2175
 
2107
2176
    def test_posix_home_ascii(self):
2108
2177
        self.overrideEnv('HOME', '/home/test')
2109
2178
        home = osutils._posix_get_home_dir()
2110
 
        self.assertIsInstance(home, str)
 
2179
        self.assertIsInstance(home, unicode)
2111
2180
        self.assertEqual(u'/home/test', home)
2112
2181
 
2113
2182
    def test_posix_home_unicode(self):
2116
2185
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
2117
2186
        self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
2118
2187
        osutils._fs_enc = "iso8859-5"
2119
 
        # In python 3, os.environ returns unicode
2120
 
        self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
 
2188
        self.assertEqual(u'/home/\u0407test', osutils._posix_get_home_dir())
 
2189
        osutils._fs_enc = "utf-8"
 
2190
        self.assertRaises(errors.BadFilenameEncoding,
 
2191
            osutils._posix_get_home_dir)
2121
2192
 
2122
2193
 
2123
2194
class TestGetuserUnicode(tests.TestCase):
2124
2195
 
2125
2196
    def test_is_unicode(self):
2126
2197
        user = osutils.getuser_unicode()
2127
 
        self.assertIsInstance(user, str)
 
2198
        self.assertIsInstance(user, unicode)
2128
2199
 
2129
2200
    def envvar_to_override(self):
2130
2201
        if sys.platform == "win32":
2131
2202
            # Disable use of platform calls on windows so envvar is used
2132
2203
            self.overrideAttr(win32utils, 'has_ctypes', False)
2133
 
            return 'USERNAME'  # only variable used on windows
2134
 
        return 'LOGNAME'  # first variable checked by getpass.getuser()
 
2204
            return 'USERNAME' # only variable used on windows
 
2205
        return 'LOGNAME' # first variable checked by getpass.getuser()
2135
2206
 
2136
2207
    def test_ascii_user(self):
2137
2208
        self.overrideEnv(self.envvar_to_override(), 'jrandom')
2146
2217
                % (osutils.get_user_encoding(),))
2147
2218
        uni_username = u'jrandom' + uni_val
2148
2219
        encoded_username = uni_username.encode(ue)
2149
 
        self.overrideEnv(self.envvar_to_override(), uni_username)
 
2220
        self.overrideEnv(self.envvar_to_override(), encoded_username)
2150
2221
        self.assertEqual(uni_username, osutils.getuser_unicode())
2151
2222
 
2152
2223
 
2185
2256
    def test_windows(self):
2186
2257
        if sys.platform != 'win32':
2187
2258
            raise tests.TestSkipped('test requires win32')
2188
 
        self.assertTrue(osutils.find_executable_on_path(
2189
 
            'explorer') is not None)
 
2259
        self.assertTrue(osutils.find_executable_on_path('explorer') is not None)
2190
2260
        self.assertTrue(
2191
2261
            osutils.find_executable_on_path('explorer.exe') is not None)
2192
2262
        self.assertTrue(
2194
2264
        self.assertTrue(
2195
2265
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
2196
2266
        self.assertTrue(osutils.find_executable_on_path('file.txt') is None)
2197
 
 
 
2267
        
2198
2268
    def test_windows_app_path(self):
2199
2269
        if sys.platform != 'win32':
2200
2270
            raise tests.TestSkipped('test requires win32')
2201
2271
        # Override PATH env var so that exe can only be found on App Path
2202
2272
        self.overrideEnv('PATH', '')
2203
2273
        # Internet Explorer is always registered in the App Path
2204
 
        self.assertTrue(osutils.find_executable_on_path(
2205
 
            'iexplore') is not None)
 
2274
        self.assertTrue(osutils.find_executable_on_path('iexplore') is not None)
2206
2275
 
2207
2276
    def test_other(self):
2208
2277
        if sys.platform == 'win32':
2212
2281
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
2213
2282
 
2214
2283
 
2215
 
class SupportsExecutableTests(tests.TestCaseInTempDir):
2216
 
 
2217
 
    def test_returns_bool(self):
2218
 
        self.assertIsInstance(osutils.supports_executable(self.test_dir), bool)
2219
 
 
2220
 
 
2221
 
class SupportsSymlinksTests(tests.TestCaseInTempDir):
2222
 
 
2223
 
    def test_returns_bool(self):
2224
 
        self.assertIsInstance(osutils.supports_symlinks(self.test_dir), bool)
2225
 
 
2226
 
 
2227
 
class MtabReader(tests.TestCaseInTempDir):
2228
 
 
2229
 
    def test_read_mtab(self):
2230
 
        self.build_tree_contents([('mtab', """\
2231
 
/dev/mapper/blah--vg-root / ext4 rw,relatime,errors=remount-ro 0 0
2232
 
/dev/mapper/blah--vg-home /home vfat rw,relatime 0 0
2233
 
# comment
2234
 
 
2235
 
iminvalid
2236
 
""")])
2237
 
        self.assertEqual(
2238
 
            list(osutils.read_mtab('mtab')),
2239
 
            [(b'/', 'ext4'),
2240
 
             (b'/home', 'vfat')])
2241
 
 
2242
 
 
2243
 
class GetFsTypeTests(tests.TestCaseInTempDir):
2244
 
 
2245
 
    def test_returns_string_or_none(self):
2246
 
        ret = osutils.get_fs_type(self.test_dir)
2247
 
        self.assertTrue(isinstance(ret, str) or ret is None)
2248
 
 
2249
 
    def test_returns_most_specific(self):
2250
 
        self.overrideAttr(
2251
 
            osutils, '_FILESYSTEM_FINDER',
2252
 
            osutils.FilesystemFinder(
2253
 
                [(b'/', 'ext4'), (b'/home', 'vfat'),
2254
 
                 (b'/home/jelmer', 'ext2')]))
2255
 
        self.assertEqual(osutils.get_fs_type(b'/home/jelmer/blah'), 'ext2')
2256
 
        self.assertEqual(osutils.get_fs_type('/home/jelmer/blah'), 'ext2')
2257
 
        self.assertEqual(osutils.get_fs_type(b'/home/jelmer'), 'ext2')
2258
 
        self.assertEqual(osutils.get_fs_type(b'/home/martin'), 'vfat')
2259
 
        self.assertEqual(osutils.get_fs_type(b'/home'), 'vfat')
2260
 
        self.assertEqual(osutils.get_fs_type(b'/other'), 'ext4')
2261
 
 
2262
 
    def test_returns_none(self):
2263
 
        self.overrideAttr(
2264
 
            osutils, '_FILESYSTEM_FINDER',
2265
 
            osutils.FilesystemFinder([]))
2266
 
        self.assertIs(osutils.get_fs_type('/home/jelmer/blah'), None)
2267
 
        self.assertIs(osutils.get_fs_type(b'/home/jelmer/blah'), None)
2268
 
        self.assertIs(osutils.get_fs_type('/home/jelmer'), None)
 
2284
class TestEnvironmentErrors(tests.TestCase):
 
2285
    """Test handling of environmental errors"""
 
2286
 
 
2287
    def test_is_oserror(self):
 
2288
        self.assertTrue(osutils.is_environment_error(
 
2289
            OSError(errno.EINVAL, "Invalid parameter")))
 
2290
 
 
2291
    def test_is_ioerror(self):
 
2292
        self.assertTrue(osutils.is_environment_error(
 
2293
            IOError(errno.EINVAL, "Invalid parameter")))
 
2294
 
 
2295
    def test_is_socket_error(self):
 
2296
        self.assertTrue(osutils.is_environment_error(
 
2297
            socket.error(errno.EINVAL, "Invalid parameter")))
 
2298
 
 
2299
    def test_is_select_error(self):
 
2300
        self.assertTrue(osutils.is_environment_error(
 
2301
            select.error(errno.EINVAL, "Invalid parameter")))
 
2302
 
 
2303
    def test_is_pywintypes_error(self):
 
2304
        self.requireFeature(features.pywintypes)
 
2305
        import pywintypes
 
2306
        self.assertTrue(osutils.is_environment_error(
 
2307
            pywintypes.error(errno.EINVAL, "Invalid parameter", "Caller")))