/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_osutils.py

  • Committer: Martin
  • Date: 2018-11-16 16:38:22 UTC
  • mto: This revision was merged to the branch mainline in revision 7172.
  • Revision ID: gzlist@googlemail.com-20181116163822-yg1h1cdng6w7w9kn
Make --profile-imports work on Python 3

Also tweak heading to line up correctly.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the osutils wrapper."""
18
18
 
19
 
from cStringIO import StringIO
 
19
from __future__ import absolute_import, division
 
20
 
20
21
import errno
21
22
import os
22
23
import re
 
24
import select
23
25
import socket
24
 
import stat
25
26
import sys
 
27
import tempfile
26
28
import time
27
29
 
28
 
from bzrlib import (
 
30
from .. import (
29
31
    errors,
 
32
    lazy_regex,
30
33
    osutils,
 
34
    symbol_versioning,
31
35
    tests,
32
36
    trace,
33
37
    win32utils,
34
38
    )
35
 
from bzrlib.tests import (
 
39
from ..sixish import (
 
40
    BytesIO,
 
41
    PY3,
 
42
    text_type,
 
43
    )
 
44
from . import (
36
45
    features,
37
46
    file_utils,
38
47
    test__walkdirs_win32,
39
48
    )
40
 
 
41
 
 
42
 
class _UTF8DirReaderFeature(tests.Feature):
 
49
from .scenarios import load_tests_apply_scenarios
 
50
 
 
51
 
 
52
class _UTF8DirReaderFeature(features.ModuleAvailableFeature):
43
53
 
44
54
    def _probe(self):
45
55
        try:
46
 
            from bzrlib import _readdir_pyx
 
56
            from .. import _readdir_pyx
 
57
            self._module = _readdir_pyx
47
58
            self.reader = _readdir_pyx.UTF8DirReader
48
59
            return True
49
60
        except ImportError:
50
61
            return False
51
62
 
52
 
    def feature_name(self):
53
 
        return 'bzrlib._readdir_pyx'
54
 
 
55
 
UTF8DirReaderFeature = _UTF8DirReaderFeature()
56
 
 
57
 
term_ios_feature = tests.ModuleAvailableFeature('termios')
 
63
UTF8DirReaderFeature = _UTF8DirReaderFeature('breezy._readdir_pyx')
 
64
 
 
65
term_ios_feature = features.ModuleAvailableFeature('termios')
58
66
 
59
67
 
60
68
def _already_unicode(s):
78
86
    # Some DirReaders are platform specific and even there they may not be
79
87
    # available.
80
88
    if UTF8DirReaderFeature.available():
81
 
        from bzrlib import _readdir_pyx
 
89
        from .. import _readdir_pyx
82
90
        scenarios.append(('utf8',
83
91
                          dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
84
92
                               _native_to_unicode=_utf8_to_unicode)))
85
93
 
86
94
    if test__walkdirs_win32.win32_readdir_feature.available():
87
95
        try:
88
 
            from bzrlib import _walkdirs_win32
 
96
            from .. import _walkdirs_win32
89
97
            scenarios.append(
90
98
                ('win32',
91
99
                 dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
95
103
    return scenarios
96
104
 
97
105
 
98
 
def load_tests(basic_tests, module, loader):
99
 
    suite = loader.suiteClass()
100
 
    dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
101
 
        basic_tests, tests.condition_isinstance(TestDirReader))
102
 
    tests.multiply_tests(dir_reader_tests, dir_reader_scenarios(), suite)
103
 
    suite.addTest(remaining_tests)
104
 
    return suite
 
106
load_tests = load_tests_apply_scenarios
105
107
 
106
108
 
107
109
class TestContainsWhitespace(tests.TestCase):
108
110
 
109
111
    def test_contains_whitespace(self):
110
 
        self.failUnless(osutils.contains_whitespace(u' '))
111
 
        self.failUnless(osutils.contains_whitespace(u'hello there'))
112
 
        self.failUnless(osutils.contains_whitespace(u'hellothere\n'))
113
 
        self.failUnless(osutils.contains_whitespace(u'hello\nthere'))
114
 
        self.failUnless(osutils.contains_whitespace(u'hello\rthere'))
115
 
        self.failUnless(osutils.contains_whitespace(u'hello\tthere'))
 
112
        self.assertTrue(osutils.contains_whitespace(u' '))
 
113
        self.assertTrue(osutils.contains_whitespace(u'hello there'))
 
114
        self.assertTrue(osutils.contains_whitespace(u'hellothere\n'))
 
115
        self.assertTrue(osutils.contains_whitespace(u'hello\nthere'))
 
116
        self.assertTrue(osutils.contains_whitespace(u'hello\rthere'))
 
117
        self.assertTrue(osutils.contains_whitespace(u'hello\tthere'))
116
118
 
117
119
        # \xa0 is "Non-breaking-space" which on some python locales thinks it
118
120
        # is whitespace, but we do not.
119
 
        self.failIf(osutils.contains_whitespace(u''))
120
 
        self.failIf(osutils.contains_whitespace(u'hellothere'))
121
 
        self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
 
121
        self.assertFalse(osutils.contains_whitespace(u''))
 
122
        self.assertFalse(osutils.contains_whitespace(u'hellothere'))
 
123
        self.assertFalse(osutils.contains_whitespace(u'hello\xa0there'))
122
124
 
123
125
 
124
126
class TestRename(tests.TestCaseInTempDir):
136
138
 
137
139
    def test_fancy_rename(self):
138
140
        # This should work everywhere
139
 
        self.create_file('a', 'something in a\n')
 
141
        self.create_file('a', b'something in a\n')
140
142
        self._fancy_rename('a', 'b')
141
 
        self.failIfExists('a')
142
 
        self.failUnlessExists('b')
143
 
        self.check_file_contents('b', 'something in a\n')
 
143
        self.assertPathDoesNotExist('a')
 
144
        self.assertPathExists('b')
 
145
        self.check_file_contents('b', b'something in a\n')
144
146
 
145
 
        self.create_file('a', 'new something in a\n')
 
147
        self.create_file('a', b'new something in a\n')
146
148
        self._fancy_rename('b', 'a')
147
149
 
148
 
        self.check_file_contents('a', 'something in a\n')
 
150
        self.check_file_contents('a', b'something in a\n')
149
151
 
150
152
    def test_fancy_rename_fails_source_missing(self):
151
153
        # An exception should be raised, and the target should be left in place
152
 
        self.create_file('target', 'data in target\n')
 
154
        self.create_file('target', b'data in target\n')
153
155
        self.assertRaises((IOError, OSError), self._fancy_rename,
154
156
                          'missingsource', 'target')
155
 
        self.failUnlessExists('target')
156
 
        self.check_file_contents('target', 'data in target\n')
 
157
        self.assertPathExists('target')
 
158
        self.check_file_contents('target', b'data in target\n')
157
159
 
158
160
    def test_fancy_rename_fails_if_source_and_target_missing(self):
159
161
        self.assertRaises((IOError, OSError), self._fancy_rename,
161
163
 
162
164
    def test_rename(self):
163
165
        # Rename should be semi-atomic on all platforms
164
 
        self.create_file('a', 'something in a\n')
 
166
        self.create_file('a', b'something in a\n')
165
167
        osutils.rename('a', 'b')
166
 
        self.failIfExists('a')
167
 
        self.failUnlessExists('b')
168
 
        self.check_file_contents('b', 'something in a\n')
 
168
        self.assertPathDoesNotExist('a')
 
169
        self.assertPathExists('b')
 
170
        self.check_file_contents('b', b'something in a\n')
169
171
 
170
 
        self.create_file('a', 'new something in a\n')
 
172
        self.create_file('a', b'new something in a\n')
171
173
        osutils.rename('b', 'a')
172
174
 
173
 
        self.check_file_contents('a', 'something in a\n')
 
175
        self.check_file_contents('a', b'something in a\n')
174
176
 
175
177
    # TODO: test fancy_rename using a MemoryTransport
176
178
 
182
184
        # we can't use failUnlessExists on case-insensitive filesystem
183
185
        # so try to check shape of the tree
184
186
        shape = sorted(os.listdir('.'))
185
 
        self.assertEquals(['A', 'B'], shape)
 
187
        self.assertEqual(['A', 'B'], shape)
186
188
 
187
 
    def test_rename_error(self):
188
 
        # We wrap os.rename to make it give an error including the filenames
189
 
        # https://bugs.launchpad.net/bzr/+bug/491763
190
 
        err = self.assertRaises(OSError, osutils.rename,
191
 
            'nonexistent', 'target')
192
 
        self.assertContainsString(str(err), 'nonexistent')
 
189
    def test_rename_exception(self):
 
190
        try:
 
191
            osutils.rename('nonexistent_path', 'different_nonexistent_path')
 
192
        except OSError as e:
 
193
            self.assertEqual(e.old_filename, 'nonexistent_path')
 
194
            self.assertEqual(e.new_filename, 'different_nonexistent_path')
 
195
            self.assertTrue('nonexistent_path' in e.strerror)
 
196
            self.assertTrue('different_nonexistent_path' in e.strerror)
193
197
 
194
198
 
195
199
class TestRandChars(tests.TestCase):
222
226
                         (['src'], SRC_FOO_C),
223
227
                         (['src'], 'src'),
224
228
                         ]:
225
 
            self.assert_(osutils.is_inside_any(dirs, fn))
 
229
            self.assertTrue(osutils.is_inside_any(dirs, fn))
226
230
        for dirs, fn in [(['src'], 'srccontrol'),
227
231
                         (['src'], 'srccontrol/foo')]:
228
232
            self.assertFalse(osutils.is_inside_any(dirs, fn))
234
238
                         (['src/bar.c', 'bla/foo.c'], 'src'),
235
239
                         (['src'], 'src'),
236
240
                         ]:
237
 
            self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
 
241
            self.assertTrue(osutils.is_inside_or_parent_of_any(dirs, fn))
238
242
 
239
243
        for dirs, fn in [(['src'], 'srccontrol'),
240
244
                         (['srccontrol/foo.c'], 'src'),
242
246
            self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
243
247
 
244
248
 
 
249
class TestLstat(tests.TestCaseInTempDir):
 
250
 
 
251
    def test_lstat_matches_fstat(self):
 
252
        # On Windows, lstat and fstat don't always agree, primarily in the
 
253
        # 'st_ino' and 'st_dev' fields. So we force them to be '0' in our
 
254
        # custom implementation.
 
255
        if sys.platform == 'win32':
 
256
            # We only have special lstat/fstat if we have the extension.
 
257
            # Without it, we may end up re-reading content when we don't have
 
258
            # to, but otherwise it doesn't effect correctness.
 
259
            self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
260
        with open('test-file.txt', 'wb') as f:
 
261
            f.write(b'some content\n')
 
262
            f.flush()
 
263
            self.assertEqualStat(osutils.fstat(f.fileno()),
 
264
                                 osutils.lstat('test-file.txt'))
 
265
 
 
266
 
245
267
class TestRmTree(tests.TestCaseInTempDir):
246
268
 
247
269
    def test_rmtree(self):
248
270
        # Check to remove tree with read-only files/dirs
249
271
        os.mkdir('dir')
250
 
        f = file('dir/file', 'w')
251
 
        f.write('spam')
252
 
        f.close()
 
272
        with open('dir/file', 'w') as f:
 
273
            f.write('spam')
253
274
        # would like to also try making the directory readonly, but at the
254
275
        # moment python shutil.rmtree doesn't handle that properly - it would
255
276
        # need to chmod the directory before removing things inside it - deferred
259
280
 
260
281
        osutils.rmtree('dir')
261
282
 
262
 
        self.failIfExists('dir/file')
263
 
        self.failIfExists('dir')
 
283
        self.assertPathDoesNotExist('dir/file')
 
284
        self.assertPathDoesNotExist('dir')
264
285
 
265
286
 
266
287
class TestDeleteAny(tests.TestCaseInTempDir):
279
300
 
280
301
    def test_file_kind(self):
281
302
        self.build_tree(['file', 'dir/'])
282
 
        self.assertEquals('file', osutils.file_kind('file'))
283
 
        self.assertEquals('directory', osutils.file_kind('dir/'))
 
303
        self.assertEqual('file', osutils.file_kind('file'))
 
304
        self.assertEqual('directory', osutils.file_kind('dir/'))
284
305
        if osutils.has_symlinks():
285
306
            os.symlink('symlink', 'symlink')
286
 
            self.assertEquals('symlink', osutils.file_kind('symlink'))
 
307
            self.assertEqual('symlink', osutils.file_kind('symlink'))
287
308
 
288
309
        # TODO: jam 20060529 Test a block device
289
310
        try:
290
311
            os.lstat('/dev/null')
291
 
        except OSError, e:
 
312
        except OSError as e:
292
313
            if e.errno not in (errno.ENOENT,):
293
314
                raise
294
315
        else:
295
 
            self.assertEquals('chardev', osutils.file_kind('/dev/null'))
 
316
            self.assertEqual(
 
317
                    'chardev',
 
318
                    osutils.file_kind(os.path.realpath('/dev/null')))
296
319
 
297
320
        mkfifo = getattr(os, 'mkfifo', None)
298
321
        if mkfifo:
299
322
            mkfifo('fifo')
300
323
            try:
301
 
                self.assertEquals('fifo', osutils.file_kind('fifo'))
 
324
                self.assertEqual('fifo', osutils.file_kind('fifo'))
302
325
            finally:
303
326
                os.remove('fifo')
304
327
 
307
330
            s = socket.socket(AF_UNIX)
308
331
            s.bind('socket')
309
332
            try:
310
 
                self.assertEquals('socket', osutils.file_kind('socket'))
 
333
                self.assertEqual('socket', osutils.file_kind('socket'))
311
334
            finally:
312
335
                os.remove('socket')
313
336
 
332
355
 
333
356
        orig_umask = osutils.get_umask()
334
357
        self.addCleanup(os.umask, orig_umask)
335
 
        os.umask(0222)
336
 
        self.assertEqual(0222, osutils.get_umask())
337
 
        os.umask(0022)
338
 
        self.assertEqual(0022, osutils.get_umask())
339
 
        os.umask(0002)
340
 
        self.assertEqual(0002, osutils.get_umask())
341
 
        os.umask(0027)
342
 
        self.assertEqual(0027, osutils.get_umask())
 
358
        os.umask(0o222)
 
359
        self.assertEqual(0o222, osutils.get_umask())
 
360
        os.umask(0o022)
 
361
        self.assertEqual(0o022, osutils.get_umask())
 
362
        os.umask(0o002)
 
363
        self.assertEqual(0o002, osutils.get_umask())
 
364
        os.umask(0o027)
 
365
        self.assertEqual(0o027, osutils.get_umask())
343
366
 
344
367
 
345
368
class TestDateTime(tests.TestCase):
381
404
        self.assertFormatedDelta('2 seconds in the future', -2)
382
405
 
383
406
    def test_format_date(self):
384
 
        self.assertRaises(errors.UnsupportedTimezoneFormat,
 
407
        self.assertRaises(osutils.UnsupportedTimezoneFormat,
385
408
            osutils.format_date, 0, timezone='foo')
386
409
        self.assertIsInstance(osutils.format_date(0), str)
387
 
        self.assertIsInstance(osutils.format_local_date(0), unicode)
 
410
        self.assertIsInstance(osutils.format_local_date(0), text_type)
388
411
        # Testing for the actual value of the local weekday without
389
412
        # duplicating the code from format_date is difficult.
390
413
        # Instead blackbox.test_locale should check for localized
418
441
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
419
442
 
420
443
 
 
444
class TestFdatasync(tests.TestCaseInTempDir):
 
445
 
 
446
    def do_fdatasync(self):
 
447
        f = tempfile.NamedTemporaryFile()
 
448
        osutils.fdatasync(f.fileno())
 
449
        f.close()
 
450
 
 
451
    @staticmethod
 
452
    def raise_eopnotsupp(*args, **kwargs):
 
453
        raise IOError(errno.EOPNOTSUPP, os.strerror(errno.EOPNOTSUPP))
 
454
 
 
455
    @staticmethod
 
456
    def raise_enotsup(*args, **kwargs):
 
457
        raise IOError(errno.ENOTSUP, os.strerror(errno.ENOTSUP))
 
458
 
 
459
    def test_fdatasync_handles_system_function(self):
 
460
        self.overrideAttr(os, "fdatasync")
 
461
        self.do_fdatasync()
 
462
 
 
463
    def test_fdatasync_handles_no_fdatasync_no_fsync(self):
 
464
        self.overrideAttr(os, "fdatasync")
 
465
        self.overrideAttr(os, "fsync")
 
466
        self.do_fdatasync()
 
467
 
 
468
    def test_fdatasync_handles_no_EOPNOTSUPP(self):
 
469
        self.overrideAttr(errno, "EOPNOTSUPP")
 
470
        self.do_fdatasync()
 
471
 
 
472
    def test_fdatasync_catches_ENOTSUP(self):
 
473
        enotsup = getattr(errno, "ENOTSUP", None)
 
474
        if enotsup is None:
 
475
            raise tests.TestNotApplicable("No ENOTSUP on this platform")
 
476
        self.overrideAttr(os, "fdatasync", self.raise_enotsup)
 
477
        self.do_fdatasync()
 
478
 
 
479
    def test_fdatasync_catches_EOPNOTSUPP(self):
 
480
        enotsup = getattr(errno, "EOPNOTSUPP", None)
 
481
        if enotsup is None:
 
482
            raise tests.TestNotApplicable("No EOPNOTSUPP on this platform")
 
483
        self.overrideAttr(os, "fdatasync", self.raise_eopnotsupp)
 
484
        self.do_fdatasync()
 
485
 
 
486
 
421
487
class TestLinks(tests.TestCaseInTempDir):
422
488
 
423
489
    def test_dereference_path(self):
424
 
        self.requireFeature(tests.SymlinkFeature)
 
490
        self.requireFeature(features.SymlinkFeature)
425
491
        cwd = osutils.realpath('.')
426
492
        os.mkdir('bar')
427
493
        bar_path = osutils.pathjoin(cwd, 'bar')
448
514
        self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
449
515
 
450
516
    def test_changing_access(self):
451
 
        f = file('file', 'w')
452
 
        f.write('monkey')
453
 
        f.close()
 
517
        with open('file', 'w') as f:
 
518
            f.write('monkey')
454
519
 
455
520
        # Make a file readonly
456
521
        osutils.make_readonly('file')
457
522
        mode = os.lstat('file').st_mode
458
 
        self.assertEqual(mode, mode & 0777555)
 
523
        self.assertEqual(mode, mode & 0o777555)
459
524
 
460
525
        # Make a file writable
461
526
        osutils.make_writable('file')
462
527
        mode = os.lstat('file').st_mode
463
 
        self.assertEqual(mode, mode | 0200)
 
528
        self.assertEqual(mode, mode | 0o200)
464
529
 
465
530
        if osutils.has_symlinks():
466
531
            # should not error when handed a symlink
474
539
 
475
540
class TestCanonicalRelPath(tests.TestCaseInTempDir):
476
541
 
477
 
    _test_needs_features = [tests.CaseInsCasePresFilenameFeature]
 
542
    _test_needs_features = [features.CaseInsCasePresFilenameFeature]
478
543
 
479
544
    def test_canonical_relpath_simple(self):
480
 
        f = file('MixedCaseName', 'w')
 
545
        f = open('MixedCaseName', 'w')
481
546
        f.close()
482
547
        actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
483
 
        self.failUnlessEqual('work/MixedCaseName', actual)
 
548
        self.assertEqual('work/MixedCaseName', actual)
484
549
 
485
550
    def test_canonical_relpath_missing_tail(self):
486
551
        os.mkdir('MixedCaseParent')
487
552
        actual = osutils.canonical_relpath(self.test_base_dir,
488
553
                                           'mixedcaseparent/nochild')
489
 
        self.failUnlessEqual('work/MixedCaseParent/nochild', actual)
 
554
        self.assertEqual('work/MixedCaseParent/nochild', actual)
490
555
 
491
556
 
492
557
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
536
601
    """Test pumpfile method."""
537
602
 
538
603
    def setUp(self):
539
 
        tests.TestCase.setUp(self)
 
604
        super(TestPumpFile, self).setUp()
540
605
        # create a test datablock
541
606
        self.block_size = 512
542
 
        pattern = '0123456789ABCDEF'
543
 
        self.test_data = pattern * (3 * self.block_size / len(pattern))
 
607
        pattern = b'0123456789ABCDEF'
 
608
        self.test_data = pattern * (3 * self.block_size // len(pattern))
544
609
        self.test_data_len = len(self.test_data)
545
610
 
546
611
    def test_bracket_block_size(self):
550
615
        self.assertTrue(self.test_data_len > self.block_size)
551
616
 
552
617
        from_file = file_utils.FakeReadFile(self.test_data)
553
 
        to_file = StringIO()
 
618
        to_file = BytesIO()
554
619
 
555
 
        # read (max / 2) bytes and verify read size wasn't affected
556
 
        num_bytes_to_read = self.block_size / 2
 
620
        # read (max // 2) bytes and verify read size wasn't affected
 
621
        num_bytes_to_read = self.block_size // 2
557
622
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
558
623
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
559
624
        self.assertEqual(from_file.get_read_count(), 1)
591
656
 
592
657
        # retrieve data in blocks
593
658
        from_file = file_utils.FakeReadFile(self.test_data)
594
 
        to_file = StringIO()
 
659
        to_file = BytesIO()
595
660
        osutils.pumpfile(from_file, to_file, self.test_data_len,
596
661
                         self.block_size)
597
662
 
615
680
 
616
681
        # retrieve data to EOF
617
682
        from_file = file_utils.FakeReadFile(self.test_data)
618
 
        to_file = StringIO()
 
683
        to_file = BytesIO()
619
684
        osutils.pumpfile(from_file, to_file, -1, self.block_size)
620
685
 
621
686
        # verify read size was equal to the maximum read size
635
700
        with this new version."""
636
701
        # retrieve data using default (old) pumpfile method
637
702
        from_file = file_utils.FakeReadFile(self.test_data)
638
 
        to_file = StringIO()
 
703
        to_file = BytesIO()
639
704
        osutils.pumpfile(from_file, to_file)
640
705
 
641
706
        # report error if the data wasn't equal (we only report the size due
649
714
        activity = []
650
715
        def log_activity(length, direction):
651
716
            activity.append((length, direction))
652
 
        from_file = StringIO(self.test_data)
653
 
        to_file = StringIO()
 
717
        from_file = BytesIO(self.test_data)
 
718
        to_file = BytesIO()
654
719
        osutils.pumpfile(from_file, to_file, buff_size=500,
655
720
                         report_activity=log_activity, direction='read')
656
721
        self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
657
722
                          (36, 'read')], activity)
658
723
 
659
 
        from_file = StringIO(self.test_data)
660
 
        to_file = StringIO()
 
724
        from_file = BytesIO(self.test_data)
 
725
        to_file = BytesIO()
661
726
        del activity[:]
662
727
        osutils.pumpfile(from_file, to_file, buff_size=500,
663
728
                         report_activity=log_activity, direction='write')
665
730
                          (36, 'write')], activity)
666
731
 
667
732
        # And with a limited amount of data
668
 
        from_file = StringIO(self.test_data)
669
 
        to_file = StringIO()
 
733
        from_file = BytesIO(self.test_data)
 
734
        to_file = BytesIO()
670
735
        del activity[:]
671
736
        osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
672
737
                         report_activity=log_activity, direction='read')
677
742
class TestPumpStringFile(tests.TestCase):
678
743
 
679
744
    def test_empty(self):
680
 
        output = StringIO()
681
 
        osutils.pump_string_file("", output)
682
 
        self.assertEqual("", output.getvalue())
 
745
        output = BytesIO()
 
746
        osutils.pump_string_file(b"", output)
 
747
        self.assertEqual(b"", output.getvalue())
683
748
 
684
749
    def test_more_than_segment_size(self):
685
 
        output = StringIO()
686
 
        osutils.pump_string_file("123456789", output, 2)
687
 
        self.assertEqual("123456789", output.getvalue())
 
750
        output = BytesIO()
 
751
        osutils.pump_string_file(b"123456789", output, 2)
 
752
        self.assertEqual(b"123456789", output.getvalue())
688
753
 
689
754
    def test_segment_size(self):
690
 
        output = StringIO()
691
 
        osutils.pump_string_file("12", output, 2)
692
 
        self.assertEqual("12", output.getvalue())
 
755
        output = BytesIO()
 
756
        osutils.pump_string_file(b"12", output, 2)
 
757
        self.assertEqual(b"12", output.getvalue())
693
758
 
694
759
    def test_segment_size_multiple(self):
695
 
        output = StringIO()
696
 
        osutils.pump_string_file("1234", output, 2)
697
 
        self.assertEqual("1234", output.getvalue())
 
760
        output = BytesIO()
 
761
        osutils.pump_string_file(b"1234", output, 2)
 
762
        self.assertEqual(b"1234", output.getvalue())
698
763
 
699
764
 
700
765
class TestRelpath(tests.TestCase):
719
784
class TestSafeUnicode(tests.TestCase):
720
785
 
721
786
    def test_from_ascii_string(self):
722
 
        self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
 
787
        self.assertEqual(u'foobar', osutils.safe_unicode(b'foobar'))
723
788
 
724
789
    def test_from_unicode_string_ascii_contents(self):
725
790
        self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
728
793
        self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
729
794
 
730
795
    def test_from_utf8_string(self):
731
 
        self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
 
796
        self.assertEqual(u'foo\xae', osutils.safe_unicode(b'foo\xc2\xae'))
732
797
 
733
798
    def test_bad_utf8_string(self):
734
799
        self.assertRaises(errors.BzrBadParameterNotUnicode,
735
800
                          osutils.safe_unicode,
736
 
                          '\xbb\xbb')
 
801
                          b'\xbb\xbb')
737
802
 
738
803
 
739
804
class TestSafeUtf8(tests.TestCase):
740
805
 
741
806
    def test_from_ascii_string(self):
742
 
        f = 'foobar'
743
 
        self.assertEqual('foobar', osutils.safe_utf8(f))
 
807
        f = b'foobar'
 
808
        self.assertEqual(b'foobar', osutils.safe_utf8(f))
744
809
 
745
810
    def test_from_unicode_string_ascii_contents(self):
746
 
        self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
 
811
        self.assertEqual(b'bargam', osutils.safe_utf8(u'bargam'))
747
812
 
748
813
    def test_from_unicode_string_unicode_contents(self):
749
 
        self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
 
814
        self.assertEqual(b'bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
750
815
 
751
816
    def test_from_utf8_string(self):
752
 
        self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
 
817
        self.assertEqual(b'foo\xc2\xae', osutils.safe_utf8(b'foo\xc2\xae'))
753
818
 
754
819
    def test_bad_utf8_string(self):
755
820
        self.assertRaises(errors.BzrBadParameterNotUnicode,
756
 
                          osutils.safe_utf8, '\xbb\xbb')
 
821
                          osutils.safe_utf8, b'\xbb\xbb')
757
822
 
758
823
 
759
824
class TestSafeRevisionId(tests.TestCase):
760
825
 
761
826
    def test_from_ascii_string(self):
762
827
        # this shouldn't give a warning because it's getting an ascii string
763
 
        self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
 
828
        self.assertEqual(b'foobar', osutils.safe_revision_id(b'foobar'))
764
829
 
765
830
    def test_from_unicode_string_ascii_contents(self):
766
 
        self.assertEqual('bargam',
767
 
                         osutils.safe_revision_id(u'bargam', warn=False))
768
 
 
769
 
    def test_from_unicode_deprecated(self):
770
 
        self.assertEqual('bargam',
771
 
            self.callDeprecated([osutils._revision_id_warning],
772
 
                                osutils.safe_revision_id, u'bargam'))
 
831
        self.assertRaises(TypeError,
 
832
                          osutils.safe_revision_id, u'bargam')
773
833
 
774
834
    def test_from_unicode_string_unicode_contents(self):
775
 
        self.assertEqual('bargam\xc2\xae',
776
 
                         osutils.safe_revision_id(u'bargam\xae', warn=False))
 
835
        self.assertRaises(TypeError,
 
836
                         osutils.safe_revision_id, u'bargam\xae')
777
837
 
778
838
    def test_from_utf8_string(self):
779
 
        self.assertEqual('foo\xc2\xae',
780
 
                         osutils.safe_revision_id('foo\xc2\xae'))
 
839
        self.assertEqual(b'foo\xc2\xae',
 
840
                         osutils.safe_revision_id(b'foo\xc2\xae'))
781
841
 
782
842
    def test_none(self):
783
843
        """Currently, None is a valid revision_id"""
787
847
class TestSafeFileId(tests.TestCase):
788
848
 
789
849
    def test_from_ascii_string(self):
790
 
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))
 
850
        self.assertEqual(b'foobar', osutils.safe_file_id(b'foobar'))
791
851
 
792
852
    def test_from_unicode_string_ascii_contents(self):
793
 
        self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))
794
 
 
795
 
    def test_from_unicode_deprecated(self):
796
 
        self.assertEqual('bargam',
797
 
            self.callDeprecated([osutils._file_id_warning],
798
 
                                osutils.safe_file_id, u'bargam'))
 
853
        self.assertRaises(TypeError, osutils.safe_file_id, u'bargam')
799
854
 
800
855
    def test_from_unicode_string_unicode_contents(self):
801
 
        self.assertEqual('bargam\xc2\xae',
802
 
                         osutils.safe_file_id(u'bargam\xae', warn=False))
 
856
        self.assertRaises(TypeError,
 
857
                          osutils.safe_file_id, u'bargam\xae')
803
858
 
804
859
    def test_from_utf8_string(self):
805
 
        self.assertEqual('foo\xc2\xae',
806
 
                         osutils.safe_file_id('foo\xc2\xae'))
 
860
        self.assertEqual(b'foo\xc2\xae',
 
861
                         osutils.safe_file_id(b'foo\xc2\xae'))
807
862
 
808
863
    def test_none(self):
809
864
        """Currently, None is a valid revision_id"""
810
865
        self.assertEqual(None, osutils.safe_file_id(None))
811
866
 
812
867
 
 
868
class TestSendAll(tests.TestCase):
 
869
 
 
870
    def test_send_with_disconnected_socket(self):
 
871
        class DisconnectedSocket(object):
 
872
            def __init__(self, err):
 
873
                self.err = err
 
874
            def send(self, content):
 
875
                raise self.err
 
876
            def close(self):
 
877
                pass
 
878
        # All of these should be treated as ConnectionReset
 
879
        errs = []
 
880
        for err_cls in (IOError, socket.error):
 
881
            for errnum in osutils._end_of_stream_errors:
 
882
                errs.append(err_cls(errnum))
 
883
        for err in errs:
 
884
            sock = DisconnectedSocket(err)
 
885
            self.assertRaises(errors.ConnectionReset,
 
886
                osutils.send_all, sock, b'some more content')
 
887
 
 
888
    def test_send_with_no_progress(self):
 
889
        # See https://bugs.launchpad.net/bzr/+bug/1047309
 
890
        # It seems that paramiko can get into a state where it doesn't error,
 
891
        # but it returns 0 bytes sent for requests over and over again.
 
892
        class NoSendingSocket(object):
 
893
            def __init__(self):
 
894
                self.call_count = 0
 
895
            def send(self, bytes):
 
896
                self.call_count += 1
 
897
                if self.call_count > 100:
 
898
                    # Prevent the test suite from hanging
 
899
                    raise RuntimeError('too many calls')
 
900
                return 0
 
901
        sock = NoSendingSocket()
 
902
        self.assertRaises(errors.ConnectionReset,
 
903
                          osutils.send_all, sock, b'content')
 
904
        self.assertEqual(1, sock.call_count)
 
905
 
 
906
 
 
907
class TestPosixFuncs(tests.TestCase):
 
908
    """Test that the posix version of normpath returns an appropriate path
 
909
       when used with 2 leading slashes."""
 
910
 
 
911
    def test_normpath(self):
 
912
        self.assertEqual('/etc/shadow', osutils._posix_normpath('/etc/shadow'))
 
913
        self.assertEqual('/etc/shadow', osutils._posix_normpath('//etc/shadow'))
 
914
        self.assertEqual('/etc/shadow', osutils._posix_normpath('///etc/shadow'))
 
915
 
 
916
 
813
917
class TestWin32Funcs(tests.TestCase):
814
918
    """Test that _win32 versions of os utilities return appropriate paths."""
815
919
 
816
920
    def test_abspath(self):
 
921
        self.requireFeature(features.win32_feature)
817
922
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
818
923
        self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
819
924
        self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
832
937
                         osutils._win32_pathjoin('path/to', 'C:/foo'))
833
938
        self.assertEqual('path/to/foo',
834
939
                         osutils._win32_pathjoin('path/to/', 'foo'))
835
 
        self.assertEqual('/foo',
 
940
 
 
941
    def test_pathjoin_late_bugfix(self):
 
942
        if sys.version_info < (2, 7, 6):
 
943
            expected = '/foo'
 
944
        else:
 
945
            expected = 'C:/foo'
 
946
        self.assertEqual(expected,
836
947
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
837
 
        self.assertEqual('/foo',
 
948
        self.assertEqual(expected,
838
949
                         osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
839
950
 
840
951
    def test_normpath(self):
845
956
 
846
957
    def test_getcwd(self):
847
958
        cwd = osutils._win32_getcwd()
848
 
        os_cwd = os.getcwdu()
 
959
        os_cwd = osutils._getcwd()
849
960
        self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
850
961
        # win32 is inconsistent whether it returns lower or upper case
851
962
        # and even if it was consistent the user might type the other
859
970
        self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
860
971
        self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
861
972
 
862
 
    def test_win98_abspath(self):
863
 
        # absolute path
864
 
        self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
865
 
        self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
866
 
        # UNC path
867
 
        self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
868
 
        self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
869
 
        # relative path
870
 
        cwd = osutils.getcwd().rstrip('/')
871
 
        drive = osutils._nt_splitdrive(cwd)[0]
872
 
        self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
873
 
        self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
874
 
        # unicode path
875
 
        u = u'\u1234'
876
 
        self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
877
 
 
878
973
 
879
974
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
880
975
    """Test win32 functions that create files."""
881
976
 
882
977
    def test_getcwd(self):
883
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
978
        self.requireFeature(features.UnicodeFilenameFeature)
884
979
        os.mkdir(u'mu-\xb5')
885
980
        os.chdir(u'mu-\xb5')
886
981
        # TODO: jam 20060427 This will probably fail on Mac OSX because
892
987
    def test_minimum_path_selection(self):
893
988
        self.assertEqual(set(),
894
989
            osutils.minimum_path_selection([]))
895
 
        self.assertEqual(set(['a']),
 
990
        self.assertEqual({'a'},
896
991
            osutils.minimum_path_selection(['a']))
897
 
        self.assertEqual(set(['a', 'b']),
 
992
        self.assertEqual({'a', 'b'},
898
993
            osutils.minimum_path_selection(['a', 'b']))
899
 
        self.assertEqual(set(['a/', 'b']),
 
994
        self.assertEqual({'a/', 'b'},
900
995
            osutils.minimum_path_selection(['a/', 'b']))
901
 
        self.assertEqual(set(['a/', 'b']),
 
996
        self.assertEqual({'a/', 'b'},
902
997
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
903
 
        self.assertEqual(set(['a-b', 'a', 'a0b']),
 
998
        self.assertEqual({'a-b', 'a', 'a0b'},
904
999
            osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
905
1000
 
906
1001
    def test_mkdtemp(self):
908
1003
        self.assertFalse('\\' in tmpdir)
909
1004
 
910
1005
    def test_rename(self):
911
 
        a = open('a', 'wb')
912
 
        a.write('foo\n')
913
 
        a.close()
914
 
        b = open('b', 'wb')
915
 
        b.write('baz\n')
916
 
        b.close()
 
1006
        with open('a', 'wb') as a:
 
1007
            a.write(b'foo\n')
 
1008
        with open('b', 'wb') as b:
 
1009
            b.write(b'baz\n')
917
1010
 
918
1011
        osutils._win32_rename('b', 'a')
919
 
        self.failUnlessExists('a')
920
 
        self.failIfExists('b')
921
 
        self.assertFileEqual('baz\n', 'a')
 
1012
        self.assertPathExists('a')
 
1013
        self.assertPathDoesNotExist('b')
 
1014
        self.assertFileEqual(b'baz\n', 'a')
922
1015
 
923
1016
    def test_rename_missing_file(self):
924
 
        a = open('a', 'wb')
925
 
        a.write('foo\n')
926
 
        a.close()
 
1017
        with open('a', 'wb') as a:
 
1018
            a.write(b'foo\n')
927
1019
 
928
1020
        try:
929
1021
            osutils._win32_rename('b', 'a')
930
 
        except (IOError, OSError), e:
 
1022
        except (IOError, OSError) as e:
931
1023
            self.assertEqual(errno.ENOENT, e.errno)
932
 
        self.assertFileEqual('foo\n', 'a')
 
1024
        self.assertFileEqual(b'foo\n', 'a')
933
1025
 
934
1026
    def test_rename_missing_dir(self):
935
1027
        os.mkdir('a')
936
1028
        try:
937
1029
            osutils._win32_rename('b', 'a')
938
 
        except (IOError, OSError), e:
 
1030
        except (IOError, OSError) as e:
939
1031
            self.assertEqual(errno.ENOENT, e.errno)
940
1032
 
941
1033
    def test_rename_current_dir(self):
947
1039
        # doesn't exist.
948
1040
        try:
949
1041
            osutils._win32_rename('b', '.')
950
 
        except (IOError, OSError), e:
 
1042
        except (IOError, OSError) as e:
951
1043
            self.assertEqual(errno.ENOENT, e.errno)
952
1044
 
953
1045
    def test_splitpath(self):
976
1068
    """Test mac special functions that require directories."""
977
1069
 
978
1070
    def test_getcwd(self):
979
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1071
        self.requireFeature(features.UnicodeFilenameFeature)
980
1072
        os.mkdir(u'B\xe5gfors')
981
1073
        os.chdir(u'B\xe5gfors')
982
1074
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
983
1075
 
984
1076
    def test_getcwd_nonnorm(self):
985
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1077
        self.requireFeature(features.UnicodeFilenameFeature)
986
1078
        # Test that _mac_getcwd() will normalize this path
987
1079
        os.mkdir(u'Ba\u030agfors')
988
1080
        os.chdir(u'Ba\u030agfors')
992
1084
class TestChunksToLines(tests.TestCase):
993
1085
 
994
1086
    def test_smoketest(self):
995
 
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
996
 
                         osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
997
 
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
998
 
                         osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
 
1087
        self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
 
1088
                         osutils.chunks_to_lines([b'foo\nbar', b'\nbaz\n']))
 
1089
        self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
 
1090
                         osutils.chunks_to_lines([b'foo\n', b'bar\n', b'baz\n']))
999
1091
 
1000
1092
    def test_osutils_binding(self):
1001
 
        from bzrlib.tests import test__chunks_to_lines
 
1093
        from . import test__chunks_to_lines
1002
1094
        if test__chunks_to_lines.compiled_chunkstolines_feature.available():
1003
 
            from bzrlib._chunks_to_lines_pyx import chunks_to_lines
 
1095
            from .._chunks_to_lines_pyx import chunks_to_lines
1004
1096
        else:
1005
 
            from bzrlib._chunks_to_lines_py import chunks_to_lines
 
1097
            from .._chunks_to_lines_py import chunks_to_lines
1006
1098
        self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
1007
1099
 
1008
1100
 
1015
1107
                         osutils.split_lines(u'foo\nbar\xae\n'))
1016
1108
 
1017
1109
    def test_split_with_carriage_returns(self):
1018
 
        self.assertEqual(['foo\rbar\n'],
1019
 
                         osutils.split_lines('foo\rbar\n'))
 
1110
        self.assertEqual([b'foo\rbar\n'],
 
1111
                         osutils.split_lines(b'foo\rbar\n'))
1020
1112
 
1021
1113
 
1022
1114
class TestWalkDirs(tests.TestCaseInTempDir):
1071
1163
        self.assertExpectedBlocks(expected_dirblocks[1:], result)
1072
1164
 
1073
1165
    def test_walkdirs_os_error(self):
1074
 
        # <https://bugs.edge.launchpad.net/bzr/+bug/338653>
 
1166
        # <https://bugs.launchpad.net/bzr/+bug/338653>
1075
1167
        # Pyrex readdir didn't raise useful messages if it had an error
1076
1168
        # reading the directory
1077
1169
        if sys.platform == 'win32':
1078
1170
            raise tests.TestNotApplicable(
1079
1171
                "readdir IOError not tested on win32")
 
1172
        self.requireFeature(features.not_running_as_root)
1080
1173
        os.mkdir("test-unreadable")
1081
1174
        os.chmod("test-unreadable", 0000)
1082
1175
        # must chmod it back so that it can be removed
1083
 
        self.addCleanup(os.chmod, "test-unreadable", 0700)
 
1176
        self.addCleanup(os.chmod, "test-unreadable", 0o700)
1084
1177
        # The error is not raised until the generator is actually evaluated.
1085
1178
        # (It would be ok if it happened earlier but at the moment it
1086
1179
        # doesn't.)
1087
1180
        e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1088
 
        self.assertEquals('./test-unreadable', e.filename)
1089
 
        self.assertEquals(errno.EACCES, e.errno)
 
1181
        self.assertEqual('./test-unreadable', osutils.safe_unicode(e.filename))
 
1182
        self.assertEqual(errno.EACCES, e.errno)
1090
1183
        # Ensure the message contains the file name
1091
 
        self.assertContainsRe(str(e), "\./test-unreadable")
 
1184
        self.assertContainsRe(str(e), "\\./test-unreadable")
 
1185
 
 
1186
    def test_walkdirs_encoding_error(self):
 
1187
        # <https://bugs.launchpad.net/bzr/+bug/488519>
 
1188
        # walkdirs didn't raise a useful message when the filenames
 
1189
        # are not using the filesystem's encoding
 
1190
 
 
1191
        # require a bytestring based filesystem
 
1192
        self.requireFeature(features.ByteStringNamedFilesystem)
 
1193
 
 
1194
        tree = [
 
1195
            '.bzr',
 
1196
            '0file',
 
1197
            '1dir/',
 
1198
            '1dir/0file',
 
1199
            '1dir/1dir/',
 
1200
            '1file'
 
1201
            ]
 
1202
 
 
1203
        self.build_tree(tree)
 
1204
 
 
1205
        # rename the 1file to a latin-1 filename
 
1206
        os.rename(b"./1file", b"\xe8file")
 
1207
        if b"\xe8file" not in os.listdir("."):
 
1208
            self.skipTest("Lack filesystem that preserves arbitrary bytes")
 
1209
 
 
1210
        self._save_platform_info()
 
1211
        osutils._fs_enc = 'UTF-8'
 
1212
 
 
1213
        # this should raise on error
 
1214
        def attempt():
 
1215
            for dirdetail, dirblock in osutils.walkdirs(b'.'):
 
1216
                pass
 
1217
 
 
1218
        self.assertRaises(errors.BadFilenameEncoding, attempt)
1092
1219
 
1093
1220
    def test__walkdirs_utf8(self):
1094
1221
        tree = [
1119
1246
            ]
1120
1247
        result = []
1121
1248
        found_bzrdir = False
1122
 
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1123
 
            if len(dirblock) and dirblock[0][1] == '.bzr':
 
1249
        for dirdetail, dirblock in osutils._walkdirs_utf8(b'.'):
 
1250
            if len(dirblock) and dirblock[0][1] == b'.bzr':
1124
1251
                # this tests the filtering of selected paths
1125
1252
                found_bzrdir = True
1126
1253
                del dirblock[0]
 
1254
            dirdetail = (dirdetail[0].decode('utf-8'), osutils.safe_unicode(dirdetail[1]))
 
1255
            dirblock = [
 
1256
                    (entry[0].decode('utf-8'), entry[1].decode('utf-8'), entry[2])
 
1257
                    for entry in dirblock]
1127
1258
            result.append((dirdetail, dirblock))
1128
1259
 
1129
1260
        self.assertTrue(found_bzrdir)
1145
1276
            dirblock[:] = new_dirblock
1146
1277
 
1147
1278
    def _save_platform_info(self):
1148
 
        self.overrideAttr(win32utils, 'winver')
1149
1279
        self.overrideAttr(osutils, '_fs_enc')
1150
1280
        self.overrideAttr(osutils, '_selected_dir_reader')
1151
1281
 
1152
 
    def assertDirReaderIs(self, expected):
 
1282
    def assertDirReaderIs(self, expected, top):
1153
1283
        """Assert the right implementation for _walkdirs_utf8 is chosen."""
1154
1284
        # Force it to redetect
1155
1285
        osutils._selected_dir_reader = None
1156
1286
        # Nothing to list, but should still trigger the selection logic
1157
 
        self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
 
1287
        self.assertEqual([((b'', top), [])], list(osutils._walkdirs_utf8('.')))
1158
1288
        self.assertIsInstance(osutils._selected_dir_reader, expected)
1159
1289
 
1160
1290
    def test_force_walkdirs_utf8_fs_utf8(self):
1161
1291
        self.requireFeature(UTF8DirReaderFeature)
1162
1292
        self._save_platform_info()
1163
 
        win32utils.winver = None # Avoid the win32 detection code
1164
 
        osutils._fs_enc = 'UTF-8'
1165
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
1293
        osutils._fs_enc = 'utf-8'
 
1294
        self.assertDirReaderIs(UTF8DirReaderFeature.module.UTF8DirReader, b".")
1166
1295
 
1167
1296
    def test_force_walkdirs_utf8_fs_ascii(self):
1168
1297
        self.requireFeature(UTF8DirReaderFeature)
1169
1298
        self._save_platform_info()
1170
 
        win32utils.winver = None # Avoid the win32 detection code
1171
 
        osutils._fs_enc = 'US-ASCII'
1172
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1173
 
 
1174
 
    def test_force_walkdirs_utf8_fs_ANSI(self):
1175
 
        self.requireFeature(UTF8DirReaderFeature)
1176
 
        self._save_platform_info()
1177
 
        win32utils.winver = None # Avoid the win32 detection code
1178
 
        osutils._fs_enc = 'ANSI_X3.4-1968'
1179
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
1299
        osutils._fs_enc = 'ascii'
 
1300
        self.assertDirReaderIs(
 
1301
            UTF8DirReaderFeature.module.UTF8DirReader, b".")
1180
1302
 
1181
1303
    def test_force_walkdirs_utf8_fs_latin1(self):
1182
1304
        self._save_platform_info()
1183
 
        win32utils.winver = None # Avoid the win32 detection code
1184
 
        osutils._fs_enc = 'latin1'
1185
 
        self.assertDirReaderIs(osutils.UnicodeDirReader)
 
1305
        osutils._fs_enc = 'iso-8859-1'
 
1306
        self.assertDirReaderIs(osutils.UnicodeDirReader, ".")
1186
1307
 
1187
1308
    def test_force_walkdirs_utf8_nt(self):
1188
1309
        # Disabled because the thunk of the whole walkdirs api is disabled.
1189
1310
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1190
1311
        self._save_platform_info()
1191
 
        win32utils.winver = 'Windows NT'
1192
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
1193
 
        self.assertDirReaderIs(Win32ReadDir)
1194
 
 
1195
 
    def test_force_walkdirs_utf8_98(self):
1196
 
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1197
 
        self._save_platform_info()
1198
 
        win32utils.winver = 'Windows 98'
1199
 
        self.assertDirReaderIs(osutils.UnicodeDirReader)
 
1312
        from .._walkdirs_win32 import Win32ReadDir
 
1313
        self.assertDirReaderIs(Win32ReadDir, ".")
1200
1314
 
1201
1315
    def test_unicode_walkdirs(self):
1202
1316
        """Walkdirs should always return unicode paths."""
1203
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1317
        self.requireFeature(features.UnicodeFilenameFeature)
1204
1318
        name0 = u'0file-\xb6'
1205
1319
        name1 = u'1dir-\u062c\u0648'
1206
1320
        name2 = u'2file-\u0633'
1243
1357
 
1244
1358
        The abspath portion might be in unicode or utf-8
1245
1359
        """
1246
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1360
        self.requireFeature(features.UnicodeFilenameFeature)
1247
1361
        name0 = u'0file-\xb6'
1248
1362
        name1 = u'1dir-\u062c\u0648'
1249
1363
        name2 = u'2file-\u0633'
1260
1374
        name2 = name2.encode('utf8')
1261
1375
 
1262
1376
        expected_dirblocks = [
1263
 
                (('', '.'),
1264
 
                 [(name0, name0, 'file', './' + name0),
1265
 
                  (name1, name1, 'directory', './' + name1),
1266
 
                  (name2, name2, 'file', './' + name2),
1267
 
                 ]
1268
 
                ),
1269
 
                ((name1, './' + name1),
1270
 
                 [(name1 + '/' + name0, name0, 'file', './' + name1
1271
 
                                                        + '/' + name0),
1272
 
                  (name1 + '/' + name1, name1, 'directory', './' + name1
1273
 
                                                            + '/' + name1),
1274
 
                 ]
1275
 
                ),
1276
 
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1377
                ((b'', b'.'),
 
1378
                 [(name0, name0, 'file', b'./' + name0),
 
1379
                  (name1, name1, 'directory', b'./' + name1),
 
1380
                  (name2, name2, 'file', b'./' + name2),
 
1381
                 ]
 
1382
                ),
 
1383
                ((name1, b'./' + name1),
 
1384
                 [(name1 + b'/' + name0, name0, 'file', b'./' + name1
 
1385
                                                        + b'/' + name0),
 
1386
                  (name1 + b'/' + name1, name1, 'directory', b'./' + name1
 
1387
                                                            + b'/' + name1),
 
1388
                 ]
 
1389
                ),
 
1390
                ((name1 + b'/' + name1, b'./' + name1 + b'/' + name1),
1277
1391
                 [
1278
1392
                 ]
1279
1393
                ),
1282
1396
        # For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1283
1397
        # all abspaths are Unicode, and encode them back into utf8.
1284
1398
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1285
 
            self.assertIsInstance(dirdetail[0], str)
1286
 
            if isinstance(dirdetail[1], unicode):
 
1399
            self.assertIsInstance(dirdetail[0], bytes)
 
1400
            if isinstance(dirdetail[1], text_type):
1287
1401
                dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1288
1402
                dirblock = [list(info) for info in dirblock]
1289
1403
                for info in dirblock:
1290
 
                    self.assertIsInstance(info[4], unicode)
 
1404
                    self.assertIsInstance(info[4], text_type)
1291
1405
                    info[4] = info[4].encode('utf8')
1292
1406
            new_dirblock = []
1293
1407
            for info in dirblock:
1294
 
                self.assertIsInstance(info[0], str)
1295
 
                self.assertIsInstance(info[1], str)
1296
 
                self.assertIsInstance(info[4], str)
 
1408
                self.assertIsInstance(info[0], bytes)
 
1409
                self.assertIsInstance(info[1], bytes)
 
1410
                self.assertIsInstance(info[4], bytes)
1297
1411
                # Remove the stat information
1298
1412
                new_dirblock.append((info[0], info[1], info[2], info[4]))
1299
1413
            result.append((dirdetail, new_dirblock))
1304
1418
 
1305
1419
        The abspath portion should be in unicode
1306
1420
        """
1307
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1421
        self.requireFeature(features.UnicodeFilenameFeature)
1308
1422
        # Use the unicode reader. TODO: split into driver-and-driven unit
1309
1423
        # tests.
1310
1424
        self._save_platform_info()
1327
1441
        # All of the abspaths should be in unicode, all of the relative paths
1328
1442
        # should be in utf8
1329
1443
        expected_dirblocks = [
1330
 
                (('', '.'),
 
1444
                ((b'', '.'),
1331
1445
                 [(name0, name0, 'file', './' + name0u),
1332
1446
                  (name1, name1, 'directory', './' + name1u),
1333
1447
                  (name2, name2, 'file', './' + name2u),
1334
1448
                 ]
1335
1449
                ),
1336
1450
                ((name1, './' + name1u),
1337
 
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1451
                 [(name1 + b'/' + name0, name0, 'file', './' + name1u
1338
1452
                                                        + '/' + name0u),
1339
 
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1453
                  (name1 + b'/' + name1, name1, 'directory', './' + name1u
1340
1454
                                                            + '/' + name1u),
1341
1455
                 ]
1342
1456
                ),
1343
 
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1457
                ((name1 + b'/' + name1, './' + name1u + '/' + name1u),
1344
1458
                 [
1345
1459
                 ]
1346
1460
                ),
1351
1465
 
1352
1466
    def test__walkdirs_utf8_win32readdir(self):
1353
1467
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1354
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1355
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1468
        self.requireFeature(features.UnicodeFilenameFeature)
 
1469
        from .._walkdirs_win32 import Win32ReadDir
1356
1470
        self._save_platform_info()
1357
1471
        osutils._selected_dir_reader = Win32ReadDir()
1358
1472
        name0u = u'0file-\xb6'
1408
1522
    def test__walkdirs_utf_win32_find_file_stat_file(self):
1409
1523
        """make sure our Stat values are valid"""
1410
1524
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1411
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1412
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1525
        self.requireFeature(features.UnicodeFilenameFeature)
 
1526
        from .._walkdirs_win32 import Win32ReadDir
1413
1527
        name0u = u'0file-\xb6'
1414
1528
        name0 = name0u.encode('utf8')
1415
1529
        self.build_tree([name0u])
1416
1530
        # I hate to sleep() here, but I'm trying to make the ctime different
1417
1531
        # from the mtime
1418
1532
        time.sleep(2)
1419
 
        f = open(name0u, 'ab')
1420
 
        try:
1421
 
            f.write('just a small update')
1422
 
        finally:
1423
 
            f.close()
 
1533
        with open(name0u, 'ab') as f:
 
1534
            f.write(b'just a small update')
1424
1535
 
1425
1536
        result = Win32ReadDir().read_dir('', u'.')
1426
1537
        entry = result[0]
1432
1543
    def test__walkdirs_utf_win32_find_file_stat_directory(self):
1433
1544
        """make sure our Stat values are valid"""
1434
1545
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1435
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1436
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1546
        self.requireFeature(features.UnicodeFilenameFeature)
 
1547
        from .._walkdirs_win32 import Win32ReadDir
1437
1548
        name0u = u'0dir-\u062c\u0648'
1438
1549
        name0 = name0u.encode('utf8')
1439
1550
        self.build_tree([name0u + '/'])
1519
1630
        # using the comparison routine shoudl work too:
1520
1631
        self.assertEqual(
1521
1632
            dir_sorted_paths,
1522
 
            sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
 
1633
            sorted(original_paths, key=osutils.path_prefix_key))
1523
1634
 
1524
1635
 
1525
1636
class TestCopyTree(tests.TestCaseInTempDir):
1538
1649
        self.assertEqual(['c'], os.listdir('target/b'))
1539
1650
 
1540
1651
    def test_copy_tree_symlinks(self):
1541
 
        self.requireFeature(tests.SymlinkFeature)
 
1652
        self.requireFeature(features.SymlinkFeature)
1542
1653
        self.build_tree(['source/'])
1543
1654
        os.symlink('a/generic/path', 'source/lnk')
1544
1655
        osutils.copy_tree('source', 'target')
1554
1665
            processed_files.append(('d', from_path, to_path))
1555
1666
        def link_handler(from_path, to_path):
1556
1667
            processed_links.append((from_path, to_path))
1557
 
        handlers = {'file':file_handler,
1558
 
                    'directory':dir_handler,
1559
 
                    'symlink':link_handler,
 
1668
        handlers = {'file': file_handler,
 
1669
                    'directory': dir_handler,
 
1670
                    'symlink': link_handler,
1560
1671
                   }
1561
1672
 
1562
1673
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1569
1680
                          ('d', 'source/b', 'target/b'),
1570
1681
                          ('f', 'source/b/c', 'target/b/c'),
1571
1682
                         ], processed_files)
1572
 
        self.failIfExists('target')
 
1683
        self.assertPathDoesNotExist('target')
1573
1684
        if osutils.has_symlinks():
1574
1685
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1575
1686
 
1580
1691
    def setUp(self):
1581
1692
        super(TestSetUnsetEnv, self).setUp()
1582
1693
 
1583
 
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
 
1694
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'),
1584
1695
                         'Environment was not cleaned up properly.'
1585
 
                         ' Variable BZR_TEST_ENV_VAR should not exist.')
 
1696
                         ' Variable BRZ_TEST_ENV_VAR should not exist.')
1586
1697
        def cleanup():
1587
 
            if 'BZR_TEST_ENV_VAR' in os.environ:
1588
 
                del os.environ['BZR_TEST_ENV_VAR']
 
1698
            if 'BRZ_TEST_ENV_VAR' in os.environ:
 
1699
                del os.environ['BRZ_TEST_ENV_VAR']
1589
1700
        self.addCleanup(cleanup)
1590
1701
 
1591
1702
    def test_set(self):
1592
1703
        """Test that we can set an env variable"""
1593
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1704
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
1594
1705
        self.assertEqual(None, old)
1595
 
        self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
 
1706
        self.assertEqual('foo', os.environ.get('BRZ_TEST_ENV_VAR'))
1596
1707
 
1597
1708
    def test_double_set(self):
1598
1709
        """Test that we get the old value out"""
1599
 
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1600
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
 
1710
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1711
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'bar')
1601
1712
        self.assertEqual('foo', old)
1602
 
        self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
 
1713
        self.assertEqual('bar', os.environ.get('BRZ_TEST_ENV_VAR'))
1603
1714
 
1604
1715
    def test_unicode(self):
1605
1716
        """Environment can only contain plain strings
1612
1723
                'Cannot find a unicode character that works in encoding %s'
1613
1724
                % (osutils.get_user_encoding(),))
1614
1725
 
1615
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1616
 
        self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
 
1726
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', uni_val)
 
1727
        if PY3:
 
1728
            self.assertEqual(uni_val, os.environ.get('BRZ_TEST_ENV_VAR'))
 
1729
        else:
 
1730
            self.assertEqual(env_val, os.environ.get('BRZ_TEST_ENV_VAR'))
1617
1731
 
1618
1732
    def test_unset(self):
1619
1733
        """Test that passing None will remove the env var"""
1620
 
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1621
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
 
1734
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1735
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', None)
1622
1736
        self.assertEqual('foo', old)
1623
 
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
1624
 
        self.failIf('BZR_TEST_ENV_VAR' in os.environ)
 
1737
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'))
 
1738
        self.assertNotIn('BRZ_TEST_ENV_VAR', os.environ)
1625
1739
 
1626
1740
 
1627
1741
class TestSizeShaFile(tests.TestCaseInTempDir):
1628
1742
 
1629
1743
    def test_sha_empty(self):
1630
 
        self.build_tree_contents([('foo', '')])
1631
 
        expected_sha = osutils.sha_string('')
 
1744
        self.build_tree_contents([('foo', b'')])
 
1745
        expected_sha = osutils.sha_string(b'')
1632
1746
        f = open('foo')
1633
1747
        self.addCleanup(f.close)
1634
1748
        size, sha = osutils.size_sha_file(f)
1636
1750
        self.assertEqual(expected_sha, sha)
1637
1751
 
1638
1752
    def test_sha_mixed_endings(self):
1639
 
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
 
1753
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
1640
1754
        self.build_tree_contents([('foo', text)])
1641
1755
        expected_sha = osutils.sha_string(text)
1642
1756
        f = open('foo', 'rb')
1649
1763
class TestShaFileByName(tests.TestCaseInTempDir):
1650
1764
 
1651
1765
    def test_sha_empty(self):
1652
 
        self.build_tree_contents([('foo', '')])
1653
 
        expected_sha = osutils.sha_string('')
 
1766
        self.build_tree_contents([('foo', b'')])
 
1767
        expected_sha = osutils.sha_string(b'')
1654
1768
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1655
1769
 
1656
1770
    def test_sha_mixed_endings(self):
1657
 
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
 
1771
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
1658
1772
        self.build_tree_contents([('foo', text)])
1659
1773
        expected_sha = osutils.sha_string(text)
1660
1774
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1663
1777
class TestResourceLoading(tests.TestCaseInTempDir):
1664
1778
 
1665
1779
    def test_resource_string(self):
1666
 
        # test resource in bzrlib
1667
 
        text = osutils.resource_string('bzrlib', 'debug.py')
 
1780
        # test resource in breezy
 
1781
        text = osutils.resource_string('breezy', 'debug.py')
1668
1782
        self.assertContainsRe(text, "debug_flags = set()")
1669
 
        # test resource under bzrlib
1670
 
        text = osutils.resource_string('bzrlib.ui', 'text.py')
 
1783
        # test resource under breezy
 
1784
        text = osutils.resource_string('breezy.ui', 'text.py')
1671
1785
        self.assertContainsRe(text, "class TextUIFactory")
1672
1786
        # test unsupported package
1673
1787
        self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1674
1788
            'yyy.xx')
1675
1789
        # test unknown resource
1676
 
        self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
1677
 
 
1678
 
 
1679
 
class TestReCompile(tests.TestCase):
1680
 
 
1681
 
    def test_re_compile_checked(self):
1682
 
        r = osutils.re_compile_checked(r'A*', re.IGNORECASE)
1683
 
        self.assertTrue(r.match('aaaa'))
1684
 
        self.assertTrue(r.match('aAaA'))
1685
 
 
1686
 
    def test_re_compile_checked_error(self):
1687
 
        # like https://bugs.launchpad.net/bzr/+bug/251352
1688
 
        err = self.assertRaises(
1689
 
            errors.BzrCommandError,
1690
 
            osutils.re_compile_checked, '*', re.IGNORECASE, 'test case')
1691
 
        self.assertEqual(
1692
 
            "Invalid regular expression in test case: '*': "
1693
 
            "nothing to repeat",
1694
 
            str(err))
 
1790
        self.assertRaises(IOError, osutils.resource_string, 'breezy', 'yyy.xx')
1695
1791
 
1696
1792
 
1697
1793
class TestDirReader(tests.TestCaseInTempDir):
1698
1794
 
 
1795
    scenarios = dir_reader_scenarios()
 
1796
 
1699
1797
    # Set by load_tests
1700
1798
    _dir_reader_class = None
1701
1799
    _native_to_unicode = None
1702
1800
 
1703
1801
    def setUp(self):
1704
 
        tests.TestCaseInTempDir.setUp(self)
 
1802
        super(TestDirReader, self).setUp()
1705
1803
        self.overrideAttr(osutils,
1706
1804
                          '_selected_dir_reader', self._dir_reader_class())
1707
1805
 
1714
1812
            '2file'
1715
1813
            ]
1716
1814
        expected_dirblocks = [
1717
 
                (('', '.'),
1718
 
                 [('0file', '0file', 'file'),
1719
 
                  ('1dir', '1dir', 'directory'),
1720
 
                  ('2file', '2file', 'file'),
1721
 
                 ]
1722
 
                ),
1723
 
                (('1dir', './1dir'),
1724
 
                 [('1dir/0file', '0file', 'file'),
1725
 
                  ('1dir/1dir', '1dir', 'directory'),
1726
 
                 ]
1727
 
                ),
1728
 
                (('1dir/1dir', './1dir/1dir'),
 
1815
                ((b'', '.'),
 
1816
                 [(b'0file', b'0file', 'file', './0file'),
 
1817
                  (b'1dir', b'1dir', 'directory', './1dir'),
 
1818
                  (b'2file', b'2file', 'file', './2file'),
 
1819
                 ]
 
1820
                ),
 
1821
                ((b'1dir', './1dir'),
 
1822
                 [(b'1dir/0file', b'0file', 'file', './1dir/0file'),
 
1823
                  (b'1dir/1dir', b'1dir', 'directory', './1dir/1dir'),
 
1824
                 ]
 
1825
                ),
 
1826
                ((b'1dir/1dir', './1dir/1dir'),
1729
1827
                 [
1730
1828
                 ]
1731
1829
                ),
1738
1836
        result = list(osutils._walkdirs_utf8('.'))
1739
1837
        # Filter out stat and abspath
1740
1838
        self.assertEqual(expected_dirblocks,
1741
 
                         [(dirinfo, [line[0:3] for line in block])
1742
 
                          for dirinfo, block in result])
 
1839
                self._filter_out(result))
1743
1840
 
1744
1841
    def test_walk_sub_dir(self):
1745
1842
        tree, expected_dirblocks = self._get_ascii_tree()
1746
1843
        self.build_tree(tree)
1747
1844
        # you can search a subdir only, with a supplied prefix.
1748
 
        result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
 
1845
        result = list(osutils._walkdirs_utf8(b'./1dir', b'1dir'))
1749
1846
        # Filter out stat and abspath
1750
1847
        self.assertEqual(expected_dirblocks[1:],
1751
 
                         [(dirinfo, [line[0:3] for line in block])
1752
 
                          for dirinfo, block in result])
 
1848
                self._filter_out(result))
1753
1849
 
1754
1850
    def _get_unicode_tree(self):
1755
1851
        name0u = u'0file-\xb6'
1766
1862
        name1 = name1u.encode('UTF-8')
1767
1863
        name2 = name2u.encode('UTF-8')
1768
1864
        expected_dirblocks = [
1769
 
                (('', '.'),
 
1865
                ((b'', '.'),
1770
1866
                 [(name0, name0, 'file', './' + name0u),
1771
1867
                  (name1, name1, 'directory', './' + name1u),
1772
1868
                  (name2, name2, 'file', './' + name2u),
1773
1869
                 ]
1774
1870
                ),
1775
1871
                ((name1, './' + name1u),
1776
 
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1872
                 [(name1 + b'/' + name0, name0, 'file', './' + name1u
1777
1873
                                                        + '/' + name0u),
1778
 
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1874
                  (name1 + b'/' + name1, name1, 'directory', './' + name1u
1779
1875
                                                            + '/' + name1u),
1780
1876
                 ]
1781
1877
                ),
1782
 
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1878
                ((name1 + b'/' + name1, './' + name1u + '/' + name1u),
1783
1879
                 [
1784
1880
                 ]
1785
1881
                ),
1801
1897
        return filtered_dirblocks
1802
1898
 
1803
1899
    def test_walk_unicode_tree(self):
1804
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1900
        self.requireFeature(features.UnicodeFilenameFeature)
1805
1901
        tree, expected_dirblocks = self._get_unicode_tree()
1806
1902
        self.build_tree(tree)
1807
1903
        result = list(osutils._walkdirs_utf8('.'))
1808
1904
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1809
1905
 
1810
1906
    def test_symlink(self):
1811
 
        self.requireFeature(tests.SymlinkFeature)
1812
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1907
        self.requireFeature(features.SymlinkFeature)
 
1908
        self.requireFeature(features.UnicodeFilenameFeature)
1813
1909
        target = u'target\N{Euro Sign}'
1814
1910
        link_name = u'l\N{Euro Sign}nk'
1815
1911
        os.symlink(target, link_name)
1816
1912
        target_utf8 = target.encode('UTF-8')
1817
1913
        link_name_utf8 = link_name.encode('UTF-8')
1818
1914
        expected_dirblocks = [
1819
 
                (('', '.'),
 
1915
                ((b'', '.'),
1820
1916
                 [(link_name_utf8, link_name_utf8,
1821
1917
                   'symlink', './' + link_name),],
1822
1918
                 )]
1833
1929
    But prior python versions failed to properly encode the passed unicode
1834
1930
    string.
1835
1931
    """
1836
 
    _test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]
 
1932
    _test_needs_features = [features.SymlinkFeature, features.UnicodeFilenameFeature]
1837
1933
 
1838
1934
    def setUp(self):
1839
1935
        super(tests.TestCaseInTempDir, self).setUp()
1842
1938
        os.symlink(self.target, self.link)
1843
1939
 
1844
1940
    def test_os_readlink_link_encoding(self):
1845
 
        if sys.version_info < (2, 6):
1846
 
            self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
1847
 
        else:
1848
 
            self.assertEquals(self.target,  os.readlink(self.link))
 
1941
        self.assertEqual(self.target,  os.readlink(self.link))
1849
1942
 
1850
1943
    def test_os_readlink_link_decoding(self):
1851
 
        self.assertEquals(self.target.encode(osutils._fs_enc),
 
1944
        self.assertEqual(self.target.encode(osutils._fs_enc),
1852
1945
                          os.readlink(self.link.encode(osutils._fs_enc)))
1853
1946
 
1854
1947
 
1863
1956
        self.assertIsInstance(concurrency, int)
1864
1957
 
1865
1958
    def test_local_concurrency_environment_variable(self):
1866
 
        os.environ['BZR_CONCURRENCY'] = '2'
 
1959
        self.overrideEnv('BRZ_CONCURRENCY', '2')
1867
1960
        self.assertEqual(2, osutils.local_concurrency(use_cache=False))
1868
 
        os.environ['BZR_CONCURRENCY'] = '3'
 
1961
        self.overrideEnv('BRZ_CONCURRENCY', '3')
1869
1962
        self.assertEqual(3, osutils.local_concurrency(use_cache=False))
1870
 
        os.environ['BZR_CONCURRENCY'] = 'foo'
 
1963
        self.overrideEnv('BRZ_CONCURRENCY', 'foo')
1871
1964
        self.assertEqual(1, osutils.local_concurrency(use_cache=False))
1872
1965
 
1873
1966
    def test_option_concurrency(self):
1874
 
        os.environ['BZR_CONCURRENCY'] = '1'
 
1967
        self.overrideEnv('BRZ_CONCURRENCY', '1')
1875
1968
        self.run_bzr('rocks --concurrency 42')
1876
 
        # Command line overrides envrionment variable
1877
 
        self.assertEquals('42', os.environ['BZR_CONCURRENCY'])
1878
 
        self.assertEquals(42, osutils.local_concurrency(use_cache=False))
 
1969
        # Command line overrides environment variable
 
1970
        self.assertEqual('42', os.environ['BRZ_CONCURRENCY'])
 
1971
        self.assertEqual(42, osutils.local_concurrency(use_cache=False))
1879
1972
 
1880
1973
 
1881
1974
class TestFailedToLoadExtension(tests.TestCase):
1882
1975
 
1883
1976
    def _try_loading(self):
1884
1977
        try:
1885
 
            import bzrlib._fictional_extension_py
1886
 
        except ImportError, e:
 
1978
            import breezy._fictional_extension_py
 
1979
        except ImportError as e:
1887
1980
            osutils.failed_to_load_extension(e)
1888
1981
            return True
1889
1982
 
1894
1987
    def test_failure_to_load(self):
1895
1988
        self._try_loading()
1896
1989
        self.assertLength(1, osutils._extension_load_failures)
1897
 
        self.assertEquals(osutils._extension_load_failures[0],
1898
 
            "No module named _fictional_extension_py")
 
1990
        if PY3:
 
1991
            self.assertEqual(osutils._extension_load_failures[0],
 
1992
                "No module named 'breezy._fictional_extension_py'")
 
1993
        else:
 
1994
            self.assertEqual(osutils._extension_load_failures[0],
 
1995
                "No module named _fictional_extension_py")
1899
1996
 
1900
1997
    def test_report_extension_load_failures_no_warning(self):
1901
1998
        self.assertTrue(self._try_loading())
1904
2001
        self.assertLength(0, warnings)
1905
2002
 
1906
2003
    def test_report_extension_load_failures_message(self):
1907
 
        log = StringIO()
 
2004
        log = BytesIO()
1908
2005
        trace.push_log_file(log)
1909
2006
        self.assertTrue(self._try_loading())
1910
2007
        osutils.report_extension_load_failures()
1911
2008
        self.assertContainsRe(
1912
2009
            log.getvalue(),
1913
 
            r"bzr: warning: some compiled extensions could not be loaded; "
1914
 
            "see <https://answers\.launchpad\.net/bzr/\+faq/703>\n"
 
2010
            br"brz: warning: some compiled extensions could not be loaded; "
 
2011
            b"see ``brz help missing-extensions``\n"
1915
2012
            )
1916
2013
 
1917
2014
 
1918
2015
class TestTerminalWidth(tests.TestCase):
1919
2016
 
 
2017
    def setUp(self):
 
2018
        super(TestTerminalWidth, self).setUp()
 
2019
        self._orig_terminal_size_state = osutils._terminal_size_state
 
2020
        self._orig_first_terminal_size = osutils._first_terminal_size
 
2021
        self.addCleanup(self.restore_osutils_globals)
 
2022
        osutils._terminal_size_state = 'no_data'
 
2023
        osutils._first_terminal_size = None
 
2024
 
 
2025
    def restore_osutils_globals(self):
 
2026
        osutils._terminal_size_state = self._orig_terminal_size_state
 
2027
        osutils._first_terminal_size = self._orig_first_terminal_size
 
2028
 
1920
2029
    def replace_stdout(self, new):
1921
2030
        self.overrideAttr(sys, 'stdout', new)
1922
2031
 
1934
2043
    def test_default_values(self):
1935
2044
        self.assertEqual(80, osutils.default_terminal_width)
1936
2045
 
1937
 
    def test_defaults_to_BZR_COLUMNS(self):
1938
 
        # BZR_COLUMNS is set by the test framework
1939
 
        self.assertNotEqual('12', os.environ['BZR_COLUMNS'])
1940
 
        os.environ['BZR_COLUMNS'] = '12'
 
2046
    def test_defaults_to_BRZ_COLUMNS(self):
 
2047
        # BRZ_COLUMNS is set by the test framework
 
2048
        self.assertNotEqual('12', os.environ['BRZ_COLUMNS'])
 
2049
        self.overrideEnv('BRZ_COLUMNS', '12')
1941
2050
        self.assertEqual(12, osutils.terminal_width())
1942
2051
 
 
2052
    def test_BRZ_COLUMNS_0_no_limit(self):
 
2053
        self.overrideEnv('BRZ_COLUMNS', '0')
 
2054
        self.assertEqual(None, osutils.terminal_width())
 
2055
 
1943
2056
    def test_falls_back_to_COLUMNS(self):
1944
 
        del os.environ['BZR_COLUMNS']
 
2057
        self.overrideEnv('BRZ_COLUMNS', None)
1945
2058
        self.assertNotEqual('42', os.environ['COLUMNS'])
1946
2059
        self.set_fake_tty()
1947
 
        os.environ['COLUMNS'] = '42'
 
2060
        self.overrideEnv('COLUMNS', '42')
1948
2061
        self.assertEqual(42, osutils.terminal_width())
1949
2062
 
1950
2063
    def test_tty_default_without_columns(self):
1951
 
        del os.environ['BZR_COLUMNS']
1952
 
        del os.environ['COLUMNS']
 
2064
        self.overrideEnv('BRZ_COLUMNS', None)
 
2065
        self.overrideEnv('COLUMNS', None)
1953
2066
 
1954
2067
        def terminal_size(w, h):
1955
2068
            return 42, 42
1962
2075
        self.assertEqual(42, osutils.terminal_width())
1963
2076
 
1964
2077
    def test_non_tty_default_without_columns(self):
1965
 
        del os.environ['BZR_COLUMNS']
1966
 
        del os.environ['COLUMNS']
 
2078
        self.overrideEnv('BRZ_COLUMNS', None)
 
2079
        self.overrideEnv('COLUMNS', None)
1967
2080
        self.replace_stdout(None)
1968
2081
        self.assertEqual(None, osutils.terminal_width())
1969
2082
 
1979
2092
        else:
1980
2093
            self.overrideAttr(termios, 'TIOCGWINSZ')
1981
2094
            del termios.TIOCGWINSZ
1982
 
        del os.environ['BZR_COLUMNS']
1983
 
        del os.environ['COLUMNS']
 
2095
        self.overrideEnv('BRZ_COLUMNS', None)
 
2096
        self.overrideEnv('COLUMNS', None)
1984
2097
        # Whatever the result is, if we don't raise an exception, it's ok.
1985
2098
        osutils.terminal_width()
1986
2099
 
 
2100
 
1987
2101
class TestCreationOps(tests.TestCaseInTempDir):
1988
2102
    _test_needs_features = [features.chown_feature]
1989
2103
 
1990
2104
    def setUp(self):
1991
 
        tests.TestCaseInTempDir.setUp(self)
 
2105
        super(TestCreationOps, self).setUp()
1992
2106
        self.overrideAttr(os, 'chown', self._dummy_chown)
1993
2107
 
1994
2108
        # params set by call to _dummy_chown
2004
2118
        osutils.copy_ownership_from_path('test_file', ownsrc)
2005
2119
 
2006
2120
        s = os.stat(ownsrc)
2007
 
        self.assertEquals(self.path, 'test_file')
2008
 
        self.assertEquals(self.uid, s.st_uid)
2009
 
        self.assertEquals(self.gid, s.st_gid)
 
2121
        self.assertEqual(self.path, 'test_file')
 
2122
        self.assertEqual(self.uid, s.st_uid)
 
2123
        self.assertEqual(self.gid, s.st_gid)
2010
2124
 
2011
2125
    def test_copy_ownership_nonesrc(self):
2012
2126
        """copy_ownership_from_path test with src=None."""
2015
2129
        osutils.copy_ownership_from_path('test_file')
2016
2130
 
2017
2131
        s = os.stat('..')
2018
 
        self.assertEquals(self.path, 'test_file')
2019
 
        self.assertEquals(self.uid, s.st_uid)
2020
 
        self.assertEquals(self.gid, s.st_gid)
 
2132
        self.assertEqual(self.path, 'test_file')
 
2133
        self.assertEqual(self.uid, s.st_uid)
 
2134
        self.assertEqual(self.gid, s.st_gid)
 
2135
 
 
2136
 
 
2137
class TestPathFromEnviron(tests.TestCase):
 
2138
 
 
2139
    def test_is_unicode(self):
 
2140
        self.overrideEnv('BRZ_TEST_PATH', './anywhere at all/')
 
2141
        path = osutils.path_from_environ('BRZ_TEST_PATH')
 
2142
        self.assertIsInstance(path, text_type)
 
2143
        self.assertEqual(u'./anywhere at all/', path)
 
2144
 
 
2145
    def test_posix_path_env_ascii(self):
 
2146
        self.overrideEnv('BRZ_TEST_PATH', '/tmp')
 
2147
        home = osutils._posix_path_from_environ('BRZ_TEST_PATH')
 
2148
        self.assertIsInstance(home, text_type)
 
2149
        self.assertEqual(u'/tmp', home)
 
2150
 
 
2151
    def test_posix_path_env_unicode(self):
 
2152
        self.requireFeature(features.ByteStringNamedFilesystem)
 
2153
        self.overrideEnv('BRZ_TEST_PATH', '/home/\xa7test')
 
2154
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
 
2155
        self.assertEqual(u'/home/\xa7test',
 
2156
            osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2157
        osutils._fs_enc = "iso8859-5"
 
2158
        if PY3:
 
2159
            # In Python 3, os.environ returns unicode.
 
2160
            self.assertEqual(u'/home/\xa7test',
 
2161
                osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2162
        else:
 
2163
            self.assertEqual(u'/home/\u0407test',
 
2164
                osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2165
            osutils._fs_enc = "utf-8"
 
2166
            self.assertRaises(errors.BadFilenameEncoding,
 
2167
                osutils._posix_path_from_environ, 'BRZ_TEST_PATH')
 
2168
 
 
2169
 
 
2170
class TestGetHomeDir(tests.TestCase):
 
2171
 
 
2172
    def test_is_unicode(self):
 
2173
        home = osutils._get_home_dir()
 
2174
        self.assertIsInstance(home, text_type)
 
2175
 
 
2176
    def test_posix_homeless(self):
 
2177
        self.overrideEnv('HOME', None)
 
2178
        home = osutils._get_home_dir()
 
2179
        self.assertIsInstance(home, text_type)
 
2180
 
 
2181
    def test_posix_home_ascii(self):
 
2182
        self.overrideEnv('HOME', '/home/test')
 
2183
        home = osutils._posix_get_home_dir()
 
2184
        self.assertIsInstance(home, text_type)
 
2185
        self.assertEqual(u'/home/test', home)
 
2186
 
 
2187
    def test_posix_home_unicode(self):
 
2188
        self.requireFeature(features.ByteStringNamedFilesystem)
 
2189
        self.overrideEnv('HOME', '/home/\xa7test')
 
2190
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
 
2191
        self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
 
2192
        osutils._fs_enc = "iso8859-5"
 
2193
        if PY3:
 
2194
            # In python 3, os.environ returns unicode
 
2195
            self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
 
2196
        else:
 
2197
            self.assertEqual(u'/home/\u0407test', osutils._posix_get_home_dir())
 
2198
            osutils._fs_enc = "utf-8"
 
2199
            self.assertRaises(errors.BadFilenameEncoding,
 
2200
                osutils._posix_get_home_dir)
 
2201
 
 
2202
 
 
2203
class TestGetuserUnicode(tests.TestCase):
 
2204
 
 
2205
    def test_is_unicode(self):
 
2206
        user = osutils.getuser_unicode()
 
2207
        self.assertIsInstance(user, text_type)
 
2208
 
 
2209
    def envvar_to_override(self):
 
2210
        if sys.platform == "win32":
 
2211
            # Disable use of platform calls on windows so envvar is used
 
2212
            self.overrideAttr(win32utils, 'has_ctypes', False)
 
2213
            return 'USERNAME' # only variable used on windows
 
2214
        return 'LOGNAME' # first variable checked by getpass.getuser()
 
2215
 
 
2216
    def test_ascii_user(self):
 
2217
        self.overrideEnv(self.envvar_to_override(), 'jrandom')
 
2218
        self.assertEqual(u'jrandom', osutils.getuser_unicode())
 
2219
 
 
2220
    def test_unicode_user(self):
 
2221
        ue = osutils.get_user_encoding()
 
2222
        uni_val, env_val = tests.probe_unicode_in_user_encoding()
 
2223
        if uni_val is None:
 
2224
            raise tests.TestSkipped(
 
2225
                'Cannot find a unicode character that works in encoding %s'
 
2226
                % (osutils.get_user_encoding(),))
 
2227
        uni_username = u'jrandom' + uni_val
 
2228
        encoded_username = uni_username.encode(ue)
 
2229
        if PY3:
 
2230
            self.overrideEnv(self.envvar_to_override(), uni_username)
 
2231
        else:
 
2232
            self.overrideEnv(self.envvar_to_override(), encoded_username)
 
2233
        self.assertEqual(uni_username, osutils.getuser_unicode())
 
2234
 
 
2235
 
 
2236
class TestBackupNames(tests.TestCase):
 
2237
 
 
2238
    def setUp(self):
 
2239
        super(TestBackupNames, self).setUp()
 
2240
        self.backups = []
 
2241
 
 
2242
    def backup_exists(self, name):
 
2243
        return name in self.backups
 
2244
 
 
2245
    def available_backup_name(self, name):
 
2246
        backup_name = osutils.available_backup_name(name, self.backup_exists)
 
2247
        self.backups.append(backup_name)
 
2248
        return backup_name
 
2249
 
 
2250
    def assertBackupName(self, expected, name):
 
2251
        self.assertEqual(expected, self.available_backup_name(name))
 
2252
 
 
2253
    def test_empty(self):
 
2254
        self.assertBackupName('file.~1~', 'file')
 
2255
 
 
2256
    def test_existing(self):
 
2257
        self.available_backup_name('file')
 
2258
        self.available_backup_name('file')
 
2259
        self.assertBackupName('file.~3~', 'file')
 
2260
        # Empty slots are found, this is not a strict requirement and may be
 
2261
        # revisited if we test against all implementations.
 
2262
        self.backups.remove('file.~2~')
 
2263
        self.assertBackupName('file.~2~', 'file')
 
2264
 
 
2265
 
 
2266
class TestFindExecutableInPath(tests.TestCase):
 
2267
 
 
2268
    def test_windows(self):
 
2269
        if sys.platform != 'win32':
 
2270
            raise tests.TestSkipped('test requires win32')
 
2271
        self.assertTrue(osutils.find_executable_on_path('explorer') is not None)
 
2272
        self.assertTrue(
 
2273
            osutils.find_executable_on_path('explorer.exe') is not None)
 
2274
        self.assertTrue(
 
2275
            osutils.find_executable_on_path('EXPLORER.EXE') is not None)
 
2276
        self.assertTrue(
 
2277
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
 
2278
        self.assertTrue(osutils.find_executable_on_path('file.txt') is None)
 
2279
        
 
2280
    def test_windows_app_path(self):
 
2281
        if sys.platform != 'win32':
 
2282
            raise tests.TestSkipped('test requires win32')
 
2283
        # Override PATH env var so that exe can only be found on App Path
 
2284
        self.overrideEnv('PATH', '')
 
2285
        # Internt Explorer is always registered in the App Path
 
2286
        self.assertTrue(osutils.find_executable_on_path('iexplore') is not None)
 
2287
 
 
2288
    def test_other(self):
 
2289
        if sys.platform == 'win32':
 
2290
            raise tests.TestSkipped('test requires non-win32')
 
2291
        self.assertTrue(osutils.find_executable_on_path('sh') is not None)
 
2292
        self.assertTrue(
 
2293
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
 
2294
 
 
2295
 
 
2296
class TestEnvironmentErrors(tests.TestCase):
 
2297
    """Test handling of environmental errors"""
 
2298
 
 
2299
    def test_is_oserror(self):
 
2300
        self.assertTrue(osutils.is_environment_error(
 
2301
            OSError(errno.EINVAL, "Invalid parameter")))
 
2302
 
 
2303
    def test_is_ioerror(self):
 
2304
        self.assertTrue(osutils.is_environment_error(
 
2305
            IOError(errno.EINVAL, "Invalid parameter")))
 
2306
 
 
2307
    def test_is_socket_error(self):
 
2308
        self.assertTrue(osutils.is_environment_error(
 
2309
            socket.error(errno.EINVAL, "Invalid parameter")))
 
2310
 
 
2311
    def test_is_select_error(self):
 
2312
        self.assertTrue(osutils.is_environment_error(
 
2313
            select.error(errno.EINVAL, "Invalid parameter")))
 
2314
 
 
2315
    def test_is_pywintypes_error(self):
 
2316
        self.requireFeature(features.pywintypes)
 
2317
        import pywintypes
 
2318
        self.assertTrue(osutils.is_environment_error(
 
2319
            pywintypes.error(errno.EINVAL, "Invalid parameter", "Caller")))