/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_osutils.py

  • Committer: Robert Collins
  • Date: 2010-05-11 08:36:16 UTC
  • mto: This revision was merged to the branch mainline in revision 5223.
  • Revision ID: robertc@robertcollins.net-20100511083616-b8fjb19zomwupid0
Make all lock methods return Result objects, rather than lock_read returning self, as per John's review.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2016 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the osutils wrapper."""
18
18
 
19
 
from __future__ import absolute_import, division
20
 
 
 
19
from cStringIO import StringIO
21
20
import errno
22
21
import os
23
 
import select
 
22
import re
24
23
import socket
 
24
import stat
25
25
import sys
26
 
import tempfile
27
26
import time
28
27
 
29
 
from .. import (
 
28
from bzrlib import (
30
29
    errors,
31
30
    osutils,
32
31
    tests,
33
32
    trace,
34
33
    win32utils,
35
34
    )
36
 
from ..sixish import (
37
 
    BytesIO,
38
 
    PY3,
39
 
    text_type,
40
 
    )
41
 
from . import (
 
35
from bzrlib.tests import (
42
36
    features,
43
37
    file_utils,
44
38
    test__walkdirs_win32,
45
39
    )
46
 
from .scenarios import load_tests_apply_scenarios
47
 
 
48
 
 
49
 
class _UTF8DirReaderFeature(features.ModuleAvailableFeature):
 
40
 
 
41
 
 
42
class _UTF8DirReaderFeature(tests.Feature):
50
43
 
51
44
    def _probe(self):
52
45
        try:
53
 
            from .. import _readdir_pyx
54
 
            self._module = _readdir_pyx
 
46
            from bzrlib import _readdir_pyx
55
47
            self.reader = _readdir_pyx.UTF8DirReader
56
48
            return True
57
49
        except ImportError:
58
50
            return False
59
51
 
60
 
 
61
 
UTF8DirReaderFeature = _UTF8DirReaderFeature('breezy._readdir_pyx')
62
 
 
63
 
term_ios_feature = features.ModuleAvailableFeature('termios')
 
52
    def feature_name(self):
 
53
        return 'bzrlib._readdir_pyx'
 
54
 
 
55
UTF8DirReaderFeature = _UTF8DirReaderFeature()
 
56
 
 
57
term_ios_feature = tests.ModuleAvailableFeature('termios')
64
58
 
65
59
 
66
60
def _already_unicode(s):
84
78
    # Some DirReaders are platform specific and even there they may not be
85
79
    # available.
86
80
    if UTF8DirReaderFeature.available():
87
 
        from .. import _readdir_pyx
 
81
        from bzrlib import _readdir_pyx
88
82
        scenarios.append(('utf8',
89
83
                          dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
90
84
                               _native_to_unicode=_utf8_to_unicode)))
91
85
 
92
86
    if test__walkdirs_win32.win32_readdir_feature.available():
93
87
        try:
94
 
            from .. import _walkdirs_win32
 
88
            from bzrlib import _walkdirs_win32
95
89
            scenarios.append(
96
90
                ('win32',
97
91
                 dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
101
95
    return scenarios
102
96
 
103
97
 
104
 
load_tests = load_tests_apply_scenarios
 
98
def load_tests(basic_tests, module, loader):
 
99
    suite = loader.suiteClass()
 
100
    dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
 
101
        basic_tests, tests.condition_isinstance(TestDirReader))
 
102
    tests.multiply_tests(dir_reader_tests, dir_reader_scenarios(), suite)
 
103
    suite.addTest(remaining_tests)
 
104
    return suite
105
105
 
106
106
 
107
107
class TestContainsWhitespace(tests.TestCase):
108
108
 
109
109
    def test_contains_whitespace(self):
110
 
        self.assertTrue(osutils.contains_whitespace(u' '))
111
 
        self.assertTrue(osutils.contains_whitespace(u'hello there'))
112
 
        self.assertTrue(osutils.contains_whitespace(u'hellothere\n'))
113
 
        self.assertTrue(osutils.contains_whitespace(u'hello\nthere'))
114
 
        self.assertTrue(osutils.contains_whitespace(u'hello\rthere'))
115
 
        self.assertTrue(osutils.contains_whitespace(u'hello\tthere'))
 
110
        self.failUnless(osutils.contains_whitespace(u' '))
 
111
        self.failUnless(osutils.contains_whitespace(u'hello there'))
 
112
        self.failUnless(osutils.contains_whitespace(u'hellothere\n'))
 
113
        self.failUnless(osutils.contains_whitespace(u'hello\nthere'))
 
114
        self.failUnless(osutils.contains_whitespace(u'hello\rthere'))
 
115
        self.failUnless(osutils.contains_whitespace(u'hello\tthere'))
116
116
 
117
117
        # \xa0 is "Non-breaking-space", which some Python locales treat as
118
118
        # whitespace, but we do not.
119
 
        self.assertFalse(osutils.contains_whitespace(u''))
120
 
        self.assertFalse(osutils.contains_whitespace(u'hellothere'))
121
 
        self.assertFalse(osutils.contains_whitespace(u'hello\xa0there'))
 
119
        self.failIf(osutils.contains_whitespace(u''))
 
120
        self.failIf(osutils.contains_whitespace(u'hellothere'))
 
121
        self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
122
122
 
123
123
 
124
124
class TestRename(tests.TestCaseInTempDir):
136
136
 
137
137
    def test_fancy_rename(self):
138
138
        # This should work everywhere
139
 
        self.create_file('a', b'something in a\n')
 
139
        self.create_file('a', 'something in a\n')
140
140
        self._fancy_rename('a', 'b')
141
 
        self.assertPathDoesNotExist('a')
142
 
        self.assertPathExists('b')
143
 
        self.check_file_contents('b', b'something in a\n')
 
141
        self.failIfExists('a')
 
142
        self.failUnlessExists('b')
 
143
        self.check_file_contents('b', 'something in a\n')
144
144
 
145
 
        self.create_file('a', b'new something in a\n')
 
145
        self.create_file('a', 'new something in a\n')
146
146
        self._fancy_rename('b', 'a')
147
147
 
148
 
        self.check_file_contents('a', b'something in a\n')
 
148
        self.check_file_contents('a', 'something in a\n')
149
149
 
150
150
    def test_fancy_rename_fails_source_missing(self):
151
151
        # An exception should be raised, and the target should be left in place
152
 
        self.create_file('target', b'data in target\n')
 
152
        self.create_file('target', 'data in target\n')
153
153
        self.assertRaises((IOError, OSError), self._fancy_rename,
154
154
                          'missingsource', 'target')
155
 
        self.assertPathExists('target')
156
 
        self.check_file_contents('target', b'data in target\n')
 
155
        self.failUnlessExists('target')
 
156
        self.check_file_contents('target', 'data in target\n')
157
157
 
158
158
    def test_fancy_rename_fails_if_source_and_target_missing(self):
159
159
        self.assertRaises((IOError, OSError), self._fancy_rename,
161
161
 
162
162
    def test_rename(self):
163
163
        # Rename should be semi-atomic on all platforms
164
 
        self.create_file('a', b'something in a\n')
 
164
        self.create_file('a', 'something in a\n')
165
165
        osutils.rename('a', 'b')
166
 
        self.assertPathDoesNotExist('a')
167
 
        self.assertPathExists('b')
168
 
        self.check_file_contents('b', b'something in a\n')
 
166
        self.failIfExists('a')
 
167
        self.failUnlessExists('b')
 
168
        self.check_file_contents('b', 'something in a\n')
169
169
 
170
 
        self.create_file('a', b'new something in a\n')
 
170
        self.create_file('a', 'new something in a\n')
171
171
        osutils.rename('b', 'a')
172
172
 
173
 
        self.check_file_contents('a', b'something in a\n')
 
173
        self.check_file_contents('a', 'something in a\n')
174
174
 
175
175
    # TODO: test fancy_rename using a MemoryTransport
176
176
 
182
182
        # we can't use failUnlessExists on case-insensitive filesystem
183
183
        # so try to check shape of the tree
184
184
        shape = sorted(os.listdir('.'))
185
 
        self.assertEqual(['A', 'B'], shape)
 
185
        self.assertEquals(['A', 'B'], shape)
186
186
 
187
 
    def test_rename_exception(self):
188
 
        try:
189
 
            osutils.rename('nonexistent_path', 'different_nonexistent_path')
190
 
        except OSError as e:
191
 
            self.assertEqual(e.old_filename, 'nonexistent_path')
192
 
            self.assertEqual(e.new_filename, 'different_nonexistent_path')
193
 
            self.assertTrue('nonexistent_path' in e.strerror)
194
 
            self.assertTrue('different_nonexistent_path' in e.strerror)
 
187
    def test_rename_error(self):
 
188
        # We wrap os.rename to make it give an error including the filenames
 
189
        # https://bugs.launchpad.net/bzr/+bug/491763
 
190
        err = self.assertRaises(OSError, osutils.rename,
 
191
            'nonexistent', 'target')
 
192
        self.assertContainsString(str(err), 'nonexistent')
195
193
 
196
194
 
197
195
class TestRandChars(tests.TestCase):
224
222
                         (['src'], SRC_FOO_C),
225
223
                         (['src'], 'src'),
226
224
                         ]:
227
 
            self.assertTrue(osutils.is_inside_any(dirs, fn))
 
225
            self.assert_(osutils.is_inside_any(dirs, fn))
228
226
        for dirs, fn in [(['src'], 'srccontrol'),
229
227
                         (['src'], 'srccontrol/foo')]:
230
228
            self.assertFalse(osutils.is_inside_any(dirs, fn))
236
234
                         (['src/bar.c', 'bla/foo.c'], 'src'),
237
235
                         (['src'], 'src'),
238
236
                         ]:
239
 
            self.assertTrue(osutils.is_inside_or_parent_of_any(dirs, fn))
 
237
            self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
240
238
 
241
239
        for dirs, fn in [(['src'], 'srccontrol'),
242
240
                         (['srccontrol/foo.c'], 'src'),
244
242
            self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
245
243
 
246
244
 
247
 
class TestLstat(tests.TestCaseInTempDir):
248
 
 
249
 
    def test_lstat_matches_fstat(self):
250
 
        # On Windows, lstat and fstat don't always agree, primarily in the
251
 
        # 'st_ino' and 'st_dev' fields. So we force them to be '0' in our
252
 
        # custom implementation.
253
 
        if sys.platform == 'win32':
254
 
            # We only have special lstat/fstat if we have the extension.
255
 
            # Without it, we may end up re-reading content when we don't have
256
 
            # to, but otherwise it doesn't affect correctness.
257
 
            self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
258
 
        with open('test-file.txt', 'wb') as f:
259
 
            f.write(b'some content\n')
260
 
            f.flush()
261
 
            self.assertEqualStat(osutils.fstat(f.fileno()),
262
 
                                 osutils.lstat('test-file.txt'))
263
 
 
264
 
 
265
245
class TestRmTree(tests.TestCaseInTempDir):
266
246
 
267
247
    def test_rmtree(self):
268
248
        # Check to remove tree with read-only files/dirs
269
249
        os.mkdir('dir')
270
 
        with open('dir/file', 'w') as f:
271
 
            f.write('spam')
 
250
        f = file('dir/file', 'w')
 
251
        f.write('spam')
 
252
        f.close()
272
253
        # would like to also try making the directory readonly, but at the
273
254
        # moment python shutil.rmtree doesn't handle that properly - it would
274
255
        # need to chmod the directory before removing things inside it - deferred
278
259
 
279
260
        osutils.rmtree('dir')
280
261
 
281
 
        self.assertPathDoesNotExist('dir/file')
282
 
        self.assertPathDoesNotExist('dir')
 
262
        self.failIfExists('dir/file')
 
263
        self.failIfExists('dir')
283
264
 
284
265
 
285
266
class TestDeleteAny(tests.TestCaseInTempDir):
298
279
 
299
280
    def test_file_kind(self):
300
281
        self.build_tree(['file', 'dir/'])
301
 
        self.assertEqual('file', osutils.file_kind('file'))
302
 
        self.assertEqual('directory', osutils.file_kind('dir/'))
 
282
        self.assertEquals('file', osutils.file_kind('file'))
 
283
        self.assertEquals('directory', osutils.file_kind('dir/'))
303
284
        if osutils.has_symlinks():
304
285
            os.symlink('symlink', 'symlink')
305
 
            self.assertEqual('symlink', osutils.file_kind('symlink'))
 
286
            self.assertEquals('symlink', osutils.file_kind('symlink'))
306
287
 
307
288
        # TODO: jam 20060529 Test a block device
308
289
        try:
309
290
            os.lstat('/dev/null')
310
 
        except OSError as e:
 
291
        except OSError, e:
311
292
            if e.errno not in (errno.ENOENT,):
312
293
                raise
313
294
        else:
314
 
            self.assertEqual(
315
 
                'chardev',
316
 
                osutils.file_kind(os.path.realpath('/dev/null')))
 
295
            self.assertEquals('chardev', osutils.file_kind('/dev/null'))
317
296
 
318
297
        mkfifo = getattr(os, 'mkfifo', None)
319
298
        if mkfifo:
320
299
            mkfifo('fifo')
321
300
            try:
322
 
                self.assertEqual('fifo', osutils.file_kind('fifo'))
 
301
                self.assertEquals('fifo', osutils.file_kind('fifo'))
323
302
            finally:
324
303
                os.remove('fifo')
325
304
 
328
307
            s = socket.socket(AF_UNIX)
329
308
            s.bind('socket')
330
309
            try:
331
 
                self.assertEqual('socket', osutils.file_kind('socket'))
 
310
                self.assertEquals('socket', osutils.file_kind('socket'))
332
311
            finally:
333
312
                os.remove('socket')
334
313
 
353
332
 
354
333
        orig_umask = osutils.get_umask()
355
334
        self.addCleanup(os.umask, orig_umask)
356
 
        os.umask(0o222)
357
 
        self.assertEqual(0o222, osutils.get_umask())
358
 
        os.umask(0o022)
359
 
        self.assertEqual(0o022, osutils.get_umask())
360
 
        os.umask(0o002)
361
 
        self.assertEqual(0o002, osutils.get_umask())
362
 
        os.umask(0o027)
363
 
        self.assertEqual(0o027, osutils.get_umask())
 
335
        os.umask(0222)
 
336
        self.assertEqual(0222, osutils.get_umask())
 
337
        os.umask(0022)
 
338
        self.assertEqual(0022, osutils.get_umask())
 
339
        os.umask(0002)
 
340
        self.assertEqual(0002, osutils.get_umask())
 
341
        os.umask(0027)
 
342
        self.assertEqual(0027, osutils.get_umask())
364
343
 
365
344
 
366
345
class TestDateTime(tests.TestCase):
402
381
        self.assertFormatedDelta('2 seconds in the future', -2)
403
382
 
404
383
    def test_format_date(self):
405
 
        self.assertRaises(osutils.UnsupportedTimezoneFormat,
406
 
                          osutils.format_date, 0, timezone='foo')
 
384
        self.assertRaises(errors.UnsupportedTimezoneFormat,
 
385
            osutils.format_date, 0, timezone='foo')
407
386
        self.assertIsInstance(osutils.format_date(0), str)
408
 
        self.assertIsInstance(osutils.format_local_date(0), text_type)
 
387
        self.assertIsInstance(osutils.format_local_date(0), unicode)
409
388
        # Testing for the actual value of the local weekday without
410
389
        # duplicating the code from format_date is difficult.
411
390
        # Instead blackbox.test_locale should check for localized
413
392
 
414
393
    def test_format_date_with_offset_in_original_timezone(self):
415
394
        self.assertEqual("Thu 1970-01-01 00:00:00 +0000",
416
 
                         osutils.format_date_with_offset_in_original_timezone(0))
 
395
            osutils.format_date_with_offset_in_original_timezone(0))
417
396
        self.assertEqual("Fri 1970-01-02 03:46:40 +0000",
418
 
                         osutils.format_date_with_offset_in_original_timezone(100000))
 
397
            osutils.format_date_with_offset_in_original_timezone(100000))
419
398
        self.assertEqual("Fri 1970-01-02 05:46:40 +0200",
420
 
                         osutils.format_date_with_offset_in_original_timezone(100000, 7200))
 
399
            osutils.format_date_with_offset_in_original_timezone(100000, 7200))
421
400
 
422
401
    def test_local_time_offset(self):
423
402
        """Test that local_time_offset() returns a sane value."""
439
418
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
440
419
 
441
420
 
442
 
class TestFdatasync(tests.TestCaseInTempDir):
443
 
 
444
 
    def do_fdatasync(self):
445
 
        f = tempfile.NamedTemporaryFile()
446
 
        osutils.fdatasync(f.fileno())
447
 
        f.close()
448
 
 
449
 
    @staticmethod
450
 
    def raise_eopnotsupp(*args, **kwargs):
451
 
        raise IOError(errno.EOPNOTSUPP, os.strerror(errno.EOPNOTSUPP))
452
 
 
453
 
    @staticmethod
454
 
    def raise_enotsup(*args, **kwargs):
455
 
        raise IOError(errno.ENOTSUP, os.strerror(errno.ENOTSUP))
456
 
 
457
 
    def test_fdatasync_handles_system_function(self):
458
 
        self.overrideAttr(os, "fdatasync")
459
 
        self.do_fdatasync()
460
 
 
461
 
    def test_fdatasync_handles_no_fdatasync_no_fsync(self):
462
 
        self.overrideAttr(os, "fdatasync")
463
 
        self.overrideAttr(os, "fsync")
464
 
        self.do_fdatasync()
465
 
 
466
 
    def test_fdatasync_handles_no_EOPNOTSUPP(self):
467
 
        self.overrideAttr(errno, "EOPNOTSUPP")
468
 
        self.do_fdatasync()
469
 
 
470
 
    def test_fdatasync_catches_ENOTSUP(self):
471
 
        enotsup = getattr(errno, "ENOTSUP", None)
472
 
        if enotsup is None:
473
 
            raise tests.TestNotApplicable("No ENOTSUP on this platform")
474
 
        self.overrideAttr(os, "fdatasync", self.raise_enotsup)
475
 
        self.do_fdatasync()
476
 
 
477
 
    def test_fdatasync_catches_EOPNOTSUPP(self):
478
 
        enotsup = getattr(errno, "EOPNOTSUPP", None)
479
 
        if enotsup is None:
480
 
            raise tests.TestNotApplicable("No EOPNOTSUPP on this platform")
481
 
        self.overrideAttr(os, "fdatasync", self.raise_eopnotsupp)
482
 
        self.do_fdatasync()
483
 
 
484
 
 
485
421
class TestLinks(tests.TestCaseInTempDir):
486
422
 
487
423
    def test_dereference_path(self):
488
 
        self.requireFeature(features.SymlinkFeature)
 
424
        self.requireFeature(tests.SymlinkFeature)
489
425
        cwd = osutils.realpath('.')
490
426
        os.mkdir('bar')
491
427
        bar_path = osutils.pathjoin(cwd, 'bar')
512
448
        self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
513
449
 
514
450
    def test_changing_access(self):
515
 
        with open('file', 'w') as f:
516
 
            f.write('monkey')
 
451
        f = file('file', 'w')
 
452
        f.write('monkey')
 
453
        f.close()
517
454
 
518
455
        # Make a file readonly
519
456
        osutils.make_readonly('file')
520
457
        mode = os.lstat('file').st_mode
521
 
        self.assertEqual(mode, mode & 0o777555)
 
458
        self.assertEqual(mode, mode & 0777555)
522
459
 
523
460
        # Make a file writable
524
461
        osutils.make_writable('file')
525
462
        mode = os.lstat('file').st_mode
526
 
        self.assertEqual(mode, mode | 0o200)
 
463
        self.assertEqual(mode, mode | 0200)
527
464
 
528
465
        if osutils.has_symlinks():
529
466
            # should not error when handed a symlink
537
474
 
538
475
class TestCanonicalRelPath(tests.TestCaseInTempDir):
539
476
 
540
 
    _test_needs_features = [features.CaseInsCasePresFilenameFeature]
 
477
    _test_needs_features = [tests.CaseInsCasePresFilenameFeature]
541
478
 
542
479
    def test_canonical_relpath_simple(self):
543
 
        f = open('MixedCaseName', 'w')
 
480
        f = file('MixedCaseName', 'w')
544
481
        f.close()
545
482
        actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
546
 
        self.assertEqual('work/MixedCaseName', actual)
 
483
        self.failUnlessEqual('work/MixedCaseName', actual)
547
484
 
548
485
    def test_canonical_relpath_missing_tail(self):
549
486
        os.mkdir('MixedCaseParent')
550
487
        actual = osutils.canonical_relpath(self.test_base_dir,
551
488
                                           'mixedcaseparent/nochild')
552
 
        self.assertEqual('work/MixedCaseParent/nochild', actual)
 
489
        self.failUnlessEqual('work/MixedCaseParent/nochild', actual)
553
490
 
554
491
 
555
492
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
599
536
    """Test pumpfile method."""
600
537
 
601
538
    def setUp(self):
602
 
        super(TestPumpFile, self).setUp()
 
539
        tests.TestCase.setUp(self)
603
540
        # create a test datablock
604
541
        self.block_size = 512
605
 
        pattern = b'0123456789ABCDEF'
606
 
        self.test_data = pattern * (3 * self.block_size // len(pattern))
 
542
        pattern = '0123456789ABCDEF'
 
543
        self.test_data = pattern * (3 * self.block_size / len(pattern))
607
544
        self.test_data_len = len(self.test_data)
608
545
 
609
546
    def test_bracket_block_size(self):
613
550
        self.assertTrue(self.test_data_len > self.block_size)
614
551
 
615
552
        from_file = file_utils.FakeReadFile(self.test_data)
616
 
        to_file = BytesIO()
 
553
        to_file = StringIO()
617
554
 
618
 
        # read (max // 2) bytes and verify read size wasn't affected
619
 
        num_bytes_to_read = self.block_size // 2
620
 
        osutils.pumpfile(from_file, to_file,
621
 
                         num_bytes_to_read, self.block_size)
 
555
        # read (max / 2) bytes and verify read size wasn't affected
 
556
        num_bytes_to_read = self.block_size / 2
 
557
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
622
558
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
623
559
        self.assertEqual(from_file.get_read_count(), 1)
624
560
 
625
561
        # read (max) bytes and verify read size wasn't affected
626
562
        num_bytes_to_read = self.block_size
627
563
        from_file.reset_read_count()
628
 
        osutils.pumpfile(from_file, to_file,
629
 
                         num_bytes_to_read, self.block_size)
 
564
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
630
565
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
631
566
        self.assertEqual(from_file.get_read_count(), 1)
632
567
 
633
568
        # read (max + 1) bytes and verify read size was limited
634
569
        num_bytes_to_read = self.block_size + 1
635
570
        from_file.reset_read_count()
636
 
        osutils.pumpfile(from_file, to_file,
637
 
                         num_bytes_to_read, self.block_size)
 
571
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
638
572
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
639
573
        self.assertEqual(from_file.get_read_count(), 2)
640
574
 
641
575
        # finish reading the rest of the data
642
576
        num_bytes_to_read = self.test_data_len - to_file.tell()
643
 
        osutils.pumpfile(from_file, to_file,
644
 
                         num_bytes_to_read, self.block_size)
 
577
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
645
578
 
646
579
        # report error if the data wasn't equal (we only report the size due
647
580
        # to the length of the data)
658
591
 
659
592
        # retrieve data in blocks
660
593
        from_file = file_utils.FakeReadFile(self.test_data)
661
 
        to_file = BytesIO()
 
594
        to_file = StringIO()
662
595
        osutils.pumpfile(from_file, to_file, self.test_data_len,
663
596
                         self.block_size)
664
597
 
682
615
 
683
616
        # retrieve data to EOF
684
617
        from_file = file_utils.FakeReadFile(self.test_data)
685
 
        to_file = BytesIO()
 
618
        to_file = StringIO()
686
619
        osutils.pumpfile(from_file, to_file, -1, self.block_size)
687
620
 
688
621
        # verify read size was equal to the maximum read size
702
635
        with this new version."""
703
636
        # retrieve data using default (old) pumpfile method
704
637
        from_file = file_utils.FakeReadFile(self.test_data)
705
 
        to_file = BytesIO()
 
638
        to_file = StringIO()
706
639
        osutils.pumpfile(from_file, to_file)
707
640
 
708
641
        # report error if the data wasn't equal (we only report the size due
714
647
 
715
648
    def test_report_activity(self):
716
649
        activity = []
717
 
 
718
650
        def log_activity(length, direction):
719
651
            activity.append((length, direction))
720
 
        from_file = BytesIO(self.test_data)
721
 
        to_file = BytesIO()
 
652
        from_file = StringIO(self.test_data)
 
653
        to_file = StringIO()
722
654
        osutils.pumpfile(from_file, to_file, buff_size=500,
723
655
                         report_activity=log_activity, direction='read')
724
656
        self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
725
657
                          (36, 'read')], activity)
726
658
 
727
 
        from_file = BytesIO(self.test_data)
728
 
        to_file = BytesIO()
 
659
        from_file = StringIO(self.test_data)
 
660
        to_file = StringIO()
729
661
        del activity[:]
730
662
        osutils.pumpfile(from_file, to_file, buff_size=500,
731
663
                         report_activity=log_activity, direction='write')
733
665
                          (36, 'write')], activity)
734
666
 
735
667
        # And with a limited amount of data
736
 
        from_file = BytesIO(self.test_data)
737
 
        to_file = BytesIO()
 
668
        from_file = StringIO(self.test_data)
 
669
        to_file = StringIO()
738
670
        del activity[:]
739
671
        osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
740
672
                         report_activity=log_activity, direction='read')
741
 
        self.assertEqual(
742
 
            [(500, 'read'), (500, 'read'), (28, 'read')], activity)
 
673
        self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
 
674
 
743
675
 
744
676
 
745
677
class TestPumpStringFile(tests.TestCase):
746
678
 
747
679
    def test_empty(self):
748
 
        output = BytesIO()
749
 
        osutils.pump_string_file(b"", output)
750
 
        self.assertEqual(b"", output.getvalue())
 
680
        output = StringIO()
 
681
        osutils.pump_string_file("", output)
 
682
        self.assertEqual("", output.getvalue())
751
683
 
752
684
    def test_more_than_segment_size(self):
753
 
        output = BytesIO()
754
 
        osutils.pump_string_file(b"123456789", output, 2)
755
 
        self.assertEqual(b"123456789", output.getvalue())
 
685
        output = StringIO()
 
686
        osutils.pump_string_file("123456789", output, 2)
 
687
        self.assertEqual("123456789", output.getvalue())
756
688
 
757
689
    def test_segment_size(self):
758
 
        output = BytesIO()
759
 
        osutils.pump_string_file(b"12", output, 2)
760
 
        self.assertEqual(b"12", output.getvalue())
 
690
        output = StringIO()
 
691
        osutils.pump_string_file("12", output, 2)
 
692
        self.assertEqual("12", output.getvalue())
761
693
 
762
694
    def test_segment_size_multiple(self):
763
 
        output = BytesIO()
764
 
        osutils.pump_string_file(b"1234", output, 2)
765
 
        self.assertEqual(b"1234", output.getvalue())
 
695
        output = StringIO()
 
696
        osutils.pump_string_file("1234", output, 2)
 
697
        self.assertEqual("1234", output.getvalue())
766
698
 
767
699
 
768
700
class TestRelpath(tests.TestCase):
787
719
class TestSafeUnicode(tests.TestCase):
788
720
 
789
721
    def test_from_ascii_string(self):
790
 
        self.assertEqual(u'foobar', osutils.safe_unicode(b'foobar'))
 
722
        self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
791
723
 
792
724
    def test_from_unicode_string_ascii_contents(self):
793
725
        self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
796
728
        self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
797
729
 
798
730
    def test_from_utf8_string(self):
799
 
        self.assertEqual(u'foo\xae', osutils.safe_unicode(b'foo\xc2\xae'))
 
731
        self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
800
732
 
801
733
    def test_bad_utf8_string(self):
802
734
        self.assertRaises(errors.BzrBadParameterNotUnicode,
803
735
                          osutils.safe_unicode,
804
 
                          b'\xbb\xbb')
 
736
                          '\xbb\xbb')
805
737
 
806
738
 
807
739
class TestSafeUtf8(tests.TestCase):
808
740
 
809
741
    def test_from_ascii_string(self):
810
 
        f = b'foobar'
811
 
        self.assertEqual(b'foobar', osutils.safe_utf8(f))
 
742
        f = 'foobar'
 
743
        self.assertEqual('foobar', osutils.safe_utf8(f))
812
744
 
813
745
    def test_from_unicode_string_ascii_contents(self):
814
 
        self.assertEqual(b'bargam', osutils.safe_utf8(u'bargam'))
 
746
        self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
815
747
 
816
748
    def test_from_unicode_string_unicode_contents(self):
817
 
        self.assertEqual(b'bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
 
749
        self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
818
750
 
819
751
    def test_from_utf8_string(self):
820
 
        self.assertEqual(b'foo\xc2\xae', osutils.safe_utf8(b'foo\xc2\xae'))
 
752
        self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
821
753
 
822
754
    def test_bad_utf8_string(self):
823
755
        self.assertRaises(errors.BzrBadParameterNotUnicode,
824
 
                          osutils.safe_utf8, b'\xbb\xbb')
825
 
 
826
 
 
827
 
class TestSendAll(tests.TestCase):
828
 
 
829
 
    def test_send_with_disconnected_socket(self):
830
 
        class DisconnectedSocket(object):
831
 
            def __init__(self, err):
832
 
                self.err = err
833
 
 
834
 
            def send(self, content):
835
 
                raise self.err
836
 
 
837
 
            def close(self):
838
 
                pass
839
 
        # All of these should be treated as ConnectionReset
840
 
        errs = []
841
 
        for err_cls in (IOError, socket.error):
842
 
            for errnum in osutils._end_of_stream_errors:
843
 
                errs.append(err_cls(errnum))
844
 
        for err in errs:
845
 
            sock = DisconnectedSocket(err)
846
 
            self.assertRaises(errors.ConnectionReset,
847
 
                              osutils.send_all, sock, b'some more content')
848
 
 
849
 
    def test_send_with_no_progress(self):
850
 
        # See https://bugs.launchpad.net/bzr/+bug/1047309
851
 
        # It seems that paramiko can get into a state where it doesn't error,
852
 
        # but it returns 0 bytes sent for requests over and over again.
853
 
        class NoSendingSocket(object):
854
 
            def __init__(self):
855
 
                self.call_count = 0
856
 
 
857
 
            def send(self, bytes):
858
 
                self.call_count += 1
859
 
                if self.call_count > 100:
860
 
                    # Prevent the test suite from hanging
861
 
                    raise RuntimeError('too many calls')
862
 
                return 0
863
 
        sock = NoSendingSocket()
864
 
        self.assertRaises(errors.ConnectionReset,
865
 
                          osutils.send_all, sock, b'content')
866
 
        self.assertEqual(1, sock.call_count)
867
 
 
868
 
 
869
 
class TestPosixFuncs(tests.TestCase):
870
 
    """Test that the posix version of normpath returns an appropriate path
871
 
       when used with 2 leading slashes."""
872
 
 
873
 
    def test_normpath(self):
874
 
        self.assertEqual('/etc/shadow', osutils._posix_normpath('/etc/shadow'))
875
 
        self.assertEqual(
876
 
            '/etc/shadow', osutils._posix_normpath('//etc/shadow'))
877
 
        self.assertEqual(
878
 
            '/etc/shadow', osutils._posix_normpath('///etc/shadow'))
 
756
                          osutils.safe_utf8, '\xbb\xbb')
 
757
 
 
758
 
 
759
class TestSafeRevisionId(tests.TestCase):
 
760
 
 
761
    def test_from_ascii_string(self):
 
762
        # this shouldn't give a warning because it's getting an ascii string
 
763
        self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
 
764
 
 
765
    def test_from_unicode_string_ascii_contents(self):
 
766
        self.assertEqual('bargam',
 
767
                         osutils.safe_revision_id(u'bargam', warn=False))
 
768
 
 
769
    def test_from_unicode_deprecated(self):
 
770
        self.assertEqual('bargam',
 
771
            self.callDeprecated([osutils._revision_id_warning],
 
772
                                osutils.safe_revision_id, u'bargam'))
 
773
 
 
774
    def test_from_unicode_string_unicode_contents(self):
 
775
        self.assertEqual('bargam\xc2\xae',
 
776
                         osutils.safe_revision_id(u'bargam\xae', warn=False))
 
777
 
 
778
    def test_from_utf8_string(self):
 
779
        self.assertEqual('foo\xc2\xae',
 
780
                         osutils.safe_revision_id('foo\xc2\xae'))
 
781
 
 
782
    def test_none(self):
 
783
        """Currently, None is a valid revision_id"""
 
784
        self.assertEqual(None, osutils.safe_revision_id(None))
 
785
 
 
786
 
 
787
class TestSafeFileId(tests.TestCase):
 
788
 
 
789
    def test_from_ascii_string(self):
 
790
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))
 
791
 
 
792
    def test_from_unicode_string_ascii_contents(self):
 
793
        self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))
 
794
 
 
795
    def test_from_unicode_deprecated(self):
 
796
        self.assertEqual('bargam',
 
797
            self.callDeprecated([osutils._file_id_warning],
 
798
                                osutils.safe_file_id, u'bargam'))
 
799
 
 
800
    def test_from_unicode_string_unicode_contents(self):
 
801
        self.assertEqual('bargam\xc2\xae',
 
802
                         osutils.safe_file_id(u'bargam\xae', warn=False))
 
803
 
 
804
    def test_from_utf8_string(self):
 
805
        self.assertEqual('foo\xc2\xae',
 
806
                         osutils.safe_file_id('foo\xc2\xae'))
 
807
 
 
808
    def test_none(self):
 
809
        """Currently, None is a valid revision_id"""
 
810
        self.assertEqual(None, osutils.safe_file_id(None))
879
811
 
880
812
 
881
813
class TestWin32Funcs(tests.TestCase):
882
814
    """Test that _win32 versions of os utilities return appropriate paths."""
883
815
 
884
816
    def test_abspath(self):
885
 
        self.requireFeature(features.win32_feature)
886
817
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
887
818
        self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
888
819
        self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
901
832
                         osutils._win32_pathjoin('path/to', 'C:/foo'))
902
833
        self.assertEqual('path/to/foo',
903
834
                         osutils._win32_pathjoin('path/to/', 'foo'))
904
 
 
905
 
    def test_pathjoin_late_bugfix(self):
906
 
        if sys.version_info < (2, 7, 6):
907
 
            expected = '/foo'
908
 
        else:
909
 
            expected = 'C:/foo'
910
 
        self.assertEqual(expected,
 
835
        self.assertEqual('/foo',
911
836
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
912
 
        self.assertEqual(expected,
 
837
        self.assertEqual('/foo',
913
838
                         osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
914
839
 
915
840
    def test_normpath(self):
920
845
 
921
846
    def test_getcwd(self):
922
847
        cwd = osutils._win32_getcwd()
923
 
        os_cwd = osutils._getcwd()
 
848
        os_cwd = os.getcwdu()
924
849
        self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
925
850
        # win32 is inconsistent whether it returns lower or upper case
926
851
        # and even if it was consistent the user might type the other
934
859
        self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
935
860
        self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
936
861
 
 
862
    def test_win98_abspath(self):
 
863
        # absolute path
 
864
        self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
 
865
        self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
 
866
        # UNC path
 
867
        self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
 
868
        self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
 
869
        # relative path
 
870
        cwd = osutils.getcwd().rstrip('/')
 
871
        drive = osutils._nt_splitdrive(cwd)[0]
 
872
        self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
 
873
        self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
 
874
        # unicode path
 
875
        u = u'\u1234'
 
876
        self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
 
877
 
937
878
 
938
879
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
939
880
    """Test win32 functions that create files."""
940
881
 
941
882
    def test_getcwd(self):
942
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
883
        self.requireFeature(tests.UnicodeFilenameFeature)
943
884
        os.mkdir(u'mu-\xb5')
944
885
        os.chdir(u'mu-\xb5')
945
886
        # TODO: jam 20060427 This will probably fail on Mac OSX because
950
891
 
951
892
    def test_minimum_path_selection(self):
952
893
        self.assertEqual(set(),
953
 
                         osutils.minimum_path_selection([]))
954
 
        self.assertEqual({'a'},
955
 
                         osutils.minimum_path_selection(['a']))
956
 
        self.assertEqual({'a', 'b'},
957
 
                         osutils.minimum_path_selection(['a', 'b']))
958
 
        self.assertEqual({'a/', 'b'},
959
 
                         osutils.minimum_path_selection(['a/', 'b']))
960
 
        self.assertEqual({'a/', 'b'},
961
 
                         osutils.minimum_path_selection(['a/c', 'a/', 'b']))
962
 
        self.assertEqual({'a-b', 'a', 'a0b'},
963
 
                         osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
 
894
            osutils.minimum_path_selection([]))
 
895
        self.assertEqual(set(['a']),
 
896
            osutils.minimum_path_selection(['a']))
 
897
        self.assertEqual(set(['a', 'b']),
 
898
            osutils.minimum_path_selection(['a', 'b']))
 
899
        self.assertEqual(set(['a/', 'b']),
 
900
            osutils.minimum_path_selection(['a/', 'b']))
 
901
        self.assertEqual(set(['a/', 'b']),
 
902
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
 
903
        self.assertEqual(set(['a-b', 'a', 'a0b']),
 
904
            osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
964
905
 
965
906
    def test_mkdtemp(self):
966
907
        tmpdir = osutils._win32_mkdtemp(dir='.')
967
908
        self.assertFalse('\\' in tmpdir)
968
909
 
969
910
    def test_rename(self):
970
 
        with open('a', 'wb') as a:
971
 
            a.write(b'foo\n')
972
 
        with open('b', 'wb') as b:
973
 
            b.write(b'baz\n')
 
911
        a = open('a', 'wb')
 
912
        a.write('foo\n')
 
913
        a.close()
 
914
        b = open('b', 'wb')
 
915
        b.write('baz\n')
 
916
        b.close()
974
917
 
975
918
        osutils._win32_rename('b', 'a')
976
 
        self.assertPathExists('a')
977
 
        self.assertPathDoesNotExist('b')
978
 
        self.assertFileEqual(b'baz\n', 'a')
 
919
        self.failUnlessExists('a')
 
920
        self.failIfExists('b')
 
921
        self.assertFileEqual('baz\n', 'a')
979
922
 
980
923
    def test_rename_missing_file(self):
981
 
        with open('a', 'wb') as a:
982
 
            a.write(b'foo\n')
 
924
        a = open('a', 'wb')
 
925
        a.write('foo\n')
 
926
        a.close()
983
927
 
984
928
        try:
985
929
            osutils._win32_rename('b', 'a')
986
 
        except (IOError, OSError) as e:
 
930
        except (IOError, OSError), e:
987
931
            self.assertEqual(errno.ENOENT, e.errno)
988
 
        self.assertFileEqual(b'foo\n', 'a')
 
932
        self.assertFileEqual('foo\n', 'a')
989
933
 
990
934
    def test_rename_missing_dir(self):
991
935
        os.mkdir('a')
992
936
        try:
993
937
            osutils._win32_rename('b', 'a')
994
 
        except (IOError, OSError) as e:
 
938
        except (IOError, OSError), e:
995
939
            self.assertEqual(errno.ENOENT, e.errno)
996
940
 
997
941
    def test_rename_current_dir(self):
1003
947
        # doesn't exist.
1004
948
        try:
1005
949
            osutils._win32_rename('b', '.')
1006
 
        except (IOError, OSError) as e:
 
950
        except (IOError, OSError), e:
1007
951
            self.assertEqual(errno.ENOENT, e.errno)
1008
952
 
1009
953
    def test_splitpath(self):
1014
958
        check(['a', 'b'], 'a/b')
1015
959
        check(['a', 'b'], 'a/./b')
1016
960
        check(['a', '.b'], 'a/.b')
1017
 
        if os.path.sep == '\\':
1018
 
            check(['a', '.b'], 'a\\.b')
1019
 
        else:
1020
 
            check(['a\\.b'], 'a\\.b')
 
961
        check(['a', '.b'], 'a\\.b')
1021
962
 
1022
963
        self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
1023
964
 
1035
976
    """Test mac special functions that require directories."""
1036
977
 
1037
978
    def test_getcwd(self):
1038
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
979
        self.requireFeature(tests.UnicodeFilenameFeature)
1039
980
        os.mkdir(u'B\xe5gfors')
1040
981
        os.chdir(u'B\xe5gfors')
1041
982
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
1042
983
 
1043
984
    def test_getcwd_nonnorm(self):
1044
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
985
        self.requireFeature(tests.UnicodeFilenameFeature)
1045
986
        # Test that _mac_getcwd() will normalize this path
1046
987
        os.mkdir(u'Ba\u030agfors')
1047
988
        os.chdir(u'Ba\u030agfors')
1051
992
class TestChunksToLines(tests.TestCase):
1052
993
 
1053
994
    def test_smoketest(self):
1054
 
        self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
1055
 
                         osutils.chunks_to_lines([b'foo\nbar', b'\nbaz\n']))
1056
 
        self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
1057
 
                         osutils.chunks_to_lines([b'foo\n', b'bar\n', b'baz\n']))
 
995
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
 
996
                         osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
 
997
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
 
998
                         osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
1058
999
 
1059
1000
    def test_osutils_binding(self):
1060
 
        from . import test__chunks_to_lines
 
1001
        from bzrlib.tests import test__chunks_to_lines
1061
1002
        if test__chunks_to_lines.compiled_chunkstolines_feature.available():
1062
 
            from .._chunks_to_lines_pyx import chunks_to_lines
 
1003
            from bzrlib._chunks_to_lines_pyx import chunks_to_lines
1063
1004
        else:
1064
 
            from .._chunks_to_lines_py import chunks_to_lines
 
1005
            from bzrlib._chunks_to_lines_py import chunks_to_lines
1065
1006
        self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
1066
1007
 
1067
1008
 
1074
1015
                         osutils.split_lines(u'foo\nbar\xae\n'))
1075
1016
 
1076
1017
    def test_split_with_carriage_returns(self):
1077
 
        self.assertEqual([b'foo\rbar\n'],
1078
 
                         osutils.split_lines(b'foo\rbar\n'))
 
1018
        self.assertEqual(['foo\rbar\n'],
 
1019
                         osutils.split_lines('foo\rbar\n'))
1079
1020
 
1080
1021
 
1081
1022
class TestWalkDirs(tests.TestCaseInTempDir):
1096
1037
            ]
1097
1038
        self.build_tree(tree)
1098
1039
        expected_dirblocks = [
1099
 
            (('', '.'),
1100
 
             [('0file', '0file', 'file'),
1101
 
              ('1dir', '1dir', 'directory'),
1102
 
              ('2file', '2file', 'file'),
1103
 
              ]
1104
 
             ),
1105
 
            (('1dir', './1dir'),
1106
 
             [('1dir/0file', '0file', 'file'),
1107
 
              ('1dir/1dir', '1dir', 'directory'),
1108
 
              ]
1109
 
             ),
1110
 
            (('1dir/1dir', './1dir/1dir'),
1111
 
             [
1112
 
                ]
1113
 
             ),
 
1040
                (('', '.'),
 
1041
                 [('0file', '0file', 'file'),
 
1042
                  ('1dir', '1dir', 'directory'),
 
1043
                  ('2file', '2file', 'file'),
 
1044
                 ]
 
1045
                ),
 
1046
                (('1dir', './1dir'),
 
1047
                 [('1dir/0file', '0file', 'file'),
 
1048
                  ('1dir/1dir', '1dir', 'directory'),
 
1049
                 ]
 
1050
                ),
 
1051
                (('1dir/1dir', './1dir/1dir'),
 
1052
                 [
 
1053
                 ]
 
1054
                ),
1114
1055
            ]
1115
1056
        result = []
1116
1057
        found_bzrdir = False
1130
1071
        self.assertExpectedBlocks(expected_dirblocks[1:], result)
1131
1072
 
1132
1073
    def test_walkdirs_os_error(self):
1133
 
        # <https://bugs.launchpad.net/bzr/+bug/338653>
 
1074
        # <https://bugs.edge.launchpad.net/bzr/+bug/338653>
1134
1075
        # Pyrex readdir didn't raise useful messages if it had an error
1135
1076
        # reading the directory
1136
1077
        if sys.platform == 'win32':
1137
1078
            raise tests.TestNotApplicable(
1138
1079
                "readdir IOError not tested on win32")
1139
 
        self.requireFeature(features.not_running_as_root)
1140
1080
        os.mkdir("test-unreadable")
1141
1081
        os.chmod("test-unreadable", 0000)
1142
1082
        # must chmod it back so that it can be removed
1143
 
        self.addCleanup(os.chmod, "test-unreadable", 0o700)
 
1083
        self.addCleanup(os.chmod, "test-unreadable", 0700)
1144
1084
        # The error is not raised until the generator is actually evaluated.
1145
1085
        # (It would be ok if it happened earlier but at the moment it
1146
1086
        # doesn't.)
1147
1087
        e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1148
 
        self.assertEqual('./test-unreadable', osutils.safe_unicode(e.filename))
1149
 
        self.assertEqual(errno.EACCES, e.errno)
 
1088
        self.assertEquals('./test-unreadable', e.filename)
 
1089
        self.assertEquals(errno.EACCES, e.errno)
1150
1090
        # Ensure the message contains the file name
1151
 
        self.assertContainsRe(str(e), "\\./test-unreadable")
1152
 
 
1153
 
    def test_walkdirs_encoding_error(self):
1154
 
        # <https://bugs.launchpad.net/bzr/+bug/488519>
1155
 
        # walkdirs didn't raise a useful message when the filenames
1156
 
        # are not using the filesystem's encoding
1157
 
 
1158
 
        # require a bytestring based filesystem
1159
 
        self.requireFeature(features.ByteStringNamedFilesystem)
1160
 
 
1161
 
        tree = [
1162
 
            '.bzr',
1163
 
            '0file',
1164
 
            '1dir/',
1165
 
            '1dir/0file',
1166
 
            '1dir/1dir/',
1167
 
            '1file'
1168
 
            ]
1169
 
 
1170
 
        self.build_tree(tree)
1171
 
 
1172
 
        # rename the 1file to a latin-1 filename
1173
 
        os.rename(b"./1file", b"\xe8file")
1174
 
        if b"\xe8file" not in os.listdir("."):
1175
 
            self.skipTest("Lack filesystem that preserves arbitrary bytes")
1176
 
 
1177
 
        self._save_platform_info()
1178
 
        osutils._fs_enc = 'UTF-8'
1179
 
 
1180
 
        # this should raise on error
1181
 
        def attempt():
1182
 
            for dirdetail, dirblock in osutils.walkdirs(b'.'):
1183
 
                pass
1184
 
 
1185
 
        self.assertRaises(errors.BadFilenameEncoding, attempt)
 
1091
        self.assertContainsRe(str(e), "\./test-unreadable")
1186
1092
 
1187
1093
    def test__walkdirs_utf8(self):
1188
1094
        tree = [
1195
1101
            ]
1196
1102
        self.build_tree(tree)
1197
1103
        expected_dirblocks = [
1198
 
            (('', '.'),
1199
 
             [('0file', '0file', 'file'),
1200
 
              ('1dir', '1dir', 'directory'),
1201
 
              ('2file', '2file', 'file'),
1202
 
              ]
1203
 
             ),
1204
 
            (('1dir', './1dir'),
1205
 
             [('1dir/0file', '0file', 'file'),
1206
 
              ('1dir/1dir', '1dir', 'directory'),
1207
 
              ]
1208
 
             ),
1209
 
            (('1dir/1dir', './1dir/1dir'),
1210
 
             [
1211
 
                ]
1212
 
             ),
 
1104
                (('', '.'),
 
1105
                 [('0file', '0file', 'file'),
 
1106
                  ('1dir', '1dir', 'directory'),
 
1107
                  ('2file', '2file', 'file'),
 
1108
                 ]
 
1109
                ),
 
1110
                (('1dir', './1dir'),
 
1111
                 [('1dir/0file', '0file', 'file'),
 
1112
                  ('1dir/1dir', '1dir', 'directory'),
 
1113
                 ]
 
1114
                ),
 
1115
                (('1dir/1dir', './1dir/1dir'),
 
1116
                 [
 
1117
                 ]
 
1118
                ),
1213
1119
            ]
1214
1120
        result = []
1215
1121
        found_bzrdir = False
1216
 
        for dirdetail, dirblock in osutils._walkdirs_utf8(b'.'):
1217
 
            if len(dirblock) and dirblock[0][1] == b'.bzr':
 
1122
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
 
1123
            if len(dirblock) and dirblock[0][1] == '.bzr':
1218
1124
                # this tests the filtering of selected paths
1219
1125
                found_bzrdir = True
1220
1126
                del dirblock[0]
1221
 
            dirdetail = (dirdetail[0].decode('utf-8'),
1222
 
                         osutils.safe_unicode(dirdetail[1]))
1223
 
            dirblock = [
1224
 
                (entry[0].decode('utf-8'), entry[1].decode('utf-8'), entry[2])
1225
 
                for entry in dirblock]
1226
1127
            result.append((dirdetail, dirblock))
1227
1128
 
1228
1129
        self.assertTrue(found_bzrdir)
1244
1145
            dirblock[:] = new_dirblock
1245
1146
 
1246
1147
    def _save_platform_info(self):
 
1148
        self.overrideAttr(win32utils, 'winver')
1247
1149
        self.overrideAttr(osutils, '_fs_enc')
1248
1150
        self.overrideAttr(osutils, '_selected_dir_reader')
1249
1151
 
1250
 
    def assertDirReaderIs(self, expected, top):
 
1152
    def assertDirReaderIs(self, expected):
1251
1153
        """Assert the right implementation for _walkdirs_utf8 is chosen."""
1252
1154
        # Force it to redetect
1253
1155
        osutils._selected_dir_reader = None
1254
1156
        # Nothing to list, but should still trigger the selection logic
1255
 
        self.assertEqual([((b'', top), [])], list(osutils._walkdirs_utf8('.')))
 
1157
        self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
1256
1158
        self.assertIsInstance(osutils._selected_dir_reader, expected)
1257
1159
 
1258
1160
    def test_force_walkdirs_utf8_fs_utf8(self):
1259
1161
        self.requireFeature(UTF8DirReaderFeature)
1260
1162
        self._save_platform_info()
1261
 
        osutils._fs_enc = 'utf-8'
1262
 
        self.assertDirReaderIs(UTF8DirReaderFeature.module.UTF8DirReader, b".")
 
1163
        win32utils.winver = None # Avoid the win32 detection code
 
1164
        osutils._fs_enc = 'UTF-8'
 
1165
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1263
1166
 
1264
1167
    def test_force_walkdirs_utf8_fs_ascii(self):
1265
1168
        self.requireFeature(UTF8DirReaderFeature)
1266
1169
        self._save_platform_info()
1267
 
        osutils._fs_enc = 'ascii'
1268
 
        self.assertDirReaderIs(
1269
 
            UTF8DirReaderFeature.module.UTF8DirReader, b".")
 
1170
        win32utils.winver = None # Avoid the win32 detection code
 
1171
        osutils._fs_enc = 'US-ASCII'
 
1172
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
1173
 
 
1174
    def test_force_walkdirs_utf8_fs_ANSI(self):
 
1175
        self.requireFeature(UTF8DirReaderFeature)
 
1176
        self._save_platform_info()
 
1177
        win32utils.winver = None # Avoid the win32 detection code
 
1178
        osutils._fs_enc = 'ANSI_X3.4-1968'
 
1179
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1270
1180
 
1271
1181
    def test_force_walkdirs_utf8_fs_latin1(self):
1272
1182
        self._save_platform_info()
1273
 
        osutils._fs_enc = 'iso-8859-1'
1274
 
        self.assertDirReaderIs(osutils.UnicodeDirReader, ".")
 
1183
        win32utils.winver = None # Avoid the win32 detection code
 
1184
        osutils._fs_enc = 'latin1'
 
1185
        self.assertDirReaderIs(osutils.UnicodeDirReader)
1275
1186
 
1276
1187
    def test_force_walkdirs_utf8_nt(self):
1277
1188
        # Disabled because the thunk of the whole walkdirs api is disabled.
1278
1189
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1279
1190
        self._save_platform_info()
1280
 
        from .._walkdirs_win32 import Win32ReadDir
1281
 
        self.assertDirReaderIs(Win32ReadDir, ".")
 
1191
        win32utils.winver = 'Windows NT'
 
1192
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1193
        self.assertDirReaderIs(Win32ReadDir)
 
1194
 
 
1195
    def test_force_walkdirs_utf8_98(self):
 
1196
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
1197
        self._save_platform_info()
 
1198
        win32utils.winver = 'Windows 98'
 
1199
        self.assertDirReaderIs(osutils.UnicodeDirReader)
1282
1200
 
1283
1201
    def test_unicode_walkdirs(self):
1284
1202
        """Walkdirs should always return unicode paths."""
1285
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
1203
        self.requireFeature(tests.UnicodeFilenameFeature)
1286
1204
        name0 = u'0file-\xb6'
1287
1205
        name1 = u'1dir-\u062c\u0648'
1288
1206
        name2 = u'2file-\u0633'
1295
1213
            ]
1296
1214
        self.build_tree(tree)
1297
1215
        expected_dirblocks = [
1298
 
            ((u'', u'.'),
1299
 
             [(name0, name0, 'file', './' + name0),
1300
 
              (name1, name1, 'directory', './' + name1),
1301
 
              (name2, name2, 'file', './' + name2),
1302
 
              ]
1303
 
             ),
1304
 
            ((name1, './' + name1),
1305
 
             [(name1 + '/' + name0, name0, 'file', './' + name1
1306
 
               + '/' + name0),
1307
 
              (name1 + '/' + name1, name1, 'directory', './' + name1
1308
 
               + '/' + name1),
1309
 
              ]
1310
 
             ),
1311
 
            ((name1 + '/' + name1, './' + name1 + '/' + name1),
1312
 
             [
1313
 
                ]
1314
 
             ),
 
1216
                ((u'', u'.'),
 
1217
                 [(name0, name0, 'file', './' + name0),
 
1218
                  (name1, name1, 'directory', './' + name1),
 
1219
                  (name2, name2, 'file', './' + name2),
 
1220
                 ]
 
1221
                ),
 
1222
                ((name1, './' + name1),
 
1223
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
1224
                                                        + '/' + name0),
 
1225
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
1226
                                                            + '/' + name1),
 
1227
                 ]
 
1228
                ),
 
1229
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1230
                 [
 
1231
                 ]
 
1232
                ),
1315
1233
            ]
1316
1234
        result = list(osutils.walkdirs('.'))
1317
1235
        self._filter_out_stat(result)
1318
1236
        self.assertEqual(expected_dirblocks, result)
1319
 
        result = list(osutils.walkdirs(u'./' + name1, name1))
 
1237
        result = list(osutils.walkdirs(u'./'+name1, name1))
1320
1238
        self._filter_out_stat(result)
1321
1239
        self.assertEqual(expected_dirblocks[1:], result)
1322
1240
 
1325
1243
 
1326
1244
        The abspath portion might be in unicode or utf-8
1327
1245
        """
1328
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
1246
        self.requireFeature(tests.UnicodeFilenameFeature)
1329
1247
        name0 = u'0file-\xb6'
1330
1248
        name1 = u'1dir-\u062c\u0648'
1331
1249
        name2 = u'2file-\u0633'
1342
1260
        name2 = name2.encode('utf8')
1343
1261
 
1344
1262
        expected_dirblocks = [
1345
 
            ((b'', b'.'),
1346
 
             [(name0, name0, 'file', b'./' + name0),
1347
 
              (name1, name1, 'directory', b'./' + name1),
1348
 
              (name2, name2, 'file', b'./' + name2),
1349
 
              ]
1350
 
             ),
1351
 
            ((name1, b'./' + name1),
1352
 
             [(name1 + b'/' + name0, name0, 'file', b'./' + name1
1353
 
               + b'/' + name0),
1354
 
              (name1 + b'/' + name1, name1, 'directory', b'./' + name1
1355
 
               + b'/' + name1),
1356
 
              ]
1357
 
             ),
1358
 
            ((name1 + b'/' + name1, b'./' + name1 + b'/' + name1),
1359
 
             [
1360
 
                ]
1361
 
             ),
 
1263
                (('', '.'),
 
1264
                 [(name0, name0, 'file', './' + name0),
 
1265
                  (name1, name1, 'directory', './' + name1),
 
1266
                  (name2, name2, 'file', './' + name2),
 
1267
                 ]
 
1268
                ),
 
1269
                ((name1, './' + name1),
 
1270
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
1271
                                                        + '/' + name0),
 
1272
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
1273
                                                            + '/' + name1),
 
1274
                 ]
 
1275
                ),
 
1276
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1277
                 [
 
1278
                 ]
 
1279
                ),
1362
1280
            ]
1363
1281
        result = []
1364
1282
        # For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1365
1283
        # all abspaths are Unicode, and encode them back into utf8.
1366
1284
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1367
 
            self.assertIsInstance(dirdetail[0], bytes)
1368
 
            if isinstance(dirdetail[1], text_type):
 
1285
            self.assertIsInstance(dirdetail[0], str)
 
1286
            if isinstance(dirdetail[1], unicode):
1369
1287
                dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1370
1288
                dirblock = [list(info) for info in dirblock]
1371
1289
                for info in dirblock:
1372
 
                    self.assertIsInstance(info[4], text_type)
 
1290
                    self.assertIsInstance(info[4], unicode)
1373
1291
                    info[4] = info[4].encode('utf8')
1374
1292
            new_dirblock = []
1375
1293
            for info in dirblock:
1376
 
                self.assertIsInstance(info[0], bytes)
1377
 
                self.assertIsInstance(info[1], bytes)
1378
 
                self.assertIsInstance(info[4], bytes)
 
1294
                self.assertIsInstance(info[0], str)
 
1295
                self.assertIsInstance(info[1], str)
 
1296
                self.assertIsInstance(info[4], str)
1379
1297
                # Remove the stat information
1380
1298
                new_dirblock.append((info[0], info[1], info[2], info[4]))
1381
1299
            result.append((dirdetail, new_dirblock))
1386
1304
 
1387
1305
        The abspath portion should be in unicode
1388
1306
        """
1389
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
1307
        self.requireFeature(tests.UnicodeFilenameFeature)
1390
1308
        # Use the unicode reader. TODO: split into driver-and-driven unit
1391
1309
        # tests.
1392
1310
        self._save_platform_info()
1409
1327
        # All of the abspaths should be in unicode, all of the relative paths
1410
1328
        # should be in utf8
1411
1329
        expected_dirblocks = [
1412
 
            ((b'', '.'),
1413
 
             [(name0, name0, 'file', './' + name0u),
1414
 
              (name1, name1, 'directory', './' + name1u),
1415
 
              (name2, name2, 'file', './' + name2u),
1416
 
              ]
1417
 
             ),
1418
 
            ((name1, './' + name1u),
1419
 
             [(name1 + b'/' + name0, name0, 'file', './' + name1u
1420
 
               + '/' + name0u),
1421
 
              (name1 + b'/' + name1, name1, 'directory', './' + name1u
1422
 
               + '/' + name1u),
1423
 
              ]
1424
 
             ),
1425
 
            ((name1 + b'/' + name1, './' + name1u + '/' + name1u),
1426
 
             [
1427
 
                ]
1428
 
             ),
 
1330
                (('', '.'),
 
1331
                 [(name0, name0, 'file', './' + name0u),
 
1332
                  (name1, name1, 'directory', './' + name1u),
 
1333
                  (name2, name2, 'file', './' + name2u),
 
1334
                 ]
 
1335
                ),
 
1336
                ((name1, './' + name1u),
 
1337
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1338
                                                        + '/' + name0u),
 
1339
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1340
                                                            + '/' + name1u),
 
1341
                 ]
 
1342
                ),
 
1343
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1344
                 [
 
1345
                 ]
 
1346
                ),
1429
1347
            ]
1430
1348
        result = list(osutils._walkdirs_utf8('.'))
1431
1349
        self._filter_out_stat(result)
1433
1351
 
1434
1352
    def test__walkdirs_utf8_win32readdir(self):
1435
1353
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1436
 
        self.requireFeature(features.UnicodeFilenameFeature)
1437
 
        from .._walkdirs_win32 import Win32ReadDir
 
1354
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1355
        from bzrlib._walkdirs_win32 import Win32ReadDir
1438
1356
        self._save_platform_info()
1439
1357
        osutils._selected_dir_reader = Win32ReadDir()
1440
1358
        name0u = u'0file-\xb6'
1455
1373
        # All of the abspaths should be in unicode, all of the relative paths
1456
1374
        # should be in utf8
1457
1375
        expected_dirblocks = [
1458
 
            (('', '.'),
1459
 
             [(name0, name0, 'file', './' + name0u),
1460
 
              (name1, name1, 'directory', './' + name1u),
1461
 
              (name2, name2, 'file', './' + name2u),
1462
 
              ]
1463
 
             ),
1464
 
            ((name1, './' + name1u),
1465
 
             [(name1 + '/' + name0, name0, 'file', './' + name1u
1466
 
               + '/' + name0u),
1467
 
              (name1 + '/' + name1, name1, 'directory', './' + name1u
1468
 
               + '/' + name1u),
1469
 
              ]
1470
 
             ),
1471
 
            ((name1 + '/' + name1, './' + name1u + '/' + name1u),
1472
 
             [
1473
 
                ]
1474
 
             ),
 
1376
                (('', '.'),
 
1377
                 [(name0, name0, 'file', './' + name0u),
 
1378
                  (name1, name1, 'directory', './' + name1u),
 
1379
                  (name2, name2, 'file', './' + name2u),
 
1380
                 ]
 
1381
                ),
 
1382
                ((name1, './' + name1u),
 
1383
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1384
                                                        + '/' + name0u),
 
1385
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1386
                                                            + '/' + name1u),
 
1387
                 ]
 
1388
                ),
 
1389
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1390
                 [
 
1391
                 ]
 
1392
                ),
1475
1393
            ]
1476
1394
        result = list(osutils._walkdirs_utf8(u'.'))
1477
1395
        self._filter_out_stat(result)
1490
1408
    def test__walkdirs_utf_win32_find_file_stat_file(self):
1491
1409
        """make sure our Stat values are valid"""
1492
1410
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1493
 
        self.requireFeature(features.UnicodeFilenameFeature)
1494
 
        from .._walkdirs_win32 import Win32ReadDir
 
1411
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1412
        from bzrlib._walkdirs_win32 import Win32ReadDir
1495
1413
        name0u = u'0file-\xb6'
1496
1414
        name0 = name0u.encode('utf8')
1497
1415
        self.build_tree([name0u])
1498
1416
        # I hate to sleep() here, but I'm trying to make the ctime different
1499
1417
        # from the mtime
1500
1418
        time.sleep(2)
1501
 
        with open(name0u, 'ab') as f:
1502
 
            f.write(b'just a small update')
 
1419
        f = open(name0u, 'ab')
 
1420
        try:
 
1421
            f.write('just a small update')
 
1422
        finally:
 
1423
            f.close()
1503
1424
 
1504
1425
        result = Win32ReadDir().read_dir('', u'.')
1505
1426
        entry = result[0]
1511
1432
    def test__walkdirs_utf_win32_find_file_stat_directory(self):
1512
1433
        """make sure our Stat values are valid"""
1513
1434
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1514
 
        self.requireFeature(features.UnicodeFilenameFeature)
1515
 
        from .._walkdirs_win32 import Win32ReadDir
 
1435
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1436
        from bzrlib._walkdirs_win32 import Win32ReadDir
1516
1437
        name0u = u'0dir-\u062c\u0648'
1517
1438
        name0 = name0u.encode('utf8')
1518
1439
        self.build_tree([name0u + '/'])
1598
1519
        # using the comparison routine should work too:
1599
1520
        self.assertEqual(
1600
1521
            dir_sorted_paths,
1601
 
            sorted(original_paths, key=osutils.path_prefix_key))
 
1522
            sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
1602
1523
 
1603
1524
 
1604
1525
class TestCopyTree(tests.TestCaseInTempDir):
1617
1538
        self.assertEqual(['c'], os.listdir('target/b'))
1618
1539
 
1619
1540
    def test_copy_tree_symlinks(self):
1620
 
        self.requireFeature(features.SymlinkFeature)
 
1541
        self.requireFeature(tests.SymlinkFeature)
1621
1542
        self.build_tree(['source/'])
1622
1543
        os.symlink('a/generic/path', 'source/lnk')
1623
1544
        osutils.copy_tree('source', 'target')
1627
1548
    def test_copy_tree_handlers(self):
1628
1549
        processed_files = []
1629
1550
        processed_links = []
1630
 
 
1631
1551
        def file_handler(from_path, to_path):
1632
1552
            processed_files.append(('f', from_path, to_path))
1633
 
 
1634
1553
        def dir_handler(from_path, to_path):
1635
1554
            processed_files.append(('d', from_path, to_path))
1636
 
 
1637
1555
        def link_handler(from_path, to_path):
1638
1556
            processed_links.append((from_path, to_path))
1639
 
        handlers = {'file': file_handler,
1640
 
                    'directory': dir_handler,
1641
 
                    'symlink': link_handler,
1642
 
                    }
 
1557
        handlers = {'file':file_handler,
 
1558
                    'directory':dir_handler,
 
1559
                    'symlink':link_handler,
 
1560
                   }
1643
1561
 
1644
1562
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1645
1563
        if osutils.has_symlinks():
1650
1568
                          ('f', 'source/a', 'target/a'),
1651
1569
                          ('d', 'source/b', 'target/b'),
1652
1570
                          ('f', 'source/b/c', 'target/b/c'),
1653
 
                          ], processed_files)
1654
 
        self.assertPathDoesNotExist('target')
 
1571
                         ], processed_files)
 
1572
        self.failIfExists('target')
1655
1573
        if osutils.has_symlinks():
1656
1574
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1657
1575
 
1662
1580
    def setUp(self):
1663
1581
        super(TestSetUnsetEnv, self).setUp()
1664
1582
 
1665
 
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'),
 
1583
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
1666
1584
                         'Environment was not cleaned up properly.'
1667
 
                         ' Variable BRZ_TEST_ENV_VAR should not exist.')
1668
 
 
 
1585
                         ' Variable BZR_TEST_ENV_VAR should not exist.')
1669
1586
        def cleanup():
1670
 
            if 'BRZ_TEST_ENV_VAR' in os.environ:
1671
 
                del os.environ['BRZ_TEST_ENV_VAR']
 
1587
            if 'BZR_TEST_ENV_VAR' in os.environ:
 
1588
                del os.environ['BZR_TEST_ENV_VAR']
1672
1589
        self.addCleanup(cleanup)
1673
1590
 
1674
1591
    def test_set(self):
1675
1592
        """Test that we can set an env variable"""
1676
 
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1593
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1677
1594
        self.assertEqual(None, old)
1678
 
        self.assertEqual('foo', os.environ.get('BRZ_TEST_ENV_VAR'))
 
1595
        self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
1679
1596
 
1680
1597
    def test_double_set(self):
1681
1598
        """Test that we get the old value out"""
1682
 
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
1683
 
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'bar')
 
1599
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1600
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
1684
1601
        self.assertEqual('foo', old)
1685
 
        self.assertEqual('bar', os.environ.get('BRZ_TEST_ENV_VAR'))
 
1602
        self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
1686
1603
 
1687
1604
    def test_unicode(self):
1688
1605
        """Environment can only contain plain strings
1695
1612
                'Cannot find a unicode character that works in encoding %s'
1696
1613
                % (osutils.get_user_encoding(),))
1697
1614
 
1698
 
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', uni_val)
1699
 
        if PY3:
1700
 
            self.assertEqual(uni_val, os.environ.get('BRZ_TEST_ENV_VAR'))
1701
 
        else:
1702
 
            self.assertEqual(env_val, os.environ.get('BRZ_TEST_ENV_VAR'))
 
1615
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
 
1616
        self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
1703
1617
 
1704
1618
    def test_unset(self):
1705
1619
        """Test that passing None will remove the env var"""
1706
 
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
1707
 
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', None)
 
1620
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1621
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
1708
1622
        self.assertEqual('foo', old)
1709
 
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'))
1710
 
        self.assertNotIn('BRZ_TEST_ENV_VAR', os.environ)
 
1623
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
 
1624
        self.failIf('BZR_TEST_ENV_VAR' in os.environ)
1711
1625
 
1712
1626
 
1713
1627
class TestSizeShaFile(tests.TestCaseInTempDir):
1714
1628
 
1715
1629
    def test_sha_empty(self):
1716
 
        self.build_tree_contents([('foo', b'')])
1717
 
        expected_sha = osutils.sha_string(b'')
 
1630
        self.build_tree_contents([('foo', '')])
 
1631
        expected_sha = osutils.sha_string('')
1718
1632
        f = open('foo')
1719
1633
        self.addCleanup(f.close)
1720
1634
        size, sha = osutils.size_sha_file(f)
1722
1636
        self.assertEqual(expected_sha, sha)
1723
1637
 
1724
1638
    def test_sha_mixed_endings(self):
1725
 
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
 
1639
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
1726
1640
        self.build_tree_contents([('foo', text)])
1727
1641
        expected_sha = osutils.sha_string(text)
1728
1642
        f = open('foo', 'rb')
1735
1649
class TestShaFileByName(tests.TestCaseInTempDir):
1736
1650
 
1737
1651
    def test_sha_empty(self):
1738
 
        self.build_tree_contents([('foo', b'')])
1739
 
        expected_sha = osutils.sha_string(b'')
 
1652
        self.build_tree_contents([('foo', '')])
 
1653
        expected_sha = osutils.sha_string('')
1740
1654
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1741
1655
 
1742
1656
    def test_sha_mixed_endings(self):
1743
 
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
 
1657
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
1744
1658
        self.build_tree_contents([('foo', text)])
1745
1659
        expected_sha = osutils.sha_string(text)
1746
1660
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1749
1663
class TestResourceLoading(tests.TestCaseInTempDir):
1750
1664
 
1751
1665
    def test_resource_string(self):
1752
 
        # test resource in breezy
1753
 
        text = osutils.resource_string('breezy', 'debug.py')
 
1666
        # test resource in bzrlib
 
1667
        text = osutils.resource_string('bzrlib', 'debug.py')
1754
1668
        self.assertContainsRe(text, "debug_flags = set()")
1755
 
        # test resource under breezy
1756
 
        text = osutils.resource_string('breezy.ui', 'text.py')
 
1669
        # test resource under bzrlib
 
1670
        text = osutils.resource_string('bzrlib.ui', 'text.py')
1757
1671
        self.assertContainsRe(text, "class TextUIFactory")
1758
1672
        # test unsupported package
1759
1673
        self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1760
 
                          'yyy.xx')
 
1674
            'yyy.xx')
1761
1675
        # test unknown resource
1762
 
        self.assertRaises(IOError, osutils.resource_string, 'breezy', 'yyy.xx')
 
1676
        self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
 
1677
 
 
1678
 
 
1679
class TestReCompile(tests.TestCase):
 
1680
 
 
1681
    def test_re_compile_checked(self):
 
1682
        r = osutils.re_compile_checked(r'A*', re.IGNORECASE)
 
1683
        self.assertTrue(r.match('aaaa'))
 
1684
        self.assertTrue(r.match('aAaA'))
 
1685
 
 
1686
    def test_re_compile_checked_error(self):
 
1687
        # like https://bugs.launchpad.net/bzr/+bug/251352
 
1688
        err = self.assertRaises(
 
1689
            errors.BzrCommandError,
 
1690
            osutils.re_compile_checked, '*', re.IGNORECASE, 'test case')
 
1691
        self.assertEqual(
 
1692
            "Invalid regular expression in test case: '*': "
 
1693
            "nothing to repeat",
 
1694
            str(err))
1763
1695
 
1764
1696
 
1765
1697
class TestDirReader(tests.TestCaseInTempDir):
1766
1698
 
1767
 
    scenarios = dir_reader_scenarios()
1768
 
 
1769
1699
    # Set by load_tests
1770
1700
    _dir_reader_class = None
1771
1701
    _native_to_unicode = None
1772
1702
 
1773
1703
    def setUp(self):
1774
 
        super(TestDirReader, self).setUp()
 
1704
        tests.TestCaseInTempDir.setUp(self)
1775
1705
        self.overrideAttr(osutils,
1776
1706
                          '_selected_dir_reader', self._dir_reader_class())
1777
1707
 
1784
1714
            '2file'
1785
1715
            ]
1786
1716
        expected_dirblocks = [
1787
 
            ((b'', '.'),
1788
 
             [(b'0file', b'0file', 'file', './0file'),
1789
 
              (b'1dir', b'1dir', 'directory', './1dir'),
1790
 
              (b'2file', b'2file', 'file', './2file'),
1791
 
              ]
1792
 
             ),
1793
 
            ((b'1dir', './1dir'),
1794
 
             [(b'1dir/0file', b'0file', 'file', './1dir/0file'),
1795
 
              (b'1dir/1dir', b'1dir', 'directory', './1dir/1dir'),
1796
 
              ]
1797
 
             ),
1798
 
            ((b'1dir/1dir', './1dir/1dir'),
1799
 
             [
1800
 
                ]
1801
 
             ),
 
1717
                (('', '.'),
 
1718
                 [('0file', '0file', 'file'),
 
1719
                  ('1dir', '1dir', 'directory'),
 
1720
                  ('2file', '2file', 'file'),
 
1721
                 ]
 
1722
                ),
 
1723
                (('1dir', './1dir'),
 
1724
                 [('1dir/0file', '0file', 'file'),
 
1725
                  ('1dir/1dir', '1dir', 'directory'),
 
1726
                 ]
 
1727
                ),
 
1728
                (('1dir/1dir', './1dir/1dir'),
 
1729
                 [
 
1730
                 ]
 
1731
                ),
1802
1732
            ]
1803
1733
        return tree, expected_dirblocks
1804
1734
 
1808
1738
        result = list(osutils._walkdirs_utf8('.'))
1809
1739
        # Filter out stat and abspath
1810
1740
        self.assertEqual(expected_dirblocks,
1811
 
                         self._filter_out(result))
 
1741
                         [(dirinfo, [line[0:3] for line in block])
 
1742
                          for dirinfo, block in result])
1812
1743
 
1813
1744
    def test_walk_sub_dir(self):
1814
1745
        tree, expected_dirblocks = self._get_ascii_tree()
1815
1746
        self.build_tree(tree)
1816
1747
        # you can search a subdir only, with a supplied prefix.
1817
 
        result = list(osutils._walkdirs_utf8(b'./1dir', b'1dir'))
 
1748
        result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
1818
1749
        # Filter out stat and abspath
1819
1750
        self.assertEqual(expected_dirblocks[1:],
1820
 
                         self._filter_out(result))
 
1751
                         [(dirinfo, [line[0:3] for line in block])
 
1752
                          for dirinfo, block in result])
1821
1753
 
1822
1754
    def _get_unicode_tree(self):
1823
1755
        name0u = u'0file-\xb6'
1834
1766
        name1 = name1u.encode('UTF-8')
1835
1767
        name2 = name2u.encode('UTF-8')
1836
1768
        expected_dirblocks = [
1837
 
            ((b'', '.'),
1838
 
             [(name0, name0, 'file', './' + name0u),
1839
 
              (name1, name1, 'directory', './' + name1u),
1840
 
              (name2, name2, 'file', './' + name2u),
1841
 
              ]
1842
 
             ),
1843
 
            ((name1, './' + name1u),
1844
 
             [(name1 + b'/' + name0, name0, 'file', './' + name1u
1845
 
               + '/' + name0u),
1846
 
              (name1 + b'/' + name1, name1, 'directory', './' + name1u
1847
 
               + '/' + name1u),
1848
 
              ]
1849
 
             ),
1850
 
            ((name1 + b'/' + name1, './' + name1u + '/' + name1u),
1851
 
             [
1852
 
                ]
1853
 
             ),
 
1769
                (('', '.'),
 
1770
                 [(name0, name0, 'file', './' + name0u),
 
1771
                  (name1, name1, 'directory', './' + name1u),
 
1772
                  (name2, name2, 'file', './' + name2u),
 
1773
                 ]
 
1774
                ),
 
1775
                ((name1, './' + name1u),
 
1776
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1777
                                                        + '/' + name0u),
 
1778
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1779
                                                            + '/' + name1u),
 
1780
                 ]
 
1781
                ),
 
1782
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1783
                 [
 
1784
                 ]
 
1785
                ),
1854
1786
            ]
1855
1787
        return tree, expected_dirblocks
1856
1788
 
1864
1796
            dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
1865
1797
            details = []
1866
1798
            for line in block:
1867
 
                details.append(
1868
 
                    line[0:3] + (self._native_to_unicode(line[4]), ))
 
1799
                details.append(line[0:3] + (self._native_to_unicode(line[4]), ))
1869
1800
            filtered_dirblocks.append((dirinfo, details))
1870
1801
        return filtered_dirblocks
1871
1802
 
1872
1803
    def test_walk_unicode_tree(self):
1873
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
1804
        self.requireFeature(tests.UnicodeFilenameFeature)
1874
1805
        tree, expected_dirblocks = self._get_unicode_tree()
1875
1806
        self.build_tree(tree)
1876
1807
        result = list(osutils._walkdirs_utf8('.'))
1877
1808
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1878
1809
 
1879
1810
    def test_symlink(self):
1880
 
        self.requireFeature(features.SymlinkFeature)
1881
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
1811
        self.requireFeature(tests.SymlinkFeature)
 
1812
        self.requireFeature(tests.UnicodeFilenameFeature)
1882
1813
        target = u'target\N{Euro Sign}'
1883
1814
        link_name = u'l\N{Euro Sign}nk'
1884
1815
        os.symlink(target, link_name)
 
1816
        target_utf8 = target.encode('UTF-8')
1885
1817
        link_name_utf8 = link_name.encode('UTF-8')
1886
1818
        expected_dirblocks = [
1887
 
            ((b'', '.'),
1888
 
             [(link_name_utf8, link_name_utf8,
1889
 
               'symlink', './' + link_name), ],
1890
 
             )]
 
1819
                (('', '.'),
 
1820
                 [(link_name_utf8, link_name_utf8,
 
1821
                   'symlink', './' + link_name),],
 
1822
                 )]
1891
1823
        result = list(osutils._walkdirs_utf8('.'))
1892
1824
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1893
1825
 
1901
1833
    But prior python versions failed to properly encode the passed unicode
1902
1834
    string.
1903
1835
    """
1904
 
    _test_needs_features = [features.SymlinkFeature,
1905
 
                            features.UnicodeFilenameFeature]
 
1836
    _test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]
1906
1837
 
1907
1838
    def setUp(self):
1908
1839
        super(tests.TestCaseInTempDir, self).setUp()
1911
1842
        os.symlink(self.target, self.link)
1912
1843
 
1913
1844
    def test_os_readlink_link_encoding(self):
1914
 
        self.assertEqual(self.target, os.readlink(self.link))
 
1845
        if sys.version_info < (2, 6):
 
1846
            self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
 
1847
        else:
 
1848
            self.assertEquals(self.target,  os.readlink(self.link))
1915
1849
 
1916
1850
    def test_os_readlink_link_decoding(self):
1917
 
        self.assertEqual(self.target.encode(osutils._fs_enc),
1918
 
                         os.readlink(self.link.encode(osutils._fs_enc)))
 
1851
        self.assertEquals(self.target.encode(osutils._fs_enc),
 
1852
                          os.readlink(self.link.encode(osutils._fs_enc)))
1919
1853
 
1920
1854
 
1921
1855
class TestConcurrency(tests.TestCase):
1929
1863
        self.assertIsInstance(concurrency, int)
1930
1864
 
1931
1865
    def test_local_concurrency_environment_variable(self):
1932
 
        self.overrideEnv('BRZ_CONCURRENCY', '2')
 
1866
        os.environ['BZR_CONCURRENCY'] = '2'
1933
1867
        self.assertEqual(2, osutils.local_concurrency(use_cache=False))
1934
 
        self.overrideEnv('BRZ_CONCURRENCY', '3')
 
1868
        os.environ['BZR_CONCURRENCY'] = '3'
1935
1869
        self.assertEqual(3, osutils.local_concurrency(use_cache=False))
1936
 
        self.overrideEnv('BRZ_CONCURRENCY', 'foo')
 
1870
        os.environ['BZR_CONCURRENCY'] = 'foo'
1937
1871
        self.assertEqual(1, osutils.local_concurrency(use_cache=False))
1938
1872
 
1939
1873
    def test_option_concurrency(self):
1940
 
        self.overrideEnv('BRZ_CONCURRENCY', '1')
 
1874
        os.environ['BZR_CONCURRENCY'] = '1'
1941
1875
        self.run_bzr('rocks --concurrency 42')
1942
 
        # Command line overrides environment variable
1943
 
        self.assertEqual('42', os.environ['BRZ_CONCURRENCY'])
1944
 
        self.assertEqual(42, osutils.local_concurrency(use_cache=False))
 
1876
        # Command line overrides envrionment variable
 
1877
        self.assertEquals('42', os.environ['BZR_CONCURRENCY'])
 
1878
        self.assertEquals(42, osutils.local_concurrency(use_cache=False))
1945
1879
 
1946
1880
 
1947
1881
class TestFailedToLoadExtension(tests.TestCase):
1948
1882
 
1949
1883
    def _try_loading(self):
1950
1884
        try:
1951
 
            import breezy._fictional_extension_py  # noqa: F401
1952
 
        except ImportError as e:
 
1885
            import bzrlib._fictional_extension_py
 
1886
        except ImportError, e:
1953
1887
            osutils.failed_to_load_extension(e)
1954
1888
            return True
1955
1889
 
1960
1894
    def test_failure_to_load(self):
1961
1895
        self._try_loading()
1962
1896
        self.assertLength(1, osutils._extension_load_failures)
1963
 
        if PY3:
1964
 
            self.assertEqual(
1965
 
                osutils._extension_load_failures[0],
1966
 
                "No module named 'breezy._fictional_extension_py'")
1967
 
        else:
1968
 
            self.assertEqual(osutils._extension_load_failures[0],
1969
 
                             "No module named _fictional_extension_py")
 
1897
        self.assertEquals(osutils._extension_load_failures[0],
 
1898
            "No module named _fictional_extension_py")
1970
1899
 
1971
1900
    def test_report_extension_load_failures_no_warning(self):
1972
1901
        self.assertTrue(self._try_loading())
1973
 
        warnings, result = self.callCatchWarnings(
1974
 
            osutils.report_extension_load_failures)
 
1902
        warnings, result = self.callCatchWarnings(osutils.report_extension_load_failures)
1975
1903
        # it used to give a Python warning; it no longer does
1976
1904
        self.assertLength(0, warnings)
1977
1905
 
1978
1906
    def test_report_extension_load_failures_message(self):
1979
 
        log = BytesIO()
 
1907
        log = StringIO()
1980
1908
        trace.push_log_file(log)
1981
1909
        self.assertTrue(self._try_loading())
1982
1910
        osutils.report_extension_load_failures()
1983
1911
        self.assertContainsRe(
1984
1912
            log.getvalue(),
1985
 
            br"brz: warning: some compiled extensions could not be loaded; "
1986
 
            b"see ``brz help missing-extensions``\n"
 
1913
            r"bzr: warning: some compiled extensions could not be loaded; "
 
1914
            "see <https://answers\.launchpad\.net/bzr/\+faq/703>\n"
1987
1915
            )
1988
1916
 
1989
1917
 
1990
1918
class TestTerminalWidth(tests.TestCase):
1991
1919
 
1992
 
    def setUp(self):
1993
 
        super(TestTerminalWidth, self).setUp()
1994
 
        self._orig_terminal_size_state = osutils._terminal_size_state
1995
 
        self._orig_first_terminal_size = osutils._first_terminal_size
1996
 
        self.addCleanup(self.restore_osutils_globals)
1997
 
        osutils._terminal_size_state = 'no_data'
1998
 
        osutils._first_terminal_size = None
1999
 
 
2000
 
    def restore_osutils_globals(self):
2001
 
        osutils._terminal_size_state = self._orig_terminal_size_state
2002
 
        osutils._first_terminal_size = self._orig_first_terminal_size
2003
 
 
2004
1920
    def replace_stdout(self, new):
2005
1921
        self.overrideAttr(sys, 'stdout', new)
2006
1922
 
2018
1934
    def test_default_values(self):
2019
1935
        self.assertEqual(80, osutils.default_terminal_width)
2020
1936
 
2021
 
    def test_defaults_to_BRZ_COLUMNS(self):
2022
 
        # BRZ_COLUMNS is set by the test framework
2023
 
        self.assertNotEqual('12', os.environ['BRZ_COLUMNS'])
2024
 
        self.overrideEnv('BRZ_COLUMNS', '12')
 
1937
    def test_defaults_to_BZR_COLUMNS(self):
 
1938
        # BZR_COLUMNS is set by the test framework
 
1939
        self.assertNotEqual('12', os.environ['BZR_COLUMNS'])
 
1940
        os.environ['BZR_COLUMNS'] = '12'
2025
1941
        self.assertEqual(12, osutils.terminal_width())
2026
1942
 
2027
 
    def test_BRZ_COLUMNS_0_no_limit(self):
2028
 
        self.overrideEnv('BRZ_COLUMNS', '0')
2029
 
        self.assertEqual(None, osutils.terminal_width())
2030
 
 
2031
1943
    def test_falls_back_to_COLUMNS(self):
2032
 
        self.overrideEnv('BRZ_COLUMNS', None)
 
1944
        del os.environ['BZR_COLUMNS']
2033
1945
        self.assertNotEqual('42', os.environ['COLUMNS'])
2034
1946
        self.set_fake_tty()
2035
 
        self.overrideEnv('COLUMNS', '42')
 
1947
        os.environ['COLUMNS'] = '42'
2036
1948
        self.assertEqual(42, osutils.terminal_width())
2037
1949
 
2038
1950
    def test_tty_default_without_columns(self):
2039
 
        self.overrideEnv('BRZ_COLUMNS', None)
2040
 
        self.overrideEnv('COLUMNS', None)
 
1951
        del os.environ['BZR_COLUMNS']
 
1952
        del os.environ['COLUMNS']
2041
1953
 
2042
1954
        def terminal_size(w, h):
2043
1955
            return 42, 42
2050
1962
        self.assertEqual(42, osutils.terminal_width())
2051
1963
 
2052
1964
    def test_non_tty_default_without_columns(self):
2053
 
        self.overrideEnv('BRZ_COLUMNS', None)
2054
 
        self.overrideEnv('COLUMNS', None)
 
1965
        del os.environ['BZR_COLUMNS']
 
1966
        del os.environ['COLUMNS']
2055
1967
        self.replace_stdout(None)
2056
1968
        self.assertEqual(None, osutils.terminal_width())
2057
1969
 
2060
1972
        termios = term_ios_feature.module
2061
1973
        # bug 63539 is about a termios without TIOCGWINSZ attribute
2062
1974
        try:
2063
 
            termios.TIOCGWINSZ
 
1975
            orig = termios.TIOCGWINSZ
2064
1976
        except AttributeError:
2065
1977
            # We won't remove TIOCGWINSZ, because it doesn't exist anyway :)
2066
1978
            pass
2067
1979
        else:
2068
1980
            self.overrideAttr(termios, 'TIOCGWINSZ')
2069
1981
            del termios.TIOCGWINSZ
2070
 
        self.overrideEnv('BRZ_COLUMNS', None)
2071
 
        self.overrideEnv('COLUMNS', None)
 
1982
        del os.environ['BZR_COLUMNS']
 
1983
        del os.environ['COLUMNS']
2072
1984
        # Whatever the result is, if we don't raise an exception, it's ok.
2073
1985
        osutils.terminal_width()
2074
1986
 
2075
 
 
2076
1987
class TestCreationOps(tests.TestCaseInTempDir):
2077
1988
    _test_needs_features = [features.chown_feature]
2078
1989
 
2079
1990
    def setUp(self):
2080
 
        super(TestCreationOps, self).setUp()
 
1991
        tests.TestCaseInTempDir.setUp(self)
2081
1992
        self.overrideAttr(os, 'chown', self._dummy_chown)
2082
1993
 
2083
1994
        # params set by call to _dummy_chown
2089
2000
    def test_copy_ownership_from_path(self):
2090
2001
        """copy_ownership_from_path test with specified src."""
2091
2002
        ownsrc = '/'
2092
 
        open('test_file', 'wt').close()
 
2003
        f = open('test_file', 'wt')
2093
2004
        osutils.copy_ownership_from_path('test_file', ownsrc)
2094
2005
 
2095
2006
        s = os.stat(ownsrc)
2096
 
        self.assertEqual(self.path, 'test_file')
2097
 
        self.assertEqual(self.uid, s.st_uid)
2098
 
        self.assertEqual(self.gid, s.st_gid)
 
2007
        self.assertEquals(self.path, 'test_file')
 
2008
        self.assertEquals(self.uid, s.st_uid)
 
2009
        self.assertEquals(self.gid, s.st_gid)
2099
2010
 
2100
2011
    def test_copy_ownership_nonesrc(self):
2101
2012
        """copy_ownership_from_path test with src=None."""
2102
 
        open('test_file', 'wt').close()
 
2013
        f = open('test_file', 'wt')
2103
2014
        # should use parent dir for permissions
2104
2015
        osutils.copy_ownership_from_path('test_file')
2105
2016
 
2106
2017
        s = os.stat('..')
2107
 
        self.assertEqual(self.path, 'test_file')
2108
 
        self.assertEqual(self.uid, s.st_uid)
2109
 
        self.assertEqual(self.gid, s.st_gid)
2110
 
 
2111
 
 
2112
 
class TestPathFromEnviron(tests.TestCase):
2113
 
 
2114
 
    def test_is_unicode(self):
2115
 
        self.overrideEnv('BRZ_TEST_PATH', './anywhere at all/')
2116
 
        path = osutils.path_from_environ('BRZ_TEST_PATH')
2117
 
        self.assertIsInstance(path, text_type)
2118
 
        self.assertEqual(u'./anywhere at all/', path)
2119
 
 
2120
 
    def test_posix_path_env_ascii(self):
2121
 
        self.overrideEnv('BRZ_TEST_PATH', '/tmp')
2122
 
        home = osutils._posix_path_from_environ('BRZ_TEST_PATH')
2123
 
        self.assertIsInstance(home, text_type)
2124
 
        self.assertEqual(u'/tmp', home)
2125
 
 
2126
 
    def test_posix_path_env_unicode(self):
2127
 
        self.requireFeature(features.ByteStringNamedFilesystem)
2128
 
        self.overrideEnv('BRZ_TEST_PATH', '/home/\xa7test')
2129
 
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
2130
 
        self.assertEqual(u'/home/\xa7test',
2131
 
                         osutils._posix_path_from_environ('BRZ_TEST_PATH'))
2132
 
        osutils._fs_enc = "iso8859-5"
2133
 
        if PY3:
2134
 
            # In Python 3, os.environ returns unicode.
2135
 
            self.assertEqual(u'/home/\xa7test',
2136
 
                             osutils._posix_path_from_environ('BRZ_TEST_PATH'))
2137
 
        else:
2138
 
            self.assertEqual(u'/home/\u0407test',
2139
 
                             osutils._posix_path_from_environ('BRZ_TEST_PATH'))
2140
 
            osutils._fs_enc = "utf-8"
2141
 
            self.assertRaises(
2142
 
                errors.BadFilenameEncoding,
2143
 
                osutils._posix_path_from_environ, 'BRZ_TEST_PATH')
2144
 
 
2145
 
 
2146
 
class TestGetHomeDir(tests.TestCase):
    """Tests for osutils._get_home_dir and osutils._posix_get_home_dir."""

    def test_is_unicode(self):
        self.assertIsInstance(osutils._get_home_dir(), text_type)

    def test_posix_homeless(self):
        # HOME is overridden with None (presumably meaning "unset"); a text
        # result is still expected.
        self.overrideEnv('HOME', None)
        self.assertIsInstance(osutils._get_home_dir(), text_type)

    def test_posix_home_ascii(self):
        self.overrideEnv('HOME', '/home/test')
        found = osutils._posix_get_home_dir()
        self.assertIsInstance(found, text_type)
        self.assertEqual(u'/home/test', found)

    def test_posix_home_unicode(self):
        # Checks how a non-ASCII HOME is turned into text under several
        # values of osutils._fs_enc.
        self.requireFeature(features.ByteStringNamedFilesystem)
        self.overrideEnv('HOME', '/home/\xa7test')
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
        as_latin1 = osutils._posix_get_home_dir()
        self.assertEqual(u'/home/\xa7test', as_latin1)

        osutils._fs_enc = "iso8859-5"
        if not PY3:
            as_cyrillic = osutils._posix_get_home_dir()
            self.assertEqual(u'/home/\u0407test', as_cyrillic)
            osutils._fs_enc = "utf-8"
            self.assertRaises(errors.BadFilenameEncoding,
                              osutils._posix_get_home_dir)
            return
        # In python 3, os.environ returns unicode
        unchanged = osutils._posix_get_home_dir()
        self.assertEqual(u'/home/\xa7test', unchanged)
2178
 
 
2179
 
 
2180
 
class TestGetuserUnicode(tests.TestCase):
    """Tests that osutils.getuser_unicode() returns the user name as text."""

    def test_is_unicode(self):
        user = osutils.getuser_unicode()
        self.assertIsInstance(user, text_type)

    def envvar_to_override(self):
        """Return the name of the environment variable holding the user name.

        Side effect: on win32 this also overrides win32utils.has_ctypes to
        False so that the environment variable is what gets consulted.
        """
        if sys.platform == "win32":
            # Disable use of platform calls on windows so envvar is used
            self.overrideAttr(win32utils, 'has_ctypes', False)
            return 'USERNAME'  # only variable used on windows
        return 'LOGNAME'  # first variable checked by getpass.getuser()

    def test_ascii_user(self):
        self.overrideEnv(self.envvar_to_override(), 'jrandom')
        self.assertEqual(u'jrandom', osutils.getuser_unicode())

    def test_unicode_user(self):
        ue = osutils.get_user_encoding()
        # Only the unicode form of the probe result is needed here.
        uni_val, _ = tests.probe_unicode_in_user_encoding()
        if uni_val is None:
            raise tests.TestSkipped(
                'Cannot find a unicode character that works in encoding %s'
                % (ue,))
        uni_username = u'jrandom' + uni_val
        if PY3:
            # The environment takes text directly; no encoding step needed.
            env_value = uni_username
        else:
            env_value = uni_username.encode(ue)
        self.overrideEnv(self.envvar_to_override(), env_value)
        self.assertEqual(uni_username, osutils.getuser_unicode())
2211
 
 
2212
 
 
2213
 
class TestBackupNames(tests.TestCase):
    """Tests for osutils.available_backup_name using an in-memory list."""

    def setUp(self):
        super(TestBackupNames, self).setUp()
        # Names handed out so far; stands in for files existing on disk.
        self.backups = []

    def backup_exists(self, name):
        """Tell whether name is already recorded in self.backups."""
        return name in self.backups

    def available_backup_name(self, name):
        """Ask osutils for a backup name, record it, and return it."""
        chosen = osutils.available_backup_name(name, self.backup_exists)
        self.backups.append(chosen)
        return chosen

    def assertBackupName(self, expected, name):
        """Check that the next backup name for name is expected."""
        actual = self.available_backup_name(name)
        self.assertEqual(expected, actual)

    def test_empty(self):
        self.assertBackupName('file.~1~', 'file')

    def test_existing(self):
        # Take the first two names so that the third is the next one.
        for _ in range(2):
            self.available_backup_name('file')
        self.assertBackupName('file.~3~', 'file')
        # Empty slots are found, this is not a strict requirement and may be
        # revisited if we test against all implementations.
        self.backups.remove('file.~2~')
        self.assertBackupName('file.~2~', 'file')
2241
 
 
2242
 
 
2243
 
class TestFindExecutableInPath(tests.TestCase):
    """Tests for osutils.find_executable_on_path.

    Identity assertions are used rather than assertTrue(... is None) so that
    a failure reports the value that was actually returned.
    """

    def test_windows(self):
        if sys.platform != 'win32':
            raise tests.TestSkipped('test requires win32')
        # Expected to be found with or without extension, in either case.
        for name in ('explorer', 'explorer.exe', 'EXPLORER.EXE'):
            self.assertIsNot(osutils.find_executable_on_path(name), None)
        # Expected not to be found.
        for name in ('THIS SHOULD NOT EXIST', 'file.txt'):
            self.assertIs(osutils.find_executable_on_path(name), None)

    def test_windows_app_path(self):
        if sys.platform != 'win32':
            raise tests.TestSkipped('test requires win32')
        # Override PATH env var so that exe can only be found on App Path
        self.overrideEnv('PATH', '')
        # Internet Explorer is always registered in the App Path
        self.assertIsNot(osutils.find_executable_on_path('iexplore'), None)

    def test_other(self):
        if sys.platform == 'win32':
            raise tests.TestSkipped('test requires non-win32')
        self.assertIsNot(osutils.find_executable_on_path('sh'), None)
        self.assertIs(
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST'), None)
2273
 
 
2274
 
 
2275
 
class TestEnvironmentErrors(tests.TestCase):
    """Checks which exception types osutils.is_environment_error accepts."""

    def test_is_oserror(self):
        err = OSError(errno.EINVAL, "Invalid parameter")
        self.assertTrue(osutils.is_environment_error(err))

    def test_is_ioerror(self):
        err = IOError(errno.EINVAL, "Invalid parameter")
        self.assertTrue(osutils.is_environment_error(err))

    def test_is_socket_error(self):
        err = socket.error(errno.EINVAL, "Invalid parameter")
        self.assertTrue(osutils.is_environment_error(err))

    def test_is_select_error(self):
        err = select.error(errno.EINVAL, "Invalid parameter")
        self.assertTrue(osutils.is_environment_error(err))

    def test_is_pywintypes_error(self):
        # The feature check comes first so the import is only attempted
        # once the feature has been required.
        self.requireFeature(features.pywintypes)
        import pywintypes
        err = pywintypes.error(errno.EINVAL, "Invalid parameter", "Caller")
        self.assertTrue(osutils.is_environment_error(err))
2299
 
 
2300
 
 
2301
 
class SupportsExecutableTests(tests.TestCaseInTempDir):
    """Tests for osutils.supports_executable."""

    def test_returns_bool(self):
        # Only the result type is checked, not its value.
        answer = osutils.supports_executable(self.test_dir)
        self.assertIsInstance(answer, bool)
2305
 
 
2306
 
 
2307
 
class SupportsSymlinksTests(tests.TestCaseInTempDir):
    """Tests for osutils.supports_symlinks."""

    def test_returns_bool(self):
        # Only the result type is checked, not its value.
        answer = osutils.supports_symlinks(self.test_dir)
        self.assertIsInstance(answer, bool)
2311
 
 
2312
 
 
2313
 
class MtabReader(tests.TestCaseInTempDir):
    """Tests for osutils.read_mtab."""

    def test_read_mtab(self):
        # Two mount entries, then a comment, a blank line and a one-word
        # line; only the two mount entries are expected in the result.
        mtab_text = (
            "/dev/mapper/blah--vg-root / ext4 "
            "rw,relatime,errors=remount-ro 0 0\n"
            "/dev/mapper/blah--vg-home /home vfat rw,relatime 0 0\n"
            "# comment\n"
            "\n"
            "iminvalid\n")
        self.build_tree_contents([('mtab', mtab_text)])
        entries = list(osutils.read_mtab('mtab'))
        wanted = [
            (b'/', 'ext4'),
            (b'/home', 'vfat'),
            ]
        self.assertEqual(entries, wanted)
2327
 
 
2328
 
 
2329
 
class GetFsTypeTests(tests.TestCaseInTempDir):
    """Tests for osutils.get_fs_type."""

    def test_returns_string_or_none(self):
        fs_type = osutils.get_fs_type(self.test_dir)
        self.assertTrue(fs_type is None or isinstance(fs_type, text_type))

    def test_returns_most_specific(self):
        # Install a finder with three nested mount points, then check which
        # filesystem type is reported for a range of paths.
        mounts = [
            (b'/', 'ext4'),
            (b'/home', 'vfat'),
            (b'/home/jelmer', 'ext2'),
            ]
        self.overrideAttr(
            osutils, '_FILESYSTEM_FINDER', osutils.FilesystemFinder(mounts))
        # Both bytes and text paths are queried.
        cases = (
            (b'/home/jelmer/blah', 'ext2'),
            ('/home/jelmer/blah', 'ext2'),
            (b'/home/jelmer', 'ext2'),
            (b'/home/martin', 'vfat'),
            (b'/home', 'vfat'),
            (b'/other', 'ext4'),
            )
        for path, expected in cases:
            self.assertEqual(osutils.get_fs_type(path), expected)

    def test_returns_none(self):
        # With no mount points known, every lookup is expected to give None.
        self.overrideAttr(
            osutils, '_FILESYSTEM_FINDER', osutils.FilesystemFinder([]))
        queries = ('/home/jelmer/blah', b'/home/jelmer/blah', '/home/jelmer')
        for path in queries:
            self.assertIs(osutils.get_fs_type(path), None)
 
2018
        self.assertEquals(self.path, 'test_file')
 
2019
        self.assertEquals(self.uid, s.st_uid)
 
2020
        self.assertEquals(self.gid, s.st_gid)