/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to brzlib/tests/test_osutils.py

  • Committer: Jelmer Vernooij
  • Date: 2017-05-21 12:41:27 UTC
  • mto: This revision was merged to the branch mainline in revision 6623.
  • Revision ID: jelmer@jelmer.uk-20170521124127-iv8etg0vwymyai6y
s/bzr/brz/ in apport config.

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Tests for the osutils wrapper."""
18
18
 
 
19
from cStringIO import StringIO
19
20
import errno
20
 
from io import BytesIO
21
21
import os
 
22
import re
22
23
import select
23
24
import socket
24
25
import sys
25
26
import tempfile
26
27
import time
27
28
 
28
 
from .. import (
 
29
from brzlib import (
29
30
    errors,
 
31
    lazy_regex,
30
32
    osutils,
 
33
    symbol_versioning,
31
34
    tests,
32
35
    trace,
33
36
    win32utils,
34
37
    )
35
 
from . import (
 
38
from brzlib.tests import (
36
39
    features,
37
40
    file_utils,
38
41
    test__walkdirs_win32,
39
42
    )
40
 
from .scenarios import load_tests_apply_scenarios
41
 
 
42
 
 
43
 
class _UTF8DirReaderFeature(features.ModuleAvailableFeature):
 
43
from brzlib.tests.scenarios import load_tests_apply_scenarios
 
44
 
 
45
 
 
46
class _UTF8DirReaderFeature(features.Feature):
44
47
 
45
48
    def _probe(self):
46
49
        try:
47
 
            from .. import _readdir_pyx
48
 
            self._module = _readdir_pyx
 
50
            from brzlib import _readdir_pyx
49
51
            self.reader = _readdir_pyx.UTF8DirReader
50
52
            return True
51
53
        except ImportError:
52
54
            return False
53
55
 
 
56
    def feature_name(self):
 
57
        return 'brzlib._readdir_pyx'
54
58
 
55
 
UTF8DirReaderFeature = _UTF8DirReaderFeature('breezy._readdir_pyx')
 
59
UTF8DirReaderFeature = features.ModuleAvailableFeature('brzlib._readdir_pyx')
56
60
 
57
61
term_ios_feature = features.ModuleAvailableFeature('termios')
58
62
 
78
82
    # Some DirReaders are platform specific and even there they may not be
79
83
    # available.
80
84
    if UTF8DirReaderFeature.available():
81
 
        from .. import _readdir_pyx
 
85
        from brzlib import _readdir_pyx
82
86
        scenarios.append(('utf8',
83
87
                          dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
84
88
                               _native_to_unicode=_utf8_to_unicode)))
85
89
 
86
90
    if test__walkdirs_win32.win32_readdir_feature.available():
87
91
        try:
88
 
            from .. import _walkdirs_win32
 
92
            from brzlib import _walkdirs_win32
89
93
            scenarios.append(
90
94
                ('win32',
91
95
                 dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
130
134
 
131
135
    def test_fancy_rename(self):
132
136
        # This should work everywhere
133
 
        self.create_file('a', b'something in a\n')
 
137
        self.create_file('a', 'something in a\n')
134
138
        self._fancy_rename('a', 'b')
135
139
        self.assertPathDoesNotExist('a')
136
140
        self.assertPathExists('b')
137
 
        self.check_file_contents('b', b'something in a\n')
 
141
        self.check_file_contents('b', 'something in a\n')
138
142
 
139
 
        self.create_file('a', b'new something in a\n')
 
143
        self.create_file('a', 'new something in a\n')
140
144
        self._fancy_rename('b', 'a')
141
145
 
142
 
        self.check_file_contents('a', b'something in a\n')
 
146
        self.check_file_contents('a', 'something in a\n')
143
147
 
144
148
    def test_fancy_rename_fails_source_missing(self):
145
149
        # An exception should be raised, and the target should be left in place
146
 
        self.create_file('target', b'data in target\n')
 
150
        self.create_file('target', 'data in target\n')
147
151
        self.assertRaises((IOError, OSError), self._fancy_rename,
148
152
                          'missingsource', 'target')
149
153
        self.assertPathExists('target')
150
 
        self.check_file_contents('target', b'data in target\n')
 
154
        self.check_file_contents('target', 'data in target\n')
151
155
 
152
156
    def test_fancy_rename_fails_if_source_and_target_missing(self):
153
157
        self.assertRaises((IOError, OSError), self._fancy_rename,
155
159
 
156
160
    def test_rename(self):
157
161
        # Rename should be semi-atomic on all platforms
158
 
        self.create_file('a', b'something in a\n')
 
162
        self.create_file('a', 'something in a\n')
159
163
        osutils.rename('a', 'b')
160
164
        self.assertPathDoesNotExist('a')
161
165
        self.assertPathExists('b')
162
 
        self.check_file_contents('b', b'something in a\n')
 
166
        self.check_file_contents('b', 'something in a\n')
163
167
 
164
 
        self.create_file('a', b'new something in a\n')
 
168
        self.create_file('a', 'new something in a\n')
165
169
        osutils.rename('b', 'a')
166
170
 
167
 
        self.check_file_contents('a', b'something in a\n')
 
171
        self.check_file_contents('a', 'something in a\n')
168
172
 
169
173
    # TODO: test fancy_rename using a MemoryTransport
170
174
 
181
185
    def test_rename_exception(self):
182
186
        try:
183
187
            osutils.rename('nonexistent_path', 'different_nonexistent_path')
184
 
        except OSError as e:
 
188
        except OSError, e:
185
189
            self.assertEqual(e.old_filename, 'nonexistent_path')
186
190
            self.assertEqual(e.new_filename, 'different_nonexistent_path')
187
191
            self.assertTrue('nonexistent_path' in e.strerror)
249
253
            # Without it, we may end up re-reading content when we don't have
250
254
            # to, but otherwise it doesn't affect correctness.
251
255
            self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
252
 
        with open('test-file.txt', 'wb') as f:
253
 
            f.write(b'some content\n')
254
 
            f.flush()
255
 
            self.assertEqualStat(osutils.fstat(f.fileno()),
256
 
                                 osutils.lstat('test-file.txt'))
 
256
        f = open('test-file.txt', 'wb')
 
257
        self.addCleanup(f.close)
 
258
        f.write('some content\n')
 
259
        f.flush()
 
260
        self.assertEqualStat(osutils.fstat(f.fileno()),
 
261
                             osutils.lstat('test-file.txt'))
257
262
 
258
263
 
259
264
class TestRmTree(tests.TestCaseInTempDir):
261
266
    def test_rmtree(self):
262
267
        # Check to remove tree with read-only files/dirs
263
268
        os.mkdir('dir')
264
 
        with open('dir/file', 'w') as f:
265
 
            f.write('spam')
 
269
        f = file('dir/file', 'w')
 
270
        f.write('spam')
 
271
        f.close()
266
272
        # would like to also try making the directory readonly, but at the
267
273
        # moment python shutil.rmtree doesn't handle that properly - it would
268
274
        # need to chmod the directory before removing things inside it - deferred
301
307
        # TODO: jam 20060529 Test a block device
302
308
        try:
303
309
            os.lstat('/dev/null')
304
 
        except OSError as e:
 
310
        except OSError, e:
305
311
            if e.errno not in (errno.ENOENT,):
306
312
                raise
307
313
        else:
308
 
            self.assertEqual(
309
 
                'chardev',
310
 
                osutils.file_kind(os.path.realpath('/dev/null')))
 
314
            self.assertEqual('chardev', osutils.file_kind('/dev/null'))
311
315
 
312
316
        mkfifo = getattr(os, 'mkfifo', None)
313
317
        if mkfifo:
347
351
 
348
352
        orig_umask = osutils.get_umask()
349
353
        self.addCleanup(os.umask, orig_umask)
350
 
        os.umask(0o222)
351
 
        self.assertEqual(0o222, osutils.get_umask())
352
 
        os.umask(0o022)
353
 
        self.assertEqual(0o022, osutils.get_umask())
354
 
        os.umask(0o002)
355
 
        self.assertEqual(0o002, osutils.get_umask())
356
 
        os.umask(0o027)
357
 
        self.assertEqual(0o027, osutils.get_umask())
 
354
        os.umask(0222)
 
355
        self.assertEqual(0222, osutils.get_umask())
 
356
        os.umask(0022)
 
357
        self.assertEqual(0022, osutils.get_umask())
 
358
        os.umask(0002)
 
359
        self.assertEqual(0002, osutils.get_umask())
 
360
        os.umask(0027)
 
361
        self.assertEqual(0027, osutils.get_umask())
358
362
 
359
363
 
360
364
class TestDateTime(tests.TestCase):
396
400
        self.assertFormatedDelta('2 seconds in the future', -2)
397
401
 
398
402
    def test_format_date(self):
399
 
        self.assertRaises(osutils.UnsupportedTimezoneFormat,
400
 
                          osutils.format_date, 0, timezone='foo')
 
403
        self.assertRaises(errors.UnsupportedTimezoneFormat,
 
404
            osutils.format_date, 0, timezone='foo')
401
405
        self.assertIsInstance(osutils.format_date(0), str)
402
 
        self.assertIsInstance(osutils.format_local_date(0), str)
 
406
        self.assertIsInstance(osutils.format_local_date(0), unicode)
403
407
        # Testing for the actual value of the local weekday without
404
408
        # duplicating the code from format_date is difficult.
405
409
        # Instead blackbox.test_locale should check for localized
407
411
 
408
412
    def test_format_date_with_offset_in_original_timezone(self):
409
413
        self.assertEqual("Thu 1970-01-01 00:00:00 +0000",
410
 
                         osutils.format_date_with_offset_in_original_timezone(0))
 
414
            osutils.format_date_with_offset_in_original_timezone(0))
411
415
        self.assertEqual("Fri 1970-01-02 03:46:40 +0000",
412
 
                         osutils.format_date_with_offset_in_original_timezone(100000))
 
416
            osutils.format_date_with_offset_in_original_timezone(100000))
413
417
        self.assertEqual("Fri 1970-01-02 05:46:40 +0200",
414
 
                         osutils.format_date_with_offset_in_original_timezone(100000, 7200))
 
418
            osutils.format_date_with_offset_in_original_timezone(100000, 7200))
415
419
 
416
420
    def test_local_time_offset(self):
417
421
        """Test that local_time_offset() returns a sane value."""
506
510
        self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
507
511
 
508
512
    def test_changing_access(self):
509
 
        with open('file', 'w') as f:
510
 
            f.write('monkey')
 
513
        f = file('file', 'w')
 
514
        f.write('monkey')
 
515
        f.close()
511
516
 
512
517
        # Make a file readonly
513
518
        osutils.make_readonly('file')
514
519
        mode = os.lstat('file').st_mode
515
 
        self.assertEqual(mode, mode & 0o777555)
 
520
        self.assertEqual(mode, mode & 0777555)
516
521
 
517
522
        # Make a file writable
518
523
        osutils.make_writable('file')
519
524
        mode = os.lstat('file').st_mode
520
 
        self.assertEqual(mode, mode | 0o200)
 
525
        self.assertEqual(mode, mode | 0200)
521
526
 
522
527
        if osutils.has_symlinks():
523
528
            # should not error when handed a symlink
534
539
    _test_needs_features = [features.CaseInsCasePresFilenameFeature]
535
540
 
536
541
    def test_canonical_relpath_simple(self):
537
 
        f = open('MixedCaseName', 'w')
 
542
        f = file('MixedCaseName', 'w')
538
543
        f.close()
539
544
        actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
540
545
        self.assertEqual('work/MixedCaseName', actual)
596
601
        super(TestPumpFile, self).setUp()
597
602
        # create a test datablock
598
603
        self.block_size = 512
599
 
        pattern = b'0123456789ABCDEF'
600
 
        self.test_data = pattern * (3 * self.block_size // len(pattern))
 
604
        pattern = '0123456789ABCDEF'
 
605
        self.test_data = pattern * (3 * self.block_size / len(pattern))
601
606
        self.test_data_len = len(self.test_data)
602
607
 
603
608
    def test_bracket_block_size(self):
607
612
        self.assertTrue(self.test_data_len > self.block_size)
608
613
 
609
614
        from_file = file_utils.FakeReadFile(self.test_data)
610
 
        to_file = BytesIO()
 
615
        to_file = StringIO()
611
616
 
612
 
        # read (max // 2) bytes and verify read size wasn't affected
613
 
        num_bytes_to_read = self.block_size // 2
614
 
        osutils.pumpfile(from_file, to_file,
615
 
                         num_bytes_to_read, self.block_size)
 
617
        # read (max / 2) bytes and verify read size wasn't affected
 
618
        num_bytes_to_read = self.block_size / 2
 
619
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
616
620
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
617
621
        self.assertEqual(from_file.get_read_count(), 1)
618
622
 
619
623
        # read (max) bytes and verify read size wasn't affected
620
624
        num_bytes_to_read = self.block_size
621
625
        from_file.reset_read_count()
622
 
        osutils.pumpfile(from_file, to_file,
623
 
                         num_bytes_to_read, self.block_size)
 
626
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
624
627
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
625
628
        self.assertEqual(from_file.get_read_count(), 1)
626
629
 
627
630
        # read (max + 1) bytes and verify read size was limited
628
631
        num_bytes_to_read = self.block_size + 1
629
632
        from_file.reset_read_count()
630
 
        osutils.pumpfile(from_file, to_file,
631
 
                         num_bytes_to_read, self.block_size)
 
633
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
632
634
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
633
635
        self.assertEqual(from_file.get_read_count(), 2)
634
636
 
635
637
        # finish reading the rest of the data
636
638
        num_bytes_to_read = self.test_data_len - to_file.tell()
637
 
        osutils.pumpfile(from_file, to_file,
638
 
                         num_bytes_to_read, self.block_size)
 
639
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
639
640
 
640
641
        # report error if the data wasn't equal (we only report the size due
641
642
        # to the length of the data)
652
653
 
653
654
        # retrieve data in blocks
654
655
        from_file = file_utils.FakeReadFile(self.test_data)
655
 
        to_file = BytesIO()
 
656
        to_file = StringIO()
656
657
        osutils.pumpfile(from_file, to_file, self.test_data_len,
657
658
                         self.block_size)
658
659
 
676
677
 
677
678
        # retrieve data to EOF
678
679
        from_file = file_utils.FakeReadFile(self.test_data)
679
 
        to_file = BytesIO()
 
680
        to_file = StringIO()
680
681
        osutils.pumpfile(from_file, to_file, -1, self.block_size)
681
682
 
682
683
        # verify read size was equal to the maximum read size
696
697
        with this new version."""
697
698
        # retrieve data using default (old) pumpfile method
698
699
        from_file = file_utils.FakeReadFile(self.test_data)
699
 
        to_file = BytesIO()
 
700
        to_file = StringIO()
700
701
        osutils.pumpfile(from_file, to_file)
701
702
 
702
703
        # report error if the data wasn't equal (we only report the size due
708
709
 
709
710
    def test_report_activity(self):
710
711
        activity = []
711
 
 
712
712
        def log_activity(length, direction):
713
713
            activity.append((length, direction))
714
 
        from_file = BytesIO(self.test_data)
715
 
        to_file = BytesIO()
 
714
        from_file = StringIO(self.test_data)
 
715
        to_file = StringIO()
716
716
        osutils.pumpfile(from_file, to_file, buff_size=500,
717
717
                         report_activity=log_activity, direction='read')
718
718
        self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
719
719
                          (36, 'read')], activity)
720
720
 
721
 
        from_file = BytesIO(self.test_data)
722
 
        to_file = BytesIO()
 
721
        from_file = StringIO(self.test_data)
 
722
        to_file = StringIO()
723
723
        del activity[:]
724
724
        osutils.pumpfile(from_file, to_file, buff_size=500,
725
725
                         report_activity=log_activity, direction='write')
727
727
                          (36, 'write')], activity)
728
728
 
729
729
        # And with a limited amount of data
730
 
        from_file = BytesIO(self.test_data)
731
 
        to_file = BytesIO()
 
730
        from_file = StringIO(self.test_data)
 
731
        to_file = StringIO()
732
732
        del activity[:]
733
733
        osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
734
734
                         report_activity=log_activity, direction='read')
735
 
        self.assertEqual(
736
 
            [(500, 'read'), (500, 'read'), (28, 'read')], activity)
 
735
        self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
 
736
 
737
737
 
738
738
 
739
739
class TestPumpStringFile(tests.TestCase):
740
740
 
741
741
    def test_empty(self):
742
 
        output = BytesIO()
743
 
        osutils.pump_string_file(b"", output)
744
 
        self.assertEqual(b"", output.getvalue())
 
742
        output = StringIO()
 
743
        osutils.pump_string_file("", output)
 
744
        self.assertEqual("", output.getvalue())
745
745
 
746
746
    def test_more_than_segment_size(self):
747
 
        output = BytesIO()
748
 
        osutils.pump_string_file(b"123456789", output, 2)
749
 
        self.assertEqual(b"123456789", output.getvalue())
 
747
        output = StringIO()
 
748
        osutils.pump_string_file("123456789", output, 2)
 
749
        self.assertEqual("123456789", output.getvalue())
750
750
 
751
751
    def test_segment_size(self):
752
 
        output = BytesIO()
753
 
        osutils.pump_string_file(b"12", output, 2)
754
 
        self.assertEqual(b"12", output.getvalue())
 
752
        output = StringIO()
 
753
        osutils.pump_string_file("12", output, 2)
 
754
        self.assertEqual("12", output.getvalue())
755
755
 
756
756
    def test_segment_size_multiple(self):
757
 
        output = BytesIO()
758
 
        osutils.pump_string_file(b"1234", output, 2)
759
 
        self.assertEqual(b"1234", output.getvalue())
 
757
        output = StringIO()
 
758
        osutils.pump_string_file("1234", output, 2)
 
759
        self.assertEqual("1234", output.getvalue())
760
760
 
761
761
 
762
762
class TestRelpath(tests.TestCase):
781
781
class TestSafeUnicode(tests.TestCase):
782
782
 
783
783
    def test_from_ascii_string(self):
784
 
        self.assertEqual(u'foobar', osutils.safe_unicode(b'foobar'))
 
784
        self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
785
785
 
786
786
    def test_from_unicode_string_ascii_contents(self):
787
787
        self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
790
790
        self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
791
791
 
792
792
    def test_from_utf8_string(self):
793
 
        self.assertEqual(u'foo\xae', osutils.safe_unicode(b'foo\xc2\xae'))
 
793
        self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
794
794
 
795
795
    def test_bad_utf8_string(self):
796
796
        self.assertRaises(errors.BzrBadParameterNotUnicode,
797
797
                          osutils.safe_unicode,
798
 
                          b'\xbb\xbb')
 
798
                          '\xbb\xbb')
799
799
 
800
800
 
801
801
class TestSafeUtf8(tests.TestCase):
802
802
 
803
803
    def test_from_ascii_string(self):
804
 
        f = b'foobar'
805
 
        self.assertEqual(b'foobar', osutils.safe_utf8(f))
 
804
        f = 'foobar'
 
805
        self.assertEqual('foobar', osutils.safe_utf8(f))
806
806
 
807
807
    def test_from_unicode_string_ascii_contents(self):
808
 
        self.assertEqual(b'bargam', osutils.safe_utf8(u'bargam'))
 
808
        self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
809
809
 
810
810
    def test_from_unicode_string_unicode_contents(self):
811
 
        self.assertEqual(b'bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
 
811
        self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
812
812
 
813
813
    def test_from_utf8_string(self):
814
 
        self.assertEqual(b'foo\xc2\xae', osutils.safe_utf8(b'foo\xc2\xae'))
 
814
        self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
815
815
 
816
816
    def test_bad_utf8_string(self):
817
817
        self.assertRaises(errors.BzrBadParameterNotUnicode,
818
 
                          osutils.safe_utf8, b'\xbb\xbb')
 
818
                          osutils.safe_utf8, '\xbb\xbb')
819
819
 
820
820
 
821
821
class TestSafeRevisionId(tests.TestCase):
822
822
 
823
823
    def test_from_ascii_string(self):
824
824
        # this shouldn't give a warning because it's getting an ascii string
825
 
        self.assertEqual(b'foobar', osutils.safe_revision_id(b'foobar'))
 
825
        self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
826
826
 
827
827
    def test_from_unicode_string_ascii_contents(self):
828
 
        self.assertRaises(TypeError,
829
 
                          osutils.safe_revision_id, u'bargam')
 
828
        self.assertEqual('bargam',
 
829
                         osutils.safe_revision_id(u'bargam', warn=False))
 
830
 
 
831
    def test_from_unicode_deprecated(self):
 
832
        self.assertEqual('bargam',
 
833
            self.callDeprecated([osutils._revision_id_warning],
 
834
                                osutils.safe_revision_id, u'bargam'))
830
835
 
831
836
    def test_from_unicode_string_unicode_contents(self):
832
 
        self.assertRaises(TypeError,
833
 
                          osutils.safe_revision_id, u'bargam\xae')
 
837
        self.assertEqual('bargam\xc2\xae',
 
838
                         osutils.safe_revision_id(u'bargam\xae', warn=False))
834
839
 
835
840
    def test_from_utf8_string(self):
836
 
        self.assertEqual(b'foo\xc2\xae',
837
 
                         osutils.safe_revision_id(b'foo\xc2\xae'))
 
841
        self.assertEqual('foo\xc2\xae',
 
842
                         osutils.safe_revision_id('foo\xc2\xae'))
838
843
 
839
844
    def test_none(self):
840
845
        """Currently, None is a valid revision_id"""
844
849
class TestSafeFileId(tests.TestCase):
845
850
 
846
851
    def test_from_ascii_string(self):
847
 
        self.assertEqual(b'foobar', osutils.safe_file_id(b'foobar'))
 
852
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))
848
853
 
849
854
    def test_from_unicode_string_ascii_contents(self):
850
 
        self.assertRaises(TypeError, osutils.safe_file_id, u'bargam')
 
855
        self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))
 
856
 
 
857
    def test_from_unicode_deprecated(self):
 
858
        self.assertEqual('bargam',
 
859
            self.callDeprecated([osutils._file_id_warning],
 
860
                                osutils.safe_file_id, u'bargam'))
851
861
 
852
862
    def test_from_unicode_string_unicode_contents(self):
853
 
        self.assertRaises(TypeError,
854
 
                          osutils.safe_file_id, u'bargam\xae')
 
863
        self.assertEqual('bargam\xc2\xae',
 
864
                         osutils.safe_file_id(u'bargam\xae', warn=False))
855
865
 
856
866
    def test_from_utf8_string(self):
857
 
        self.assertEqual(b'foo\xc2\xae',
858
 
                         osutils.safe_file_id(b'foo\xc2\xae'))
 
867
        self.assertEqual('foo\xc2\xae',
 
868
                         osutils.safe_file_id('foo\xc2\xae'))
859
869
 
860
870
    def test_none(self):
861
871
        """Currently, None is a valid revision_id"""
868
878
        class DisconnectedSocket(object):
869
879
            def __init__(self, err):
870
880
                self.err = err
871
 
 
872
881
            def send(self, content):
873
882
                raise self.err
874
 
 
875
883
            def close(self):
876
884
                pass
877
885
        # All of these should be treated as ConnectionReset
882
890
        for err in errs:
883
891
            sock = DisconnectedSocket(err)
884
892
            self.assertRaises(errors.ConnectionReset,
885
 
                              osutils.send_all, sock, b'some more content')
 
893
                osutils.send_all, sock, 'some more content')
886
894
 
887
895
    def test_send_with_no_progress(self):
888
896
        # See https://bugs.launchpad.net/bzr/+bug/1047309
891
899
        class NoSendingSocket(object):
892
900
            def __init__(self):
893
901
                self.call_count = 0
894
 
 
895
902
            def send(self, bytes):
896
903
                self.call_count += 1
897
904
                if self.call_count > 100:
900
907
                return 0
901
908
        sock = NoSendingSocket()
902
909
        self.assertRaises(errors.ConnectionReset,
903
 
                          osutils.send_all, sock, b'content')
 
910
                          osutils.send_all, sock, 'content')
904
911
        self.assertEqual(1, sock.call_count)
905
912
 
906
913
 
910
917
 
911
918
    def test_normpath(self):
912
919
        self.assertEqual('/etc/shadow', osutils._posix_normpath('/etc/shadow'))
913
 
        self.assertEqual(
914
 
            '/etc/shadow', osutils._posix_normpath('//etc/shadow'))
915
 
        self.assertEqual(
916
 
            '/etc/shadow', osutils._posix_normpath('///etc/shadow'))
 
920
        self.assertEqual('/etc/shadow', osutils._posix_normpath('//etc/shadow'))
 
921
        self.assertEqual('/etc/shadow', osutils._posix_normpath('///etc/shadow'))
917
922
 
918
923
 
919
924
class TestWin32Funcs(tests.TestCase):
941
946
                         osutils._win32_pathjoin('path/to/', 'foo'))
942
947
 
943
948
    def test_pathjoin_late_bugfix(self):
944
 
        expected = 'C:/foo'
 
949
        if sys.version_info < (2, 7, 6):
 
950
            expected = '/foo'
 
951
        else:
 
952
            expected = 'C:/foo'
945
953
        self.assertEqual(expected,
946
954
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
947
955
        self.assertEqual(expected,
955
963
 
956
964
    def test_getcwd(self):
957
965
        cwd = osutils._win32_getcwd()
958
 
        os_cwd = osutils._getcwd()
 
966
        os_cwd = os.getcwdu()
959
967
        self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
960
968
        # win32 is inconsistent whether it returns lower or upper case
961
969
        # and even if it was consistent the user might type the other
969
977
        self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
970
978
        self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
971
979
 
 
980
    def test_win98_abspath(self):
 
981
        self.requireFeature(features.win32_feature)
 
982
        # absolute path
 
983
        self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
 
984
        self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
 
985
        # UNC path
 
986
        self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
 
987
        self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
 
988
        # relative path
 
989
        cwd = osutils.getcwd().rstrip('/')
 
990
        drive = osutils.ntpath.splitdrive(cwd)[0]
 
991
        self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
 
992
        self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
 
993
        # unicode path
 
994
        u = u'\u1234'
 
995
        self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
 
996
 
972
997
 
973
998
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
974
999
    """Test win32 functions that create files."""
985
1010
 
986
1011
    def test_minimum_path_selection(self):
987
1012
        self.assertEqual(set(),
988
 
                         osutils.minimum_path_selection([]))
989
 
        self.assertEqual({'a'},
990
 
                         osutils.minimum_path_selection(['a']))
991
 
        self.assertEqual({'a', 'b'},
992
 
                         osutils.minimum_path_selection(['a', 'b']))
993
 
        self.assertEqual({'a/', 'b'},
994
 
                         osutils.minimum_path_selection(['a/', 'b']))
995
 
        self.assertEqual({'a/', 'b'},
996
 
                         osutils.minimum_path_selection(['a/c', 'a/', 'b']))
997
 
        self.assertEqual({'a-b', 'a', 'a0b'},
998
 
                         osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
 
1013
            osutils.minimum_path_selection([]))
 
1014
        self.assertEqual(set(['a']),
 
1015
            osutils.minimum_path_selection(['a']))
 
1016
        self.assertEqual(set(['a', 'b']),
 
1017
            osutils.minimum_path_selection(['a', 'b']))
 
1018
        self.assertEqual(set(['a/', 'b']),
 
1019
            osutils.minimum_path_selection(['a/', 'b']))
 
1020
        self.assertEqual(set(['a/', 'b']),
 
1021
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
 
1022
        self.assertEqual(set(['a-b', 'a', 'a0b']),
 
1023
            osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
999
1024
 
1000
1025
    def test_mkdtemp(self):
1001
1026
        tmpdir = osutils._win32_mkdtemp(dir='.')
1002
1027
        self.assertFalse('\\' in tmpdir)
1003
1028
 
1004
1029
    def test_rename(self):
1005
 
        with open('a', 'wb') as a:
1006
 
            a.write(b'foo\n')
1007
 
        with open('b', 'wb') as b:
1008
 
            b.write(b'baz\n')
 
1030
        a = open('a', 'wb')
 
1031
        a.write('foo\n')
 
1032
        a.close()
 
1033
        b = open('b', 'wb')
 
1034
        b.write('baz\n')
 
1035
        b.close()
1009
1036
 
1010
1037
        osutils._win32_rename('b', 'a')
1011
1038
        self.assertPathExists('a')
1012
1039
        self.assertPathDoesNotExist('b')
1013
 
        self.assertFileEqual(b'baz\n', 'a')
 
1040
        self.assertFileEqual('baz\n', 'a')
1014
1041
 
1015
1042
    def test_rename_missing_file(self):
1016
 
        with open('a', 'wb') as a:
1017
 
            a.write(b'foo\n')
 
1043
        a = open('a', 'wb')
 
1044
        a.write('foo\n')
 
1045
        a.close()
1018
1046
 
1019
1047
        try:
1020
1048
            osutils._win32_rename('b', 'a')
1021
 
        except (IOError, OSError) as e:
 
1049
        except (IOError, OSError), e:
1022
1050
            self.assertEqual(errno.ENOENT, e.errno)
1023
 
        self.assertFileEqual(b'foo\n', 'a')
 
1051
        self.assertFileEqual('foo\n', 'a')
1024
1052
 
1025
1053
    def test_rename_missing_dir(self):
1026
1054
        os.mkdir('a')
1027
1055
        try:
1028
1056
            osutils._win32_rename('b', 'a')
1029
 
        except (IOError, OSError) as e:
 
1057
        except (IOError, OSError), e:
1030
1058
            self.assertEqual(errno.ENOENT, e.errno)
1031
1059
 
1032
1060
    def test_rename_current_dir(self):
1038
1066
        # doesn't exist.
1039
1067
        try:
1040
1068
            osutils._win32_rename('b', '.')
1041
 
        except (IOError, OSError) as e:
 
1069
        except (IOError, OSError), e:
1042
1070
            self.assertEqual(errno.ENOENT, e.errno)
1043
1071
 
1044
1072
    def test_splitpath(self):
1049
1077
        check(['a', 'b'], 'a/b')
1050
1078
        check(['a', 'b'], 'a/./b')
1051
1079
        check(['a', '.b'], 'a/.b')
1052
 
        if os.path.sep == '\\':
1053
 
            check(['a', '.b'], 'a\\.b')
1054
 
        else:
1055
 
            check(['a\\.b'], 'a\\.b')
 
1080
        check(['a', '.b'], 'a\\.b')
1056
1081
 
1057
1082
        self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
1058
1083
 
1086
1111
class TestChunksToLines(tests.TestCase):
1087
1112
 
1088
1113
    def test_smoketest(self):
1089
 
        self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
1090
 
                         osutils.chunks_to_lines([b'foo\nbar', b'\nbaz\n']))
1091
 
        self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
1092
 
                         osutils.chunks_to_lines([b'foo\n', b'bar\n', b'baz\n']))
 
1114
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
 
1115
                         osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
 
1116
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
 
1117
                         osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
1093
1118
 
1094
1119
    def test_osutils_binding(self):
1095
 
        from . import test__chunks_to_lines
 
1120
        from brzlib.tests import test__chunks_to_lines
1096
1121
        if test__chunks_to_lines.compiled_chunkstolines_feature.available():
1097
 
            from .._chunks_to_lines_pyx import chunks_to_lines
 
1122
            from brzlib._chunks_to_lines_pyx import chunks_to_lines
1098
1123
        else:
1099
 
            from .._chunks_to_lines_py import chunks_to_lines
 
1124
            from brzlib._chunks_to_lines_py import chunks_to_lines
1100
1125
        self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
1101
1126
 
1102
1127
 
1109
1134
                         osutils.split_lines(u'foo\nbar\xae\n'))
1110
1135
 
1111
1136
    def test_split_with_carriage_returns(self):
1112
 
        self.assertEqual([b'foo\rbar\n'],
1113
 
                         osutils.split_lines(b'foo\rbar\n'))
 
1137
        self.assertEqual(['foo\rbar\n'],
 
1138
                         osutils.split_lines('foo\rbar\n'))
1114
1139
 
1115
1140
 
1116
1141
class TestWalkDirs(tests.TestCaseInTempDir):
1131
1156
            ]
1132
1157
        self.build_tree(tree)
1133
1158
        expected_dirblocks = [
1134
 
            (('', '.'),
1135
 
             [('0file', '0file', 'file'),
1136
 
              ('1dir', '1dir', 'directory'),
1137
 
              ('2file', '2file', 'file'),
1138
 
              ]
1139
 
             ),
1140
 
            (('1dir', './1dir'),
1141
 
             [('1dir/0file', '0file', 'file'),
1142
 
              ('1dir/1dir', '1dir', 'directory'),
1143
 
              ]
1144
 
             ),
1145
 
            (('1dir/1dir', './1dir/1dir'),
1146
 
             [
1147
 
                ]
1148
 
             ),
 
1159
                (('', '.'),
 
1160
                 [('0file', '0file', 'file'),
 
1161
                  ('1dir', '1dir', 'directory'),
 
1162
                  ('2file', '2file', 'file'),
 
1163
                 ]
 
1164
                ),
 
1165
                (('1dir', './1dir'),
 
1166
                 [('1dir/0file', '0file', 'file'),
 
1167
                  ('1dir/1dir', '1dir', 'directory'),
 
1168
                 ]
 
1169
                ),
 
1170
                (('1dir/1dir', './1dir/1dir'),
 
1171
                 [
 
1172
                 ]
 
1173
                ),
1149
1174
            ]
1150
1175
        result = []
1151
1176
        found_bzrdir = False
1175
1200
        os.mkdir("test-unreadable")
1176
1201
        os.chmod("test-unreadable", 0000)
1177
1202
        # must chmod it back so that it can be removed
1178
 
        self.addCleanup(os.chmod, "test-unreadable", 0o700)
 
1203
        self.addCleanup(os.chmod, "test-unreadable", 0700)
1179
1204
        # The error is not raised until the generator is actually evaluated.
1180
1205
        # (It would be ok if it happened earlier but at the moment it
1181
1206
        # doesn't.)
1182
1207
        e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1183
 
        self.assertEqual('./test-unreadable', osutils.safe_unicode(e.filename))
 
1208
        self.assertEqual('./test-unreadable', e.filename)
1184
1209
        self.assertEqual(errno.EACCES, e.errno)
1185
1210
        # Ensure the message contains the file name
1186
 
        self.assertContainsRe(str(e), "\\./test-unreadable")
 
1211
        self.assertContainsRe(str(e), "\./test-unreadable")
 
1212
 
1187
1213
 
1188
1214
    def test_walkdirs_encoding_error(self):
1189
1215
        # <https://bugs.launchpad.net/bzr/+bug/488519>
1205
1231
        self.build_tree(tree)
1206
1232
 
1207
1233
        # rename the 1file to a latin-1 filename
1208
 
        os.rename(b"./1file", b"\xe8file")
1209
 
        if b"\xe8file" not in os.listdir("."):
1210
 
            self.skipTest("Lack filesystem that preserves arbitrary bytes")
 
1234
        os.rename("./1file", "\xe8file")
 
1235
        if "\xe8file" not in os.listdir("."):
 
1236
            self.skip("Lack filesystem that preserves arbitrary bytes")
1211
1237
 
1212
1238
        self._save_platform_info()
 
1239
        win32utils.winver = None # Avoid the win32 detection code
1213
1240
        osutils._fs_enc = 'UTF-8'
1214
1241
 
1215
1242
        # this should raise on error
1216
1243
        def attempt():
1217
 
            for dirdetail, dirblock in osutils.walkdirs(b'.'):
 
1244
            for dirdetail, dirblock in osutils.walkdirs('.'):
1218
1245
                pass
1219
1246
 
1220
1247
        self.assertRaises(errors.BadFilenameEncoding, attempt)
1230
1257
            ]
1231
1258
        self.build_tree(tree)
1232
1259
        expected_dirblocks = [
1233
 
            (('', '.'),
1234
 
             [('0file', '0file', 'file'),
1235
 
              ('1dir', '1dir', 'directory'),
1236
 
              ('2file', '2file', 'file'),
1237
 
              ]
1238
 
             ),
1239
 
            (('1dir', './1dir'),
1240
 
             [('1dir/0file', '0file', 'file'),
1241
 
              ('1dir/1dir', '1dir', 'directory'),
1242
 
              ]
1243
 
             ),
1244
 
            (('1dir/1dir', './1dir/1dir'),
1245
 
             [
1246
 
                ]
1247
 
             ),
 
1260
                (('', '.'),
 
1261
                 [('0file', '0file', 'file'),
 
1262
                  ('1dir', '1dir', 'directory'),
 
1263
                  ('2file', '2file', 'file'),
 
1264
                 ]
 
1265
                ),
 
1266
                (('1dir', './1dir'),
 
1267
                 [('1dir/0file', '0file', 'file'),
 
1268
                  ('1dir/1dir', '1dir', 'directory'),
 
1269
                 ]
 
1270
                ),
 
1271
                (('1dir/1dir', './1dir/1dir'),
 
1272
                 [
 
1273
                 ]
 
1274
                ),
1248
1275
            ]
1249
1276
        result = []
1250
1277
        found_bzrdir = False
1251
 
        for dirdetail, dirblock in osutils._walkdirs_utf8(b'.'):
1252
 
            if len(dirblock) and dirblock[0][1] == b'.bzr':
 
1278
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
 
1279
            if len(dirblock) and dirblock[0][1] == '.bzr':
1253
1280
                # this tests the filtering of selected paths
1254
1281
                found_bzrdir = True
1255
1282
                del dirblock[0]
1256
 
            dirdetail = (dirdetail[0].decode('utf-8'),
1257
 
                         osutils.safe_unicode(dirdetail[1]))
1258
 
            dirblock = [
1259
 
                (entry[0].decode('utf-8'), entry[1].decode('utf-8'), entry[2])
1260
 
                for entry in dirblock]
1261
1283
            result.append((dirdetail, dirblock))
1262
1284
 
1263
1285
        self.assertTrue(found_bzrdir)
1279
1301
            dirblock[:] = new_dirblock
1280
1302
 
1281
1303
    def _save_platform_info(self):
 
1304
        self.overrideAttr(win32utils, 'winver')
1282
1305
        self.overrideAttr(osutils, '_fs_enc')
1283
1306
        self.overrideAttr(osutils, '_selected_dir_reader')
1284
1307
 
1285
 
    def assertDirReaderIs(self, expected, top):
 
1308
    def assertDirReaderIs(self, expected):
1286
1309
        """Assert the right implementation for _walkdirs_utf8 is chosen."""
1287
1310
        # Force it to redetect
1288
1311
        osutils._selected_dir_reader = None
1289
1312
        # Nothing to list, but should still trigger the selection logic
1290
 
        self.assertEqual([((b'', top), [])], list(osutils._walkdirs_utf8('.')))
 
1313
        self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
1291
1314
        self.assertIsInstance(osutils._selected_dir_reader, expected)
1292
1315
 
1293
1316
    def test_force_walkdirs_utf8_fs_utf8(self):
1294
1317
        self.requireFeature(UTF8DirReaderFeature)
1295
1318
        self._save_platform_info()
 
1319
        win32utils.winver = None # Avoid the win32 detection code
1296
1320
        osutils._fs_enc = 'utf-8'
1297
 
        self.assertDirReaderIs(UTF8DirReaderFeature.module.UTF8DirReader, b".")
 
1321
        self.assertDirReaderIs(
 
1322
            UTF8DirReaderFeature.module.UTF8DirReader)
1298
1323
 
1299
1324
    def test_force_walkdirs_utf8_fs_ascii(self):
1300
1325
        self.requireFeature(UTF8DirReaderFeature)
1301
1326
        self._save_platform_info()
 
1327
        win32utils.winver = None # Avoid the win32 detection code
1302
1328
        osutils._fs_enc = 'ascii'
1303
1329
        self.assertDirReaderIs(
1304
 
            UTF8DirReaderFeature.module.UTF8DirReader, b".")
 
1330
            UTF8DirReaderFeature.module.UTF8DirReader)
1305
1331
 
1306
1332
    def test_force_walkdirs_utf8_fs_latin1(self):
1307
1333
        self._save_platform_info()
 
1334
        win32utils.winver = None # Avoid the win32 detection code
1308
1335
        osutils._fs_enc = 'iso-8859-1'
1309
 
        self.assertDirReaderIs(osutils.UnicodeDirReader, ".")
 
1336
        self.assertDirReaderIs(osutils.UnicodeDirReader)
1310
1337
 
1311
1338
    def test_force_walkdirs_utf8_nt(self):
1312
1339
        # Disabled because the thunk of the whole walkdirs api is disabled.
1313
1340
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1314
1341
        self._save_platform_info()
1315
 
        from .._walkdirs_win32 import Win32ReadDir
1316
 
        self.assertDirReaderIs(Win32ReadDir, ".")
 
1342
        win32utils.winver = 'Windows NT'
 
1343
        from brzlib._walkdirs_win32 import Win32ReadDir
 
1344
        self.assertDirReaderIs(Win32ReadDir)
 
1345
 
 
1346
    def test_force_walkdirs_utf8_98(self):
 
1347
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
1348
        self._save_platform_info()
 
1349
        win32utils.winver = 'Windows 98'
 
1350
        self.assertDirReaderIs(osutils.UnicodeDirReader)
1317
1351
 
1318
1352
    def test_unicode_walkdirs(self):
1319
1353
        """Walkdirs should always return unicode paths."""
1330
1364
            ]
1331
1365
        self.build_tree(tree)
1332
1366
        expected_dirblocks = [
1333
 
            ((u'', u'.'),
1334
 
             [(name0, name0, 'file', './' + name0),
1335
 
              (name1, name1, 'directory', './' + name1),
1336
 
              (name2, name2, 'file', './' + name2),
1337
 
              ]
1338
 
             ),
1339
 
            ((name1, './' + name1),
1340
 
             [(name1 + '/' + name0, name0, 'file', './' + name1
1341
 
               + '/' + name0),
1342
 
              (name1 + '/' + name1, name1, 'directory', './' + name1
1343
 
               + '/' + name1),
1344
 
              ]
1345
 
             ),
1346
 
            ((name1 + '/' + name1, './' + name1 + '/' + name1),
1347
 
             [
1348
 
                ]
1349
 
             ),
 
1367
                ((u'', u'.'),
 
1368
                 [(name0, name0, 'file', './' + name0),
 
1369
                  (name1, name1, 'directory', './' + name1),
 
1370
                  (name2, name2, 'file', './' + name2),
 
1371
                 ]
 
1372
                ),
 
1373
                ((name1, './' + name1),
 
1374
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
1375
                                                        + '/' + name0),
 
1376
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
1377
                                                            + '/' + name1),
 
1378
                 ]
 
1379
                ),
 
1380
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1381
                 [
 
1382
                 ]
 
1383
                ),
1350
1384
            ]
1351
1385
        result = list(osutils.walkdirs('.'))
1352
1386
        self._filter_out_stat(result)
1353
1387
        self.assertEqual(expected_dirblocks, result)
1354
 
        result = list(osutils.walkdirs(u'./' + name1, name1))
 
1388
        result = list(osutils.walkdirs(u'./'+name1, name1))
1355
1389
        self._filter_out_stat(result)
1356
1390
        self.assertEqual(expected_dirblocks[1:], result)
1357
1391
 
1377
1411
        name2 = name2.encode('utf8')
1378
1412
 
1379
1413
        expected_dirblocks = [
1380
 
            ((b'', b'.'),
1381
 
             [(name0, name0, 'file', b'./' + name0),
1382
 
              (name1, name1, 'directory', b'./' + name1),
1383
 
              (name2, name2, 'file', b'./' + name2),
1384
 
              ]
1385
 
             ),
1386
 
            ((name1, b'./' + name1),
1387
 
             [(name1 + b'/' + name0, name0, 'file', b'./' + name1
1388
 
               + b'/' + name0),
1389
 
              (name1 + b'/' + name1, name1, 'directory', b'./' + name1
1390
 
               + b'/' + name1),
1391
 
              ]
1392
 
             ),
1393
 
            ((name1 + b'/' + name1, b'./' + name1 + b'/' + name1),
1394
 
             [
1395
 
                ]
1396
 
             ),
 
1414
                (('', '.'),
 
1415
                 [(name0, name0, 'file', './' + name0),
 
1416
                  (name1, name1, 'directory', './' + name1),
 
1417
                  (name2, name2, 'file', './' + name2),
 
1418
                 ]
 
1419
                ),
 
1420
                ((name1, './' + name1),
 
1421
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
1422
                                                        + '/' + name0),
 
1423
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
1424
                                                            + '/' + name1),
 
1425
                 ]
 
1426
                ),
 
1427
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1428
                 [
 
1429
                 ]
 
1430
                ),
1397
1431
            ]
1398
1432
        result = []
1399
1433
        # For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1400
1434
        # all abspaths are Unicode, and encode them back into utf8.
1401
1435
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1402
 
            self.assertIsInstance(dirdetail[0], bytes)
1403
 
            if isinstance(dirdetail[1], str):
 
1436
            self.assertIsInstance(dirdetail[0], str)
 
1437
            if isinstance(dirdetail[1], unicode):
1404
1438
                dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1405
1439
                dirblock = [list(info) for info in dirblock]
1406
1440
                for info in dirblock:
1407
 
                    self.assertIsInstance(info[4], str)
 
1441
                    self.assertIsInstance(info[4], unicode)
1408
1442
                    info[4] = info[4].encode('utf8')
1409
1443
            new_dirblock = []
1410
1444
            for info in dirblock:
1411
 
                self.assertIsInstance(info[0], bytes)
1412
 
                self.assertIsInstance(info[1], bytes)
1413
 
                self.assertIsInstance(info[4], bytes)
 
1445
                self.assertIsInstance(info[0], str)
 
1446
                self.assertIsInstance(info[1], str)
 
1447
                self.assertIsInstance(info[4], str)
1414
1448
                # Remove the stat information
1415
1449
                new_dirblock.append((info[0], info[1], info[2], info[4]))
1416
1450
            result.append((dirdetail, new_dirblock))
1444
1478
        # All of the abspaths should be in unicode, all of the relative paths
1445
1479
        # should be in utf8
1446
1480
        expected_dirblocks = [
1447
 
            ((b'', '.'),
1448
 
             [(name0, name0, 'file', './' + name0u),
1449
 
              (name1, name1, 'directory', './' + name1u),
1450
 
              (name2, name2, 'file', './' + name2u),
1451
 
              ]
1452
 
             ),
1453
 
            ((name1, './' + name1u),
1454
 
             [(name1 + b'/' + name0, name0, 'file', './' + name1u
1455
 
               + '/' + name0u),
1456
 
              (name1 + b'/' + name1, name1, 'directory', './' + name1u
1457
 
               + '/' + name1u),
1458
 
              ]
1459
 
             ),
1460
 
            ((name1 + b'/' + name1, './' + name1u + '/' + name1u),
1461
 
             [
1462
 
                ]
1463
 
             ),
 
1481
                (('', '.'),
 
1482
                 [(name0, name0, 'file', './' + name0u),
 
1483
                  (name1, name1, 'directory', './' + name1u),
 
1484
                  (name2, name2, 'file', './' + name2u),
 
1485
                 ]
 
1486
                ),
 
1487
                ((name1, './' + name1u),
 
1488
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1489
                                                        + '/' + name0u),
 
1490
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1491
                                                            + '/' + name1u),
 
1492
                 ]
 
1493
                ),
 
1494
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1495
                 [
 
1496
                 ]
 
1497
                ),
1464
1498
            ]
1465
1499
        result = list(osutils._walkdirs_utf8('.'))
1466
1500
        self._filter_out_stat(result)
1469
1503
    def test__walkdirs_utf8_win32readdir(self):
1470
1504
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1471
1505
        self.requireFeature(features.UnicodeFilenameFeature)
1472
 
        from .._walkdirs_win32 import Win32ReadDir
 
1506
        from brzlib._walkdirs_win32 import Win32ReadDir
1473
1507
        self._save_platform_info()
1474
1508
        osutils._selected_dir_reader = Win32ReadDir()
1475
1509
        name0u = u'0file-\xb6'
1490
1524
        # All of the abspaths should be in unicode, all of the relative paths
1491
1525
        # should be in utf8
1492
1526
        expected_dirblocks = [
1493
 
            (('', '.'),
1494
 
             [(name0, name0, 'file', './' + name0u),
1495
 
              (name1, name1, 'directory', './' + name1u),
1496
 
              (name2, name2, 'file', './' + name2u),
1497
 
              ]
1498
 
             ),
1499
 
            ((name1, './' + name1u),
1500
 
             [(name1 + '/' + name0, name0, 'file', './' + name1u
1501
 
               + '/' + name0u),
1502
 
              (name1 + '/' + name1, name1, 'directory', './' + name1u
1503
 
               + '/' + name1u),
1504
 
              ]
1505
 
             ),
1506
 
            ((name1 + '/' + name1, './' + name1u + '/' + name1u),
1507
 
             [
1508
 
                ]
1509
 
             ),
 
1527
                (('', '.'),
 
1528
                 [(name0, name0, 'file', './' + name0u),
 
1529
                  (name1, name1, 'directory', './' + name1u),
 
1530
                  (name2, name2, 'file', './' + name2u),
 
1531
                 ]
 
1532
                ),
 
1533
                ((name1, './' + name1u),
 
1534
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1535
                                                        + '/' + name0u),
 
1536
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1537
                                                            + '/' + name1u),
 
1538
                 ]
 
1539
                ),
 
1540
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1541
                 [
 
1542
                 ]
 
1543
                ),
1510
1544
            ]
1511
1545
        result = list(osutils._walkdirs_utf8(u'.'))
1512
1546
        self._filter_out_stat(result)
1526
1560
        """make sure our Stat values are valid"""
1527
1561
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1528
1562
        self.requireFeature(features.UnicodeFilenameFeature)
1529
 
        from .._walkdirs_win32 import Win32ReadDir
 
1563
        from brzlib._walkdirs_win32 import Win32ReadDir
1530
1564
        name0u = u'0file-\xb6'
1531
1565
        name0 = name0u.encode('utf8')
1532
1566
        self.build_tree([name0u])
1533
1567
        # I hate to sleep() here, but I'm trying to make the ctime different
1534
1568
        # from the mtime
1535
1569
        time.sleep(2)
1536
 
        with open(name0u, 'ab') as f:
1537
 
            f.write(b'just a small update')
 
1570
        f = open(name0u, 'ab')
 
1571
        try:
 
1572
            f.write('just a small update')
 
1573
        finally:
 
1574
            f.close()
1538
1575
 
1539
1576
        result = Win32ReadDir().read_dir('', u'.')
1540
1577
        entry = result[0]
1547
1584
        """make sure our Stat values are valid"""
1548
1585
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1549
1586
        self.requireFeature(features.UnicodeFilenameFeature)
1550
 
        from .._walkdirs_win32 import Win32ReadDir
 
1587
        from brzlib._walkdirs_win32 import Win32ReadDir
1551
1588
        name0u = u'0dir-\u062c\u0648'
1552
1589
        name0 = name0u.encode('utf8')
1553
1590
        self.build_tree([name0u + '/'])
1633
1670
        # using the comparison routine shoudl work too:
1634
1671
        self.assertEqual(
1635
1672
            dir_sorted_paths,
1636
 
            sorted(original_paths, key=osutils.path_prefix_key))
 
1673
            sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
1637
1674
 
1638
1675
 
1639
1676
class TestCopyTree(tests.TestCaseInTempDir):
1662
1699
    def test_copy_tree_handlers(self):
1663
1700
        processed_files = []
1664
1701
        processed_links = []
1665
 
 
1666
1702
        def file_handler(from_path, to_path):
1667
1703
            processed_files.append(('f', from_path, to_path))
1668
 
 
1669
1704
        def dir_handler(from_path, to_path):
1670
1705
            processed_files.append(('d', from_path, to_path))
1671
 
 
1672
1706
        def link_handler(from_path, to_path):
1673
1707
            processed_links.append((from_path, to_path))
1674
 
        handlers = {'file': file_handler,
1675
 
                    'directory': dir_handler,
1676
 
                    'symlink': link_handler,
1677
 
                    }
 
1708
        handlers = {'file':file_handler,
 
1709
                    'directory':dir_handler,
 
1710
                    'symlink':link_handler,
 
1711
                   }
1678
1712
 
1679
1713
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1680
1714
        if osutils.has_symlinks():
1685
1719
                          ('f', 'source/a', 'target/a'),
1686
1720
                          ('d', 'source/b', 'target/b'),
1687
1721
                          ('f', 'source/b/c', 'target/b/c'),
1688
 
                          ], processed_files)
 
1722
                         ], processed_files)
1689
1723
        self.assertPathDoesNotExist('target')
1690
1724
        if osutils.has_symlinks():
1691
1725
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1697
1731
    def setUp(self):
1698
1732
        super(TestSetUnsetEnv, self).setUp()
1699
1733
 
1700
 
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'),
 
1734
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
1701
1735
                         'Environment was not cleaned up properly.'
1702
 
                         ' Variable BRZ_TEST_ENV_VAR should not exist.')
1703
 
 
 
1736
                         ' Variable BZR_TEST_ENV_VAR should not exist.')
1704
1737
        def cleanup():
1705
 
            if 'BRZ_TEST_ENV_VAR' in os.environ:
1706
 
                del os.environ['BRZ_TEST_ENV_VAR']
 
1738
            if 'BZR_TEST_ENV_VAR' in os.environ:
 
1739
                del os.environ['BZR_TEST_ENV_VAR']
1707
1740
        self.addCleanup(cleanup)
1708
1741
 
1709
1742
    def test_set(self):
1710
1743
        """Test that we can set an env variable"""
1711
 
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1744
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1712
1745
        self.assertEqual(None, old)
1713
 
        self.assertEqual('foo', os.environ.get('BRZ_TEST_ENV_VAR'))
 
1746
        self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
1714
1747
 
1715
1748
    def test_double_set(self):
1716
1749
        """Test that we get the old value out"""
1717
 
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
1718
 
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'bar')
 
1750
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1751
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
1719
1752
        self.assertEqual('foo', old)
1720
 
        self.assertEqual('bar', os.environ.get('BRZ_TEST_ENV_VAR'))
 
1753
        self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
1721
1754
 
1722
1755
    def test_unicode(self):
1723
1756
        """Environment can only contain plain strings
1730
1763
                'Cannot find a unicode character that works in encoding %s'
1731
1764
                % (osutils.get_user_encoding(),))
1732
1765
 
1733
 
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', uni_val)
1734
 
        self.assertEqual(uni_val, os.environ.get('BRZ_TEST_ENV_VAR'))
 
1766
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
 
1767
        self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
1735
1768
 
1736
1769
    def test_unset(self):
1737
1770
        """Test that passing None will remove the env var"""
1738
 
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
1739
 
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', None)
 
1771
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1772
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
1740
1773
        self.assertEqual('foo', old)
1741
 
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'))
1742
 
        self.assertNotIn('BRZ_TEST_ENV_VAR', os.environ)
 
1774
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
 
1775
        self.assertFalse('BZR_TEST_ENV_VAR' in os.environ)
1743
1776
 
1744
1777
 
1745
1778
class TestSizeShaFile(tests.TestCaseInTempDir):
1746
1779
 
1747
1780
    def test_sha_empty(self):
1748
 
        self.build_tree_contents([('foo', b'')])
1749
 
        expected_sha = osutils.sha_string(b'')
 
1781
        self.build_tree_contents([('foo', '')])
 
1782
        expected_sha = osutils.sha_string('')
1750
1783
        f = open('foo')
1751
1784
        self.addCleanup(f.close)
1752
1785
        size, sha = osutils.size_sha_file(f)
1754
1787
        self.assertEqual(expected_sha, sha)
1755
1788
 
1756
1789
    def test_sha_mixed_endings(self):
1757
 
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
 
1790
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
1758
1791
        self.build_tree_contents([('foo', text)])
1759
1792
        expected_sha = osutils.sha_string(text)
1760
1793
        f = open('foo', 'rb')
1767
1800
class TestShaFileByName(tests.TestCaseInTempDir):
1768
1801
 
1769
1802
    def test_sha_empty(self):
1770
 
        self.build_tree_contents([('foo', b'')])
1771
 
        expected_sha = osutils.sha_string(b'')
 
1803
        self.build_tree_contents([('foo', '')])
 
1804
        expected_sha = osutils.sha_string('')
1772
1805
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1773
1806
 
1774
1807
    def test_sha_mixed_endings(self):
1775
 
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
 
1808
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
1776
1809
        self.build_tree_contents([('foo', text)])
1777
1810
        expected_sha = osutils.sha_string(text)
1778
1811
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1781
1814
class TestResourceLoading(tests.TestCaseInTempDir):
1782
1815
 
1783
1816
    def test_resource_string(self):
1784
 
        # test resource in breezy
1785
 
        text = osutils.resource_string('breezy', 'debug.py')
 
1817
        # test resource in brzlib
 
1818
        text = osutils.resource_string('brzlib', 'debug.py')
1786
1819
        self.assertContainsRe(text, "debug_flags = set()")
1787
 
        # test resource under breezy
1788
 
        text = osutils.resource_string('breezy.ui', 'text.py')
 
1820
        # test resource under brzlib
 
1821
        text = osutils.resource_string('brzlib.ui', 'text.py')
1789
1822
        self.assertContainsRe(text, "class TextUIFactory")
1790
1823
        # test unsupported package
1791
1824
        self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1792
 
                          'yyy.xx')
 
1825
            'yyy.xx')
1793
1826
        # test unknown resource
1794
 
        self.assertRaises(IOError, osutils.resource_string, 'breezy', 'yyy.xx')
 
1827
        self.assertRaises(IOError, osutils.resource_string, 'brzlib', 'yyy.xx')
 
1828
 
 
1829
 
 
1830
class TestReCompile(tests.TestCase):
 
1831
 
 
1832
    def _deprecated_re_compile_checked(self, *args, **kwargs):
 
1833
        return self.applyDeprecated(symbol_versioning.deprecated_in((2, 2, 0)),
 
1834
            osutils.re_compile_checked, *args, **kwargs)
 
1835
 
 
1836
    def test_re_compile_checked(self):
 
1837
        r = self._deprecated_re_compile_checked(r'A*', re.IGNORECASE)
 
1838
        self.assertTrue(r.match('aaaa'))
 
1839
        self.assertTrue(r.match('aAaA'))
 
1840
 
 
1841
    def test_re_compile_checked_error(self):
 
1842
        # like https://bugs.launchpad.net/bzr/+bug/251352
 
1843
 
 
1844
        # Due to possible test isolation error, re.compile is not lazy at
 
1845
        # this point. We re-install lazy compile.
 
1846
        lazy_regex.install_lazy_compile()
 
1847
        err = self.assertRaises(
 
1848
            errors.BzrCommandError,
 
1849
            self._deprecated_re_compile_checked, '*', re.IGNORECASE, 'test case')
 
1850
        self.assertEqual(
 
1851
            'Invalid regular expression in test case: '
 
1852
            '"*" nothing to repeat',
 
1853
            str(err))
1795
1854
 
1796
1855
 
1797
1856
class TestDirReader(tests.TestCaseInTempDir):
1816
1875
            '2file'
1817
1876
            ]
1818
1877
        expected_dirblocks = [
1819
 
            ((b'', '.'),
1820
 
             [(b'0file', b'0file', 'file', './0file'),
1821
 
              (b'1dir', b'1dir', 'directory', './1dir'),
1822
 
              (b'2file', b'2file', 'file', './2file'),
1823
 
              ]
1824
 
             ),
1825
 
            ((b'1dir', './1dir'),
1826
 
             [(b'1dir/0file', b'0file', 'file', './1dir/0file'),
1827
 
              (b'1dir/1dir', b'1dir', 'directory', './1dir/1dir'),
1828
 
              ]
1829
 
             ),
1830
 
            ((b'1dir/1dir', './1dir/1dir'),
1831
 
             [
1832
 
                ]
1833
 
             ),
 
1878
                (('', '.'),
 
1879
                 [('0file', '0file', 'file'),
 
1880
                  ('1dir', '1dir', 'directory'),
 
1881
                  ('2file', '2file', 'file'),
 
1882
                 ]
 
1883
                ),
 
1884
                (('1dir', './1dir'),
 
1885
                 [('1dir/0file', '0file', 'file'),
 
1886
                  ('1dir/1dir', '1dir', 'directory'),
 
1887
                 ]
 
1888
                ),
 
1889
                (('1dir/1dir', './1dir/1dir'),
 
1890
                 [
 
1891
                 ]
 
1892
                ),
1834
1893
            ]
1835
1894
        return tree, expected_dirblocks
1836
1895
 
1840
1899
        result = list(osutils._walkdirs_utf8('.'))
1841
1900
        # Filter out stat and abspath
1842
1901
        self.assertEqual(expected_dirblocks,
1843
 
                         self._filter_out(result))
 
1902
                         [(dirinfo, [line[0:3] for line in block])
 
1903
                          for dirinfo, block in result])
1844
1904
 
1845
1905
    def test_walk_sub_dir(self):
1846
1906
        tree, expected_dirblocks = self._get_ascii_tree()
1847
1907
        self.build_tree(tree)
1848
1908
        # you can search a subdir only, with a supplied prefix.
1849
 
        result = list(osutils._walkdirs_utf8(b'./1dir', b'1dir'))
 
1909
        result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
1850
1910
        # Filter out stat and abspath
1851
1911
        self.assertEqual(expected_dirblocks[1:],
1852
 
                         self._filter_out(result))
 
1912
                         [(dirinfo, [line[0:3] for line in block])
 
1913
                          for dirinfo, block in result])
1853
1914
 
1854
1915
    def _get_unicode_tree(self):
1855
1916
        name0u = u'0file-\xb6'
1866
1927
        name1 = name1u.encode('UTF-8')
1867
1928
        name2 = name2u.encode('UTF-8')
1868
1929
        expected_dirblocks = [
1869
 
            ((b'', '.'),
1870
 
             [(name0, name0, 'file', './' + name0u),
1871
 
              (name1, name1, 'directory', './' + name1u),
1872
 
              (name2, name2, 'file', './' + name2u),
1873
 
              ]
1874
 
             ),
1875
 
            ((name1, './' + name1u),
1876
 
             [(name1 + b'/' + name0, name0, 'file', './' + name1u
1877
 
               + '/' + name0u),
1878
 
              (name1 + b'/' + name1, name1, 'directory', './' + name1u
1879
 
               + '/' + name1u),
1880
 
              ]
1881
 
             ),
1882
 
            ((name1 + b'/' + name1, './' + name1u + '/' + name1u),
1883
 
             [
1884
 
                ]
1885
 
             ),
 
1930
                (('', '.'),
 
1931
                 [(name0, name0, 'file', './' + name0u),
 
1932
                  (name1, name1, 'directory', './' + name1u),
 
1933
                  (name2, name2, 'file', './' + name2u),
 
1934
                 ]
 
1935
                ),
 
1936
                ((name1, './' + name1u),
 
1937
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1938
                                                        + '/' + name0u),
 
1939
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1940
                                                            + '/' + name1u),
 
1941
                 ]
 
1942
                ),
 
1943
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1944
                 [
 
1945
                 ]
 
1946
                ),
1886
1947
            ]
1887
1948
        return tree, expected_dirblocks
1888
1949
 
1896
1957
            dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
1897
1958
            details = []
1898
1959
            for line in block:
1899
 
                details.append(
1900
 
                    line[0:3] + (self._native_to_unicode(line[4]), ))
 
1960
                details.append(line[0:3] + (self._native_to_unicode(line[4]), ))
1901
1961
            filtered_dirblocks.append((dirinfo, details))
1902
1962
        return filtered_dirblocks
1903
1963
 
1914
1974
        target = u'target\N{Euro Sign}'
1915
1975
        link_name = u'l\N{Euro Sign}nk'
1916
1976
        os.symlink(target, link_name)
 
1977
        target_utf8 = target.encode('UTF-8')
1917
1978
        link_name_utf8 = link_name.encode('UTF-8')
1918
1979
        expected_dirblocks = [
1919
 
            ((b'', '.'),
1920
 
             [(link_name_utf8, link_name_utf8,
1921
 
               'symlink', './' + link_name), ],
1922
 
             )]
 
1980
                (('', '.'),
 
1981
                 [(link_name_utf8, link_name_utf8,
 
1982
                   'symlink', './' + link_name),],
 
1983
                 )]
1923
1984
        result = list(osutils._walkdirs_utf8('.'))
1924
1985
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1925
1986
 
1933
1994
    But prior python versions failed to properly encode the passed unicode
1934
1995
    string.
1935
1996
    """
1936
 
    _test_needs_features = [features.SymlinkFeature,
1937
 
                            features.UnicodeFilenameFeature]
 
1997
    _test_needs_features = [features.SymlinkFeature, features.UnicodeFilenameFeature]
1938
1998
 
1939
1999
    def setUp(self):
1940
2000
        super(tests.TestCaseInTempDir, self).setUp()
1943
2003
        os.symlink(self.target, self.link)
1944
2004
 
1945
2005
    def test_os_readlink_link_encoding(self):
1946
 
        self.assertEqual(self.target, os.readlink(self.link))
 
2006
        self.assertEqual(self.target,  os.readlink(self.link))
1947
2007
 
1948
2008
    def test_os_readlink_link_decoding(self):
1949
2009
        self.assertEqual(self.target.encode(osutils._fs_enc),
1950
 
                         os.readlink(self.link.encode(osutils._fs_enc)))
 
2010
                          os.readlink(self.link.encode(osutils._fs_enc)))
1951
2011
 
1952
2012
 
1953
2013
class TestConcurrency(tests.TestCase):
1961
2021
        self.assertIsInstance(concurrency, int)
1962
2022
 
1963
2023
    def test_local_concurrency_environment_variable(self):
1964
 
        self.overrideEnv('BRZ_CONCURRENCY', '2')
 
2024
        self.overrideEnv('BZR_CONCURRENCY', '2')
1965
2025
        self.assertEqual(2, osutils.local_concurrency(use_cache=False))
1966
 
        self.overrideEnv('BRZ_CONCURRENCY', '3')
 
2026
        self.overrideEnv('BZR_CONCURRENCY', '3')
1967
2027
        self.assertEqual(3, osutils.local_concurrency(use_cache=False))
1968
 
        self.overrideEnv('BRZ_CONCURRENCY', 'foo')
 
2028
        self.overrideEnv('BZR_CONCURRENCY', 'foo')
1969
2029
        self.assertEqual(1, osutils.local_concurrency(use_cache=False))
1970
2030
 
1971
2031
    def test_option_concurrency(self):
1972
 
        self.overrideEnv('BRZ_CONCURRENCY', '1')
 
2032
        self.overrideEnv('BZR_CONCURRENCY', '1')
1973
2033
        self.run_bzr('rocks --concurrency 42')
1974
2034
        # Command line overrides environment variable
1975
 
        self.assertEqual('42', os.environ['BRZ_CONCURRENCY'])
 
2035
        self.assertEqual('42', os.environ['BZR_CONCURRENCY'])
1976
2036
        self.assertEqual(42, osutils.local_concurrency(use_cache=False))
1977
2037
 
1978
2038
 
1980
2040
 
1981
2041
    def _try_loading(self):
1982
2042
        try:
1983
 
            import breezy._fictional_extension_py  # noqa: F401
1984
 
        except ImportError as e:
 
2043
            import brzlib._fictional_extension_py
 
2044
        except ImportError, e:
1985
2045
            osutils.failed_to_load_extension(e)
1986
2046
            return True
1987
2047
 
1992
2052
    def test_failure_to_load(self):
1993
2053
        self._try_loading()
1994
2054
        self.assertLength(1, osutils._extension_load_failures)
1995
 
        self.assertEqual(
1996
 
            osutils._extension_load_failures[0],
1997
 
            "No module named 'breezy._fictional_extension_py'")
 
2055
        self.assertEqual(osutils._extension_load_failures[0],
 
2056
            "No module named _fictional_extension_py")
1998
2057
 
1999
2058
    def test_report_extension_load_failures_no_warning(self):
2000
2059
        self.assertTrue(self._try_loading())
2001
 
        warnings, result = self.callCatchWarnings(
2002
 
            osutils.report_extension_load_failures)
 
2060
        warnings, result = self.callCatchWarnings(osutils.report_extension_load_failures)
2003
2061
        # it used to give a Python warning; it no longer does
2004
2062
        self.assertLength(0, warnings)
2005
2063
 
2006
2064
    def test_report_extension_load_failures_message(self):
2007
 
        log = BytesIO()
 
2065
        log = StringIO()
2008
2066
        trace.push_log_file(log)
2009
2067
        self.assertTrue(self._try_loading())
2010
2068
        osutils.report_extension_load_failures()
2011
2069
        self.assertContainsRe(
2012
2070
            log.getvalue(),
2013
 
            br"brz: warning: some compiled extensions could not be loaded; "
2014
 
            b"see ``brz help missing-extensions``\n"
 
2071
            r"brz: warning: some compiled extensions could not be loaded; "
 
2072
            "see <https://answers\.launchpad\.net/bzr/\+faq/703>\n"
2015
2073
            )
2016
2074
 
2017
2075
 
2046
2104
    def test_default_values(self):
2047
2105
        self.assertEqual(80, osutils.default_terminal_width)
2048
2106
 
2049
 
    def test_defaults_to_BRZ_COLUMNS(self):
2050
 
        # BRZ_COLUMNS is set by the test framework
2051
 
        self.assertNotEqual('12', os.environ['BRZ_COLUMNS'])
2052
 
        self.overrideEnv('BRZ_COLUMNS', '12')
 
2107
    def test_defaults_to_BZR_COLUMNS(self):
 
2108
        # BZR_COLUMNS is set by the test framework
 
2109
        self.assertNotEqual('12', os.environ['BZR_COLUMNS'])
 
2110
        self.overrideEnv('BZR_COLUMNS', '12')
2053
2111
        self.assertEqual(12, osutils.terminal_width())
2054
2112
 
2055
 
    def test_BRZ_COLUMNS_0_no_limit(self):
2056
 
        self.overrideEnv('BRZ_COLUMNS', '0')
 
2113
    def test_BZR_COLUMNS_0_no_limit(self):
 
2114
        self.overrideEnv('BZR_COLUMNS', '0')
2057
2115
        self.assertEqual(None, osutils.terminal_width())
2058
2116
 
2059
2117
    def test_falls_back_to_COLUMNS(self):
2060
 
        self.overrideEnv('BRZ_COLUMNS', None)
 
2118
        self.overrideEnv('BZR_COLUMNS', None)
2061
2119
        self.assertNotEqual('42', os.environ['COLUMNS'])
2062
2120
        self.set_fake_tty()
2063
2121
        self.overrideEnv('COLUMNS', '42')
2064
2122
        self.assertEqual(42, osutils.terminal_width())
2065
2123
 
2066
2124
    def test_tty_default_without_columns(self):
2067
 
        self.overrideEnv('BRZ_COLUMNS', None)
 
2125
        self.overrideEnv('BZR_COLUMNS', None)
2068
2126
        self.overrideEnv('COLUMNS', None)
2069
2127
 
2070
2128
        def terminal_size(w, h):
2078
2136
        self.assertEqual(42, osutils.terminal_width())
2079
2137
 
2080
2138
    def test_non_tty_default_without_columns(self):
2081
 
        self.overrideEnv('BRZ_COLUMNS', None)
 
2139
        self.overrideEnv('BZR_COLUMNS', None)
2082
2140
        self.overrideEnv('COLUMNS', None)
2083
2141
        self.replace_stdout(None)
2084
2142
        self.assertEqual(None, osutils.terminal_width())
2088
2146
        termios = term_ios_feature.module
2089
2147
        # bug 63539 is about a termios without TIOCGWINSZ attribute
2090
2148
        try:
2091
 
            termios.TIOCGWINSZ
 
2149
            orig = termios.TIOCGWINSZ
2092
2150
        except AttributeError:
2093
2151
            # We won't remove TIOCGWINSZ, because it doesn't exist anyway :)
2094
2152
            pass
2095
2153
        else:
2096
2154
            self.overrideAttr(termios, 'TIOCGWINSZ')
2097
2155
            del termios.TIOCGWINSZ
2098
 
        self.overrideEnv('BRZ_COLUMNS', None)
 
2156
        self.overrideEnv('BZR_COLUMNS', None)
2099
2157
        self.overrideEnv('COLUMNS', None)
2100
2158
        # Whatever the result is, if we don't raise an exception, it's ok.
2101
2159
        osutils.terminal_width()
2117
2175
    def test_copy_ownership_from_path(self):
2118
2176
        """copy_ownership_from_path test with specified src."""
2119
2177
        ownsrc = '/'
2120
 
        open('test_file', 'wt').close()
 
2178
        f = open('test_file', 'wt')
2121
2179
        osutils.copy_ownership_from_path('test_file', ownsrc)
2122
2180
 
2123
2181
        s = os.stat(ownsrc)
2127
2185
 
2128
2186
    def test_copy_ownership_nonesrc(self):
2129
2187
        """copy_ownership_from_path test with src=None."""
2130
 
        open('test_file', 'wt').close()
 
2188
        f = open('test_file', 'wt')
2131
2189
        # should use parent dir for permissions
2132
2190
        osutils.copy_ownership_from_path('test_file')
2133
2191
 
2140
2198
class TestPathFromEnviron(tests.TestCase):
2141
2199
 
2142
2200
    def test_is_unicode(self):
2143
 
        self.overrideEnv('BRZ_TEST_PATH', './anywhere at all/')
2144
 
        path = osutils.path_from_environ('BRZ_TEST_PATH')
2145
 
        self.assertIsInstance(path, str)
 
2201
        self.overrideEnv('BZR_TEST_PATH', './anywhere at all/')
 
2202
        path = osutils.path_from_environ('BZR_TEST_PATH')
 
2203
        self.assertIsInstance(path, unicode)
2146
2204
        self.assertEqual(u'./anywhere at all/', path)
2147
2205
 
2148
2206
    def test_posix_path_env_ascii(self):
2149
 
        self.overrideEnv('BRZ_TEST_PATH', '/tmp')
2150
 
        home = osutils._posix_path_from_environ('BRZ_TEST_PATH')
2151
 
        self.assertIsInstance(home, str)
 
2207
        self.overrideEnv('BZR_TEST_PATH', '/tmp')
 
2208
        home = osutils._posix_path_from_environ('BZR_TEST_PATH')
 
2209
        self.assertIsInstance(home, unicode)
2152
2210
        self.assertEqual(u'/tmp', home)
2153
2211
 
2154
2212
    def test_posix_path_env_unicode(self):
2155
2213
        self.requireFeature(features.ByteStringNamedFilesystem)
2156
 
        self.overrideEnv('BRZ_TEST_PATH', '/home/\xa7test')
 
2214
        self.overrideEnv('BZR_TEST_PATH', '/home/\xa7test')
2157
2215
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
2158
2216
        self.assertEqual(u'/home/\xa7test',
2159
 
                         osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2217
            osutils._posix_path_from_environ('BZR_TEST_PATH'))
2160
2218
        osutils._fs_enc = "iso8859-5"
2161
 
        # In Python 3, os.environ returns unicode.
2162
 
        self.assertEqual(u'/home/\xa7test',
2163
 
                         osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2219
        self.assertEqual(u'/home/\u0407test',
 
2220
            osutils._posix_path_from_environ('BZR_TEST_PATH'))
 
2221
        osutils._fs_enc = "utf-8"
 
2222
        self.assertRaises(errors.BadFilenameEncoding,
 
2223
            osutils._posix_path_from_environ, 'BZR_TEST_PATH')
2164
2224
 
2165
2225
 
2166
2226
class TestGetHomeDir(tests.TestCase):
2167
2227
 
2168
2228
    def test_is_unicode(self):
2169
2229
        home = osutils._get_home_dir()
2170
 
        self.assertIsInstance(home, str)
 
2230
        self.assertIsInstance(home, unicode)
2171
2231
 
2172
2232
    def test_posix_homeless(self):
2173
2233
        self.overrideEnv('HOME', None)
2174
2234
        home = osutils._get_home_dir()
2175
 
        self.assertIsInstance(home, str)
 
2235
        self.assertIsInstance(home, unicode)
2176
2236
 
2177
2237
    def test_posix_home_ascii(self):
2178
2238
        self.overrideEnv('HOME', '/home/test')
2179
2239
        home = osutils._posix_get_home_dir()
2180
 
        self.assertIsInstance(home, str)
 
2240
        self.assertIsInstance(home, unicode)
2181
2241
        self.assertEqual(u'/home/test', home)
2182
2242
 
2183
2243
    def test_posix_home_unicode(self):
2186
2246
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
2187
2247
        self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
2188
2248
        osutils._fs_enc = "iso8859-5"
2189
 
        # In python 3, os.environ returns unicode
2190
 
        self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
 
2249
        self.assertEqual(u'/home/\u0407test', osutils._posix_get_home_dir())
 
2250
        osutils._fs_enc = "utf-8"
 
2251
        self.assertRaises(errors.BadFilenameEncoding,
 
2252
            osutils._posix_get_home_dir)
2191
2253
 
2192
2254
 
2193
2255
class TestGetuserUnicode(tests.TestCase):
2194
2256
 
2195
2257
    def test_is_unicode(self):
2196
2258
        user = osutils.getuser_unicode()
2197
 
        self.assertIsInstance(user, str)
 
2259
        self.assertIsInstance(user, unicode)
2198
2260
 
2199
2261
    def envvar_to_override(self):
2200
2262
        if sys.platform == "win32":
2201
2263
            # Disable use of platform calls on windows so envvar is used
2202
2264
            self.overrideAttr(win32utils, 'has_ctypes', False)
2203
 
            return 'USERNAME'  # only variable used on windows
2204
 
        return 'LOGNAME'  # first variable checked by getpass.getuser()
 
2265
            return 'USERNAME' # only variable used on windows
 
2266
        return 'LOGNAME' # first variable checked by getpass.getuser()
2205
2267
 
2206
2268
    def test_ascii_user(self):
2207
2269
        self.overrideEnv(self.envvar_to_override(), 'jrandom')
2216
2278
                % (osutils.get_user_encoding(),))
2217
2279
        uni_username = u'jrandom' + uni_val
2218
2280
        encoded_username = uni_username.encode(ue)
2219
 
        self.overrideEnv(self.envvar_to_override(), uni_username)
 
2281
        self.overrideEnv(self.envvar_to_override(), encoded_username)
2220
2282
        self.assertEqual(uni_username, osutils.getuser_unicode())
2221
2283
 
2222
2284
 
2255
2317
    def test_windows(self):
2256
2318
        if sys.platform != 'win32':
2257
2319
            raise tests.TestSkipped('test requires win32')
2258
 
        self.assertTrue(osutils.find_executable_on_path(
2259
 
            'explorer') is not None)
 
2320
        self.assertTrue(osutils.find_executable_on_path('explorer') is not None)
2260
2321
        self.assertTrue(
2261
2322
            osutils.find_executable_on_path('explorer.exe') is not None)
2262
2323
        self.assertTrue(
2264
2325
        self.assertTrue(
2265
2326
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
2266
2327
        self.assertTrue(osutils.find_executable_on_path('file.txt') is None)
2267
 
 
 
2328
        
2268
2329
    def test_windows_app_path(self):
2269
2330
        if sys.platform != 'win32':
2270
2331
            raise tests.TestSkipped('test requires win32')
2271
2332
        # Override PATH env var so that exe can only be found on App Path
2272
2333
        self.overrideEnv('PATH', '')
2273
2334
        # Internet Explorer is always registered in the App Path
2274
 
        self.assertTrue(osutils.find_executable_on_path(
2275
 
            'iexplore') is not None)
 
2335
        self.assertTrue(osutils.find_executable_on_path('iexplore') is not None)
2276
2336
 
2277
2337
    def test_other(self):
2278
2338
        if sys.platform == 'win32':
2306
2366
        import pywintypes
2307
2367
        self.assertTrue(osutils.is_environment_error(
2308
2368
            pywintypes.error(errno.EINVAL, "Invalid parameter", "Caller")))
2309
 
 
2310
 
 
2311
 
class SupportsExecutableTests(tests.TestCaseInTempDir):
2312
 
 
2313
 
    def test_returns_bool(self):
2314
 
        self.assertIsInstance(osutils.supports_executable(self.test_dir), bool)
2315
 
 
2316
 
 
2317
 
class SupportsSymlinksTests(tests.TestCaseInTempDir):
2318
 
 
2319
 
    def test_returns_bool(self):
2320
 
        self.assertIsInstance(osutils.supports_symlinks(self.test_dir), bool)
2321
 
 
2322
 
 
2323
 
class MtabReader(tests.TestCaseInTempDir):
2324
 
 
2325
 
    def test_read_mtab(self):
2326
 
        self.build_tree_contents([('mtab', """\
2327
 
/dev/mapper/blah--vg-root / ext4 rw,relatime,errors=remount-ro 0 0
2328
 
/dev/mapper/blah--vg-home /home vfat rw,relatime 0 0
2329
 
# comment
2330
 
 
2331
 
iminvalid
2332
 
""")])
2333
 
        self.assertEqual(
2334
 
            list(osutils.read_mtab('mtab')),
2335
 
            [(b'/', 'ext4'),
2336
 
             (b'/home', 'vfat')])
2337
 
 
2338
 
 
2339
 
class GetFsTypeTests(tests.TestCaseInTempDir):
2340
 
 
2341
 
    def test_returns_string_or_none(self):
2342
 
        ret = osutils.get_fs_type(self.test_dir)
2343
 
        self.assertTrue(isinstance(ret, str) or ret is None)
2344
 
 
2345
 
    def test_returns_most_specific(self):
2346
 
        self.overrideAttr(
2347
 
            osutils, '_FILESYSTEM_FINDER',
2348
 
            osutils.FilesystemFinder(
2349
 
                [(b'/', 'ext4'), (b'/home', 'vfat'),
2350
 
                 (b'/home/jelmer', 'ext2')]))
2351
 
        self.assertEqual(osutils.get_fs_type(b'/home/jelmer/blah'), 'ext2')
2352
 
        self.assertEqual(osutils.get_fs_type('/home/jelmer/blah'), 'ext2')
2353
 
        self.assertEqual(osutils.get_fs_type(b'/home/jelmer'), 'ext2')
2354
 
        self.assertEqual(osutils.get_fs_type(b'/home/martin'), 'vfat')
2355
 
        self.assertEqual(osutils.get_fs_type(b'/home'), 'vfat')
2356
 
        self.assertEqual(osutils.get_fs_type(b'/other'), 'ext4')
2357
 
 
2358
 
    def test_returns_none(self):
2359
 
        self.overrideAttr(
2360
 
            osutils, '_FILESYSTEM_FINDER',
2361
 
            osutils.FilesystemFinder([]))
2362
 
        self.assertIs(osutils.get_fs_type('/home/jelmer/blah'), None)
2363
 
        self.assertIs(osutils.get_fs_type(b'/home/jelmer/blah'), None)
2364
 
        self.assertIs(osutils.get_fs_type('/home/jelmer'), None)