/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_osutils.py

  • Committer: Jelmer Vernooij
  • Date: 2017-06-10 01:35:53 UTC
  • mto: (6670.4.8 move-bzr)
  • mto: This revision was merged to the branch mainline in revision 6681.
  • Revision ID: jelmer@jelmer.uk-20170610013553-560y7mn3su4pp763
Fix remaining tests.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the osutils wrapper."""
18
18
 
19
 
from cStringIO import StringIO
 
19
from __future__ import absolute_import, division
 
20
 
20
21
import errno
21
22
import os
22
23
import re
 
24
import select
23
25
import socket
24
 
import stat
25
26
import sys
 
27
import tempfile
26
28
import time
27
29
 
28
 
from bzrlib import (
 
30
from .. import (
29
31
    errors,
 
32
    lazy_regex,
30
33
    osutils,
 
34
    symbol_versioning,
31
35
    tests,
32
36
    trace,
33
37
    win32utils,
34
38
    )
35
 
from bzrlib.tests import (
 
39
from ..sixish import (
 
40
    BytesIO,
 
41
    )
 
42
from . import (
36
43
    features,
37
44
    file_utils,
38
45
    test__walkdirs_win32,
39
46
    )
40
 
 
41
 
 
42
 
class _UTF8DirReaderFeature(tests.Feature):
 
47
from .scenarios import load_tests_apply_scenarios
 
48
 
 
49
 
 
50
class _UTF8DirReaderFeature(features.Feature):
43
51
 
44
52
    def _probe(self):
45
53
        try:
46
 
            from bzrlib import _readdir_pyx
 
54
            from .. import _readdir_pyx
47
55
            self.reader = _readdir_pyx.UTF8DirReader
48
56
            return True
49
57
        except ImportError:
50
58
            return False
51
59
 
52
60
    def feature_name(self):
53
 
        return 'bzrlib._readdir_pyx'
54
 
 
55
 
UTF8DirReaderFeature = _UTF8DirReaderFeature()
56
 
 
57
 
term_ios_feature = tests.ModuleAvailableFeature('termios')
 
61
        return 'breezy._readdir_pyx'
 
62
 
 
63
UTF8DirReaderFeature = features.ModuleAvailableFeature('breezy._readdir_pyx')
 
64
 
 
65
term_ios_feature = features.ModuleAvailableFeature('termios')
58
66
 
59
67
 
60
68
def _already_unicode(s):
78
86
    # Some DirReaders are platform specific and even there they may not be
79
87
    # available.
80
88
    if UTF8DirReaderFeature.available():
81
 
        from bzrlib import _readdir_pyx
 
89
        from .. import _readdir_pyx
82
90
        scenarios.append(('utf8',
83
91
                          dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
84
92
                               _native_to_unicode=_utf8_to_unicode)))
85
93
 
86
94
    if test__walkdirs_win32.win32_readdir_feature.available():
87
95
        try:
88
 
            from bzrlib import _walkdirs_win32
 
96
            from .. import _walkdirs_win32
89
97
            scenarios.append(
90
98
                ('win32',
91
99
                 dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
95
103
    return scenarios
96
104
 
97
105
 
98
 
def load_tests(basic_tests, module, loader):
99
 
    suite = loader.suiteClass()
100
 
    dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
101
 
        basic_tests, tests.condition_isinstance(TestDirReader))
102
 
    tests.multiply_tests(dir_reader_tests, dir_reader_scenarios(), suite)
103
 
    suite.addTest(remaining_tests)
104
 
    return suite
 
106
load_tests = load_tests_apply_scenarios
105
107
 
106
108
 
107
109
class TestContainsWhitespace(tests.TestCase):
108
110
 
109
111
    def test_contains_whitespace(self):
110
 
        self.failUnless(osutils.contains_whitespace(u' '))
111
 
        self.failUnless(osutils.contains_whitespace(u'hello there'))
112
 
        self.failUnless(osutils.contains_whitespace(u'hellothere\n'))
113
 
        self.failUnless(osutils.contains_whitespace(u'hello\nthere'))
114
 
        self.failUnless(osutils.contains_whitespace(u'hello\rthere'))
115
 
        self.failUnless(osutils.contains_whitespace(u'hello\tthere'))
 
112
        self.assertTrue(osutils.contains_whitespace(u' '))
 
113
        self.assertTrue(osutils.contains_whitespace(u'hello there'))
 
114
        self.assertTrue(osutils.contains_whitespace(u'hellothere\n'))
 
115
        self.assertTrue(osutils.contains_whitespace(u'hello\nthere'))
 
116
        self.assertTrue(osutils.contains_whitespace(u'hello\rthere'))
 
117
        self.assertTrue(osutils.contains_whitespace(u'hello\tthere'))
116
118
 
117
119
        # \xa0 is "Non-breaking-space", which some Python locales consider to
118
120
        # be whitespace, but we do not.
119
 
        self.failIf(osutils.contains_whitespace(u''))
120
 
        self.failIf(osutils.contains_whitespace(u'hellothere'))
121
 
        self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
 
121
        self.assertFalse(osutils.contains_whitespace(u''))
 
122
        self.assertFalse(osutils.contains_whitespace(u'hellothere'))
 
123
        self.assertFalse(osutils.contains_whitespace(u'hello\xa0there'))
122
124
 
123
125
 
124
126
class TestRename(tests.TestCaseInTempDir):
138
140
        # This should work everywhere
139
141
        self.create_file('a', 'something in a\n')
140
142
        self._fancy_rename('a', 'b')
141
 
        self.failIfExists('a')
142
 
        self.failUnlessExists('b')
 
143
        self.assertPathDoesNotExist('a')
 
144
        self.assertPathExists('b')
143
145
        self.check_file_contents('b', 'something in a\n')
144
146
 
145
147
        self.create_file('a', 'new something in a\n')
152
154
        self.create_file('target', 'data in target\n')
153
155
        self.assertRaises((IOError, OSError), self._fancy_rename,
154
156
                          'missingsource', 'target')
155
 
        self.failUnlessExists('target')
 
157
        self.assertPathExists('target')
156
158
        self.check_file_contents('target', 'data in target\n')
157
159
 
158
160
    def test_fancy_rename_fails_if_source_and_target_missing(self):
163
165
        # Rename should be semi-atomic on all platforms
164
166
        self.create_file('a', 'something in a\n')
165
167
        osutils.rename('a', 'b')
166
 
        self.failIfExists('a')
167
 
        self.failUnlessExists('b')
 
168
        self.assertPathDoesNotExist('a')
 
169
        self.assertPathExists('b')
168
170
        self.check_file_contents('b', 'something in a\n')
169
171
 
170
172
        self.create_file('a', 'new something in a\n')
182
184
        # we can't use failUnlessExists on case-insensitive filesystem
183
185
        # so try to check shape of the tree
184
186
        shape = sorted(os.listdir('.'))
185
 
        self.assertEquals(['A', 'B'], shape)
 
187
        self.assertEqual(['A', 'B'], shape)
186
188
 
187
 
    def test_rename_error(self):
188
 
        # We wrap os.rename to make it give an error including the filenames
189
 
        # https://bugs.launchpad.net/bzr/+bug/491763
190
 
        err = self.assertRaises(OSError, osutils.rename,
191
 
            'nonexistent', 'target')
192
 
        self.assertContainsString(str(err), 'nonexistent')
 
189
    def test_rename_exception(self):
 
190
        try:
 
191
            osutils.rename('nonexistent_path', 'different_nonexistent_path')
 
192
        except OSError as e:
 
193
            self.assertEqual(e.old_filename, 'nonexistent_path')
 
194
            self.assertEqual(e.new_filename, 'different_nonexistent_path')
 
195
            self.assertTrue('nonexistent_path' in e.strerror)
 
196
            self.assertTrue('different_nonexistent_path' in e.strerror)
193
197
 
194
198
 
195
199
class TestRandChars(tests.TestCase):
222
226
                         (['src'], SRC_FOO_C),
223
227
                         (['src'], 'src'),
224
228
                         ]:
225
 
            self.assert_(osutils.is_inside_any(dirs, fn))
 
229
            self.assertTrue(osutils.is_inside_any(dirs, fn))
226
230
        for dirs, fn in [(['src'], 'srccontrol'),
227
231
                         (['src'], 'srccontrol/foo')]:
228
232
            self.assertFalse(osutils.is_inside_any(dirs, fn))
234
238
                         (['src/bar.c', 'bla/foo.c'], 'src'),
235
239
                         (['src'], 'src'),
236
240
                         ]:
237
 
            self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
 
241
            self.assertTrue(osutils.is_inside_or_parent_of_any(dirs, fn))
238
242
 
239
243
        for dirs, fn in [(['src'], 'srccontrol'),
240
244
                         (['srccontrol/foo.c'], 'src'),
242
246
            self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
243
247
 
244
248
 
 
249
class TestLstat(tests.TestCaseInTempDir):
 
250
 
 
251
    def test_lstat_matches_fstat(self):
 
252
        # On Windows, lstat and fstat don't always agree, primarily in the
 
253
        # 'st_ino' and 'st_dev' fields. So we force them to be '0' in our
 
254
        # custom implementation.
 
255
        if sys.platform == 'win32':
 
256
            # We only have special lstat/fstat if we have the extension.
 
257
            # Without it, we may end up re-reading content when we don't have
 
258
            # to, but otherwise it doesn't affect correctness.
 
259
            self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
260
        f = open('test-file.txt', 'wb')
 
261
        self.addCleanup(f.close)
 
262
        f.write('some content\n')
 
263
        f.flush()
 
264
        self.assertEqualStat(osutils.fstat(f.fileno()),
 
265
                             osutils.lstat('test-file.txt'))
 
266
 
 
267
 
245
268
class TestRmTree(tests.TestCaseInTempDir):
246
269
 
247
270
    def test_rmtree(self):
259
282
 
260
283
        osutils.rmtree('dir')
261
284
 
262
 
        self.failIfExists('dir/file')
263
 
        self.failIfExists('dir')
 
285
        self.assertPathDoesNotExist('dir/file')
 
286
        self.assertPathDoesNotExist('dir')
264
287
 
265
288
 
266
289
class TestDeleteAny(tests.TestCaseInTempDir):
279
302
 
280
303
    def test_file_kind(self):
281
304
        self.build_tree(['file', 'dir/'])
282
 
        self.assertEquals('file', osutils.file_kind('file'))
283
 
        self.assertEquals('directory', osutils.file_kind('dir/'))
 
305
        self.assertEqual('file', osutils.file_kind('file'))
 
306
        self.assertEqual('directory', osutils.file_kind('dir/'))
284
307
        if osutils.has_symlinks():
285
308
            os.symlink('symlink', 'symlink')
286
 
            self.assertEquals('symlink', osutils.file_kind('symlink'))
 
309
            self.assertEqual('symlink', osutils.file_kind('symlink'))
287
310
 
288
311
        # TODO: jam 20060529 Test a block device
289
312
        try:
290
313
            os.lstat('/dev/null')
291
 
        except OSError, e:
 
314
        except OSError as e:
292
315
            if e.errno not in (errno.ENOENT,):
293
316
                raise
294
317
        else:
295
 
            self.assertEquals('chardev', osutils.file_kind('/dev/null'))
 
318
            self.assertEqual('chardev', osutils.file_kind('/dev/null'))
296
319
 
297
320
        mkfifo = getattr(os, 'mkfifo', None)
298
321
        if mkfifo:
299
322
            mkfifo('fifo')
300
323
            try:
301
 
                self.assertEquals('fifo', osutils.file_kind('fifo'))
 
324
                self.assertEqual('fifo', osutils.file_kind('fifo'))
302
325
            finally:
303
326
                os.remove('fifo')
304
327
 
307
330
            s = socket.socket(AF_UNIX)
308
331
            s.bind('socket')
309
332
            try:
310
 
                self.assertEquals('socket', osutils.file_kind('socket'))
 
333
                self.assertEqual('socket', osutils.file_kind('socket'))
311
334
            finally:
312
335
                os.remove('socket')
313
336
 
332
355
 
333
356
        orig_umask = osutils.get_umask()
334
357
        self.addCleanup(os.umask, orig_umask)
335
 
        os.umask(0222)
336
 
        self.assertEqual(0222, osutils.get_umask())
337
 
        os.umask(0022)
338
 
        self.assertEqual(0022, osutils.get_umask())
339
 
        os.umask(0002)
340
 
        self.assertEqual(0002, osutils.get_umask())
341
 
        os.umask(0027)
342
 
        self.assertEqual(0027, osutils.get_umask())
 
358
        os.umask(0o222)
 
359
        self.assertEqual(0o222, osutils.get_umask())
 
360
        os.umask(0o022)
 
361
        self.assertEqual(0o022, osutils.get_umask())
 
362
        os.umask(0o002)
 
363
        self.assertEqual(0o002, osutils.get_umask())
 
364
        os.umask(0o027)
 
365
        self.assertEqual(0o027, osutils.get_umask())
343
366
 
344
367
 
345
368
class TestDateTime(tests.TestCase):
418
441
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
419
442
 
420
443
 
 
444
class TestFdatasync(tests.TestCaseInTempDir):
 
445
 
 
446
    def do_fdatasync(self):
 
447
        f = tempfile.NamedTemporaryFile()
 
448
        osutils.fdatasync(f.fileno())
 
449
        f.close()
 
450
 
 
451
    @staticmethod
 
452
    def raise_eopnotsupp(*args, **kwargs):
 
453
        raise IOError(errno.EOPNOTSUPP, os.strerror(errno.EOPNOTSUPP))
 
454
 
 
455
    @staticmethod
 
456
    def raise_enotsup(*args, **kwargs):
 
457
        raise IOError(errno.ENOTSUP, os.strerror(errno.ENOTSUP))
 
458
 
 
459
    def test_fdatasync_handles_system_function(self):
 
460
        self.overrideAttr(os, "fdatasync")
 
461
        self.do_fdatasync()
 
462
 
 
463
    def test_fdatasync_handles_no_fdatasync_no_fsync(self):
 
464
        self.overrideAttr(os, "fdatasync")
 
465
        self.overrideAttr(os, "fsync")
 
466
        self.do_fdatasync()
 
467
 
 
468
    def test_fdatasync_handles_no_EOPNOTSUPP(self):
 
469
        self.overrideAttr(errno, "EOPNOTSUPP")
 
470
        self.do_fdatasync()
 
471
 
 
472
    def test_fdatasync_catches_ENOTSUP(self):
 
473
        enotsup = getattr(errno, "ENOTSUP", None)
 
474
        if enotsup is None:
 
475
            raise tests.TestNotApplicable("No ENOTSUP on this platform")
 
476
        self.overrideAttr(os, "fdatasync", self.raise_enotsup)
 
477
        self.do_fdatasync()
 
478
 
 
479
    def test_fdatasync_catches_EOPNOTSUPP(self):
 
480
        enotsup = getattr(errno, "EOPNOTSUPP", None)
 
481
        if enotsup is None:
 
482
            raise tests.TestNotApplicable("No EOPNOTSUPP on this platform")
 
483
        self.overrideAttr(os, "fdatasync", self.raise_eopnotsupp)
 
484
        self.do_fdatasync()
 
485
 
 
486
 
421
487
class TestLinks(tests.TestCaseInTempDir):
422
488
 
423
489
    def test_dereference_path(self):
424
 
        self.requireFeature(tests.SymlinkFeature)
 
490
        self.requireFeature(features.SymlinkFeature)
425
491
        cwd = osutils.realpath('.')
426
492
        os.mkdir('bar')
427
493
        bar_path = osutils.pathjoin(cwd, 'bar')
455
521
        # Make a file readonly
456
522
        osutils.make_readonly('file')
457
523
        mode = os.lstat('file').st_mode
458
 
        self.assertEqual(mode, mode & 0777555)
 
524
        self.assertEqual(mode, mode & 0o777555)
459
525
 
460
526
        # Make a file writable
461
527
        osutils.make_writable('file')
462
528
        mode = os.lstat('file').st_mode
463
 
        self.assertEqual(mode, mode | 0200)
 
529
        self.assertEqual(mode, mode | 0o200)
464
530
 
465
531
        if osutils.has_symlinks():
466
532
            # should not error when handed a symlink
474
540
 
475
541
class TestCanonicalRelPath(tests.TestCaseInTempDir):
476
542
 
477
 
    _test_needs_features = [tests.CaseInsCasePresFilenameFeature]
 
543
    _test_needs_features = [features.CaseInsCasePresFilenameFeature]
478
544
 
479
545
    def test_canonical_relpath_simple(self):
480
546
        f = file('MixedCaseName', 'w')
481
547
        f.close()
482
548
        actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
483
 
        self.failUnlessEqual('work/MixedCaseName', actual)
 
549
        self.assertEqual('work/MixedCaseName', actual)
484
550
 
485
551
    def test_canonical_relpath_missing_tail(self):
486
552
        os.mkdir('MixedCaseParent')
487
553
        actual = osutils.canonical_relpath(self.test_base_dir,
488
554
                                           'mixedcaseparent/nochild')
489
 
        self.failUnlessEqual('work/MixedCaseParent/nochild', actual)
 
555
        self.assertEqual('work/MixedCaseParent/nochild', actual)
490
556
 
491
557
 
492
558
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
536
602
    """Test pumpfile method."""
537
603
 
538
604
    def setUp(self):
539
 
        tests.TestCase.setUp(self)
 
605
        super(TestPumpFile, self).setUp()
540
606
        # create a test datablock
541
607
        self.block_size = 512
542
 
        pattern = '0123456789ABCDEF'
543
 
        self.test_data = pattern * (3 * self.block_size / len(pattern))
 
608
        pattern = b'0123456789ABCDEF'
 
609
        self.test_data = pattern * (3 * self.block_size // len(pattern))
544
610
        self.test_data_len = len(self.test_data)
545
611
 
546
612
    def test_bracket_block_size(self):
550
616
        self.assertTrue(self.test_data_len > self.block_size)
551
617
 
552
618
        from_file = file_utils.FakeReadFile(self.test_data)
553
 
        to_file = StringIO()
 
619
        to_file = BytesIO()
554
620
 
555
 
        # read (max / 2) bytes and verify read size wasn't affected
556
 
        num_bytes_to_read = self.block_size / 2
 
621
        # read (max // 2) bytes and verify read size wasn't affected
 
622
        num_bytes_to_read = self.block_size // 2
557
623
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
558
624
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
559
625
        self.assertEqual(from_file.get_read_count(), 1)
591
657
 
592
658
        # retrieve data in blocks
593
659
        from_file = file_utils.FakeReadFile(self.test_data)
594
 
        to_file = StringIO()
 
660
        to_file = BytesIO()
595
661
        osutils.pumpfile(from_file, to_file, self.test_data_len,
596
662
                         self.block_size)
597
663
 
615
681
 
616
682
        # retrieve data to EOF
617
683
        from_file = file_utils.FakeReadFile(self.test_data)
618
 
        to_file = StringIO()
 
684
        to_file = BytesIO()
619
685
        osutils.pumpfile(from_file, to_file, -1, self.block_size)
620
686
 
621
687
        # verify read size was equal to the maximum read size
635
701
        with this new version."""
636
702
        # retrieve data using default (old) pumpfile method
637
703
        from_file = file_utils.FakeReadFile(self.test_data)
638
 
        to_file = StringIO()
 
704
        to_file = BytesIO()
639
705
        osutils.pumpfile(from_file, to_file)
640
706
 
641
707
        # report error if the data wasn't equal (we only report the size due
649
715
        activity = []
650
716
        def log_activity(length, direction):
651
717
            activity.append((length, direction))
652
 
        from_file = StringIO(self.test_data)
653
 
        to_file = StringIO()
 
718
        from_file = BytesIO(self.test_data)
 
719
        to_file = BytesIO()
654
720
        osutils.pumpfile(from_file, to_file, buff_size=500,
655
721
                         report_activity=log_activity, direction='read')
656
722
        self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
657
723
                          (36, 'read')], activity)
658
724
 
659
 
        from_file = StringIO(self.test_data)
660
 
        to_file = StringIO()
 
725
        from_file = BytesIO(self.test_data)
 
726
        to_file = BytesIO()
661
727
        del activity[:]
662
728
        osutils.pumpfile(from_file, to_file, buff_size=500,
663
729
                         report_activity=log_activity, direction='write')
665
731
                          (36, 'write')], activity)
666
732
 
667
733
        # And with a limited amount of data
668
 
        from_file = StringIO(self.test_data)
669
 
        to_file = StringIO()
 
734
        from_file = BytesIO(self.test_data)
 
735
        to_file = BytesIO()
670
736
        del activity[:]
671
737
        osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
672
738
                         report_activity=log_activity, direction='read')
677
743
class TestPumpStringFile(tests.TestCase):
678
744
 
679
745
    def test_empty(self):
680
 
        output = StringIO()
681
 
        osutils.pump_string_file("", output)
682
 
        self.assertEqual("", output.getvalue())
 
746
        output = BytesIO()
 
747
        osutils.pump_string_file(b"", output)
 
748
        self.assertEqual(b"", output.getvalue())
683
749
 
684
750
    def test_more_than_segment_size(self):
685
 
        output = StringIO()
686
 
        osutils.pump_string_file("123456789", output, 2)
687
 
        self.assertEqual("123456789", output.getvalue())
 
751
        output = BytesIO()
 
752
        osutils.pump_string_file(b"123456789", output, 2)
 
753
        self.assertEqual(b"123456789", output.getvalue())
688
754
 
689
755
    def test_segment_size(self):
690
 
        output = StringIO()
691
 
        osutils.pump_string_file("12", output, 2)
692
 
        self.assertEqual("12", output.getvalue())
 
756
        output = BytesIO()
 
757
        osutils.pump_string_file(b"12", output, 2)
 
758
        self.assertEqual(b"12", output.getvalue())
693
759
 
694
760
    def test_segment_size_multiple(self):
695
 
        output = StringIO()
696
 
        osutils.pump_string_file("1234", output, 2)
697
 
        self.assertEqual("1234", output.getvalue())
 
761
        output = BytesIO()
 
762
        osutils.pump_string_file(b"1234", output, 2)
 
763
        self.assertEqual(b"1234", output.getvalue())
698
764
 
699
765
 
700
766
class TestRelpath(tests.TestCase):
763
829
        self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
764
830
 
765
831
    def test_from_unicode_string_ascii_contents(self):
766
 
        self.assertEqual('bargam',
767
 
                         osutils.safe_revision_id(u'bargam', warn=False))
768
 
 
769
 
    def test_from_unicode_deprecated(self):
770
 
        self.assertEqual('bargam',
771
 
            self.callDeprecated([osutils._revision_id_warning],
772
 
                                osutils.safe_revision_id, u'bargam'))
 
832
        self.assertRaises(TypeError,
 
833
                          osutils.safe_revision_id, u'bargam')
773
834
 
774
835
    def test_from_unicode_string_unicode_contents(self):
775
 
        self.assertEqual('bargam\xc2\xae',
776
 
                         osutils.safe_revision_id(u'bargam\xae', warn=False))
 
836
        self.assertRaises(TypeError,
 
837
                         osutils.safe_revision_id, u'bargam\xae')
777
838
 
778
839
    def test_from_utf8_string(self):
779
840
        self.assertEqual('foo\xc2\xae',
790
851
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))
791
852
 
792
853
    def test_from_unicode_string_ascii_contents(self):
793
 
        self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))
794
 
 
795
 
    def test_from_unicode_deprecated(self):
796
 
        self.assertEqual('bargam',
797
 
            self.callDeprecated([osutils._file_id_warning],
798
 
                                osutils.safe_file_id, u'bargam'))
 
854
        self.assertRaises(TypeError, osutils.safe_file_id, u'bargam')
799
855
 
800
856
    def test_from_unicode_string_unicode_contents(self):
801
 
        self.assertEqual('bargam\xc2\xae',
802
 
                         osutils.safe_file_id(u'bargam\xae', warn=False))
 
857
        self.assertRaises(TypeError,
 
858
                          osutils.safe_file_id, u'bargam\xae')
803
859
 
804
860
    def test_from_utf8_string(self):
805
861
        self.assertEqual('foo\xc2\xae',
810
866
        self.assertEqual(None, osutils.safe_file_id(None))
811
867
 
812
868
 
 
869
class TestSendAll(tests.TestCase):
 
870
 
 
871
    def test_send_with_disconnected_socket(self):
 
872
        class DisconnectedSocket(object):
 
873
            def __init__(self, err):
 
874
                self.err = err
 
875
            def send(self, content):
 
876
                raise self.err
 
877
            def close(self):
 
878
                pass
 
879
        # All of these should be treated as ConnectionReset
 
880
        errs = []
 
881
        for err_cls in (IOError, socket.error):
 
882
            for errnum in osutils._end_of_stream_errors:
 
883
                errs.append(err_cls(errnum))
 
884
        for err in errs:
 
885
            sock = DisconnectedSocket(err)
 
886
            self.assertRaises(errors.ConnectionReset,
 
887
                osutils.send_all, sock, b'some more content')
 
888
 
 
889
    def test_send_with_no_progress(self):
 
890
        # See https://bugs.launchpad.net/bzr/+bug/1047309
 
891
        # It seems that paramiko can get into a state where it doesn't error,
 
892
        # but it returns 0 bytes sent for requests over and over again.
 
893
        class NoSendingSocket(object):
 
894
            def __init__(self):
 
895
                self.call_count = 0
 
896
            def send(self, bytes):
 
897
                self.call_count += 1
 
898
                if self.call_count > 100:
 
899
                    # Prevent the test suite from hanging
 
900
                    raise RuntimeError('too many calls')
 
901
                return 0
 
902
        sock = NoSendingSocket()
 
903
        self.assertRaises(errors.ConnectionReset,
 
904
                          osutils.send_all, sock, b'content')
 
905
        self.assertEqual(1, sock.call_count)
 
906
 
 
907
 
 
908
class TestPosixFuncs(tests.TestCase):
 
909
    """Test that the posix version of normpath returns an appropriate path
 
910
       when used with 2 leading slashes."""
 
911
 
 
912
    def test_normpath(self):
 
913
        self.assertEqual('/etc/shadow', osutils._posix_normpath('/etc/shadow'))
 
914
        self.assertEqual('/etc/shadow', osutils._posix_normpath('//etc/shadow'))
 
915
        self.assertEqual('/etc/shadow', osutils._posix_normpath('///etc/shadow'))
 
916
 
 
917
 
813
918
class TestWin32Funcs(tests.TestCase):
814
919
    """Test that _win32 versions of os utilities return appropriate paths."""
815
920
 
816
921
    def test_abspath(self):
 
922
        self.requireFeature(features.win32_feature)
817
923
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
818
924
        self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
819
925
        self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
832
938
                         osutils._win32_pathjoin('path/to', 'C:/foo'))
833
939
        self.assertEqual('path/to/foo',
834
940
                         osutils._win32_pathjoin('path/to/', 'foo'))
835
 
        self.assertEqual('/foo',
 
941
 
 
942
    def test_pathjoin_late_bugfix(self):
 
943
        if sys.version_info < (2, 7, 6):
 
944
            expected = '/foo'
 
945
        else:
 
946
            expected = 'C:/foo'
 
947
        self.assertEqual(expected,
836
948
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
837
 
        self.assertEqual('/foo',
 
949
        self.assertEqual(expected,
838
950
                         osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
839
951
 
840
952
    def test_normpath(self):
845
957
 
846
958
    def test_getcwd(self):
847
959
        cwd = osutils._win32_getcwd()
848
 
        os_cwd = os.getcwdu()
 
960
        os_cwd = osutils._getcwd()
849
961
        self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
850
962
        # win32 is inconsistent whether it returns lower or upper case
851
963
        # and even if it was consistent the user might type the other
859
971
        self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
860
972
        self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
861
973
 
862
 
    def test_win98_abspath(self):
863
 
        # absolute path
864
 
        self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
865
 
        self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
866
 
        # UNC path
867
 
        self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
868
 
        self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
869
 
        # relative path
870
 
        cwd = osutils.getcwd().rstrip('/')
871
 
        drive = osutils._nt_splitdrive(cwd)[0]
872
 
        self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
873
 
        self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
874
 
        # unicode path
875
 
        u = u'\u1234'
876
 
        self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
877
 
 
878
974
 
879
975
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
880
976
    """Test win32 functions that create files."""
881
977
 
882
978
    def test_getcwd(self):
883
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
979
        self.requireFeature(features.UnicodeFilenameFeature)
884
980
        os.mkdir(u'mu-\xb5')
885
981
        os.chdir(u'mu-\xb5')
886
982
        # TODO: jam 20060427 This will probably fail on Mac OSX because
892
988
    def test_minimum_path_selection(self):
893
989
        self.assertEqual(set(),
894
990
            osutils.minimum_path_selection([]))
895
 
        self.assertEqual(set(['a']),
 
991
        self.assertEqual({'a'},
896
992
            osutils.minimum_path_selection(['a']))
897
 
        self.assertEqual(set(['a', 'b']),
 
993
        self.assertEqual({'a', 'b'},
898
994
            osutils.minimum_path_selection(['a', 'b']))
899
 
        self.assertEqual(set(['a/', 'b']),
 
995
        self.assertEqual({'a/', 'b'},
900
996
            osutils.minimum_path_selection(['a/', 'b']))
901
 
        self.assertEqual(set(['a/', 'b']),
 
997
        self.assertEqual({'a/', 'b'},
902
998
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
903
 
        self.assertEqual(set(['a-b', 'a', 'a0b']),
 
999
        self.assertEqual({'a-b', 'a', 'a0b'},
904
1000
            osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
905
1001
 
906
1002
    def test_mkdtemp(self):
916
1012
        b.close()
917
1013
 
918
1014
        osutils._win32_rename('b', 'a')
919
 
        self.failUnlessExists('a')
920
 
        self.failIfExists('b')
 
1015
        self.assertPathExists('a')
 
1016
        self.assertPathDoesNotExist('b')
921
1017
        self.assertFileEqual('baz\n', 'a')
922
1018
 
923
1019
    def test_rename_missing_file(self):
927
1023
 
928
1024
        try:
929
1025
            osutils._win32_rename('b', 'a')
930
 
        except (IOError, OSError), e:
 
1026
        except (IOError, OSError) as e:
931
1027
            self.assertEqual(errno.ENOENT, e.errno)
932
1028
        self.assertFileEqual('foo\n', 'a')
933
1029
 
935
1031
        os.mkdir('a')
936
1032
        try:
937
1033
            osutils._win32_rename('b', 'a')
938
 
        except (IOError, OSError), e:
 
1034
        except (IOError, OSError) as e:
939
1035
            self.assertEqual(errno.ENOENT, e.errno)
940
1036
 
941
1037
    def test_rename_current_dir(self):
947
1043
        # doesn't exist.
948
1044
        try:
949
1045
            osutils._win32_rename('b', '.')
950
 
        except (IOError, OSError), e:
 
1046
        except (IOError, OSError) as e:
951
1047
            self.assertEqual(errno.ENOENT, e.errno)
952
1048
 
953
1049
    def test_splitpath(self):
976
1072
    """Test mac special functions that require directories."""
977
1073
 
978
1074
    def test_getcwd(self):
979
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1075
        self.requireFeature(features.UnicodeFilenameFeature)
980
1076
        os.mkdir(u'B\xe5gfors')
981
1077
        os.chdir(u'B\xe5gfors')
982
1078
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
983
1079
 
984
1080
    def test_getcwd_nonnorm(self):
985
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1081
        self.requireFeature(features.UnicodeFilenameFeature)
986
1082
        # Test that _mac_getcwd() will normalize this path
987
1083
        os.mkdir(u'Ba\u030agfors')
988
1084
        os.chdir(u'Ba\u030agfors')
998
1094
                         osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
999
1095
 
1000
1096
    def test_osutils_binding(self):
1001
 
        from bzrlib.tests import test__chunks_to_lines
 
1097
        from . import test__chunks_to_lines
1002
1098
        if test__chunks_to_lines.compiled_chunkstolines_feature.available():
1003
 
            from bzrlib._chunks_to_lines_pyx import chunks_to_lines
 
1099
            from .._chunks_to_lines_pyx import chunks_to_lines
1004
1100
        else:
1005
 
            from bzrlib._chunks_to_lines_py import chunks_to_lines
 
1101
            from .._chunks_to_lines_py import chunks_to_lines
1006
1102
        self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
1007
1103
 
1008
1104
 
1071
1167
        self.assertExpectedBlocks(expected_dirblocks[1:], result)
1072
1168
 
1073
1169
    def test_walkdirs_os_error(self):
1074
 
        # <https://bugs.edge.launchpad.net/bzr/+bug/338653>
 
1170
        # <https://bugs.launchpad.net/bzr/+bug/338653>
1075
1171
        # Pyrex readdir didn't raise useful messages if it had an error
1076
1172
        # reading the directory
1077
1173
        if sys.platform == 'win32':
1078
1174
            raise tests.TestNotApplicable(
1079
1175
                "readdir IOError not tested on win32")
 
1176
        self.requireFeature(features.not_running_as_root)
1080
1177
        os.mkdir("test-unreadable")
1081
1178
        os.chmod("test-unreadable", 0000)
1082
1179
        # must chmod it back so that it can be removed
1083
 
        self.addCleanup(os.chmod, "test-unreadable", 0700)
 
1180
        self.addCleanup(os.chmod, "test-unreadable", 0o700)
1084
1181
        # The error is not raised until the generator is actually evaluated.
1085
1182
        # (It would be ok if it happened earlier but at the moment it
1086
1183
        # doesn't.)
1087
1184
        e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1088
 
        self.assertEquals('./test-unreadable', e.filename)
1089
 
        self.assertEquals(errno.EACCES, e.errno)
 
1185
        self.assertEqual('./test-unreadable', e.filename)
 
1186
        self.assertEqual(errno.EACCES, e.errno)
1090
1187
        # Ensure the message contains the file name
1091
1188
        self.assertContainsRe(str(e), "\./test-unreadable")
1092
1189
 
 
1190
 
 
1191
    def test_walkdirs_encoding_error(self):
 
1192
        # <https://bugs.launchpad.net/bzr/+bug/488519>
 
1193
        # walkdirs didn't raise a useful message when the filenames
 
1194
        # are not using the filesystem's encoding
 
1195
 
 
1196
        # require a bytestring based filesystem
 
1197
        self.requireFeature(features.ByteStringNamedFilesystem)
 
1198
 
 
1199
        tree = [
 
1200
            '.bzr',
 
1201
            '0file',
 
1202
            '1dir/',
 
1203
            '1dir/0file',
 
1204
            '1dir/1dir/',
 
1205
            '1file'
 
1206
            ]
 
1207
 
 
1208
        self.build_tree(tree)
 
1209
 
 
1210
        # rename the 1file to a latin-1 filename
 
1211
        os.rename("./1file", "\xe8file")
 
1212
        if "\xe8file" not in os.listdir("."):
 
1213
            self.skipTest("Lack filesystem that preserves arbitrary bytes")
 
1214
 
 
1215
        self._save_platform_info()
 
1216
        osutils._fs_enc = 'UTF-8'
 
1217
 
 
1218
        # this should raise on error
 
1219
        def attempt():
 
1220
            for dirdetail, dirblock in osutils.walkdirs('.'):
 
1221
                pass
 
1222
 
 
1223
        self.assertRaises(errors.BadFilenameEncoding, attempt)
 
1224
 
1093
1225
    def test__walkdirs_utf8(self):
1094
1226
        tree = [
1095
1227
            '.bzr',
1145
1277
            dirblock[:] = new_dirblock
1146
1278
 
1147
1279
    def _save_platform_info(self):
1148
 
        self.overrideAttr(win32utils, 'winver')
1149
1280
        self.overrideAttr(osutils, '_fs_enc')
1150
1281
        self.overrideAttr(osutils, '_selected_dir_reader')
1151
1282
 
1160
1291
    def test_force_walkdirs_utf8_fs_utf8(self):
1161
1292
        self.requireFeature(UTF8DirReaderFeature)
1162
1293
        self._save_platform_info()
1163
 
        win32utils.winver = None # Avoid the win32 detection code
1164
 
        osutils._fs_enc = 'UTF-8'
1165
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
1294
        osutils._fs_enc = 'utf-8'
 
1295
        self.assertDirReaderIs(
 
1296
            UTF8DirReaderFeature.module.UTF8DirReader)
1166
1297
 
1167
1298
    def test_force_walkdirs_utf8_fs_ascii(self):
1168
1299
        self.requireFeature(UTF8DirReaderFeature)
1169
1300
        self._save_platform_info()
1170
 
        win32utils.winver = None # Avoid the win32 detection code
1171
 
        osutils._fs_enc = 'US-ASCII'
1172
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1173
 
 
1174
 
    def test_force_walkdirs_utf8_fs_ANSI(self):
1175
 
        self.requireFeature(UTF8DirReaderFeature)
1176
 
        self._save_platform_info()
1177
 
        win32utils.winver = None # Avoid the win32 detection code
1178
 
        osutils._fs_enc = 'ANSI_X3.4-1968'
1179
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
1301
        osutils._fs_enc = 'ascii'
 
1302
        self.assertDirReaderIs(
 
1303
            UTF8DirReaderFeature.module.UTF8DirReader)
1180
1304
 
1181
1305
    def test_force_walkdirs_utf8_fs_latin1(self):
1182
1306
        self._save_platform_info()
1183
 
        win32utils.winver = None # Avoid the win32 detection code
1184
 
        osutils._fs_enc = 'latin1'
 
1307
        osutils._fs_enc = 'iso-8859-1'
1185
1308
        self.assertDirReaderIs(osutils.UnicodeDirReader)
1186
1309
 
1187
1310
    def test_force_walkdirs_utf8_nt(self):
1188
1311
        # Disabled because the thunk of the whole walkdirs api is disabled.
1189
1312
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1190
1313
        self._save_platform_info()
1191
 
        win32utils.winver = 'Windows NT'
1192
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1314
        from .._walkdirs_win32 import Win32ReadDir
1193
1315
        self.assertDirReaderIs(Win32ReadDir)
1194
1316
 
1195
 
    def test_force_walkdirs_utf8_98(self):
1196
 
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1197
 
        self._save_platform_info()
1198
 
        win32utils.winver = 'Windows 98'
1199
 
        self.assertDirReaderIs(osutils.UnicodeDirReader)
1200
 
 
1201
1317
    def test_unicode_walkdirs(self):
1202
1318
        """Walkdirs should always return unicode paths."""
1203
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1319
        self.requireFeature(features.UnicodeFilenameFeature)
1204
1320
        name0 = u'0file-\xb6'
1205
1321
        name1 = u'1dir-\u062c\u0648'
1206
1322
        name2 = u'2file-\u0633'
1243
1359
 
1244
1360
        The abspath portion might be in unicode or utf-8
1245
1361
        """
1246
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1362
        self.requireFeature(features.UnicodeFilenameFeature)
1247
1363
        name0 = u'0file-\xb6'
1248
1364
        name1 = u'1dir-\u062c\u0648'
1249
1365
        name2 = u'2file-\u0633'
1304
1420
 
1305
1421
        The abspath portion should be in unicode
1306
1422
        """
1307
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1423
        self.requireFeature(features.UnicodeFilenameFeature)
1308
1424
        # Use the unicode reader. TODO: split into driver-and-driven unit
1309
1425
        # tests.
1310
1426
        self._save_platform_info()
1351
1467
 
1352
1468
    def test__walkdirs_utf8_win32readdir(self):
1353
1469
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1354
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1355
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1470
        self.requireFeature(features.UnicodeFilenameFeature)
 
1471
        from .._walkdirs_win32 import Win32ReadDir
1356
1472
        self._save_platform_info()
1357
1473
        osutils._selected_dir_reader = Win32ReadDir()
1358
1474
        name0u = u'0file-\xb6'
1408
1524
    def test__walkdirs_utf_win32_find_file_stat_file(self):
1409
1525
        """make sure our Stat values are valid"""
1410
1526
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1411
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1412
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1527
        self.requireFeature(features.UnicodeFilenameFeature)
 
1528
        from .._walkdirs_win32 import Win32ReadDir
1413
1529
        name0u = u'0file-\xb6'
1414
1530
        name0 = name0u.encode('utf8')
1415
1531
        self.build_tree([name0u])
1432
1548
    def test__walkdirs_utf_win32_find_file_stat_directory(self):
1433
1549
        """make sure our Stat values are valid"""
1434
1550
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1435
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1436
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1551
        self.requireFeature(features.UnicodeFilenameFeature)
 
1552
        from .._walkdirs_win32 import Win32ReadDir
1437
1553
        name0u = u'0dir-\u062c\u0648'
1438
1554
        name0 = name0u.encode('utf8')
1439
1555
        self.build_tree([name0u + '/'])
1538
1654
        self.assertEqual(['c'], os.listdir('target/b'))
1539
1655
 
1540
1656
    def test_copy_tree_symlinks(self):
1541
 
        self.requireFeature(tests.SymlinkFeature)
 
1657
        self.requireFeature(features.SymlinkFeature)
1542
1658
        self.build_tree(['source/'])
1543
1659
        os.symlink('a/generic/path', 'source/lnk')
1544
1660
        osutils.copy_tree('source', 'target')
1569
1685
                          ('d', 'source/b', 'target/b'),
1570
1686
                          ('f', 'source/b/c', 'target/b/c'),
1571
1687
                         ], processed_files)
1572
 
        self.failIfExists('target')
 
1688
        self.assertPathDoesNotExist('target')
1573
1689
        if osutils.has_symlinks():
1574
1690
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1575
1691
 
1580
1696
    def setUp(self):
1581
1697
        super(TestSetUnsetEnv, self).setUp()
1582
1698
 
1583
 
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
 
1699
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'),
1584
1700
                         'Environment was not cleaned up properly.'
1585
 
                         ' Variable BZR_TEST_ENV_VAR should not exist.')
 
1701
                         ' Variable BRZ_TEST_ENV_VAR should not exist.')
1586
1702
        def cleanup():
1587
 
            if 'BZR_TEST_ENV_VAR' in os.environ:
1588
 
                del os.environ['BZR_TEST_ENV_VAR']
 
1703
            if 'BRZ_TEST_ENV_VAR' in os.environ:
 
1704
                del os.environ['BRZ_TEST_ENV_VAR']
1589
1705
        self.addCleanup(cleanup)
1590
1706
 
1591
1707
    def test_set(self):
1592
1708
        """Test that we can set an env variable"""
1593
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1709
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
1594
1710
        self.assertEqual(None, old)
1595
 
        self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
 
1711
        self.assertEqual('foo', os.environ.get('BRZ_TEST_ENV_VAR'))
1596
1712
 
1597
1713
    def test_double_set(self):
1598
1714
        """Test that we get the old value out"""
1599
 
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1600
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
 
1715
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1716
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'bar')
1601
1717
        self.assertEqual('foo', old)
1602
 
        self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
 
1718
        self.assertEqual('bar', os.environ.get('BRZ_TEST_ENV_VAR'))
1603
1719
 
1604
1720
    def test_unicode(self):
1605
1721
        """Environment can only contain plain strings
1612
1728
                'Cannot find a unicode character that works in encoding %s'
1613
1729
                % (osutils.get_user_encoding(),))
1614
1730
 
1615
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1616
 
        self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
 
1731
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', uni_val)
 
1732
        self.assertEqual(env_val, os.environ.get('BRZ_TEST_ENV_VAR'))
1617
1733
 
1618
1734
    def test_unset(self):
1619
1735
        """Test that passing None will remove the env var"""
1620
 
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1621
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
 
1736
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1737
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', None)
1622
1738
        self.assertEqual('foo', old)
1623
 
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
1624
 
        self.failIf('BZR_TEST_ENV_VAR' in os.environ)
 
1739
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'))
 
1740
        self.assertFalse('BRZ_TEST_ENV_VAR' in os.environ)
1625
1741
 
1626
1742
 
1627
1743
class TestSizeShaFile(tests.TestCaseInTempDir):
1663
1779
class TestResourceLoading(tests.TestCaseInTempDir):
1664
1780
 
1665
1781
    def test_resource_string(self):
1666
 
        # test resource in bzrlib
1667
 
        text = osutils.resource_string('bzrlib', 'debug.py')
 
1782
        # test resource in breezy
 
1783
        text = osutils.resource_string('breezy', 'debug.py')
1668
1784
        self.assertContainsRe(text, "debug_flags = set()")
1669
 
        # test resource under bzrlib
1670
 
        text = osutils.resource_string('bzrlib.ui', 'text.py')
 
1785
        # test resource under breezy
 
1786
        text = osutils.resource_string('breezy.ui', 'text.py')
1671
1787
        self.assertContainsRe(text, "class TextUIFactory")
1672
1788
        # test unsupported package
1673
1789
        self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1674
1790
            'yyy.xx')
1675
1791
        # test unknown resource
1676
 
        self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
1677
 
 
1678
 
 
1679
 
class TestReCompile(tests.TestCase):
1680
 
 
1681
 
    def test_re_compile_checked(self):
1682
 
        r = osutils.re_compile_checked(r'A*', re.IGNORECASE)
1683
 
        self.assertTrue(r.match('aaaa'))
1684
 
        self.assertTrue(r.match('aAaA'))
1685
 
 
1686
 
    def test_re_compile_checked_error(self):
1687
 
        # like https://bugs.launchpad.net/bzr/+bug/251352
1688
 
        err = self.assertRaises(
1689
 
            errors.BzrCommandError,
1690
 
            osutils.re_compile_checked, '*', re.IGNORECASE, 'test case')
1691
 
        self.assertEqual(
1692
 
            "Invalid regular expression in test case: '*': "
1693
 
            "nothing to repeat",
1694
 
            str(err))
 
1792
        self.assertRaises(IOError, osutils.resource_string, 'breezy', 'yyy.xx')
1695
1793
 
1696
1794
 
1697
1795
class TestDirReader(tests.TestCaseInTempDir):
1698
1796
 
 
1797
    scenarios = dir_reader_scenarios()
 
1798
 
1699
1799
    # Set by load_tests
1700
1800
    _dir_reader_class = None
1701
1801
    _native_to_unicode = None
1702
1802
 
1703
1803
    def setUp(self):
1704
 
        tests.TestCaseInTempDir.setUp(self)
 
1804
        super(TestDirReader, self).setUp()
1705
1805
        self.overrideAttr(osutils,
1706
1806
                          '_selected_dir_reader', self._dir_reader_class())
1707
1807
 
1801
1901
        return filtered_dirblocks
1802
1902
 
1803
1903
    def test_walk_unicode_tree(self):
1804
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1904
        self.requireFeature(features.UnicodeFilenameFeature)
1805
1905
        tree, expected_dirblocks = self._get_unicode_tree()
1806
1906
        self.build_tree(tree)
1807
1907
        result = list(osutils._walkdirs_utf8('.'))
1808
1908
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1809
1909
 
1810
1910
    def test_symlink(self):
1811
 
        self.requireFeature(tests.SymlinkFeature)
1812
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1911
        self.requireFeature(features.SymlinkFeature)
 
1912
        self.requireFeature(features.UnicodeFilenameFeature)
1813
1913
        target = u'target\N{Euro Sign}'
1814
1914
        link_name = u'l\N{Euro Sign}nk'
1815
1915
        os.symlink(target, link_name)
1833
1933
    But prior python versions failed to properly encode the passed unicode
1834
1934
    string.
1835
1935
    """
1836
 
    _test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]
 
1936
    _test_needs_features = [features.SymlinkFeature, features.UnicodeFilenameFeature]
1837
1937
 
1838
1938
    def setUp(self):
1839
1939
        super(tests.TestCaseInTempDir, self).setUp()
1842
1942
        os.symlink(self.target, self.link)
1843
1943
 
1844
1944
    def test_os_readlink_link_encoding(self):
1845
 
        if sys.version_info < (2, 6):
1846
 
            self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
1847
 
        else:
1848
 
            self.assertEquals(self.target,  os.readlink(self.link))
 
1945
        self.assertEqual(self.target,  os.readlink(self.link))
1849
1946
 
1850
1947
    def test_os_readlink_link_decoding(self):
1851
 
        self.assertEquals(self.target.encode(osutils._fs_enc),
 
1948
        self.assertEqual(self.target.encode(osutils._fs_enc),
1852
1949
                          os.readlink(self.link.encode(osutils._fs_enc)))
1853
1950
 
1854
1951
 
1863
1960
        self.assertIsInstance(concurrency, int)
1864
1961
 
1865
1962
    def test_local_concurrency_environment_variable(self):
1866
 
        os.environ['BZR_CONCURRENCY'] = '2'
 
1963
        self.overrideEnv('BRZ_CONCURRENCY', '2')
1867
1964
        self.assertEqual(2, osutils.local_concurrency(use_cache=False))
1868
 
        os.environ['BZR_CONCURRENCY'] = '3'
 
1965
        self.overrideEnv('BRZ_CONCURRENCY', '3')
1869
1966
        self.assertEqual(3, osutils.local_concurrency(use_cache=False))
1870
 
        os.environ['BZR_CONCURRENCY'] = 'foo'
 
1967
        self.overrideEnv('BRZ_CONCURRENCY', 'foo')
1871
1968
        self.assertEqual(1, osutils.local_concurrency(use_cache=False))
1872
1969
 
1873
1970
    def test_option_concurrency(self):
1874
 
        os.environ['BZR_CONCURRENCY'] = '1'
 
1971
        self.overrideEnv('BRZ_CONCURRENCY', '1')
1875
1972
        self.run_bzr('rocks --concurrency 42')
1876
 
        # Command line overrides envrionment variable
1877
 
        self.assertEquals('42', os.environ['BZR_CONCURRENCY'])
1878
 
        self.assertEquals(42, osutils.local_concurrency(use_cache=False))
 
1973
        # Command line overrides environment variable
 
1974
        self.assertEqual('42', os.environ['BRZ_CONCURRENCY'])
 
1975
        self.assertEqual(42, osutils.local_concurrency(use_cache=False))
1879
1976
 
1880
1977
 
1881
1978
class TestFailedToLoadExtension(tests.TestCase):
1882
1979
 
1883
1980
    def _try_loading(self):
1884
1981
        try:
1885
 
            import bzrlib._fictional_extension_py
1886
 
        except ImportError, e:
 
1982
            import breezy._fictional_extension_py
 
1983
        except ImportError as e:
1887
1984
            osutils.failed_to_load_extension(e)
1888
1985
            return True
1889
1986
 
1894
1991
    def test_failure_to_load(self):
1895
1992
        self._try_loading()
1896
1993
        self.assertLength(1, osutils._extension_load_failures)
1897
 
        self.assertEquals(osutils._extension_load_failures[0],
 
1994
        self.assertEqual(osutils._extension_load_failures[0],
1898
1995
            "No module named _fictional_extension_py")
1899
1996
 
1900
1997
    def test_report_extension_load_failures_no_warning(self):
1904
2001
        self.assertLength(0, warnings)
1905
2002
 
1906
2003
    def test_report_extension_load_failures_message(self):
1907
 
        log = StringIO()
 
2004
        log = BytesIO()
1908
2005
        trace.push_log_file(log)
1909
2006
        self.assertTrue(self._try_loading())
1910
2007
        osutils.report_extension_load_failures()
1911
2008
        self.assertContainsRe(
1912
2009
            log.getvalue(),
1913
 
            r"bzr: warning: some compiled extensions could not be loaded; "
 
2010
            r"brz: warning: some compiled extensions could not be loaded; "
1914
2011
            "see <https://answers\.launchpad\.net/bzr/\+faq/703>\n"
1915
2012
            )
1916
2013
 
1917
2014
 
1918
2015
class TestTerminalWidth(tests.TestCase):
1919
2016
 
 
2017
    def setUp(self):
 
2018
        super(TestTerminalWidth, self).setUp()
 
2019
        self._orig_terminal_size_state = osutils._terminal_size_state
 
2020
        self._orig_first_terminal_size = osutils._first_terminal_size
 
2021
        self.addCleanup(self.restore_osutils_globals)
 
2022
        osutils._terminal_size_state = 'no_data'
 
2023
        osutils._first_terminal_size = None
 
2024
 
 
2025
    def restore_osutils_globals(self):
 
2026
        osutils._terminal_size_state = self._orig_terminal_size_state
 
2027
        osutils._first_terminal_size = self._orig_first_terminal_size
 
2028
 
1920
2029
    def replace_stdout(self, new):
1921
2030
        self.overrideAttr(sys, 'stdout', new)
1922
2031
 
1934
2043
    def test_default_values(self):
1935
2044
        self.assertEqual(80, osutils.default_terminal_width)
1936
2045
 
1937
 
    def test_defaults_to_BZR_COLUMNS(self):
1938
 
        # BZR_COLUMNS is set by the test framework
1939
 
        self.assertNotEqual('12', os.environ['BZR_COLUMNS'])
1940
 
        os.environ['BZR_COLUMNS'] = '12'
 
2046
    def test_defaults_to_BRZ_COLUMNS(self):
 
2047
        # BRZ_COLUMNS is set by the test framework
 
2048
        self.assertNotEqual('12', os.environ['BRZ_COLUMNS'])
 
2049
        self.overrideEnv('BRZ_COLUMNS', '12')
1941
2050
        self.assertEqual(12, osutils.terminal_width())
1942
2051
 
 
2052
    def test_BRZ_COLUMNS_0_no_limit(self):
 
2053
        self.overrideEnv('BRZ_COLUMNS', '0')
 
2054
        self.assertEqual(None, osutils.terminal_width())
 
2055
 
1943
2056
    def test_falls_back_to_COLUMNS(self):
1944
 
        del os.environ['BZR_COLUMNS']
 
2057
        self.overrideEnv('BRZ_COLUMNS', None)
1945
2058
        self.assertNotEqual('42', os.environ['COLUMNS'])
1946
2059
        self.set_fake_tty()
1947
 
        os.environ['COLUMNS'] = '42'
 
2060
        self.overrideEnv('COLUMNS', '42')
1948
2061
        self.assertEqual(42, osutils.terminal_width())
1949
2062
 
1950
2063
    def test_tty_default_without_columns(self):
1951
 
        del os.environ['BZR_COLUMNS']
1952
 
        del os.environ['COLUMNS']
 
2064
        self.overrideEnv('BRZ_COLUMNS', None)
 
2065
        self.overrideEnv('COLUMNS', None)
1953
2066
 
1954
2067
        def terminal_size(w, h):
1955
2068
            return 42, 42
1962
2075
        self.assertEqual(42, osutils.terminal_width())
1963
2076
 
1964
2077
    def test_non_tty_default_without_columns(self):
1965
 
        del os.environ['BZR_COLUMNS']
1966
 
        del os.environ['COLUMNS']
 
2078
        self.overrideEnv('BRZ_COLUMNS', None)
 
2079
        self.overrideEnv('COLUMNS', None)
1967
2080
        self.replace_stdout(None)
1968
2081
        self.assertEqual(None, osutils.terminal_width())
1969
2082
 
1979
2092
        else:
1980
2093
            self.overrideAttr(termios, 'TIOCGWINSZ')
1981
2094
            del termios.TIOCGWINSZ
1982
 
        del os.environ['BZR_COLUMNS']
1983
 
        del os.environ['COLUMNS']
 
2095
        self.overrideEnv('BRZ_COLUMNS', None)
 
2096
        self.overrideEnv('COLUMNS', None)
1984
2097
        # Whatever the result is, if we don't raise an exception, it's ok.
1985
2098
        osutils.terminal_width()
1986
2099
 
 
2100
 
1987
2101
class TestCreationOps(tests.TestCaseInTempDir):
1988
2102
    _test_needs_features = [features.chown_feature]
1989
2103
 
1990
2104
    def setUp(self):
1991
 
        tests.TestCaseInTempDir.setUp(self)
 
2105
        super(TestCreationOps, self).setUp()
1992
2106
        self.overrideAttr(os, 'chown', self._dummy_chown)
1993
2107
 
1994
2108
        # params set by call to _dummy_chown
2004
2118
        osutils.copy_ownership_from_path('test_file', ownsrc)
2005
2119
 
2006
2120
        s = os.stat(ownsrc)
2007
 
        self.assertEquals(self.path, 'test_file')
2008
 
        self.assertEquals(self.uid, s.st_uid)
2009
 
        self.assertEquals(self.gid, s.st_gid)
 
2121
        self.assertEqual(self.path, 'test_file')
 
2122
        self.assertEqual(self.uid, s.st_uid)
 
2123
        self.assertEqual(self.gid, s.st_gid)
2010
2124
 
2011
2125
    def test_copy_ownership_nonesrc(self):
2012
2126
        """copy_ownership_from_path test with src=None."""
2015
2129
        osutils.copy_ownership_from_path('test_file')
2016
2130
 
2017
2131
        s = os.stat('..')
2018
 
        self.assertEquals(self.path, 'test_file')
2019
 
        self.assertEquals(self.uid, s.st_uid)
2020
 
        self.assertEquals(self.gid, s.st_gid)
 
2132
        self.assertEqual(self.path, 'test_file')
 
2133
        self.assertEqual(self.uid, s.st_uid)
 
2134
        self.assertEqual(self.gid, s.st_gid)
 
2135
 
 
2136
 
 
2137
class TestPathFromEnviron(tests.TestCase):
 
2138
 
 
2139
    def test_is_unicode(self):
 
2140
        self.overrideEnv('BRZ_TEST_PATH', './anywhere at all/')
 
2141
        path = osutils.path_from_environ('BRZ_TEST_PATH')
 
2142
        self.assertIsInstance(path, unicode)
 
2143
        self.assertEqual(u'./anywhere at all/', path)
 
2144
 
 
2145
    def test_posix_path_env_ascii(self):
 
2146
        self.overrideEnv('BRZ_TEST_PATH', '/tmp')
 
2147
        home = osutils._posix_path_from_environ('BRZ_TEST_PATH')
 
2148
        self.assertIsInstance(home, unicode)
 
2149
        self.assertEqual(u'/tmp', home)
 
2150
 
 
2151
    def test_posix_path_env_unicode(self):
 
2152
        self.requireFeature(features.ByteStringNamedFilesystem)
 
2153
        self.overrideEnv('BRZ_TEST_PATH', '/home/\xa7test')
 
2154
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
 
2155
        self.assertEqual(u'/home/\xa7test',
 
2156
            osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2157
        osutils._fs_enc = "iso8859-5"
 
2158
        self.assertEqual(u'/home/\u0407test',
 
2159
            osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2160
        osutils._fs_enc = "utf-8"
 
2161
        self.assertRaises(errors.BadFilenameEncoding,
 
2162
            osutils._posix_path_from_environ, 'BRZ_TEST_PATH')
 
2163
 
 
2164
 
 
2165
class TestGetHomeDir(tests.TestCase):
 
2166
 
 
2167
    def test_is_unicode(self):
 
2168
        home = osutils._get_home_dir()
 
2169
        self.assertIsInstance(home, unicode)
 
2170
 
 
2171
    def test_posix_homeless(self):
 
2172
        self.overrideEnv('HOME', None)
 
2173
        home = osutils._get_home_dir()
 
2174
        self.assertIsInstance(home, unicode)
 
2175
 
 
2176
    def test_posix_home_ascii(self):
 
2177
        self.overrideEnv('HOME', '/home/test')
 
2178
        home = osutils._posix_get_home_dir()
 
2179
        self.assertIsInstance(home, unicode)
 
2180
        self.assertEqual(u'/home/test', home)
 
2181
 
 
2182
    def test_posix_home_unicode(self):
 
2183
        self.requireFeature(features.ByteStringNamedFilesystem)
 
2184
        self.overrideEnv('HOME', '/home/\xa7test')
 
2185
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
 
2186
        self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
 
2187
        osutils._fs_enc = "iso8859-5"
 
2188
        self.assertEqual(u'/home/\u0407test', osutils._posix_get_home_dir())
 
2189
        osutils._fs_enc = "utf-8"
 
2190
        self.assertRaises(errors.BadFilenameEncoding,
 
2191
            osutils._posix_get_home_dir)
 
2192
 
 
2193
 
 
2194
class TestGetuserUnicode(tests.TestCase):
 
2195
 
 
2196
    def test_is_unicode(self):
 
2197
        user = osutils.getuser_unicode()
 
2198
        self.assertIsInstance(user, unicode)
 
2199
 
 
2200
    def envvar_to_override(self):
 
2201
        if sys.platform == "win32":
 
2202
            # Disable use of platform calls on windows so envvar is used
 
2203
            self.overrideAttr(win32utils, 'has_ctypes', False)
 
2204
            return 'USERNAME' # only variable used on windows
 
2205
        return 'LOGNAME' # first variable checked by getpass.getuser()
 
2206
 
 
2207
    def test_ascii_user(self):
 
2208
        self.overrideEnv(self.envvar_to_override(), 'jrandom')
 
2209
        self.assertEqual(u'jrandom', osutils.getuser_unicode())
 
2210
 
 
2211
    def test_unicode_user(self):
 
2212
        ue = osutils.get_user_encoding()
 
2213
        uni_val, env_val = tests.probe_unicode_in_user_encoding()
 
2214
        if uni_val is None:
 
2215
            raise tests.TestSkipped(
 
2216
                'Cannot find a unicode character that works in encoding %s'
 
2217
                % (osutils.get_user_encoding(),))
 
2218
        uni_username = u'jrandom' + uni_val
 
2219
        encoded_username = uni_username.encode(ue)
 
2220
        self.overrideEnv(self.envvar_to_override(), encoded_username)
 
2221
        self.assertEqual(uni_username, osutils.getuser_unicode())
 
2222
 
 
2223
 
 
2224
class TestBackupNames(tests.TestCase):
 
2225
 
 
2226
    def setUp(self):
 
2227
        super(TestBackupNames, self).setUp()
 
2228
        self.backups = []
 
2229
 
 
2230
    def backup_exists(self, name):
 
2231
        return name in self.backups
 
2232
 
 
2233
    def available_backup_name(self, name):
 
2234
        backup_name = osutils.available_backup_name(name, self.backup_exists)
 
2235
        self.backups.append(backup_name)
 
2236
        return backup_name
 
2237
 
 
2238
    def assertBackupName(self, expected, name):
 
2239
        self.assertEqual(expected, self.available_backup_name(name))
 
2240
 
 
2241
    def test_empty(self):
 
2242
        self.assertBackupName('file.~1~', 'file')
 
2243
 
 
2244
    def test_existing(self):
 
2245
        self.available_backup_name('file')
 
2246
        self.available_backup_name('file')
 
2247
        self.assertBackupName('file.~3~', 'file')
 
2248
        # Empty slots are found, this is not a strict requirement and may be
 
2249
        # revisited if we test against all implementations.
 
2250
        self.backups.remove('file.~2~')
 
2251
        self.assertBackupName('file.~2~', 'file')
 
2252
 
 
2253
 
 
2254
class TestFindExecutableInPath(tests.TestCase):
    """Tests for osutils.find_executable_on_path.

    Each test is platform specific and skips itself when run on the other
    kind of platform.
    """

    def test_windows(self):
        if sys.platform != 'win32':
            raise tests.TestSkipped('test requires win32')
        # The lookup is tried with no extension, with a lower-case extension
        # and with an upper-case name.
        self.assertIsNotNone(osutils.find_executable_on_path('explorer'))
        self.assertIsNotNone(osutils.find_executable_on_path('explorer.exe'))
        self.assertIsNotNone(osutils.find_executable_on_path('EXPLORER.EXE'))
        self.assertIsNone(
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST'))
        self.assertIsNone(osutils.find_executable_on_path('file.txt'))

    def test_windows_app_path(self):
        if sys.platform != 'win32':
            raise tests.TestSkipped('test requires win32')
        # Override PATH env var so that exe can only be found on App Path
        self.overrideEnv('PATH', '')
        # Internet Explorer is always registered in the App Path
        self.assertIsNotNone(osutils.find_executable_on_path('iexplore'))

    def test_other(self):
        if sys.platform == 'win32':
            raise tests.TestSkipped('test requires non-win32')
        self.assertIsNotNone(osutils.find_executable_on_path('sh'))
        self.assertIsNone(
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST'))
 
2282
 
 
2283
 
 
2284
class TestEnvironmentErrors(tests.TestCase):
    """Check that osutils.is_environment_error accepts each error type."""

    def test_is_oserror(self):
        err = OSError(errno.EINVAL, "Invalid parameter")
        self.assertTrue(osutils.is_environment_error(err))

    def test_is_ioerror(self):
        err = IOError(errno.EINVAL, "Invalid parameter")
        self.assertTrue(osutils.is_environment_error(err))

    def test_is_socket_error(self):
        err = socket.error(errno.EINVAL, "Invalid parameter")
        self.assertTrue(osutils.is_environment_error(err))

    def test_is_select_error(self):
        err = select.error(errno.EINVAL, "Invalid parameter")
        self.assertTrue(osutils.is_environment_error(err))

    def test_is_pywintypes_error(self):
        # The feature check must come first so the import below is only
        # attempted when pywintypes is available.
        self.requireFeature(features.pywintypes)
        import pywintypes
        err = pywintypes.error(errno.EINVAL, "Invalid parameter", "Caller")
        self.assertTrue(osutils.is_environment_error(err))