/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_osutils.py

  • Committer: Jelmer Vernooij
  • Date: 2017-11-21 20:14:44 UTC
  • mfrom: (6821.1.1 ignore-warnings)
  • Revision ID: jelmer@jelmer.uk-20171121201444-dvb7yjku3zwjev83
Merge lp:~jelmer/brz/ignore-warnings.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the osutils wrapper."""
18
18
 
19
 
from cStringIO import StringIO
 
19
from __future__ import absolute_import, division
 
20
 
20
21
import errno
21
22
import os
22
23
import re
 
24
import select
23
25
import socket
24
 
import stat
25
26
import sys
 
27
import tempfile
26
28
import time
27
29
 
28
 
from bzrlib import (
 
30
from .. import (
29
31
    errors,
 
32
    lazy_regex,
30
33
    osutils,
 
34
    symbol_versioning,
31
35
    tests,
32
36
    trace,
33
37
    win32utils,
34
38
    )
35
 
from bzrlib.tests import (
 
39
from ..sixish import (
 
40
    BytesIO,
 
41
    )
 
42
from . import (
36
43
    features,
37
44
    file_utils,
38
45
    test__walkdirs_win32,
39
46
    )
40
 
 
41
 
 
42
 
class _UTF8DirReaderFeature(tests.Feature):
 
47
from .scenarios import load_tests_apply_scenarios
 
48
 
 
49
 
 
50
class _UTF8DirReaderFeature(features.ModuleAvailableFeature):
43
51
 
44
52
    def _probe(self):
45
53
        try:
46
 
            from bzrlib import _readdir_pyx
 
54
            from .. import _readdir_pyx
 
55
            self._module = _readdir_pyx
47
56
            self.reader = _readdir_pyx.UTF8DirReader
48
57
            return True
49
58
        except ImportError:
50
59
            return False
51
60
 
52
 
    def feature_name(self):
53
 
        return 'bzrlib._readdir_pyx'
54
 
 
55
 
UTF8DirReaderFeature = _UTF8DirReaderFeature()
56
 
 
57
 
term_ios_feature = tests.ModuleAvailableFeature('termios')
 
61
UTF8DirReaderFeature = _UTF8DirReaderFeature('breezy._readdir_pyx')
 
62
 
 
63
term_ios_feature = features.ModuleAvailableFeature('termios')
58
64
 
59
65
 
60
66
def _already_unicode(s):
78
84
    # Some DirReaders are platform specific and even there they may not be
79
85
    # available.
80
86
    if UTF8DirReaderFeature.available():
81
 
        from bzrlib import _readdir_pyx
 
87
        from .. import _readdir_pyx
82
88
        scenarios.append(('utf8',
83
89
                          dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
84
90
                               _native_to_unicode=_utf8_to_unicode)))
85
91
 
86
92
    if test__walkdirs_win32.win32_readdir_feature.available():
87
93
        try:
88
 
            from bzrlib import _walkdirs_win32
 
94
            from .. import _walkdirs_win32
89
95
            scenarios.append(
90
96
                ('win32',
91
97
                 dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
95
101
    return scenarios
96
102
 
97
103
 
98
 
def load_tests(basic_tests, module, loader):
99
 
    suite = loader.suiteClass()
100
 
    dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
101
 
        basic_tests, tests.condition_isinstance(TestDirReader))
102
 
    tests.multiply_tests(dir_reader_tests, dir_reader_scenarios(), suite)
103
 
    suite.addTest(remaining_tests)
104
 
    return suite
 
104
load_tests = load_tests_apply_scenarios
105
105
 
106
106
 
107
107
class TestContainsWhitespace(tests.TestCase):
108
108
 
109
109
    def test_contains_whitespace(self):
110
 
        self.failUnless(osutils.contains_whitespace(u' '))
111
 
        self.failUnless(osutils.contains_whitespace(u'hello there'))
112
 
        self.failUnless(osutils.contains_whitespace(u'hellothere\n'))
113
 
        self.failUnless(osutils.contains_whitespace(u'hello\nthere'))
114
 
        self.failUnless(osutils.contains_whitespace(u'hello\rthere'))
115
 
        self.failUnless(osutils.contains_whitespace(u'hello\tthere'))
 
110
        self.assertTrue(osutils.contains_whitespace(u' '))
 
111
        self.assertTrue(osutils.contains_whitespace(u'hello there'))
 
112
        self.assertTrue(osutils.contains_whitespace(u'hellothere\n'))
 
113
        self.assertTrue(osutils.contains_whitespace(u'hello\nthere'))
 
114
        self.assertTrue(osutils.contains_whitespace(u'hello\rthere'))
 
115
        self.assertTrue(osutils.contains_whitespace(u'hello\tthere'))
116
116
 
117
117
        # \xa0 is "Non-breaking-space" which on some python locales thinks it
118
118
        # is whitespace, but we do not.
119
 
        self.failIf(osutils.contains_whitespace(u''))
120
 
        self.failIf(osutils.contains_whitespace(u'hellothere'))
121
 
        self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
 
119
        self.assertFalse(osutils.contains_whitespace(u''))
 
120
        self.assertFalse(osutils.contains_whitespace(u'hellothere'))
 
121
        self.assertFalse(osutils.contains_whitespace(u'hello\xa0there'))
122
122
 
123
123
 
124
124
class TestRename(tests.TestCaseInTempDir):
138
138
        # This should work everywhere
139
139
        self.create_file('a', 'something in a\n')
140
140
        self._fancy_rename('a', 'b')
141
 
        self.failIfExists('a')
142
 
        self.failUnlessExists('b')
 
141
        self.assertPathDoesNotExist('a')
 
142
        self.assertPathExists('b')
143
143
        self.check_file_contents('b', 'something in a\n')
144
144
 
145
145
        self.create_file('a', 'new something in a\n')
152
152
        self.create_file('target', 'data in target\n')
153
153
        self.assertRaises((IOError, OSError), self._fancy_rename,
154
154
                          'missingsource', 'target')
155
 
        self.failUnlessExists('target')
 
155
        self.assertPathExists('target')
156
156
        self.check_file_contents('target', 'data in target\n')
157
157
 
158
158
    def test_fancy_rename_fails_if_source_and_target_missing(self):
163
163
        # Rename should be semi-atomic on all platforms
164
164
        self.create_file('a', 'something in a\n')
165
165
        osutils.rename('a', 'b')
166
 
        self.failIfExists('a')
167
 
        self.failUnlessExists('b')
 
166
        self.assertPathDoesNotExist('a')
 
167
        self.assertPathExists('b')
168
168
        self.check_file_contents('b', 'something in a\n')
169
169
 
170
170
        self.create_file('a', 'new something in a\n')
182
182
        # we can't use failUnlessExists on case-insensitive filesystem
183
183
        # so try to check shape of the tree
184
184
        shape = sorted(os.listdir('.'))
185
 
        self.assertEquals(['A', 'B'], shape)
 
185
        self.assertEqual(['A', 'B'], shape)
186
186
 
187
 
    def test_rename_error(self):
188
 
        # We wrap os.rename to make it give an error including the filenames
189
 
        # https://bugs.launchpad.net/bzr/+bug/491763
190
 
        err = self.assertRaises(OSError, osutils.rename,
191
 
            'nonexistent', 'target')
192
 
        self.assertContainsString(str(err), 'nonexistent')
 
187
    def test_rename_exception(self):
 
188
        try:
 
189
            osutils.rename('nonexistent_path', 'different_nonexistent_path')
 
190
        except OSError as e:
 
191
            self.assertEqual(e.old_filename, 'nonexistent_path')
 
192
            self.assertEqual(e.new_filename, 'different_nonexistent_path')
 
193
            self.assertTrue('nonexistent_path' in e.strerror)
 
194
            self.assertTrue('different_nonexistent_path' in e.strerror)
193
195
 
194
196
 
195
197
class TestRandChars(tests.TestCase):
222
224
                         (['src'], SRC_FOO_C),
223
225
                         (['src'], 'src'),
224
226
                         ]:
225
 
            self.assert_(osutils.is_inside_any(dirs, fn))
 
227
            self.assertTrue(osutils.is_inside_any(dirs, fn))
226
228
        for dirs, fn in [(['src'], 'srccontrol'),
227
229
                         (['src'], 'srccontrol/foo')]:
228
230
            self.assertFalse(osutils.is_inside_any(dirs, fn))
234
236
                         (['src/bar.c', 'bla/foo.c'], 'src'),
235
237
                         (['src'], 'src'),
236
238
                         ]:
237
 
            self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
 
239
            self.assertTrue(osutils.is_inside_or_parent_of_any(dirs, fn))
238
240
 
239
241
        for dirs, fn in [(['src'], 'srccontrol'),
240
242
                         (['srccontrol/foo.c'], 'src'),
242
244
            self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
243
245
 
244
246
 
 
247
class TestLstat(tests.TestCaseInTempDir):
 
248
 
 
249
    def test_lstat_matches_fstat(self):
 
250
        # On Windows, lstat and fstat don't always agree, primarily in the
 
251
        # 'st_ino' and 'st_dev' fields. So we force them to be '0' in our
 
252
        # custom implementation.
 
253
        if sys.platform == 'win32':
 
254
            # We only have special lstat/fstat if we have the extension.
 
255
            # Without it, we may end up re-reading content when we don't have
 
256
            # to, but otherwise it doesn't effect correctness.
 
257
            self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
258
        f = open('test-file.txt', 'wb')
 
259
        self.addCleanup(f.close)
 
260
        f.write('some content\n')
 
261
        f.flush()
 
262
        self.assertEqualStat(osutils.fstat(f.fileno()),
 
263
                             osutils.lstat('test-file.txt'))
 
264
 
 
265
 
245
266
class TestRmTree(tests.TestCaseInTempDir):
246
267
 
247
268
    def test_rmtree(self):
259
280
 
260
281
        osutils.rmtree('dir')
261
282
 
262
 
        self.failIfExists('dir/file')
263
 
        self.failIfExists('dir')
 
283
        self.assertPathDoesNotExist('dir/file')
 
284
        self.assertPathDoesNotExist('dir')
264
285
 
265
286
 
266
287
class TestDeleteAny(tests.TestCaseInTempDir):
279
300
 
280
301
    def test_file_kind(self):
281
302
        self.build_tree(['file', 'dir/'])
282
 
        self.assertEquals('file', osutils.file_kind('file'))
283
 
        self.assertEquals('directory', osutils.file_kind('dir/'))
 
303
        self.assertEqual('file', osutils.file_kind('file'))
 
304
        self.assertEqual('directory', osutils.file_kind('dir/'))
284
305
        if osutils.has_symlinks():
285
306
            os.symlink('symlink', 'symlink')
286
 
            self.assertEquals('symlink', osutils.file_kind('symlink'))
 
307
            self.assertEqual('symlink', osutils.file_kind('symlink'))
287
308
 
288
309
        # TODO: jam 20060529 Test a block device
289
310
        try:
290
311
            os.lstat('/dev/null')
291
 
        except OSError, e:
 
312
        except OSError as e:
292
313
            if e.errno not in (errno.ENOENT,):
293
314
                raise
294
315
        else:
295
 
            self.assertEquals('chardev', osutils.file_kind('/dev/null'))
 
316
            self.assertEqual('chardev', osutils.file_kind('/dev/null'))
296
317
 
297
318
        mkfifo = getattr(os, 'mkfifo', None)
298
319
        if mkfifo:
299
320
            mkfifo('fifo')
300
321
            try:
301
 
                self.assertEquals('fifo', osutils.file_kind('fifo'))
 
322
                self.assertEqual('fifo', osutils.file_kind('fifo'))
302
323
            finally:
303
324
                os.remove('fifo')
304
325
 
307
328
            s = socket.socket(AF_UNIX)
308
329
            s.bind('socket')
309
330
            try:
310
 
                self.assertEquals('socket', osutils.file_kind('socket'))
 
331
                self.assertEqual('socket', osutils.file_kind('socket'))
311
332
            finally:
312
333
                os.remove('socket')
313
334
 
332
353
 
333
354
        orig_umask = osutils.get_umask()
334
355
        self.addCleanup(os.umask, orig_umask)
335
 
        os.umask(0222)
336
 
        self.assertEqual(0222, osutils.get_umask())
337
 
        os.umask(0022)
338
 
        self.assertEqual(0022, osutils.get_umask())
339
 
        os.umask(0002)
340
 
        self.assertEqual(0002, osutils.get_umask())
341
 
        os.umask(0027)
342
 
        self.assertEqual(0027, osutils.get_umask())
 
356
        os.umask(0o222)
 
357
        self.assertEqual(0o222, osutils.get_umask())
 
358
        os.umask(0o022)
 
359
        self.assertEqual(0o022, osutils.get_umask())
 
360
        os.umask(0o002)
 
361
        self.assertEqual(0o002, osutils.get_umask())
 
362
        os.umask(0o027)
 
363
        self.assertEqual(0o027, osutils.get_umask())
343
364
 
344
365
 
345
366
class TestDateTime(tests.TestCase):
381
402
        self.assertFormatedDelta('2 seconds in the future', -2)
382
403
 
383
404
    def test_format_date(self):
384
 
        self.assertRaises(errors.UnsupportedTimezoneFormat,
 
405
        self.assertRaises(osutils.UnsupportedTimezoneFormat,
385
406
            osutils.format_date, 0, timezone='foo')
386
407
        self.assertIsInstance(osutils.format_date(0), str)
387
408
        self.assertIsInstance(osutils.format_local_date(0), unicode)
418
439
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
419
440
 
420
441
 
 
442
class TestFdatasync(tests.TestCaseInTempDir):
 
443
 
 
444
    def do_fdatasync(self):
 
445
        f = tempfile.NamedTemporaryFile()
 
446
        osutils.fdatasync(f.fileno())
 
447
        f.close()
 
448
 
 
449
    @staticmethod
 
450
    def raise_eopnotsupp(*args, **kwargs):
 
451
        raise IOError(errno.EOPNOTSUPP, os.strerror(errno.EOPNOTSUPP))
 
452
 
 
453
    @staticmethod
 
454
    def raise_enotsup(*args, **kwargs):
 
455
        raise IOError(errno.ENOTSUP, os.strerror(errno.ENOTSUP))
 
456
 
 
457
    def test_fdatasync_handles_system_function(self):
 
458
        self.overrideAttr(os, "fdatasync")
 
459
        self.do_fdatasync()
 
460
 
 
461
    def test_fdatasync_handles_no_fdatasync_no_fsync(self):
 
462
        self.overrideAttr(os, "fdatasync")
 
463
        self.overrideAttr(os, "fsync")
 
464
        self.do_fdatasync()
 
465
 
 
466
    def test_fdatasync_handles_no_EOPNOTSUPP(self):
 
467
        self.overrideAttr(errno, "EOPNOTSUPP")
 
468
        self.do_fdatasync()
 
469
 
 
470
    def test_fdatasync_catches_ENOTSUP(self):
 
471
        enotsup = getattr(errno, "ENOTSUP", None)
 
472
        if enotsup is None:
 
473
            raise tests.TestNotApplicable("No ENOTSUP on this platform")
 
474
        self.overrideAttr(os, "fdatasync", self.raise_enotsup)
 
475
        self.do_fdatasync()
 
476
 
 
477
    def test_fdatasync_catches_EOPNOTSUPP(self):
 
478
        enotsup = getattr(errno, "EOPNOTSUPP", None)
 
479
        if enotsup is None:
 
480
            raise tests.TestNotApplicable("No EOPNOTSUPP on this platform")
 
481
        self.overrideAttr(os, "fdatasync", self.raise_eopnotsupp)
 
482
        self.do_fdatasync()
 
483
 
 
484
 
421
485
class TestLinks(tests.TestCaseInTempDir):
422
486
 
423
487
    def test_dereference_path(self):
424
 
        self.requireFeature(tests.SymlinkFeature)
 
488
        self.requireFeature(features.SymlinkFeature)
425
489
        cwd = osutils.realpath('.')
426
490
        os.mkdir('bar')
427
491
        bar_path = osutils.pathjoin(cwd, 'bar')
455
519
        # Make a file readonly
456
520
        osutils.make_readonly('file')
457
521
        mode = os.lstat('file').st_mode
458
 
        self.assertEqual(mode, mode & 0777555)
 
522
        self.assertEqual(mode, mode & 0o777555)
459
523
 
460
524
        # Make a file writable
461
525
        osutils.make_writable('file')
462
526
        mode = os.lstat('file').st_mode
463
 
        self.assertEqual(mode, mode | 0200)
 
527
        self.assertEqual(mode, mode | 0o200)
464
528
 
465
529
        if osutils.has_symlinks():
466
530
            # should not error when handed a symlink
474
538
 
475
539
class TestCanonicalRelPath(tests.TestCaseInTempDir):
476
540
 
477
 
    _test_needs_features = [tests.CaseInsCasePresFilenameFeature]
 
541
    _test_needs_features = [features.CaseInsCasePresFilenameFeature]
478
542
 
479
543
    def test_canonical_relpath_simple(self):
480
544
        f = file('MixedCaseName', 'w')
481
545
        f.close()
482
546
        actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
483
 
        self.failUnlessEqual('work/MixedCaseName', actual)
 
547
        self.assertEqual('work/MixedCaseName', actual)
484
548
 
485
549
    def test_canonical_relpath_missing_tail(self):
486
550
        os.mkdir('MixedCaseParent')
487
551
        actual = osutils.canonical_relpath(self.test_base_dir,
488
552
                                           'mixedcaseparent/nochild')
489
 
        self.failUnlessEqual('work/MixedCaseParent/nochild', actual)
 
553
        self.assertEqual('work/MixedCaseParent/nochild', actual)
490
554
 
491
555
 
492
556
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
536
600
    """Test pumpfile method."""
537
601
 
538
602
    def setUp(self):
539
 
        tests.TestCase.setUp(self)
 
603
        super(TestPumpFile, self).setUp()
540
604
        # create a test datablock
541
605
        self.block_size = 512
542
 
        pattern = '0123456789ABCDEF'
543
 
        self.test_data = pattern * (3 * self.block_size / len(pattern))
 
606
        pattern = b'0123456789ABCDEF'
 
607
        self.test_data = pattern * (3 * self.block_size // len(pattern))
544
608
        self.test_data_len = len(self.test_data)
545
609
 
546
610
    def test_bracket_block_size(self):
550
614
        self.assertTrue(self.test_data_len > self.block_size)
551
615
 
552
616
        from_file = file_utils.FakeReadFile(self.test_data)
553
 
        to_file = StringIO()
 
617
        to_file = BytesIO()
554
618
 
555
 
        # read (max / 2) bytes and verify read size wasn't affected
556
 
        num_bytes_to_read = self.block_size / 2
 
619
        # read (max // 2) bytes and verify read size wasn't affected
 
620
        num_bytes_to_read = self.block_size // 2
557
621
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
558
622
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
559
623
        self.assertEqual(from_file.get_read_count(), 1)
591
655
 
592
656
        # retrieve data in blocks
593
657
        from_file = file_utils.FakeReadFile(self.test_data)
594
 
        to_file = StringIO()
 
658
        to_file = BytesIO()
595
659
        osutils.pumpfile(from_file, to_file, self.test_data_len,
596
660
                         self.block_size)
597
661
 
615
679
 
616
680
        # retrieve data to EOF
617
681
        from_file = file_utils.FakeReadFile(self.test_data)
618
 
        to_file = StringIO()
 
682
        to_file = BytesIO()
619
683
        osutils.pumpfile(from_file, to_file, -1, self.block_size)
620
684
 
621
685
        # verify read size was equal to the maximum read size
635
699
        with this new version."""
636
700
        # retrieve data using default (old) pumpfile method
637
701
        from_file = file_utils.FakeReadFile(self.test_data)
638
 
        to_file = StringIO()
 
702
        to_file = BytesIO()
639
703
        osutils.pumpfile(from_file, to_file)
640
704
 
641
705
        # report error if the data wasn't equal (we only report the size due
649
713
        activity = []
650
714
        def log_activity(length, direction):
651
715
            activity.append((length, direction))
652
 
        from_file = StringIO(self.test_data)
653
 
        to_file = StringIO()
 
716
        from_file = BytesIO(self.test_data)
 
717
        to_file = BytesIO()
654
718
        osutils.pumpfile(from_file, to_file, buff_size=500,
655
719
                         report_activity=log_activity, direction='read')
656
720
        self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
657
721
                          (36, 'read')], activity)
658
722
 
659
 
        from_file = StringIO(self.test_data)
660
 
        to_file = StringIO()
 
723
        from_file = BytesIO(self.test_data)
 
724
        to_file = BytesIO()
661
725
        del activity[:]
662
726
        osutils.pumpfile(from_file, to_file, buff_size=500,
663
727
                         report_activity=log_activity, direction='write')
665
729
                          (36, 'write')], activity)
666
730
 
667
731
        # And with a limited amount of data
668
 
        from_file = StringIO(self.test_data)
669
 
        to_file = StringIO()
 
732
        from_file = BytesIO(self.test_data)
 
733
        to_file = BytesIO()
670
734
        del activity[:]
671
735
        osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
672
736
                         report_activity=log_activity, direction='read')
677
741
class TestPumpStringFile(tests.TestCase):
678
742
 
679
743
    def test_empty(self):
680
 
        output = StringIO()
681
 
        osutils.pump_string_file("", output)
682
 
        self.assertEqual("", output.getvalue())
 
744
        output = BytesIO()
 
745
        osutils.pump_string_file(b"", output)
 
746
        self.assertEqual(b"", output.getvalue())
683
747
 
684
748
    def test_more_than_segment_size(self):
685
 
        output = StringIO()
686
 
        osutils.pump_string_file("123456789", output, 2)
687
 
        self.assertEqual("123456789", output.getvalue())
 
749
        output = BytesIO()
 
750
        osutils.pump_string_file(b"123456789", output, 2)
 
751
        self.assertEqual(b"123456789", output.getvalue())
688
752
 
689
753
    def test_segment_size(self):
690
 
        output = StringIO()
691
 
        osutils.pump_string_file("12", output, 2)
692
 
        self.assertEqual("12", output.getvalue())
 
754
        output = BytesIO()
 
755
        osutils.pump_string_file(b"12", output, 2)
 
756
        self.assertEqual(b"12", output.getvalue())
693
757
 
694
758
    def test_segment_size_multiple(self):
695
 
        output = StringIO()
696
 
        osutils.pump_string_file("1234", output, 2)
697
 
        self.assertEqual("1234", output.getvalue())
 
759
        output = BytesIO()
 
760
        osutils.pump_string_file(b"1234", output, 2)
 
761
        self.assertEqual(b"1234", output.getvalue())
698
762
 
699
763
 
700
764
class TestRelpath(tests.TestCase):
763
827
        self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
764
828
 
765
829
    def test_from_unicode_string_ascii_contents(self):
766
 
        self.assertEqual('bargam',
767
 
                         osutils.safe_revision_id(u'bargam', warn=False))
768
 
 
769
 
    def test_from_unicode_deprecated(self):
770
 
        self.assertEqual('bargam',
771
 
            self.callDeprecated([osutils._revision_id_warning],
772
 
                                osutils.safe_revision_id, u'bargam'))
 
830
        self.assertRaises(TypeError,
 
831
                          osutils.safe_revision_id, u'bargam')
773
832
 
774
833
    def test_from_unicode_string_unicode_contents(self):
775
 
        self.assertEqual('bargam\xc2\xae',
776
 
                         osutils.safe_revision_id(u'bargam\xae', warn=False))
 
834
        self.assertRaises(TypeError,
 
835
                         osutils.safe_revision_id, u'bargam\xae')
777
836
 
778
837
    def test_from_utf8_string(self):
779
838
        self.assertEqual('foo\xc2\xae',
790
849
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))
791
850
 
792
851
    def test_from_unicode_string_ascii_contents(self):
793
 
        self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))
794
 
 
795
 
    def test_from_unicode_deprecated(self):
796
 
        self.assertEqual('bargam',
797
 
            self.callDeprecated([osutils._file_id_warning],
798
 
                                osutils.safe_file_id, u'bargam'))
 
852
        self.assertRaises(TypeError, osutils.safe_file_id, u'bargam')
799
853
 
800
854
    def test_from_unicode_string_unicode_contents(self):
801
 
        self.assertEqual('bargam\xc2\xae',
802
 
                         osutils.safe_file_id(u'bargam\xae', warn=False))
 
855
        self.assertRaises(TypeError,
 
856
                          osutils.safe_file_id, u'bargam\xae')
803
857
 
804
858
    def test_from_utf8_string(self):
805
859
        self.assertEqual('foo\xc2\xae',
810
864
        self.assertEqual(None, osutils.safe_file_id(None))
811
865
 
812
866
 
 
867
class TestSendAll(tests.TestCase):
 
868
 
 
869
    def test_send_with_disconnected_socket(self):
 
870
        class DisconnectedSocket(object):
 
871
            def __init__(self, err):
 
872
                self.err = err
 
873
            def send(self, content):
 
874
                raise self.err
 
875
            def close(self):
 
876
                pass
 
877
        # All of these should be treated as ConnectionReset
 
878
        errs = []
 
879
        for err_cls in (IOError, socket.error):
 
880
            for errnum in osutils._end_of_stream_errors:
 
881
                errs.append(err_cls(errnum))
 
882
        for err in errs:
 
883
            sock = DisconnectedSocket(err)
 
884
            self.assertRaises(errors.ConnectionReset,
 
885
                osutils.send_all, sock, b'some more content')
 
886
 
 
887
    def test_send_with_no_progress(self):
 
888
        # See https://bugs.launchpad.net/bzr/+bug/1047309
 
889
        # It seems that paramiko can get into a state where it doesn't error,
 
890
        # but it returns 0 bytes sent for requests over and over again.
 
891
        class NoSendingSocket(object):
 
892
            def __init__(self):
 
893
                self.call_count = 0
 
894
            def send(self, bytes):
 
895
                self.call_count += 1
 
896
                if self.call_count > 100:
 
897
                    # Prevent the test suite from hanging
 
898
                    raise RuntimeError('too many calls')
 
899
                return 0
 
900
        sock = NoSendingSocket()
 
901
        self.assertRaises(errors.ConnectionReset,
 
902
                          osutils.send_all, sock, b'content')
 
903
        self.assertEqual(1, sock.call_count)
 
904
 
 
905
 
 
906
class TestPosixFuncs(tests.TestCase):
 
907
    """Test that the posix version of normpath returns an appropriate path
 
908
       when used with 2 leading slashes."""
 
909
 
 
910
    def test_normpath(self):
 
911
        self.assertEqual('/etc/shadow', osutils._posix_normpath('/etc/shadow'))
 
912
        self.assertEqual('/etc/shadow', osutils._posix_normpath('//etc/shadow'))
 
913
        self.assertEqual('/etc/shadow', osutils._posix_normpath('///etc/shadow'))
 
914
 
 
915
 
813
916
class TestWin32Funcs(tests.TestCase):
814
917
    """Test that _win32 versions of os utilities return appropriate paths."""
815
918
 
816
919
    def test_abspath(self):
 
920
        self.requireFeature(features.win32_feature)
817
921
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
818
922
        self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
819
923
        self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
832
936
                         osutils._win32_pathjoin('path/to', 'C:/foo'))
833
937
        self.assertEqual('path/to/foo',
834
938
                         osutils._win32_pathjoin('path/to/', 'foo'))
835
 
        self.assertEqual('/foo',
 
939
 
 
940
    def test_pathjoin_late_bugfix(self):
 
941
        if sys.version_info < (2, 7, 6):
 
942
            expected = '/foo'
 
943
        else:
 
944
            expected = 'C:/foo'
 
945
        self.assertEqual(expected,
836
946
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
837
 
        self.assertEqual('/foo',
 
947
        self.assertEqual(expected,
838
948
                         osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
839
949
 
840
950
    def test_normpath(self):
845
955
 
846
956
    def test_getcwd(self):
847
957
        cwd = osutils._win32_getcwd()
848
 
        os_cwd = os.getcwdu()
 
958
        os_cwd = osutils._getcwd()
849
959
        self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
850
960
        # win32 is inconsistent whether it returns lower or upper case
851
961
        # and even if it was consistent the user might type the other
859
969
        self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
860
970
        self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
861
971
 
862
 
    def test_win98_abspath(self):
863
 
        # absolute path
864
 
        self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
865
 
        self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
866
 
        # UNC path
867
 
        self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
868
 
        self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
869
 
        # relative path
870
 
        cwd = osutils.getcwd().rstrip('/')
871
 
        drive = osutils._nt_splitdrive(cwd)[0]
872
 
        self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
873
 
        self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
874
 
        # unicode path
875
 
        u = u'\u1234'
876
 
        self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
877
 
 
878
972
 
879
973
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
880
974
    """Test win32 functions that create files."""
881
975
 
882
976
    def test_getcwd(self):
883
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
977
        self.requireFeature(features.UnicodeFilenameFeature)
884
978
        os.mkdir(u'mu-\xb5')
885
979
        os.chdir(u'mu-\xb5')
886
980
        # TODO: jam 20060427 This will probably fail on Mac OSX because
892
986
    def test_minimum_path_selection(self):
893
987
        self.assertEqual(set(),
894
988
            osutils.minimum_path_selection([]))
895
 
        self.assertEqual(set(['a']),
 
989
        self.assertEqual({'a'},
896
990
            osutils.minimum_path_selection(['a']))
897
 
        self.assertEqual(set(['a', 'b']),
 
991
        self.assertEqual({'a', 'b'},
898
992
            osutils.minimum_path_selection(['a', 'b']))
899
 
        self.assertEqual(set(['a/', 'b']),
 
993
        self.assertEqual({'a/', 'b'},
900
994
            osutils.minimum_path_selection(['a/', 'b']))
901
 
        self.assertEqual(set(['a/', 'b']),
 
995
        self.assertEqual({'a/', 'b'},
902
996
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
903
 
        self.assertEqual(set(['a-b', 'a', 'a0b']),
 
997
        self.assertEqual({'a-b', 'a', 'a0b'},
904
998
            osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
905
999
 
906
1000
    def test_mkdtemp(self):
916
1010
        b.close()
917
1011
 
918
1012
        osutils._win32_rename('b', 'a')
919
 
        self.failUnlessExists('a')
920
 
        self.failIfExists('b')
 
1013
        self.assertPathExists('a')
 
1014
        self.assertPathDoesNotExist('b')
921
1015
        self.assertFileEqual('baz\n', 'a')
922
1016
 
923
1017
    def test_rename_missing_file(self):
927
1021
 
928
1022
        try:
929
1023
            osutils._win32_rename('b', 'a')
930
 
        except (IOError, OSError), e:
 
1024
        except (IOError, OSError) as e:
931
1025
            self.assertEqual(errno.ENOENT, e.errno)
932
1026
        self.assertFileEqual('foo\n', 'a')
933
1027
 
935
1029
        os.mkdir('a')
936
1030
        try:
937
1031
            osutils._win32_rename('b', 'a')
938
 
        except (IOError, OSError), e:
 
1032
        except (IOError, OSError) as e:
939
1033
            self.assertEqual(errno.ENOENT, e.errno)
940
1034
 
941
1035
    def test_rename_current_dir(self):
947
1041
        # doesn't exist.
948
1042
        try:
949
1043
            osutils._win32_rename('b', '.')
950
 
        except (IOError, OSError), e:
 
1044
        except (IOError, OSError) as e:
951
1045
            self.assertEqual(errno.ENOENT, e.errno)
952
1046
 
953
1047
    def test_splitpath(self):
976
1070
    """Test mac special functions that require directories."""
977
1071
 
978
1072
    def test_getcwd(self):
979
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1073
        self.requireFeature(features.UnicodeFilenameFeature)
980
1074
        os.mkdir(u'B\xe5gfors')
981
1075
        os.chdir(u'B\xe5gfors')
982
1076
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
983
1077
 
984
1078
    def test_getcwd_nonnorm(self):
985
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1079
        self.requireFeature(features.UnicodeFilenameFeature)
986
1080
        # Test that _mac_getcwd() will normalize this path
987
1081
        os.mkdir(u'Ba\u030agfors')
988
1082
        os.chdir(u'Ba\u030agfors')
998
1092
                         osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
999
1093
 
1000
1094
    def test_osutils_binding(self):
1001
 
        from bzrlib.tests import test__chunks_to_lines
 
1095
        from . import test__chunks_to_lines
1002
1096
        if test__chunks_to_lines.compiled_chunkstolines_feature.available():
1003
 
            from bzrlib._chunks_to_lines_pyx import chunks_to_lines
 
1097
            from .._chunks_to_lines_pyx import chunks_to_lines
1004
1098
        else:
1005
 
            from bzrlib._chunks_to_lines_py import chunks_to_lines
 
1099
            from .._chunks_to_lines_py import chunks_to_lines
1006
1100
        self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
1007
1101
 
1008
1102
 
1071
1165
        self.assertExpectedBlocks(expected_dirblocks[1:], result)
1072
1166
 
1073
1167
    def test_walkdirs_os_error(self):
1074
 
        # <https://bugs.edge.launchpad.net/bzr/+bug/338653>
 
1168
        # <https://bugs.launchpad.net/bzr/+bug/338653>
1075
1169
        # Pyrex readdir didn't raise useful messages if it had an error
1076
1170
        # reading the directory
1077
1171
        if sys.platform == 'win32':
1078
1172
            raise tests.TestNotApplicable(
1079
1173
                "readdir IOError not tested on win32")
 
1174
        self.requireFeature(features.not_running_as_root)
1080
1175
        os.mkdir("test-unreadable")
1081
1176
        os.chmod("test-unreadable", 0000)
1082
1177
        # must chmod it back so that it can be removed
1083
 
        self.addCleanup(os.chmod, "test-unreadable", 0700)
 
1178
        self.addCleanup(os.chmod, "test-unreadable", 0o700)
1084
1179
        # The error is not raised until the generator is actually evaluated.
1085
1180
        # (It would be ok if it happened earlier but at the moment it
1086
1181
        # doesn't.)
1087
1182
        e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1088
 
        self.assertEquals('./test-unreadable', e.filename)
1089
 
        self.assertEquals(errno.EACCES, e.errno)
 
1183
        self.assertEqual('./test-unreadable', e.filename)
 
1184
        self.assertEqual(errno.EACCES, e.errno)
1090
1185
        # Ensure the message contains the file name
1091
 
        self.assertContainsRe(str(e), "\./test-unreadable")
 
1186
        self.assertContainsRe(str(e), "\\./test-unreadable")
 
1187
 
 
1188
 
 
1189
    def test_walkdirs_encoding_error(self):
 
1190
        # <https://bugs.launchpad.net/bzr/+bug/488519>
 
1191
        # walkdirs didn't raise a useful message when the filenames
 
1192
        # are not using the filesystem's encoding
 
1193
 
 
1194
        # require a bytestring based filesystem
 
1195
        self.requireFeature(features.ByteStringNamedFilesystem)
 
1196
 
 
1197
        tree = [
 
1198
            '.bzr',
 
1199
            '0file',
 
1200
            '1dir/',
 
1201
            '1dir/0file',
 
1202
            '1dir/1dir/',
 
1203
            '1file'
 
1204
            ]
 
1205
 
 
1206
        self.build_tree(tree)
 
1207
 
 
1208
        # rename the 1file to a latin-1 filename
 
1209
        os.rename("./1file", "\xe8file")
 
1210
        if "\xe8file" not in os.listdir("."):
 
1211
            self.skipTest("Lack filesystem that preserves arbitrary bytes")
 
1212
 
 
1213
        self._save_platform_info()
 
1214
        osutils._fs_enc = 'UTF-8'
 
1215
 
 
1216
        # this should raise on error
 
1217
        def attempt():
 
1218
            for dirdetail, dirblock in osutils.walkdirs('.'):
 
1219
                pass
 
1220
 
 
1221
        self.assertRaises(errors.BadFilenameEncoding, attempt)
1092
1222
 
1093
1223
    def test__walkdirs_utf8(self):
1094
1224
        tree = [
1145
1275
            dirblock[:] = new_dirblock
1146
1276
 
1147
1277
    def _save_platform_info(self):
1148
 
        self.overrideAttr(win32utils, 'winver')
1149
1278
        self.overrideAttr(osutils, '_fs_enc')
1150
1279
        self.overrideAttr(osutils, '_selected_dir_reader')
1151
1280
 
1160
1289
    def test_force_walkdirs_utf8_fs_utf8(self):
1161
1290
        self.requireFeature(UTF8DirReaderFeature)
1162
1291
        self._save_platform_info()
1163
 
        win32utils.winver = None # Avoid the win32 detection code
1164
 
        osutils._fs_enc = 'UTF-8'
1165
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
1292
        osutils._fs_enc = 'utf-8'
 
1293
        self.assertDirReaderIs(
 
1294
            UTF8DirReaderFeature.module.UTF8DirReader)
1166
1295
 
1167
1296
    def test_force_walkdirs_utf8_fs_ascii(self):
1168
1297
        self.requireFeature(UTF8DirReaderFeature)
1169
1298
        self._save_platform_info()
1170
 
        win32utils.winver = None # Avoid the win32 detection code
1171
 
        osutils._fs_enc = 'US-ASCII'
1172
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1173
 
 
1174
 
    def test_force_walkdirs_utf8_fs_ANSI(self):
1175
 
        self.requireFeature(UTF8DirReaderFeature)
1176
 
        self._save_platform_info()
1177
 
        win32utils.winver = None # Avoid the win32 detection code
1178
 
        osutils._fs_enc = 'ANSI_X3.4-1968'
1179
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
1299
        osutils._fs_enc = 'ascii'
 
1300
        self.assertDirReaderIs(
 
1301
            UTF8DirReaderFeature.module.UTF8DirReader)
1180
1302
 
1181
1303
    def test_force_walkdirs_utf8_fs_latin1(self):
1182
1304
        self._save_platform_info()
1183
 
        win32utils.winver = None # Avoid the win32 detection code
1184
 
        osutils._fs_enc = 'latin1'
 
1305
        osutils._fs_enc = 'iso-8859-1'
1185
1306
        self.assertDirReaderIs(osutils.UnicodeDirReader)
1186
1307
 
1187
1308
    def test_force_walkdirs_utf8_nt(self):
1188
1309
        # Disabled because the thunk of the whole walkdirs api is disabled.
1189
1310
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1190
1311
        self._save_platform_info()
1191
 
        win32utils.winver = 'Windows NT'
1192
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1312
        from .._walkdirs_win32 import Win32ReadDir
1193
1313
        self.assertDirReaderIs(Win32ReadDir)
1194
1314
 
1195
 
    def test_force_walkdirs_utf8_98(self):
1196
 
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1197
 
        self._save_platform_info()
1198
 
        win32utils.winver = 'Windows 98'
1199
 
        self.assertDirReaderIs(osutils.UnicodeDirReader)
1200
 
 
1201
1315
    def test_unicode_walkdirs(self):
1202
1316
        """Walkdirs should always return unicode paths."""
1203
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1317
        self.requireFeature(features.UnicodeFilenameFeature)
1204
1318
        name0 = u'0file-\xb6'
1205
1319
        name1 = u'1dir-\u062c\u0648'
1206
1320
        name2 = u'2file-\u0633'
1243
1357
 
1244
1358
        The abspath portion might be in unicode or utf-8
1245
1359
        """
1246
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1360
        self.requireFeature(features.UnicodeFilenameFeature)
1247
1361
        name0 = u'0file-\xb6'
1248
1362
        name1 = u'1dir-\u062c\u0648'
1249
1363
        name2 = u'2file-\u0633'
1304
1418
 
1305
1419
        The abspath portion should be in unicode
1306
1420
        """
1307
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1421
        self.requireFeature(features.UnicodeFilenameFeature)
1308
1422
        # Use the unicode reader. TODO: split into driver-and-driven unit
1309
1423
        # tests.
1310
1424
        self._save_platform_info()
1351
1465
 
1352
1466
    def test__walkdirs_utf8_win32readdir(self):
1353
1467
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1354
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1355
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1468
        self.requireFeature(features.UnicodeFilenameFeature)
 
1469
        from .._walkdirs_win32 import Win32ReadDir
1356
1470
        self._save_platform_info()
1357
1471
        osutils._selected_dir_reader = Win32ReadDir()
1358
1472
        name0u = u'0file-\xb6'
1408
1522
    def test__walkdirs_utf_win32_find_file_stat_file(self):
1409
1523
        """make sure our Stat values are valid"""
1410
1524
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1411
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1412
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1525
        self.requireFeature(features.UnicodeFilenameFeature)
 
1526
        from .._walkdirs_win32 import Win32ReadDir
1413
1527
        name0u = u'0file-\xb6'
1414
1528
        name0 = name0u.encode('utf8')
1415
1529
        self.build_tree([name0u])
1432
1546
    def test__walkdirs_utf_win32_find_file_stat_directory(self):
1433
1547
        """make sure our Stat values are valid"""
1434
1548
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1435
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1436
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1549
        self.requireFeature(features.UnicodeFilenameFeature)
 
1550
        from .._walkdirs_win32 import Win32ReadDir
1437
1551
        name0u = u'0dir-\u062c\u0648'
1438
1552
        name0 = name0u.encode('utf8')
1439
1553
        self.build_tree([name0u + '/'])
1538
1652
        self.assertEqual(['c'], os.listdir('target/b'))
1539
1653
 
1540
1654
    def test_copy_tree_symlinks(self):
1541
 
        self.requireFeature(tests.SymlinkFeature)
 
1655
        self.requireFeature(features.SymlinkFeature)
1542
1656
        self.build_tree(['source/'])
1543
1657
        os.symlink('a/generic/path', 'source/lnk')
1544
1658
        osutils.copy_tree('source', 'target')
1554
1668
            processed_files.append(('d', from_path, to_path))
1555
1669
        def link_handler(from_path, to_path):
1556
1670
            processed_links.append((from_path, to_path))
1557
 
        handlers = {'file':file_handler,
1558
 
                    'directory':dir_handler,
1559
 
                    'symlink':link_handler,
 
1671
        handlers = {'file': file_handler,
 
1672
                    'directory': dir_handler,
 
1673
                    'symlink': link_handler,
1560
1674
                   }
1561
1675
 
1562
1676
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1569
1683
                          ('d', 'source/b', 'target/b'),
1570
1684
                          ('f', 'source/b/c', 'target/b/c'),
1571
1685
                         ], processed_files)
1572
 
        self.failIfExists('target')
 
1686
        self.assertPathDoesNotExist('target')
1573
1687
        if osutils.has_symlinks():
1574
1688
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1575
1689
 
1580
1694
    def setUp(self):
1581
1695
        super(TestSetUnsetEnv, self).setUp()
1582
1696
 
1583
 
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
 
1697
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'),
1584
1698
                         'Environment was not cleaned up properly.'
1585
 
                         ' Variable BZR_TEST_ENV_VAR should not exist.')
 
1699
                         ' Variable BRZ_TEST_ENV_VAR should not exist.')
1586
1700
        def cleanup():
1587
 
            if 'BZR_TEST_ENV_VAR' in os.environ:
1588
 
                del os.environ['BZR_TEST_ENV_VAR']
 
1701
            if 'BRZ_TEST_ENV_VAR' in os.environ:
 
1702
                del os.environ['BRZ_TEST_ENV_VAR']
1589
1703
        self.addCleanup(cleanup)
1590
1704
 
1591
1705
    def test_set(self):
1592
1706
        """Test that we can set an env variable"""
1593
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1707
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
1594
1708
        self.assertEqual(None, old)
1595
 
        self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
 
1709
        self.assertEqual('foo', os.environ.get('BRZ_TEST_ENV_VAR'))
1596
1710
 
1597
1711
    def test_double_set(self):
1598
1712
        """Test that we get the old value out"""
1599
 
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1600
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
 
1713
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1714
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'bar')
1601
1715
        self.assertEqual('foo', old)
1602
 
        self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
 
1716
        self.assertEqual('bar', os.environ.get('BRZ_TEST_ENV_VAR'))
1603
1717
 
1604
1718
    def test_unicode(self):
1605
1719
        """Environment can only contain plain strings
1612
1726
                'Cannot find a unicode character that works in encoding %s'
1613
1727
                % (osutils.get_user_encoding(),))
1614
1728
 
1615
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1616
 
        self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
 
1729
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', uni_val)
 
1730
        self.assertEqual(env_val, os.environ.get('BRZ_TEST_ENV_VAR'))
1617
1731
 
1618
1732
    def test_unset(self):
1619
1733
        """Test that passing None will remove the env var"""
1620
 
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1621
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
 
1734
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1735
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', None)
1622
1736
        self.assertEqual('foo', old)
1623
 
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
1624
 
        self.failIf('BZR_TEST_ENV_VAR' in os.environ)
 
1737
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'))
 
1738
        self.assertFalse('BRZ_TEST_ENV_VAR' in os.environ)
1625
1739
 
1626
1740
 
1627
1741
class TestSizeShaFile(tests.TestCaseInTempDir):
1663
1777
class TestResourceLoading(tests.TestCaseInTempDir):
1664
1778
 
1665
1779
    def test_resource_string(self):
1666
 
        # test resource in bzrlib
1667
 
        text = osutils.resource_string('bzrlib', 'debug.py')
 
1780
        # test resource in breezy
 
1781
        text = osutils.resource_string('breezy', 'debug.py')
1668
1782
        self.assertContainsRe(text, "debug_flags = set()")
1669
 
        # test resource under bzrlib
1670
 
        text = osutils.resource_string('bzrlib.ui', 'text.py')
 
1783
        # test resource under breezy
 
1784
        text = osutils.resource_string('breezy.ui', 'text.py')
1671
1785
        self.assertContainsRe(text, "class TextUIFactory")
1672
1786
        # test unsupported package
1673
1787
        self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1674
1788
            'yyy.xx')
1675
1789
        # test unknown resource
1676
 
        self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
1677
 
 
1678
 
 
1679
 
class TestReCompile(tests.TestCase):
1680
 
 
1681
 
    def test_re_compile_checked(self):
1682
 
        r = osutils.re_compile_checked(r'A*', re.IGNORECASE)
1683
 
        self.assertTrue(r.match('aaaa'))
1684
 
        self.assertTrue(r.match('aAaA'))
1685
 
 
1686
 
    def test_re_compile_checked_error(self):
1687
 
        # like https://bugs.launchpad.net/bzr/+bug/251352
1688
 
        err = self.assertRaises(
1689
 
            errors.BzrCommandError,
1690
 
            osutils.re_compile_checked, '*', re.IGNORECASE, 'test case')
1691
 
        self.assertEqual(
1692
 
            "Invalid regular expression in test case: '*': "
1693
 
            "nothing to repeat",
1694
 
            str(err))
 
1790
        self.assertRaises(IOError, osutils.resource_string, 'breezy', 'yyy.xx')
1695
1791
 
1696
1792
 
1697
1793
class TestDirReader(tests.TestCaseInTempDir):
1698
1794
 
 
1795
    scenarios = dir_reader_scenarios()
 
1796
 
1699
1797
    # Set by load_tests
1700
1798
    _dir_reader_class = None
1701
1799
    _native_to_unicode = None
1702
1800
 
1703
1801
    def setUp(self):
1704
 
        tests.TestCaseInTempDir.setUp(self)
 
1802
        super(TestDirReader, self).setUp()
1705
1803
        self.overrideAttr(osutils,
1706
1804
                          '_selected_dir_reader', self._dir_reader_class())
1707
1805
 
1801
1899
        return filtered_dirblocks
1802
1900
 
1803
1901
    def test_walk_unicode_tree(self):
1804
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1902
        self.requireFeature(features.UnicodeFilenameFeature)
1805
1903
        tree, expected_dirblocks = self._get_unicode_tree()
1806
1904
        self.build_tree(tree)
1807
1905
        result = list(osutils._walkdirs_utf8('.'))
1808
1906
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1809
1907
 
1810
1908
    def test_symlink(self):
1811
 
        self.requireFeature(tests.SymlinkFeature)
1812
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1909
        self.requireFeature(features.SymlinkFeature)
 
1910
        self.requireFeature(features.UnicodeFilenameFeature)
1813
1911
        target = u'target\N{Euro Sign}'
1814
1912
        link_name = u'l\N{Euro Sign}nk'
1815
1913
        os.symlink(target, link_name)
1833
1931
    But prior python versions failed to properly encode the passed unicode
1834
1932
    string.
1835
1933
    """
1836
 
    _test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]
 
1934
    _test_needs_features = [features.SymlinkFeature, features.UnicodeFilenameFeature]
1837
1935
 
1838
1936
    def setUp(self):
1839
1937
        super(tests.TestCaseInTempDir, self).setUp()
1842
1940
        os.symlink(self.target, self.link)
1843
1941
 
1844
1942
    def test_os_readlink_link_encoding(self):
1845
 
        if sys.version_info < (2, 6):
1846
 
            self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
1847
 
        else:
1848
 
            self.assertEquals(self.target,  os.readlink(self.link))
 
1943
        self.assertEqual(self.target,  os.readlink(self.link))
1849
1944
 
1850
1945
    def test_os_readlink_link_decoding(self):
1851
 
        self.assertEquals(self.target.encode(osutils._fs_enc),
 
1946
        self.assertEqual(self.target.encode(osutils._fs_enc),
1852
1947
                          os.readlink(self.link.encode(osutils._fs_enc)))
1853
1948
 
1854
1949
 
1863
1958
        self.assertIsInstance(concurrency, int)
1864
1959
 
1865
1960
    def test_local_concurrency_environment_variable(self):
1866
 
        os.environ['BZR_CONCURRENCY'] = '2'
 
1961
        self.overrideEnv('BRZ_CONCURRENCY', '2')
1867
1962
        self.assertEqual(2, osutils.local_concurrency(use_cache=False))
1868
 
        os.environ['BZR_CONCURRENCY'] = '3'
 
1963
        self.overrideEnv('BRZ_CONCURRENCY', '3')
1869
1964
        self.assertEqual(3, osutils.local_concurrency(use_cache=False))
1870
 
        os.environ['BZR_CONCURRENCY'] = 'foo'
 
1965
        self.overrideEnv('BRZ_CONCURRENCY', 'foo')
1871
1966
        self.assertEqual(1, osutils.local_concurrency(use_cache=False))
1872
1967
 
1873
1968
    def test_option_concurrency(self):
1874
 
        os.environ['BZR_CONCURRENCY'] = '1'
 
1969
        self.overrideEnv('BRZ_CONCURRENCY', '1')
1875
1970
        self.run_bzr('rocks --concurrency 42')
1876
 
        # Command line overrides envrionment variable
1877
 
        self.assertEquals('42', os.environ['BZR_CONCURRENCY'])
1878
 
        self.assertEquals(42, osutils.local_concurrency(use_cache=False))
 
1971
        # Command line overrides environment variable
 
1972
        self.assertEqual('42', os.environ['BRZ_CONCURRENCY'])
 
1973
        self.assertEqual(42, osutils.local_concurrency(use_cache=False))
1879
1974
 
1880
1975
 
1881
1976
class TestFailedToLoadExtension(tests.TestCase):
1882
1977
 
1883
1978
    def _try_loading(self):
1884
1979
        try:
1885
 
            import bzrlib._fictional_extension_py
1886
 
        except ImportError, e:
 
1980
            import breezy._fictional_extension_py
 
1981
        except ImportError as e:
1887
1982
            osutils.failed_to_load_extension(e)
1888
1983
            return True
1889
1984
 
1894
1989
    def test_failure_to_load(self):
1895
1990
        self._try_loading()
1896
1991
        self.assertLength(1, osutils._extension_load_failures)
1897
 
        self.assertEquals(osutils._extension_load_failures[0],
 
1992
        self.assertEqual(osutils._extension_load_failures[0],
1898
1993
            "No module named _fictional_extension_py")
1899
1994
 
1900
1995
    def test_report_extension_load_failures_no_warning(self):
1904
1999
        self.assertLength(0, warnings)
1905
2000
 
1906
2001
    def test_report_extension_load_failures_message(self):
1907
 
        log = StringIO()
 
2002
        log = BytesIO()
1908
2003
        trace.push_log_file(log)
1909
2004
        self.assertTrue(self._try_loading())
1910
2005
        osutils.report_extension_load_failures()
1911
2006
        self.assertContainsRe(
1912
2007
            log.getvalue(),
1913
 
            r"bzr: warning: some compiled extensions could not be loaded; "
1914
 
            "see <https://answers\.launchpad\.net/bzr/\+faq/703>\n"
 
2008
            r"brz: warning: some compiled extensions could not be loaded; "
 
2009
            "see ``brz help missing-extensions``\n"
1915
2010
            )
1916
2011
 
1917
2012
 
1918
2013
class TestTerminalWidth(tests.TestCase):
1919
2014
 
 
2015
    def setUp(self):
 
2016
        super(TestTerminalWidth, self).setUp()
 
2017
        self._orig_terminal_size_state = osutils._terminal_size_state
 
2018
        self._orig_first_terminal_size = osutils._first_terminal_size
 
2019
        self.addCleanup(self.restore_osutils_globals)
 
2020
        osutils._terminal_size_state = 'no_data'
 
2021
        osutils._first_terminal_size = None
 
2022
 
 
2023
    def restore_osutils_globals(self):
 
2024
        osutils._terminal_size_state = self._orig_terminal_size_state
 
2025
        osutils._first_terminal_size = self._orig_first_terminal_size
 
2026
 
1920
2027
    def replace_stdout(self, new):
1921
2028
        self.overrideAttr(sys, 'stdout', new)
1922
2029
 
1934
2041
    def test_default_values(self):
1935
2042
        self.assertEqual(80, osutils.default_terminal_width)
1936
2043
 
1937
 
    def test_defaults_to_BZR_COLUMNS(self):
1938
 
        # BZR_COLUMNS is set by the test framework
1939
 
        self.assertNotEqual('12', os.environ['BZR_COLUMNS'])
1940
 
        os.environ['BZR_COLUMNS'] = '12'
 
2044
    def test_defaults_to_BRZ_COLUMNS(self):
 
2045
        # BRZ_COLUMNS is set by the test framework
 
2046
        self.assertNotEqual('12', os.environ['BRZ_COLUMNS'])
 
2047
        self.overrideEnv('BRZ_COLUMNS', '12')
1941
2048
        self.assertEqual(12, osutils.terminal_width())
1942
2049
 
 
2050
    def test_BRZ_COLUMNS_0_no_limit(self):
 
2051
        self.overrideEnv('BRZ_COLUMNS', '0')
 
2052
        self.assertEqual(None, osutils.terminal_width())
 
2053
 
1943
2054
    def test_falls_back_to_COLUMNS(self):
1944
 
        del os.environ['BZR_COLUMNS']
 
2055
        self.overrideEnv('BRZ_COLUMNS', None)
1945
2056
        self.assertNotEqual('42', os.environ['COLUMNS'])
1946
2057
        self.set_fake_tty()
1947
 
        os.environ['COLUMNS'] = '42'
 
2058
        self.overrideEnv('COLUMNS', '42')
1948
2059
        self.assertEqual(42, osutils.terminal_width())
1949
2060
 
1950
2061
    def test_tty_default_without_columns(self):
1951
 
        del os.environ['BZR_COLUMNS']
1952
 
        del os.environ['COLUMNS']
 
2062
        self.overrideEnv('BRZ_COLUMNS', None)
 
2063
        self.overrideEnv('COLUMNS', None)
1953
2064
 
1954
2065
        def terminal_size(w, h):
1955
2066
            return 42, 42
1962
2073
        self.assertEqual(42, osutils.terminal_width())
1963
2074
 
1964
2075
    def test_non_tty_default_without_columns(self):
1965
 
        del os.environ['BZR_COLUMNS']
1966
 
        del os.environ['COLUMNS']
 
2076
        self.overrideEnv('BRZ_COLUMNS', None)
 
2077
        self.overrideEnv('COLUMNS', None)
1967
2078
        self.replace_stdout(None)
1968
2079
        self.assertEqual(None, osutils.terminal_width())
1969
2080
 
1979
2090
        else:
1980
2091
            self.overrideAttr(termios, 'TIOCGWINSZ')
1981
2092
            del termios.TIOCGWINSZ
1982
 
        del os.environ['BZR_COLUMNS']
1983
 
        del os.environ['COLUMNS']
 
2093
        self.overrideEnv('BRZ_COLUMNS', None)
 
2094
        self.overrideEnv('COLUMNS', None)
1984
2095
        # Whatever the result is, if we don't raise an exception, it's ok.
1985
2096
        osutils.terminal_width()
1986
2097
 
 
2098
 
1987
2099
class TestCreationOps(tests.TestCaseInTempDir):
1988
2100
    _test_needs_features = [features.chown_feature]
1989
2101
 
1990
2102
    def setUp(self):
1991
 
        tests.TestCaseInTempDir.setUp(self)
 
2103
        super(TestCreationOps, self).setUp()
1992
2104
        self.overrideAttr(os, 'chown', self._dummy_chown)
1993
2105
 
1994
2106
        # params set by call to _dummy_chown
2004
2116
        osutils.copy_ownership_from_path('test_file', ownsrc)
2005
2117
 
2006
2118
        s = os.stat(ownsrc)
2007
 
        self.assertEquals(self.path, 'test_file')
2008
 
        self.assertEquals(self.uid, s.st_uid)
2009
 
        self.assertEquals(self.gid, s.st_gid)
 
2119
        self.assertEqual(self.path, 'test_file')
 
2120
        self.assertEqual(self.uid, s.st_uid)
 
2121
        self.assertEqual(self.gid, s.st_gid)
2010
2122
 
2011
2123
    def test_copy_ownership_nonesrc(self):
2012
2124
        """copy_ownership_from_path test with src=None."""
2015
2127
        osutils.copy_ownership_from_path('test_file')
2016
2128
 
2017
2129
        s = os.stat('..')
2018
 
        self.assertEquals(self.path, 'test_file')
2019
 
        self.assertEquals(self.uid, s.st_uid)
2020
 
        self.assertEquals(self.gid, s.st_gid)
 
2130
        self.assertEqual(self.path, 'test_file')
 
2131
        self.assertEqual(self.uid, s.st_uid)
 
2132
        self.assertEqual(self.gid, s.st_gid)
 
2133
 
 
2134
 
 
2135
class TestPathFromEnviron(tests.TestCase):
    """Tests for osutils.path_from_environ and _posix_path_from_environ."""

    def test_is_unicode(self):
        # The value read from the environment comes back as unicode.
        self.overrideEnv('BRZ_TEST_PATH', './anywhere at all/')
        result = osutils.path_from_environ('BRZ_TEST_PATH')
        self.assertIsInstance(result, unicode)
        self.assertEqual(u'./anywhere at all/', result)

    def test_posix_path_env_ascii(self):
        self.overrideEnv('BRZ_TEST_PATH', '/tmp')
        result = osutils._posix_path_from_environ('BRZ_TEST_PATH')
        self.assertIsInstance(result, unicode)
        self.assertEqual(u'/tmp', result)

    def test_posix_path_env_unicode(self):
        self.requireFeature(features.ByteStringNamedFilesystem)
        self.overrideEnv('BRZ_TEST_PATH', '/home/\xa7test')
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
        # The same bytes are expected to decode differently under each
        # filesystem encoding.
        decoded_by_encoding = (
            ("iso8859-1", u'/home/\xa7test'),
            ("iso8859-5", u'/home/\u0407test'),
            )
        for fs_enc, expected in decoded_by_encoding:
            osutils._fs_enc = fs_enc
            self.assertEqual(expected,
                osutils._posix_path_from_environ('BRZ_TEST_PATH'))
        # Under utf-8 the lookup is expected to fail instead.
        osutils._fs_enc = "utf-8"
        self.assertRaises(errors.BadFilenameEncoding,
            osutils._posix_path_from_environ, 'BRZ_TEST_PATH')
 
2161
 
 
2162
 
 
2163
class TestGetHomeDir(tests.TestCase):
    """Tests for osutils._get_home_dir and osutils._posix_get_home_dir."""

    def test_is_unicode(self):
        self.assertIsInstance(osutils._get_home_dir(), unicode)

    def test_posix_homeless(self):
        # Even with HOME removed from the environment a unicode value is
        # expected back.
        self.overrideEnv('HOME', None)
        self.assertIsInstance(osutils._get_home_dir(), unicode)

    def test_posix_home_ascii(self):
        self.overrideEnv('HOME', '/home/test')
        result = osutils._posix_get_home_dir()
        self.assertIsInstance(result, unicode)
        self.assertEqual(u'/home/test', result)

    def test_posix_home_unicode(self):
        self.requireFeature(features.ByteStringNamedFilesystem)
        self.overrideEnv('HOME', '/home/\xa7test')
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
        # The same HOME bytes are expected to decode differently under each
        # filesystem encoding.
        decoded_by_encoding = (
            ("iso8859-1", u'/home/\xa7test'),
            ("iso8859-5", u'/home/\u0407test'),
            )
        for fs_enc, expected in decoded_by_encoding:
            osutils._fs_enc = fs_enc
            self.assertEqual(expected, osutils._posix_get_home_dir())
        # Under utf-8 the lookup is expected to fail instead.
        osutils._fs_enc = "utf-8"
        self.assertRaises(errors.BadFilenameEncoding,
            osutils._posix_get_home_dir)
 
2190
 
 
2191
 
 
2192
class TestGetuserUnicode(tests.TestCase):
    """Tests for osutils.getuser_unicode."""

    def test_is_unicode(self):
        self.assertIsInstance(osutils.getuser_unicode(), unicode)

    def envvar_to_override(self):
        """Return the environment variable the tests set the user name in.

        On win32 this also overrides win32utils.has_ctypes to False.
        """
        if sys.platform != "win32":
            return 'LOGNAME' # first variable checked by getpass.getuser()
        # Disable use of platform calls on windows so envvar is used
        self.overrideAttr(win32utils, 'has_ctypes', False)
        return 'USERNAME' # only variable used on windows

    def test_ascii_user(self):
        variable = self.envvar_to_override()
        self.overrideEnv(variable, 'jrandom')
        self.assertEqual(u'jrandom', osutils.getuser_unicode())

    def test_unicode_user(self):
        user_encoding = osutils.get_user_encoding()
        uni_val, _env_val = tests.probe_unicode_in_user_encoding()
        if uni_val is None:
            raise tests.TestSkipped(
                'Cannot find a unicode character that works in encoding %s'
                % (user_encoding,))
        expected_user = u'jrandom' + uni_val
        # The environment carries the name encoded in the user encoding.
        self.overrideEnv(self.envvar_to_override(),
                         expected_user.encode(user_encoding))
        self.assertEqual(expected_user, osutils.getuser_unicode())
 
2220
 
 
2221
 
 
2222
class TestBackupNames(tests.TestCase):
    """Tests for osutils.available_backup_name.

    Names already handed out are tracked in self.backups, which stands in
    for the check of whether a backup exists.
    """

    def setUp(self):
        super(TestBackupNames, self).setUp()
        self.backups = []

    def backup_exists(self, name):
        """Tell whether name has already been recorded in self.backups."""
        return name in self.backups

    def available_backup_name(self, name):
        """Get a backup name from osutils and record it in self.backups."""
        chosen = osutils.available_backup_name(name, self.backup_exists)
        self.backups.append(chosen)
        return chosen

    def assertBackupName(self, expected, name):
        """Check that the next backup name handed out for name is expected."""
        actual = self.available_backup_name(name)
        self.assertEqual(expected, actual)

    def test_empty(self):
        self.assertBackupName('file.~1~', 'file')

    def test_existing(self):
        # Take two names first, so the next one handed out is the third.
        for _ in range(2):
            self.available_backup_name('file')
        self.assertBackupName('file.~3~', 'file')
        # Empty slots are found, this is not a strict requirement and may be
        # revisited if we test against all implementations.
        self.backups.remove('file.~2~')
        self.assertBackupName('file.~2~', 'file')
 
2250
 
 
2251
 
 
2252
class TestFindExecutableInPath(tests.TestCase):
    """Tests for osutils.find_executable_on_path()."""

    def test_windows(self):
        """On win32, 'explorer' is found in any case, with or without .exe."""
        if sys.platform != 'win32':
            raise tests.TestSkipped('test requires win32')
        # assertIsNotNone/assertIsNone report the offending value on failure,
        # unlike assertTrue(... is None).
        self.assertIsNotNone(osutils.find_executable_on_path('explorer'))
        self.assertIsNotNone(osutils.find_executable_on_path('explorer.exe'))
        self.assertIsNotNone(osutils.find_executable_on_path('EXPLORER.EXE'))
        self.assertIsNone(
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST'))
        self.assertIsNone(osutils.find_executable_on_path('file.txt'))

    def test_windows_app_path(self):
        """On win32, an executable is still found when PATH is empty."""
        if sys.platform != 'win32':
            raise tests.TestSkipped('test requires win32')
        # Override PATH env var so that exe can only be found on App Path
        self.overrideEnv('PATH', '')
        # Internet Explorer is always registered in the App Path
        self.assertIsNotNone(osutils.find_executable_on_path('iexplore'))

    def test_other(self):
        """On non-win32 platforms, 'sh' is found and a bogus name is not."""
        if sys.platform == 'win32':
            raise tests.TestSkipped('test requires non-win32')
        self.assertIsNotNone(osutils.find_executable_on_path('sh'))
        self.assertIsNone(
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST'))
 
2280
 
 
2281
 
 
2282
class TestEnvironmentErrors(tests.TestCase):
    """Check the exception types osutils.is_environment_error() accepts"""

    def test_is_oserror(self):
        err = OSError(errno.EINVAL, "Invalid parameter")
        self.assertTrue(osutils.is_environment_error(err))

    def test_is_ioerror(self):
        err = IOError(errno.EINVAL, "Invalid parameter")
        self.assertTrue(osutils.is_environment_error(err))

    def test_is_socket_error(self):
        err = socket.error(errno.EINVAL, "Invalid parameter")
        self.assertTrue(osutils.is_environment_error(err))

    def test_is_select_error(self):
        err = select.error(errno.EINVAL, "Invalid parameter")
        self.assertTrue(osutils.is_environment_error(err))

    def test_is_pywintypes_error(self):
        # Require the feature before importing the optional module.
        self.requireFeature(features.pywintypes)
        import pywintypes
        err = pywintypes.error(errno.EINVAL, "Invalid parameter", "Caller")
        self.assertTrue(osutils.is_environment_error(err))