/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_osutils.py

  • Committer: Marius Kruger
  • Date: 2010-07-10 21:28:56 UTC
  • mto: (5384.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 5385.
  • Revision ID: marius.kruger@enerweb.co.za-20100710212856-uq4ji3go0u5se7hx
* Update documentation
* add NEWS

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2016 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the osutils wrapper."""
18
18
 
19
 
from __future__ import absolute_import, division
20
 
 
 
19
from cStringIO import StringIO
21
20
import errno
22
21
import os
23
 
import select
 
22
import re
24
23
import socket
 
24
import stat
25
25
import sys
26
 
import tempfile
27
26
import time
28
27
 
29
 
from .. import (
 
28
from bzrlib import (
30
29
    errors,
 
30
    lazy_regex,
31
31
    osutils,
 
32
    symbol_versioning,
32
33
    tests,
33
34
    trace,
34
35
    win32utils,
35
36
    )
36
 
from ..sixish import (
37
 
    BytesIO,
38
 
    PY3,
39
 
    text_type,
40
 
    )
41
 
from . import (
 
37
from bzrlib.tests import (
42
38
    features,
43
39
    file_utils,
44
40
    test__walkdirs_win32,
45
41
    )
46
 
from .scenarios import load_tests_apply_scenarios
47
 
 
48
 
 
49
 
class _UTF8DirReaderFeature(features.ModuleAvailableFeature):
 
42
 
 
43
 
 
44
class _UTF8DirReaderFeature(tests.Feature):
50
45
 
51
46
    def _probe(self):
52
47
        try:
53
 
            from .. import _readdir_pyx
54
 
            self._module = _readdir_pyx
 
48
            from bzrlib import _readdir_pyx
55
49
            self.reader = _readdir_pyx.UTF8DirReader
56
50
            return True
57
51
        except ImportError:
58
52
            return False
59
53
 
60
 
 
61
 
UTF8DirReaderFeature = _UTF8DirReaderFeature('breezy._readdir_pyx')
62
 
 
63
 
term_ios_feature = features.ModuleAvailableFeature('termios')
 
54
    def feature_name(self):
 
55
        return 'bzrlib._readdir_pyx'
 
56
 
 
57
UTF8DirReaderFeature = _UTF8DirReaderFeature()
 
58
 
 
59
term_ios_feature = tests.ModuleAvailableFeature('termios')
64
60
 
65
61
 
66
62
def _already_unicode(s):
84
80
    # Some DirReaders are platform specific and even there they may not be
85
81
    # available.
86
82
    if UTF8DirReaderFeature.available():
87
 
        from .. import _readdir_pyx
 
83
        from bzrlib import _readdir_pyx
88
84
        scenarios.append(('utf8',
89
85
                          dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
90
86
                               _native_to_unicode=_utf8_to_unicode)))
91
87
 
92
88
    if test__walkdirs_win32.win32_readdir_feature.available():
93
89
        try:
94
 
            from .. import _walkdirs_win32
 
90
            from bzrlib import _walkdirs_win32
95
91
            scenarios.append(
96
92
                ('win32',
97
93
                 dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
101
97
    return scenarios
102
98
 
103
99
 
104
 
load_tests = load_tests_apply_scenarios
 
100
def load_tests(basic_tests, module, loader):
 
101
    suite = loader.suiteClass()
 
102
    dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
 
103
        basic_tests, tests.condition_isinstance(TestDirReader))
 
104
    tests.multiply_tests(dir_reader_tests, dir_reader_scenarios(), suite)
 
105
    suite.addTest(remaining_tests)
 
106
    return suite
105
107
 
106
108
 
107
109
class TestContainsWhitespace(tests.TestCase):
108
110
 
109
111
    def test_contains_whitespace(self):
110
 
        self.assertTrue(osutils.contains_whitespace(u' '))
111
 
        self.assertTrue(osutils.contains_whitespace(u'hello there'))
112
 
        self.assertTrue(osutils.contains_whitespace(u'hellothere\n'))
113
 
        self.assertTrue(osutils.contains_whitespace(u'hello\nthere'))
114
 
        self.assertTrue(osutils.contains_whitespace(u'hello\rthere'))
115
 
        self.assertTrue(osutils.contains_whitespace(u'hello\tthere'))
 
112
        self.failUnless(osutils.contains_whitespace(u' '))
 
113
        self.failUnless(osutils.contains_whitespace(u'hello there'))
 
114
        self.failUnless(osutils.contains_whitespace(u'hellothere\n'))
 
115
        self.failUnless(osutils.contains_whitespace(u'hello\nthere'))
 
116
        self.failUnless(osutils.contains_whitespace(u'hello\rthere'))
 
117
        self.failUnless(osutils.contains_whitespace(u'hello\tthere'))
116
118
 
117
119
        # \xa0 is "Non-breaking-space" which on some python locales thinks it
118
120
        # is whitespace, but we do not.
119
 
        self.assertFalse(osutils.contains_whitespace(u''))
120
 
        self.assertFalse(osutils.contains_whitespace(u'hellothere'))
121
 
        self.assertFalse(osutils.contains_whitespace(u'hello\xa0there'))
 
121
        self.failIf(osutils.contains_whitespace(u''))
 
122
        self.failIf(osutils.contains_whitespace(u'hellothere'))
 
123
        self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
122
124
 
123
125
 
124
126
class TestRename(tests.TestCaseInTempDir):
136
138
 
137
139
    def test_fancy_rename(self):
138
140
        # This should work everywhere
139
 
        self.create_file('a', b'something in a\n')
 
141
        self.create_file('a', 'something in a\n')
140
142
        self._fancy_rename('a', 'b')
141
 
        self.assertPathDoesNotExist('a')
142
 
        self.assertPathExists('b')
143
 
        self.check_file_contents('b', b'something in a\n')
 
143
        self.failIfExists('a')
 
144
        self.failUnlessExists('b')
 
145
        self.check_file_contents('b', 'something in a\n')
144
146
 
145
 
        self.create_file('a', b'new something in a\n')
 
147
        self.create_file('a', 'new something in a\n')
146
148
        self._fancy_rename('b', 'a')
147
149
 
148
 
        self.check_file_contents('a', b'something in a\n')
 
150
        self.check_file_contents('a', 'something in a\n')
149
151
 
150
152
    def test_fancy_rename_fails_source_missing(self):
151
153
        # An exception should be raised, and the target should be left in place
152
 
        self.create_file('target', b'data in target\n')
 
154
        self.create_file('target', 'data in target\n')
153
155
        self.assertRaises((IOError, OSError), self._fancy_rename,
154
156
                          'missingsource', 'target')
155
 
        self.assertPathExists('target')
156
 
        self.check_file_contents('target', b'data in target\n')
 
157
        self.failUnlessExists('target')
 
158
        self.check_file_contents('target', 'data in target\n')
157
159
 
158
160
    def test_fancy_rename_fails_if_source_and_target_missing(self):
159
161
        self.assertRaises((IOError, OSError), self._fancy_rename,
161
163
 
162
164
    def test_rename(self):
163
165
        # Rename should be semi-atomic on all platforms
164
 
        self.create_file('a', b'something in a\n')
 
166
        self.create_file('a', 'something in a\n')
165
167
        osutils.rename('a', 'b')
166
 
        self.assertPathDoesNotExist('a')
167
 
        self.assertPathExists('b')
168
 
        self.check_file_contents('b', b'something in a\n')
 
168
        self.failIfExists('a')
 
169
        self.failUnlessExists('b')
 
170
        self.check_file_contents('b', 'something in a\n')
169
171
 
170
 
        self.create_file('a', b'new something in a\n')
 
172
        self.create_file('a', 'new something in a\n')
171
173
        osutils.rename('b', 'a')
172
174
 
173
 
        self.check_file_contents('a', b'something in a\n')
 
175
        self.check_file_contents('a', 'something in a\n')
174
176
 
175
177
    # TODO: test fancy_rename using a MemoryTransport
176
178
 
182
184
        # we can't use failUnlessExists on case-insensitive filesystem
183
185
        # so try to check shape of the tree
184
186
        shape = sorted(os.listdir('.'))
185
 
        self.assertEqual(['A', 'B'], shape)
186
 
 
187
 
    def test_rename_exception(self):
188
 
        try:
189
 
            osutils.rename('nonexistent_path', 'different_nonexistent_path')
190
 
        except OSError as e:
191
 
            self.assertEqual(e.old_filename, 'nonexistent_path')
192
 
            self.assertEqual(e.new_filename, 'different_nonexistent_path')
193
 
            self.assertTrue('nonexistent_path' in e.strerror)
194
 
            self.assertTrue('different_nonexistent_path' in e.strerror)
 
187
        self.assertEquals(['A', 'B'], shape)
195
188
 
196
189
 
197
190
class TestRandChars(tests.TestCase):
224
217
                         (['src'], SRC_FOO_C),
225
218
                         (['src'], 'src'),
226
219
                         ]:
227
 
            self.assertTrue(osutils.is_inside_any(dirs, fn))
 
220
            self.assert_(osutils.is_inside_any(dirs, fn))
228
221
        for dirs, fn in [(['src'], 'srccontrol'),
229
222
                         (['src'], 'srccontrol/foo')]:
230
223
            self.assertFalse(osutils.is_inside_any(dirs, fn))
236
229
                         (['src/bar.c', 'bla/foo.c'], 'src'),
237
230
                         (['src'], 'src'),
238
231
                         ]:
239
 
            self.assertTrue(osutils.is_inside_or_parent_of_any(dirs, fn))
 
232
            self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
240
233
 
241
234
        for dirs, fn in [(['src'], 'srccontrol'),
242
235
                         (['srccontrol/foo.c'], 'src'),
244
237
            self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
245
238
 
246
239
 
247
 
class TestLstat(tests.TestCaseInTempDir):
248
 
 
249
 
    def test_lstat_matches_fstat(self):
250
 
        # On Windows, lstat and fstat don't always agree, primarily in the
251
 
        # 'st_ino' and 'st_dev' fields. So we force them to be '0' in our
252
 
        # custom implementation.
253
 
        if sys.platform == 'win32':
254
 
            # We only have special lstat/fstat if we have the extension.
255
 
            # Without it, we may end up re-reading content when we don't have
256
 
            # to, but otherwise it doesn't effect correctness.
257
 
            self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
258
 
        with open('test-file.txt', 'wb') as f:
259
 
            f.write(b'some content\n')
260
 
            f.flush()
261
 
            self.assertEqualStat(osutils.fstat(f.fileno()),
262
 
                                 osutils.lstat('test-file.txt'))
263
 
 
264
 
 
265
240
class TestRmTree(tests.TestCaseInTempDir):
266
241
 
267
242
    def test_rmtree(self):
268
243
        # Check to remove tree with read-only files/dirs
269
244
        os.mkdir('dir')
270
 
        with open('dir/file', 'w') as f:
271
 
            f.write('spam')
 
245
        f = file('dir/file', 'w')
 
246
        f.write('spam')
 
247
        f.close()
272
248
        # would like to also try making the directory readonly, but at the
273
249
        # moment python shutil.rmtree doesn't handle that properly - it would
274
250
        # need to chmod the directory before removing things inside it - deferred
278
254
 
279
255
        osutils.rmtree('dir')
280
256
 
281
 
        self.assertPathDoesNotExist('dir/file')
282
 
        self.assertPathDoesNotExist('dir')
 
257
        self.failIfExists('dir/file')
 
258
        self.failIfExists('dir')
283
259
 
284
260
 
285
261
class TestDeleteAny(tests.TestCaseInTempDir):
298
274
 
299
275
    def test_file_kind(self):
300
276
        self.build_tree(['file', 'dir/'])
301
 
        self.assertEqual('file', osutils.file_kind('file'))
302
 
        self.assertEqual('directory', osutils.file_kind('dir/'))
 
277
        self.assertEquals('file', osutils.file_kind('file'))
 
278
        self.assertEquals('directory', osutils.file_kind('dir/'))
303
279
        if osutils.has_symlinks():
304
280
            os.symlink('symlink', 'symlink')
305
 
            self.assertEqual('symlink', osutils.file_kind('symlink'))
 
281
            self.assertEquals('symlink', osutils.file_kind('symlink'))
306
282
 
307
283
        # TODO: jam 20060529 Test a block device
308
284
        try:
309
285
            os.lstat('/dev/null')
310
 
        except OSError as e:
 
286
        except OSError, e:
311
287
            if e.errno not in (errno.ENOENT,):
312
288
                raise
313
289
        else:
314
 
            self.assertEqual(
315
 
                'chardev',
316
 
                osutils.file_kind(os.path.realpath('/dev/null')))
 
290
            self.assertEquals('chardev', osutils.file_kind('/dev/null'))
317
291
 
318
292
        mkfifo = getattr(os, 'mkfifo', None)
319
293
        if mkfifo:
320
294
            mkfifo('fifo')
321
295
            try:
322
 
                self.assertEqual('fifo', osutils.file_kind('fifo'))
 
296
                self.assertEquals('fifo', osutils.file_kind('fifo'))
323
297
            finally:
324
298
                os.remove('fifo')
325
299
 
328
302
            s = socket.socket(AF_UNIX)
329
303
            s.bind('socket')
330
304
            try:
331
 
                self.assertEqual('socket', osutils.file_kind('socket'))
 
305
                self.assertEquals('socket', osutils.file_kind('socket'))
332
306
            finally:
333
307
                os.remove('socket')
334
308
 
353
327
 
354
328
        orig_umask = osutils.get_umask()
355
329
        self.addCleanup(os.umask, orig_umask)
356
 
        os.umask(0o222)
357
 
        self.assertEqual(0o222, osutils.get_umask())
358
 
        os.umask(0o022)
359
 
        self.assertEqual(0o022, osutils.get_umask())
360
 
        os.umask(0o002)
361
 
        self.assertEqual(0o002, osutils.get_umask())
362
 
        os.umask(0o027)
363
 
        self.assertEqual(0o027, osutils.get_umask())
 
330
        os.umask(0222)
 
331
        self.assertEqual(0222, osutils.get_umask())
 
332
        os.umask(0022)
 
333
        self.assertEqual(0022, osutils.get_umask())
 
334
        os.umask(0002)
 
335
        self.assertEqual(0002, osutils.get_umask())
 
336
        os.umask(0027)
 
337
        self.assertEqual(0027, osutils.get_umask())
364
338
 
365
339
 
366
340
class TestDateTime(tests.TestCase):
402
376
        self.assertFormatedDelta('2 seconds in the future', -2)
403
377
 
404
378
    def test_format_date(self):
405
 
        self.assertRaises(osutils.UnsupportedTimezoneFormat,
406
 
                          osutils.format_date, 0, timezone='foo')
 
379
        self.assertRaises(errors.UnsupportedTimezoneFormat,
 
380
            osutils.format_date, 0, timezone='foo')
407
381
        self.assertIsInstance(osutils.format_date(0), str)
408
 
        self.assertIsInstance(osutils.format_local_date(0), text_type)
 
382
        self.assertIsInstance(osutils.format_local_date(0), unicode)
409
383
        # Testing for the actual value of the local weekday without
410
384
        # duplicating the code from format_date is difficult.
411
385
        # Instead blackbox.test_locale should check for localized
413
387
 
414
388
    def test_format_date_with_offset_in_original_timezone(self):
415
389
        self.assertEqual("Thu 1970-01-01 00:00:00 +0000",
416
 
                         osutils.format_date_with_offset_in_original_timezone(0))
 
390
            osutils.format_date_with_offset_in_original_timezone(0))
417
391
        self.assertEqual("Fri 1970-01-02 03:46:40 +0000",
418
 
                         osutils.format_date_with_offset_in_original_timezone(100000))
 
392
            osutils.format_date_with_offset_in_original_timezone(100000))
419
393
        self.assertEqual("Fri 1970-01-02 05:46:40 +0200",
420
 
                         osutils.format_date_with_offset_in_original_timezone(100000, 7200))
 
394
            osutils.format_date_with_offset_in_original_timezone(100000, 7200))
421
395
 
422
396
    def test_local_time_offset(self):
423
397
        """Test that local_time_offset() returns a sane value."""
439
413
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
440
414
 
441
415
 
442
 
class TestFdatasync(tests.TestCaseInTempDir):
443
 
 
444
 
    def do_fdatasync(self):
445
 
        f = tempfile.NamedTemporaryFile()
446
 
        osutils.fdatasync(f.fileno())
447
 
        f.close()
448
 
 
449
 
    @staticmethod
450
 
    def raise_eopnotsupp(*args, **kwargs):
451
 
        raise IOError(errno.EOPNOTSUPP, os.strerror(errno.EOPNOTSUPP))
452
 
 
453
 
    @staticmethod
454
 
    def raise_enotsup(*args, **kwargs):
455
 
        raise IOError(errno.ENOTSUP, os.strerror(errno.ENOTSUP))
456
 
 
457
 
    def test_fdatasync_handles_system_function(self):
458
 
        self.overrideAttr(os, "fdatasync")
459
 
        self.do_fdatasync()
460
 
 
461
 
    def test_fdatasync_handles_no_fdatasync_no_fsync(self):
462
 
        self.overrideAttr(os, "fdatasync")
463
 
        self.overrideAttr(os, "fsync")
464
 
        self.do_fdatasync()
465
 
 
466
 
    def test_fdatasync_handles_no_EOPNOTSUPP(self):
467
 
        self.overrideAttr(errno, "EOPNOTSUPP")
468
 
        self.do_fdatasync()
469
 
 
470
 
    def test_fdatasync_catches_ENOTSUP(self):
471
 
        enotsup = getattr(errno, "ENOTSUP", None)
472
 
        if enotsup is None:
473
 
            raise tests.TestNotApplicable("No ENOTSUP on this platform")
474
 
        self.overrideAttr(os, "fdatasync", self.raise_enotsup)
475
 
        self.do_fdatasync()
476
 
 
477
 
    def test_fdatasync_catches_EOPNOTSUPP(self):
478
 
        enotsup = getattr(errno, "EOPNOTSUPP", None)
479
 
        if enotsup is None:
480
 
            raise tests.TestNotApplicable("No EOPNOTSUPP on this platform")
481
 
        self.overrideAttr(os, "fdatasync", self.raise_eopnotsupp)
482
 
        self.do_fdatasync()
483
 
 
484
 
 
485
416
class TestLinks(tests.TestCaseInTempDir):
486
417
 
487
418
    def test_dereference_path(self):
488
 
        self.requireFeature(features.SymlinkFeature)
 
419
        self.requireFeature(tests.SymlinkFeature)
489
420
        cwd = osutils.realpath('.')
490
421
        os.mkdir('bar')
491
422
        bar_path = osutils.pathjoin(cwd, 'bar')
512
443
        self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
513
444
 
514
445
    def test_changing_access(self):
515
 
        with open('file', 'w') as f:
516
 
            f.write('monkey')
 
446
        f = file('file', 'w')
 
447
        f.write('monkey')
 
448
        f.close()
517
449
 
518
450
        # Make a file readonly
519
451
        osutils.make_readonly('file')
520
452
        mode = os.lstat('file').st_mode
521
 
        self.assertEqual(mode, mode & 0o777555)
 
453
        self.assertEqual(mode, mode & 0777555)
522
454
 
523
455
        # Make a file writable
524
456
        osutils.make_writable('file')
525
457
        mode = os.lstat('file').st_mode
526
 
        self.assertEqual(mode, mode | 0o200)
 
458
        self.assertEqual(mode, mode | 0200)
527
459
 
528
460
        if osutils.has_symlinks():
529
461
            # should not error when handed a symlink
537
469
 
538
470
class TestCanonicalRelPath(tests.TestCaseInTempDir):
539
471
 
540
 
    _test_needs_features = [features.CaseInsCasePresFilenameFeature]
 
472
    _test_needs_features = [tests.CaseInsCasePresFilenameFeature]
541
473
 
542
474
    def test_canonical_relpath_simple(self):
543
 
        f = open('MixedCaseName', 'w')
 
475
        f = file('MixedCaseName', 'w')
544
476
        f.close()
545
477
        actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
546
 
        self.assertEqual('work/MixedCaseName', actual)
 
478
        self.failUnlessEqual('work/MixedCaseName', actual)
547
479
 
548
480
    def test_canonical_relpath_missing_tail(self):
549
481
        os.mkdir('MixedCaseParent')
550
482
        actual = osutils.canonical_relpath(self.test_base_dir,
551
483
                                           'mixedcaseparent/nochild')
552
 
        self.assertEqual('work/MixedCaseParent/nochild', actual)
 
484
        self.failUnlessEqual('work/MixedCaseParent/nochild', actual)
553
485
 
554
486
 
555
487
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
599
531
    """Test pumpfile method."""
600
532
 
601
533
    def setUp(self):
602
 
        super(TestPumpFile, self).setUp()
 
534
        tests.TestCase.setUp(self)
603
535
        # create a test datablock
604
536
        self.block_size = 512
605
 
        pattern = b'0123456789ABCDEF'
606
 
        self.test_data = pattern * (3 * self.block_size // len(pattern))
 
537
        pattern = '0123456789ABCDEF'
 
538
        self.test_data = pattern * (3 * self.block_size / len(pattern))
607
539
        self.test_data_len = len(self.test_data)
608
540
 
609
541
    def test_bracket_block_size(self):
613
545
        self.assertTrue(self.test_data_len > self.block_size)
614
546
 
615
547
        from_file = file_utils.FakeReadFile(self.test_data)
616
 
        to_file = BytesIO()
 
548
        to_file = StringIO()
617
549
 
618
 
        # read (max // 2) bytes and verify read size wasn't affected
619
 
        num_bytes_to_read = self.block_size // 2
620
 
        osutils.pumpfile(from_file, to_file,
621
 
                         num_bytes_to_read, self.block_size)
 
550
        # read (max / 2) bytes and verify read size wasn't affected
 
551
        num_bytes_to_read = self.block_size / 2
 
552
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
622
553
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
623
554
        self.assertEqual(from_file.get_read_count(), 1)
624
555
 
625
556
        # read (max) bytes and verify read size wasn't affected
626
557
        num_bytes_to_read = self.block_size
627
558
        from_file.reset_read_count()
628
 
        osutils.pumpfile(from_file, to_file,
629
 
                         num_bytes_to_read, self.block_size)
 
559
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
630
560
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
631
561
        self.assertEqual(from_file.get_read_count(), 1)
632
562
 
633
563
        # read (max + 1) bytes and verify read size was limited
634
564
        num_bytes_to_read = self.block_size + 1
635
565
        from_file.reset_read_count()
636
 
        osutils.pumpfile(from_file, to_file,
637
 
                         num_bytes_to_read, self.block_size)
 
566
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
638
567
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
639
568
        self.assertEqual(from_file.get_read_count(), 2)
640
569
 
641
570
        # finish reading the rest of the data
642
571
        num_bytes_to_read = self.test_data_len - to_file.tell()
643
 
        osutils.pumpfile(from_file, to_file,
644
 
                         num_bytes_to_read, self.block_size)
 
572
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
645
573
 
646
574
        # report error if the data wasn't equal (we only report the size due
647
575
        # to the length of the data)
658
586
 
659
587
        # retrieve data in blocks
660
588
        from_file = file_utils.FakeReadFile(self.test_data)
661
 
        to_file = BytesIO()
 
589
        to_file = StringIO()
662
590
        osutils.pumpfile(from_file, to_file, self.test_data_len,
663
591
                         self.block_size)
664
592
 
682
610
 
683
611
        # retrieve data to EOF
684
612
        from_file = file_utils.FakeReadFile(self.test_data)
685
 
        to_file = BytesIO()
 
613
        to_file = StringIO()
686
614
        osutils.pumpfile(from_file, to_file, -1, self.block_size)
687
615
 
688
616
        # verify read size was equal to the maximum read size
702
630
        with this new version."""
703
631
        # retrieve data using default (old) pumpfile method
704
632
        from_file = file_utils.FakeReadFile(self.test_data)
705
 
        to_file = BytesIO()
 
633
        to_file = StringIO()
706
634
        osutils.pumpfile(from_file, to_file)
707
635
 
708
636
        # report error if the data wasn't equal (we only report the size due
714
642
 
715
643
    def test_report_activity(self):
716
644
        activity = []
717
 
 
718
645
        def log_activity(length, direction):
719
646
            activity.append((length, direction))
720
 
        from_file = BytesIO(self.test_data)
721
 
        to_file = BytesIO()
 
647
        from_file = StringIO(self.test_data)
 
648
        to_file = StringIO()
722
649
        osutils.pumpfile(from_file, to_file, buff_size=500,
723
650
                         report_activity=log_activity, direction='read')
724
651
        self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
725
652
                          (36, 'read')], activity)
726
653
 
727
 
        from_file = BytesIO(self.test_data)
728
 
        to_file = BytesIO()
 
654
        from_file = StringIO(self.test_data)
 
655
        to_file = StringIO()
729
656
        del activity[:]
730
657
        osutils.pumpfile(from_file, to_file, buff_size=500,
731
658
                         report_activity=log_activity, direction='write')
733
660
                          (36, 'write')], activity)
734
661
 
735
662
        # And with a limited amount of data
736
 
        from_file = BytesIO(self.test_data)
737
 
        to_file = BytesIO()
 
663
        from_file = StringIO(self.test_data)
 
664
        to_file = StringIO()
738
665
        del activity[:]
739
666
        osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
740
667
                         report_activity=log_activity, direction='read')
741
 
        self.assertEqual(
742
 
            [(500, 'read'), (500, 'read'), (28, 'read')], activity)
 
668
        self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
 
669
 
743
670
 
744
671
 
745
672
class TestPumpStringFile(tests.TestCase):
746
673
 
747
674
    def test_empty(self):
748
 
        output = BytesIO()
749
 
        osutils.pump_string_file(b"", output)
750
 
        self.assertEqual(b"", output.getvalue())
 
675
        output = StringIO()
 
676
        osutils.pump_string_file("", output)
 
677
        self.assertEqual("", output.getvalue())
751
678
 
752
679
    def test_more_than_segment_size(self):
753
 
        output = BytesIO()
754
 
        osutils.pump_string_file(b"123456789", output, 2)
755
 
        self.assertEqual(b"123456789", output.getvalue())
 
680
        output = StringIO()
 
681
        osutils.pump_string_file("123456789", output, 2)
 
682
        self.assertEqual("123456789", output.getvalue())
756
683
 
757
684
    def test_segment_size(self):
758
 
        output = BytesIO()
759
 
        osutils.pump_string_file(b"12", output, 2)
760
 
        self.assertEqual(b"12", output.getvalue())
 
685
        output = StringIO()
 
686
        osutils.pump_string_file("12", output, 2)
 
687
        self.assertEqual("12", output.getvalue())
761
688
 
762
689
    def test_segment_size_multiple(self):
763
 
        output = BytesIO()
764
 
        osutils.pump_string_file(b"1234", output, 2)
765
 
        self.assertEqual(b"1234", output.getvalue())
 
690
        output = StringIO()
 
691
        osutils.pump_string_file("1234", output, 2)
 
692
        self.assertEqual("1234", output.getvalue())
766
693
 
767
694
 
768
695
class TestRelpath(tests.TestCase):
787
714
class TestSafeUnicode(tests.TestCase):
788
715
 
789
716
    def test_from_ascii_string(self):
790
 
        self.assertEqual(u'foobar', osutils.safe_unicode(b'foobar'))
 
717
        self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
791
718
 
792
719
    def test_from_unicode_string_ascii_contents(self):
793
720
        self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
796
723
        self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
797
724
 
798
725
    def test_from_utf8_string(self):
799
 
        self.assertEqual(u'foo\xae', osutils.safe_unicode(b'foo\xc2\xae'))
 
726
        self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
800
727
 
801
728
    def test_bad_utf8_string(self):
802
729
        self.assertRaises(errors.BzrBadParameterNotUnicode,
803
730
                          osutils.safe_unicode,
804
 
                          b'\xbb\xbb')
 
731
                          '\xbb\xbb')
805
732
 
806
733
 
807
734
class TestSafeUtf8(tests.TestCase):
808
735
 
809
736
    def test_from_ascii_string(self):
810
 
        f = b'foobar'
811
 
        self.assertEqual(b'foobar', osutils.safe_utf8(f))
 
737
        f = 'foobar'
 
738
        self.assertEqual('foobar', osutils.safe_utf8(f))
812
739
 
813
740
    def test_from_unicode_string_ascii_contents(self):
814
 
        self.assertEqual(b'bargam', osutils.safe_utf8(u'bargam'))
 
741
        self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
815
742
 
816
743
    def test_from_unicode_string_unicode_contents(self):
817
 
        self.assertEqual(b'bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
 
744
        self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
818
745
 
819
746
    def test_from_utf8_string(self):
820
 
        self.assertEqual(b'foo\xc2\xae', osutils.safe_utf8(b'foo\xc2\xae'))
 
747
        self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
821
748
 
822
749
    def test_bad_utf8_string(self):
823
750
        self.assertRaises(errors.BzrBadParameterNotUnicode,
824
 
                          osutils.safe_utf8, b'\xbb\xbb')
 
751
                          osutils.safe_utf8, '\xbb\xbb')
825
752
 
826
753
 
827
754
class TestSafeRevisionId(tests.TestCase):
828
755
 
829
756
    def test_from_ascii_string(self):
830
757
        # this shouldn't give a warning because it's getting an ascii string
831
 
        self.assertEqual(b'foobar', osutils.safe_revision_id(b'foobar'))
 
758
        self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
832
759
 
833
760
    def test_from_unicode_string_ascii_contents(self):
834
 
        self.assertRaises(TypeError,
835
 
                          osutils.safe_revision_id, u'bargam')
 
761
        self.assertEqual('bargam',
 
762
                         osutils.safe_revision_id(u'bargam', warn=False))
 
763
 
 
764
    def test_from_unicode_deprecated(self):
 
765
        self.assertEqual('bargam',
 
766
            self.callDeprecated([osutils._revision_id_warning],
 
767
                                osutils.safe_revision_id, u'bargam'))
836
768
 
837
769
    def test_from_unicode_string_unicode_contents(self):
838
 
        self.assertRaises(TypeError,
839
 
                          osutils.safe_revision_id, u'bargam\xae')
 
770
        self.assertEqual('bargam\xc2\xae',
 
771
                         osutils.safe_revision_id(u'bargam\xae', warn=False))
840
772
 
841
773
    def test_from_utf8_string(self):
842
 
        self.assertEqual(b'foo\xc2\xae',
843
 
                         osutils.safe_revision_id(b'foo\xc2\xae'))
 
774
        self.assertEqual('foo\xc2\xae',
 
775
                         osutils.safe_revision_id('foo\xc2\xae'))
844
776
 
845
777
    def test_none(self):
846
778
        """Currently, None is a valid revision_id"""
850
782
class TestSafeFileId(tests.TestCase):
851
783
 
852
784
    def test_from_ascii_string(self):
853
 
        self.assertEqual(b'foobar', osutils.safe_file_id(b'foobar'))
 
785
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))
854
786
 
855
787
    def test_from_unicode_string_ascii_contents(self):
856
 
        self.assertRaises(TypeError, osutils.safe_file_id, u'bargam')
 
788
        self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))
 
789
 
 
790
    def test_from_unicode_deprecated(self):
 
791
        self.assertEqual('bargam',
 
792
            self.callDeprecated([osutils._file_id_warning],
 
793
                                osutils.safe_file_id, u'bargam'))
857
794
 
858
795
    def test_from_unicode_string_unicode_contents(self):
859
 
        self.assertRaises(TypeError,
860
 
                          osutils.safe_file_id, u'bargam\xae')
 
796
        self.assertEqual('bargam\xc2\xae',
 
797
                         osutils.safe_file_id(u'bargam\xae', warn=False))
861
798
 
862
799
    def test_from_utf8_string(self):
863
 
        self.assertEqual(b'foo\xc2\xae',
864
 
                         osutils.safe_file_id(b'foo\xc2\xae'))
 
800
        self.assertEqual('foo\xc2\xae',
 
801
                         osutils.safe_file_id('foo\xc2\xae'))
865
802
 
866
803
    def test_none(self):
867
804
        """Currently, None is a valid revision_id"""
868
805
        self.assertEqual(None, osutils.safe_file_id(None))
869
806
 
870
807
 
871
 
class TestSendAll(tests.TestCase):
872
 
 
873
 
    def test_send_with_disconnected_socket(self):
874
 
        class DisconnectedSocket(object):
875
 
            def __init__(self, err):
876
 
                self.err = err
877
 
 
878
 
            def send(self, content):
879
 
                raise self.err
880
 
 
881
 
            def close(self):
882
 
                pass
883
 
        # All of these should be treated as ConnectionReset
884
 
        errs = []
885
 
        for err_cls in (IOError, socket.error):
886
 
            for errnum in osutils._end_of_stream_errors:
887
 
                errs.append(err_cls(errnum))
888
 
        for err in errs:
889
 
            sock = DisconnectedSocket(err)
890
 
            self.assertRaises(errors.ConnectionReset,
891
 
                              osutils.send_all, sock, b'some more content')
892
 
 
893
 
    def test_send_with_no_progress(self):
894
 
        # See https://bugs.launchpad.net/bzr/+bug/1047309
895
 
        # It seems that paramiko can get into a state where it doesn't error,
896
 
        # but it returns 0 bytes sent for requests over and over again.
897
 
        class NoSendingSocket(object):
898
 
            def __init__(self):
899
 
                self.call_count = 0
900
 
 
901
 
            def send(self, bytes):
902
 
                self.call_count += 1
903
 
                if self.call_count > 100:
904
 
                    # Prevent the test suite from hanging
905
 
                    raise RuntimeError('too many calls')
906
 
                return 0
907
 
        sock = NoSendingSocket()
908
 
        self.assertRaises(errors.ConnectionReset,
909
 
                          osutils.send_all, sock, b'content')
910
 
        self.assertEqual(1, sock.call_count)
911
 
 
912
 
 
913
 
class TestPosixFuncs(tests.TestCase):
914
 
    """Test that the posix version of normpath returns an appropriate path
915
 
       when used with 2 leading slashes."""
916
 
 
917
 
    def test_normpath(self):
918
 
        self.assertEqual('/etc/shadow', osutils._posix_normpath('/etc/shadow'))
919
 
        self.assertEqual(
920
 
            '/etc/shadow', osutils._posix_normpath('//etc/shadow'))
921
 
        self.assertEqual(
922
 
            '/etc/shadow', osutils._posix_normpath('///etc/shadow'))
923
 
 
924
 
 
925
808
class TestWin32Funcs(tests.TestCase):
926
809
    """Test that _win32 versions of os utilities return appropriate paths."""
927
810
 
928
811
    def test_abspath(self):
929
 
        self.requireFeature(features.win32_feature)
930
812
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
931
813
        self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
932
814
        self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
945
827
                         osutils._win32_pathjoin('path/to', 'C:/foo'))
946
828
        self.assertEqual('path/to/foo',
947
829
                         osutils._win32_pathjoin('path/to/', 'foo'))
948
 
 
949
 
    def test_pathjoin_late_bugfix(self):
950
 
        if sys.version_info < (2, 7, 6):
951
 
            expected = '/foo'
952
 
        else:
953
 
            expected = 'C:/foo'
954
 
        self.assertEqual(expected,
 
830
        self.assertEqual('/foo',
955
831
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
956
 
        self.assertEqual(expected,
 
832
        self.assertEqual('/foo',
957
833
                         osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
958
834
 
959
835
    def test_normpath(self):
964
840
 
965
841
    def test_getcwd(self):
966
842
        cwd = osutils._win32_getcwd()
967
 
        os_cwd = osutils._getcwd()
 
843
        os_cwd = os.getcwdu()
968
844
        self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
969
845
        # win32 is inconsistent whether it returns lower or upper case
970
846
        # and even if it was consistent the user might type the other
978
854
        self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
979
855
        self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
980
856
 
 
857
    def test_win98_abspath(self):
 
858
        # absolute path
 
859
        self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
 
860
        self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
 
861
        # UNC path
 
862
        self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
 
863
        self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
 
864
        # relative path
 
865
        cwd = osutils.getcwd().rstrip('/')
 
866
        drive = osutils.ntpath.splitdrive(cwd)[0]
 
867
        self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
 
868
        self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
 
869
        # unicode path
 
870
        u = u'\u1234'
 
871
        self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
 
872
 
981
873
 
982
874
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
983
875
    """Test win32 functions that create files."""
984
876
 
985
877
    def test_getcwd(self):
986
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
878
        self.requireFeature(tests.UnicodeFilenameFeature)
987
879
        os.mkdir(u'mu-\xb5')
988
880
        os.chdir(u'mu-\xb5')
989
881
        # TODO: jam 20060427 This will probably fail on Mac OSX because
994
886
 
995
887
    def test_minimum_path_selection(self):
996
888
        self.assertEqual(set(),
997
 
                         osutils.minimum_path_selection([]))
998
 
        self.assertEqual({'a'},
999
 
                         osutils.minimum_path_selection(['a']))
1000
 
        self.assertEqual({'a', 'b'},
1001
 
                         osutils.minimum_path_selection(['a', 'b']))
1002
 
        self.assertEqual({'a/', 'b'},
1003
 
                         osutils.minimum_path_selection(['a/', 'b']))
1004
 
        self.assertEqual({'a/', 'b'},
1005
 
                         osutils.minimum_path_selection(['a/c', 'a/', 'b']))
1006
 
        self.assertEqual({'a-b', 'a', 'a0b'},
1007
 
                         osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
 
889
            osutils.minimum_path_selection([]))
 
890
        self.assertEqual(set(['a']),
 
891
            osutils.minimum_path_selection(['a']))
 
892
        self.assertEqual(set(['a', 'b']),
 
893
            osutils.minimum_path_selection(['a', 'b']))
 
894
        self.assertEqual(set(['a/', 'b']),
 
895
            osutils.minimum_path_selection(['a/', 'b']))
 
896
        self.assertEqual(set(['a/', 'b']),
 
897
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
 
898
        self.assertEqual(set(['a-b', 'a', 'a0b']),
 
899
            osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
1008
900
 
1009
901
    def test_mkdtemp(self):
1010
902
        tmpdir = osutils._win32_mkdtemp(dir='.')
1011
903
        self.assertFalse('\\' in tmpdir)
1012
904
 
1013
905
    def test_rename(self):
1014
 
        with open('a', 'wb') as a:
1015
 
            a.write(b'foo\n')
1016
 
        with open('b', 'wb') as b:
1017
 
            b.write(b'baz\n')
 
906
        a = open('a', 'wb')
 
907
        a.write('foo\n')
 
908
        a.close()
 
909
        b = open('b', 'wb')
 
910
        b.write('baz\n')
 
911
        b.close()
1018
912
 
1019
913
        osutils._win32_rename('b', 'a')
1020
 
        self.assertPathExists('a')
1021
 
        self.assertPathDoesNotExist('b')
1022
 
        self.assertFileEqual(b'baz\n', 'a')
 
914
        self.failUnlessExists('a')
 
915
        self.failIfExists('b')
 
916
        self.assertFileEqual('baz\n', 'a')
1023
917
 
1024
918
    def test_rename_missing_file(self):
1025
 
        with open('a', 'wb') as a:
1026
 
            a.write(b'foo\n')
 
919
        a = open('a', 'wb')
 
920
        a.write('foo\n')
 
921
        a.close()
1027
922
 
1028
923
        try:
1029
924
            osutils._win32_rename('b', 'a')
1030
 
        except (IOError, OSError) as e:
 
925
        except (IOError, OSError), e:
1031
926
            self.assertEqual(errno.ENOENT, e.errno)
1032
 
        self.assertFileEqual(b'foo\n', 'a')
 
927
        self.assertFileEqual('foo\n', 'a')
1033
928
 
1034
929
    def test_rename_missing_dir(self):
1035
930
        os.mkdir('a')
1036
931
        try:
1037
932
            osutils._win32_rename('b', 'a')
1038
 
        except (IOError, OSError) as e:
 
933
        except (IOError, OSError), e:
1039
934
            self.assertEqual(errno.ENOENT, e.errno)
1040
935
 
1041
936
    def test_rename_current_dir(self):
1047
942
        # doesn't exist.
1048
943
        try:
1049
944
            osutils._win32_rename('b', '.')
1050
 
        except (IOError, OSError) as e:
 
945
        except (IOError, OSError), e:
1051
946
            self.assertEqual(errno.ENOENT, e.errno)
1052
947
 
1053
948
    def test_splitpath(self):
1058
953
        check(['a', 'b'], 'a/b')
1059
954
        check(['a', 'b'], 'a/./b')
1060
955
        check(['a', '.b'], 'a/.b')
1061
 
        if os.path.sep == '\\':
1062
 
            check(['a', '.b'], 'a\\.b')
1063
 
        else:
1064
 
            check(['a\\.b'], 'a\\.b')
 
956
        check(['a', '.b'], 'a\\.b')
1065
957
 
1066
958
        self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
1067
959
 
1079
971
    """Test mac special functions that require directories."""
1080
972
 
1081
973
    def test_getcwd(self):
1082
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
974
        self.requireFeature(tests.UnicodeFilenameFeature)
1083
975
        os.mkdir(u'B\xe5gfors')
1084
976
        os.chdir(u'B\xe5gfors')
1085
977
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
1086
978
 
1087
979
    def test_getcwd_nonnorm(self):
1088
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
980
        self.requireFeature(tests.UnicodeFilenameFeature)
1089
981
        # Test that _mac_getcwd() will normalize this path
1090
982
        os.mkdir(u'Ba\u030agfors')
1091
983
        os.chdir(u'Ba\u030agfors')
1095
987
class TestChunksToLines(tests.TestCase):
1096
988
 
1097
989
    def test_smoketest(self):
1098
 
        self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
1099
 
                         osutils.chunks_to_lines([b'foo\nbar', b'\nbaz\n']))
1100
 
        self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
1101
 
                         osutils.chunks_to_lines([b'foo\n', b'bar\n', b'baz\n']))
 
990
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
 
991
                         osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
 
992
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
 
993
                         osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
1102
994
 
1103
995
    def test_osutils_binding(self):
1104
 
        from . import test__chunks_to_lines
 
996
        from bzrlib.tests import test__chunks_to_lines
1105
997
        if test__chunks_to_lines.compiled_chunkstolines_feature.available():
1106
 
            from .._chunks_to_lines_pyx import chunks_to_lines
 
998
            from bzrlib._chunks_to_lines_pyx import chunks_to_lines
1107
999
        else:
1108
 
            from .._chunks_to_lines_py import chunks_to_lines
 
1000
            from bzrlib._chunks_to_lines_py import chunks_to_lines
1109
1001
        self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
1110
1002
 
1111
1003
 
1118
1010
                         osutils.split_lines(u'foo\nbar\xae\n'))
1119
1011
 
1120
1012
    def test_split_with_carriage_returns(self):
1121
 
        self.assertEqual([b'foo\rbar\n'],
1122
 
                         osutils.split_lines(b'foo\rbar\n'))
 
1013
        self.assertEqual(['foo\rbar\n'],
 
1014
                         osutils.split_lines('foo\rbar\n'))
1123
1015
 
1124
1016
 
1125
1017
class TestWalkDirs(tests.TestCaseInTempDir):
1140
1032
            ]
1141
1033
        self.build_tree(tree)
1142
1034
        expected_dirblocks = [
1143
 
            (('', '.'),
1144
 
             [('0file', '0file', 'file'),
1145
 
              ('1dir', '1dir', 'directory'),
1146
 
              ('2file', '2file', 'file'),
1147
 
              ]
1148
 
             ),
1149
 
            (('1dir', './1dir'),
1150
 
             [('1dir/0file', '0file', 'file'),
1151
 
              ('1dir/1dir', '1dir', 'directory'),
1152
 
              ]
1153
 
             ),
1154
 
            (('1dir/1dir', './1dir/1dir'),
1155
 
             [
1156
 
                ]
1157
 
             ),
 
1035
                (('', '.'),
 
1036
                 [('0file', '0file', 'file'),
 
1037
                  ('1dir', '1dir', 'directory'),
 
1038
                  ('2file', '2file', 'file'),
 
1039
                 ]
 
1040
                ),
 
1041
                (('1dir', './1dir'),
 
1042
                 [('1dir/0file', '0file', 'file'),
 
1043
                  ('1dir/1dir', '1dir', 'directory'),
 
1044
                 ]
 
1045
                ),
 
1046
                (('1dir/1dir', './1dir/1dir'),
 
1047
                 [
 
1048
                 ]
 
1049
                ),
1158
1050
            ]
1159
1051
        result = []
1160
1052
        found_bzrdir = False
1180
1072
        if sys.platform == 'win32':
1181
1073
            raise tests.TestNotApplicable(
1182
1074
                "readdir IOError not tested on win32")
1183
 
        self.requireFeature(features.not_running_as_root)
1184
1075
        os.mkdir("test-unreadable")
1185
1076
        os.chmod("test-unreadable", 0000)
1186
1077
        # must chmod it back so that it can be removed
1187
 
        self.addCleanup(os.chmod, "test-unreadable", 0o700)
 
1078
        self.addCleanup(os.chmod, "test-unreadable", 0700)
1188
1079
        # The error is not raised until the generator is actually evaluated.
1189
1080
        # (It would be ok if it happened earlier but at the moment it
1190
1081
        # doesn't.)
1191
1082
        e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1192
 
        self.assertEqual('./test-unreadable', osutils.safe_unicode(e.filename))
1193
 
        self.assertEqual(errno.EACCES, e.errno)
 
1083
        self.assertEquals('./test-unreadable', e.filename)
 
1084
        self.assertEquals(errno.EACCES, e.errno)
1194
1085
        # Ensure the message contains the file name
1195
 
        self.assertContainsRe(str(e), "\\./test-unreadable")
 
1086
        self.assertContainsRe(str(e), "\./test-unreadable")
 
1087
 
1196
1088
 
1197
1089
    def test_walkdirs_encoding_error(self):
1198
1090
        # <https://bugs.launchpad.net/bzr/+bug/488519>
1200
1092
        # are not using the filesystem's encoding
1201
1093
 
1202
1094
        # require a bytestring based filesystem
1203
 
        self.requireFeature(features.ByteStringNamedFilesystem)
 
1095
        self.requireFeature(tests.ByteStringNamedFilesystem)
1204
1096
 
1205
1097
        tree = [
1206
1098
            '.bzr',
1214
1106
        self.build_tree(tree)
1215
1107
 
1216
1108
        # rename the 1file to a latin-1 filename
1217
 
        os.rename(b"./1file", b"\xe8file")
1218
 
        if b"\xe8file" not in os.listdir("."):
1219
 
            self.skipTest("Lack filesystem that preserves arbitrary bytes")
 
1109
        os.rename("./1file", "\xe8file")
1220
1110
 
1221
1111
        self._save_platform_info()
 
1112
        win32utils.winver = None # Avoid the win32 detection code
1222
1113
        osutils._fs_enc = 'UTF-8'
1223
1114
 
1224
1115
        # this should raise on error
1225
1116
        def attempt():
1226
 
            for dirdetail, dirblock in osutils.walkdirs(b'.'):
 
1117
            for dirdetail, dirblock in osutils.walkdirs('.'):
1227
1118
                pass
1228
1119
 
1229
1120
        self.assertRaises(errors.BadFilenameEncoding, attempt)
1239
1130
            ]
1240
1131
        self.build_tree(tree)
1241
1132
        expected_dirblocks = [
1242
 
            (('', '.'),
1243
 
             [('0file', '0file', 'file'),
1244
 
              ('1dir', '1dir', 'directory'),
1245
 
              ('2file', '2file', 'file'),
1246
 
              ]
1247
 
             ),
1248
 
            (('1dir', './1dir'),
1249
 
             [('1dir/0file', '0file', 'file'),
1250
 
              ('1dir/1dir', '1dir', 'directory'),
1251
 
              ]
1252
 
             ),
1253
 
            (('1dir/1dir', './1dir/1dir'),
1254
 
             [
1255
 
                ]
1256
 
             ),
 
1133
                (('', '.'),
 
1134
                 [('0file', '0file', 'file'),
 
1135
                  ('1dir', '1dir', 'directory'),
 
1136
                  ('2file', '2file', 'file'),
 
1137
                 ]
 
1138
                ),
 
1139
                (('1dir', './1dir'),
 
1140
                 [('1dir/0file', '0file', 'file'),
 
1141
                  ('1dir/1dir', '1dir', 'directory'),
 
1142
                 ]
 
1143
                ),
 
1144
                (('1dir/1dir', './1dir/1dir'),
 
1145
                 [
 
1146
                 ]
 
1147
                ),
1257
1148
            ]
1258
1149
        result = []
1259
1150
        found_bzrdir = False
1260
 
        for dirdetail, dirblock in osutils._walkdirs_utf8(b'.'):
1261
 
            if len(dirblock) and dirblock[0][1] == b'.bzr':
 
1151
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
 
1152
            if len(dirblock) and dirblock[0][1] == '.bzr':
1262
1153
                # this tests the filtering of selected paths
1263
1154
                found_bzrdir = True
1264
1155
                del dirblock[0]
1265
 
            dirdetail = (dirdetail[0].decode('utf-8'),
1266
 
                         osutils.safe_unicode(dirdetail[1]))
1267
 
            dirblock = [
1268
 
                (entry[0].decode('utf-8'), entry[1].decode('utf-8'), entry[2])
1269
 
                for entry in dirblock]
1270
1156
            result.append((dirdetail, dirblock))
1271
1157
 
1272
1158
        self.assertTrue(found_bzrdir)
1288
1174
            dirblock[:] = new_dirblock
1289
1175
 
1290
1176
    def _save_platform_info(self):
 
1177
        self.overrideAttr(win32utils, 'winver')
1291
1178
        self.overrideAttr(osutils, '_fs_enc')
1292
1179
        self.overrideAttr(osutils, '_selected_dir_reader')
1293
1180
 
1294
 
    def assertDirReaderIs(self, expected, top):
 
1181
    def assertDirReaderIs(self, expected):
1295
1182
        """Assert the right implementation for _walkdirs_utf8 is chosen."""
1296
1183
        # Force it to redetect
1297
1184
        osutils._selected_dir_reader = None
1298
1185
        # Nothing to list, but should still trigger the selection logic
1299
 
        self.assertEqual([((b'', top), [])], list(osutils._walkdirs_utf8('.')))
 
1186
        self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
1300
1187
        self.assertIsInstance(osutils._selected_dir_reader, expected)
1301
1188
 
1302
1189
    def test_force_walkdirs_utf8_fs_utf8(self):
1303
1190
        self.requireFeature(UTF8DirReaderFeature)
1304
1191
        self._save_platform_info()
1305
 
        osutils._fs_enc = 'utf-8'
1306
 
        self.assertDirReaderIs(UTF8DirReaderFeature.module.UTF8DirReader, b".")
 
1192
        win32utils.winver = None # Avoid the win32 detection code
 
1193
        osutils._fs_enc = 'UTF-8'
 
1194
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1307
1195
 
1308
1196
    def test_force_walkdirs_utf8_fs_ascii(self):
1309
1197
        self.requireFeature(UTF8DirReaderFeature)
1310
1198
        self._save_platform_info()
1311
 
        osutils._fs_enc = 'ascii'
1312
 
        self.assertDirReaderIs(
1313
 
            UTF8DirReaderFeature.module.UTF8DirReader, b".")
 
1199
        win32utils.winver = None # Avoid the win32 detection code
 
1200
        osutils._fs_enc = 'US-ASCII'
 
1201
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
1202
 
 
1203
    def test_force_walkdirs_utf8_fs_ANSI(self):
 
1204
        self.requireFeature(UTF8DirReaderFeature)
 
1205
        self._save_platform_info()
 
1206
        win32utils.winver = None # Avoid the win32 detection code
 
1207
        osutils._fs_enc = 'ANSI_X3.4-1968'
 
1208
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1314
1209
 
1315
1210
    def test_force_walkdirs_utf8_fs_latin1(self):
1316
1211
        self._save_platform_info()
1317
 
        osutils._fs_enc = 'iso-8859-1'
1318
 
        self.assertDirReaderIs(osutils.UnicodeDirReader, ".")
 
1212
        win32utils.winver = None # Avoid the win32 detection code
 
1213
        osutils._fs_enc = 'latin1'
 
1214
        self.assertDirReaderIs(osutils.UnicodeDirReader)
1319
1215
 
1320
1216
    def test_force_walkdirs_utf8_nt(self):
1321
1217
        # Disabled because the thunk of the whole walkdirs api is disabled.
1322
1218
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1323
1219
        self._save_platform_info()
1324
 
        from .._walkdirs_win32 import Win32ReadDir
1325
 
        self.assertDirReaderIs(Win32ReadDir, ".")
 
1220
        win32utils.winver = 'Windows NT'
 
1221
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1222
        self.assertDirReaderIs(Win32ReadDir)
 
1223
 
 
1224
    def test_force_walkdirs_utf8_98(self):
 
1225
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
1226
        self._save_platform_info()
 
1227
        win32utils.winver = 'Windows 98'
 
1228
        self.assertDirReaderIs(osutils.UnicodeDirReader)
1326
1229
 
1327
1230
    def test_unicode_walkdirs(self):
1328
1231
        """Walkdirs should always return unicode paths."""
1329
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
1232
        self.requireFeature(tests.UnicodeFilenameFeature)
1330
1233
        name0 = u'0file-\xb6'
1331
1234
        name1 = u'1dir-\u062c\u0648'
1332
1235
        name2 = u'2file-\u0633'
1339
1242
            ]
1340
1243
        self.build_tree(tree)
1341
1244
        expected_dirblocks = [
1342
 
            ((u'', u'.'),
1343
 
             [(name0, name0, 'file', './' + name0),
1344
 
              (name1, name1, 'directory', './' + name1),
1345
 
              (name2, name2, 'file', './' + name2),
1346
 
              ]
1347
 
             ),
1348
 
            ((name1, './' + name1),
1349
 
             [(name1 + '/' + name0, name0, 'file', './' + name1
1350
 
               + '/' + name0),
1351
 
              (name1 + '/' + name1, name1, 'directory', './' + name1
1352
 
               + '/' + name1),
1353
 
              ]
1354
 
             ),
1355
 
            ((name1 + '/' + name1, './' + name1 + '/' + name1),
1356
 
             [
1357
 
                ]
1358
 
             ),
 
1245
                ((u'', u'.'),
 
1246
                 [(name0, name0, 'file', './' + name0),
 
1247
                  (name1, name1, 'directory', './' + name1),
 
1248
                  (name2, name2, 'file', './' + name2),
 
1249
                 ]
 
1250
                ),
 
1251
                ((name1, './' + name1),
 
1252
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
1253
                                                        + '/' + name0),
 
1254
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
1255
                                                            + '/' + name1),
 
1256
                 ]
 
1257
                ),
 
1258
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1259
                 [
 
1260
                 ]
 
1261
                ),
1359
1262
            ]
1360
1263
        result = list(osutils.walkdirs('.'))
1361
1264
        self._filter_out_stat(result)
1362
1265
        self.assertEqual(expected_dirblocks, result)
1363
 
        result = list(osutils.walkdirs(u'./' + name1, name1))
 
1266
        result = list(osutils.walkdirs(u'./'+name1, name1))
1364
1267
        self._filter_out_stat(result)
1365
1268
        self.assertEqual(expected_dirblocks[1:], result)
1366
1269
 
1369
1272
 
1370
1273
        The abspath portion might be in unicode or utf-8
1371
1274
        """
1372
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
1275
        self.requireFeature(tests.UnicodeFilenameFeature)
1373
1276
        name0 = u'0file-\xb6'
1374
1277
        name1 = u'1dir-\u062c\u0648'
1375
1278
        name2 = u'2file-\u0633'
1386
1289
        name2 = name2.encode('utf8')
1387
1290
 
1388
1291
        expected_dirblocks = [
1389
 
            ((b'', b'.'),
1390
 
             [(name0, name0, 'file', b'./' + name0),
1391
 
              (name1, name1, 'directory', b'./' + name1),
1392
 
              (name2, name2, 'file', b'./' + name2),
1393
 
              ]
1394
 
             ),
1395
 
            ((name1, b'./' + name1),
1396
 
             [(name1 + b'/' + name0, name0, 'file', b'./' + name1
1397
 
               + b'/' + name0),
1398
 
              (name1 + b'/' + name1, name1, 'directory', b'./' + name1
1399
 
               + b'/' + name1),
1400
 
              ]
1401
 
             ),
1402
 
            ((name1 + b'/' + name1, b'./' + name1 + b'/' + name1),
1403
 
             [
1404
 
                ]
1405
 
             ),
 
1292
                (('', '.'),
 
1293
                 [(name0, name0, 'file', './' + name0),
 
1294
                  (name1, name1, 'directory', './' + name1),
 
1295
                  (name2, name2, 'file', './' + name2),
 
1296
                 ]
 
1297
                ),
 
1298
                ((name1, './' + name1),
 
1299
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
1300
                                                        + '/' + name0),
 
1301
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
1302
                                                            + '/' + name1),
 
1303
                 ]
 
1304
                ),
 
1305
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1306
                 [
 
1307
                 ]
 
1308
                ),
1406
1309
            ]
1407
1310
        result = []
1408
1311
        # For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1409
1312
        # all abspaths are Unicode, and encode them back into utf8.
1410
1313
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1411
 
            self.assertIsInstance(dirdetail[0], bytes)
1412
 
            if isinstance(dirdetail[1], text_type):
 
1314
            self.assertIsInstance(dirdetail[0], str)
 
1315
            if isinstance(dirdetail[1], unicode):
1413
1316
                dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1414
1317
                dirblock = [list(info) for info in dirblock]
1415
1318
                for info in dirblock:
1416
 
                    self.assertIsInstance(info[4], text_type)
 
1319
                    self.assertIsInstance(info[4], unicode)
1417
1320
                    info[4] = info[4].encode('utf8')
1418
1321
            new_dirblock = []
1419
1322
            for info in dirblock:
1420
 
                self.assertIsInstance(info[0], bytes)
1421
 
                self.assertIsInstance(info[1], bytes)
1422
 
                self.assertIsInstance(info[4], bytes)
 
1323
                self.assertIsInstance(info[0], str)
 
1324
                self.assertIsInstance(info[1], str)
 
1325
                self.assertIsInstance(info[4], str)
1423
1326
                # Remove the stat information
1424
1327
                new_dirblock.append((info[0], info[1], info[2], info[4]))
1425
1328
            result.append((dirdetail, new_dirblock))
1430
1333
 
1431
1334
        The abspath portion should be in unicode
1432
1335
        """
1433
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
1336
        self.requireFeature(tests.UnicodeFilenameFeature)
1434
1337
        # Use the unicode reader. TODO: split into driver-and-driven unit
1435
1338
        # tests.
1436
1339
        self._save_platform_info()
1453
1356
        # All of the abspaths should be in unicode, all of the relative paths
1454
1357
        # should be in utf8
1455
1358
        expected_dirblocks = [
1456
 
            ((b'', '.'),
1457
 
             [(name0, name0, 'file', './' + name0u),
1458
 
              (name1, name1, 'directory', './' + name1u),
1459
 
              (name2, name2, 'file', './' + name2u),
1460
 
              ]
1461
 
             ),
1462
 
            ((name1, './' + name1u),
1463
 
             [(name1 + b'/' + name0, name0, 'file', './' + name1u
1464
 
               + '/' + name0u),
1465
 
              (name1 + b'/' + name1, name1, 'directory', './' + name1u
1466
 
               + '/' + name1u),
1467
 
              ]
1468
 
             ),
1469
 
            ((name1 + b'/' + name1, './' + name1u + '/' + name1u),
1470
 
             [
1471
 
                ]
1472
 
             ),
 
1359
                (('', '.'),
 
1360
                 [(name0, name0, 'file', './' + name0u),
 
1361
                  (name1, name1, 'directory', './' + name1u),
 
1362
                  (name2, name2, 'file', './' + name2u),
 
1363
                 ]
 
1364
                ),
 
1365
                ((name1, './' + name1u),
 
1366
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1367
                                                        + '/' + name0u),
 
1368
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1369
                                                            + '/' + name1u),
 
1370
                 ]
 
1371
                ),
 
1372
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1373
                 [
 
1374
                 ]
 
1375
                ),
1473
1376
            ]
1474
1377
        result = list(osutils._walkdirs_utf8('.'))
1475
1378
        self._filter_out_stat(result)
1477
1380
 
1478
1381
    def test__walkdirs_utf8_win32readdir(self):
1479
1382
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1480
 
        self.requireFeature(features.UnicodeFilenameFeature)
1481
 
        from .._walkdirs_win32 import Win32ReadDir
 
1383
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1384
        from bzrlib._walkdirs_win32 import Win32ReadDir
1482
1385
        self._save_platform_info()
1483
1386
        osutils._selected_dir_reader = Win32ReadDir()
1484
1387
        name0u = u'0file-\xb6'
1499
1402
        # All of the abspaths should be in unicode, all of the relative paths
1500
1403
        # should be in utf8
1501
1404
        expected_dirblocks = [
1502
 
            (('', '.'),
1503
 
             [(name0, name0, 'file', './' + name0u),
1504
 
              (name1, name1, 'directory', './' + name1u),
1505
 
              (name2, name2, 'file', './' + name2u),
1506
 
              ]
1507
 
             ),
1508
 
            ((name1, './' + name1u),
1509
 
             [(name1 + '/' + name0, name0, 'file', './' + name1u
1510
 
               + '/' + name0u),
1511
 
              (name1 + '/' + name1, name1, 'directory', './' + name1u
1512
 
               + '/' + name1u),
1513
 
              ]
1514
 
             ),
1515
 
            ((name1 + '/' + name1, './' + name1u + '/' + name1u),
1516
 
             [
1517
 
                ]
1518
 
             ),
 
1405
                (('', '.'),
 
1406
                 [(name0, name0, 'file', './' + name0u),
 
1407
                  (name1, name1, 'directory', './' + name1u),
 
1408
                  (name2, name2, 'file', './' + name2u),
 
1409
                 ]
 
1410
                ),
 
1411
                ((name1, './' + name1u),
 
1412
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1413
                                                        + '/' + name0u),
 
1414
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1415
                                                            + '/' + name1u),
 
1416
                 ]
 
1417
                ),
 
1418
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1419
                 [
 
1420
                 ]
 
1421
                ),
1519
1422
            ]
1520
1423
        result = list(osutils._walkdirs_utf8(u'.'))
1521
1424
        self._filter_out_stat(result)
1534
1437
    def test__walkdirs_utf_win32_find_file_stat_file(self):
1535
1438
        """make sure our Stat values are valid"""
1536
1439
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1537
 
        self.requireFeature(features.UnicodeFilenameFeature)
1538
 
        from .._walkdirs_win32 import Win32ReadDir
 
1440
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1441
        from bzrlib._walkdirs_win32 import Win32ReadDir
1539
1442
        name0u = u'0file-\xb6'
1540
1443
        name0 = name0u.encode('utf8')
1541
1444
        self.build_tree([name0u])
1542
1445
        # I hate to sleep() here, but I'm trying to make the ctime different
1543
1446
        # from the mtime
1544
1447
        time.sleep(2)
1545
 
        with open(name0u, 'ab') as f:
1546
 
            f.write(b'just a small update')
 
1448
        f = open(name0u, 'ab')
 
1449
        try:
 
1450
            f.write('just a small update')
 
1451
        finally:
 
1452
            f.close()
1547
1453
 
1548
1454
        result = Win32ReadDir().read_dir('', u'.')
1549
1455
        entry = result[0]
1555
1461
    def test__walkdirs_utf_win32_find_file_stat_directory(self):
1556
1462
        """make sure our Stat values are valid"""
1557
1463
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1558
 
        self.requireFeature(features.UnicodeFilenameFeature)
1559
 
        from .._walkdirs_win32 import Win32ReadDir
 
1464
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1465
        from bzrlib._walkdirs_win32 import Win32ReadDir
1560
1466
        name0u = u'0dir-\u062c\u0648'
1561
1467
        name0 = name0u.encode('utf8')
1562
1468
        self.build_tree([name0u + '/'])
1642
1548
        # using the comparison routine shoudl work too:
1643
1549
        self.assertEqual(
1644
1550
            dir_sorted_paths,
1645
 
            sorted(original_paths, key=osutils.path_prefix_key))
 
1551
            sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
1646
1552
 
1647
1553
 
1648
1554
class TestCopyTree(tests.TestCaseInTempDir):
1661
1567
        self.assertEqual(['c'], os.listdir('target/b'))
1662
1568
 
1663
1569
    def test_copy_tree_symlinks(self):
1664
 
        self.requireFeature(features.SymlinkFeature)
 
1570
        self.requireFeature(tests.SymlinkFeature)
1665
1571
        self.build_tree(['source/'])
1666
1572
        os.symlink('a/generic/path', 'source/lnk')
1667
1573
        osutils.copy_tree('source', 'target')
1671
1577
    def test_copy_tree_handlers(self):
1672
1578
        processed_files = []
1673
1579
        processed_links = []
1674
 
 
1675
1580
        def file_handler(from_path, to_path):
1676
1581
            processed_files.append(('f', from_path, to_path))
1677
 
 
1678
1582
        def dir_handler(from_path, to_path):
1679
1583
            processed_files.append(('d', from_path, to_path))
1680
 
 
1681
1584
        def link_handler(from_path, to_path):
1682
1585
            processed_links.append((from_path, to_path))
1683
 
        handlers = {'file': file_handler,
1684
 
                    'directory': dir_handler,
1685
 
                    'symlink': link_handler,
1686
 
                    }
 
1586
        handlers = {'file':file_handler,
 
1587
                    'directory':dir_handler,
 
1588
                    'symlink':link_handler,
 
1589
                   }
1687
1590
 
1688
1591
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1689
1592
        if osutils.has_symlinks():
1694
1597
                          ('f', 'source/a', 'target/a'),
1695
1598
                          ('d', 'source/b', 'target/b'),
1696
1599
                          ('f', 'source/b/c', 'target/b/c'),
1697
 
                          ], processed_files)
1698
 
        self.assertPathDoesNotExist('target')
 
1600
                         ], processed_files)
 
1601
        self.failIfExists('target')
1699
1602
        if osutils.has_symlinks():
1700
1603
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1701
1604
 
1706
1609
    def setUp(self):
1707
1610
        super(TestSetUnsetEnv, self).setUp()
1708
1611
 
1709
 
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'),
 
1612
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
1710
1613
                         'Environment was not cleaned up properly.'
1711
 
                         ' Variable BRZ_TEST_ENV_VAR should not exist.')
1712
 
 
 
1614
                         ' Variable BZR_TEST_ENV_VAR should not exist.')
1713
1615
        def cleanup():
1714
 
            if 'BRZ_TEST_ENV_VAR' in os.environ:
1715
 
                del os.environ['BRZ_TEST_ENV_VAR']
 
1616
            if 'BZR_TEST_ENV_VAR' in os.environ:
 
1617
                del os.environ['BZR_TEST_ENV_VAR']
1716
1618
        self.addCleanup(cleanup)
1717
1619
 
1718
1620
    def test_set(self):
1719
1621
        """Test that we can set an env variable"""
1720
 
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1622
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1721
1623
        self.assertEqual(None, old)
1722
 
        self.assertEqual('foo', os.environ.get('BRZ_TEST_ENV_VAR'))
 
1624
        self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
1723
1625
 
1724
1626
    def test_double_set(self):
1725
1627
        """Test that we get the old value out"""
1726
 
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
1727
 
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'bar')
 
1628
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1629
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
1728
1630
        self.assertEqual('foo', old)
1729
 
        self.assertEqual('bar', os.environ.get('BRZ_TEST_ENV_VAR'))
 
1631
        self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
1730
1632
 
1731
1633
    def test_unicode(self):
1732
1634
        """Environment can only contain plain strings
1739
1641
                'Cannot find a unicode character that works in encoding %s'
1740
1642
                % (osutils.get_user_encoding(),))
1741
1643
 
1742
 
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', uni_val)
1743
 
        if PY3:
1744
 
            self.assertEqual(uni_val, os.environ.get('BRZ_TEST_ENV_VAR'))
1745
 
        else:
1746
 
            self.assertEqual(env_val, os.environ.get('BRZ_TEST_ENV_VAR'))
 
1644
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
 
1645
        self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
1747
1646
 
1748
1647
    def test_unset(self):
1749
1648
        """Test that passing None will remove the env var"""
1750
 
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
1751
 
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', None)
 
1649
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1650
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
1752
1651
        self.assertEqual('foo', old)
1753
 
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'))
1754
 
        self.assertNotIn('BRZ_TEST_ENV_VAR', os.environ)
 
1652
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
 
1653
        self.failIf('BZR_TEST_ENV_VAR' in os.environ)
1755
1654
 
1756
1655
 
1757
1656
class TestSizeShaFile(tests.TestCaseInTempDir):
1758
1657
 
1759
1658
    def test_sha_empty(self):
1760
 
        self.build_tree_contents([('foo', b'')])
1761
 
        expected_sha = osutils.sha_string(b'')
 
1659
        self.build_tree_contents([('foo', '')])
 
1660
        expected_sha = osutils.sha_string('')
1762
1661
        f = open('foo')
1763
1662
        self.addCleanup(f.close)
1764
1663
        size, sha = osutils.size_sha_file(f)
1766
1665
        self.assertEqual(expected_sha, sha)
1767
1666
 
1768
1667
    def test_sha_mixed_endings(self):
1769
 
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
 
1668
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
1770
1669
        self.build_tree_contents([('foo', text)])
1771
1670
        expected_sha = osutils.sha_string(text)
1772
1671
        f = open('foo', 'rb')
1779
1678
class TestShaFileByName(tests.TestCaseInTempDir):
1780
1679
 
1781
1680
    def test_sha_empty(self):
1782
 
        self.build_tree_contents([('foo', b'')])
1783
 
        expected_sha = osutils.sha_string(b'')
 
1681
        self.build_tree_contents([('foo', '')])
 
1682
        expected_sha = osutils.sha_string('')
1784
1683
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1785
1684
 
1786
1685
    def test_sha_mixed_endings(self):
1787
 
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
 
1686
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
1788
1687
        self.build_tree_contents([('foo', text)])
1789
1688
        expected_sha = osutils.sha_string(text)
1790
1689
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1793
1692
class TestResourceLoading(tests.TestCaseInTempDir):
1794
1693
 
1795
1694
    def test_resource_string(self):
1796
 
        # test resource in breezy
1797
 
        text = osutils.resource_string('breezy', 'debug.py')
 
1695
        # test resource in bzrlib
 
1696
        text = osutils.resource_string('bzrlib', 'debug.py')
1798
1697
        self.assertContainsRe(text, "debug_flags = set()")
1799
 
        # test resource under breezy
1800
 
        text = osutils.resource_string('breezy.ui', 'text.py')
 
1698
        # test resource under bzrlib
 
1699
        text = osutils.resource_string('bzrlib.ui', 'text.py')
1801
1700
        self.assertContainsRe(text, "class TextUIFactory")
1802
1701
        # test unsupported package
1803
1702
        self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1804
 
                          'yyy.xx')
 
1703
            'yyy.xx')
1805
1704
        # test unknown resource
1806
 
        self.assertRaises(IOError, osutils.resource_string, 'breezy', 'yyy.xx')
 
1705
        self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
 
1706
 
 
1707
 
 
1708
class TestReCompile(tests.TestCase):
 
1709
 
 
1710
    def _deprecated_re_compile_checked(self, *args, **kwargs):
 
1711
        return self.applyDeprecated(symbol_versioning.deprecated_in((2, 2, 0)),
 
1712
            osutils.re_compile_checked, *args, **kwargs)
 
1713
 
 
1714
    def test_re_compile_checked(self):
 
1715
        r = self._deprecated_re_compile_checked(r'A*', re.IGNORECASE)
 
1716
        self.assertTrue(r.match('aaaa'))
 
1717
        self.assertTrue(r.match('aAaA'))
 
1718
 
 
1719
    def test_re_compile_checked_error(self):
 
1720
        # like https://bugs.launchpad.net/bzr/+bug/251352
 
1721
 
 
1722
        # Due to possible test isolation error, re.compile is not lazy at
 
1723
        # this point. We re-install lazy compile.
 
1724
        lazy_regex.install_lazy_compile()
 
1725
        err = self.assertRaises(
 
1726
            errors.BzrCommandError,
 
1727
            self._deprecated_re_compile_checked, '*', re.IGNORECASE, 'test case')
 
1728
        self.assertEqual(
 
1729
            'Invalid regular expression in test case: '
 
1730
            '"*" nothing to repeat',
 
1731
            str(err))
1807
1732
 
1808
1733
 
1809
1734
class TestDirReader(tests.TestCaseInTempDir):
1810
1735
 
1811
 
    scenarios = dir_reader_scenarios()
1812
 
 
1813
1736
    # Set by load_tests
1814
1737
    _dir_reader_class = None
1815
1738
    _native_to_unicode = None
1816
1739
 
1817
1740
    def setUp(self):
1818
 
        super(TestDirReader, self).setUp()
 
1741
        tests.TestCaseInTempDir.setUp(self)
1819
1742
        self.overrideAttr(osutils,
1820
1743
                          '_selected_dir_reader', self._dir_reader_class())
1821
1744
 
1828
1751
            '2file'
1829
1752
            ]
1830
1753
        expected_dirblocks = [
1831
 
            ((b'', '.'),
1832
 
             [(b'0file', b'0file', 'file', './0file'),
1833
 
              (b'1dir', b'1dir', 'directory', './1dir'),
1834
 
              (b'2file', b'2file', 'file', './2file'),
1835
 
              ]
1836
 
             ),
1837
 
            ((b'1dir', './1dir'),
1838
 
             [(b'1dir/0file', b'0file', 'file', './1dir/0file'),
1839
 
              (b'1dir/1dir', b'1dir', 'directory', './1dir/1dir'),
1840
 
              ]
1841
 
             ),
1842
 
            ((b'1dir/1dir', './1dir/1dir'),
1843
 
             [
1844
 
                ]
1845
 
             ),
 
1754
                (('', '.'),
 
1755
                 [('0file', '0file', 'file'),
 
1756
                  ('1dir', '1dir', 'directory'),
 
1757
                  ('2file', '2file', 'file'),
 
1758
                 ]
 
1759
                ),
 
1760
                (('1dir', './1dir'),
 
1761
                 [('1dir/0file', '0file', 'file'),
 
1762
                  ('1dir/1dir', '1dir', 'directory'),
 
1763
                 ]
 
1764
                ),
 
1765
                (('1dir/1dir', './1dir/1dir'),
 
1766
                 [
 
1767
                 ]
 
1768
                ),
1846
1769
            ]
1847
1770
        return tree, expected_dirblocks
1848
1771
 
1852
1775
        result = list(osutils._walkdirs_utf8('.'))
1853
1776
        # Filter out stat and abspath
1854
1777
        self.assertEqual(expected_dirblocks,
1855
 
                         self._filter_out(result))
 
1778
                         [(dirinfo, [line[0:3] for line in block])
 
1779
                          for dirinfo, block in result])
1856
1780
 
1857
1781
    def test_walk_sub_dir(self):
1858
1782
        tree, expected_dirblocks = self._get_ascii_tree()
1859
1783
        self.build_tree(tree)
1860
1784
        # you can search a subdir only, with a supplied prefix.
1861
 
        result = list(osutils._walkdirs_utf8(b'./1dir', b'1dir'))
 
1785
        result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
1862
1786
        # Filter out stat and abspath
1863
1787
        self.assertEqual(expected_dirblocks[1:],
1864
 
                         self._filter_out(result))
 
1788
                         [(dirinfo, [line[0:3] for line in block])
 
1789
                          for dirinfo, block in result])
1865
1790
 
1866
1791
    def _get_unicode_tree(self):
1867
1792
        name0u = u'0file-\xb6'
1878
1803
        name1 = name1u.encode('UTF-8')
1879
1804
        name2 = name2u.encode('UTF-8')
1880
1805
        expected_dirblocks = [
1881
 
            ((b'', '.'),
1882
 
             [(name0, name0, 'file', './' + name0u),
1883
 
              (name1, name1, 'directory', './' + name1u),
1884
 
              (name2, name2, 'file', './' + name2u),
1885
 
              ]
1886
 
             ),
1887
 
            ((name1, './' + name1u),
1888
 
             [(name1 + b'/' + name0, name0, 'file', './' + name1u
1889
 
               + '/' + name0u),
1890
 
              (name1 + b'/' + name1, name1, 'directory', './' + name1u
1891
 
               + '/' + name1u),
1892
 
              ]
1893
 
             ),
1894
 
            ((name1 + b'/' + name1, './' + name1u + '/' + name1u),
1895
 
             [
1896
 
                ]
1897
 
             ),
 
1806
                (('', '.'),
 
1807
                 [(name0, name0, 'file', './' + name0u),
 
1808
                  (name1, name1, 'directory', './' + name1u),
 
1809
                  (name2, name2, 'file', './' + name2u),
 
1810
                 ]
 
1811
                ),
 
1812
                ((name1, './' + name1u),
 
1813
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1814
                                                        + '/' + name0u),
 
1815
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1816
                                                            + '/' + name1u),
 
1817
                 ]
 
1818
                ),
 
1819
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1820
                 [
 
1821
                 ]
 
1822
                ),
1898
1823
            ]
1899
1824
        return tree, expected_dirblocks
1900
1825
 
1908
1833
            dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
1909
1834
            details = []
1910
1835
            for line in block:
1911
 
                details.append(
1912
 
                    line[0:3] + (self._native_to_unicode(line[4]), ))
 
1836
                details.append(line[0:3] + (self._native_to_unicode(line[4]), ))
1913
1837
            filtered_dirblocks.append((dirinfo, details))
1914
1838
        return filtered_dirblocks
1915
1839
 
1916
1840
    def test_walk_unicode_tree(self):
1917
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
1841
        self.requireFeature(tests.UnicodeFilenameFeature)
1918
1842
        tree, expected_dirblocks = self._get_unicode_tree()
1919
1843
        self.build_tree(tree)
1920
1844
        result = list(osutils._walkdirs_utf8('.'))
1921
1845
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1922
1846
 
1923
1847
    def test_symlink(self):
1924
 
        self.requireFeature(features.SymlinkFeature)
1925
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
1848
        self.requireFeature(tests.SymlinkFeature)
 
1849
        self.requireFeature(tests.UnicodeFilenameFeature)
1926
1850
        target = u'target\N{Euro Sign}'
1927
1851
        link_name = u'l\N{Euro Sign}nk'
1928
1852
        os.symlink(target, link_name)
 
1853
        target_utf8 = target.encode('UTF-8')
1929
1854
        link_name_utf8 = link_name.encode('UTF-8')
1930
1855
        expected_dirblocks = [
1931
 
            ((b'', '.'),
1932
 
             [(link_name_utf8, link_name_utf8,
1933
 
               'symlink', './' + link_name), ],
1934
 
             )]
 
1856
                (('', '.'),
 
1857
                 [(link_name_utf8, link_name_utf8,
 
1858
                   'symlink', './' + link_name),],
 
1859
                 )]
1935
1860
        result = list(osutils._walkdirs_utf8('.'))
1936
1861
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1937
1862
 
1945
1870
    But prior python versions failed to properly encode the passed unicode
1946
1871
    string.
1947
1872
    """
1948
 
    _test_needs_features = [features.SymlinkFeature,
1949
 
                            features.UnicodeFilenameFeature]
 
1873
    _test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]
1950
1874
 
1951
1875
    def setUp(self):
1952
1876
        super(tests.TestCaseInTempDir, self).setUp()
1955
1879
        os.symlink(self.target, self.link)
1956
1880
 
1957
1881
    def test_os_readlink_link_encoding(self):
1958
 
        self.assertEqual(self.target, os.readlink(self.link))
 
1882
        if sys.version_info < (2, 6):
 
1883
            self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
 
1884
        else:
 
1885
            self.assertEquals(self.target,  os.readlink(self.link))
1959
1886
 
1960
1887
    def test_os_readlink_link_decoding(self):
1961
 
        self.assertEqual(self.target.encode(osutils._fs_enc),
1962
 
                         os.readlink(self.link.encode(osutils._fs_enc)))
 
1888
        self.assertEquals(self.target.encode(osutils._fs_enc),
 
1889
                          os.readlink(self.link.encode(osutils._fs_enc)))
1963
1890
 
1964
1891
 
1965
1892
class TestConcurrency(tests.TestCase):
1973
1900
        self.assertIsInstance(concurrency, int)
1974
1901
 
1975
1902
    def test_local_concurrency_environment_variable(self):
1976
 
        self.overrideEnv('BRZ_CONCURRENCY', '2')
 
1903
        os.environ['BZR_CONCURRENCY'] = '2'
1977
1904
        self.assertEqual(2, osutils.local_concurrency(use_cache=False))
1978
 
        self.overrideEnv('BRZ_CONCURRENCY', '3')
 
1905
        os.environ['BZR_CONCURRENCY'] = '3'
1979
1906
        self.assertEqual(3, osutils.local_concurrency(use_cache=False))
1980
 
        self.overrideEnv('BRZ_CONCURRENCY', 'foo')
 
1907
        os.environ['BZR_CONCURRENCY'] = 'foo'
1981
1908
        self.assertEqual(1, osutils.local_concurrency(use_cache=False))
1982
1909
 
1983
1910
    def test_option_concurrency(self):
1984
 
        self.overrideEnv('BRZ_CONCURRENCY', '1')
 
1911
        os.environ['BZR_CONCURRENCY'] = '1'
1985
1912
        self.run_bzr('rocks --concurrency 42')
1986
 
        # Command line overrides environment variable
1987
 
        self.assertEqual('42', os.environ['BRZ_CONCURRENCY'])
1988
 
        self.assertEqual(42, osutils.local_concurrency(use_cache=False))
 
1913
        # Command line overrides envrionment variable
 
1914
        self.assertEquals('42', os.environ['BZR_CONCURRENCY'])
 
1915
        self.assertEquals(42, osutils.local_concurrency(use_cache=False))
1989
1916
 
1990
1917
 
1991
1918
class TestFailedToLoadExtension(tests.TestCase):
1992
1919
 
1993
1920
    def _try_loading(self):
1994
1921
        try:
1995
 
            import breezy._fictional_extension_py  # noqa: F401
1996
 
        except ImportError as e:
 
1922
            import bzrlib._fictional_extension_py
 
1923
        except ImportError, e:
1997
1924
            osutils.failed_to_load_extension(e)
1998
1925
            return True
1999
1926
 
2004
1931
    def test_failure_to_load(self):
2005
1932
        self._try_loading()
2006
1933
        self.assertLength(1, osutils._extension_load_failures)
2007
 
        if PY3:
2008
 
            self.assertEqual(
2009
 
                osutils._extension_load_failures[0],
2010
 
                "No module named 'breezy._fictional_extension_py'")
2011
 
        else:
2012
 
            self.assertEqual(osutils._extension_load_failures[0],
2013
 
                             "No module named _fictional_extension_py")
 
1934
        self.assertEquals(osutils._extension_load_failures[0],
 
1935
            "No module named _fictional_extension_py")
2014
1936
 
2015
1937
    def test_report_extension_load_failures_no_warning(self):
2016
1938
        self.assertTrue(self._try_loading())
2017
 
        warnings, result = self.callCatchWarnings(
2018
 
            osutils.report_extension_load_failures)
 
1939
        warnings, result = self.callCatchWarnings(osutils.report_extension_load_failures)
2019
1940
        # it used to give a Python warning; it no longer does
2020
1941
        self.assertLength(0, warnings)
2021
1942
 
2022
1943
    def test_report_extension_load_failures_message(self):
2023
 
        log = BytesIO()
 
1944
        log = StringIO()
2024
1945
        trace.push_log_file(log)
2025
1946
        self.assertTrue(self._try_loading())
2026
1947
        osutils.report_extension_load_failures()
2027
1948
        self.assertContainsRe(
2028
1949
            log.getvalue(),
2029
 
            br"brz: warning: some compiled extensions could not be loaded; "
2030
 
            b"see ``brz help missing-extensions``\n"
 
1950
            r"bzr: warning: some compiled extensions could not be loaded; "
 
1951
            "see <https://answers\.launchpad\.net/bzr/\+faq/703>\n"
2031
1952
            )
2032
1953
 
2033
1954
 
2034
1955
class TestTerminalWidth(tests.TestCase):
2035
1956
 
2036
1957
    def setUp(self):
2037
 
        super(TestTerminalWidth, self).setUp()
 
1958
        tests.TestCase.setUp(self)
2038
1959
        self._orig_terminal_size_state = osutils._terminal_size_state
2039
1960
        self._orig_first_terminal_size = osutils._first_terminal_size
2040
1961
        self.addCleanup(self.restore_osutils_globals)
2062
1983
    def test_default_values(self):
2063
1984
        self.assertEqual(80, osutils.default_terminal_width)
2064
1985
 
2065
 
    def test_defaults_to_BRZ_COLUMNS(self):
2066
 
        # BRZ_COLUMNS is set by the test framework
2067
 
        self.assertNotEqual('12', os.environ['BRZ_COLUMNS'])
2068
 
        self.overrideEnv('BRZ_COLUMNS', '12')
 
1986
    def test_defaults_to_BZR_COLUMNS(self):
 
1987
        # BZR_COLUMNS is set by the test framework
 
1988
        self.assertNotEqual('12', os.environ['BZR_COLUMNS'])
 
1989
        os.environ['BZR_COLUMNS'] = '12'
2069
1990
        self.assertEqual(12, osutils.terminal_width())
2070
1991
 
2071
 
    def test_BRZ_COLUMNS_0_no_limit(self):
2072
 
        self.overrideEnv('BRZ_COLUMNS', '0')
2073
 
        self.assertEqual(None, osutils.terminal_width())
2074
 
 
2075
1992
    def test_falls_back_to_COLUMNS(self):
2076
 
        self.overrideEnv('BRZ_COLUMNS', None)
 
1993
        del os.environ['BZR_COLUMNS']
2077
1994
        self.assertNotEqual('42', os.environ['COLUMNS'])
2078
1995
        self.set_fake_tty()
2079
 
        self.overrideEnv('COLUMNS', '42')
 
1996
        os.environ['COLUMNS'] = '42'
2080
1997
        self.assertEqual(42, osutils.terminal_width())
2081
1998
 
2082
1999
    def test_tty_default_without_columns(self):
2083
 
        self.overrideEnv('BRZ_COLUMNS', None)
2084
 
        self.overrideEnv('COLUMNS', None)
 
2000
        del os.environ['BZR_COLUMNS']
 
2001
        del os.environ['COLUMNS']
2085
2002
 
2086
2003
        def terminal_size(w, h):
2087
2004
            return 42, 42
2094
2011
        self.assertEqual(42, osutils.terminal_width())
2095
2012
 
2096
2013
    def test_non_tty_default_without_columns(self):
2097
 
        self.overrideEnv('BRZ_COLUMNS', None)
2098
 
        self.overrideEnv('COLUMNS', None)
 
2014
        del os.environ['BZR_COLUMNS']
 
2015
        del os.environ['COLUMNS']
2099
2016
        self.replace_stdout(None)
2100
2017
        self.assertEqual(None, osutils.terminal_width())
2101
2018
 
2104
2021
        termios = term_ios_feature.module
2105
2022
        # bug 63539 is about a termios without TIOCGWINSZ attribute
2106
2023
        try:
2107
 
            termios.TIOCGWINSZ
 
2024
            orig = termios.TIOCGWINSZ
2108
2025
        except AttributeError:
2109
2026
            # We won't remove TIOCGWINSZ, because it doesn't exist anyway :)
2110
2027
            pass
2111
2028
        else:
2112
2029
            self.overrideAttr(termios, 'TIOCGWINSZ')
2113
2030
            del termios.TIOCGWINSZ
2114
 
        self.overrideEnv('BRZ_COLUMNS', None)
2115
 
        self.overrideEnv('COLUMNS', None)
 
2031
        del os.environ['BZR_COLUMNS']
 
2032
        del os.environ['COLUMNS']
2116
2033
        # Whatever the result is, if we don't raise an exception, it's ok.
2117
2034
        osutils.terminal_width()
2118
2035
 
2119
 
 
2120
2036
class TestCreationOps(tests.TestCaseInTempDir):
2121
2037
    _test_needs_features = [features.chown_feature]
2122
2038
 
2123
2039
    def setUp(self):
2124
 
        super(TestCreationOps, self).setUp()
 
2040
        tests.TestCaseInTempDir.setUp(self)
2125
2041
        self.overrideAttr(os, 'chown', self._dummy_chown)
2126
2042
 
2127
2043
        # params set by call to _dummy_chown
2133
2049
    def test_copy_ownership_from_path(self):
2134
2050
        """copy_ownership_from_path test with specified src."""
2135
2051
        ownsrc = '/'
2136
 
        open('test_file', 'wt').close()
 
2052
        f = open('test_file', 'wt')
2137
2053
        osutils.copy_ownership_from_path('test_file', ownsrc)
2138
2054
 
2139
2055
        s = os.stat(ownsrc)
2140
 
        self.assertEqual(self.path, 'test_file')
2141
 
        self.assertEqual(self.uid, s.st_uid)
2142
 
        self.assertEqual(self.gid, s.st_gid)
 
2056
        self.assertEquals(self.path, 'test_file')
 
2057
        self.assertEquals(self.uid, s.st_uid)
 
2058
        self.assertEquals(self.gid, s.st_gid)
2143
2059
 
2144
2060
    def test_copy_ownership_nonesrc(self):
2145
2061
        """copy_ownership_from_path test with src=None."""
2146
 
        open('test_file', 'wt').close()
 
2062
        f = open('test_file', 'wt')
2147
2063
        # should use parent dir for permissions
2148
2064
        osutils.copy_ownership_from_path('test_file')
2149
2065
 
2150
2066
        s = os.stat('..')
2151
 
        self.assertEqual(self.path, 'test_file')
2152
 
        self.assertEqual(self.uid, s.st_uid)
2153
 
        self.assertEqual(self.gid, s.st_gid)
2154
 
 
2155
 
 
2156
 
class TestPathFromEnviron(tests.TestCase):
2157
 
 
2158
 
    def test_is_unicode(self):
2159
 
        self.overrideEnv('BRZ_TEST_PATH', './anywhere at all/')
2160
 
        path = osutils.path_from_environ('BRZ_TEST_PATH')
2161
 
        self.assertIsInstance(path, text_type)
2162
 
        self.assertEqual(u'./anywhere at all/', path)
2163
 
 
2164
 
    def test_posix_path_env_ascii(self):
2165
 
        self.overrideEnv('BRZ_TEST_PATH', '/tmp')
2166
 
        home = osutils._posix_path_from_environ('BRZ_TEST_PATH')
2167
 
        self.assertIsInstance(home, text_type)
2168
 
        self.assertEqual(u'/tmp', home)
2169
 
 
2170
 
    def test_posix_path_env_unicode(self):
2171
 
        self.requireFeature(features.ByteStringNamedFilesystem)
2172
 
        self.overrideEnv('BRZ_TEST_PATH', '/home/\xa7test')
2173
 
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
2174
 
        self.assertEqual(u'/home/\xa7test',
2175
 
                         osutils._posix_path_from_environ('BRZ_TEST_PATH'))
2176
 
        osutils._fs_enc = "iso8859-5"
2177
 
        if PY3:
2178
 
            # In Python 3, os.environ returns unicode.
2179
 
            self.assertEqual(u'/home/\xa7test',
2180
 
                             osutils._posix_path_from_environ('BRZ_TEST_PATH'))
2181
 
        else:
2182
 
            self.assertEqual(u'/home/\u0407test',
2183
 
                             osutils._posix_path_from_environ('BRZ_TEST_PATH'))
2184
 
            osutils._fs_enc = "utf-8"
2185
 
            self.assertRaises(
2186
 
                errors.BadFilenameEncoding,
2187
 
                osutils._posix_path_from_environ, 'BRZ_TEST_PATH')
2188
 
 
2189
 
 
2190
 
class TestGetHomeDir(tests.TestCase):
2191
 
 
2192
 
    def test_is_unicode(self):
2193
 
        home = osutils._get_home_dir()
2194
 
        self.assertIsInstance(home, text_type)
2195
 
 
2196
 
    def test_posix_homeless(self):
2197
 
        self.overrideEnv('HOME', None)
2198
 
        home = osutils._get_home_dir()
2199
 
        self.assertIsInstance(home, text_type)
2200
 
 
2201
 
    def test_posix_home_ascii(self):
2202
 
        self.overrideEnv('HOME', '/home/test')
2203
 
        home = osutils._posix_get_home_dir()
2204
 
        self.assertIsInstance(home, text_type)
2205
 
        self.assertEqual(u'/home/test', home)
2206
 
 
2207
 
    def test_posix_home_unicode(self):
2208
 
        self.requireFeature(features.ByteStringNamedFilesystem)
2209
 
        self.overrideEnv('HOME', '/home/\xa7test')
2210
 
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
2211
 
        self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
2212
 
        osutils._fs_enc = "iso8859-5"
2213
 
        if PY3:
2214
 
            # In python 3, os.environ returns unicode
2215
 
            self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
2216
 
        else:
2217
 
            self.assertEqual(u'/home/\u0407test',
2218
 
                             osutils._posix_get_home_dir())
2219
 
            osutils._fs_enc = "utf-8"
2220
 
            self.assertRaises(errors.BadFilenameEncoding,
2221
 
                              osutils._posix_get_home_dir)
2222
 
 
 
2067
        self.assertEquals(self.path, 'test_file')
 
2068
        self.assertEquals(self.uid, s.st_uid)
 
2069
        self.assertEquals(self.gid, s.st_gid)
2223
2070
 
2224
2071
class TestGetuserUnicode(tests.TestCase):
2225
2072
 
2226
 
    def test_is_unicode(self):
2227
 
        user = osutils.getuser_unicode()
2228
 
        self.assertIsInstance(user, text_type)
2229
 
 
2230
 
    def envvar_to_override(self):
2231
 
        if sys.platform == "win32":
2232
 
            # Disable use of platform calls on windows so envvar is used
2233
 
            self.overrideAttr(win32utils, 'has_ctypes', False)
2234
 
            return 'USERNAME'  # only variable used on windows
2235
 
        return 'LOGNAME'  # first variable checked by getpass.getuser()
2236
 
 
2237
2073
    def test_ascii_user(self):
2238
 
        self.overrideEnv(self.envvar_to_override(), 'jrandom')
 
2074
        osutils.set_or_unset_env('LOGNAME', 'jrandom')
2239
2075
        self.assertEqual(u'jrandom', osutils.getuser_unicode())
2240
2076
 
2241
2077
    def test_unicode_user(self):
2242
2078
        ue = osutils.get_user_encoding()
2243
 
        uni_val, env_val = tests.probe_unicode_in_user_encoding()
2244
 
        if uni_val is None:
2245
 
            raise tests.TestSkipped(
2246
 
                'Cannot find a unicode character that works in encoding %s'
2247
 
                % (osutils.get_user_encoding(),))
2248
 
        uni_username = u'jrandom' + uni_val
2249
 
        encoded_username = uni_username.encode(ue)
2250
 
        if PY3:
2251
 
            self.overrideEnv(self.envvar_to_override(), uni_username)
2252
 
        else:
2253
 
            self.overrideEnv(self.envvar_to_override(), encoded_username)
2254
 
        self.assertEqual(uni_username, osutils.getuser_unicode())
2255
 
 
2256
 
 
2257
 
class TestBackupNames(tests.TestCase):
2258
 
 
2259
 
    def setUp(self):
2260
 
        super(TestBackupNames, self).setUp()
2261
 
        self.backups = []
2262
 
 
2263
 
    def backup_exists(self, name):
2264
 
        return name in self.backups
2265
 
 
2266
 
    def available_backup_name(self, name):
2267
 
        backup_name = osutils.available_backup_name(name, self.backup_exists)
2268
 
        self.backups.append(backup_name)
2269
 
        return backup_name
2270
 
 
2271
 
    def assertBackupName(self, expected, name):
2272
 
        self.assertEqual(expected, self.available_backup_name(name))
2273
 
 
2274
 
    def test_empty(self):
2275
 
        self.assertBackupName('file.~1~', 'file')
2276
 
 
2277
 
    def test_existing(self):
2278
 
        self.available_backup_name('file')
2279
 
        self.available_backup_name('file')
2280
 
        self.assertBackupName('file.~3~', 'file')
2281
 
        # Empty slots are found, this is not a strict requirement and may be
2282
 
        # revisited if we test against all implementations.
2283
 
        self.backups.remove('file.~2~')
2284
 
        self.assertBackupName('file.~2~', 'file')
2285
 
 
2286
 
 
2287
 
class TestFindExecutableInPath(tests.TestCase):
2288
 
 
2289
 
    def test_windows(self):
2290
 
        if sys.platform != 'win32':
2291
 
            raise tests.TestSkipped('test requires win32')
2292
 
        self.assertTrue(osutils.find_executable_on_path(
2293
 
            'explorer') is not None)
2294
 
        self.assertTrue(
2295
 
            osutils.find_executable_on_path('explorer.exe') is not None)
2296
 
        self.assertTrue(
2297
 
            osutils.find_executable_on_path('EXPLORER.EXE') is not None)
2298
 
        self.assertTrue(
2299
 
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
2300
 
        self.assertTrue(osutils.find_executable_on_path('file.txt') is None)
2301
 
 
2302
 
    def test_windows_app_path(self):
2303
 
        if sys.platform != 'win32':
2304
 
            raise tests.TestSkipped('test requires win32')
2305
 
        # Override PATH env var so that exe can only be found on App Path
2306
 
        self.overrideEnv('PATH', '')
2307
 
        # Internt Explorer is always registered in the App Path
2308
 
        self.assertTrue(osutils.find_executable_on_path(
2309
 
            'iexplore') is not None)
2310
 
 
2311
 
    def test_other(self):
2312
 
        if sys.platform == 'win32':
2313
 
            raise tests.TestSkipped('test requires non-win32')
2314
 
        self.assertTrue(osutils.find_executable_on_path('sh') is not None)
2315
 
        self.assertTrue(
2316
 
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
2317
 
 
2318
 
 
2319
 
class TestEnvironmentErrors(tests.TestCase):
2320
 
    """Test handling of environmental errors"""
2321
 
 
2322
 
    def test_is_oserror(self):
2323
 
        self.assertTrue(osutils.is_environment_error(
2324
 
            OSError(errno.EINVAL, "Invalid parameter")))
2325
 
 
2326
 
    def test_is_ioerror(self):
2327
 
        self.assertTrue(osutils.is_environment_error(
2328
 
            IOError(errno.EINVAL, "Invalid parameter")))
2329
 
 
2330
 
    def test_is_socket_error(self):
2331
 
        self.assertTrue(osutils.is_environment_error(
2332
 
            socket.error(errno.EINVAL, "Invalid parameter")))
2333
 
 
2334
 
    def test_is_select_error(self):
2335
 
        self.assertTrue(osutils.is_environment_error(
2336
 
            select.error(errno.EINVAL, "Invalid parameter")))
2337
 
 
2338
 
    def test_is_pywintypes_error(self):
2339
 
        self.requireFeature(features.pywintypes)
2340
 
        import pywintypes
2341
 
        self.assertTrue(osutils.is_environment_error(
2342
 
            pywintypes.error(errno.EINVAL, "Invalid parameter", "Caller")))
2343
 
 
2344
 
 
2345
 
class SupportsExecutableTests(tests.TestCaseInTempDir):
2346
 
 
2347
 
    def test_returns_bool(self):
2348
 
        self.assertIsInstance(osutils.supports_executable(self.test_dir), bool)
2349
 
 
2350
 
 
2351
 
class SupportsSymlinksTests(tests.TestCaseInTempDir):
2352
 
 
2353
 
    def test_returns_bool(self):
2354
 
        self.assertIsInstance(osutils.supports_symlinks(self.test_dir), bool)
2355
 
 
2356
 
 
2357
 
class MtabReader(tests.TestCaseInTempDir):
2358
 
 
2359
 
    def test_read_mtab(self):
2360
 
        self.build_tree_contents([('mtab', """\
2361
 
/dev/mapper/blah--vg-root / ext4 rw,relatime,errors=remount-ro 0 0
2362
 
/dev/mapper/blah--vg-home /home vfat rw,relatime 0 0
2363
 
# comment
2364
 
 
2365
 
iminvalid
2366
 
""")])
2367
 
        self.assertEqual(
2368
 
            list(osutils.read_mtab('mtab')),
2369
 
            [(b'/', 'ext4'),
2370
 
             (b'/home', 'vfat')])
2371
 
 
2372
 
 
2373
 
class GetFsTypeTests(tests.TestCaseInTempDir):
2374
 
 
2375
 
    def test_returns_string_or_none(self):
2376
 
        ret = osutils.get_fs_type(self.test_dir)
2377
 
        self.assertTrue(isinstance(ret, text_type) or ret is None)
2378
 
 
2379
 
    def test_returns_most_specific(self):
2380
 
        self.overrideAttr(
2381
 
            osutils, '_FILESYSTEM_FINDER',
2382
 
            osutils.FilesystemFinder(
2383
 
                [(b'/', 'ext4'), (b'/home', 'vfat'),
2384
 
                 (b'/home/jelmer', 'ext2')]))
2385
 
        self.assertEqual(osutils.get_fs_type(b'/home/jelmer/blah'), 'ext2')
2386
 
        self.assertEqual(osutils.get_fs_type('/home/jelmer/blah'), 'ext2')
2387
 
        self.assertEqual(osutils.get_fs_type(b'/home/jelmer'), 'ext2')
2388
 
        self.assertEqual(osutils.get_fs_type(b'/home/martin'), 'vfat')
2389
 
        self.assertEqual(osutils.get_fs_type(b'/home'), 'vfat')
2390
 
        self.assertEqual(osutils.get_fs_type(b'/other'), 'ext4')
2391
 
 
2392
 
    def test_returns_none(self):
2393
 
        self.overrideAttr(
2394
 
            osutils, '_FILESYSTEM_FINDER',
2395
 
            osutils.FilesystemFinder([]))
2396
 
        self.assertIs(osutils.get_fs_type('/home/jelmer/blah'), None)
2397
 
        self.assertIs(osutils.get_fs_type(b'/home/jelmer/blah'), None)
2398
 
        self.assertIs(osutils.get_fs_type('/home/jelmer'), None)
 
2079
        osutils.set_or_unset_env('LOGNAME', u'jrandom\xb6'.encode(ue))
 
2080
        self.assertEqual(u'jrandom\xb6', osutils.getuser_unicode())