/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_osutils.py

  • Committer: Breezy landing bot
  • Author(s): Jelmer Vernooij
  • Date: 2018-09-13 12:50:28 UTC
  • mfrom: (7096.2.2 empty-port)
  • Revision ID: breezy.the.bot@gmail.com-20180913125028-mja5gz8xsams9iey
Allow port to be empty when parsing URLs.

Merged from https://code.launchpad.net/~jelmer/brz/empty-port/+merge/354640

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the osutils wrapper."""
18
18
 
19
 
from cStringIO import StringIO
 
19
from __future__ import absolute_import, division
 
20
 
20
21
import errno
21
22
import os
22
23
import re
 
24
import select
23
25
import socket
24
 
import stat
25
26
import sys
 
27
import tempfile
26
28
import time
27
29
 
28
 
from bzrlib import (
 
30
from .. import (
29
31
    errors,
 
32
    lazy_regex,
30
33
    osutils,
 
34
    symbol_versioning,
31
35
    tests,
32
36
    trace,
33
37
    win32utils,
34
38
    )
35
 
from bzrlib.tests import (
 
39
from ..sixish import (
 
40
    BytesIO,
 
41
    PY3,
 
42
    text_type,
 
43
    )
 
44
from . import (
36
45
    features,
37
46
    file_utils,
38
47
    test__walkdirs_win32,
39
48
    )
40
 
 
41
 
 
42
 
class _UTF8DirReaderFeature(tests.Feature):
 
49
from .scenarios import load_tests_apply_scenarios
 
50
 
 
51
 
 
52
class _UTF8DirReaderFeature(features.ModuleAvailableFeature):
43
53
 
44
54
    def _probe(self):
45
55
        try:
46
 
            from bzrlib import _readdir_pyx
 
56
            from .. import _readdir_pyx
 
57
            self._module = _readdir_pyx
47
58
            self.reader = _readdir_pyx.UTF8DirReader
48
59
            return True
49
60
        except ImportError:
50
61
            return False
51
62
 
52
 
    def feature_name(self):
53
 
        return 'bzrlib._readdir_pyx'
54
 
 
55
 
UTF8DirReaderFeature = _UTF8DirReaderFeature()
56
 
 
57
 
term_ios_feature = tests.ModuleAvailableFeature('termios')
 
63
UTF8DirReaderFeature = _UTF8DirReaderFeature('breezy._readdir_pyx')
 
64
 
 
65
term_ios_feature = features.ModuleAvailableFeature('termios')
58
66
 
59
67
 
60
68
def _already_unicode(s):
78
86
    # Some DirReaders are platform specific and even there they may not be
79
87
    # available.
80
88
    if UTF8DirReaderFeature.available():
81
 
        from bzrlib import _readdir_pyx
 
89
        from .. import _readdir_pyx
82
90
        scenarios.append(('utf8',
83
91
                          dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
84
92
                               _native_to_unicode=_utf8_to_unicode)))
85
93
 
86
94
    if test__walkdirs_win32.win32_readdir_feature.available():
87
95
        try:
88
 
            from bzrlib import _walkdirs_win32
 
96
            from .. import _walkdirs_win32
89
97
            scenarios.append(
90
98
                ('win32',
91
99
                 dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
95
103
    return scenarios
96
104
 
97
105
 
98
 
def load_tests(basic_tests, module, loader):
99
 
    suite = loader.suiteClass()
100
 
    dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
101
 
        basic_tests, tests.condition_isinstance(TestDirReader))
102
 
    tests.multiply_tests(dir_reader_tests, dir_reader_scenarios(), suite)
103
 
    suite.addTest(remaining_tests)
104
 
    return suite
 
106
load_tests = load_tests_apply_scenarios
105
107
 
106
108
 
107
109
class TestContainsWhitespace(tests.TestCase):
108
110
 
109
111
    def test_contains_whitespace(self):
110
 
        self.failUnless(osutils.contains_whitespace(u' '))
111
 
        self.failUnless(osutils.contains_whitespace(u'hello there'))
112
 
        self.failUnless(osutils.contains_whitespace(u'hellothere\n'))
113
 
        self.failUnless(osutils.contains_whitespace(u'hello\nthere'))
114
 
        self.failUnless(osutils.contains_whitespace(u'hello\rthere'))
115
 
        self.failUnless(osutils.contains_whitespace(u'hello\tthere'))
 
112
        self.assertTrue(osutils.contains_whitespace(u' '))
 
113
        self.assertTrue(osutils.contains_whitespace(u'hello there'))
 
114
        self.assertTrue(osutils.contains_whitespace(u'hellothere\n'))
 
115
        self.assertTrue(osutils.contains_whitespace(u'hello\nthere'))
 
116
        self.assertTrue(osutils.contains_whitespace(u'hello\rthere'))
 
117
        self.assertTrue(osutils.contains_whitespace(u'hello\tthere'))
116
118
 
117
119
        # \xa0 is "Non-breaking-space" which on some python locales thinks it
118
120
        # is whitespace, but we do not.
119
 
        self.failIf(osutils.contains_whitespace(u''))
120
 
        self.failIf(osutils.contains_whitespace(u'hellothere'))
121
 
        self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
 
121
        self.assertFalse(osutils.contains_whitespace(u''))
 
122
        self.assertFalse(osutils.contains_whitespace(u'hellothere'))
 
123
        self.assertFalse(osutils.contains_whitespace(u'hello\xa0there'))
122
124
 
123
125
 
124
126
class TestRename(tests.TestCaseInTempDir):
136
138
 
137
139
    def test_fancy_rename(self):
138
140
        # This should work everywhere
139
 
        self.create_file('a', 'something in a\n')
 
141
        self.create_file('a', b'something in a\n')
140
142
        self._fancy_rename('a', 'b')
141
 
        self.failIfExists('a')
142
 
        self.failUnlessExists('b')
143
 
        self.check_file_contents('b', 'something in a\n')
 
143
        self.assertPathDoesNotExist('a')
 
144
        self.assertPathExists('b')
 
145
        self.check_file_contents('b', b'something in a\n')
144
146
 
145
 
        self.create_file('a', 'new something in a\n')
 
147
        self.create_file('a', b'new something in a\n')
146
148
        self._fancy_rename('b', 'a')
147
149
 
148
 
        self.check_file_contents('a', 'something in a\n')
 
150
        self.check_file_contents('a', b'something in a\n')
149
151
 
150
152
    def test_fancy_rename_fails_source_missing(self):
151
153
        # An exception should be raised, and the target should be left in place
152
 
        self.create_file('target', 'data in target\n')
 
154
        self.create_file('target', b'data in target\n')
153
155
        self.assertRaises((IOError, OSError), self._fancy_rename,
154
156
                          'missingsource', 'target')
155
 
        self.failUnlessExists('target')
156
 
        self.check_file_contents('target', 'data in target\n')
 
157
        self.assertPathExists('target')
 
158
        self.check_file_contents('target', b'data in target\n')
157
159
 
158
160
    def test_fancy_rename_fails_if_source_and_target_missing(self):
159
161
        self.assertRaises((IOError, OSError), self._fancy_rename,
161
163
 
162
164
    def test_rename(self):
163
165
        # Rename should be semi-atomic on all platforms
164
 
        self.create_file('a', 'something in a\n')
 
166
        self.create_file('a', b'something in a\n')
165
167
        osutils.rename('a', 'b')
166
 
        self.failIfExists('a')
167
 
        self.failUnlessExists('b')
168
 
        self.check_file_contents('b', 'something in a\n')
 
168
        self.assertPathDoesNotExist('a')
 
169
        self.assertPathExists('b')
 
170
        self.check_file_contents('b', b'something in a\n')
169
171
 
170
 
        self.create_file('a', 'new something in a\n')
 
172
        self.create_file('a', b'new something in a\n')
171
173
        osutils.rename('b', 'a')
172
174
 
173
 
        self.check_file_contents('a', 'something in a\n')
 
175
        self.check_file_contents('a', b'something in a\n')
174
176
 
175
177
    # TODO: test fancy_rename using a MemoryTransport
176
178
 
182
184
        # we can't use failUnlessExists on case-insensitive filesystem
183
185
        # so try to check shape of the tree
184
186
        shape = sorted(os.listdir('.'))
185
 
        self.assertEquals(['A', 'B'], shape)
 
187
        self.assertEqual(['A', 'B'], shape)
186
188
 
187
 
    def test_rename_error(self):
188
 
        # We wrap os.rename to make it give an error including the filenames
189
 
        # https://bugs.launchpad.net/bzr/+bug/491763
190
 
        err = self.assertRaises(OSError, osutils.rename,
191
 
            'nonexistent', 'target')
192
 
        self.assertContainsString(str(err), 'nonexistent')
 
189
    def test_rename_exception(self):
 
190
        try:
 
191
            osutils.rename('nonexistent_path', 'different_nonexistent_path')
 
192
        except OSError as e:
 
193
            self.assertEqual(e.old_filename, 'nonexistent_path')
 
194
            self.assertEqual(e.new_filename, 'different_nonexistent_path')
 
195
            self.assertTrue('nonexistent_path' in e.strerror)
 
196
            self.assertTrue('different_nonexistent_path' in e.strerror)
193
197
 
194
198
 
195
199
class TestRandChars(tests.TestCase):
222
226
                         (['src'], SRC_FOO_C),
223
227
                         (['src'], 'src'),
224
228
                         ]:
225
 
            self.assert_(osutils.is_inside_any(dirs, fn))
 
229
            self.assertTrue(osutils.is_inside_any(dirs, fn))
226
230
        for dirs, fn in [(['src'], 'srccontrol'),
227
231
                         (['src'], 'srccontrol/foo')]:
228
232
            self.assertFalse(osutils.is_inside_any(dirs, fn))
234
238
                         (['src/bar.c', 'bla/foo.c'], 'src'),
235
239
                         (['src'], 'src'),
236
240
                         ]:
237
 
            self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
 
241
            self.assertTrue(osutils.is_inside_or_parent_of_any(dirs, fn))
238
242
 
239
243
        for dirs, fn in [(['src'], 'srccontrol'),
240
244
                         (['srccontrol/foo.c'], 'src'),
242
246
            self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
243
247
 
244
248
 
 
249
class TestLstat(tests.TestCaseInTempDir):
 
250
 
 
251
    def test_lstat_matches_fstat(self):
 
252
        # On Windows, lstat and fstat don't always agree, primarily in the
 
253
        # 'st_ino' and 'st_dev' fields. So we force them to be '0' in our
 
254
        # custom implementation.
 
255
        if sys.platform == 'win32':
 
256
            # We only have special lstat/fstat if we have the extension.
 
257
            # Without it, we may end up re-reading content when we don't have
 
258
            # to, but otherwise it doesn't effect correctness.
 
259
            self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
260
        with open('test-file.txt', 'wb') as f:
 
261
            f.write(b'some content\n')
 
262
            f.flush()
 
263
            self.assertEqualStat(osutils.fstat(f.fileno()),
 
264
                                 osutils.lstat('test-file.txt'))
 
265
 
 
266
 
245
267
class TestRmTree(tests.TestCaseInTempDir):
246
268
 
247
269
    def test_rmtree(self):
248
270
        # Check to remove tree with read-only files/dirs
249
271
        os.mkdir('dir')
250
 
        f = file('dir/file', 'w')
251
 
        f.write('spam')
252
 
        f.close()
 
272
        with open('dir/file', 'w') as f:
 
273
            f.write('spam')
253
274
        # would like to also try making the directory readonly, but at the
254
275
        # moment python shutil.rmtree doesn't handle that properly - it would
255
276
        # need to chmod the directory before removing things inside it - deferred
259
280
 
260
281
        osutils.rmtree('dir')
261
282
 
262
 
        self.failIfExists('dir/file')
263
 
        self.failIfExists('dir')
 
283
        self.assertPathDoesNotExist('dir/file')
 
284
        self.assertPathDoesNotExist('dir')
264
285
 
265
286
 
266
287
class TestDeleteAny(tests.TestCaseInTempDir):
279
300
 
280
301
    def test_file_kind(self):
281
302
        self.build_tree(['file', 'dir/'])
282
 
        self.assertEquals('file', osutils.file_kind('file'))
283
 
        self.assertEquals('directory', osutils.file_kind('dir/'))
 
303
        self.assertEqual('file', osutils.file_kind('file'))
 
304
        self.assertEqual('directory', osutils.file_kind('dir/'))
284
305
        if osutils.has_symlinks():
285
306
            os.symlink('symlink', 'symlink')
286
 
            self.assertEquals('symlink', osutils.file_kind('symlink'))
 
307
            self.assertEqual('symlink', osutils.file_kind('symlink'))
287
308
 
288
309
        # TODO: jam 20060529 Test a block device
289
310
        try:
290
311
            os.lstat('/dev/null')
291
 
        except OSError, e:
 
312
        except OSError as e:
292
313
            if e.errno not in (errno.ENOENT,):
293
314
                raise
294
315
        else:
295
 
            self.assertEquals('chardev', osutils.file_kind('/dev/null'))
 
316
            self.assertEqual('chardev', osutils.file_kind('/dev/null'))
296
317
 
297
318
        mkfifo = getattr(os, 'mkfifo', None)
298
319
        if mkfifo:
299
320
            mkfifo('fifo')
300
321
            try:
301
 
                self.assertEquals('fifo', osutils.file_kind('fifo'))
 
322
                self.assertEqual('fifo', osutils.file_kind('fifo'))
302
323
            finally:
303
324
                os.remove('fifo')
304
325
 
307
328
            s = socket.socket(AF_UNIX)
308
329
            s.bind('socket')
309
330
            try:
310
 
                self.assertEquals('socket', osutils.file_kind('socket'))
 
331
                self.assertEqual('socket', osutils.file_kind('socket'))
311
332
            finally:
312
333
                os.remove('socket')
313
334
 
332
353
 
333
354
        orig_umask = osutils.get_umask()
334
355
        self.addCleanup(os.umask, orig_umask)
335
 
        os.umask(0222)
336
 
        self.assertEqual(0222, osutils.get_umask())
337
 
        os.umask(0022)
338
 
        self.assertEqual(0022, osutils.get_umask())
339
 
        os.umask(0002)
340
 
        self.assertEqual(0002, osutils.get_umask())
341
 
        os.umask(0027)
342
 
        self.assertEqual(0027, osutils.get_umask())
 
356
        os.umask(0o222)
 
357
        self.assertEqual(0o222, osutils.get_umask())
 
358
        os.umask(0o022)
 
359
        self.assertEqual(0o022, osutils.get_umask())
 
360
        os.umask(0o002)
 
361
        self.assertEqual(0o002, osutils.get_umask())
 
362
        os.umask(0o027)
 
363
        self.assertEqual(0o027, osutils.get_umask())
343
364
 
344
365
 
345
366
class TestDateTime(tests.TestCase):
381
402
        self.assertFormatedDelta('2 seconds in the future', -2)
382
403
 
383
404
    def test_format_date(self):
384
 
        self.assertRaises(errors.UnsupportedTimezoneFormat,
 
405
        self.assertRaises(osutils.UnsupportedTimezoneFormat,
385
406
            osutils.format_date, 0, timezone='foo')
386
407
        self.assertIsInstance(osutils.format_date(0), str)
387
 
        self.assertIsInstance(osutils.format_local_date(0), unicode)
 
408
        self.assertIsInstance(osutils.format_local_date(0), text_type)
388
409
        # Testing for the actual value of the local weekday without
389
410
        # duplicating the code from format_date is difficult.
390
411
        # Instead blackbox.test_locale should check for localized
418
439
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
419
440
 
420
441
 
 
442
class TestFdatasync(tests.TestCaseInTempDir):
 
443
 
 
444
    def do_fdatasync(self):
 
445
        f = tempfile.NamedTemporaryFile()
 
446
        osutils.fdatasync(f.fileno())
 
447
        f.close()
 
448
 
 
449
    @staticmethod
 
450
    def raise_eopnotsupp(*args, **kwargs):
 
451
        raise IOError(errno.EOPNOTSUPP, os.strerror(errno.EOPNOTSUPP))
 
452
 
 
453
    @staticmethod
 
454
    def raise_enotsup(*args, **kwargs):
 
455
        raise IOError(errno.ENOTSUP, os.strerror(errno.ENOTSUP))
 
456
 
 
457
    def test_fdatasync_handles_system_function(self):
 
458
        self.overrideAttr(os, "fdatasync")
 
459
        self.do_fdatasync()
 
460
 
 
461
    def test_fdatasync_handles_no_fdatasync_no_fsync(self):
 
462
        self.overrideAttr(os, "fdatasync")
 
463
        self.overrideAttr(os, "fsync")
 
464
        self.do_fdatasync()
 
465
 
 
466
    def test_fdatasync_handles_no_EOPNOTSUPP(self):
 
467
        self.overrideAttr(errno, "EOPNOTSUPP")
 
468
        self.do_fdatasync()
 
469
 
 
470
    def test_fdatasync_catches_ENOTSUP(self):
 
471
        enotsup = getattr(errno, "ENOTSUP", None)
 
472
        if enotsup is None:
 
473
            raise tests.TestNotApplicable("No ENOTSUP on this platform")
 
474
        self.overrideAttr(os, "fdatasync", self.raise_enotsup)
 
475
        self.do_fdatasync()
 
476
 
 
477
    def test_fdatasync_catches_EOPNOTSUPP(self):
 
478
        enotsup = getattr(errno, "EOPNOTSUPP", None)
 
479
        if enotsup is None:
 
480
            raise tests.TestNotApplicable("No EOPNOTSUPP on this platform")
 
481
        self.overrideAttr(os, "fdatasync", self.raise_eopnotsupp)
 
482
        self.do_fdatasync()
 
483
 
 
484
 
421
485
class TestLinks(tests.TestCaseInTempDir):
422
486
 
423
487
    def test_dereference_path(self):
424
 
        self.requireFeature(tests.SymlinkFeature)
 
488
        self.requireFeature(features.SymlinkFeature)
425
489
        cwd = osutils.realpath('.')
426
490
        os.mkdir('bar')
427
491
        bar_path = osutils.pathjoin(cwd, 'bar')
448
512
        self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
449
513
 
450
514
    def test_changing_access(self):
451
 
        f = file('file', 'w')
452
 
        f.write('monkey')
453
 
        f.close()
 
515
        with open('file', 'w') as f:
 
516
            f.write('monkey')
454
517
 
455
518
        # Make a file readonly
456
519
        osutils.make_readonly('file')
457
520
        mode = os.lstat('file').st_mode
458
 
        self.assertEqual(mode, mode & 0777555)
 
521
        self.assertEqual(mode, mode & 0o777555)
459
522
 
460
523
        # Make a file writable
461
524
        osutils.make_writable('file')
462
525
        mode = os.lstat('file').st_mode
463
 
        self.assertEqual(mode, mode | 0200)
 
526
        self.assertEqual(mode, mode | 0o200)
464
527
 
465
528
        if osutils.has_symlinks():
466
529
            # should not error when handed a symlink
474
537
 
475
538
class TestCanonicalRelPath(tests.TestCaseInTempDir):
476
539
 
477
 
    _test_needs_features = [tests.CaseInsCasePresFilenameFeature]
 
540
    _test_needs_features = [features.CaseInsCasePresFilenameFeature]
478
541
 
479
542
    def test_canonical_relpath_simple(self):
480
 
        f = file('MixedCaseName', 'w')
 
543
        f = open('MixedCaseName', 'w')
481
544
        f.close()
482
545
        actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
483
 
        self.failUnlessEqual('work/MixedCaseName', actual)
 
546
        self.assertEqual('work/MixedCaseName', actual)
484
547
 
485
548
    def test_canonical_relpath_missing_tail(self):
486
549
        os.mkdir('MixedCaseParent')
487
550
        actual = osutils.canonical_relpath(self.test_base_dir,
488
551
                                           'mixedcaseparent/nochild')
489
 
        self.failUnlessEqual('work/MixedCaseParent/nochild', actual)
 
552
        self.assertEqual('work/MixedCaseParent/nochild', actual)
490
553
 
491
554
 
492
555
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
536
599
    """Test pumpfile method."""
537
600
 
538
601
    def setUp(self):
539
 
        tests.TestCase.setUp(self)
 
602
        super(TestPumpFile, self).setUp()
540
603
        # create a test datablock
541
604
        self.block_size = 512
542
 
        pattern = '0123456789ABCDEF'
543
 
        self.test_data = pattern * (3 * self.block_size / len(pattern))
 
605
        pattern = b'0123456789ABCDEF'
 
606
        self.test_data = pattern * (3 * self.block_size // len(pattern))
544
607
        self.test_data_len = len(self.test_data)
545
608
 
546
609
    def test_bracket_block_size(self):
550
613
        self.assertTrue(self.test_data_len > self.block_size)
551
614
 
552
615
        from_file = file_utils.FakeReadFile(self.test_data)
553
 
        to_file = StringIO()
 
616
        to_file = BytesIO()
554
617
 
555
 
        # read (max / 2) bytes and verify read size wasn't affected
556
 
        num_bytes_to_read = self.block_size / 2
 
618
        # read (max // 2) bytes and verify read size wasn't affected
 
619
        num_bytes_to_read = self.block_size // 2
557
620
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
558
621
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
559
622
        self.assertEqual(from_file.get_read_count(), 1)
591
654
 
592
655
        # retrieve data in blocks
593
656
        from_file = file_utils.FakeReadFile(self.test_data)
594
 
        to_file = StringIO()
 
657
        to_file = BytesIO()
595
658
        osutils.pumpfile(from_file, to_file, self.test_data_len,
596
659
                         self.block_size)
597
660
 
615
678
 
616
679
        # retrieve data to EOF
617
680
        from_file = file_utils.FakeReadFile(self.test_data)
618
 
        to_file = StringIO()
 
681
        to_file = BytesIO()
619
682
        osutils.pumpfile(from_file, to_file, -1, self.block_size)
620
683
 
621
684
        # verify read size was equal to the maximum read size
635
698
        with this new version."""
636
699
        # retrieve data using default (old) pumpfile method
637
700
        from_file = file_utils.FakeReadFile(self.test_data)
638
 
        to_file = StringIO()
 
701
        to_file = BytesIO()
639
702
        osutils.pumpfile(from_file, to_file)
640
703
 
641
704
        # report error if the data wasn't equal (we only report the size due
649
712
        activity = []
650
713
        def log_activity(length, direction):
651
714
            activity.append((length, direction))
652
 
        from_file = StringIO(self.test_data)
653
 
        to_file = StringIO()
 
715
        from_file = BytesIO(self.test_data)
 
716
        to_file = BytesIO()
654
717
        osutils.pumpfile(from_file, to_file, buff_size=500,
655
718
                         report_activity=log_activity, direction='read')
656
719
        self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
657
720
                          (36, 'read')], activity)
658
721
 
659
 
        from_file = StringIO(self.test_data)
660
 
        to_file = StringIO()
 
722
        from_file = BytesIO(self.test_data)
 
723
        to_file = BytesIO()
661
724
        del activity[:]
662
725
        osutils.pumpfile(from_file, to_file, buff_size=500,
663
726
                         report_activity=log_activity, direction='write')
665
728
                          (36, 'write')], activity)
666
729
 
667
730
        # And with a limited amount of data
668
 
        from_file = StringIO(self.test_data)
669
 
        to_file = StringIO()
 
731
        from_file = BytesIO(self.test_data)
 
732
        to_file = BytesIO()
670
733
        del activity[:]
671
734
        osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
672
735
                         report_activity=log_activity, direction='read')
677
740
class TestPumpStringFile(tests.TestCase):
678
741
 
679
742
    def test_empty(self):
680
 
        output = StringIO()
681
 
        osutils.pump_string_file("", output)
682
 
        self.assertEqual("", output.getvalue())
 
743
        output = BytesIO()
 
744
        osutils.pump_string_file(b"", output)
 
745
        self.assertEqual(b"", output.getvalue())
683
746
 
684
747
    def test_more_than_segment_size(self):
685
 
        output = StringIO()
686
 
        osutils.pump_string_file("123456789", output, 2)
687
 
        self.assertEqual("123456789", output.getvalue())
 
748
        output = BytesIO()
 
749
        osutils.pump_string_file(b"123456789", output, 2)
 
750
        self.assertEqual(b"123456789", output.getvalue())
688
751
 
689
752
    def test_segment_size(self):
690
 
        output = StringIO()
691
 
        osutils.pump_string_file("12", output, 2)
692
 
        self.assertEqual("12", output.getvalue())
 
753
        output = BytesIO()
 
754
        osutils.pump_string_file(b"12", output, 2)
 
755
        self.assertEqual(b"12", output.getvalue())
693
756
 
694
757
    def test_segment_size_multiple(self):
695
 
        output = StringIO()
696
 
        osutils.pump_string_file("1234", output, 2)
697
 
        self.assertEqual("1234", output.getvalue())
 
758
        output = BytesIO()
 
759
        osutils.pump_string_file(b"1234", output, 2)
 
760
        self.assertEqual(b"1234", output.getvalue())
698
761
 
699
762
 
700
763
class TestRelpath(tests.TestCase):
719
782
class TestSafeUnicode(tests.TestCase):
720
783
 
721
784
    def test_from_ascii_string(self):
722
 
        self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
 
785
        self.assertEqual(u'foobar', osutils.safe_unicode(b'foobar'))
723
786
 
724
787
    def test_from_unicode_string_ascii_contents(self):
725
788
        self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
728
791
        self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
729
792
 
730
793
    def test_from_utf8_string(self):
731
 
        self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
 
794
        self.assertEqual(u'foo\xae', osutils.safe_unicode(b'foo\xc2\xae'))
732
795
 
733
796
    def test_bad_utf8_string(self):
734
797
        self.assertRaises(errors.BzrBadParameterNotUnicode,
735
798
                          osutils.safe_unicode,
736
 
                          '\xbb\xbb')
 
799
                          b'\xbb\xbb')
737
800
 
738
801
 
739
802
class TestSafeUtf8(tests.TestCase):
740
803
 
741
804
    def test_from_ascii_string(self):
742
 
        f = 'foobar'
743
 
        self.assertEqual('foobar', osutils.safe_utf8(f))
 
805
        f = b'foobar'
 
806
        self.assertEqual(b'foobar', osutils.safe_utf8(f))
744
807
 
745
808
    def test_from_unicode_string_ascii_contents(self):
746
 
        self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
 
809
        self.assertEqual(b'bargam', osutils.safe_utf8(u'bargam'))
747
810
 
748
811
    def test_from_unicode_string_unicode_contents(self):
749
 
        self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
 
812
        self.assertEqual(b'bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
750
813
 
751
814
    def test_from_utf8_string(self):
752
 
        self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
 
815
        self.assertEqual(b'foo\xc2\xae', osutils.safe_utf8(b'foo\xc2\xae'))
753
816
 
754
817
    def test_bad_utf8_string(self):
755
818
        self.assertRaises(errors.BzrBadParameterNotUnicode,
756
 
                          osutils.safe_utf8, '\xbb\xbb')
 
819
                          osutils.safe_utf8, b'\xbb\xbb')
757
820
 
758
821
 
759
822
class TestSafeRevisionId(tests.TestCase):
760
823
 
761
824
    def test_from_ascii_string(self):
762
825
        # this shouldn't give a warning because it's getting an ascii string
763
 
        self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
 
826
        self.assertEqual(b'foobar', osutils.safe_revision_id(b'foobar'))
764
827
 
765
828
    def test_from_unicode_string_ascii_contents(self):
766
 
        self.assertEqual('bargam',
767
 
                         osutils.safe_revision_id(u'bargam', warn=False))
768
 
 
769
 
    def test_from_unicode_deprecated(self):
770
 
        self.assertEqual('bargam',
771
 
            self.callDeprecated([osutils._revision_id_warning],
772
 
                                osutils.safe_revision_id, u'bargam'))
 
829
        self.assertRaises(TypeError,
 
830
                          osutils.safe_revision_id, u'bargam')
773
831
 
774
832
    def test_from_unicode_string_unicode_contents(self):
775
 
        self.assertEqual('bargam\xc2\xae',
776
 
                         osutils.safe_revision_id(u'bargam\xae', warn=False))
 
833
        self.assertRaises(TypeError,
 
834
                         osutils.safe_revision_id, u'bargam\xae')
777
835
 
778
836
    def test_from_utf8_string(self):
779
 
        self.assertEqual('foo\xc2\xae',
780
 
                         osutils.safe_revision_id('foo\xc2\xae'))
 
837
        self.assertEqual(b'foo\xc2\xae',
 
838
                         osutils.safe_revision_id(b'foo\xc2\xae'))
781
839
 
782
840
    def test_none(self):
783
841
        """Currently, None is a valid revision_id"""
787
845
class TestSafeFileId(tests.TestCase):
788
846
 
789
847
    def test_from_ascii_string(self):
790
 
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))
 
848
        self.assertEqual(b'foobar', osutils.safe_file_id(b'foobar'))
791
849
 
792
850
    def test_from_unicode_string_ascii_contents(self):
793
 
        self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))
794
 
 
795
 
    def test_from_unicode_deprecated(self):
796
 
        self.assertEqual('bargam',
797
 
            self.callDeprecated([osutils._file_id_warning],
798
 
                                osutils.safe_file_id, u'bargam'))
 
851
        self.assertRaises(TypeError, osutils.safe_file_id, u'bargam')
799
852
 
800
853
    def test_from_unicode_string_unicode_contents(self):
801
 
        self.assertEqual('bargam\xc2\xae',
802
 
                         osutils.safe_file_id(u'bargam\xae', warn=False))
 
854
        self.assertRaises(TypeError,
 
855
                          osutils.safe_file_id, u'bargam\xae')
803
856
 
804
857
    def test_from_utf8_string(self):
805
 
        self.assertEqual('foo\xc2\xae',
806
 
                         osutils.safe_file_id('foo\xc2\xae'))
 
858
        self.assertEqual(b'foo\xc2\xae',
 
859
                         osutils.safe_file_id(b'foo\xc2\xae'))
807
860
 
808
861
    def test_none(self):
809
862
        """Currently, None is a valid revision_id"""
810
863
        self.assertEqual(None, osutils.safe_file_id(None))
811
864
 
812
865
 
 
866
class TestSendAll(tests.TestCase):
 
867
 
 
868
    def test_send_with_disconnected_socket(self):
 
869
        class DisconnectedSocket(object):
 
870
            def __init__(self, err):
 
871
                self.err = err
 
872
            def send(self, content):
 
873
                raise self.err
 
874
            def close(self):
 
875
                pass
 
876
        # All of these should be treated as ConnectionReset
 
877
        errs = []
 
878
        for err_cls in (IOError, socket.error):
 
879
            for errnum in osutils._end_of_stream_errors:
 
880
                errs.append(err_cls(errnum))
 
881
        for err in errs:
 
882
            sock = DisconnectedSocket(err)
 
883
            self.assertRaises(errors.ConnectionReset,
 
884
                osutils.send_all, sock, b'some more content')
 
885
 
 
886
    def test_send_with_no_progress(self):
 
887
        # See https://bugs.launchpad.net/bzr/+bug/1047309
 
888
        # It seems that paramiko can get into a state where it doesn't error,
 
889
        # but it returns 0 bytes sent for requests over and over again.
 
890
        class NoSendingSocket(object):
 
891
            def __init__(self):
 
892
                self.call_count = 0
 
893
            def send(self, bytes):
 
894
                self.call_count += 1
 
895
                if self.call_count > 100:
 
896
                    # Prevent the test suite from hanging
 
897
                    raise RuntimeError('too many calls')
 
898
                return 0
 
899
        sock = NoSendingSocket()
 
900
        self.assertRaises(errors.ConnectionReset,
 
901
                          osutils.send_all, sock, b'content')
 
902
        self.assertEqual(1, sock.call_count)
 
903
 
 
904
 
 
905
class TestPosixFuncs(tests.TestCase):
 
906
    """Test that the posix version of normpath returns an appropriate path
 
907
       when used with 2 leading slashes."""
 
908
 
 
909
    def test_normpath(self):
 
910
        self.assertEqual('/etc/shadow', osutils._posix_normpath('/etc/shadow'))
 
911
        self.assertEqual('/etc/shadow', osutils._posix_normpath('//etc/shadow'))
 
912
        self.assertEqual('/etc/shadow', osutils._posix_normpath('///etc/shadow'))
 
913
 
 
914
 
813
915
class TestWin32Funcs(tests.TestCase):
814
916
    """Test that _win32 versions of os utilities return appropriate paths."""
815
917
 
816
918
    def test_abspath(self):
 
919
        self.requireFeature(features.win32_feature)
817
920
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
818
921
        self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
819
922
        self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
832
935
                         osutils._win32_pathjoin('path/to', 'C:/foo'))
833
936
        self.assertEqual('path/to/foo',
834
937
                         osutils._win32_pathjoin('path/to/', 'foo'))
835
 
        self.assertEqual('/foo',
 
938
 
 
939
    def test_pathjoin_late_bugfix(self):
 
940
        if sys.version_info < (2, 7, 6):
 
941
            expected = '/foo'
 
942
        else:
 
943
            expected = 'C:/foo'
 
944
        self.assertEqual(expected,
836
945
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
837
 
        self.assertEqual('/foo',
 
946
        self.assertEqual(expected,
838
947
                         osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
839
948
 
840
949
    def test_normpath(self):
845
954
 
846
955
    def test_getcwd(self):
847
956
        cwd = osutils._win32_getcwd()
848
 
        os_cwd = os.getcwdu()
 
957
        os_cwd = osutils._getcwd()
849
958
        self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
850
959
        # win32 is inconsistent whether it returns lower or upper case
851
960
        # and even if it was consistent the user might type the other
859
968
        self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
860
969
        self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
861
970
 
862
 
    def test_win98_abspath(self):
863
 
        # absolute path
864
 
        self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
865
 
        self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
866
 
        # UNC path
867
 
        self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
868
 
        self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
869
 
        # relative path
870
 
        cwd = osutils.getcwd().rstrip('/')
871
 
        drive = osutils._nt_splitdrive(cwd)[0]
872
 
        self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
873
 
        self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
874
 
        # unicode path
875
 
        u = u'\u1234'
876
 
        self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
877
 
 
878
971
 
879
972
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
880
973
    """Test win32 functions that create files."""
881
974
 
882
975
    def test_getcwd(self):
883
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
976
        self.requireFeature(features.UnicodeFilenameFeature)
884
977
        os.mkdir(u'mu-\xb5')
885
978
        os.chdir(u'mu-\xb5')
886
979
        # TODO: jam 20060427 This will probably fail on Mac OSX because
892
985
    def test_minimum_path_selection(self):
893
986
        self.assertEqual(set(),
894
987
            osutils.minimum_path_selection([]))
895
 
        self.assertEqual(set(['a']),
 
988
        self.assertEqual({'a'},
896
989
            osutils.minimum_path_selection(['a']))
897
 
        self.assertEqual(set(['a', 'b']),
 
990
        self.assertEqual({'a', 'b'},
898
991
            osutils.minimum_path_selection(['a', 'b']))
899
 
        self.assertEqual(set(['a/', 'b']),
 
992
        self.assertEqual({'a/', 'b'},
900
993
            osutils.minimum_path_selection(['a/', 'b']))
901
 
        self.assertEqual(set(['a/', 'b']),
 
994
        self.assertEqual({'a/', 'b'},
902
995
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
903
 
        self.assertEqual(set(['a-b', 'a', 'a0b']),
 
996
        self.assertEqual({'a-b', 'a', 'a0b'},
904
997
            osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
905
998
 
906
999
    def test_mkdtemp(self):
908
1001
        self.assertFalse('\\' in tmpdir)
909
1002
 
910
1003
    def test_rename(self):
911
 
        a = open('a', 'wb')
912
 
        a.write('foo\n')
913
 
        a.close()
914
 
        b = open('b', 'wb')
915
 
        b.write('baz\n')
916
 
        b.close()
 
1004
        with open('a', 'wb') as a:
 
1005
            a.write(b'foo\n')
 
1006
        with open('b', 'wb') as b:
 
1007
            b.write(b'baz\n')
917
1008
 
918
1009
        osutils._win32_rename('b', 'a')
919
 
        self.failUnlessExists('a')
920
 
        self.failIfExists('b')
921
 
        self.assertFileEqual('baz\n', 'a')
 
1010
        self.assertPathExists('a')
 
1011
        self.assertPathDoesNotExist('b')
 
1012
        self.assertFileEqual(b'baz\n', 'a')
922
1013
 
923
1014
    def test_rename_missing_file(self):
924
 
        a = open('a', 'wb')
925
 
        a.write('foo\n')
926
 
        a.close()
 
1015
        with open('a', 'wb') as a:
 
1016
            a.write(b'foo\n')
927
1017
 
928
1018
        try:
929
1019
            osutils._win32_rename('b', 'a')
930
 
        except (IOError, OSError), e:
 
1020
        except (IOError, OSError) as e:
931
1021
            self.assertEqual(errno.ENOENT, e.errno)
932
 
        self.assertFileEqual('foo\n', 'a')
 
1022
        self.assertFileEqual(b'foo\n', 'a')
933
1023
 
934
1024
    def test_rename_missing_dir(self):
935
1025
        os.mkdir('a')
936
1026
        try:
937
1027
            osutils._win32_rename('b', 'a')
938
 
        except (IOError, OSError), e:
 
1028
        except (IOError, OSError) as e:
939
1029
            self.assertEqual(errno.ENOENT, e.errno)
940
1030
 
941
1031
    def test_rename_current_dir(self):
947
1037
        # doesn't exist.
948
1038
        try:
949
1039
            osutils._win32_rename('b', '.')
950
 
        except (IOError, OSError), e:
 
1040
        except (IOError, OSError) as e:
951
1041
            self.assertEqual(errno.ENOENT, e.errno)
952
1042
 
953
1043
    def test_splitpath(self):
976
1066
    """Test mac special functions that require directories."""
977
1067
 
978
1068
    def test_getcwd(self):
979
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1069
        self.requireFeature(features.UnicodeFilenameFeature)
980
1070
        os.mkdir(u'B\xe5gfors')
981
1071
        os.chdir(u'B\xe5gfors')
982
1072
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
983
1073
 
984
1074
    def test_getcwd_nonnorm(self):
985
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1075
        self.requireFeature(features.UnicodeFilenameFeature)
986
1076
        # Test that _mac_getcwd() will normalize this path
987
1077
        os.mkdir(u'Ba\u030agfors')
988
1078
        os.chdir(u'Ba\u030agfors')
992
1082
class TestChunksToLines(tests.TestCase):
993
1083
 
994
1084
    def test_smoketest(self):
995
 
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
996
 
                         osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
997
 
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
998
 
                         osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
 
1085
        self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
 
1086
                         osutils.chunks_to_lines([b'foo\nbar', b'\nbaz\n']))
 
1087
        self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
 
1088
                         osutils.chunks_to_lines([b'foo\n', b'bar\n', b'baz\n']))
999
1089
 
1000
1090
    def test_osutils_binding(self):
1001
 
        from bzrlib.tests import test__chunks_to_lines
 
1091
        from . import test__chunks_to_lines
1002
1092
        if test__chunks_to_lines.compiled_chunkstolines_feature.available():
1003
 
            from bzrlib._chunks_to_lines_pyx import chunks_to_lines
 
1093
            from .._chunks_to_lines_pyx import chunks_to_lines
1004
1094
        else:
1005
 
            from bzrlib._chunks_to_lines_py import chunks_to_lines
 
1095
            from .._chunks_to_lines_py import chunks_to_lines
1006
1096
        self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
1007
1097
 
1008
1098
 
1015
1105
                         osutils.split_lines(u'foo\nbar\xae\n'))
1016
1106
 
1017
1107
    def test_split_with_carriage_returns(self):
1018
 
        self.assertEqual(['foo\rbar\n'],
1019
 
                         osutils.split_lines('foo\rbar\n'))
 
1108
        self.assertEqual([b'foo\rbar\n'],
 
1109
                         osutils.split_lines(b'foo\rbar\n'))
1020
1110
 
1021
1111
 
1022
1112
class TestWalkDirs(tests.TestCaseInTempDir):
1071
1161
        self.assertExpectedBlocks(expected_dirblocks[1:], result)
1072
1162
 
1073
1163
    def test_walkdirs_os_error(self):
1074
 
        # <https://bugs.edge.launchpad.net/bzr/+bug/338653>
 
1164
        # <https://bugs.launchpad.net/bzr/+bug/338653>
1075
1165
        # Pyrex readdir didn't raise useful messages if it had an error
1076
1166
        # reading the directory
1077
1167
        if sys.platform == 'win32':
1078
1168
            raise tests.TestNotApplicable(
1079
1169
                "readdir IOError not tested on win32")
 
1170
        self.requireFeature(features.not_running_as_root)
1080
1171
        os.mkdir("test-unreadable")
1081
1172
        os.chmod("test-unreadable", 0000)
1082
1173
        # must chmod it back so that it can be removed
1083
 
        self.addCleanup(os.chmod, "test-unreadable", 0700)
 
1174
        self.addCleanup(os.chmod, "test-unreadable", 0o700)
1084
1175
        # The error is not raised until the generator is actually evaluated.
1085
1176
        # (It would be ok if it happened earlier but at the moment it
1086
1177
        # doesn't.)
1087
1178
        e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1088
 
        self.assertEquals('./test-unreadable', e.filename)
1089
 
        self.assertEquals(errno.EACCES, e.errno)
 
1179
        self.assertEqual('./test-unreadable', osutils.safe_unicode(e.filename))
 
1180
        self.assertEqual(errno.EACCES, e.errno)
1090
1181
        # Ensure the message contains the file name
1091
 
        self.assertContainsRe(str(e), "\./test-unreadable")
 
1182
        self.assertContainsRe(str(e), "\\./test-unreadable")
 
1183
 
 
1184
    def test_walkdirs_encoding_error(self):
 
1185
        # <https://bugs.launchpad.net/bzr/+bug/488519>
 
1186
        # walkdirs didn't raise a useful message when the filenames
 
1187
        # are not using the filesystem's encoding
 
1188
 
 
1189
        # require a bytestring based filesystem
 
1190
        self.requireFeature(features.ByteStringNamedFilesystem)
 
1191
 
 
1192
        tree = [
 
1193
            '.bzr',
 
1194
            '0file',
 
1195
            '1dir/',
 
1196
            '1dir/0file',
 
1197
            '1dir/1dir/',
 
1198
            '1file'
 
1199
            ]
 
1200
 
 
1201
        self.build_tree(tree)
 
1202
 
 
1203
        # rename the 1file to a latin-1 filename
 
1204
        os.rename(b"./1file", b"\xe8file")
 
1205
        if b"\xe8file" not in os.listdir("."):
 
1206
            self.skipTest("Lack filesystem that preserves arbitrary bytes")
 
1207
 
 
1208
        self._save_platform_info()
 
1209
        osutils._fs_enc = 'UTF-8'
 
1210
 
 
1211
        # this should raise on error
 
1212
        def attempt():
 
1213
            for dirdetail, dirblock in osutils.walkdirs(b'.'):
 
1214
                pass
 
1215
 
 
1216
        self.assertRaises(errors.BadFilenameEncoding, attempt)
1092
1217
 
1093
1218
    def test__walkdirs_utf8(self):
1094
1219
        tree = [
1119
1244
            ]
1120
1245
        result = []
1121
1246
        found_bzrdir = False
1122
 
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1123
 
            if len(dirblock) and dirblock[0][1] == '.bzr':
 
1247
        for dirdetail, dirblock in osutils._walkdirs_utf8(b'.'):
 
1248
            if len(dirblock) and dirblock[0][1] == b'.bzr':
1124
1249
                # this tests the filtering of selected paths
1125
1250
                found_bzrdir = True
1126
1251
                del dirblock[0]
 
1252
            dirdetail = (dirdetail[0].decode('utf-8'), osutils.safe_unicode(dirdetail[1]))
 
1253
            dirblock = [
 
1254
                    (entry[0].decode('utf-8'), entry[1].decode('utf-8'), entry[2])
 
1255
                    for entry in dirblock]
1127
1256
            result.append((dirdetail, dirblock))
1128
1257
 
1129
1258
        self.assertTrue(found_bzrdir)
1145
1274
            dirblock[:] = new_dirblock
1146
1275
 
1147
1276
    def _save_platform_info(self):
1148
 
        self.overrideAttr(win32utils, 'winver')
1149
1277
        self.overrideAttr(osutils, '_fs_enc')
1150
1278
        self.overrideAttr(osutils, '_selected_dir_reader')
1151
1279
 
1152
 
    def assertDirReaderIs(self, expected):
 
1280
    def assertDirReaderIs(self, expected, top):
1153
1281
        """Assert the right implementation for _walkdirs_utf8 is chosen."""
1154
1282
        # Force it to redetect
1155
1283
        osutils._selected_dir_reader = None
1156
1284
        # Nothing to list, but should still trigger the selection logic
1157
 
        self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
 
1285
        self.assertEqual([((b'', top), [])], list(osutils._walkdirs_utf8('.')))
1158
1286
        self.assertIsInstance(osutils._selected_dir_reader, expected)
1159
1287
 
1160
1288
    def test_force_walkdirs_utf8_fs_utf8(self):
1161
1289
        self.requireFeature(UTF8DirReaderFeature)
1162
1290
        self._save_platform_info()
1163
 
        win32utils.winver = None # Avoid the win32 detection code
1164
 
        osutils._fs_enc = 'UTF-8'
1165
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
1291
        osutils._fs_enc = 'utf-8'
 
1292
        self.assertDirReaderIs(UTF8DirReaderFeature.module.UTF8DirReader, b".")
1166
1293
 
1167
1294
    def test_force_walkdirs_utf8_fs_ascii(self):
1168
1295
        self.requireFeature(UTF8DirReaderFeature)
1169
1296
        self._save_platform_info()
1170
 
        win32utils.winver = None # Avoid the win32 detection code
1171
 
        osutils._fs_enc = 'US-ASCII'
1172
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1173
 
 
1174
 
    def test_force_walkdirs_utf8_fs_ANSI(self):
1175
 
        self.requireFeature(UTF8DirReaderFeature)
1176
 
        self._save_platform_info()
1177
 
        win32utils.winver = None # Avoid the win32 detection code
1178
 
        osutils._fs_enc = 'ANSI_X3.4-1968'
1179
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
1297
        osutils._fs_enc = 'ascii'
 
1298
        self.assertDirReaderIs(
 
1299
            UTF8DirReaderFeature.module.UTF8DirReader, b".")
1180
1300
 
1181
1301
    def test_force_walkdirs_utf8_fs_latin1(self):
1182
1302
        self._save_platform_info()
1183
 
        win32utils.winver = None # Avoid the win32 detection code
1184
 
        osutils._fs_enc = 'latin1'
1185
 
        self.assertDirReaderIs(osutils.UnicodeDirReader)
 
1303
        osutils._fs_enc = 'iso-8859-1'
 
1304
        self.assertDirReaderIs(osutils.UnicodeDirReader, ".")
1186
1305
 
1187
1306
    def test_force_walkdirs_utf8_nt(self):
1188
1307
        # Disabled because the thunk of the whole walkdirs api is disabled.
1189
1308
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1190
1309
        self._save_platform_info()
1191
 
        win32utils.winver = 'Windows NT'
1192
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
1193
 
        self.assertDirReaderIs(Win32ReadDir)
1194
 
 
1195
 
    def test_force_walkdirs_utf8_98(self):
1196
 
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1197
 
        self._save_platform_info()
1198
 
        win32utils.winver = 'Windows 98'
1199
 
        self.assertDirReaderIs(osutils.UnicodeDirReader)
 
1310
        from .._walkdirs_win32 import Win32ReadDir
 
1311
        self.assertDirReaderIs(Win32ReadDir, ".")
1200
1312
 
1201
1313
    def test_unicode_walkdirs(self):
1202
1314
        """Walkdirs should always return unicode paths."""
1203
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1315
        self.requireFeature(features.UnicodeFilenameFeature)
1204
1316
        name0 = u'0file-\xb6'
1205
1317
        name1 = u'1dir-\u062c\u0648'
1206
1318
        name2 = u'2file-\u0633'
1243
1355
 
1244
1356
        The abspath portion might be in unicode or utf-8
1245
1357
        """
1246
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1358
        self.requireFeature(features.UnicodeFilenameFeature)
1247
1359
        name0 = u'0file-\xb6'
1248
1360
        name1 = u'1dir-\u062c\u0648'
1249
1361
        name2 = u'2file-\u0633'
1260
1372
        name2 = name2.encode('utf8')
1261
1373
 
1262
1374
        expected_dirblocks = [
1263
 
                (('', '.'),
1264
 
                 [(name0, name0, 'file', './' + name0),
1265
 
                  (name1, name1, 'directory', './' + name1),
1266
 
                  (name2, name2, 'file', './' + name2),
1267
 
                 ]
1268
 
                ),
1269
 
                ((name1, './' + name1),
1270
 
                 [(name1 + '/' + name0, name0, 'file', './' + name1
1271
 
                                                        + '/' + name0),
1272
 
                  (name1 + '/' + name1, name1, 'directory', './' + name1
1273
 
                                                            + '/' + name1),
1274
 
                 ]
1275
 
                ),
1276
 
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1375
                ((b'', b'.'),
 
1376
                 [(name0, name0, 'file', b'./' + name0),
 
1377
                  (name1, name1, 'directory', b'./' + name1),
 
1378
                  (name2, name2, 'file', b'./' + name2),
 
1379
                 ]
 
1380
                ),
 
1381
                ((name1, b'./' + name1),
 
1382
                 [(name1 + b'/' + name0, name0, 'file', b'./' + name1
 
1383
                                                        + b'/' + name0),
 
1384
                  (name1 + b'/' + name1, name1, 'directory', b'./' + name1
 
1385
                                                            + b'/' + name1),
 
1386
                 ]
 
1387
                ),
 
1388
                ((name1 + b'/' + name1, b'./' + name1 + b'/' + name1),
1277
1389
                 [
1278
1390
                 ]
1279
1391
                ),
1282
1394
        # For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1283
1395
        # all abspaths are Unicode, and encode them back into utf8.
1284
1396
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1285
 
            self.assertIsInstance(dirdetail[0], str)
1286
 
            if isinstance(dirdetail[1], unicode):
 
1397
            self.assertIsInstance(dirdetail[0], bytes)
 
1398
            if isinstance(dirdetail[1], text_type):
1287
1399
                dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1288
1400
                dirblock = [list(info) for info in dirblock]
1289
1401
                for info in dirblock:
1290
 
                    self.assertIsInstance(info[4], unicode)
 
1402
                    self.assertIsInstance(info[4], text_type)
1291
1403
                    info[4] = info[4].encode('utf8')
1292
1404
            new_dirblock = []
1293
1405
            for info in dirblock:
1294
 
                self.assertIsInstance(info[0], str)
1295
 
                self.assertIsInstance(info[1], str)
1296
 
                self.assertIsInstance(info[4], str)
 
1406
                self.assertIsInstance(info[0], bytes)
 
1407
                self.assertIsInstance(info[1], bytes)
 
1408
                self.assertIsInstance(info[4], bytes)
1297
1409
                # Remove the stat information
1298
1410
                new_dirblock.append((info[0], info[1], info[2], info[4]))
1299
1411
            result.append((dirdetail, new_dirblock))
1304
1416
 
1305
1417
        The abspath portion should be in unicode
1306
1418
        """
1307
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1419
        self.requireFeature(features.UnicodeFilenameFeature)
1308
1420
        # Use the unicode reader. TODO: split into driver-and-driven unit
1309
1421
        # tests.
1310
1422
        self._save_platform_info()
1327
1439
        # All of the abspaths should be in unicode, all of the relative paths
1328
1440
        # should be in utf8
1329
1441
        expected_dirblocks = [
1330
 
                (('', '.'),
 
1442
                ((b'', '.'),
1331
1443
                 [(name0, name0, 'file', './' + name0u),
1332
1444
                  (name1, name1, 'directory', './' + name1u),
1333
1445
                  (name2, name2, 'file', './' + name2u),
1334
1446
                 ]
1335
1447
                ),
1336
1448
                ((name1, './' + name1u),
1337
 
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1449
                 [(name1 + b'/' + name0, name0, 'file', './' + name1u
1338
1450
                                                        + '/' + name0u),
1339
 
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1451
                  (name1 + b'/' + name1, name1, 'directory', './' + name1u
1340
1452
                                                            + '/' + name1u),
1341
1453
                 ]
1342
1454
                ),
1343
 
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1455
                ((name1 + b'/' + name1, './' + name1u + '/' + name1u),
1344
1456
                 [
1345
1457
                 ]
1346
1458
                ),
1351
1463
 
1352
1464
    def test__walkdirs_utf8_win32readdir(self):
1353
1465
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1354
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1355
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1466
        self.requireFeature(features.UnicodeFilenameFeature)
 
1467
        from .._walkdirs_win32 import Win32ReadDir
1356
1468
        self._save_platform_info()
1357
1469
        osutils._selected_dir_reader = Win32ReadDir()
1358
1470
        name0u = u'0file-\xb6'
1408
1520
    def test__walkdirs_utf_win32_find_file_stat_file(self):
1409
1521
        """make sure our Stat values are valid"""
1410
1522
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1411
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1412
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1523
        self.requireFeature(features.UnicodeFilenameFeature)
 
1524
        from .._walkdirs_win32 import Win32ReadDir
1413
1525
        name0u = u'0file-\xb6'
1414
1526
        name0 = name0u.encode('utf8')
1415
1527
        self.build_tree([name0u])
1416
1528
        # I hate to sleep() here, but I'm trying to make the ctime different
1417
1529
        # from the mtime
1418
1530
        time.sleep(2)
1419
 
        f = open(name0u, 'ab')
1420
 
        try:
1421
 
            f.write('just a small update')
1422
 
        finally:
1423
 
            f.close()
 
1531
        with open(name0u, 'ab') as f:
 
1532
            f.write(b'just a small update')
1424
1533
 
1425
1534
        result = Win32ReadDir().read_dir('', u'.')
1426
1535
        entry = result[0]
1432
1541
    def test__walkdirs_utf_win32_find_file_stat_directory(self):
1433
1542
        """make sure our Stat values are valid"""
1434
1543
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1435
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1436
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1544
        self.requireFeature(features.UnicodeFilenameFeature)
 
1545
        from .._walkdirs_win32 import Win32ReadDir
1437
1546
        name0u = u'0dir-\u062c\u0648'
1438
1547
        name0 = name0u.encode('utf8')
1439
1548
        self.build_tree([name0u + '/'])
1519
1628
        # using the comparison routine shoudl work too:
1520
1629
        self.assertEqual(
1521
1630
            dir_sorted_paths,
1522
 
            sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
 
1631
            sorted(original_paths, key=osutils.path_prefix_key))
1523
1632
 
1524
1633
 
1525
1634
class TestCopyTree(tests.TestCaseInTempDir):
1538
1647
        self.assertEqual(['c'], os.listdir('target/b'))
1539
1648
 
1540
1649
    def test_copy_tree_symlinks(self):
1541
 
        self.requireFeature(tests.SymlinkFeature)
 
1650
        self.requireFeature(features.SymlinkFeature)
1542
1651
        self.build_tree(['source/'])
1543
1652
        os.symlink('a/generic/path', 'source/lnk')
1544
1653
        osutils.copy_tree('source', 'target')
1554
1663
            processed_files.append(('d', from_path, to_path))
1555
1664
        def link_handler(from_path, to_path):
1556
1665
            processed_links.append((from_path, to_path))
1557
 
        handlers = {'file':file_handler,
1558
 
                    'directory':dir_handler,
1559
 
                    'symlink':link_handler,
 
1666
        handlers = {'file': file_handler,
 
1667
                    'directory': dir_handler,
 
1668
                    'symlink': link_handler,
1560
1669
                   }
1561
1670
 
1562
1671
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1569
1678
                          ('d', 'source/b', 'target/b'),
1570
1679
                          ('f', 'source/b/c', 'target/b/c'),
1571
1680
                         ], processed_files)
1572
 
        self.failIfExists('target')
 
1681
        self.assertPathDoesNotExist('target')
1573
1682
        if osutils.has_symlinks():
1574
1683
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1575
1684
 
1580
1689
    def setUp(self):
1581
1690
        super(TestSetUnsetEnv, self).setUp()
1582
1691
 
1583
 
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
 
1692
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'),
1584
1693
                         'Environment was not cleaned up properly.'
1585
 
                         ' Variable BZR_TEST_ENV_VAR should not exist.')
 
1694
                         ' Variable BRZ_TEST_ENV_VAR should not exist.')
1586
1695
        def cleanup():
1587
 
            if 'BZR_TEST_ENV_VAR' in os.environ:
1588
 
                del os.environ['BZR_TEST_ENV_VAR']
 
1696
            if 'BRZ_TEST_ENV_VAR' in os.environ:
 
1697
                del os.environ['BRZ_TEST_ENV_VAR']
1589
1698
        self.addCleanup(cleanup)
1590
1699
 
1591
1700
    def test_set(self):
1592
1701
        """Test that we can set an env variable"""
1593
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1702
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
1594
1703
        self.assertEqual(None, old)
1595
 
        self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
 
1704
        self.assertEqual('foo', os.environ.get('BRZ_TEST_ENV_VAR'))
1596
1705
 
1597
1706
    def test_double_set(self):
1598
1707
        """Test that we get the old value out"""
1599
 
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1600
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
 
1708
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1709
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'bar')
1601
1710
        self.assertEqual('foo', old)
1602
 
        self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
 
1711
        self.assertEqual('bar', os.environ.get('BRZ_TEST_ENV_VAR'))
1603
1712
 
1604
1713
    def test_unicode(self):
1605
1714
        """Environment can only contain plain strings
1612
1721
                'Cannot find a unicode character that works in encoding %s'
1613
1722
                % (osutils.get_user_encoding(),))
1614
1723
 
1615
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1616
 
        self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
 
1724
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', uni_val)
 
1725
        if PY3:
 
1726
            self.assertEqual(uni_val, os.environ.get('BRZ_TEST_ENV_VAR'))
 
1727
        else:
 
1728
            self.assertEqual(env_val, os.environ.get('BRZ_TEST_ENV_VAR'))
1617
1729
 
1618
1730
    def test_unset(self):
1619
1731
        """Test that passing None will remove the env var"""
1620
 
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1621
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
 
1732
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1733
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', None)
1622
1734
        self.assertEqual('foo', old)
1623
 
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
1624
 
        self.failIf('BZR_TEST_ENV_VAR' in os.environ)
 
1735
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'))
 
1736
        self.assertNotIn('BRZ_TEST_ENV_VAR', os.environ)
1625
1737
 
1626
1738
 
1627
1739
class TestSizeShaFile(tests.TestCaseInTempDir):
1628
1740
 
1629
1741
    def test_sha_empty(self):
1630
 
        self.build_tree_contents([('foo', '')])
1631
 
        expected_sha = osutils.sha_string('')
 
1742
        self.build_tree_contents([('foo', b'')])
 
1743
        expected_sha = osutils.sha_string(b'')
1632
1744
        f = open('foo')
1633
1745
        self.addCleanup(f.close)
1634
1746
        size, sha = osutils.size_sha_file(f)
1636
1748
        self.assertEqual(expected_sha, sha)
1637
1749
 
1638
1750
    def test_sha_mixed_endings(self):
1639
 
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
 
1751
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
1640
1752
        self.build_tree_contents([('foo', text)])
1641
1753
        expected_sha = osutils.sha_string(text)
1642
1754
        f = open('foo', 'rb')
1649
1761
class TestShaFileByName(tests.TestCaseInTempDir):
1650
1762
 
1651
1763
    def test_sha_empty(self):
1652
 
        self.build_tree_contents([('foo', '')])
1653
 
        expected_sha = osutils.sha_string('')
 
1764
        self.build_tree_contents([('foo', b'')])
 
1765
        expected_sha = osutils.sha_string(b'')
1654
1766
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1655
1767
 
1656
1768
    def test_sha_mixed_endings(self):
1657
 
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
 
1769
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
1658
1770
        self.build_tree_contents([('foo', text)])
1659
1771
        expected_sha = osutils.sha_string(text)
1660
1772
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1663
1775
class TestResourceLoading(tests.TestCaseInTempDir):
1664
1776
 
1665
1777
    def test_resource_string(self):
1666
 
        # test resource in bzrlib
1667
 
        text = osutils.resource_string('bzrlib', 'debug.py')
 
1778
        # test resource in breezy
 
1779
        text = osutils.resource_string('breezy', 'debug.py')
1668
1780
        self.assertContainsRe(text, "debug_flags = set()")
1669
 
        # test resource under bzrlib
1670
 
        text = osutils.resource_string('bzrlib.ui', 'text.py')
 
1781
        # test resource under breezy
 
1782
        text = osutils.resource_string('breezy.ui', 'text.py')
1671
1783
        self.assertContainsRe(text, "class TextUIFactory")
1672
1784
        # test unsupported package
1673
1785
        self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1674
1786
            'yyy.xx')
1675
1787
        # test unknown resource
1676
 
        self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
1677
 
 
1678
 
 
1679
 
class TestReCompile(tests.TestCase):
1680
 
 
1681
 
    def test_re_compile_checked(self):
1682
 
        r = osutils.re_compile_checked(r'A*', re.IGNORECASE)
1683
 
        self.assertTrue(r.match('aaaa'))
1684
 
        self.assertTrue(r.match('aAaA'))
1685
 
 
1686
 
    def test_re_compile_checked_error(self):
1687
 
        # like https://bugs.launchpad.net/bzr/+bug/251352
1688
 
        err = self.assertRaises(
1689
 
            errors.BzrCommandError,
1690
 
            osutils.re_compile_checked, '*', re.IGNORECASE, 'test case')
1691
 
        self.assertEqual(
1692
 
            "Invalid regular expression in test case: '*': "
1693
 
            "nothing to repeat",
1694
 
            str(err))
 
1788
        self.assertRaises(IOError, osutils.resource_string, 'breezy', 'yyy.xx')
1695
1789
 
1696
1790
 
1697
1791
class TestDirReader(tests.TestCaseInTempDir):
1698
1792
 
 
1793
    scenarios = dir_reader_scenarios()
 
1794
 
1699
1795
    # Set by load_tests
1700
1796
    _dir_reader_class = None
1701
1797
    _native_to_unicode = None
1702
1798
 
1703
1799
    def setUp(self):
1704
 
        tests.TestCaseInTempDir.setUp(self)
 
1800
        super(TestDirReader, self).setUp()
1705
1801
        self.overrideAttr(osutils,
1706
1802
                          '_selected_dir_reader', self._dir_reader_class())
1707
1803
 
1714
1810
            '2file'
1715
1811
            ]
1716
1812
        expected_dirblocks = [
1717
 
                (('', '.'),
1718
 
                 [('0file', '0file', 'file'),
1719
 
                  ('1dir', '1dir', 'directory'),
1720
 
                  ('2file', '2file', 'file'),
1721
 
                 ]
1722
 
                ),
1723
 
                (('1dir', './1dir'),
1724
 
                 [('1dir/0file', '0file', 'file'),
1725
 
                  ('1dir/1dir', '1dir', 'directory'),
1726
 
                 ]
1727
 
                ),
1728
 
                (('1dir/1dir', './1dir/1dir'),
 
1813
                ((b'', '.'),
 
1814
                 [(b'0file', b'0file', 'file', './0file'),
 
1815
                  (b'1dir', b'1dir', 'directory', './1dir'),
 
1816
                  (b'2file', b'2file', 'file', './2file'),
 
1817
                 ]
 
1818
                ),
 
1819
                ((b'1dir', './1dir'),
 
1820
                 [(b'1dir/0file', b'0file', 'file', './1dir/0file'),
 
1821
                  (b'1dir/1dir', b'1dir', 'directory', './1dir/1dir'),
 
1822
                 ]
 
1823
                ),
 
1824
                ((b'1dir/1dir', './1dir/1dir'),
1729
1825
                 [
1730
1826
                 ]
1731
1827
                ),
1738
1834
        result = list(osutils._walkdirs_utf8('.'))
1739
1835
        # Filter out stat and abspath
1740
1836
        self.assertEqual(expected_dirblocks,
1741
 
                         [(dirinfo, [line[0:3] for line in block])
1742
 
                          for dirinfo, block in result])
 
1837
                self._filter_out(result))
1743
1838
 
1744
1839
    def test_walk_sub_dir(self):
1745
1840
        tree, expected_dirblocks = self._get_ascii_tree()
1746
1841
        self.build_tree(tree)
1747
1842
        # you can search a subdir only, with a supplied prefix.
1748
 
        result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
 
1843
        result = list(osutils._walkdirs_utf8(b'./1dir', b'1dir'))
1749
1844
        # Filter out stat and abspath
1750
1845
        self.assertEqual(expected_dirblocks[1:],
1751
 
                         [(dirinfo, [line[0:3] for line in block])
1752
 
                          for dirinfo, block in result])
 
1846
                self._filter_out(result))
1753
1847
 
1754
1848
    def _get_unicode_tree(self):
1755
1849
        name0u = u'0file-\xb6'
1766
1860
        name1 = name1u.encode('UTF-8')
1767
1861
        name2 = name2u.encode('UTF-8')
1768
1862
        expected_dirblocks = [
1769
 
                (('', '.'),
 
1863
                ((b'', '.'),
1770
1864
                 [(name0, name0, 'file', './' + name0u),
1771
1865
                  (name1, name1, 'directory', './' + name1u),
1772
1866
                  (name2, name2, 'file', './' + name2u),
1773
1867
                 ]
1774
1868
                ),
1775
1869
                ((name1, './' + name1u),
1776
 
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1870
                 [(name1 + b'/' + name0, name0, 'file', './' + name1u
1777
1871
                                                        + '/' + name0u),
1778
 
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1872
                  (name1 + b'/' + name1, name1, 'directory', './' + name1u
1779
1873
                                                            + '/' + name1u),
1780
1874
                 ]
1781
1875
                ),
1782
 
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1876
                ((name1 + b'/' + name1, './' + name1u + '/' + name1u),
1783
1877
                 [
1784
1878
                 ]
1785
1879
                ),
1801
1895
        return filtered_dirblocks
1802
1896
 
1803
1897
    def test_walk_unicode_tree(self):
1804
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1898
        self.requireFeature(features.UnicodeFilenameFeature)
1805
1899
        tree, expected_dirblocks = self._get_unicode_tree()
1806
1900
        self.build_tree(tree)
1807
1901
        result = list(osutils._walkdirs_utf8('.'))
1808
1902
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1809
1903
 
1810
1904
    def test_symlink(self):
1811
 
        self.requireFeature(tests.SymlinkFeature)
1812
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1905
        self.requireFeature(features.SymlinkFeature)
 
1906
        self.requireFeature(features.UnicodeFilenameFeature)
1813
1907
        target = u'target\N{Euro Sign}'
1814
1908
        link_name = u'l\N{Euro Sign}nk'
1815
1909
        os.symlink(target, link_name)
1816
1910
        target_utf8 = target.encode('UTF-8')
1817
1911
        link_name_utf8 = link_name.encode('UTF-8')
1818
1912
        expected_dirblocks = [
1819
 
                (('', '.'),
 
1913
                ((b'', '.'),
1820
1914
                 [(link_name_utf8, link_name_utf8,
1821
1915
                   'symlink', './' + link_name),],
1822
1916
                 )]
1833
1927
    But prior python versions failed to properly encode the passed unicode
1834
1928
    string.
1835
1929
    """
1836
 
    _test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]
 
1930
    _test_needs_features = [features.SymlinkFeature, features.UnicodeFilenameFeature]
1837
1931
 
1838
1932
    def setUp(self):
1839
1933
        super(tests.TestCaseInTempDir, self).setUp()
1842
1936
        os.symlink(self.target, self.link)
1843
1937
 
1844
1938
    def test_os_readlink_link_encoding(self):
1845
 
        if sys.version_info < (2, 6):
1846
 
            self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
1847
 
        else:
1848
 
            self.assertEquals(self.target,  os.readlink(self.link))
 
1939
        self.assertEqual(self.target,  os.readlink(self.link))
1849
1940
 
1850
1941
    def test_os_readlink_link_decoding(self):
1851
 
        self.assertEquals(self.target.encode(osutils._fs_enc),
 
1942
        self.assertEqual(self.target.encode(osutils._fs_enc),
1852
1943
                          os.readlink(self.link.encode(osutils._fs_enc)))
1853
1944
 
1854
1945
 
1863
1954
        self.assertIsInstance(concurrency, int)
1864
1955
 
1865
1956
    def test_local_concurrency_environment_variable(self):
1866
 
        os.environ['BZR_CONCURRENCY'] = '2'
 
1957
        self.overrideEnv('BRZ_CONCURRENCY', '2')
1867
1958
        self.assertEqual(2, osutils.local_concurrency(use_cache=False))
1868
 
        os.environ['BZR_CONCURRENCY'] = '3'
 
1959
        self.overrideEnv('BRZ_CONCURRENCY', '3')
1869
1960
        self.assertEqual(3, osutils.local_concurrency(use_cache=False))
1870
 
        os.environ['BZR_CONCURRENCY'] = 'foo'
 
1961
        self.overrideEnv('BRZ_CONCURRENCY', 'foo')
1871
1962
        self.assertEqual(1, osutils.local_concurrency(use_cache=False))
1872
1963
 
1873
1964
    def test_option_concurrency(self):
1874
 
        os.environ['BZR_CONCURRENCY'] = '1'
 
1965
        self.overrideEnv('BRZ_CONCURRENCY', '1')
1875
1966
        self.run_bzr('rocks --concurrency 42')
1876
 
        # Command line overrides envrionment variable
1877
 
        self.assertEquals('42', os.environ['BZR_CONCURRENCY'])
1878
 
        self.assertEquals(42, osutils.local_concurrency(use_cache=False))
 
1967
        # Command line overrides environment variable
 
1968
        self.assertEqual('42', os.environ['BRZ_CONCURRENCY'])
 
1969
        self.assertEqual(42, osutils.local_concurrency(use_cache=False))
1879
1970
 
1880
1971
 
1881
1972
class TestFailedToLoadExtension(tests.TestCase):
1882
1973
 
1883
1974
    def _try_loading(self):
1884
1975
        try:
1885
 
            import bzrlib._fictional_extension_py
1886
 
        except ImportError, e:
 
1976
            import breezy._fictional_extension_py
 
1977
        except ImportError as e:
1887
1978
            osutils.failed_to_load_extension(e)
1888
1979
            return True
1889
1980
 
1894
1985
    def test_failure_to_load(self):
1895
1986
        self._try_loading()
1896
1987
        self.assertLength(1, osutils._extension_load_failures)
1897
 
        self.assertEquals(osutils._extension_load_failures[0],
1898
 
            "No module named _fictional_extension_py")
 
1988
        if PY3:
 
1989
            self.assertEqual(osutils._extension_load_failures[0],
 
1990
                "No module named 'breezy._fictional_extension_py'")
 
1991
        else:
 
1992
            self.assertEqual(osutils._extension_load_failures[0],
 
1993
                "No module named _fictional_extension_py")
1899
1994
 
1900
1995
    def test_report_extension_load_failures_no_warning(self):
1901
1996
        self.assertTrue(self._try_loading())
1904
1999
        self.assertLength(0, warnings)
1905
2000
 
1906
2001
    def test_report_extension_load_failures_message(self):
1907
 
        log = StringIO()
 
2002
        log = BytesIO()
1908
2003
        trace.push_log_file(log)
1909
2004
        self.assertTrue(self._try_loading())
1910
2005
        osutils.report_extension_load_failures()
1911
2006
        self.assertContainsRe(
1912
2007
            log.getvalue(),
1913
 
            r"bzr: warning: some compiled extensions could not be loaded; "
1914
 
            "see <https://answers\.launchpad\.net/bzr/\+faq/703>\n"
 
2008
            br"brz: warning: some compiled extensions could not be loaded; "
 
2009
            b"see ``brz help missing-extensions``\n"
1915
2010
            )
1916
2011
 
1917
2012
 
1918
2013
class TestTerminalWidth(tests.TestCase):
1919
2014
 
 
2015
    def setUp(self):
 
2016
        super(TestTerminalWidth, self).setUp()
 
2017
        self._orig_terminal_size_state = osutils._terminal_size_state
 
2018
        self._orig_first_terminal_size = osutils._first_terminal_size
 
2019
        self.addCleanup(self.restore_osutils_globals)
 
2020
        osutils._terminal_size_state = 'no_data'
 
2021
        osutils._first_terminal_size = None
 
2022
 
 
2023
    def restore_osutils_globals(self):
 
2024
        osutils._terminal_size_state = self._orig_terminal_size_state
 
2025
        osutils._first_terminal_size = self._orig_first_terminal_size
 
2026
 
1920
2027
    def replace_stdout(self, new):
1921
2028
        self.overrideAttr(sys, 'stdout', new)
1922
2029
 
1934
2041
    def test_default_values(self):
1935
2042
        self.assertEqual(80, osutils.default_terminal_width)
1936
2043
 
1937
 
    def test_defaults_to_BZR_COLUMNS(self):
1938
 
        # BZR_COLUMNS is set by the test framework
1939
 
        self.assertNotEqual('12', os.environ['BZR_COLUMNS'])
1940
 
        os.environ['BZR_COLUMNS'] = '12'
 
2044
    def test_defaults_to_BRZ_COLUMNS(self):
 
2045
        # BRZ_COLUMNS is set by the test framework
 
2046
        self.assertNotEqual('12', os.environ['BRZ_COLUMNS'])
 
2047
        self.overrideEnv('BRZ_COLUMNS', '12')
1941
2048
        self.assertEqual(12, osutils.terminal_width())
1942
2049
 
 
2050
    def test_BRZ_COLUMNS_0_no_limit(self):
 
2051
        self.overrideEnv('BRZ_COLUMNS', '0')
 
2052
        self.assertEqual(None, osutils.terminal_width())
 
2053
 
1943
2054
    def test_falls_back_to_COLUMNS(self):
1944
 
        del os.environ['BZR_COLUMNS']
 
2055
        self.overrideEnv('BRZ_COLUMNS', None)
1945
2056
        self.assertNotEqual('42', os.environ['COLUMNS'])
1946
2057
        self.set_fake_tty()
1947
 
        os.environ['COLUMNS'] = '42'
 
2058
        self.overrideEnv('COLUMNS', '42')
1948
2059
        self.assertEqual(42, osutils.terminal_width())
1949
2060
 
1950
2061
    def test_tty_default_without_columns(self):
1951
 
        del os.environ['BZR_COLUMNS']
1952
 
        del os.environ['COLUMNS']
 
2062
        self.overrideEnv('BRZ_COLUMNS', None)
 
2063
        self.overrideEnv('COLUMNS', None)
1953
2064
 
1954
2065
        def terminal_size(w, h):
1955
2066
            return 42, 42
1962
2073
        self.assertEqual(42, osutils.terminal_width())
1963
2074
 
1964
2075
    def test_non_tty_default_without_columns(self):
1965
 
        del os.environ['BZR_COLUMNS']
1966
 
        del os.environ['COLUMNS']
 
2076
        self.overrideEnv('BRZ_COLUMNS', None)
 
2077
        self.overrideEnv('COLUMNS', None)
1967
2078
        self.replace_stdout(None)
1968
2079
        self.assertEqual(None, osutils.terminal_width())
1969
2080
 
1979
2090
        else:
1980
2091
            self.overrideAttr(termios, 'TIOCGWINSZ')
1981
2092
            del termios.TIOCGWINSZ
1982
 
        del os.environ['BZR_COLUMNS']
1983
 
        del os.environ['COLUMNS']
 
2093
        self.overrideEnv('BRZ_COLUMNS', None)
 
2094
        self.overrideEnv('COLUMNS', None)
1984
2095
        # Whatever the result is, if we don't raise an exception, it's ok.
1985
2096
        osutils.terminal_width()
1986
2097
 
 
2098
 
1987
2099
class TestCreationOps(tests.TestCaseInTempDir):
1988
2100
    _test_needs_features = [features.chown_feature]
1989
2101
 
1990
2102
    def setUp(self):
1991
 
        tests.TestCaseInTempDir.setUp(self)
 
2103
        super(TestCreationOps, self).setUp()
1992
2104
        self.overrideAttr(os, 'chown', self._dummy_chown)
1993
2105
 
1994
2106
        # params set by call to _dummy_chown
2004
2116
        osutils.copy_ownership_from_path('test_file', ownsrc)
2005
2117
 
2006
2118
        s = os.stat(ownsrc)
2007
 
        self.assertEquals(self.path, 'test_file')
2008
 
        self.assertEquals(self.uid, s.st_uid)
2009
 
        self.assertEquals(self.gid, s.st_gid)
 
2119
        self.assertEqual(self.path, 'test_file')
 
2120
        self.assertEqual(self.uid, s.st_uid)
 
2121
        self.assertEqual(self.gid, s.st_gid)
2010
2122
 
2011
2123
    def test_copy_ownership_nonesrc(self):
2012
2124
        """copy_ownership_from_path test with src=None."""
2015
2127
        osutils.copy_ownership_from_path('test_file')
2016
2128
 
2017
2129
        s = os.stat('..')
2018
 
        self.assertEquals(self.path, 'test_file')
2019
 
        self.assertEquals(self.uid, s.st_uid)
2020
 
        self.assertEquals(self.gid, s.st_gid)
 
2130
        self.assertEqual(self.path, 'test_file')
 
2131
        self.assertEqual(self.uid, s.st_uid)
 
2132
        self.assertEqual(self.gid, s.st_gid)
 
2133
 
 
2134
 
 
2135
class TestPathFromEnviron(tests.TestCase):
 
2136
 
 
2137
    def test_is_unicode(self):
 
2138
        self.overrideEnv('BRZ_TEST_PATH', './anywhere at all/')
 
2139
        path = osutils.path_from_environ('BRZ_TEST_PATH')
 
2140
        self.assertIsInstance(path, text_type)
 
2141
        self.assertEqual(u'./anywhere at all/', path)
 
2142
 
 
2143
    def test_posix_path_env_ascii(self):
 
2144
        self.overrideEnv('BRZ_TEST_PATH', '/tmp')
 
2145
        home = osutils._posix_path_from_environ('BRZ_TEST_PATH')
 
2146
        self.assertIsInstance(home, text_type)
 
2147
        self.assertEqual(u'/tmp', home)
 
2148
 
 
2149
    def test_posix_path_env_unicode(self):
 
2150
        self.requireFeature(features.ByteStringNamedFilesystem)
 
2151
        self.overrideEnv('BRZ_TEST_PATH', '/home/\xa7test')
 
2152
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
 
2153
        self.assertEqual(u'/home/\xa7test',
 
2154
            osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2155
        osutils._fs_enc = "iso8859-5"
 
2156
        if PY3:
 
2157
            # In Python 3, os.environ returns unicode.
 
2158
            self.assertEqual(u'/home/\xa7test',
 
2159
                osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2160
        else:
 
2161
            self.assertEqual(u'/home/\u0407test',
 
2162
                osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2163
            osutils._fs_enc = "utf-8"
 
2164
            self.assertRaises(errors.BadFilenameEncoding,
 
2165
                osutils._posix_path_from_environ, 'BRZ_TEST_PATH')
 
2166
 
 
2167
 
 
2168
class TestGetHomeDir(tests.TestCase):
 
2169
 
 
2170
    def test_is_unicode(self):
 
2171
        home = osutils._get_home_dir()
 
2172
        self.assertIsInstance(home, text_type)
 
2173
 
 
2174
    def test_posix_homeless(self):
 
2175
        self.overrideEnv('HOME', None)
 
2176
        home = osutils._get_home_dir()
 
2177
        self.assertIsInstance(home, text_type)
 
2178
 
 
2179
    def test_posix_home_ascii(self):
 
2180
        self.overrideEnv('HOME', '/home/test')
 
2181
        home = osutils._posix_get_home_dir()
 
2182
        self.assertIsInstance(home, text_type)
 
2183
        self.assertEqual(u'/home/test', home)
 
2184
 
 
2185
    def test_posix_home_unicode(self):
 
2186
        self.requireFeature(features.ByteStringNamedFilesystem)
 
2187
        self.overrideEnv('HOME', '/home/\xa7test')
 
2188
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
 
2189
        self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
 
2190
        osutils._fs_enc = "iso8859-5"
 
2191
        if PY3:
 
2192
            # In python 3, os.environ returns unicode
 
2193
            self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
 
2194
        else:
 
2195
            self.assertEqual(u'/home/\u0407test', osutils._posix_get_home_dir())
 
2196
            osutils._fs_enc = "utf-8"
 
2197
            self.assertRaises(errors.BadFilenameEncoding,
 
2198
                osutils._posix_get_home_dir)
 
2199
 
 
2200
 
 
2201
class TestGetuserUnicode(tests.TestCase):
 
2202
 
 
2203
    def test_is_unicode(self):
 
2204
        user = osutils.getuser_unicode()
 
2205
        self.assertIsInstance(user, text_type)
 
2206
 
 
2207
    def envvar_to_override(self):
 
2208
        if sys.platform == "win32":
 
2209
            # Disable use of platform calls on windows so envvar is used
 
2210
            self.overrideAttr(win32utils, 'has_ctypes', False)
 
2211
            return 'USERNAME' # only variable used on windows
 
2212
        return 'LOGNAME' # first variable checked by getpass.getuser()
 
2213
 
 
2214
    def test_ascii_user(self):
 
2215
        self.overrideEnv(self.envvar_to_override(), 'jrandom')
 
2216
        self.assertEqual(u'jrandom', osutils.getuser_unicode())
 
2217
 
 
2218
    def test_unicode_user(self):
 
2219
        ue = osutils.get_user_encoding()
 
2220
        uni_val, env_val = tests.probe_unicode_in_user_encoding()
 
2221
        if uni_val is None:
 
2222
            raise tests.TestSkipped(
 
2223
                'Cannot find a unicode character that works in encoding %s'
 
2224
                % (osutils.get_user_encoding(),))
 
2225
        uni_username = u'jrandom' + uni_val
 
2226
        encoded_username = uni_username.encode(ue)
 
2227
        if PY3:
 
2228
            self.overrideEnv(self.envvar_to_override(), uni_username)
 
2229
        else:
 
2230
            self.overrideEnv(self.envvar_to_override(), encoded_username)
 
2231
        self.assertEqual(uni_username, osutils.getuser_unicode())
 
2232
 
 
2233
 
 
2234
class TestBackupNames(tests.TestCase):
 
2235
 
 
2236
    def setUp(self):
 
2237
        super(TestBackupNames, self).setUp()
 
2238
        self.backups = []
 
2239
 
 
2240
    def backup_exists(self, name):
 
2241
        return name in self.backups
 
2242
 
 
2243
    def available_backup_name(self, name):
 
2244
        backup_name = osutils.available_backup_name(name, self.backup_exists)
 
2245
        self.backups.append(backup_name)
 
2246
        return backup_name
 
2247
 
 
2248
    def assertBackupName(self, expected, name):
 
2249
        self.assertEqual(expected, self.available_backup_name(name))
 
2250
 
 
2251
    def test_empty(self):
 
2252
        self.assertBackupName('file.~1~', 'file')
 
2253
 
 
2254
    def test_existing(self):
 
2255
        self.available_backup_name('file')
 
2256
        self.available_backup_name('file')
 
2257
        self.assertBackupName('file.~3~', 'file')
 
2258
        # Empty slots are found, this is not a strict requirement and may be
 
2259
        # revisited if we test against all implementations.
 
2260
        self.backups.remove('file.~2~')
 
2261
        self.assertBackupName('file.~2~', 'file')
 
2262
 
 
2263
 
 
2264
class TestFindExecutableInPath(tests.TestCase):
 
2265
 
 
2266
    def test_windows(self):
 
2267
        if sys.platform != 'win32':
 
2268
            raise tests.TestSkipped('test requires win32')
 
2269
        self.assertTrue(osutils.find_executable_on_path('explorer') is not None)
 
2270
        self.assertTrue(
 
2271
            osutils.find_executable_on_path('explorer.exe') is not None)
 
2272
        self.assertTrue(
 
2273
            osutils.find_executable_on_path('EXPLORER.EXE') is not None)
 
2274
        self.assertTrue(
 
2275
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
 
2276
        self.assertTrue(osutils.find_executable_on_path('file.txt') is None)
 
2277
        
 
2278
    def test_windows_app_path(self):
 
2279
        if sys.platform != 'win32':
 
2280
            raise tests.TestSkipped('test requires win32')
 
2281
        # Override PATH env var so that exe can only be found on App Path
 
2282
        self.overrideEnv('PATH', '')
 
2283
        # Internt Explorer is always registered in the App Path
 
2284
        self.assertTrue(osutils.find_executable_on_path('iexplore') is not None)
 
2285
 
 
2286
    def test_other(self):
 
2287
        if sys.platform == 'win32':
 
2288
            raise tests.TestSkipped('test requires non-win32')
 
2289
        self.assertTrue(osutils.find_executable_on_path('sh') is not None)
 
2290
        self.assertTrue(
 
2291
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
 
2292
 
 
2293
 
 
2294
class TestEnvironmentErrors(tests.TestCase):
 
2295
    """Test handling of environmental errors"""
 
2296
 
 
2297
    def test_is_oserror(self):
 
2298
        self.assertTrue(osutils.is_environment_error(
 
2299
            OSError(errno.EINVAL, "Invalid parameter")))
 
2300
 
 
2301
    def test_is_ioerror(self):
 
2302
        self.assertTrue(osutils.is_environment_error(
 
2303
            IOError(errno.EINVAL, "Invalid parameter")))
 
2304
 
 
2305
    def test_is_socket_error(self):
 
2306
        self.assertTrue(osutils.is_environment_error(
 
2307
            socket.error(errno.EINVAL, "Invalid parameter")))
 
2308
 
 
2309
    def test_is_select_error(self):
 
2310
        self.assertTrue(osutils.is_environment_error(
 
2311
            select.error(errno.EINVAL, "Invalid parameter")))
 
2312
 
 
2313
    def test_is_pywintypes_error(self):
 
2314
        self.requireFeature(features.pywintypes)
 
2315
        import pywintypes
 
2316
        self.assertTrue(osutils.is_environment_error(
 
2317
            pywintypes.error(errno.EINVAL, "Invalid parameter", "Caller")))