/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_osutils.py

  • Committer: Jelmer Vernooij
  • Date: 2020-04-05 19:11:34 UTC
  • mto: (7490.7.16 work)
  • mto: This revision was merged to the branch mainline in revision 7501.
  • Revision ID: jelmer@jelmer.uk-20200405191134-0aebh8ikiwygxma5
Populate the .gitignore file.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the osutils wrapper."""
18
18
 
19
 
from cStringIO import StringIO
 
19
from __future__ import absolute_import, division
 
20
 
20
21
import errno
21
22
import os
22
 
import re
 
23
import select
23
24
import socket
24
 
import stat
25
25
import sys
 
26
import tempfile
26
27
import time
27
28
 
28
 
from bzrlib import (
 
29
from .. import (
29
30
    errors,
30
31
    osutils,
31
32
    tests,
32
33
    trace,
33
34
    win32utils,
34
35
    )
35
 
from bzrlib.tests import (
 
36
from ..sixish import (
 
37
    BytesIO,
 
38
    PY3,
 
39
    text_type,
 
40
    )
 
41
from . import (
36
42
    features,
37
43
    file_utils,
38
44
    test__walkdirs_win32,
39
45
    )
40
 
 
41
 
 
42
 
class _UTF8DirReaderFeature(tests.Feature):
 
46
from .scenarios import load_tests_apply_scenarios
 
47
 
 
48
 
 
49
class _UTF8DirReaderFeature(features.ModuleAvailableFeature):
43
50
 
44
51
    def _probe(self):
45
52
        try:
46
 
            from bzrlib import _readdir_pyx
 
53
            from .. import _readdir_pyx
 
54
            self._module = _readdir_pyx
47
55
            self.reader = _readdir_pyx.UTF8DirReader
48
56
            return True
49
57
        except ImportError:
50
58
            return False
51
59
 
52
 
    def feature_name(self):
53
 
        return 'bzrlib._readdir_pyx'
54
 
 
55
 
UTF8DirReaderFeature = _UTF8DirReaderFeature()
56
 
 
57
 
term_ios_feature = tests.ModuleAvailableFeature('termios')
 
60
 
 
61
UTF8DirReaderFeature = _UTF8DirReaderFeature('breezy._readdir_pyx')
 
62
 
 
63
term_ios_feature = features.ModuleAvailableFeature('termios')
58
64
 
59
65
 
60
66
def _already_unicode(s):
78
84
    # Some DirReaders are platform specific and even there they may not be
79
85
    # available.
80
86
    if UTF8DirReaderFeature.available():
81
 
        from bzrlib import _readdir_pyx
 
87
        from .. import _readdir_pyx
82
88
        scenarios.append(('utf8',
83
89
                          dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
84
90
                               _native_to_unicode=_utf8_to_unicode)))
85
91
 
86
92
    if test__walkdirs_win32.win32_readdir_feature.available():
87
93
        try:
88
 
            from bzrlib import _walkdirs_win32
 
94
            from .. import _walkdirs_win32
89
95
            scenarios.append(
90
96
                ('win32',
91
97
                 dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
95
101
    return scenarios
96
102
 
97
103
 
98
 
def load_tests(basic_tests, module, loader):
99
 
    suite = loader.suiteClass()
100
 
    dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
101
 
        basic_tests, tests.condition_isinstance(TestDirReader))
102
 
    tests.multiply_tests(dir_reader_tests, dir_reader_scenarios(), suite)
103
 
    suite.addTest(remaining_tests)
104
 
    return suite
 
104
load_tests = load_tests_apply_scenarios
105
105
 
106
106
 
107
107
class TestContainsWhitespace(tests.TestCase):
108
108
 
109
109
    def test_contains_whitespace(self):
110
 
        self.failUnless(osutils.contains_whitespace(u' '))
111
 
        self.failUnless(osutils.contains_whitespace(u'hello there'))
112
 
        self.failUnless(osutils.contains_whitespace(u'hellothere\n'))
113
 
        self.failUnless(osutils.contains_whitespace(u'hello\nthere'))
114
 
        self.failUnless(osutils.contains_whitespace(u'hello\rthere'))
115
 
        self.failUnless(osutils.contains_whitespace(u'hello\tthere'))
 
110
        self.assertTrue(osutils.contains_whitespace(u' '))
 
111
        self.assertTrue(osutils.contains_whitespace(u'hello there'))
 
112
        self.assertTrue(osutils.contains_whitespace(u'hellothere\n'))
 
113
        self.assertTrue(osutils.contains_whitespace(u'hello\nthere'))
 
114
        self.assertTrue(osutils.contains_whitespace(u'hello\rthere'))
 
115
        self.assertTrue(osutils.contains_whitespace(u'hello\tthere'))
116
116
 
117
117
        # \xa0 is "Non-breaking-space" which on some python locales thinks it
118
118
        # is whitespace, but we do not.
119
 
        self.failIf(osutils.contains_whitespace(u''))
120
 
        self.failIf(osutils.contains_whitespace(u'hellothere'))
121
 
        self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
 
119
        self.assertFalse(osutils.contains_whitespace(u''))
 
120
        self.assertFalse(osutils.contains_whitespace(u'hellothere'))
 
121
        self.assertFalse(osutils.contains_whitespace(u'hello\xa0there'))
122
122
 
123
123
 
124
124
class TestRename(tests.TestCaseInTempDir):
136
136
 
137
137
    def test_fancy_rename(self):
138
138
        # This should work everywhere
139
 
        self.create_file('a', 'something in a\n')
 
139
        self.create_file('a', b'something in a\n')
140
140
        self._fancy_rename('a', 'b')
141
 
        self.failIfExists('a')
142
 
        self.failUnlessExists('b')
143
 
        self.check_file_contents('b', 'something in a\n')
 
141
        self.assertPathDoesNotExist('a')
 
142
        self.assertPathExists('b')
 
143
        self.check_file_contents('b', b'something in a\n')
144
144
 
145
 
        self.create_file('a', 'new something in a\n')
 
145
        self.create_file('a', b'new something in a\n')
146
146
        self._fancy_rename('b', 'a')
147
147
 
148
 
        self.check_file_contents('a', 'something in a\n')
 
148
        self.check_file_contents('a', b'something in a\n')
149
149
 
150
150
    def test_fancy_rename_fails_source_missing(self):
151
151
        # An exception should be raised, and the target should be left in place
152
 
        self.create_file('target', 'data in target\n')
 
152
        self.create_file('target', b'data in target\n')
153
153
        self.assertRaises((IOError, OSError), self._fancy_rename,
154
154
                          'missingsource', 'target')
155
 
        self.failUnlessExists('target')
156
 
        self.check_file_contents('target', 'data in target\n')
 
155
        self.assertPathExists('target')
 
156
        self.check_file_contents('target', b'data in target\n')
157
157
 
158
158
    def test_fancy_rename_fails_if_source_and_target_missing(self):
159
159
        self.assertRaises((IOError, OSError), self._fancy_rename,
161
161
 
162
162
    def test_rename(self):
163
163
        # Rename should be semi-atomic on all platforms
164
 
        self.create_file('a', 'something in a\n')
 
164
        self.create_file('a', b'something in a\n')
165
165
        osutils.rename('a', 'b')
166
 
        self.failIfExists('a')
167
 
        self.failUnlessExists('b')
168
 
        self.check_file_contents('b', 'something in a\n')
 
166
        self.assertPathDoesNotExist('a')
 
167
        self.assertPathExists('b')
 
168
        self.check_file_contents('b', b'something in a\n')
169
169
 
170
 
        self.create_file('a', 'new something in a\n')
 
170
        self.create_file('a', b'new something in a\n')
171
171
        osutils.rename('b', 'a')
172
172
 
173
 
        self.check_file_contents('a', 'something in a\n')
 
173
        self.check_file_contents('a', b'something in a\n')
174
174
 
175
175
    # TODO: test fancy_rename using a MemoryTransport
176
176
 
182
182
        # we can't use failUnlessExists on case-insensitive filesystem
183
183
        # so try to check shape of the tree
184
184
        shape = sorted(os.listdir('.'))
185
 
        self.assertEquals(['A', 'B'], shape)
 
185
        self.assertEqual(['A', 'B'], shape)
186
186
 
187
 
    def test_rename_error(self):
188
 
        # We wrap os.rename to make it give an error including the filenames
189
 
        # https://bugs.launchpad.net/bzr/+bug/491763
190
 
        err = self.assertRaises(OSError, osutils.rename,
191
 
            'nonexistent', 'target')
192
 
        self.assertContainsString(str(err), 'nonexistent')
 
187
    def test_rename_exception(self):
 
188
        try:
 
189
            osutils.rename('nonexistent_path', 'different_nonexistent_path')
 
190
        except OSError as e:
 
191
            self.assertEqual(e.old_filename, 'nonexistent_path')
 
192
            self.assertEqual(e.new_filename, 'different_nonexistent_path')
 
193
            self.assertTrue('nonexistent_path' in e.strerror)
 
194
            self.assertTrue('different_nonexistent_path' in e.strerror)
193
195
 
194
196
 
195
197
class TestRandChars(tests.TestCase):
222
224
                         (['src'], SRC_FOO_C),
223
225
                         (['src'], 'src'),
224
226
                         ]:
225
 
            self.assert_(osutils.is_inside_any(dirs, fn))
 
227
            self.assertTrue(osutils.is_inside_any(dirs, fn))
226
228
        for dirs, fn in [(['src'], 'srccontrol'),
227
229
                         (['src'], 'srccontrol/foo')]:
228
230
            self.assertFalse(osutils.is_inside_any(dirs, fn))
234
236
                         (['src/bar.c', 'bla/foo.c'], 'src'),
235
237
                         (['src'], 'src'),
236
238
                         ]:
237
 
            self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
 
239
            self.assertTrue(osutils.is_inside_or_parent_of_any(dirs, fn))
238
240
 
239
241
        for dirs, fn in [(['src'], 'srccontrol'),
240
242
                         (['srccontrol/foo.c'], 'src'),
242
244
            self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
243
245
 
244
246
 
 
247
class TestLstat(tests.TestCaseInTempDir):
 
248
 
 
249
    def test_lstat_matches_fstat(self):
 
250
        # On Windows, lstat and fstat don't always agree, primarily in the
 
251
        # 'st_ino' and 'st_dev' fields. So we force them to be '0' in our
 
252
        # custom implementation.
 
253
        if sys.platform == 'win32':
 
254
            # We only have special lstat/fstat if we have the extension.
 
255
            # Without it, we may end up re-reading content when we don't have
 
256
            # to, but otherwise it doesn't effect correctness.
 
257
            self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
258
        with open('test-file.txt', 'wb') as f:
 
259
            f.write(b'some content\n')
 
260
            f.flush()
 
261
            self.assertEqualStat(osutils.fstat(f.fileno()),
 
262
                                 osutils.lstat('test-file.txt'))
 
263
 
 
264
 
245
265
class TestRmTree(tests.TestCaseInTempDir):
246
266
 
247
267
    def test_rmtree(self):
248
268
        # Check to remove tree with read-only files/dirs
249
269
        os.mkdir('dir')
250
 
        f = file('dir/file', 'w')
251
 
        f.write('spam')
252
 
        f.close()
 
270
        with open('dir/file', 'w') as f:
 
271
            f.write('spam')
253
272
        # would like to also try making the directory readonly, but at the
254
273
        # moment python shutil.rmtree doesn't handle that properly - it would
255
274
        # need to chmod the directory before removing things inside it - deferred
259
278
 
260
279
        osutils.rmtree('dir')
261
280
 
262
 
        self.failIfExists('dir/file')
263
 
        self.failIfExists('dir')
 
281
        self.assertPathDoesNotExist('dir/file')
 
282
        self.assertPathDoesNotExist('dir')
264
283
 
265
284
 
266
285
class TestDeleteAny(tests.TestCaseInTempDir):
279
298
 
280
299
    def test_file_kind(self):
281
300
        self.build_tree(['file', 'dir/'])
282
 
        self.assertEquals('file', osutils.file_kind('file'))
283
 
        self.assertEquals('directory', osutils.file_kind('dir/'))
 
301
        self.assertEqual('file', osutils.file_kind('file'))
 
302
        self.assertEqual('directory', osutils.file_kind('dir/'))
284
303
        if osutils.has_symlinks():
285
304
            os.symlink('symlink', 'symlink')
286
 
            self.assertEquals('symlink', osutils.file_kind('symlink'))
 
305
            self.assertEqual('symlink', osutils.file_kind('symlink'))
287
306
 
288
307
        # TODO: jam 20060529 Test a block device
289
308
        try:
290
309
            os.lstat('/dev/null')
291
 
        except OSError, e:
 
310
        except OSError as e:
292
311
            if e.errno not in (errno.ENOENT,):
293
312
                raise
294
313
        else:
295
 
            self.assertEquals('chardev', osutils.file_kind('/dev/null'))
 
314
            self.assertEqual(
 
315
                'chardev',
 
316
                osutils.file_kind(os.path.realpath('/dev/null')))
296
317
 
297
318
        mkfifo = getattr(os, 'mkfifo', None)
298
319
        if mkfifo:
299
320
            mkfifo('fifo')
300
321
            try:
301
 
                self.assertEquals('fifo', osutils.file_kind('fifo'))
 
322
                self.assertEqual('fifo', osutils.file_kind('fifo'))
302
323
            finally:
303
324
                os.remove('fifo')
304
325
 
307
328
            s = socket.socket(AF_UNIX)
308
329
            s.bind('socket')
309
330
            try:
310
 
                self.assertEquals('socket', osutils.file_kind('socket'))
 
331
                self.assertEqual('socket', osutils.file_kind('socket'))
311
332
            finally:
312
333
                os.remove('socket')
313
334
 
332
353
 
333
354
        orig_umask = osutils.get_umask()
334
355
        self.addCleanup(os.umask, orig_umask)
335
 
        os.umask(0222)
336
 
        self.assertEqual(0222, osutils.get_umask())
337
 
        os.umask(0022)
338
 
        self.assertEqual(0022, osutils.get_umask())
339
 
        os.umask(0002)
340
 
        self.assertEqual(0002, osutils.get_umask())
341
 
        os.umask(0027)
342
 
        self.assertEqual(0027, osutils.get_umask())
 
356
        os.umask(0o222)
 
357
        self.assertEqual(0o222, osutils.get_umask())
 
358
        os.umask(0o022)
 
359
        self.assertEqual(0o022, osutils.get_umask())
 
360
        os.umask(0o002)
 
361
        self.assertEqual(0o002, osutils.get_umask())
 
362
        os.umask(0o027)
 
363
        self.assertEqual(0o027, osutils.get_umask())
343
364
 
344
365
 
345
366
class TestDateTime(tests.TestCase):
381
402
        self.assertFormatedDelta('2 seconds in the future', -2)
382
403
 
383
404
    def test_format_date(self):
384
 
        self.assertRaises(errors.UnsupportedTimezoneFormat,
385
 
            osutils.format_date, 0, timezone='foo')
 
405
        self.assertRaises(osutils.UnsupportedTimezoneFormat,
 
406
                          osutils.format_date, 0, timezone='foo')
386
407
        self.assertIsInstance(osutils.format_date(0), str)
387
 
        self.assertIsInstance(osutils.format_local_date(0), unicode)
 
408
        self.assertIsInstance(osutils.format_local_date(0), text_type)
388
409
        # Testing for the actual value of the local weekday without
389
410
        # duplicating the code from format_date is difficult.
390
411
        # Instead blackbox.test_locale should check for localized
392
413
 
393
414
    def test_format_date_with_offset_in_original_timezone(self):
394
415
        self.assertEqual("Thu 1970-01-01 00:00:00 +0000",
395
 
            osutils.format_date_with_offset_in_original_timezone(0))
 
416
                         osutils.format_date_with_offset_in_original_timezone(0))
396
417
        self.assertEqual("Fri 1970-01-02 03:46:40 +0000",
397
 
            osutils.format_date_with_offset_in_original_timezone(100000))
 
418
                         osutils.format_date_with_offset_in_original_timezone(100000))
398
419
        self.assertEqual("Fri 1970-01-02 05:46:40 +0200",
399
 
            osutils.format_date_with_offset_in_original_timezone(100000, 7200))
 
420
                         osutils.format_date_with_offset_in_original_timezone(100000, 7200))
400
421
 
401
422
    def test_local_time_offset(self):
402
423
        """Test that local_time_offset() returns a sane value."""
418
439
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
419
440
 
420
441
 
 
442
class TestFdatasync(tests.TestCaseInTempDir):
 
443
 
 
444
    def do_fdatasync(self):
 
445
        f = tempfile.NamedTemporaryFile()
 
446
        osutils.fdatasync(f.fileno())
 
447
        f.close()
 
448
 
 
449
    @staticmethod
 
450
    def raise_eopnotsupp(*args, **kwargs):
 
451
        raise IOError(errno.EOPNOTSUPP, os.strerror(errno.EOPNOTSUPP))
 
452
 
 
453
    @staticmethod
 
454
    def raise_enotsup(*args, **kwargs):
 
455
        raise IOError(errno.ENOTSUP, os.strerror(errno.ENOTSUP))
 
456
 
 
457
    def test_fdatasync_handles_system_function(self):
 
458
        self.overrideAttr(os, "fdatasync")
 
459
        self.do_fdatasync()
 
460
 
 
461
    def test_fdatasync_handles_no_fdatasync_no_fsync(self):
 
462
        self.overrideAttr(os, "fdatasync")
 
463
        self.overrideAttr(os, "fsync")
 
464
        self.do_fdatasync()
 
465
 
 
466
    def test_fdatasync_handles_no_EOPNOTSUPP(self):
 
467
        self.overrideAttr(errno, "EOPNOTSUPP")
 
468
        self.do_fdatasync()
 
469
 
 
470
    def test_fdatasync_catches_ENOTSUP(self):
 
471
        enotsup = getattr(errno, "ENOTSUP", None)
 
472
        if enotsup is None:
 
473
            raise tests.TestNotApplicable("No ENOTSUP on this platform")
 
474
        self.overrideAttr(os, "fdatasync", self.raise_enotsup)
 
475
        self.do_fdatasync()
 
476
 
 
477
    def test_fdatasync_catches_EOPNOTSUPP(self):
 
478
        enotsup = getattr(errno, "EOPNOTSUPP", None)
 
479
        if enotsup is None:
 
480
            raise tests.TestNotApplicable("No EOPNOTSUPP on this platform")
 
481
        self.overrideAttr(os, "fdatasync", self.raise_eopnotsupp)
 
482
        self.do_fdatasync()
 
483
 
 
484
 
421
485
class TestLinks(tests.TestCaseInTempDir):
422
486
 
423
487
    def test_dereference_path(self):
424
 
        self.requireFeature(tests.SymlinkFeature)
 
488
        self.requireFeature(features.SymlinkFeature)
425
489
        cwd = osutils.realpath('.')
426
490
        os.mkdir('bar')
427
491
        bar_path = osutils.pathjoin(cwd, 'bar')
448
512
        self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
449
513
 
450
514
    def test_changing_access(self):
451
 
        f = file('file', 'w')
452
 
        f.write('monkey')
453
 
        f.close()
 
515
        with open('file', 'w') as f:
 
516
            f.write('monkey')
454
517
 
455
518
        # Make a file readonly
456
519
        osutils.make_readonly('file')
457
520
        mode = os.lstat('file').st_mode
458
 
        self.assertEqual(mode, mode & 0777555)
 
521
        self.assertEqual(mode, mode & 0o777555)
459
522
 
460
523
        # Make a file writable
461
524
        osutils.make_writable('file')
462
525
        mode = os.lstat('file').st_mode
463
 
        self.assertEqual(mode, mode | 0200)
 
526
        self.assertEqual(mode, mode | 0o200)
464
527
 
465
528
        if osutils.has_symlinks():
466
529
            # should not error when handed a symlink
474
537
 
475
538
class TestCanonicalRelPath(tests.TestCaseInTempDir):
476
539
 
477
 
    _test_needs_features = [tests.CaseInsCasePresFilenameFeature]
 
540
    _test_needs_features = [features.CaseInsCasePresFilenameFeature]
478
541
 
479
542
    def test_canonical_relpath_simple(self):
480
 
        f = file('MixedCaseName', 'w')
 
543
        f = open('MixedCaseName', 'w')
481
544
        f.close()
482
545
        actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
483
 
        self.failUnlessEqual('work/MixedCaseName', actual)
 
546
        self.assertEqual('work/MixedCaseName', actual)
484
547
 
485
548
    def test_canonical_relpath_missing_tail(self):
486
549
        os.mkdir('MixedCaseParent')
487
550
        actual = osutils.canonical_relpath(self.test_base_dir,
488
551
                                           'mixedcaseparent/nochild')
489
 
        self.failUnlessEqual('work/MixedCaseParent/nochild', actual)
 
552
        self.assertEqual('work/MixedCaseParent/nochild', actual)
490
553
 
491
554
 
492
555
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
536
599
    """Test pumpfile method."""
537
600
 
538
601
    def setUp(self):
539
 
        tests.TestCase.setUp(self)
 
602
        super(TestPumpFile, self).setUp()
540
603
        # create a test datablock
541
604
        self.block_size = 512
542
 
        pattern = '0123456789ABCDEF'
543
 
        self.test_data = pattern * (3 * self.block_size / len(pattern))
 
605
        pattern = b'0123456789ABCDEF'
 
606
        self.test_data = pattern * (3 * self.block_size // len(pattern))
544
607
        self.test_data_len = len(self.test_data)
545
608
 
546
609
    def test_bracket_block_size(self):
550
613
        self.assertTrue(self.test_data_len > self.block_size)
551
614
 
552
615
        from_file = file_utils.FakeReadFile(self.test_data)
553
 
        to_file = StringIO()
 
616
        to_file = BytesIO()
554
617
 
555
 
        # read (max / 2) bytes and verify read size wasn't affected
556
 
        num_bytes_to_read = self.block_size / 2
557
 
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
618
        # read (max // 2) bytes and verify read size wasn't affected
 
619
        num_bytes_to_read = self.block_size // 2
 
620
        osutils.pumpfile(from_file, to_file,
 
621
                         num_bytes_to_read, self.block_size)
558
622
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
559
623
        self.assertEqual(from_file.get_read_count(), 1)
560
624
 
561
625
        # read (max) bytes and verify read size wasn't affected
562
626
        num_bytes_to_read = self.block_size
563
627
        from_file.reset_read_count()
564
 
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
628
        osutils.pumpfile(from_file, to_file,
 
629
                         num_bytes_to_read, self.block_size)
565
630
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
566
631
        self.assertEqual(from_file.get_read_count(), 1)
567
632
 
568
633
        # read (max + 1) bytes and verify read size was limited
569
634
        num_bytes_to_read = self.block_size + 1
570
635
        from_file.reset_read_count()
571
 
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
636
        osutils.pumpfile(from_file, to_file,
 
637
                         num_bytes_to_read, self.block_size)
572
638
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
573
639
        self.assertEqual(from_file.get_read_count(), 2)
574
640
 
575
641
        # finish reading the rest of the data
576
642
        num_bytes_to_read = self.test_data_len - to_file.tell()
577
 
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
643
        osutils.pumpfile(from_file, to_file,
 
644
                         num_bytes_to_read, self.block_size)
578
645
 
579
646
        # report error if the data wasn't equal (we only report the size due
580
647
        # to the length of the data)
591
658
 
592
659
        # retrieve data in blocks
593
660
        from_file = file_utils.FakeReadFile(self.test_data)
594
 
        to_file = StringIO()
 
661
        to_file = BytesIO()
595
662
        osutils.pumpfile(from_file, to_file, self.test_data_len,
596
663
                         self.block_size)
597
664
 
615
682
 
616
683
        # retrieve data to EOF
617
684
        from_file = file_utils.FakeReadFile(self.test_data)
618
 
        to_file = StringIO()
 
685
        to_file = BytesIO()
619
686
        osutils.pumpfile(from_file, to_file, -1, self.block_size)
620
687
 
621
688
        # verify read size was equal to the maximum read size
635
702
        with this new version."""
636
703
        # retrieve data using default (old) pumpfile method
637
704
        from_file = file_utils.FakeReadFile(self.test_data)
638
 
        to_file = StringIO()
 
705
        to_file = BytesIO()
639
706
        osutils.pumpfile(from_file, to_file)
640
707
 
641
708
        # report error if the data wasn't equal (we only report the size due
647
714
 
648
715
    def test_report_activity(self):
649
716
        activity = []
 
717
 
650
718
        def log_activity(length, direction):
651
719
            activity.append((length, direction))
652
 
        from_file = StringIO(self.test_data)
653
 
        to_file = StringIO()
 
720
        from_file = BytesIO(self.test_data)
 
721
        to_file = BytesIO()
654
722
        osutils.pumpfile(from_file, to_file, buff_size=500,
655
723
                         report_activity=log_activity, direction='read')
656
724
        self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
657
725
                          (36, 'read')], activity)
658
726
 
659
 
        from_file = StringIO(self.test_data)
660
 
        to_file = StringIO()
 
727
        from_file = BytesIO(self.test_data)
 
728
        to_file = BytesIO()
661
729
        del activity[:]
662
730
        osutils.pumpfile(from_file, to_file, buff_size=500,
663
731
                         report_activity=log_activity, direction='write')
665
733
                          (36, 'write')], activity)
666
734
 
667
735
        # And with a limited amount of data
668
 
        from_file = StringIO(self.test_data)
669
 
        to_file = StringIO()
 
736
        from_file = BytesIO(self.test_data)
 
737
        to_file = BytesIO()
670
738
        del activity[:]
671
739
        osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
672
740
                         report_activity=log_activity, direction='read')
673
 
        self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
674
 
 
 
741
        self.assertEqual(
 
742
            [(500, 'read'), (500, 'read'), (28, 'read')], activity)
675
743
 
676
744
 
677
745
class TestPumpStringFile(tests.TestCase):
678
746
 
679
747
    def test_empty(self):
680
 
        output = StringIO()
681
 
        osutils.pump_string_file("", output)
682
 
        self.assertEqual("", output.getvalue())
 
748
        output = BytesIO()
 
749
        osutils.pump_string_file(b"", output)
 
750
        self.assertEqual(b"", output.getvalue())
683
751
 
684
752
    def test_more_than_segment_size(self):
685
 
        output = StringIO()
686
 
        osutils.pump_string_file("123456789", output, 2)
687
 
        self.assertEqual("123456789", output.getvalue())
 
753
        output = BytesIO()
 
754
        osutils.pump_string_file(b"123456789", output, 2)
 
755
        self.assertEqual(b"123456789", output.getvalue())
688
756
 
689
757
    def test_segment_size(self):
690
 
        output = StringIO()
691
 
        osutils.pump_string_file("12", output, 2)
692
 
        self.assertEqual("12", output.getvalue())
 
758
        output = BytesIO()
 
759
        osutils.pump_string_file(b"12", output, 2)
 
760
        self.assertEqual(b"12", output.getvalue())
693
761
 
694
762
    def test_segment_size_multiple(self):
695
 
        output = StringIO()
696
 
        osutils.pump_string_file("1234", output, 2)
697
 
        self.assertEqual("1234", output.getvalue())
 
763
        output = BytesIO()
 
764
        osutils.pump_string_file(b"1234", output, 2)
 
765
        self.assertEqual(b"1234", output.getvalue())
698
766
 
699
767
 
700
768
class TestRelpath(tests.TestCase):
719
787
class TestSafeUnicode(tests.TestCase):
720
788
 
721
789
    def test_from_ascii_string(self):
722
 
        self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
 
790
        self.assertEqual(u'foobar', osutils.safe_unicode(b'foobar'))
723
791
 
724
792
    def test_from_unicode_string_ascii_contents(self):
725
793
        self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
728
796
        self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
729
797
 
730
798
    def test_from_utf8_string(self):
731
 
        self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
 
799
        self.assertEqual(u'foo\xae', osutils.safe_unicode(b'foo\xc2\xae'))
732
800
 
733
801
    def test_bad_utf8_string(self):
734
802
        self.assertRaises(errors.BzrBadParameterNotUnicode,
735
803
                          osutils.safe_unicode,
736
 
                          '\xbb\xbb')
 
804
                          b'\xbb\xbb')
737
805
 
738
806
 
739
807
class TestSafeUtf8(tests.TestCase):
740
808
 
741
809
    def test_from_ascii_string(self):
742
 
        f = 'foobar'
743
 
        self.assertEqual('foobar', osutils.safe_utf8(f))
 
810
        f = b'foobar'
 
811
        self.assertEqual(b'foobar', osutils.safe_utf8(f))
744
812
 
745
813
    def test_from_unicode_string_ascii_contents(self):
746
 
        self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
 
814
        self.assertEqual(b'bargam', osutils.safe_utf8(u'bargam'))
747
815
 
748
816
    def test_from_unicode_string_unicode_contents(self):
749
 
        self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
 
817
        self.assertEqual(b'bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
750
818
 
751
819
    def test_from_utf8_string(self):
752
 
        self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
 
820
        self.assertEqual(b'foo\xc2\xae', osutils.safe_utf8(b'foo\xc2\xae'))
753
821
 
754
822
    def test_bad_utf8_string(self):
755
823
        self.assertRaises(errors.BzrBadParameterNotUnicode,
756
 
                          osutils.safe_utf8, '\xbb\xbb')
 
824
                          osutils.safe_utf8, b'\xbb\xbb')
757
825
 
758
826
 
759
827
class TestSafeRevisionId(tests.TestCase):
760
828
 
761
829
    def test_from_ascii_string(self):
762
830
        # this shouldn't give a warning because it's getting an ascii string
763
 
        self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
 
831
        self.assertEqual(b'foobar', osutils.safe_revision_id(b'foobar'))
764
832
 
765
833
    def test_from_unicode_string_ascii_contents(self):
766
 
        self.assertEqual('bargam',
767
 
                         osutils.safe_revision_id(u'bargam', warn=False))
768
 
 
769
 
    def test_from_unicode_deprecated(self):
770
 
        self.assertEqual('bargam',
771
 
            self.callDeprecated([osutils._revision_id_warning],
772
 
                                osutils.safe_revision_id, u'bargam'))
 
834
        self.assertRaises(TypeError,
 
835
                          osutils.safe_revision_id, u'bargam')
773
836
 
774
837
    def test_from_unicode_string_unicode_contents(self):
775
 
        self.assertEqual('bargam\xc2\xae',
776
 
                         osutils.safe_revision_id(u'bargam\xae', warn=False))
 
838
        self.assertRaises(TypeError,
 
839
                          osutils.safe_revision_id, u'bargam\xae')
777
840
 
778
841
    def test_from_utf8_string(self):
779
 
        self.assertEqual('foo\xc2\xae',
780
 
                         osutils.safe_revision_id('foo\xc2\xae'))
 
842
        self.assertEqual(b'foo\xc2\xae',
 
843
                         osutils.safe_revision_id(b'foo\xc2\xae'))
781
844
 
782
845
    def test_none(self):
783
846
        """Currently, None is a valid revision_id"""
787
850
class TestSafeFileId(tests.TestCase):
788
851
 
789
852
    def test_from_ascii_string(self):
790
 
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))
 
853
        self.assertEqual(b'foobar', osutils.safe_file_id(b'foobar'))
791
854
 
792
855
    def test_from_unicode_string_ascii_contents(self):
793
 
        self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))
794
 
 
795
 
    def test_from_unicode_deprecated(self):
796
 
        self.assertEqual('bargam',
797
 
            self.callDeprecated([osutils._file_id_warning],
798
 
                                osutils.safe_file_id, u'bargam'))
 
856
        self.assertRaises(TypeError, osutils.safe_file_id, u'bargam')
799
857
 
800
858
    def test_from_unicode_string_unicode_contents(self):
801
 
        self.assertEqual('bargam\xc2\xae',
802
 
                         osutils.safe_file_id(u'bargam\xae', warn=False))
 
859
        self.assertRaises(TypeError,
 
860
                          osutils.safe_file_id, u'bargam\xae')
803
861
 
804
862
    def test_from_utf8_string(self):
805
 
        self.assertEqual('foo\xc2\xae',
806
 
                         osutils.safe_file_id('foo\xc2\xae'))
 
863
        self.assertEqual(b'foo\xc2\xae',
 
864
                         osutils.safe_file_id(b'foo\xc2\xae'))
807
865
 
808
866
    def test_none(self):
809
867
        """Currently, None is a valid revision_id"""
810
868
        self.assertEqual(None, osutils.safe_file_id(None))
811
869
 
812
870
 
 
871
class TestSendAll(tests.TestCase):
 
872
 
 
873
    def test_send_with_disconnected_socket(self):
 
874
        class DisconnectedSocket(object):
 
875
            def __init__(self, err):
 
876
                self.err = err
 
877
 
 
878
            def send(self, content):
 
879
                raise self.err
 
880
 
 
881
            def close(self):
 
882
                pass
 
883
        # All of these should be treated as ConnectionReset
 
884
        errs = []
 
885
        for err_cls in (IOError, socket.error):
 
886
            for errnum in osutils._end_of_stream_errors:
 
887
                errs.append(err_cls(errnum))
 
888
        for err in errs:
 
889
            sock = DisconnectedSocket(err)
 
890
            self.assertRaises(errors.ConnectionReset,
 
891
                              osutils.send_all, sock, b'some more content')
 
892
 
 
893
    def test_send_with_no_progress(self):
 
894
        # See https://bugs.launchpad.net/bzr/+bug/1047309
 
895
        # It seems that paramiko can get into a state where it doesn't error,
 
896
        # but it returns 0 bytes sent for requests over and over again.
 
897
        class NoSendingSocket(object):
 
898
            def __init__(self):
 
899
                self.call_count = 0
 
900
 
 
901
            def send(self, bytes):
 
902
                self.call_count += 1
 
903
                if self.call_count > 100:
 
904
                    # Prevent the test suite from hanging
 
905
                    raise RuntimeError('too many calls')
 
906
                return 0
 
907
        sock = NoSendingSocket()
 
908
        self.assertRaises(errors.ConnectionReset,
 
909
                          osutils.send_all, sock, b'content')
 
910
        self.assertEqual(1, sock.call_count)
 
911
 
 
912
 
 
913
class TestPosixFuncs(tests.TestCase):
 
914
    """Test that the posix version of normpath returns an appropriate path
 
915
       when used with 2 leading slashes."""
 
916
 
 
917
    def test_normpath(self):
 
918
        self.assertEqual('/etc/shadow', osutils._posix_normpath('/etc/shadow'))
 
919
        self.assertEqual(
 
920
            '/etc/shadow', osutils._posix_normpath('//etc/shadow'))
 
921
        self.assertEqual(
 
922
            '/etc/shadow', osutils._posix_normpath('///etc/shadow'))
 
923
 
 
924
 
813
925
class TestWin32Funcs(tests.TestCase):
814
926
    """Test that _win32 versions of os utilities return appropriate paths."""
815
927
 
816
928
    def test_abspath(self):
 
929
        self.requireFeature(features.win32_feature)
817
930
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
818
931
        self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
819
932
        self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
832
945
                         osutils._win32_pathjoin('path/to', 'C:/foo'))
833
946
        self.assertEqual('path/to/foo',
834
947
                         osutils._win32_pathjoin('path/to/', 'foo'))
835
 
        self.assertEqual('/foo',
 
948
 
 
949
    def test_pathjoin_late_bugfix(self):
 
950
        if sys.version_info < (2, 7, 6):
 
951
            expected = '/foo'
 
952
        else:
 
953
            expected = 'C:/foo'
 
954
        self.assertEqual(expected,
836
955
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
837
 
        self.assertEqual('/foo',
 
956
        self.assertEqual(expected,
838
957
                         osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
839
958
 
840
959
    def test_normpath(self):
845
964
 
846
965
    def test_getcwd(self):
847
966
        cwd = osutils._win32_getcwd()
848
 
        os_cwd = os.getcwdu()
 
967
        os_cwd = osutils._getcwd()
849
968
        self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
850
969
        # win32 is inconsistent whether it returns lower or upper case
851
970
        # and even if it was consistent the user might type the other
859
978
        self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
860
979
        self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
861
980
 
862
 
    def test_win98_abspath(self):
863
 
        # absolute path
864
 
        self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
865
 
        self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
866
 
        # UNC path
867
 
        self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
868
 
        self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
869
 
        # relative path
870
 
        cwd = osutils.getcwd().rstrip('/')
871
 
        drive = osutils._nt_splitdrive(cwd)[0]
872
 
        self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
873
 
        self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
874
 
        # unicode path
875
 
        u = u'\u1234'
876
 
        self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
877
 
 
878
981
 
879
982
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
880
983
    """Test win32 functions that create files."""
881
984
 
882
985
    def test_getcwd(self):
883
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
986
        self.requireFeature(features.UnicodeFilenameFeature)
884
987
        os.mkdir(u'mu-\xb5')
885
988
        os.chdir(u'mu-\xb5')
886
989
        # TODO: jam 20060427 This will probably fail on Mac OSX because
891
994
 
892
995
    def test_minimum_path_selection(self):
893
996
        self.assertEqual(set(),
894
 
            osutils.minimum_path_selection([]))
895
 
        self.assertEqual(set(['a']),
896
 
            osutils.minimum_path_selection(['a']))
897
 
        self.assertEqual(set(['a', 'b']),
898
 
            osutils.minimum_path_selection(['a', 'b']))
899
 
        self.assertEqual(set(['a/', 'b']),
900
 
            osutils.minimum_path_selection(['a/', 'b']))
901
 
        self.assertEqual(set(['a/', 'b']),
902
 
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
903
 
        self.assertEqual(set(['a-b', 'a', 'a0b']),
904
 
            osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
 
997
                         osutils.minimum_path_selection([]))
 
998
        self.assertEqual({'a'},
 
999
                         osutils.minimum_path_selection(['a']))
 
1000
        self.assertEqual({'a', 'b'},
 
1001
                         osutils.minimum_path_selection(['a', 'b']))
 
1002
        self.assertEqual({'a/', 'b'},
 
1003
                         osutils.minimum_path_selection(['a/', 'b']))
 
1004
        self.assertEqual({'a/', 'b'},
 
1005
                         osutils.minimum_path_selection(['a/c', 'a/', 'b']))
 
1006
        self.assertEqual({'a-b', 'a', 'a0b'},
 
1007
                         osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
905
1008
 
906
1009
    def test_mkdtemp(self):
907
1010
        tmpdir = osutils._win32_mkdtemp(dir='.')
908
1011
        self.assertFalse('\\' in tmpdir)
909
1012
 
910
1013
    def test_rename(self):
911
 
        a = open('a', 'wb')
912
 
        a.write('foo\n')
913
 
        a.close()
914
 
        b = open('b', 'wb')
915
 
        b.write('baz\n')
916
 
        b.close()
 
1014
        with open('a', 'wb') as a:
 
1015
            a.write(b'foo\n')
 
1016
        with open('b', 'wb') as b:
 
1017
            b.write(b'baz\n')
917
1018
 
918
1019
        osutils._win32_rename('b', 'a')
919
 
        self.failUnlessExists('a')
920
 
        self.failIfExists('b')
921
 
        self.assertFileEqual('baz\n', 'a')
 
1020
        self.assertPathExists('a')
 
1021
        self.assertPathDoesNotExist('b')
 
1022
        self.assertFileEqual(b'baz\n', 'a')
922
1023
 
923
1024
    def test_rename_missing_file(self):
924
 
        a = open('a', 'wb')
925
 
        a.write('foo\n')
926
 
        a.close()
 
1025
        with open('a', 'wb') as a:
 
1026
            a.write(b'foo\n')
927
1027
 
928
1028
        try:
929
1029
            osutils._win32_rename('b', 'a')
930
 
        except (IOError, OSError), e:
 
1030
        except (IOError, OSError) as e:
931
1031
            self.assertEqual(errno.ENOENT, e.errno)
932
 
        self.assertFileEqual('foo\n', 'a')
 
1032
        self.assertFileEqual(b'foo\n', 'a')
933
1033
 
934
1034
    def test_rename_missing_dir(self):
935
1035
        os.mkdir('a')
936
1036
        try:
937
1037
            osutils._win32_rename('b', 'a')
938
 
        except (IOError, OSError), e:
 
1038
        except (IOError, OSError) as e:
939
1039
            self.assertEqual(errno.ENOENT, e.errno)
940
1040
 
941
1041
    def test_rename_current_dir(self):
947
1047
        # doesn't exist.
948
1048
        try:
949
1049
            osutils._win32_rename('b', '.')
950
 
        except (IOError, OSError), e:
 
1050
        except (IOError, OSError) as e:
951
1051
            self.assertEqual(errno.ENOENT, e.errno)
952
1052
 
953
1053
    def test_splitpath(self):
958
1058
        check(['a', 'b'], 'a/b')
959
1059
        check(['a', 'b'], 'a/./b')
960
1060
        check(['a', '.b'], 'a/.b')
961
 
        check(['a', '.b'], 'a\\.b')
 
1061
        if os.path.sep == '\\':
 
1062
            check(['a', '.b'], 'a\\.b')
 
1063
        else:
 
1064
            check(['a\\.b'], 'a\\.b')
962
1065
 
963
1066
        self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
964
1067
 
976
1079
    """Test mac special functions that require directories."""
977
1080
 
978
1081
    def test_getcwd(self):
979
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1082
        self.requireFeature(features.UnicodeFilenameFeature)
980
1083
        os.mkdir(u'B\xe5gfors')
981
1084
        os.chdir(u'B\xe5gfors')
982
1085
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
983
1086
 
984
1087
    def test_getcwd_nonnorm(self):
985
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1088
        self.requireFeature(features.UnicodeFilenameFeature)
986
1089
        # Test that _mac_getcwd() will normalize this path
987
1090
        os.mkdir(u'Ba\u030agfors')
988
1091
        os.chdir(u'Ba\u030agfors')
992
1095
class TestChunksToLines(tests.TestCase):
993
1096
 
994
1097
    def test_smoketest(self):
995
 
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
996
 
                         osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
997
 
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
998
 
                         osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
 
1098
        self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
 
1099
                         osutils.chunks_to_lines([b'foo\nbar', b'\nbaz\n']))
 
1100
        self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
 
1101
                         osutils.chunks_to_lines([b'foo\n', b'bar\n', b'baz\n']))
999
1102
 
1000
1103
    def test_osutils_binding(self):
1001
 
        from bzrlib.tests import test__chunks_to_lines
 
1104
        from . import test__chunks_to_lines
1002
1105
        if test__chunks_to_lines.compiled_chunkstolines_feature.available():
1003
 
            from bzrlib._chunks_to_lines_pyx import chunks_to_lines
 
1106
            from .._chunks_to_lines_pyx import chunks_to_lines
1004
1107
        else:
1005
 
            from bzrlib._chunks_to_lines_py import chunks_to_lines
 
1108
            from .._chunks_to_lines_py import chunks_to_lines
1006
1109
        self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
1007
1110
 
1008
1111
 
1015
1118
                         osutils.split_lines(u'foo\nbar\xae\n'))
1016
1119
 
1017
1120
    def test_split_with_carriage_returns(self):
1018
 
        self.assertEqual(['foo\rbar\n'],
1019
 
                         osutils.split_lines('foo\rbar\n'))
 
1121
        self.assertEqual([b'foo\rbar\n'],
 
1122
                         osutils.split_lines(b'foo\rbar\n'))
1020
1123
 
1021
1124
 
1022
1125
class TestWalkDirs(tests.TestCaseInTempDir):
1037
1140
            ]
1038
1141
        self.build_tree(tree)
1039
1142
        expected_dirblocks = [
1040
 
                (('', '.'),
1041
 
                 [('0file', '0file', 'file'),
1042
 
                  ('1dir', '1dir', 'directory'),
1043
 
                  ('2file', '2file', 'file'),
1044
 
                 ]
1045
 
                ),
1046
 
                (('1dir', './1dir'),
1047
 
                 [('1dir/0file', '0file', 'file'),
1048
 
                  ('1dir/1dir', '1dir', 'directory'),
1049
 
                 ]
1050
 
                ),
1051
 
                (('1dir/1dir', './1dir/1dir'),
1052
 
                 [
1053
 
                 ]
1054
 
                ),
 
1143
            (('', '.'),
 
1144
             [('0file', '0file', 'file'),
 
1145
              ('1dir', '1dir', 'directory'),
 
1146
              ('2file', '2file', 'file'),
 
1147
              ]
 
1148
             ),
 
1149
            (('1dir', './1dir'),
 
1150
             [('1dir/0file', '0file', 'file'),
 
1151
              ('1dir/1dir', '1dir', 'directory'),
 
1152
              ]
 
1153
             ),
 
1154
            (('1dir/1dir', './1dir/1dir'),
 
1155
             [
 
1156
                ]
 
1157
             ),
1055
1158
            ]
1056
1159
        result = []
1057
1160
        found_bzrdir = False
1071
1174
        self.assertExpectedBlocks(expected_dirblocks[1:], result)
1072
1175
 
1073
1176
    def test_walkdirs_os_error(self):
1074
 
        # <https://bugs.edge.launchpad.net/bzr/+bug/338653>
 
1177
        # <https://bugs.launchpad.net/bzr/+bug/338653>
1075
1178
        # Pyrex readdir didn't raise useful messages if it had an error
1076
1179
        # reading the directory
1077
1180
        if sys.platform == 'win32':
1078
1181
            raise tests.TestNotApplicable(
1079
1182
                "readdir IOError not tested on win32")
 
1183
        self.requireFeature(features.not_running_as_root)
1080
1184
        os.mkdir("test-unreadable")
1081
1185
        os.chmod("test-unreadable", 0000)
1082
1186
        # must chmod it back so that it can be removed
1083
 
        self.addCleanup(os.chmod, "test-unreadable", 0700)
 
1187
        self.addCleanup(os.chmod, "test-unreadable", 0o700)
1084
1188
        # The error is not raised until the generator is actually evaluated.
1085
1189
        # (It would be ok if it happened earlier but at the moment it
1086
1190
        # doesn't.)
1087
1191
        e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1088
 
        self.assertEquals('./test-unreadable', e.filename)
1089
 
        self.assertEquals(errno.EACCES, e.errno)
 
1192
        self.assertEqual('./test-unreadable', osutils.safe_unicode(e.filename))
 
1193
        self.assertEqual(errno.EACCES, e.errno)
1090
1194
        # Ensure the message contains the file name
1091
 
        self.assertContainsRe(str(e), "\./test-unreadable")
 
1195
        self.assertContainsRe(str(e), "\\./test-unreadable")
 
1196
 
 
1197
    def test_walkdirs_encoding_error(self):
 
1198
        # <https://bugs.launchpad.net/bzr/+bug/488519>
 
1199
        # walkdirs didn't raise a useful message when the filenames
 
1200
        # are not using the filesystem's encoding
 
1201
 
 
1202
        # require a bytestring based filesystem
 
1203
        self.requireFeature(features.ByteStringNamedFilesystem)
 
1204
 
 
1205
        tree = [
 
1206
            '.bzr',
 
1207
            '0file',
 
1208
            '1dir/',
 
1209
            '1dir/0file',
 
1210
            '1dir/1dir/',
 
1211
            '1file'
 
1212
            ]
 
1213
 
 
1214
        self.build_tree(tree)
 
1215
 
 
1216
        # rename the 1file to a latin-1 filename
 
1217
        os.rename(b"./1file", b"\xe8file")
 
1218
        if b"\xe8file" not in os.listdir("."):
 
1219
            self.skipTest("Lack filesystem that preserves arbitrary bytes")
 
1220
 
 
1221
        self._save_platform_info()
 
1222
        osutils._fs_enc = 'UTF-8'
 
1223
 
 
1224
        # this should raise on error
 
1225
        def attempt():
 
1226
            for dirdetail, dirblock in osutils.walkdirs(b'.'):
 
1227
                pass
 
1228
 
 
1229
        self.assertRaises(errors.BadFilenameEncoding, attempt)
1092
1230
 
1093
1231
    def test__walkdirs_utf8(self):
1094
1232
        tree = [
1101
1239
            ]
1102
1240
        self.build_tree(tree)
1103
1241
        expected_dirblocks = [
1104
 
                (('', '.'),
1105
 
                 [('0file', '0file', 'file'),
1106
 
                  ('1dir', '1dir', 'directory'),
1107
 
                  ('2file', '2file', 'file'),
1108
 
                 ]
1109
 
                ),
1110
 
                (('1dir', './1dir'),
1111
 
                 [('1dir/0file', '0file', 'file'),
1112
 
                  ('1dir/1dir', '1dir', 'directory'),
1113
 
                 ]
1114
 
                ),
1115
 
                (('1dir/1dir', './1dir/1dir'),
1116
 
                 [
1117
 
                 ]
1118
 
                ),
 
1242
            (('', '.'),
 
1243
             [('0file', '0file', 'file'),
 
1244
              ('1dir', '1dir', 'directory'),
 
1245
              ('2file', '2file', 'file'),
 
1246
              ]
 
1247
             ),
 
1248
            (('1dir', './1dir'),
 
1249
             [('1dir/0file', '0file', 'file'),
 
1250
              ('1dir/1dir', '1dir', 'directory'),
 
1251
              ]
 
1252
             ),
 
1253
            (('1dir/1dir', './1dir/1dir'),
 
1254
             [
 
1255
                ]
 
1256
             ),
1119
1257
            ]
1120
1258
        result = []
1121
1259
        found_bzrdir = False
1122
 
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1123
 
            if len(dirblock) and dirblock[0][1] == '.bzr':
 
1260
        for dirdetail, dirblock in osutils._walkdirs_utf8(b'.'):
 
1261
            if len(dirblock) and dirblock[0][1] == b'.bzr':
1124
1262
                # this tests the filtering of selected paths
1125
1263
                found_bzrdir = True
1126
1264
                del dirblock[0]
 
1265
            dirdetail = (dirdetail[0].decode('utf-8'),
 
1266
                         osutils.safe_unicode(dirdetail[1]))
 
1267
            dirblock = [
 
1268
                (entry[0].decode('utf-8'), entry[1].decode('utf-8'), entry[2])
 
1269
                for entry in dirblock]
1127
1270
            result.append((dirdetail, dirblock))
1128
1271
 
1129
1272
        self.assertTrue(found_bzrdir)
1145
1288
            dirblock[:] = new_dirblock
1146
1289
 
1147
1290
    def _save_platform_info(self):
1148
 
        self.overrideAttr(win32utils, 'winver')
1149
1291
        self.overrideAttr(osutils, '_fs_enc')
1150
1292
        self.overrideAttr(osutils, '_selected_dir_reader')
1151
1293
 
1152
 
    def assertDirReaderIs(self, expected):
 
1294
    def assertDirReaderIs(self, expected, top):
1153
1295
        """Assert the right implementation for _walkdirs_utf8 is chosen."""
1154
1296
        # Force it to redetect
1155
1297
        osutils._selected_dir_reader = None
1156
1298
        # Nothing to list, but should still trigger the selection logic
1157
 
        self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
 
1299
        self.assertEqual([((b'', top), [])], list(osutils._walkdirs_utf8('.')))
1158
1300
        self.assertIsInstance(osutils._selected_dir_reader, expected)
1159
1301
 
1160
1302
    def test_force_walkdirs_utf8_fs_utf8(self):
1161
1303
        self.requireFeature(UTF8DirReaderFeature)
1162
1304
        self._save_platform_info()
1163
 
        win32utils.winver = None # Avoid the win32 detection code
1164
 
        osutils._fs_enc = 'UTF-8'
1165
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
1305
        osutils._fs_enc = 'utf-8'
 
1306
        self.assertDirReaderIs(UTF8DirReaderFeature.module.UTF8DirReader, b".")
1166
1307
 
1167
1308
    def test_force_walkdirs_utf8_fs_ascii(self):
1168
1309
        self.requireFeature(UTF8DirReaderFeature)
1169
1310
        self._save_platform_info()
1170
 
        win32utils.winver = None # Avoid the win32 detection code
1171
 
        osutils._fs_enc = 'US-ASCII'
1172
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1173
 
 
1174
 
    def test_force_walkdirs_utf8_fs_ANSI(self):
1175
 
        self.requireFeature(UTF8DirReaderFeature)
1176
 
        self._save_platform_info()
1177
 
        win32utils.winver = None # Avoid the win32 detection code
1178
 
        osutils._fs_enc = 'ANSI_X3.4-1968'
1179
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
1311
        osutils._fs_enc = 'ascii'
 
1312
        self.assertDirReaderIs(
 
1313
            UTF8DirReaderFeature.module.UTF8DirReader, b".")
1180
1314
 
1181
1315
    def test_force_walkdirs_utf8_fs_latin1(self):
1182
1316
        self._save_platform_info()
1183
 
        win32utils.winver = None # Avoid the win32 detection code
1184
 
        osutils._fs_enc = 'latin1'
1185
 
        self.assertDirReaderIs(osutils.UnicodeDirReader)
 
1317
        osutils._fs_enc = 'iso-8859-1'
 
1318
        self.assertDirReaderIs(osutils.UnicodeDirReader, ".")
1186
1319
 
1187
1320
    def test_force_walkdirs_utf8_nt(self):
1188
1321
        # Disabled because the thunk of the whole walkdirs api is disabled.
1189
1322
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1190
1323
        self._save_platform_info()
1191
 
        win32utils.winver = 'Windows NT'
1192
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
1193
 
        self.assertDirReaderIs(Win32ReadDir)
1194
 
 
1195
 
    def test_force_walkdirs_utf8_98(self):
1196
 
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1197
 
        self._save_platform_info()
1198
 
        win32utils.winver = 'Windows 98'
1199
 
        self.assertDirReaderIs(osutils.UnicodeDirReader)
 
1324
        from .._walkdirs_win32 import Win32ReadDir
 
1325
        self.assertDirReaderIs(Win32ReadDir, ".")
1200
1326
 
1201
1327
    def test_unicode_walkdirs(self):
1202
1328
        """Walkdirs should always return unicode paths."""
1203
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1329
        self.requireFeature(features.UnicodeFilenameFeature)
1204
1330
        name0 = u'0file-\xb6'
1205
1331
        name1 = u'1dir-\u062c\u0648'
1206
1332
        name2 = u'2file-\u0633'
1213
1339
            ]
1214
1340
        self.build_tree(tree)
1215
1341
        expected_dirblocks = [
1216
 
                ((u'', u'.'),
1217
 
                 [(name0, name0, 'file', './' + name0),
1218
 
                  (name1, name1, 'directory', './' + name1),
1219
 
                  (name2, name2, 'file', './' + name2),
1220
 
                 ]
1221
 
                ),
1222
 
                ((name1, './' + name1),
1223
 
                 [(name1 + '/' + name0, name0, 'file', './' + name1
1224
 
                                                        + '/' + name0),
1225
 
                  (name1 + '/' + name1, name1, 'directory', './' + name1
1226
 
                                                            + '/' + name1),
1227
 
                 ]
1228
 
                ),
1229
 
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
1230
 
                 [
1231
 
                 ]
1232
 
                ),
 
1342
            ((u'', u'.'),
 
1343
             [(name0, name0, 'file', './' + name0),
 
1344
              (name1, name1, 'directory', './' + name1),
 
1345
              (name2, name2, 'file', './' + name2),
 
1346
              ]
 
1347
             ),
 
1348
            ((name1, './' + name1),
 
1349
             [(name1 + '/' + name0, name0, 'file', './' + name1
 
1350
               + '/' + name0),
 
1351
              (name1 + '/' + name1, name1, 'directory', './' + name1
 
1352
               + '/' + name1),
 
1353
              ]
 
1354
             ),
 
1355
            ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1356
             [
 
1357
                ]
 
1358
             ),
1233
1359
            ]
1234
1360
        result = list(osutils.walkdirs('.'))
1235
1361
        self._filter_out_stat(result)
1236
1362
        self.assertEqual(expected_dirblocks, result)
1237
 
        result = list(osutils.walkdirs(u'./'+name1, name1))
 
1363
        result = list(osutils.walkdirs(u'./' + name1, name1))
1238
1364
        self._filter_out_stat(result)
1239
1365
        self.assertEqual(expected_dirblocks[1:], result)
1240
1366
 
1243
1369
 
1244
1370
        The abspath portion might be in unicode or utf-8
1245
1371
        """
1246
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1372
        self.requireFeature(features.UnicodeFilenameFeature)
1247
1373
        name0 = u'0file-\xb6'
1248
1374
        name1 = u'1dir-\u062c\u0648'
1249
1375
        name2 = u'2file-\u0633'
1260
1386
        name2 = name2.encode('utf8')
1261
1387
 
1262
1388
        expected_dirblocks = [
1263
 
                (('', '.'),
1264
 
                 [(name0, name0, 'file', './' + name0),
1265
 
                  (name1, name1, 'directory', './' + name1),
1266
 
                  (name2, name2, 'file', './' + name2),
1267
 
                 ]
1268
 
                ),
1269
 
                ((name1, './' + name1),
1270
 
                 [(name1 + '/' + name0, name0, 'file', './' + name1
1271
 
                                                        + '/' + name0),
1272
 
                  (name1 + '/' + name1, name1, 'directory', './' + name1
1273
 
                                                            + '/' + name1),
1274
 
                 ]
1275
 
                ),
1276
 
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
1277
 
                 [
1278
 
                 ]
1279
 
                ),
 
1389
            ((b'', b'.'),
 
1390
             [(name0, name0, 'file', b'./' + name0),
 
1391
              (name1, name1, 'directory', b'./' + name1),
 
1392
              (name2, name2, 'file', b'./' + name2),
 
1393
              ]
 
1394
             ),
 
1395
            ((name1, b'./' + name1),
 
1396
             [(name1 + b'/' + name0, name0, 'file', b'./' + name1
 
1397
               + b'/' + name0),
 
1398
              (name1 + b'/' + name1, name1, 'directory', b'./' + name1
 
1399
               + b'/' + name1),
 
1400
              ]
 
1401
             ),
 
1402
            ((name1 + b'/' + name1, b'./' + name1 + b'/' + name1),
 
1403
             [
 
1404
                ]
 
1405
             ),
1280
1406
            ]
1281
1407
        result = []
1282
1408
        # For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1283
1409
        # all abspaths are Unicode, and encode them back into utf8.
1284
1410
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1285
 
            self.assertIsInstance(dirdetail[0], str)
1286
 
            if isinstance(dirdetail[1], unicode):
 
1411
            self.assertIsInstance(dirdetail[0], bytes)
 
1412
            if isinstance(dirdetail[1], text_type):
1287
1413
                dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1288
1414
                dirblock = [list(info) for info in dirblock]
1289
1415
                for info in dirblock:
1290
 
                    self.assertIsInstance(info[4], unicode)
 
1416
                    self.assertIsInstance(info[4], text_type)
1291
1417
                    info[4] = info[4].encode('utf8')
1292
1418
            new_dirblock = []
1293
1419
            for info in dirblock:
1294
 
                self.assertIsInstance(info[0], str)
1295
 
                self.assertIsInstance(info[1], str)
1296
 
                self.assertIsInstance(info[4], str)
 
1420
                self.assertIsInstance(info[0], bytes)
 
1421
                self.assertIsInstance(info[1], bytes)
 
1422
                self.assertIsInstance(info[4], bytes)
1297
1423
                # Remove the stat information
1298
1424
                new_dirblock.append((info[0], info[1], info[2], info[4]))
1299
1425
            result.append((dirdetail, new_dirblock))
1304
1430
 
1305
1431
        The abspath portion should be in unicode
1306
1432
        """
1307
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1433
        self.requireFeature(features.UnicodeFilenameFeature)
1308
1434
        # Use the unicode reader. TODO: split into driver-and-driven unit
1309
1435
        # tests.
1310
1436
        self._save_platform_info()
1327
1453
        # All of the abspaths should be in unicode, all of the relative paths
1328
1454
        # should be in utf8
1329
1455
        expected_dirblocks = [
1330
 
                (('', '.'),
1331
 
                 [(name0, name0, 'file', './' + name0u),
1332
 
                  (name1, name1, 'directory', './' + name1u),
1333
 
                  (name2, name2, 'file', './' + name2u),
1334
 
                 ]
1335
 
                ),
1336
 
                ((name1, './' + name1u),
1337
 
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
1338
 
                                                        + '/' + name0u),
1339
 
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
1340
 
                                                            + '/' + name1u),
1341
 
                 ]
1342
 
                ),
1343
 
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
1344
 
                 [
1345
 
                 ]
1346
 
                ),
 
1456
            ((b'', '.'),
 
1457
             [(name0, name0, 'file', './' + name0u),
 
1458
              (name1, name1, 'directory', './' + name1u),
 
1459
              (name2, name2, 'file', './' + name2u),
 
1460
              ]
 
1461
             ),
 
1462
            ((name1, './' + name1u),
 
1463
             [(name1 + b'/' + name0, name0, 'file', './' + name1u
 
1464
               + '/' + name0u),
 
1465
              (name1 + b'/' + name1, name1, 'directory', './' + name1u
 
1466
               + '/' + name1u),
 
1467
              ]
 
1468
             ),
 
1469
            ((name1 + b'/' + name1, './' + name1u + '/' + name1u),
 
1470
             [
 
1471
                ]
 
1472
             ),
1347
1473
            ]
1348
1474
        result = list(osutils._walkdirs_utf8('.'))
1349
1475
        self._filter_out_stat(result)
1351
1477
 
1352
1478
    def test__walkdirs_utf8_win32readdir(self):
1353
1479
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1354
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1355
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1480
        self.requireFeature(features.UnicodeFilenameFeature)
 
1481
        from .._walkdirs_win32 import Win32ReadDir
1356
1482
        self._save_platform_info()
1357
1483
        osutils._selected_dir_reader = Win32ReadDir()
1358
1484
        name0u = u'0file-\xb6'
1373
1499
        # All of the abspaths should be in unicode, all of the relative paths
1374
1500
        # should be in utf8
1375
1501
        expected_dirblocks = [
1376
 
                (('', '.'),
1377
 
                 [(name0, name0, 'file', './' + name0u),
1378
 
                  (name1, name1, 'directory', './' + name1u),
1379
 
                  (name2, name2, 'file', './' + name2u),
1380
 
                 ]
1381
 
                ),
1382
 
                ((name1, './' + name1u),
1383
 
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
1384
 
                                                        + '/' + name0u),
1385
 
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
1386
 
                                                            + '/' + name1u),
1387
 
                 ]
1388
 
                ),
1389
 
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
1390
 
                 [
1391
 
                 ]
1392
 
                ),
 
1502
            (('', '.'),
 
1503
             [(name0, name0, 'file', './' + name0u),
 
1504
              (name1, name1, 'directory', './' + name1u),
 
1505
              (name2, name2, 'file', './' + name2u),
 
1506
              ]
 
1507
             ),
 
1508
            ((name1, './' + name1u),
 
1509
             [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1510
               + '/' + name0u),
 
1511
              (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1512
               + '/' + name1u),
 
1513
              ]
 
1514
             ),
 
1515
            ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1516
             [
 
1517
                ]
 
1518
             ),
1393
1519
            ]
1394
1520
        result = list(osutils._walkdirs_utf8(u'.'))
1395
1521
        self._filter_out_stat(result)
1408
1534
    def test__walkdirs_utf_win32_find_file_stat_file(self):
1409
1535
        """make sure our Stat values are valid"""
1410
1536
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1411
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1412
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1537
        self.requireFeature(features.UnicodeFilenameFeature)
 
1538
        from .._walkdirs_win32 import Win32ReadDir
1413
1539
        name0u = u'0file-\xb6'
1414
1540
        name0 = name0u.encode('utf8')
1415
1541
        self.build_tree([name0u])
1416
1542
        # I hate to sleep() here, but I'm trying to make the ctime different
1417
1543
        # from the mtime
1418
1544
        time.sleep(2)
1419
 
        f = open(name0u, 'ab')
1420
 
        try:
1421
 
            f.write('just a small update')
1422
 
        finally:
1423
 
            f.close()
 
1545
        with open(name0u, 'ab') as f:
 
1546
            f.write(b'just a small update')
1424
1547
 
1425
1548
        result = Win32ReadDir().read_dir('', u'.')
1426
1549
        entry = result[0]
1432
1555
    def test__walkdirs_utf_win32_find_file_stat_directory(self):
1433
1556
        """make sure our Stat values are valid"""
1434
1557
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1435
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1436
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1558
        self.requireFeature(features.UnicodeFilenameFeature)
 
1559
        from .._walkdirs_win32 import Win32ReadDir
1437
1560
        name0u = u'0dir-\u062c\u0648'
1438
1561
        name0 = name0u.encode('utf8')
1439
1562
        self.build_tree([name0u + '/'])
1519
1642
        # using the comparison routine shoudl work too:
1520
1643
        self.assertEqual(
1521
1644
            dir_sorted_paths,
1522
 
            sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
 
1645
            sorted(original_paths, key=osutils.path_prefix_key))
1523
1646
 
1524
1647
 
1525
1648
class TestCopyTree(tests.TestCaseInTempDir):
1538
1661
        self.assertEqual(['c'], os.listdir('target/b'))
1539
1662
 
1540
1663
    def test_copy_tree_symlinks(self):
1541
 
        self.requireFeature(tests.SymlinkFeature)
 
1664
        self.requireFeature(features.SymlinkFeature)
1542
1665
        self.build_tree(['source/'])
1543
1666
        os.symlink('a/generic/path', 'source/lnk')
1544
1667
        osutils.copy_tree('source', 'target')
1548
1671
    def test_copy_tree_handlers(self):
1549
1672
        processed_files = []
1550
1673
        processed_links = []
 
1674
 
1551
1675
        def file_handler(from_path, to_path):
1552
1676
            processed_files.append(('f', from_path, to_path))
 
1677
 
1553
1678
        def dir_handler(from_path, to_path):
1554
1679
            processed_files.append(('d', from_path, to_path))
 
1680
 
1555
1681
        def link_handler(from_path, to_path):
1556
1682
            processed_links.append((from_path, to_path))
1557
 
        handlers = {'file':file_handler,
1558
 
                    'directory':dir_handler,
1559
 
                    'symlink':link_handler,
1560
 
                   }
 
1683
        handlers = {'file': file_handler,
 
1684
                    'directory': dir_handler,
 
1685
                    'symlink': link_handler,
 
1686
                    }
1561
1687
 
1562
1688
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1563
1689
        if osutils.has_symlinks():
1568
1694
                          ('f', 'source/a', 'target/a'),
1569
1695
                          ('d', 'source/b', 'target/b'),
1570
1696
                          ('f', 'source/b/c', 'target/b/c'),
1571
 
                         ], processed_files)
1572
 
        self.failIfExists('target')
 
1697
                          ], processed_files)
 
1698
        self.assertPathDoesNotExist('target')
1573
1699
        if osutils.has_symlinks():
1574
1700
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1575
1701
 
1580
1706
    def setUp(self):
1581
1707
        super(TestSetUnsetEnv, self).setUp()
1582
1708
 
1583
 
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
 
1709
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'),
1584
1710
                         'Environment was not cleaned up properly.'
1585
 
                         ' Variable BZR_TEST_ENV_VAR should not exist.')
 
1711
                         ' Variable BRZ_TEST_ENV_VAR should not exist.')
 
1712
 
1586
1713
        def cleanup():
1587
 
            if 'BZR_TEST_ENV_VAR' in os.environ:
1588
 
                del os.environ['BZR_TEST_ENV_VAR']
 
1714
            if 'BRZ_TEST_ENV_VAR' in os.environ:
 
1715
                del os.environ['BRZ_TEST_ENV_VAR']
1589
1716
        self.addCleanup(cleanup)
1590
1717
 
1591
1718
    def test_set(self):
1592
1719
        """Test that we can set an env variable"""
1593
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1720
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
1594
1721
        self.assertEqual(None, old)
1595
 
        self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
 
1722
        self.assertEqual('foo', os.environ.get('BRZ_TEST_ENV_VAR'))
1596
1723
 
1597
1724
    def test_double_set(self):
1598
1725
        """Test that we get the old value out"""
1599
 
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1600
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
 
1726
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1727
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'bar')
1601
1728
        self.assertEqual('foo', old)
1602
 
        self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
 
1729
        self.assertEqual('bar', os.environ.get('BRZ_TEST_ENV_VAR'))
1603
1730
 
1604
1731
    def test_unicode(self):
1605
1732
        """Environment can only contain plain strings
1612
1739
                'Cannot find a unicode character that works in encoding %s'
1613
1740
                % (osutils.get_user_encoding(),))
1614
1741
 
1615
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1616
 
        self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
 
1742
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', uni_val)
 
1743
        if PY3:
 
1744
            self.assertEqual(uni_val, os.environ.get('BRZ_TEST_ENV_VAR'))
 
1745
        else:
 
1746
            self.assertEqual(env_val, os.environ.get('BRZ_TEST_ENV_VAR'))
1617
1747
 
1618
1748
    def test_unset(self):
1619
1749
        """Test that passing None will remove the env var"""
1620
 
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1621
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
 
1750
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1751
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', None)
1622
1752
        self.assertEqual('foo', old)
1623
 
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
1624
 
        self.failIf('BZR_TEST_ENV_VAR' in os.environ)
 
1753
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'))
 
1754
        self.assertNotIn('BRZ_TEST_ENV_VAR', os.environ)
1625
1755
 
1626
1756
 
1627
1757
class TestSizeShaFile(tests.TestCaseInTempDir):
1628
1758
 
1629
1759
    def test_sha_empty(self):
1630
 
        self.build_tree_contents([('foo', '')])
1631
 
        expected_sha = osutils.sha_string('')
 
1760
        self.build_tree_contents([('foo', b'')])
 
1761
        expected_sha = osutils.sha_string(b'')
1632
1762
        f = open('foo')
1633
1763
        self.addCleanup(f.close)
1634
1764
        size, sha = osutils.size_sha_file(f)
1636
1766
        self.assertEqual(expected_sha, sha)
1637
1767
 
1638
1768
    def test_sha_mixed_endings(self):
1639
 
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
 
1769
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
1640
1770
        self.build_tree_contents([('foo', text)])
1641
1771
        expected_sha = osutils.sha_string(text)
1642
1772
        f = open('foo', 'rb')
1649
1779
class TestShaFileByName(tests.TestCaseInTempDir):
1650
1780
 
1651
1781
    def test_sha_empty(self):
1652
 
        self.build_tree_contents([('foo', '')])
1653
 
        expected_sha = osutils.sha_string('')
 
1782
        self.build_tree_contents([('foo', b'')])
 
1783
        expected_sha = osutils.sha_string(b'')
1654
1784
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1655
1785
 
1656
1786
    def test_sha_mixed_endings(self):
1657
 
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
 
1787
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
1658
1788
        self.build_tree_contents([('foo', text)])
1659
1789
        expected_sha = osutils.sha_string(text)
1660
1790
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1663
1793
class TestResourceLoading(tests.TestCaseInTempDir):
1664
1794
 
1665
1795
    def test_resource_string(self):
1666
 
        # test resource in bzrlib
1667
 
        text = osutils.resource_string('bzrlib', 'debug.py')
 
1796
        # test resource in breezy
 
1797
        text = osutils.resource_string('breezy', 'debug.py')
1668
1798
        self.assertContainsRe(text, "debug_flags = set()")
1669
 
        # test resource under bzrlib
1670
 
        text = osutils.resource_string('bzrlib.ui', 'text.py')
 
1799
        # test resource under breezy
 
1800
        text = osutils.resource_string('breezy.ui', 'text.py')
1671
1801
        self.assertContainsRe(text, "class TextUIFactory")
1672
1802
        # test unsupported package
1673
1803
        self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1674
 
            'yyy.xx')
 
1804
                          'yyy.xx')
1675
1805
        # test unknown resource
1676
 
        self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
1677
 
 
1678
 
 
1679
 
class TestReCompile(tests.TestCase):
1680
 
 
1681
 
    def test_re_compile_checked(self):
1682
 
        r = osutils.re_compile_checked(r'A*', re.IGNORECASE)
1683
 
        self.assertTrue(r.match('aaaa'))
1684
 
        self.assertTrue(r.match('aAaA'))
1685
 
 
1686
 
    def test_re_compile_checked_error(self):
1687
 
        # like https://bugs.launchpad.net/bzr/+bug/251352
1688
 
        err = self.assertRaises(
1689
 
            errors.BzrCommandError,
1690
 
            osutils.re_compile_checked, '*', re.IGNORECASE, 'test case')
1691
 
        self.assertEqual(
1692
 
            "Invalid regular expression in test case: '*': "
1693
 
            "nothing to repeat",
1694
 
            str(err))
 
1806
        self.assertRaises(IOError, osutils.resource_string, 'breezy', 'yyy.xx')
1695
1807
 
1696
1808
 
1697
1809
class TestDirReader(tests.TestCaseInTempDir):
1698
1810
 
 
1811
    scenarios = dir_reader_scenarios()
 
1812
 
1699
1813
    # Set by load_tests
1700
1814
    _dir_reader_class = None
1701
1815
    _native_to_unicode = None
1702
1816
 
1703
1817
    def setUp(self):
1704
 
        tests.TestCaseInTempDir.setUp(self)
 
1818
        super(TestDirReader, self).setUp()
1705
1819
        self.overrideAttr(osutils,
1706
1820
                          '_selected_dir_reader', self._dir_reader_class())
1707
1821
 
1714
1828
            '2file'
1715
1829
            ]
1716
1830
        expected_dirblocks = [
1717
 
                (('', '.'),
1718
 
                 [('0file', '0file', 'file'),
1719
 
                  ('1dir', '1dir', 'directory'),
1720
 
                  ('2file', '2file', 'file'),
1721
 
                 ]
1722
 
                ),
1723
 
                (('1dir', './1dir'),
1724
 
                 [('1dir/0file', '0file', 'file'),
1725
 
                  ('1dir/1dir', '1dir', 'directory'),
1726
 
                 ]
1727
 
                ),
1728
 
                (('1dir/1dir', './1dir/1dir'),
1729
 
                 [
1730
 
                 ]
1731
 
                ),
 
1831
            ((b'', '.'),
 
1832
             [(b'0file', b'0file', 'file', './0file'),
 
1833
              (b'1dir', b'1dir', 'directory', './1dir'),
 
1834
              (b'2file', b'2file', 'file', './2file'),
 
1835
              ]
 
1836
             ),
 
1837
            ((b'1dir', './1dir'),
 
1838
             [(b'1dir/0file', b'0file', 'file', './1dir/0file'),
 
1839
              (b'1dir/1dir', b'1dir', 'directory', './1dir/1dir'),
 
1840
              ]
 
1841
             ),
 
1842
            ((b'1dir/1dir', './1dir/1dir'),
 
1843
             [
 
1844
                ]
 
1845
             ),
1732
1846
            ]
1733
1847
        return tree, expected_dirblocks
1734
1848
 
1738
1852
        result = list(osutils._walkdirs_utf8('.'))
1739
1853
        # Filter out stat and abspath
1740
1854
        self.assertEqual(expected_dirblocks,
1741
 
                         [(dirinfo, [line[0:3] for line in block])
1742
 
                          for dirinfo, block in result])
 
1855
                         self._filter_out(result))
1743
1856
 
1744
1857
    def test_walk_sub_dir(self):
1745
1858
        tree, expected_dirblocks = self._get_ascii_tree()
1746
1859
        self.build_tree(tree)
1747
1860
        # you can search a subdir only, with a supplied prefix.
1748
 
        result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
 
1861
        result = list(osutils._walkdirs_utf8(b'./1dir', b'1dir'))
1749
1862
        # Filter out stat and abspath
1750
1863
        self.assertEqual(expected_dirblocks[1:],
1751
 
                         [(dirinfo, [line[0:3] for line in block])
1752
 
                          for dirinfo, block in result])
 
1864
                         self._filter_out(result))
1753
1865
 
1754
1866
    def _get_unicode_tree(self):
1755
1867
        name0u = u'0file-\xb6'
1766
1878
        name1 = name1u.encode('UTF-8')
1767
1879
        name2 = name2u.encode('UTF-8')
1768
1880
        expected_dirblocks = [
1769
 
                (('', '.'),
1770
 
                 [(name0, name0, 'file', './' + name0u),
1771
 
                  (name1, name1, 'directory', './' + name1u),
1772
 
                  (name2, name2, 'file', './' + name2u),
1773
 
                 ]
1774
 
                ),
1775
 
                ((name1, './' + name1u),
1776
 
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
1777
 
                                                        + '/' + name0u),
1778
 
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
1779
 
                                                            + '/' + name1u),
1780
 
                 ]
1781
 
                ),
1782
 
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
1783
 
                 [
1784
 
                 ]
1785
 
                ),
 
1881
            ((b'', '.'),
 
1882
             [(name0, name0, 'file', './' + name0u),
 
1883
              (name1, name1, 'directory', './' + name1u),
 
1884
              (name2, name2, 'file', './' + name2u),
 
1885
              ]
 
1886
             ),
 
1887
            ((name1, './' + name1u),
 
1888
             [(name1 + b'/' + name0, name0, 'file', './' + name1u
 
1889
               + '/' + name0u),
 
1890
              (name1 + b'/' + name1, name1, 'directory', './' + name1u
 
1891
               + '/' + name1u),
 
1892
              ]
 
1893
             ),
 
1894
            ((name1 + b'/' + name1, './' + name1u + '/' + name1u),
 
1895
             [
 
1896
                ]
 
1897
             ),
1786
1898
            ]
1787
1899
        return tree, expected_dirblocks
1788
1900
 
1796
1908
            dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
1797
1909
            details = []
1798
1910
            for line in block:
1799
 
                details.append(line[0:3] + (self._native_to_unicode(line[4]), ))
 
1911
                details.append(
 
1912
                    line[0:3] + (self._native_to_unicode(line[4]), ))
1800
1913
            filtered_dirblocks.append((dirinfo, details))
1801
1914
        return filtered_dirblocks
1802
1915
 
1803
1916
    def test_walk_unicode_tree(self):
1804
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1917
        self.requireFeature(features.UnicodeFilenameFeature)
1805
1918
        tree, expected_dirblocks = self._get_unicode_tree()
1806
1919
        self.build_tree(tree)
1807
1920
        result = list(osutils._walkdirs_utf8('.'))
1808
1921
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1809
1922
 
1810
1923
    def test_symlink(self):
1811
 
        self.requireFeature(tests.SymlinkFeature)
1812
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1924
        self.requireFeature(features.SymlinkFeature)
 
1925
        self.requireFeature(features.UnicodeFilenameFeature)
1813
1926
        target = u'target\N{Euro Sign}'
1814
1927
        link_name = u'l\N{Euro Sign}nk'
1815
1928
        os.symlink(target, link_name)
1816
 
        target_utf8 = target.encode('UTF-8')
1817
1929
        link_name_utf8 = link_name.encode('UTF-8')
1818
1930
        expected_dirblocks = [
1819
 
                (('', '.'),
1820
 
                 [(link_name_utf8, link_name_utf8,
1821
 
                   'symlink', './' + link_name),],
1822
 
                 )]
 
1931
            ((b'', '.'),
 
1932
             [(link_name_utf8, link_name_utf8,
 
1933
               'symlink', './' + link_name), ],
 
1934
             )]
1823
1935
        result = list(osutils._walkdirs_utf8('.'))
1824
1936
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1825
1937
 
1833
1945
    But prior python versions failed to properly encode the passed unicode
1834
1946
    string.
1835
1947
    """
1836
 
    _test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]
 
1948
    _test_needs_features = [features.SymlinkFeature,
 
1949
                            features.UnicodeFilenameFeature]
1837
1950
 
1838
1951
    def setUp(self):
1839
1952
        super(tests.TestCaseInTempDir, self).setUp()
1842
1955
        os.symlink(self.target, self.link)
1843
1956
 
1844
1957
    def test_os_readlink_link_encoding(self):
1845
 
        if sys.version_info < (2, 6):
1846
 
            self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
1847
 
        else:
1848
 
            self.assertEquals(self.target,  os.readlink(self.link))
 
1958
        self.assertEqual(self.target, os.readlink(self.link))
1849
1959
 
1850
1960
    def test_os_readlink_link_decoding(self):
1851
 
        self.assertEquals(self.target.encode(osutils._fs_enc),
1852
 
                          os.readlink(self.link.encode(osutils._fs_enc)))
 
1961
        self.assertEqual(self.target.encode(osutils._fs_enc),
 
1962
                         os.readlink(self.link.encode(osutils._fs_enc)))
1853
1963
 
1854
1964
 
1855
1965
class TestConcurrency(tests.TestCase):
1863
1973
        self.assertIsInstance(concurrency, int)
1864
1974
 
1865
1975
    def test_local_concurrency_environment_variable(self):
1866
 
        os.environ['BZR_CONCURRENCY'] = '2'
 
1976
        self.overrideEnv('BRZ_CONCURRENCY', '2')
1867
1977
        self.assertEqual(2, osutils.local_concurrency(use_cache=False))
1868
 
        os.environ['BZR_CONCURRENCY'] = '3'
 
1978
        self.overrideEnv('BRZ_CONCURRENCY', '3')
1869
1979
        self.assertEqual(3, osutils.local_concurrency(use_cache=False))
1870
 
        os.environ['BZR_CONCURRENCY'] = 'foo'
 
1980
        self.overrideEnv('BRZ_CONCURRENCY', 'foo')
1871
1981
        self.assertEqual(1, osutils.local_concurrency(use_cache=False))
1872
1982
 
1873
1983
    def test_option_concurrency(self):
1874
 
        os.environ['BZR_CONCURRENCY'] = '1'
 
1984
        self.overrideEnv('BRZ_CONCURRENCY', '1')
1875
1985
        self.run_bzr('rocks --concurrency 42')
1876
 
        # Command line overrides envrionment variable
1877
 
        self.assertEquals('42', os.environ['BZR_CONCURRENCY'])
1878
 
        self.assertEquals(42, osutils.local_concurrency(use_cache=False))
 
1986
        # Command line overrides environment variable
 
1987
        self.assertEqual('42', os.environ['BRZ_CONCURRENCY'])
 
1988
        self.assertEqual(42, osutils.local_concurrency(use_cache=False))
1879
1989
 
1880
1990
 
1881
1991
class TestFailedToLoadExtension(tests.TestCase):
1882
1992
 
1883
1993
    def _try_loading(self):
1884
1994
        try:
1885
 
            import bzrlib._fictional_extension_py
1886
 
        except ImportError, e:
 
1995
            import breezy._fictional_extension_py  # noqa: F401
 
1996
        except ImportError as e:
1887
1997
            osutils.failed_to_load_extension(e)
1888
1998
            return True
1889
1999
 
1894
2004
    def test_failure_to_load(self):
1895
2005
        self._try_loading()
1896
2006
        self.assertLength(1, osutils._extension_load_failures)
1897
 
        self.assertEquals(osutils._extension_load_failures[0],
1898
 
            "No module named _fictional_extension_py")
 
2007
        if PY3:
 
2008
            self.assertEqual(
 
2009
                osutils._extension_load_failures[0],
 
2010
                "No module named 'breezy._fictional_extension_py'")
 
2011
        else:
 
2012
            self.assertEqual(osutils._extension_load_failures[0],
 
2013
                             "No module named _fictional_extension_py")
1899
2014
 
1900
2015
    def test_report_extension_load_failures_no_warning(self):
1901
2016
        self.assertTrue(self._try_loading())
1902
 
        warnings, result = self.callCatchWarnings(osutils.report_extension_load_failures)
 
2017
        warnings, result = self.callCatchWarnings(
 
2018
            osutils.report_extension_load_failures)
1903
2019
        # it used to give a Python warning; it no longer does
1904
2020
        self.assertLength(0, warnings)
1905
2021
 
1906
2022
    def test_report_extension_load_failures_message(self):
1907
 
        log = StringIO()
 
2023
        log = BytesIO()
1908
2024
        trace.push_log_file(log)
1909
2025
        self.assertTrue(self._try_loading())
1910
2026
        osutils.report_extension_load_failures()
1911
2027
        self.assertContainsRe(
1912
2028
            log.getvalue(),
1913
 
            r"bzr: warning: some compiled extensions could not be loaded; "
1914
 
            "see <https://answers\.launchpad\.net/bzr/\+faq/703>\n"
 
2029
            br"brz: warning: some compiled extensions could not be loaded; "
 
2030
            b"see ``brz help missing-extensions``\n"
1915
2031
            )
1916
2032
 
1917
2033
 
1918
2034
class TestTerminalWidth(tests.TestCase):
1919
2035
 
 
2036
    def setUp(self):
 
2037
        super(TestTerminalWidth, self).setUp()
 
2038
        self._orig_terminal_size_state = osutils._terminal_size_state
 
2039
        self._orig_first_terminal_size = osutils._first_terminal_size
 
2040
        self.addCleanup(self.restore_osutils_globals)
 
2041
        osutils._terminal_size_state = 'no_data'
 
2042
        osutils._first_terminal_size = None
 
2043
 
 
2044
    def restore_osutils_globals(self):
 
2045
        osutils._terminal_size_state = self._orig_terminal_size_state
 
2046
        osutils._first_terminal_size = self._orig_first_terminal_size
 
2047
 
1920
2048
    def replace_stdout(self, new):
1921
2049
        self.overrideAttr(sys, 'stdout', new)
1922
2050
 
1934
2062
    def test_default_values(self):
1935
2063
        self.assertEqual(80, osutils.default_terminal_width)
1936
2064
 
1937
 
    def test_defaults_to_BZR_COLUMNS(self):
1938
 
        # BZR_COLUMNS is set by the test framework
1939
 
        self.assertNotEqual('12', os.environ['BZR_COLUMNS'])
1940
 
        os.environ['BZR_COLUMNS'] = '12'
 
2065
    def test_defaults_to_BRZ_COLUMNS(self):
 
2066
        # BRZ_COLUMNS is set by the test framework
 
2067
        self.assertNotEqual('12', os.environ['BRZ_COLUMNS'])
 
2068
        self.overrideEnv('BRZ_COLUMNS', '12')
1941
2069
        self.assertEqual(12, osutils.terminal_width())
1942
2070
 
 
2071
    def test_BRZ_COLUMNS_0_no_limit(self):
 
2072
        self.overrideEnv('BRZ_COLUMNS', '0')
 
2073
        self.assertEqual(None, osutils.terminal_width())
 
2074
 
1943
2075
    def test_falls_back_to_COLUMNS(self):
1944
 
        del os.environ['BZR_COLUMNS']
 
2076
        self.overrideEnv('BRZ_COLUMNS', None)
1945
2077
        self.assertNotEqual('42', os.environ['COLUMNS'])
1946
2078
        self.set_fake_tty()
1947
 
        os.environ['COLUMNS'] = '42'
 
2079
        self.overrideEnv('COLUMNS', '42')
1948
2080
        self.assertEqual(42, osutils.terminal_width())
1949
2081
 
1950
2082
    def test_tty_default_without_columns(self):
1951
 
        del os.environ['BZR_COLUMNS']
1952
 
        del os.environ['COLUMNS']
 
2083
        self.overrideEnv('BRZ_COLUMNS', None)
 
2084
        self.overrideEnv('COLUMNS', None)
1953
2085
 
1954
2086
        def terminal_size(w, h):
1955
2087
            return 42, 42
1962
2094
        self.assertEqual(42, osutils.terminal_width())
1963
2095
 
1964
2096
    def test_non_tty_default_without_columns(self):
1965
 
        del os.environ['BZR_COLUMNS']
1966
 
        del os.environ['COLUMNS']
 
2097
        self.overrideEnv('BRZ_COLUMNS', None)
 
2098
        self.overrideEnv('COLUMNS', None)
1967
2099
        self.replace_stdout(None)
1968
2100
        self.assertEqual(None, osutils.terminal_width())
1969
2101
 
1972
2104
        termios = term_ios_feature.module
1973
2105
        # bug 63539 is about a termios without TIOCGWINSZ attribute
1974
2106
        try:
1975
 
            orig = termios.TIOCGWINSZ
 
2107
            termios.TIOCGWINSZ
1976
2108
        except AttributeError:
1977
2109
            # We won't remove TIOCGWINSZ, because it doesn't exist anyway :)
1978
2110
            pass
1979
2111
        else:
1980
2112
            self.overrideAttr(termios, 'TIOCGWINSZ')
1981
2113
            del termios.TIOCGWINSZ
1982
 
        del os.environ['BZR_COLUMNS']
1983
 
        del os.environ['COLUMNS']
 
2114
        self.overrideEnv('BRZ_COLUMNS', None)
 
2115
        self.overrideEnv('COLUMNS', None)
1984
2116
        # Whatever the result is, if we don't raise an exception, it's ok.
1985
2117
        osutils.terminal_width()
1986
2118
 
 
2119
 
1987
2120
class TestCreationOps(tests.TestCaseInTempDir):
1988
2121
    _test_needs_features = [features.chown_feature]
1989
2122
 
1990
2123
    def setUp(self):
1991
 
        tests.TestCaseInTempDir.setUp(self)
 
2124
        super(TestCreationOps, self).setUp()
1992
2125
        self.overrideAttr(os, 'chown', self._dummy_chown)
1993
2126
 
1994
2127
        # params set by call to _dummy_chown
2000
2133
    def test_copy_ownership_from_path(self):
2001
2134
        """copy_ownership_from_path test with specified src."""
2002
2135
        ownsrc = '/'
2003
 
        f = open('test_file', 'wt')
 
2136
        open('test_file', 'wt').close()
2004
2137
        osutils.copy_ownership_from_path('test_file', ownsrc)
2005
2138
 
2006
2139
        s = os.stat(ownsrc)
2007
 
        self.assertEquals(self.path, 'test_file')
2008
 
        self.assertEquals(self.uid, s.st_uid)
2009
 
        self.assertEquals(self.gid, s.st_gid)
 
2140
        self.assertEqual(self.path, 'test_file')
 
2141
        self.assertEqual(self.uid, s.st_uid)
 
2142
        self.assertEqual(self.gid, s.st_gid)
2010
2143
 
2011
2144
    def test_copy_ownership_nonesrc(self):
2012
2145
        """copy_ownership_from_path test with src=None."""
2013
 
        f = open('test_file', 'wt')
 
2146
        open('test_file', 'wt').close()
2014
2147
        # should use parent dir for permissions
2015
2148
        osutils.copy_ownership_from_path('test_file')
2016
2149
 
2017
2150
        s = os.stat('..')
2018
 
        self.assertEquals(self.path, 'test_file')
2019
 
        self.assertEquals(self.uid, s.st_uid)
2020
 
        self.assertEquals(self.gid, s.st_gid)
 
2151
        self.assertEqual(self.path, 'test_file')
 
2152
        self.assertEqual(self.uid, s.st_uid)
 
2153
        self.assertEqual(self.gid, s.st_gid)
 
2154
 
 
2155
 
 
2156
class TestPathFromEnviron(tests.TestCase):
 
2157
 
 
2158
    def test_is_unicode(self):
 
2159
        self.overrideEnv('BRZ_TEST_PATH', './anywhere at all/')
 
2160
        path = osutils.path_from_environ('BRZ_TEST_PATH')
 
2161
        self.assertIsInstance(path, text_type)
 
2162
        self.assertEqual(u'./anywhere at all/', path)
 
2163
 
 
2164
    def test_posix_path_env_ascii(self):
 
2165
        self.overrideEnv('BRZ_TEST_PATH', '/tmp')
 
2166
        home = osutils._posix_path_from_environ('BRZ_TEST_PATH')
 
2167
        self.assertIsInstance(home, text_type)
 
2168
        self.assertEqual(u'/tmp', home)
 
2169
 
 
2170
    def test_posix_path_env_unicode(self):
 
2171
        self.requireFeature(features.ByteStringNamedFilesystem)
 
2172
        self.overrideEnv('BRZ_TEST_PATH', '/home/\xa7test')
 
2173
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
 
2174
        self.assertEqual(u'/home/\xa7test',
 
2175
                         osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2176
        osutils._fs_enc = "iso8859-5"
 
2177
        if PY3:
 
2178
            # In Python 3, os.environ returns unicode.
 
2179
            self.assertEqual(u'/home/\xa7test',
 
2180
                             osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2181
        else:
 
2182
            self.assertEqual(u'/home/\u0407test',
 
2183
                             osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2184
            osutils._fs_enc = "utf-8"
 
2185
            self.assertRaises(
 
2186
                errors.BadFilenameEncoding,
 
2187
                osutils._posix_path_from_environ, 'BRZ_TEST_PATH')
 
2188
 
 
2189
 
 
2190
class TestGetHomeDir(tests.TestCase):
 
2191
 
 
2192
    def test_is_unicode(self):
 
2193
        home = osutils._get_home_dir()
 
2194
        self.assertIsInstance(home, text_type)
 
2195
 
 
2196
    def test_posix_homeless(self):
 
2197
        self.overrideEnv('HOME', None)
 
2198
        home = osutils._get_home_dir()
 
2199
        self.assertIsInstance(home, text_type)
 
2200
 
 
2201
    def test_posix_home_ascii(self):
 
2202
        self.overrideEnv('HOME', '/home/test')
 
2203
        home = osutils._posix_get_home_dir()
 
2204
        self.assertIsInstance(home, text_type)
 
2205
        self.assertEqual(u'/home/test', home)
 
2206
 
 
2207
    def test_posix_home_unicode(self):
 
2208
        self.requireFeature(features.ByteStringNamedFilesystem)
 
2209
        self.overrideEnv('HOME', '/home/\xa7test')
 
2210
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
 
2211
        self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
 
2212
        osutils._fs_enc = "iso8859-5"
 
2213
        if PY3:
 
2214
            # In python 3, os.environ returns unicode
 
2215
            self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
 
2216
        else:
 
2217
            self.assertEqual(u'/home/\u0407test',
 
2218
                             osutils._posix_get_home_dir())
 
2219
            osutils._fs_enc = "utf-8"
 
2220
            self.assertRaises(errors.BadFilenameEncoding,
 
2221
                              osutils._posix_get_home_dir)
 
2222
 
 
2223
 
 
2224
class TestGetuserUnicode(tests.TestCase):
 
2225
 
 
2226
    def test_is_unicode(self):
 
2227
        user = osutils.getuser_unicode()
 
2228
        self.assertIsInstance(user, text_type)
 
2229
 
 
2230
    def envvar_to_override(self):
 
2231
        if sys.platform == "win32":
 
2232
            # Disable use of platform calls on windows so envvar is used
 
2233
            self.overrideAttr(win32utils, 'has_ctypes', False)
 
2234
            return 'USERNAME'  # only variable used on windows
 
2235
        return 'LOGNAME'  # first variable checked by getpass.getuser()
 
2236
 
 
2237
    def test_ascii_user(self):
 
2238
        self.overrideEnv(self.envvar_to_override(), 'jrandom')
 
2239
        self.assertEqual(u'jrandom', osutils.getuser_unicode())
 
2240
 
 
2241
    def test_unicode_user(self):
 
2242
        ue = osutils.get_user_encoding()
 
2243
        uni_val, env_val = tests.probe_unicode_in_user_encoding()
 
2244
        if uni_val is None:
 
2245
            raise tests.TestSkipped(
 
2246
                'Cannot find a unicode character that works in encoding %s'
 
2247
                % (osutils.get_user_encoding(),))
 
2248
        uni_username = u'jrandom' + uni_val
 
2249
        encoded_username = uni_username.encode(ue)
 
2250
        if PY3:
 
2251
            self.overrideEnv(self.envvar_to_override(), uni_username)
 
2252
        else:
 
2253
            self.overrideEnv(self.envvar_to_override(), encoded_username)
 
2254
        self.assertEqual(uni_username, osutils.getuser_unicode())
 
2255
 
 
2256
 
 
2257
class TestBackupNames(tests.TestCase):
 
2258
 
 
2259
    def setUp(self):
 
2260
        super(TestBackupNames, self).setUp()
 
2261
        self.backups = []
 
2262
 
 
2263
    def backup_exists(self, name):
 
2264
        return name in self.backups
 
2265
 
 
2266
    def available_backup_name(self, name):
 
2267
        backup_name = osutils.available_backup_name(name, self.backup_exists)
 
2268
        self.backups.append(backup_name)
 
2269
        return backup_name
 
2270
 
 
2271
    def assertBackupName(self, expected, name):
 
2272
        self.assertEqual(expected, self.available_backup_name(name))
 
2273
 
 
2274
    def test_empty(self):
 
2275
        self.assertBackupName('file.~1~', 'file')
 
2276
 
 
2277
    def test_existing(self):
 
2278
        self.available_backup_name('file')
 
2279
        self.available_backup_name('file')
 
2280
        self.assertBackupName('file.~3~', 'file')
 
2281
        # Empty slots are found, this is not a strict requirement and may be
 
2282
        # revisited if we test against all implementations.
 
2283
        self.backups.remove('file.~2~')
 
2284
        self.assertBackupName('file.~2~', 'file')
 
2285
 
 
2286
 
 
2287
class TestFindExecutableInPath(tests.TestCase):
 
2288
 
 
2289
    def test_windows(self):
 
2290
        if sys.platform != 'win32':
 
2291
            raise tests.TestSkipped('test requires win32')
 
2292
        self.assertTrue(osutils.find_executable_on_path(
 
2293
            'explorer') is not None)
 
2294
        self.assertTrue(
 
2295
            osutils.find_executable_on_path('explorer.exe') is not None)
 
2296
        self.assertTrue(
 
2297
            osutils.find_executable_on_path('EXPLORER.EXE') is not None)
 
2298
        self.assertTrue(
 
2299
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
 
2300
        self.assertTrue(osutils.find_executable_on_path('file.txt') is None)
 
2301
 
 
2302
    def test_windows_app_path(self):
 
2303
        if sys.platform != 'win32':
 
2304
            raise tests.TestSkipped('test requires win32')
 
2305
        # Override PATH env var so that exe can only be found on App Path
 
2306
        self.overrideEnv('PATH', '')
 
2307
        # Internt Explorer is always registered in the App Path
 
2308
        self.assertTrue(osutils.find_executable_on_path(
 
2309
            'iexplore') is not None)
 
2310
 
 
2311
    def test_other(self):
 
2312
        if sys.platform == 'win32':
 
2313
            raise tests.TestSkipped('test requires non-win32')
 
2314
        self.assertTrue(osutils.find_executable_on_path('sh') is not None)
 
2315
        self.assertTrue(
 
2316
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
 
2317
 
 
2318
 
 
2319
class TestEnvironmentErrors(tests.TestCase):
 
2320
    """Test handling of environmental errors"""
 
2321
 
 
2322
    def test_is_oserror(self):
 
2323
        self.assertTrue(osutils.is_environment_error(
 
2324
            OSError(errno.EINVAL, "Invalid parameter")))
 
2325
 
 
2326
    def test_is_ioerror(self):
 
2327
        self.assertTrue(osutils.is_environment_error(
 
2328
            IOError(errno.EINVAL, "Invalid parameter")))
 
2329
 
 
2330
    def test_is_socket_error(self):
 
2331
        self.assertTrue(osutils.is_environment_error(
 
2332
            socket.error(errno.EINVAL, "Invalid parameter")))
 
2333
 
 
2334
    def test_is_select_error(self):
 
2335
        self.assertTrue(osutils.is_environment_error(
 
2336
            select.error(errno.EINVAL, "Invalid parameter")))
 
2337
 
 
2338
    def test_is_pywintypes_error(self):
 
2339
        self.requireFeature(features.pywintypes)
 
2340
        import pywintypes
 
2341
        self.assertTrue(osutils.is_environment_error(
 
2342
            pywintypes.error(errno.EINVAL, "Invalid parameter", "Caller")))
 
2343
 
 
2344
 
 
2345
class SupportsExecutableTests(tests.TestCaseInTempDir):
 
2346
 
 
2347
    def test_returns_bool(self):
 
2348
        self.assertIsInstance(osutils.supports_executable(self.test_dir), bool)
 
2349
 
 
2350
 
 
2351
class SupportsSymlinksTests(tests.TestCaseInTempDir):
 
2352
 
 
2353
    def test_returns_bool(self):
 
2354
        self.assertIsInstance(osutils.supports_symlinks(self.test_dir), bool)
 
2355
 
 
2356
 
 
2357
class MtabReader(tests.TestCaseInTempDir):
 
2358
 
 
2359
    def test_read_mtab(self):
 
2360
        self.build_tree_contents([('mtab', """\
 
2361
/dev/mapper/blah--vg-root / ext4 rw,relatime,errors=remount-ro 0 0
 
2362
/dev/mapper/blah--vg-home /home vfat rw,relatime 0 0
 
2363
# comment
 
2364
 
 
2365
iminvalid
 
2366
""")])
 
2367
        self.assertEqual(
 
2368
            list(osutils.read_mtab('mtab')),
 
2369
            [(b'/', 'ext4'),
 
2370
             (b'/home', 'vfat')])
 
2371
 
 
2372
 
 
2373
class GetFsTypeTests(tests.TestCaseInTempDir):
 
2374
 
 
2375
    def test_returns_string_or_none(self):
 
2376
        ret = osutils.get_fs_type(self.test_dir)
 
2377
        self.assertTrue(isinstance(ret, text_type) or ret is None)
 
2378
 
 
2379
    def test_returns_most_specific(self):
 
2380
        self.overrideAttr(
 
2381
            osutils, '_FILESYSTEM_FINDER',
 
2382
            osutils.FilesystemFinder(
 
2383
                [(b'/', 'ext4'), (b'/home', 'vfat'),
 
2384
                 (b'/home/jelmer', 'ext2')]))
 
2385
        self.assertEqual(osutils.get_fs_type(b'/home/jelmer/blah'), 'ext2')
 
2386
        self.assertEqual(osutils.get_fs_type('/home/jelmer/blah'), 'ext2')
 
2387
        self.assertEqual(osutils.get_fs_type(b'/home/jelmer'), 'ext2')
 
2388
        self.assertEqual(osutils.get_fs_type(b'/home/martin'), 'vfat')
 
2389
        self.assertEqual(osutils.get_fs_type(b'/home'), 'vfat')
 
2390
        self.assertEqual(osutils.get_fs_type(b'/other'), 'ext4')
 
2391
 
 
2392
    def test_returns_none(self):
 
2393
        self.overrideAttr(
 
2394
            osutils, '_FILESYSTEM_FINDER',
 
2395
            osutils.FilesystemFinder([]))
 
2396
        self.assertIs(osutils.get_fs_type('/home/jelmer/blah'), None)
 
2397
        self.assertIs(osutils.get_fs_type(b'/home/jelmer/blah'), None)
 
2398
        self.assertIs(osutils.get_fs_type('/home/jelmer'), None)