/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_osutils.py

  • Committer: Marius Kruger
  • Date: 2010-07-10 21:28:56 UTC
  • mto: (5384.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 5385.
  • Revision ID: marius.kruger@enerweb.co.za-20100710212856-uq4ji3go0u5se7hx
* Update documentation
* add NEWS

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2016 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the osutils wrapper."""
18
18
 
 
19
from cStringIO import StringIO
19
20
import errno
20
 
from io import BytesIO
21
21
import os
22
 
import select
 
22
import re
23
23
import socket
 
24
import stat
24
25
import sys
25
 
import tempfile
26
26
import time
27
27
 
28
 
from .. import (
 
28
from bzrlib import (
29
29
    errors,
 
30
    lazy_regex,
30
31
    osutils,
 
32
    symbol_versioning,
31
33
    tests,
32
34
    trace,
33
35
    win32utils,
34
36
    )
35
 
from . import (
 
37
from bzrlib.tests import (
36
38
    features,
37
39
    file_utils,
38
40
    test__walkdirs_win32,
39
41
    )
40
 
from .scenarios import load_tests_apply_scenarios
41
 
 
42
 
 
43
 
class _UTF8DirReaderFeature(features.ModuleAvailableFeature):
 
42
 
 
43
 
 
44
class _UTF8DirReaderFeature(tests.Feature):
44
45
 
45
46
    def _probe(self):
46
47
        try:
47
 
            from .. import _readdir_pyx
48
 
            self._module = _readdir_pyx
 
48
            from bzrlib import _readdir_pyx
49
49
            self.reader = _readdir_pyx.UTF8DirReader
50
50
            return True
51
51
        except ImportError:
52
52
            return False
53
53
 
54
 
 
55
 
UTF8DirReaderFeature = _UTF8DirReaderFeature('breezy._readdir_pyx')
56
 
 
57
 
term_ios_feature = features.ModuleAvailableFeature('termios')
 
54
    def feature_name(self):
 
55
        return 'bzrlib._readdir_pyx'
 
56
 
 
57
UTF8DirReaderFeature = _UTF8DirReaderFeature()
 
58
 
 
59
term_ios_feature = tests.ModuleAvailableFeature('termios')
58
60
 
59
61
 
60
62
def _already_unicode(s):
78
80
    # Some DirReaders are platform specific and even there they may not be
79
81
    # available.
80
82
    if UTF8DirReaderFeature.available():
81
 
        from .. import _readdir_pyx
 
83
        from bzrlib import _readdir_pyx
82
84
        scenarios.append(('utf8',
83
85
                          dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
84
86
                               _native_to_unicode=_utf8_to_unicode)))
85
87
 
86
88
    if test__walkdirs_win32.win32_readdir_feature.available():
87
89
        try:
88
 
            from .. import _walkdirs_win32
 
90
            from bzrlib import _walkdirs_win32
89
91
            scenarios.append(
90
92
                ('win32',
91
93
                 dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
95
97
    return scenarios
96
98
 
97
99
 
98
 
load_tests = load_tests_apply_scenarios
 
100
def load_tests(basic_tests, module, loader):
 
101
    suite = loader.suiteClass()
 
102
    dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
 
103
        basic_tests, tests.condition_isinstance(TestDirReader))
 
104
    tests.multiply_tests(dir_reader_tests, dir_reader_scenarios(), suite)
 
105
    suite.addTest(remaining_tests)
 
106
    return suite
99
107
 
100
108
 
101
109
class TestContainsWhitespace(tests.TestCase):
102
110
 
103
111
    def test_contains_whitespace(self):
104
 
        self.assertTrue(osutils.contains_whitespace(u' '))
105
 
        self.assertTrue(osutils.contains_whitespace(u'hello there'))
106
 
        self.assertTrue(osutils.contains_whitespace(u'hellothere\n'))
107
 
        self.assertTrue(osutils.contains_whitespace(u'hello\nthere'))
108
 
        self.assertTrue(osutils.contains_whitespace(u'hello\rthere'))
109
 
        self.assertTrue(osutils.contains_whitespace(u'hello\tthere'))
 
112
        self.failUnless(osutils.contains_whitespace(u' '))
 
113
        self.failUnless(osutils.contains_whitespace(u'hello there'))
 
114
        self.failUnless(osutils.contains_whitespace(u'hellothere\n'))
 
115
        self.failUnless(osutils.contains_whitespace(u'hello\nthere'))
 
116
        self.failUnless(osutils.contains_whitespace(u'hello\rthere'))
 
117
        self.failUnless(osutils.contains_whitespace(u'hello\tthere'))
110
118
 
111
119
        # \xa0 is "Non-breaking-space", which some python locales treat as
112
120
        # whitespace, but we do not.
113
 
        self.assertFalse(osutils.contains_whitespace(u''))
114
 
        self.assertFalse(osutils.contains_whitespace(u'hellothere'))
115
 
        self.assertFalse(osutils.contains_whitespace(u'hello\xa0there'))
 
121
        self.failIf(osutils.contains_whitespace(u''))
 
122
        self.failIf(osutils.contains_whitespace(u'hellothere'))
 
123
        self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
116
124
 
117
125
 
118
126
class TestRename(tests.TestCaseInTempDir):
130
138
 
131
139
    def test_fancy_rename(self):
132
140
        # This should work everywhere
133
 
        self.create_file('a', b'something in a\n')
 
141
        self.create_file('a', 'something in a\n')
134
142
        self._fancy_rename('a', 'b')
135
 
        self.assertPathDoesNotExist('a')
136
 
        self.assertPathExists('b')
137
 
        self.check_file_contents('b', b'something in a\n')
 
143
        self.failIfExists('a')
 
144
        self.failUnlessExists('b')
 
145
        self.check_file_contents('b', 'something in a\n')
138
146
 
139
 
        self.create_file('a', b'new something in a\n')
 
147
        self.create_file('a', 'new something in a\n')
140
148
        self._fancy_rename('b', 'a')
141
149
 
142
 
        self.check_file_contents('a', b'something in a\n')
 
150
        self.check_file_contents('a', 'something in a\n')
143
151
 
144
152
    def test_fancy_rename_fails_source_missing(self):
145
153
        # An exception should be raised, and the target should be left in place
146
 
        self.create_file('target', b'data in target\n')
 
154
        self.create_file('target', 'data in target\n')
147
155
        self.assertRaises((IOError, OSError), self._fancy_rename,
148
156
                          'missingsource', 'target')
149
 
        self.assertPathExists('target')
150
 
        self.check_file_contents('target', b'data in target\n')
 
157
        self.failUnlessExists('target')
 
158
        self.check_file_contents('target', 'data in target\n')
151
159
 
152
160
    def test_fancy_rename_fails_if_source_and_target_missing(self):
153
161
        self.assertRaises((IOError, OSError), self._fancy_rename,
155
163
 
156
164
    def test_rename(self):
157
165
        # Rename should be semi-atomic on all platforms
158
 
        self.create_file('a', b'something in a\n')
 
166
        self.create_file('a', 'something in a\n')
159
167
        osutils.rename('a', 'b')
160
 
        self.assertPathDoesNotExist('a')
161
 
        self.assertPathExists('b')
162
 
        self.check_file_contents('b', b'something in a\n')
 
168
        self.failIfExists('a')
 
169
        self.failUnlessExists('b')
 
170
        self.check_file_contents('b', 'something in a\n')
163
171
 
164
 
        self.create_file('a', b'new something in a\n')
 
172
        self.create_file('a', 'new something in a\n')
165
173
        osutils.rename('b', 'a')
166
174
 
167
 
        self.check_file_contents('a', b'something in a\n')
 
175
        self.check_file_contents('a', 'something in a\n')
168
176
 
169
177
    # TODO: test fancy_rename using a MemoryTransport
170
178
 
176
184
        # we can't use failUnlessExists on case-insensitive filesystem
177
185
        # so try to check shape of the tree
178
186
        shape = sorted(os.listdir('.'))
179
 
        self.assertEqual(['A', 'B'], shape)
180
 
 
181
 
    def test_rename_exception(self):
182
 
        try:
183
 
            osutils.rename('nonexistent_path', 'different_nonexistent_path')
184
 
        except OSError as e:
185
 
            self.assertEqual(e.old_filename, 'nonexistent_path')
186
 
            self.assertEqual(e.new_filename, 'different_nonexistent_path')
187
 
            self.assertTrue('nonexistent_path' in e.strerror)
188
 
            self.assertTrue('different_nonexistent_path' in e.strerror)
 
187
        self.assertEquals(['A', 'B'], shape)
189
188
 
190
189
 
191
190
class TestRandChars(tests.TestCase):
218
217
                         (['src'], SRC_FOO_C),
219
218
                         (['src'], 'src'),
220
219
                         ]:
221
 
            self.assertTrue(osutils.is_inside_any(dirs, fn))
 
220
            self.assert_(osutils.is_inside_any(dirs, fn))
222
221
        for dirs, fn in [(['src'], 'srccontrol'),
223
222
                         (['src'], 'srccontrol/foo')]:
224
223
            self.assertFalse(osutils.is_inside_any(dirs, fn))
230
229
                         (['src/bar.c', 'bla/foo.c'], 'src'),
231
230
                         (['src'], 'src'),
232
231
                         ]:
233
 
            self.assertTrue(osutils.is_inside_or_parent_of_any(dirs, fn))
 
232
            self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
234
233
 
235
234
        for dirs, fn in [(['src'], 'srccontrol'),
236
235
                         (['srccontrol/foo.c'], 'src'),
238
237
            self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
239
238
 
240
239
 
241
 
class TestLstat(tests.TestCaseInTempDir):
242
 
 
243
 
    def test_lstat_matches_fstat(self):
244
 
        # On Windows, lstat and fstat don't always agree, primarily in the
245
 
        # 'st_ino' and 'st_dev' fields. So we force them to be '0' in our
246
 
        # custom implementation.
247
 
        if sys.platform == 'win32':
248
 
            # We only have special lstat/fstat if we have the extension.
249
 
            # Without it, we may end up re-reading content when we don't have
250
 
            # to, but otherwise it doesn't affect correctness.
251
 
            self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
252
 
        with open('test-file.txt', 'wb') as f:
253
 
            f.write(b'some content\n')
254
 
            f.flush()
255
 
            self.assertEqualStat(osutils.fstat(f.fileno()),
256
 
                                 osutils.lstat('test-file.txt'))
257
 
 
258
 
 
259
240
class TestRmTree(tests.TestCaseInTempDir):
260
241
 
261
242
    def test_rmtree(self):
262
243
        # Check to remove tree with read-only files/dirs
263
244
        os.mkdir('dir')
264
 
        with open('dir/file', 'w') as f:
265
 
            f.write('spam')
 
245
        f = file('dir/file', 'w')
 
246
        f.write('spam')
 
247
        f.close()
266
248
        # would like to also try making the directory readonly, but at the
267
249
        # moment python shutil.rmtree doesn't handle that properly - it would
268
250
        # need to chmod the directory before removing things inside it - deferred
272
254
 
273
255
        osutils.rmtree('dir')
274
256
 
275
 
        self.assertPathDoesNotExist('dir/file')
276
 
        self.assertPathDoesNotExist('dir')
 
257
        self.failIfExists('dir/file')
 
258
        self.failIfExists('dir')
277
259
 
278
260
 
279
261
class TestDeleteAny(tests.TestCaseInTempDir):
292
274
 
293
275
    def test_file_kind(self):
294
276
        self.build_tree(['file', 'dir/'])
295
 
        self.assertEqual('file', osutils.file_kind('file'))
296
 
        self.assertEqual('directory', osutils.file_kind('dir/'))
 
277
        self.assertEquals('file', osutils.file_kind('file'))
 
278
        self.assertEquals('directory', osutils.file_kind('dir/'))
297
279
        if osutils.has_symlinks():
298
280
            os.symlink('symlink', 'symlink')
299
 
            self.assertEqual('symlink', osutils.file_kind('symlink'))
 
281
            self.assertEquals('symlink', osutils.file_kind('symlink'))
300
282
 
301
283
        # TODO: jam 20060529 Test a block device
302
284
        try:
303
285
            os.lstat('/dev/null')
304
 
        except OSError as e:
 
286
        except OSError, e:
305
287
            if e.errno not in (errno.ENOENT,):
306
288
                raise
307
289
        else:
308
 
            self.assertEqual(
309
 
                'chardev',
310
 
                osutils.file_kind(os.path.realpath('/dev/null')))
 
290
            self.assertEquals('chardev', osutils.file_kind('/dev/null'))
311
291
 
312
292
        mkfifo = getattr(os, 'mkfifo', None)
313
293
        if mkfifo:
314
294
            mkfifo('fifo')
315
295
            try:
316
 
                self.assertEqual('fifo', osutils.file_kind('fifo'))
 
296
                self.assertEquals('fifo', osutils.file_kind('fifo'))
317
297
            finally:
318
298
                os.remove('fifo')
319
299
 
322
302
            s = socket.socket(AF_UNIX)
323
303
            s.bind('socket')
324
304
            try:
325
 
                self.assertEqual('socket', osutils.file_kind('socket'))
 
305
                self.assertEquals('socket', osutils.file_kind('socket'))
326
306
            finally:
327
307
                os.remove('socket')
328
308
 
347
327
 
348
328
        orig_umask = osutils.get_umask()
349
329
        self.addCleanup(os.umask, orig_umask)
350
 
        os.umask(0o222)
351
 
        self.assertEqual(0o222, osutils.get_umask())
352
 
        os.umask(0o022)
353
 
        self.assertEqual(0o022, osutils.get_umask())
354
 
        os.umask(0o002)
355
 
        self.assertEqual(0o002, osutils.get_umask())
356
 
        os.umask(0o027)
357
 
        self.assertEqual(0o027, osutils.get_umask())
 
330
        os.umask(0222)
 
331
        self.assertEqual(0222, osutils.get_umask())
 
332
        os.umask(0022)
 
333
        self.assertEqual(0022, osutils.get_umask())
 
334
        os.umask(0002)
 
335
        self.assertEqual(0002, osutils.get_umask())
 
336
        os.umask(0027)
 
337
        self.assertEqual(0027, osutils.get_umask())
358
338
 
359
339
 
360
340
class TestDateTime(tests.TestCase):
396
376
        self.assertFormatedDelta('2 seconds in the future', -2)
397
377
 
398
378
    def test_format_date(self):
399
 
        self.assertRaises(osutils.UnsupportedTimezoneFormat,
400
 
                          osutils.format_date, 0, timezone='foo')
 
379
        self.assertRaises(errors.UnsupportedTimezoneFormat,
 
380
            osutils.format_date, 0, timezone='foo')
401
381
        self.assertIsInstance(osutils.format_date(0), str)
402
 
        self.assertIsInstance(osutils.format_local_date(0), str)
 
382
        self.assertIsInstance(osutils.format_local_date(0), unicode)
403
383
        # Testing for the actual value of the local weekday without
404
384
        # duplicating the code from format_date is difficult.
405
385
        # Instead blackbox.test_locale should check for localized
407
387
 
408
388
    def test_format_date_with_offset_in_original_timezone(self):
409
389
        self.assertEqual("Thu 1970-01-01 00:00:00 +0000",
410
 
                         osutils.format_date_with_offset_in_original_timezone(0))
 
390
            osutils.format_date_with_offset_in_original_timezone(0))
411
391
        self.assertEqual("Fri 1970-01-02 03:46:40 +0000",
412
 
                         osutils.format_date_with_offset_in_original_timezone(100000))
 
392
            osutils.format_date_with_offset_in_original_timezone(100000))
413
393
        self.assertEqual("Fri 1970-01-02 05:46:40 +0200",
414
 
                         osutils.format_date_with_offset_in_original_timezone(100000, 7200))
 
394
            osutils.format_date_with_offset_in_original_timezone(100000, 7200))
415
395
 
416
396
    def test_local_time_offset(self):
417
397
        """Test that local_time_offset() returns a sane value."""
433
413
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
434
414
 
435
415
 
436
 
class TestFdatasync(tests.TestCaseInTempDir):
437
 
 
438
 
    def do_fdatasync(self):
439
 
        f = tempfile.NamedTemporaryFile()
440
 
        osutils.fdatasync(f.fileno())
441
 
        f.close()
442
 
 
443
 
    @staticmethod
444
 
    def raise_eopnotsupp(*args, **kwargs):
445
 
        raise IOError(errno.EOPNOTSUPP, os.strerror(errno.EOPNOTSUPP))
446
 
 
447
 
    @staticmethod
448
 
    def raise_enotsup(*args, **kwargs):
449
 
        raise IOError(errno.ENOTSUP, os.strerror(errno.ENOTSUP))
450
 
 
451
 
    def test_fdatasync_handles_system_function(self):
452
 
        self.overrideAttr(os, "fdatasync")
453
 
        self.do_fdatasync()
454
 
 
455
 
    def test_fdatasync_handles_no_fdatasync_no_fsync(self):
456
 
        self.overrideAttr(os, "fdatasync")
457
 
        self.overrideAttr(os, "fsync")
458
 
        self.do_fdatasync()
459
 
 
460
 
    def test_fdatasync_handles_no_EOPNOTSUPP(self):
461
 
        self.overrideAttr(errno, "EOPNOTSUPP")
462
 
        self.do_fdatasync()
463
 
 
464
 
    def test_fdatasync_catches_ENOTSUP(self):
465
 
        enotsup = getattr(errno, "ENOTSUP", None)
466
 
        if enotsup is None:
467
 
            raise tests.TestNotApplicable("No ENOTSUP on this platform")
468
 
        self.overrideAttr(os, "fdatasync", self.raise_enotsup)
469
 
        self.do_fdatasync()
470
 
 
471
 
    def test_fdatasync_catches_EOPNOTSUPP(self):
472
 
        enotsup = getattr(errno, "EOPNOTSUPP", None)
473
 
        if enotsup is None:
474
 
            raise tests.TestNotApplicable("No EOPNOTSUPP on this platform")
475
 
        self.overrideAttr(os, "fdatasync", self.raise_eopnotsupp)
476
 
        self.do_fdatasync()
477
 
 
478
 
 
479
416
class TestLinks(tests.TestCaseInTempDir):
480
417
 
481
418
    def test_dereference_path(self):
482
 
        self.requireFeature(features.SymlinkFeature)
 
419
        self.requireFeature(tests.SymlinkFeature)
483
420
        cwd = osutils.realpath('.')
484
421
        os.mkdir('bar')
485
422
        bar_path = osutils.pathjoin(cwd, 'bar')
506
443
        self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
507
444
 
508
445
    def test_changing_access(self):
509
 
        with open('file', 'w') as f:
510
 
            f.write('monkey')
 
446
        f = file('file', 'w')
 
447
        f.write('monkey')
 
448
        f.close()
511
449
 
512
450
        # Make a file readonly
513
451
        osutils.make_readonly('file')
514
452
        mode = os.lstat('file').st_mode
515
 
        self.assertEqual(mode, mode & 0o777555)
 
453
        self.assertEqual(mode, mode & 0777555)
516
454
 
517
455
        # Make a file writable
518
456
        osutils.make_writable('file')
519
457
        mode = os.lstat('file').st_mode
520
 
        self.assertEqual(mode, mode | 0o200)
 
458
        self.assertEqual(mode, mode | 0200)
521
459
 
522
460
        if osutils.has_symlinks():
523
461
            # should not error when handed a symlink
531
469
 
532
470
class TestCanonicalRelPath(tests.TestCaseInTempDir):
533
471
 
534
 
    _test_needs_features = [features.CaseInsCasePresFilenameFeature]
 
472
    _test_needs_features = [tests.CaseInsCasePresFilenameFeature]
535
473
 
536
474
    def test_canonical_relpath_simple(self):
537
 
        f = open('MixedCaseName', 'w')
 
475
        f = file('MixedCaseName', 'w')
538
476
        f.close()
539
477
        actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
540
 
        self.assertEqual('work/MixedCaseName', actual)
 
478
        self.failUnlessEqual('work/MixedCaseName', actual)
541
479
 
542
480
    def test_canonical_relpath_missing_tail(self):
543
481
        os.mkdir('MixedCaseParent')
544
482
        actual = osutils.canonical_relpath(self.test_base_dir,
545
483
                                           'mixedcaseparent/nochild')
546
 
        self.assertEqual('work/MixedCaseParent/nochild', actual)
 
484
        self.failUnlessEqual('work/MixedCaseParent/nochild', actual)
547
485
 
548
486
 
549
487
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
593
531
    """Test pumpfile method."""
594
532
 
595
533
    def setUp(self):
596
 
        super(TestPumpFile, self).setUp()
 
534
        tests.TestCase.setUp(self)
597
535
        # create a test datablock
598
536
        self.block_size = 512
599
 
        pattern = b'0123456789ABCDEF'
600
 
        self.test_data = pattern * (3 * self.block_size // len(pattern))
 
537
        pattern = '0123456789ABCDEF'
 
538
        self.test_data = pattern * (3 * self.block_size / len(pattern))
601
539
        self.test_data_len = len(self.test_data)
602
540
 
603
541
    def test_bracket_block_size(self):
607
545
        self.assertTrue(self.test_data_len > self.block_size)
608
546
 
609
547
        from_file = file_utils.FakeReadFile(self.test_data)
610
 
        to_file = BytesIO()
 
548
        to_file = StringIO()
611
549
 
612
 
        # read (max // 2) bytes and verify read size wasn't affected
613
 
        num_bytes_to_read = self.block_size // 2
614
 
        osutils.pumpfile(from_file, to_file,
615
 
                         num_bytes_to_read, self.block_size)
 
550
        # read (max / 2) bytes and verify read size wasn't affected
 
551
        num_bytes_to_read = self.block_size / 2
 
552
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
616
553
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
617
554
        self.assertEqual(from_file.get_read_count(), 1)
618
555
 
619
556
        # read (max) bytes and verify read size wasn't affected
620
557
        num_bytes_to_read = self.block_size
621
558
        from_file.reset_read_count()
622
 
        osutils.pumpfile(from_file, to_file,
623
 
                         num_bytes_to_read, self.block_size)
 
559
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
624
560
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
625
561
        self.assertEqual(from_file.get_read_count(), 1)
626
562
 
627
563
        # read (max + 1) bytes and verify read size was limited
628
564
        num_bytes_to_read = self.block_size + 1
629
565
        from_file.reset_read_count()
630
 
        osutils.pumpfile(from_file, to_file,
631
 
                         num_bytes_to_read, self.block_size)
 
566
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
632
567
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
633
568
        self.assertEqual(from_file.get_read_count(), 2)
634
569
 
635
570
        # finish reading the rest of the data
636
571
        num_bytes_to_read = self.test_data_len - to_file.tell()
637
 
        osutils.pumpfile(from_file, to_file,
638
 
                         num_bytes_to_read, self.block_size)
 
572
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
639
573
 
640
574
        # report error if the data wasn't equal (we only report the size due
641
575
        # to the length of the data)
652
586
 
653
587
        # retrieve data in blocks
654
588
        from_file = file_utils.FakeReadFile(self.test_data)
655
 
        to_file = BytesIO()
 
589
        to_file = StringIO()
656
590
        osutils.pumpfile(from_file, to_file, self.test_data_len,
657
591
                         self.block_size)
658
592
 
676
610
 
677
611
        # retrieve data to EOF
678
612
        from_file = file_utils.FakeReadFile(self.test_data)
679
 
        to_file = BytesIO()
 
613
        to_file = StringIO()
680
614
        osutils.pumpfile(from_file, to_file, -1, self.block_size)
681
615
 
682
616
        # verify read size was equal to the maximum read size
696
630
        with this new version."""
697
631
        # retrieve data using default (old) pumpfile method
698
632
        from_file = file_utils.FakeReadFile(self.test_data)
699
 
        to_file = BytesIO()
 
633
        to_file = StringIO()
700
634
        osutils.pumpfile(from_file, to_file)
701
635
 
702
636
        # report error if the data wasn't equal (we only report the size due
708
642
 
709
643
    def test_report_activity(self):
710
644
        activity = []
711
 
 
712
645
        def log_activity(length, direction):
713
646
            activity.append((length, direction))
714
 
        from_file = BytesIO(self.test_data)
715
 
        to_file = BytesIO()
 
647
        from_file = StringIO(self.test_data)
 
648
        to_file = StringIO()
716
649
        osutils.pumpfile(from_file, to_file, buff_size=500,
717
650
                         report_activity=log_activity, direction='read')
718
651
        self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
719
652
                          (36, 'read')], activity)
720
653
 
721
 
        from_file = BytesIO(self.test_data)
722
 
        to_file = BytesIO()
 
654
        from_file = StringIO(self.test_data)
 
655
        to_file = StringIO()
723
656
        del activity[:]
724
657
        osutils.pumpfile(from_file, to_file, buff_size=500,
725
658
                         report_activity=log_activity, direction='write')
727
660
                          (36, 'write')], activity)
728
661
 
729
662
        # And with a limited amount of data
730
 
        from_file = BytesIO(self.test_data)
731
 
        to_file = BytesIO()
 
663
        from_file = StringIO(self.test_data)
 
664
        to_file = StringIO()
732
665
        del activity[:]
733
666
        osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
734
667
                         report_activity=log_activity, direction='read')
735
 
        self.assertEqual(
736
 
            [(500, 'read'), (500, 'read'), (28, 'read')], activity)
 
668
        self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
 
669
 
737
670
 
738
671
 
739
672
class TestPumpStringFile(tests.TestCase):
740
673
 
741
674
    def test_empty(self):
742
 
        output = BytesIO()
743
 
        osutils.pump_string_file(b"", output)
744
 
        self.assertEqual(b"", output.getvalue())
 
675
        output = StringIO()
 
676
        osutils.pump_string_file("", output)
 
677
        self.assertEqual("", output.getvalue())
745
678
 
746
679
    def test_more_than_segment_size(self):
747
 
        output = BytesIO()
748
 
        osutils.pump_string_file(b"123456789", output, 2)
749
 
        self.assertEqual(b"123456789", output.getvalue())
 
680
        output = StringIO()
 
681
        osutils.pump_string_file("123456789", output, 2)
 
682
        self.assertEqual("123456789", output.getvalue())
750
683
 
751
684
    def test_segment_size(self):
752
 
        output = BytesIO()
753
 
        osutils.pump_string_file(b"12", output, 2)
754
 
        self.assertEqual(b"12", output.getvalue())
 
685
        output = StringIO()
 
686
        osutils.pump_string_file("12", output, 2)
 
687
        self.assertEqual("12", output.getvalue())
755
688
 
756
689
    def test_segment_size_multiple(self):
757
 
        output = BytesIO()
758
 
        osutils.pump_string_file(b"1234", output, 2)
759
 
        self.assertEqual(b"1234", output.getvalue())
 
690
        output = StringIO()
 
691
        osutils.pump_string_file("1234", output, 2)
 
692
        self.assertEqual("1234", output.getvalue())
760
693
 
761
694
 
762
695
class TestRelpath(tests.TestCase):
781
714
class TestSafeUnicode(tests.TestCase):
782
715
 
783
716
    def test_from_ascii_string(self):
784
 
        self.assertEqual(u'foobar', osutils.safe_unicode(b'foobar'))
 
717
        self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
785
718
 
786
719
    def test_from_unicode_string_ascii_contents(self):
787
720
        self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
790
723
        self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
791
724
 
792
725
    def test_from_utf8_string(self):
793
 
        self.assertEqual(u'foo\xae', osutils.safe_unicode(b'foo\xc2\xae'))
 
726
        self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
794
727
 
795
728
    def test_bad_utf8_string(self):
796
729
        self.assertRaises(errors.BzrBadParameterNotUnicode,
797
730
                          osutils.safe_unicode,
798
 
                          b'\xbb\xbb')
 
731
                          '\xbb\xbb')
799
732
 
800
733
 
801
734
class TestSafeUtf8(tests.TestCase):
802
735
 
803
736
    def test_from_ascii_string(self):
804
 
        f = b'foobar'
805
 
        self.assertEqual(b'foobar', osutils.safe_utf8(f))
 
737
        f = 'foobar'
 
738
        self.assertEqual('foobar', osutils.safe_utf8(f))
806
739
 
807
740
    def test_from_unicode_string_ascii_contents(self):
808
 
        self.assertEqual(b'bargam', osutils.safe_utf8(u'bargam'))
 
741
        self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
809
742
 
810
743
    def test_from_unicode_string_unicode_contents(self):
811
 
        self.assertEqual(b'bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
 
744
        self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
812
745
 
813
746
    def test_from_utf8_string(self):
814
 
        self.assertEqual(b'foo\xc2\xae', osutils.safe_utf8(b'foo\xc2\xae'))
 
747
        self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
815
748
 
816
749
    def test_bad_utf8_string(self):
817
750
        self.assertRaises(errors.BzrBadParameterNotUnicode,
818
 
                          osutils.safe_utf8, b'\xbb\xbb')
819
 
 
820
 
 
821
 
class TestSendAll(tests.TestCase):
822
 
 
823
 
    def test_send_with_disconnected_socket(self):
824
 
        class DisconnectedSocket(object):
825
 
            def __init__(self, err):
826
 
                self.err = err
827
 
 
828
 
            def send(self, content):
829
 
                raise self.err
830
 
 
831
 
            def close(self):
832
 
                pass
833
 
        # All of these should be treated as ConnectionReset
834
 
        errs = []
835
 
        for err_cls in (IOError, socket.error):
836
 
            for errnum in osutils._end_of_stream_errors:
837
 
                errs.append(err_cls(errnum))
838
 
        for err in errs:
839
 
            sock = DisconnectedSocket(err)
840
 
            self.assertRaises(errors.ConnectionReset,
841
 
                              osutils.send_all, sock, b'some more content')
842
 
 
843
 
    def test_send_with_no_progress(self):
844
 
        # See https://bugs.launchpad.net/bzr/+bug/1047309
845
 
        # It seems that paramiko can get into a state where it doesn't error,
846
 
        # but it returns 0 bytes sent for requests over and over again.
847
 
        class NoSendingSocket(object):
848
 
            def __init__(self):
849
 
                self.call_count = 0
850
 
 
851
 
            def send(self, bytes):
852
 
                self.call_count += 1
853
 
                if self.call_count > 100:
854
 
                    # Prevent the test suite from hanging
855
 
                    raise RuntimeError('too many calls')
856
 
                return 0
857
 
        sock = NoSendingSocket()
858
 
        self.assertRaises(errors.ConnectionReset,
859
 
                          osutils.send_all, sock, b'content')
860
 
        self.assertEqual(1, sock.call_count)
861
 
 
862
 
 
863
 
class TestPosixFuncs(tests.TestCase):
864
 
    """Test that the posix version of normpath returns an appropriate path
865
 
       when used with 2 leading slashes."""
866
 
 
867
 
    def test_normpath(self):
868
 
        self.assertEqual('/etc/shadow', osutils._posix_normpath('/etc/shadow'))
869
 
        self.assertEqual(
870
 
            '/etc/shadow', osutils._posix_normpath('//etc/shadow'))
871
 
        self.assertEqual(
872
 
            '/etc/shadow', osutils._posix_normpath('///etc/shadow'))
 
751
                          osutils.safe_utf8, '\xbb\xbb')
 
752
 
 
753
 
 
754
class TestSafeRevisionId(tests.TestCase):
 
755
 
 
756
    def test_from_ascii_string(self):
 
757
        # this shouldn't give a warning because it's getting an ascii string
 
758
        self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
 
759
 
 
760
    def test_from_unicode_string_ascii_contents(self):
 
761
        self.assertEqual('bargam',
 
762
                         osutils.safe_revision_id(u'bargam', warn=False))
 
763
 
 
764
    def test_from_unicode_deprecated(self):
 
765
        self.assertEqual('bargam',
 
766
            self.callDeprecated([osutils._revision_id_warning],
 
767
                                osutils.safe_revision_id, u'bargam'))
 
768
 
 
769
    def test_from_unicode_string_unicode_contents(self):
 
770
        self.assertEqual('bargam\xc2\xae',
 
771
                         osutils.safe_revision_id(u'bargam\xae', warn=False))
 
772
 
 
773
    def test_from_utf8_string(self):
 
774
        self.assertEqual('foo\xc2\xae',
 
775
                         osutils.safe_revision_id('foo\xc2\xae'))
 
776
 
 
777
    def test_none(self):
 
778
        """Currently, None is a valid revision_id"""
 
779
        self.assertEqual(None, osutils.safe_revision_id(None))
 
780
 
 
781
 
 
782
class TestSafeFileId(tests.TestCase):
 
783
 
 
784
    def test_from_ascii_string(self):
 
785
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))
 
786
 
 
787
    def test_from_unicode_string_ascii_contents(self):
 
788
        self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))
 
789
 
 
790
    def test_from_unicode_deprecated(self):
 
791
        self.assertEqual('bargam',
 
792
            self.callDeprecated([osutils._file_id_warning],
 
793
                                osutils.safe_file_id, u'bargam'))
 
794
 
 
795
    def test_from_unicode_string_unicode_contents(self):
 
796
        self.assertEqual('bargam\xc2\xae',
 
797
                         osutils.safe_file_id(u'bargam\xae', warn=False))
 
798
 
 
799
    def test_from_utf8_string(self):
 
800
        self.assertEqual('foo\xc2\xae',
 
801
                         osutils.safe_file_id('foo\xc2\xae'))
 
802
 
 
803
    def test_none(self):
 
804
        """Currently, None is a valid revision_id"""
 
805
        self.assertEqual(None, osutils.safe_file_id(None))
873
806
 
874
807
 
875
808
class TestWin32Funcs(tests.TestCase):
876
809
    """Test that _win32 versions of os utilities return appropriate paths."""
877
810
 
878
811
    def test_abspath(self):
879
 
        self.requireFeature(features.win32_feature)
880
812
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
881
813
        self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
882
814
        self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
895
827
                         osutils._win32_pathjoin('path/to', 'C:/foo'))
896
828
        self.assertEqual('path/to/foo',
897
829
                         osutils._win32_pathjoin('path/to/', 'foo'))
898
 
 
899
 
    def test_pathjoin_late_bugfix(self):
900
 
        expected = 'C:/foo'
901
 
        self.assertEqual(expected,
 
830
        self.assertEqual('/foo',
902
831
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
903
 
        self.assertEqual(expected,
 
832
        self.assertEqual('/foo',
904
833
                         osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
905
834
 
906
835
    def test_normpath(self):
911
840
 
912
841
    def test_getcwd(self):
913
842
        cwd = osutils._win32_getcwd()
914
 
        os_cwd = osutils._getcwd()
 
843
        os_cwd = os.getcwdu()
915
844
        self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
916
845
        # win32 is inconsistent whether it returns lower or upper case
917
846
        # and even if it was consistent the user might type the other
925
854
        self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
926
855
        self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
927
856
 
 
857
    def test_win98_abspath(self):
 
858
        # absolute path
 
859
        self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
 
860
        self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
 
861
        # UNC path
 
862
        self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
 
863
        self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
 
864
        # relative path
 
865
        cwd = osutils.getcwd().rstrip('/')
 
866
        drive = osutils.ntpath.splitdrive(cwd)[0]
 
867
        self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
 
868
        self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
 
869
        # unicode path
 
870
        u = u'\u1234'
 
871
        self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
 
872
 
928
873
 
929
874
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
930
875
    """Test win32 functions that create files."""
931
876
 
932
877
    def test_getcwd(self):
933
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
878
        self.requireFeature(tests.UnicodeFilenameFeature)
934
879
        os.mkdir(u'mu-\xb5')
935
880
        os.chdir(u'mu-\xb5')
936
881
        # TODO: jam 20060427 This will probably fail on Mac OSX because
941
886
 
942
887
    def test_minimum_path_selection(self):
943
888
        self.assertEqual(set(),
944
 
                         osutils.minimum_path_selection([]))
945
 
        self.assertEqual({'a'},
946
 
                         osutils.minimum_path_selection(['a']))
947
 
        self.assertEqual({'a', 'b'},
948
 
                         osutils.minimum_path_selection(['a', 'b']))
949
 
        self.assertEqual({'a/', 'b'},
950
 
                         osutils.minimum_path_selection(['a/', 'b']))
951
 
        self.assertEqual({'a/', 'b'},
952
 
                         osutils.minimum_path_selection(['a/c', 'a/', 'b']))
953
 
        self.assertEqual({'a-b', 'a', 'a0b'},
954
 
                         osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
 
889
            osutils.minimum_path_selection([]))
 
890
        self.assertEqual(set(['a']),
 
891
            osutils.minimum_path_selection(['a']))
 
892
        self.assertEqual(set(['a', 'b']),
 
893
            osutils.minimum_path_selection(['a', 'b']))
 
894
        self.assertEqual(set(['a/', 'b']),
 
895
            osutils.minimum_path_selection(['a/', 'b']))
 
896
        self.assertEqual(set(['a/', 'b']),
 
897
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
 
898
        self.assertEqual(set(['a-b', 'a', 'a0b']),
 
899
            osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
955
900
 
956
901
    def test_mkdtemp(self):
957
902
        tmpdir = osutils._win32_mkdtemp(dir='.')
958
903
        self.assertFalse('\\' in tmpdir)
959
904
 
960
905
    def test_rename(self):
961
 
        with open('a', 'wb') as a:
962
 
            a.write(b'foo\n')
963
 
        with open('b', 'wb') as b:
964
 
            b.write(b'baz\n')
 
906
        a = open('a', 'wb')
 
907
        a.write('foo\n')
 
908
        a.close()
 
909
        b = open('b', 'wb')
 
910
        b.write('baz\n')
 
911
        b.close()
965
912
 
966
913
        osutils._win32_rename('b', 'a')
967
 
        self.assertPathExists('a')
968
 
        self.assertPathDoesNotExist('b')
969
 
        self.assertFileEqual(b'baz\n', 'a')
 
914
        self.failUnlessExists('a')
 
915
        self.failIfExists('b')
 
916
        self.assertFileEqual('baz\n', 'a')
970
917
 
971
918
    def test_rename_missing_file(self):
972
 
        with open('a', 'wb') as a:
973
 
            a.write(b'foo\n')
 
919
        a = open('a', 'wb')
 
920
        a.write('foo\n')
 
921
        a.close()
974
922
 
975
923
        try:
976
924
            osutils._win32_rename('b', 'a')
977
 
        except (IOError, OSError) as e:
 
925
        except (IOError, OSError), e:
978
926
            self.assertEqual(errno.ENOENT, e.errno)
979
 
        self.assertFileEqual(b'foo\n', 'a')
 
927
        self.assertFileEqual('foo\n', 'a')
980
928
 
981
929
    def test_rename_missing_dir(self):
982
930
        os.mkdir('a')
983
931
        try:
984
932
            osutils._win32_rename('b', 'a')
985
 
        except (IOError, OSError) as e:
 
933
        except (IOError, OSError), e:
986
934
            self.assertEqual(errno.ENOENT, e.errno)
987
935
 
988
936
    def test_rename_current_dir(self):
994
942
        # doesn't exist.
995
943
        try:
996
944
            osutils._win32_rename('b', '.')
997
 
        except (IOError, OSError) as e:
 
945
        except (IOError, OSError), e:
998
946
            self.assertEqual(errno.ENOENT, e.errno)
999
947
 
1000
948
    def test_splitpath(self):
1005
953
        check(['a', 'b'], 'a/b')
1006
954
        check(['a', 'b'], 'a/./b')
1007
955
        check(['a', '.b'], 'a/.b')
1008
 
        if os.path.sep == '\\':
1009
 
            check(['a', '.b'], 'a\\.b')
1010
 
        else:
1011
 
            check(['a\\.b'], 'a\\.b')
 
956
        check(['a', '.b'], 'a\\.b')
1012
957
 
1013
958
        self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
1014
959
 
1026
971
    """Test mac special functions that require directories."""
1027
972
 
1028
973
    def test_getcwd(self):
1029
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
974
        self.requireFeature(tests.UnicodeFilenameFeature)
1030
975
        os.mkdir(u'B\xe5gfors')
1031
976
        os.chdir(u'B\xe5gfors')
1032
977
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
1033
978
 
1034
979
    def test_getcwd_nonnorm(self):
1035
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
980
        self.requireFeature(tests.UnicodeFilenameFeature)
1036
981
        # Test that _mac_getcwd() will normalize this path
1037
982
        os.mkdir(u'Ba\u030agfors')
1038
983
        os.chdir(u'Ba\u030agfors')
1042
987
class TestChunksToLines(tests.TestCase):
1043
988
 
1044
989
    def test_smoketest(self):
1045
 
        self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
1046
 
                         osutils.chunks_to_lines([b'foo\nbar', b'\nbaz\n']))
1047
 
        self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
1048
 
                         osutils.chunks_to_lines([b'foo\n', b'bar\n', b'baz\n']))
 
990
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
 
991
                         osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
 
992
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
 
993
                         osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
1049
994
 
1050
995
    def test_osutils_binding(self):
1051
 
        from . import test__chunks_to_lines
 
996
        from bzrlib.tests import test__chunks_to_lines
1052
997
        if test__chunks_to_lines.compiled_chunkstolines_feature.available():
1053
 
            from .._chunks_to_lines_pyx import chunks_to_lines
 
998
            from bzrlib._chunks_to_lines_pyx import chunks_to_lines
1054
999
        else:
1055
 
            from .._chunks_to_lines_py import chunks_to_lines
 
1000
            from bzrlib._chunks_to_lines_py import chunks_to_lines
1056
1001
        self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
1057
1002
 
1058
1003
 
1065
1010
                         osutils.split_lines(u'foo\nbar\xae\n'))
1066
1011
 
1067
1012
    def test_split_with_carriage_returns(self):
1068
 
        self.assertEqual([b'foo\rbar\n'],
1069
 
                         osutils.split_lines(b'foo\rbar\n'))
 
1013
        self.assertEqual(['foo\rbar\n'],
 
1014
                         osutils.split_lines('foo\rbar\n'))
1070
1015
 
1071
1016
 
1072
1017
class TestWalkDirs(tests.TestCaseInTempDir):
1087
1032
            ]
1088
1033
        self.build_tree(tree)
1089
1034
        expected_dirblocks = [
1090
 
            (('', '.'),
1091
 
             [('0file', '0file', 'file'),
1092
 
              ('1dir', '1dir', 'directory'),
1093
 
              ('2file', '2file', 'file'),
1094
 
              ]
1095
 
             ),
1096
 
            (('1dir', './1dir'),
1097
 
             [('1dir/0file', '0file', 'file'),
1098
 
              ('1dir/1dir', '1dir', 'directory'),
1099
 
              ]
1100
 
             ),
1101
 
            (('1dir/1dir', './1dir/1dir'),
1102
 
             [
1103
 
                ]
1104
 
             ),
 
1035
                (('', '.'),
 
1036
                 [('0file', '0file', 'file'),
 
1037
                  ('1dir', '1dir', 'directory'),
 
1038
                  ('2file', '2file', 'file'),
 
1039
                 ]
 
1040
                ),
 
1041
                (('1dir', './1dir'),
 
1042
                 [('1dir/0file', '0file', 'file'),
 
1043
                  ('1dir/1dir', '1dir', 'directory'),
 
1044
                 ]
 
1045
                ),
 
1046
                (('1dir/1dir', './1dir/1dir'),
 
1047
                 [
 
1048
                 ]
 
1049
                ),
1105
1050
            ]
1106
1051
        result = []
1107
1052
        found_bzrdir = False
1127
1072
        if sys.platform == 'win32':
1128
1073
            raise tests.TestNotApplicable(
1129
1074
                "readdir IOError not tested on win32")
1130
 
        self.requireFeature(features.not_running_as_root)
1131
1075
        os.mkdir("test-unreadable")
1132
1076
        os.chmod("test-unreadable", 0000)
1133
1077
        # must chmod it back so that it can be removed
1134
 
        self.addCleanup(os.chmod, "test-unreadable", 0o700)
 
1078
        self.addCleanup(os.chmod, "test-unreadable", 0700)
1135
1079
        # The error is not raised until the generator is actually evaluated.
1136
1080
        # (It would be ok if it happened earlier but at the moment it
1137
1081
        # doesn't.)
1138
1082
        e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1139
 
        self.assertEqual('./test-unreadable', osutils.safe_unicode(e.filename))
1140
 
        self.assertEqual(errno.EACCES, e.errno)
 
1083
        self.assertEquals('./test-unreadable', e.filename)
 
1084
        self.assertEquals(errno.EACCES, e.errno)
1141
1085
        # Ensure the message contains the file name
1142
 
        self.assertContainsRe(str(e), "\\./test-unreadable")
 
1086
        self.assertContainsRe(str(e), "\./test-unreadable")
 
1087
 
1143
1088
 
1144
1089
    def test_walkdirs_encoding_error(self):
1145
1090
        # <https://bugs.launchpad.net/bzr/+bug/488519>
1147
1092
        # are not using the filesystem's encoding
1148
1093
 
1149
1094
        # require a bytestring based filesystem
1150
 
        self.requireFeature(features.ByteStringNamedFilesystem)
 
1095
        self.requireFeature(tests.ByteStringNamedFilesystem)
1151
1096
 
1152
1097
        tree = [
1153
1098
            '.bzr',
1161
1106
        self.build_tree(tree)
1162
1107
 
1163
1108
        # rename the 1file to a latin-1 filename
1164
 
        os.rename(b"./1file", b"\xe8file")
1165
 
        if b"\xe8file" not in os.listdir("."):
1166
 
            self.skipTest("Lack filesystem that preserves arbitrary bytes")
 
1109
        os.rename("./1file", "\xe8file")
1167
1110
 
1168
1111
        self._save_platform_info()
 
1112
        win32utils.winver = None # Avoid the win32 detection code
1169
1113
        osutils._fs_enc = 'UTF-8'
1170
1114
 
1171
1115
        # this should raise on error
1172
1116
        def attempt():
1173
 
            for dirdetail, dirblock in osutils.walkdirs(b'.'):
 
1117
            for dirdetail, dirblock in osutils.walkdirs('.'):
1174
1118
                pass
1175
1119
 
1176
1120
        self.assertRaises(errors.BadFilenameEncoding, attempt)
1186
1130
            ]
1187
1131
        self.build_tree(tree)
1188
1132
        expected_dirblocks = [
1189
 
            (('', '.'),
1190
 
             [('0file', '0file', 'file'),
1191
 
              ('1dir', '1dir', 'directory'),
1192
 
              ('2file', '2file', 'file'),
1193
 
              ]
1194
 
             ),
1195
 
            (('1dir', './1dir'),
1196
 
             [('1dir/0file', '0file', 'file'),
1197
 
              ('1dir/1dir', '1dir', 'directory'),
1198
 
              ]
1199
 
             ),
1200
 
            (('1dir/1dir', './1dir/1dir'),
1201
 
             [
1202
 
                ]
1203
 
             ),
 
1133
                (('', '.'),
 
1134
                 [('0file', '0file', 'file'),
 
1135
                  ('1dir', '1dir', 'directory'),
 
1136
                  ('2file', '2file', 'file'),
 
1137
                 ]
 
1138
                ),
 
1139
                (('1dir', './1dir'),
 
1140
                 [('1dir/0file', '0file', 'file'),
 
1141
                  ('1dir/1dir', '1dir', 'directory'),
 
1142
                 ]
 
1143
                ),
 
1144
                (('1dir/1dir', './1dir/1dir'),
 
1145
                 [
 
1146
                 ]
 
1147
                ),
1204
1148
            ]
1205
1149
        result = []
1206
1150
        found_bzrdir = False
1207
 
        for dirdetail, dirblock in osutils._walkdirs_utf8(b'.'):
1208
 
            if len(dirblock) and dirblock[0][1] == b'.bzr':
 
1151
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
 
1152
            if len(dirblock) and dirblock[0][1] == '.bzr':
1209
1153
                # this tests the filtering of selected paths
1210
1154
                found_bzrdir = True
1211
1155
                del dirblock[0]
1212
 
            dirdetail = (dirdetail[0].decode('utf-8'),
1213
 
                         osutils.safe_unicode(dirdetail[1]))
1214
 
            dirblock = [
1215
 
                (entry[0].decode('utf-8'), entry[1].decode('utf-8'), entry[2])
1216
 
                for entry in dirblock]
1217
1156
            result.append((dirdetail, dirblock))
1218
1157
 
1219
1158
        self.assertTrue(found_bzrdir)
1235
1174
            dirblock[:] = new_dirblock
1236
1175
 
1237
1176
    def _save_platform_info(self):
 
1177
        self.overrideAttr(win32utils, 'winver')
1238
1178
        self.overrideAttr(osutils, '_fs_enc')
1239
1179
        self.overrideAttr(osutils, '_selected_dir_reader')
1240
1180
 
1241
 
    def assertDirReaderIs(self, expected, top):
 
1181
    def assertDirReaderIs(self, expected):
1242
1182
        """Assert the right implementation for _walkdirs_utf8 is chosen."""
1243
1183
        # Force it to redetect
1244
1184
        osutils._selected_dir_reader = None
1245
1185
        # Nothing to list, but should still trigger the selection logic
1246
 
        self.assertEqual([((b'', top), [])], list(osutils._walkdirs_utf8('.')))
 
1186
        self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
1247
1187
        self.assertIsInstance(osutils._selected_dir_reader, expected)
1248
1188
 
1249
1189
    def test_force_walkdirs_utf8_fs_utf8(self):
1250
1190
        self.requireFeature(UTF8DirReaderFeature)
1251
1191
        self._save_platform_info()
1252
 
        osutils._fs_enc = 'utf-8'
1253
 
        self.assertDirReaderIs(UTF8DirReaderFeature.module.UTF8DirReader, b".")
 
1192
        win32utils.winver = None # Avoid the win32 detection code
 
1193
        osutils._fs_enc = 'UTF-8'
 
1194
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1254
1195
 
1255
1196
    def test_force_walkdirs_utf8_fs_ascii(self):
1256
1197
        self.requireFeature(UTF8DirReaderFeature)
1257
1198
        self._save_platform_info()
1258
 
        osutils._fs_enc = 'ascii'
1259
 
        self.assertDirReaderIs(
1260
 
            UTF8DirReaderFeature.module.UTF8DirReader, b".")
 
1199
        win32utils.winver = None # Avoid the win32 detection code
 
1200
        osutils._fs_enc = 'US-ASCII'
 
1201
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
1202
 
 
1203
    def test_force_walkdirs_utf8_fs_ANSI(self):
 
1204
        self.requireFeature(UTF8DirReaderFeature)
 
1205
        self._save_platform_info()
 
1206
        win32utils.winver = None # Avoid the win32 detection code
 
1207
        osutils._fs_enc = 'ANSI_X3.4-1968'
 
1208
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1261
1209
 
1262
1210
    def test_force_walkdirs_utf8_fs_latin1(self):
1263
1211
        self._save_platform_info()
1264
 
        osutils._fs_enc = 'iso-8859-1'
1265
 
        self.assertDirReaderIs(osutils.UnicodeDirReader, ".")
 
1212
        win32utils.winver = None # Avoid the win32 detection code
 
1213
        osutils._fs_enc = 'latin1'
 
1214
        self.assertDirReaderIs(osutils.UnicodeDirReader)
1266
1215
 
1267
1216
    def test_force_walkdirs_utf8_nt(self):
1268
1217
        # Disabled because the thunk of the whole walkdirs api is disabled.
1269
1218
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1270
1219
        self._save_platform_info()
1271
 
        from .._walkdirs_win32 import Win32ReadDir
1272
 
        self.assertDirReaderIs(Win32ReadDir, ".")
 
1220
        win32utils.winver = 'Windows NT'
 
1221
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1222
        self.assertDirReaderIs(Win32ReadDir)
 
1223
 
 
1224
    def test_force_walkdirs_utf8_98(self):
 
1225
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
1226
        self._save_platform_info()
 
1227
        win32utils.winver = 'Windows 98'
 
1228
        self.assertDirReaderIs(osutils.UnicodeDirReader)
1273
1229
 
1274
1230
    def test_unicode_walkdirs(self):
1275
1231
        """Walkdirs should always return unicode paths."""
1276
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
1232
        self.requireFeature(tests.UnicodeFilenameFeature)
1277
1233
        name0 = u'0file-\xb6'
1278
1234
        name1 = u'1dir-\u062c\u0648'
1279
1235
        name2 = u'2file-\u0633'
1286
1242
            ]
1287
1243
        self.build_tree(tree)
1288
1244
        expected_dirblocks = [
1289
 
            ((u'', u'.'),
1290
 
             [(name0, name0, 'file', './' + name0),
1291
 
              (name1, name1, 'directory', './' + name1),
1292
 
              (name2, name2, 'file', './' + name2),
1293
 
              ]
1294
 
             ),
1295
 
            ((name1, './' + name1),
1296
 
             [(name1 + '/' + name0, name0, 'file', './' + name1
1297
 
               + '/' + name0),
1298
 
              (name1 + '/' + name1, name1, 'directory', './' + name1
1299
 
               + '/' + name1),
1300
 
              ]
1301
 
             ),
1302
 
            ((name1 + '/' + name1, './' + name1 + '/' + name1),
1303
 
             [
1304
 
                ]
1305
 
             ),
 
1245
                ((u'', u'.'),
 
1246
                 [(name0, name0, 'file', './' + name0),
 
1247
                  (name1, name1, 'directory', './' + name1),
 
1248
                  (name2, name2, 'file', './' + name2),
 
1249
                 ]
 
1250
                ),
 
1251
                ((name1, './' + name1),
 
1252
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
1253
                                                        + '/' + name0),
 
1254
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
1255
                                                            + '/' + name1),
 
1256
                 ]
 
1257
                ),
 
1258
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1259
                 [
 
1260
                 ]
 
1261
                ),
1306
1262
            ]
1307
1263
        result = list(osutils.walkdirs('.'))
1308
1264
        self._filter_out_stat(result)
1309
1265
        self.assertEqual(expected_dirblocks, result)
1310
 
        result = list(osutils.walkdirs(u'./' + name1, name1))
 
1266
        result = list(osutils.walkdirs(u'./'+name1, name1))
1311
1267
        self._filter_out_stat(result)
1312
1268
        self.assertEqual(expected_dirblocks[1:], result)
1313
1269
 
1316
1272
 
1317
1273
        The abspath portion might be in unicode or utf-8
1318
1274
        """
1319
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
1275
        self.requireFeature(tests.UnicodeFilenameFeature)
1320
1276
        name0 = u'0file-\xb6'
1321
1277
        name1 = u'1dir-\u062c\u0648'
1322
1278
        name2 = u'2file-\u0633'
1333
1289
        name2 = name2.encode('utf8')
1334
1290
 
1335
1291
        expected_dirblocks = [
1336
 
            ((b'', b'.'),
1337
 
             [(name0, name0, 'file', b'./' + name0),
1338
 
              (name1, name1, 'directory', b'./' + name1),
1339
 
              (name2, name2, 'file', b'./' + name2),
1340
 
              ]
1341
 
             ),
1342
 
            ((name1, b'./' + name1),
1343
 
             [(name1 + b'/' + name0, name0, 'file', b'./' + name1
1344
 
               + b'/' + name0),
1345
 
              (name1 + b'/' + name1, name1, 'directory', b'./' + name1
1346
 
               + b'/' + name1),
1347
 
              ]
1348
 
             ),
1349
 
            ((name1 + b'/' + name1, b'./' + name1 + b'/' + name1),
1350
 
             [
1351
 
                ]
1352
 
             ),
 
1292
                (('', '.'),
 
1293
                 [(name0, name0, 'file', './' + name0),
 
1294
                  (name1, name1, 'directory', './' + name1),
 
1295
                  (name2, name2, 'file', './' + name2),
 
1296
                 ]
 
1297
                ),
 
1298
                ((name1, './' + name1),
 
1299
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
1300
                                                        + '/' + name0),
 
1301
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
1302
                                                            + '/' + name1),
 
1303
                 ]
 
1304
                ),
 
1305
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1306
                 [
 
1307
                 ]
 
1308
                ),
1353
1309
            ]
1354
1310
        result = []
1355
1311
        # For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1356
1312
        # all abspaths are Unicode, and encode them back into utf8.
1357
1313
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1358
 
            self.assertIsInstance(dirdetail[0], bytes)
1359
 
            if isinstance(dirdetail[1], str):
 
1314
            self.assertIsInstance(dirdetail[0], str)
 
1315
            if isinstance(dirdetail[1], unicode):
1360
1316
                dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1361
1317
                dirblock = [list(info) for info in dirblock]
1362
1318
                for info in dirblock:
1363
 
                    self.assertIsInstance(info[4], str)
 
1319
                    self.assertIsInstance(info[4], unicode)
1364
1320
                    info[4] = info[4].encode('utf8')
1365
1321
            new_dirblock = []
1366
1322
            for info in dirblock:
1367
 
                self.assertIsInstance(info[0], bytes)
1368
 
                self.assertIsInstance(info[1], bytes)
1369
 
                self.assertIsInstance(info[4], bytes)
 
1323
                self.assertIsInstance(info[0], str)
 
1324
                self.assertIsInstance(info[1], str)
 
1325
                self.assertIsInstance(info[4], str)
1370
1326
                # Remove the stat information
1371
1327
                new_dirblock.append((info[0], info[1], info[2], info[4]))
1372
1328
            result.append((dirdetail, new_dirblock))
1377
1333
 
1378
1334
        The abspath portion should be in unicode
1379
1335
        """
1380
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
1336
        self.requireFeature(tests.UnicodeFilenameFeature)
1381
1337
        # Use the unicode reader. TODO: split into driver-and-driven unit
1382
1338
        # tests.
1383
1339
        self._save_platform_info()
1400
1356
        # All of the abspaths should be in unicode, all of the relative paths
1401
1357
        # should be in utf8
1402
1358
        expected_dirblocks = [
1403
 
            ((b'', '.'),
1404
 
             [(name0, name0, 'file', './' + name0u),
1405
 
              (name1, name1, 'directory', './' + name1u),
1406
 
              (name2, name2, 'file', './' + name2u),
1407
 
              ]
1408
 
             ),
1409
 
            ((name1, './' + name1u),
1410
 
             [(name1 + b'/' + name0, name0, 'file', './' + name1u
1411
 
               + '/' + name0u),
1412
 
              (name1 + b'/' + name1, name1, 'directory', './' + name1u
1413
 
               + '/' + name1u),
1414
 
              ]
1415
 
             ),
1416
 
            ((name1 + b'/' + name1, './' + name1u + '/' + name1u),
1417
 
             [
1418
 
                ]
1419
 
             ),
 
1359
                (('', '.'),
 
1360
                 [(name0, name0, 'file', './' + name0u),
 
1361
                  (name1, name1, 'directory', './' + name1u),
 
1362
                  (name2, name2, 'file', './' + name2u),
 
1363
                 ]
 
1364
                ),
 
1365
                ((name1, './' + name1u),
 
1366
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1367
                                                        + '/' + name0u),
 
1368
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1369
                                                            + '/' + name1u),
 
1370
                 ]
 
1371
                ),
 
1372
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1373
                 [
 
1374
                 ]
 
1375
                ),
1420
1376
            ]
1421
1377
        result = list(osutils._walkdirs_utf8('.'))
1422
1378
        self._filter_out_stat(result)
1424
1380
 
1425
1381
    def test__walkdirs_utf8_win32readdir(self):
1426
1382
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1427
 
        self.requireFeature(features.UnicodeFilenameFeature)
1428
 
        from .._walkdirs_win32 import Win32ReadDir
 
1383
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1384
        from bzrlib._walkdirs_win32 import Win32ReadDir
1429
1385
        self._save_platform_info()
1430
1386
        osutils._selected_dir_reader = Win32ReadDir()
1431
1387
        name0u = u'0file-\xb6'
1446
1402
        # All of the abspaths should be in unicode, all of the relative paths
1447
1403
        # should be in utf8
1448
1404
        expected_dirblocks = [
1449
 
            (('', '.'),
1450
 
             [(name0, name0, 'file', './' + name0u),
1451
 
              (name1, name1, 'directory', './' + name1u),
1452
 
              (name2, name2, 'file', './' + name2u),
1453
 
              ]
1454
 
             ),
1455
 
            ((name1, './' + name1u),
1456
 
             [(name1 + '/' + name0, name0, 'file', './' + name1u
1457
 
               + '/' + name0u),
1458
 
              (name1 + '/' + name1, name1, 'directory', './' + name1u
1459
 
               + '/' + name1u),
1460
 
              ]
1461
 
             ),
1462
 
            ((name1 + '/' + name1, './' + name1u + '/' + name1u),
1463
 
             [
1464
 
                ]
1465
 
             ),
 
1405
                (('', '.'),
 
1406
                 [(name0, name0, 'file', './' + name0u),
 
1407
                  (name1, name1, 'directory', './' + name1u),
 
1408
                  (name2, name2, 'file', './' + name2u),
 
1409
                 ]
 
1410
                ),
 
1411
                ((name1, './' + name1u),
 
1412
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1413
                                                        + '/' + name0u),
 
1414
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1415
                                                            + '/' + name1u),
 
1416
                 ]
 
1417
                ),
 
1418
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1419
                 [
 
1420
                 ]
 
1421
                ),
1466
1422
            ]
1467
1423
        result = list(osutils._walkdirs_utf8(u'.'))
1468
1424
        self._filter_out_stat(result)
1481
1437
    def test__walkdirs_utf_win32_find_file_stat_file(self):
1482
1438
        """make sure our Stat values are valid"""
1483
1439
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1484
 
        self.requireFeature(features.UnicodeFilenameFeature)
1485
 
        from .._walkdirs_win32 import Win32ReadDir
 
1440
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1441
        from bzrlib._walkdirs_win32 import Win32ReadDir
1486
1442
        name0u = u'0file-\xb6'
1487
1443
        name0 = name0u.encode('utf8')
1488
1444
        self.build_tree([name0u])
1489
1445
        # I hate to sleep() here, but I'm trying to make the ctime different
1490
1446
        # from the mtime
1491
1447
        time.sleep(2)
1492
 
        with open(name0u, 'ab') as f:
1493
 
            f.write(b'just a small update')
 
1448
        f = open(name0u, 'ab')
 
1449
        try:
 
1450
            f.write('just a small update')
 
1451
        finally:
 
1452
            f.close()
1494
1453
 
1495
1454
        result = Win32ReadDir().read_dir('', u'.')
1496
1455
        entry = result[0]
1502
1461
    def test__walkdirs_utf_win32_find_file_stat_directory(self):
1503
1462
        """make sure our Stat values are valid"""
1504
1463
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1505
 
        self.requireFeature(features.UnicodeFilenameFeature)
1506
 
        from .._walkdirs_win32 import Win32ReadDir
 
1464
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1465
        from bzrlib._walkdirs_win32 import Win32ReadDir
1507
1466
        name0u = u'0dir-\u062c\u0648'
1508
1467
        name0 = name0u.encode('utf8')
1509
1468
        self.build_tree([name0u + '/'])
1589
1548
        # using the comparison routine shoudl work too:
1590
1549
        self.assertEqual(
1591
1550
            dir_sorted_paths,
1592
 
            sorted(original_paths, key=osutils.path_prefix_key))
 
1551
            sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
1593
1552
 
1594
1553
 
1595
1554
class TestCopyTree(tests.TestCaseInTempDir):
1608
1567
        self.assertEqual(['c'], os.listdir('target/b'))
1609
1568
 
1610
1569
    def test_copy_tree_symlinks(self):
1611
 
        self.requireFeature(features.SymlinkFeature)
 
1570
        self.requireFeature(tests.SymlinkFeature)
1612
1571
        self.build_tree(['source/'])
1613
1572
        os.symlink('a/generic/path', 'source/lnk')
1614
1573
        osutils.copy_tree('source', 'target')
1618
1577
    def test_copy_tree_handlers(self):
1619
1578
        processed_files = []
1620
1579
        processed_links = []
1621
 
 
1622
1580
        def file_handler(from_path, to_path):
1623
1581
            processed_files.append(('f', from_path, to_path))
1624
 
 
1625
1582
        def dir_handler(from_path, to_path):
1626
1583
            processed_files.append(('d', from_path, to_path))
1627
 
 
1628
1584
        def link_handler(from_path, to_path):
1629
1585
            processed_links.append((from_path, to_path))
1630
 
        handlers = {'file': file_handler,
1631
 
                    'directory': dir_handler,
1632
 
                    'symlink': link_handler,
1633
 
                    }
 
1586
        handlers = {'file':file_handler,
 
1587
                    'directory':dir_handler,
 
1588
                    'symlink':link_handler,
 
1589
                   }
1634
1590
 
1635
1591
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1636
1592
        if osutils.has_symlinks():
1641
1597
                          ('f', 'source/a', 'target/a'),
1642
1598
                          ('d', 'source/b', 'target/b'),
1643
1599
                          ('f', 'source/b/c', 'target/b/c'),
1644
 
                          ], processed_files)
1645
 
        self.assertPathDoesNotExist('target')
 
1600
                         ], processed_files)
 
1601
        self.failIfExists('target')
1646
1602
        if osutils.has_symlinks():
1647
1603
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1648
1604
 
1653
1609
    def setUp(self):
1654
1610
        super(TestSetUnsetEnv, self).setUp()
1655
1611
 
1656
 
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'),
 
1612
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
1657
1613
                         'Environment was not cleaned up properly.'
1658
 
                         ' Variable BRZ_TEST_ENV_VAR should not exist.')
1659
 
 
 
1614
                         ' Variable BZR_TEST_ENV_VAR should not exist.')
1660
1615
        def cleanup():
1661
 
            if 'BRZ_TEST_ENV_VAR' in os.environ:
1662
 
                del os.environ['BRZ_TEST_ENV_VAR']
 
1616
            if 'BZR_TEST_ENV_VAR' in os.environ:
 
1617
                del os.environ['BZR_TEST_ENV_VAR']
1663
1618
        self.addCleanup(cleanup)
1664
1619
 
1665
1620
    def test_set(self):
1666
1621
        """Test that we can set an env variable"""
1667
 
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1622
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1668
1623
        self.assertEqual(None, old)
1669
 
        self.assertEqual('foo', os.environ.get('BRZ_TEST_ENV_VAR'))
 
1624
        self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
1670
1625
 
1671
1626
    def test_double_set(self):
1672
1627
        """Test that we get the old value out"""
1673
 
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
1674
 
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'bar')
 
1628
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1629
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
1675
1630
        self.assertEqual('foo', old)
1676
 
        self.assertEqual('bar', os.environ.get('BRZ_TEST_ENV_VAR'))
 
1631
        self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
1677
1632
 
1678
1633
    def test_unicode(self):
1679
1634
        """Environment can only contain plain strings
1686
1641
                'Cannot find a unicode character that works in encoding %s'
1687
1642
                % (osutils.get_user_encoding(),))
1688
1643
 
1689
 
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', uni_val)
1690
 
        self.assertEqual(uni_val, os.environ.get('BRZ_TEST_ENV_VAR'))
 
1644
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
 
1645
        self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
1691
1646
 
1692
1647
    def test_unset(self):
1693
1648
        """Test that passing None will remove the env var"""
1694
 
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
1695
 
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', None)
 
1649
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1650
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
1696
1651
        self.assertEqual('foo', old)
1697
 
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'))
1698
 
        self.assertNotIn('BRZ_TEST_ENV_VAR', os.environ)
 
1652
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
 
1653
        self.failIf('BZR_TEST_ENV_VAR' in os.environ)
1699
1654
 
1700
1655
 
1701
1656
class TestSizeShaFile(tests.TestCaseInTempDir):
1702
1657
 
1703
1658
    def test_sha_empty(self):
1704
 
        self.build_tree_contents([('foo', b'')])
1705
 
        expected_sha = osutils.sha_string(b'')
 
1659
        self.build_tree_contents([('foo', '')])
 
1660
        expected_sha = osutils.sha_string('')
1706
1661
        f = open('foo')
1707
1662
        self.addCleanup(f.close)
1708
1663
        size, sha = osutils.size_sha_file(f)
1710
1665
        self.assertEqual(expected_sha, sha)
1711
1666
 
1712
1667
    def test_sha_mixed_endings(self):
1713
 
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
 
1668
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
1714
1669
        self.build_tree_contents([('foo', text)])
1715
1670
        expected_sha = osutils.sha_string(text)
1716
1671
        f = open('foo', 'rb')
1723
1678
class TestShaFileByName(tests.TestCaseInTempDir):
1724
1679
 
1725
1680
    def test_sha_empty(self):
1726
 
        self.build_tree_contents([('foo', b'')])
1727
 
        expected_sha = osutils.sha_string(b'')
 
1681
        self.build_tree_contents([('foo', '')])
 
1682
        expected_sha = osutils.sha_string('')
1728
1683
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1729
1684
 
1730
1685
    def test_sha_mixed_endings(self):
1731
 
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
 
1686
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
1732
1687
        self.build_tree_contents([('foo', text)])
1733
1688
        expected_sha = osutils.sha_string(text)
1734
1689
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1737
1692
class TestResourceLoading(tests.TestCaseInTempDir):
1738
1693
 
1739
1694
    def test_resource_string(self):
1740
 
        # test resource in breezy
1741
 
        text = osutils.resource_string('breezy', 'debug.py')
 
1695
        # test resource in bzrlib
 
1696
        text = osutils.resource_string('bzrlib', 'debug.py')
1742
1697
        self.assertContainsRe(text, "debug_flags = set()")
1743
 
        # test resource under breezy
1744
 
        text = osutils.resource_string('breezy.ui', 'text.py')
 
1698
        # test resource under bzrlib
 
1699
        text = osutils.resource_string('bzrlib.ui', 'text.py')
1745
1700
        self.assertContainsRe(text, "class TextUIFactory")
1746
1701
        # test unsupported package
1747
1702
        self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1748
 
                          'yyy.xx')
 
1703
            'yyy.xx')
1749
1704
        # test unknown resource
1750
 
        self.assertRaises(IOError, osutils.resource_string, 'breezy', 'yyy.xx')
 
1705
        self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
 
1706
 
 
1707
 
 
1708
class TestReCompile(tests.TestCase):
 
1709
 
 
1710
    def _deprecated_re_compile_checked(self, *args, **kwargs):
 
1711
        return self.applyDeprecated(symbol_versioning.deprecated_in((2, 2, 0)),
 
1712
            osutils.re_compile_checked, *args, **kwargs)
 
1713
 
 
1714
    def test_re_compile_checked(self):
 
1715
        r = self._deprecated_re_compile_checked(r'A*', re.IGNORECASE)
 
1716
        self.assertTrue(r.match('aaaa'))
 
1717
        self.assertTrue(r.match('aAaA'))
 
1718
 
 
1719
    def test_re_compile_checked_error(self):
 
1720
        # like https://bugs.launchpad.net/bzr/+bug/251352
 
1721
 
 
1722
        # Due to possible test isolation error, re.compile is not lazy at
 
1723
        # this point. We re-install lazy compile.
 
1724
        lazy_regex.install_lazy_compile()
 
1725
        err = self.assertRaises(
 
1726
            errors.BzrCommandError,
 
1727
            self._deprecated_re_compile_checked, '*', re.IGNORECASE, 'test case')
 
1728
        self.assertEqual(
 
1729
            'Invalid regular expression in test case: '
 
1730
            '"*" nothing to repeat',
 
1731
            str(err))
1751
1732
 
1752
1733
 
1753
1734
class TestDirReader(tests.TestCaseInTempDir):
1754
1735
 
1755
 
    scenarios = dir_reader_scenarios()
1756
 
 
1757
1736
    # Set by load_tests
1758
1737
    _dir_reader_class = None
1759
1738
    _native_to_unicode = None
1760
1739
 
1761
1740
    def setUp(self):
1762
 
        super(TestDirReader, self).setUp()
 
1741
        tests.TestCaseInTempDir.setUp(self)
1763
1742
        self.overrideAttr(osutils,
1764
1743
                          '_selected_dir_reader', self._dir_reader_class())
1765
1744
 
1772
1751
            '2file'
1773
1752
            ]
1774
1753
        expected_dirblocks = [
1775
 
            ((b'', '.'),
1776
 
             [(b'0file', b'0file', 'file', './0file'),
1777
 
              (b'1dir', b'1dir', 'directory', './1dir'),
1778
 
              (b'2file', b'2file', 'file', './2file'),
1779
 
              ]
1780
 
             ),
1781
 
            ((b'1dir', './1dir'),
1782
 
             [(b'1dir/0file', b'0file', 'file', './1dir/0file'),
1783
 
              (b'1dir/1dir', b'1dir', 'directory', './1dir/1dir'),
1784
 
              ]
1785
 
             ),
1786
 
            ((b'1dir/1dir', './1dir/1dir'),
1787
 
             [
1788
 
                ]
1789
 
             ),
 
1754
                (('', '.'),
 
1755
                 [('0file', '0file', 'file'),
 
1756
                  ('1dir', '1dir', 'directory'),
 
1757
                  ('2file', '2file', 'file'),
 
1758
                 ]
 
1759
                ),
 
1760
                (('1dir', './1dir'),
 
1761
                 [('1dir/0file', '0file', 'file'),
 
1762
                  ('1dir/1dir', '1dir', 'directory'),
 
1763
                 ]
 
1764
                ),
 
1765
                (('1dir/1dir', './1dir/1dir'),
 
1766
                 [
 
1767
                 ]
 
1768
                ),
1790
1769
            ]
1791
1770
        return tree, expected_dirblocks
1792
1771
 
1796
1775
        result = list(osutils._walkdirs_utf8('.'))
1797
1776
        # Filter out stat and abspath
1798
1777
        self.assertEqual(expected_dirblocks,
1799
 
                         self._filter_out(result))
 
1778
                         [(dirinfo, [line[0:3] for line in block])
 
1779
                          for dirinfo, block in result])
1800
1780
 
1801
1781
    def test_walk_sub_dir(self):
1802
1782
        tree, expected_dirblocks = self._get_ascii_tree()
1803
1783
        self.build_tree(tree)
1804
1784
        # you can search a subdir only, with a supplied prefix.
1805
 
        result = list(osutils._walkdirs_utf8(b'./1dir', b'1dir'))
 
1785
        result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
1806
1786
        # Filter out stat and abspath
1807
1787
        self.assertEqual(expected_dirblocks[1:],
1808
 
                         self._filter_out(result))
 
1788
                         [(dirinfo, [line[0:3] for line in block])
 
1789
                          for dirinfo, block in result])
1809
1790
 
1810
1791
    def _get_unicode_tree(self):
1811
1792
        name0u = u'0file-\xb6'
1822
1803
        name1 = name1u.encode('UTF-8')
1823
1804
        name2 = name2u.encode('UTF-8')
1824
1805
        expected_dirblocks = [
1825
 
            ((b'', '.'),
1826
 
             [(name0, name0, 'file', './' + name0u),
1827
 
              (name1, name1, 'directory', './' + name1u),
1828
 
              (name2, name2, 'file', './' + name2u),
1829
 
              ]
1830
 
             ),
1831
 
            ((name1, './' + name1u),
1832
 
             [(name1 + b'/' + name0, name0, 'file', './' + name1u
1833
 
               + '/' + name0u),
1834
 
              (name1 + b'/' + name1, name1, 'directory', './' + name1u
1835
 
               + '/' + name1u),
1836
 
              ]
1837
 
             ),
1838
 
            ((name1 + b'/' + name1, './' + name1u + '/' + name1u),
1839
 
             [
1840
 
                ]
1841
 
             ),
 
1806
                (('', '.'),
 
1807
                 [(name0, name0, 'file', './' + name0u),
 
1808
                  (name1, name1, 'directory', './' + name1u),
 
1809
                  (name2, name2, 'file', './' + name2u),
 
1810
                 ]
 
1811
                ),
 
1812
                ((name1, './' + name1u),
 
1813
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1814
                                                        + '/' + name0u),
 
1815
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1816
                                                            + '/' + name1u),
 
1817
                 ]
 
1818
                ),
 
1819
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1820
                 [
 
1821
                 ]
 
1822
                ),
1842
1823
            ]
1843
1824
        return tree, expected_dirblocks
1844
1825
 
1852
1833
            dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
1853
1834
            details = []
1854
1835
            for line in block:
1855
 
                details.append(
1856
 
                    line[0:3] + (self._native_to_unicode(line[4]), ))
 
1836
                details.append(line[0:3] + (self._native_to_unicode(line[4]), ))
1857
1837
            filtered_dirblocks.append((dirinfo, details))
1858
1838
        return filtered_dirblocks
1859
1839
 
1860
1840
    def test_walk_unicode_tree(self):
1861
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
1841
        self.requireFeature(tests.UnicodeFilenameFeature)
1862
1842
        tree, expected_dirblocks = self._get_unicode_tree()
1863
1843
        self.build_tree(tree)
1864
1844
        result = list(osutils._walkdirs_utf8('.'))
1865
1845
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1866
1846
 
1867
1847
    def test_symlink(self):
1868
 
        self.requireFeature(features.SymlinkFeature)
1869
 
        self.requireFeature(features.UnicodeFilenameFeature)
 
1848
        self.requireFeature(tests.SymlinkFeature)
 
1849
        self.requireFeature(tests.UnicodeFilenameFeature)
1870
1850
        target = u'target\N{Euro Sign}'
1871
1851
        link_name = u'l\N{Euro Sign}nk'
1872
1852
        os.symlink(target, link_name)
 
1853
        target_utf8 = target.encode('UTF-8')
1873
1854
        link_name_utf8 = link_name.encode('UTF-8')
1874
1855
        expected_dirblocks = [
1875
 
            ((b'', '.'),
1876
 
             [(link_name_utf8, link_name_utf8,
1877
 
               'symlink', './' + link_name), ],
1878
 
             )]
 
1856
                (('', '.'),
 
1857
                 [(link_name_utf8, link_name_utf8,
 
1858
                   'symlink', './' + link_name),],
 
1859
                 )]
1879
1860
        result = list(osutils._walkdirs_utf8('.'))
1880
1861
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1881
1862
 
1889
1870
    But prior python versions failed to properly encode the passed unicode
1890
1871
    string.
1891
1872
    """
1892
 
    _test_needs_features = [features.SymlinkFeature,
1893
 
                            features.UnicodeFilenameFeature]
 
1873
    _test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]
1894
1874
 
1895
1875
    def setUp(self):
1896
1876
        super(tests.TestCaseInTempDir, self).setUp()
1899
1879
        os.symlink(self.target, self.link)
1900
1880
 
1901
1881
    def test_os_readlink_link_encoding(self):
1902
 
        self.assertEqual(self.target, os.readlink(self.link))
 
1882
        if sys.version_info < (2, 6):
 
1883
            self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
 
1884
        else:
 
1885
            self.assertEquals(self.target,  os.readlink(self.link))
1903
1886
 
1904
1887
    def test_os_readlink_link_decoding(self):
1905
 
        self.assertEqual(self.target.encode(osutils._fs_enc),
1906
 
                         os.readlink(self.link.encode(osutils._fs_enc)))
 
1888
        self.assertEquals(self.target.encode(osutils._fs_enc),
 
1889
                          os.readlink(self.link.encode(osutils._fs_enc)))
1907
1890
 
1908
1891
 
1909
1892
class TestConcurrency(tests.TestCase):
1917
1900
        self.assertIsInstance(concurrency, int)
1918
1901
 
1919
1902
    def test_local_concurrency_environment_variable(self):
1920
 
        self.overrideEnv('BRZ_CONCURRENCY', '2')
 
1903
        os.environ['BZR_CONCURRENCY'] = '2'
1921
1904
        self.assertEqual(2, osutils.local_concurrency(use_cache=False))
1922
 
        self.overrideEnv('BRZ_CONCURRENCY', '3')
 
1905
        os.environ['BZR_CONCURRENCY'] = '3'
1923
1906
        self.assertEqual(3, osutils.local_concurrency(use_cache=False))
1924
 
        self.overrideEnv('BRZ_CONCURRENCY', 'foo')
 
1907
        os.environ['BZR_CONCURRENCY'] = 'foo'
1925
1908
        self.assertEqual(1, osutils.local_concurrency(use_cache=False))
1926
1909
 
1927
1910
    def test_option_concurrency(self):
1928
 
        self.overrideEnv('BRZ_CONCURRENCY', '1')
 
1911
        os.environ['BZR_CONCURRENCY'] = '1'
1929
1912
        self.run_bzr('rocks --concurrency 42')
1930
 
        # Command line overrides environment variable
1931
 
        self.assertEqual('42', os.environ['BRZ_CONCURRENCY'])
1932
 
        self.assertEqual(42, osutils.local_concurrency(use_cache=False))
 
1913
        # Command line overrides envrionment variable
 
1914
        self.assertEquals('42', os.environ['BZR_CONCURRENCY'])
 
1915
        self.assertEquals(42, osutils.local_concurrency(use_cache=False))
1933
1916
 
1934
1917
 
1935
1918
class TestFailedToLoadExtension(tests.TestCase):
1936
1919
 
1937
1920
    def _try_loading(self):
1938
1921
        try:
1939
 
            import breezy._fictional_extension_py  # noqa: F401
1940
 
        except ImportError as e:
 
1922
            import bzrlib._fictional_extension_py
 
1923
        except ImportError, e:
1941
1924
            osutils.failed_to_load_extension(e)
1942
1925
            return True
1943
1926
 
1948
1931
    def test_failure_to_load(self):
1949
1932
        self._try_loading()
1950
1933
        self.assertLength(1, osutils._extension_load_failures)
1951
 
        self.assertEqual(
1952
 
            osutils._extension_load_failures[0],
1953
 
            "No module named 'breezy._fictional_extension_py'")
 
1934
        self.assertEquals(osutils._extension_load_failures[0],
 
1935
            "No module named _fictional_extension_py")
1954
1936
 
1955
1937
    def test_report_extension_load_failures_no_warning(self):
1956
1938
        self.assertTrue(self._try_loading())
1957
 
        warnings, result = self.callCatchWarnings(
1958
 
            osutils.report_extension_load_failures)
 
1939
        warnings, result = self.callCatchWarnings(osutils.report_extension_load_failures)
1959
1940
        # it used to give a Python warning; it no longer does
1960
1941
        self.assertLength(0, warnings)
1961
1942
 
1962
1943
    def test_report_extension_load_failures_message(self):
1963
 
        log = BytesIO()
 
1944
        log = StringIO()
1964
1945
        trace.push_log_file(log)
1965
1946
        self.assertTrue(self._try_loading())
1966
1947
        osutils.report_extension_load_failures()
1967
1948
        self.assertContainsRe(
1968
1949
            log.getvalue(),
1969
 
            br"brz: warning: some compiled extensions could not be loaded; "
1970
 
            b"see ``brz help missing-extensions``\n"
 
1950
            r"bzr: warning: some compiled extensions could not be loaded; "
 
1951
            "see <https://answers\.launchpad\.net/bzr/\+faq/703>\n"
1971
1952
            )
1972
1953
 
1973
1954
 
1974
1955
class TestTerminalWidth(tests.TestCase):
1975
1956
 
1976
1957
    def setUp(self):
1977
 
        super(TestTerminalWidth, self).setUp()
 
1958
        tests.TestCase.setUp(self)
1978
1959
        self._orig_terminal_size_state = osutils._terminal_size_state
1979
1960
        self._orig_first_terminal_size = osutils._first_terminal_size
1980
1961
        self.addCleanup(self.restore_osutils_globals)
2002
1983
    def test_default_values(self):
2003
1984
        self.assertEqual(80, osutils.default_terminal_width)
2004
1985
 
2005
 
    def test_defaults_to_BRZ_COLUMNS(self):
2006
 
        # BRZ_COLUMNS is set by the test framework
2007
 
        self.assertNotEqual('12', os.environ['BRZ_COLUMNS'])
2008
 
        self.overrideEnv('BRZ_COLUMNS', '12')
 
1986
    def test_defaults_to_BZR_COLUMNS(self):
 
1987
        # BZR_COLUMNS is set by the test framework
 
1988
        self.assertNotEqual('12', os.environ['BZR_COLUMNS'])
 
1989
        os.environ['BZR_COLUMNS'] = '12'
2009
1990
        self.assertEqual(12, osutils.terminal_width())
2010
1991
 
2011
 
    def test_BRZ_COLUMNS_0_no_limit(self):
2012
 
        self.overrideEnv('BRZ_COLUMNS', '0')
2013
 
        self.assertEqual(None, osutils.terminal_width())
2014
 
 
2015
1992
    def test_falls_back_to_COLUMNS(self):
2016
 
        self.overrideEnv('BRZ_COLUMNS', None)
 
1993
        del os.environ['BZR_COLUMNS']
2017
1994
        self.assertNotEqual('42', os.environ['COLUMNS'])
2018
1995
        self.set_fake_tty()
2019
 
        self.overrideEnv('COLUMNS', '42')
 
1996
        os.environ['COLUMNS'] = '42'
2020
1997
        self.assertEqual(42, osutils.terminal_width())
2021
1998
 
2022
1999
    def test_tty_default_without_columns(self):
2023
 
        self.overrideEnv('BRZ_COLUMNS', None)
2024
 
        self.overrideEnv('COLUMNS', None)
 
2000
        del os.environ['BZR_COLUMNS']
 
2001
        del os.environ['COLUMNS']
2025
2002
 
2026
2003
        def terminal_size(w, h):
2027
2004
            return 42, 42
2034
2011
        self.assertEqual(42, osutils.terminal_width())
2035
2012
 
2036
2013
    def test_non_tty_default_without_columns(self):
2037
 
        self.overrideEnv('BRZ_COLUMNS', None)
2038
 
        self.overrideEnv('COLUMNS', None)
 
2014
        del os.environ['BZR_COLUMNS']
 
2015
        del os.environ['COLUMNS']
2039
2016
        self.replace_stdout(None)
2040
2017
        self.assertEqual(None, osutils.terminal_width())
2041
2018
 
2044
2021
        termios = term_ios_feature.module
2045
2022
        # bug 63539 is about a termios without TIOCGWINSZ attribute
2046
2023
        try:
2047
 
            termios.TIOCGWINSZ
 
2024
            orig = termios.TIOCGWINSZ
2048
2025
        except AttributeError:
2049
2026
            # We won't remove TIOCGWINSZ, because it doesn't exist anyway :)
2050
2027
            pass
2051
2028
        else:
2052
2029
            self.overrideAttr(termios, 'TIOCGWINSZ')
2053
2030
            del termios.TIOCGWINSZ
2054
 
        self.overrideEnv('BRZ_COLUMNS', None)
2055
 
        self.overrideEnv('COLUMNS', None)
 
2031
        del os.environ['BZR_COLUMNS']
 
2032
        del os.environ['COLUMNS']
2056
2033
        # Whatever the result is, if we don't raise an exception, it's ok.
2057
2034
        osutils.terminal_width()
2058
2035
 
2059
 
 
2060
2036
class TestCreationOps(tests.TestCaseInTempDir):
2061
2037
    _test_needs_features = [features.chown_feature]
2062
2038
 
2063
2039
    def setUp(self):
2064
 
        super(TestCreationOps, self).setUp()
 
2040
        tests.TestCaseInTempDir.setUp(self)
2065
2041
        self.overrideAttr(os, 'chown', self._dummy_chown)
2066
2042
 
2067
2043
        # params set by call to _dummy_chown
2073
2049
    def test_copy_ownership_from_path(self):
2074
2050
        """copy_ownership_from_path test with specified src."""
2075
2051
        ownsrc = '/'
2076
 
        open('test_file', 'wt').close()
 
2052
        f = open('test_file', 'wt')
2077
2053
        osutils.copy_ownership_from_path('test_file', ownsrc)
2078
2054
 
2079
2055
        s = os.stat(ownsrc)
2080
 
        self.assertEqual(self.path, 'test_file')
2081
 
        self.assertEqual(self.uid, s.st_uid)
2082
 
        self.assertEqual(self.gid, s.st_gid)
 
2056
        self.assertEquals(self.path, 'test_file')
 
2057
        self.assertEquals(self.uid, s.st_uid)
 
2058
        self.assertEquals(self.gid, s.st_gid)
2083
2059
 
2084
2060
    def test_copy_ownership_nonesrc(self):
2085
2061
        """copy_ownership_from_path test with src=None."""
2086
 
        open('test_file', 'wt').close()
 
2062
        f = open('test_file', 'wt')
2087
2063
        # should use parent dir for permissions
2088
2064
        osutils.copy_ownership_from_path('test_file')
2089
2065
 
2090
2066
        s = os.stat('..')
2091
 
        self.assertEqual(self.path, 'test_file')
2092
 
        self.assertEqual(self.uid, s.st_uid)
2093
 
        self.assertEqual(self.gid, s.st_gid)
2094
 
 
2095
 
 
2096
 
class TestGetHomeDir(tests.TestCase):
2097
 
 
2098
 
    def test_is_unicode(self):
2099
 
        home = osutils._get_home_dir()
2100
 
        self.assertIsInstance(home, str)
2101
 
 
2102
 
    def test_posix_homeless(self):
2103
 
        self.overrideEnv('HOME', None)
2104
 
        home = osutils._get_home_dir()
2105
 
        self.assertIsInstance(home, str)
2106
 
 
2107
 
    def test_posix_home_ascii(self):
2108
 
        self.overrideEnv('HOME', '/home/test')
2109
 
        home = osutils._posix_get_home_dir()
2110
 
        self.assertIsInstance(home, str)
2111
 
        self.assertEqual(u'/home/test', home)
2112
 
 
2113
 
    def test_posix_home_unicode(self):
2114
 
        self.requireFeature(features.ByteStringNamedFilesystem)
2115
 
        self.overrideEnv('HOME', '/home/\xa7test')
2116
 
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
2117
 
        self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
2118
 
        osutils._fs_enc = "iso8859-5"
2119
 
        # In python 3, os.environ returns unicode
2120
 
        self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
2121
 
 
 
2067
        self.assertEquals(self.path, 'test_file')
 
2068
        self.assertEquals(self.uid, s.st_uid)
 
2069
        self.assertEquals(self.gid, s.st_gid)
2122
2070
 
2123
2071
class TestGetuserUnicode(tests.TestCase):
2124
2072
 
2125
 
    def test_is_unicode(self):
2126
 
        user = osutils.getuser_unicode()
2127
 
        self.assertIsInstance(user, str)
2128
 
 
2129
 
    def envvar_to_override(self):
2130
 
        if sys.platform == "win32":
2131
 
            # Disable use of platform calls on windows so envvar is used
2132
 
            self.overrideAttr(win32utils, 'has_ctypes', False)
2133
 
            return 'USERNAME'  # only variable used on windows
2134
 
        return 'LOGNAME'  # first variable checked by getpass.getuser()
2135
 
 
2136
2073
    def test_ascii_user(self):
2137
 
        self.overrideEnv(self.envvar_to_override(), 'jrandom')
 
2074
        osutils.set_or_unset_env('LOGNAME', 'jrandom')
2138
2075
        self.assertEqual(u'jrandom', osutils.getuser_unicode())
2139
2076
 
2140
2077
    def test_unicode_user(self):
2141
2078
        ue = osutils.get_user_encoding()
2142
 
        uni_val, env_val = tests.probe_unicode_in_user_encoding()
2143
 
        if uni_val is None:
2144
 
            raise tests.TestSkipped(
2145
 
                'Cannot find a unicode character that works in encoding %s'
2146
 
                % (osutils.get_user_encoding(),))
2147
 
        uni_username = u'jrandom' + uni_val
2148
 
        encoded_username = uni_username.encode(ue)
2149
 
        self.overrideEnv(self.envvar_to_override(), uni_username)
2150
 
        self.assertEqual(uni_username, osutils.getuser_unicode())
2151
 
 
2152
 
 
2153
 
class TestBackupNames(tests.TestCase):
    """Exercise osutils.available_backup_name against an in-memory registry.

    Names handed out so far are recorded in ``self.backups``, which also
    serves as the existence check passed to the function under test.
    """

    def setUp(self):
        super(TestBackupNames, self).setUp()
        # Backup names allocated so far during the current test.
        self.backups = []

    def backup_exists(self, name):
        """Tell whether ``name`` has already been handed out."""
        return name in self.backups

    def available_backup_name(self, name):
        """Allocate, record and return the next backup name for ``name``."""
        allocated = osutils.available_backup_name(name, self.backup_exists)
        self.backups.append(allocated)
        return allocated

    def assertBackupName(self, expected, name):
        """Check that the next backup name for ``name`` is ``expected``."""
        actual = self.available_backup_name(name)
        self.assertEqual(expected, actual)

    def test_empty(self):
        self.assertBackupName('file.~1~', 'file')

    def test_existing(self):
        # Take the first two slots before checking the third.
        for _ in range(2):
            self.available_backup_name('file')
        self.assertBackupName('file.~3~', 'file')
        # Empty slots are found, this is not a strict requirement and may be
        # revisited if we test against all implementations.
        self.backups.remove('file.~2~')
        self.assertBackupName('file.~2~', 'file')
2181
 
 
2182
 
 
2183
 
class TestFindExecutableInPath(tests.TestCase):
    """Platform-specific checks for osutils.find_executable_on_path."""

    def test_windows(self):
        if sys.platform != 'win32':
            raise tests.TestSkipped('test requires win32')
        find = osutils.find_executable_on_path
        # Expected to resolve, with or without extension and in any case.
        for present in ('explorer', 'explorer.exe', 'EXPLORER.EXE'):
            self.assertTrue(find(present) is not None)
        # Expected not to resolve.
        for absent in ('THIS SHOULD NOT EXIST', 'file.txt'):
            self.assertTrue(find(absent) is None)

    def test_windows_app_path(self):
        if sys.platform != 'win32':
            raise tests.TestSkipped('test requires win32')
        # Override PATH env var so that exe can only be found on App Path
        self.overrideEnv('PATH', '')
        # Internet Explorer is always registered in the App Path
        located = osutils.find_executable_on_path('iexplore')
        self.assertTrue(located is not None)

    def test_other(self):
        if sys.platform == 'win32':
            raise tests.TestSkipped('test requires non-win32')
        find = osutils.find_executable_on_path
        self.assertTrue(find('sh') is not None)
        self.assertTrue(find('THIS SHOULD NOT EXIST') is None)
2213
 
 
2214
 
 
2215
 
class SupportsExecutableTests(tests.TestCaseInTempDir):
    """Type check for osutils.supports_executable."""

    def test_returns_bool(self):
        # Only the result type is checked, not its value.
        answer = osutils.supports_executable(self.test_dir)
        self.assertIsInstance(answer, bool)
2219
 
 
2220
 
 
2221
 
class SupportsSymlinksTests(tests.TestCaseInTempDir):
    """Type check for osutils.supports_symlinks."""

    def test_returns_bool(self):
        # Only the result type is checked, not its value.
        answer = osutils.supports_symlinks(self.test_dir)
        self.assertIsInstance(answer, bool)
2225
 
 
2226
 
 
2227
 
class MtabReader(tests.TestCaseInTempDir):
    """Check osutils.read_mtab against a small hand-written mtab file."""

    def test_read_mtab(self):
        # Two mount entries plus a comment, a blank line and a line with a
        # single field; the expectation below lists only the two mounts.
        mtab_text = """\
/dev/mapper/blah--vg-root / ext4 rw,relatime,errors=remount-ro 0 0
/dev/mapper/blah--vg-home /home vfat rw,relatime 0 0
# comment

iminvalid
"""
        self.build_tree_contents([('mtab', mtab_text)])
        parsed = list(osutils.read_mtab('mtab'))
        self.assertEqual(
            parsed,
            [(b'/', 'ext4'),
             (b'/home', 'vfat')])
2241
 
 
2242
 
 
2243
 
class GetFsTypeTests(tests.TestCaseInTempDir):
    """Checks for osutils.get_fs_type, mostly with a stubbed mount table."""

    def test_returns_string_or_none(self):
        fs_type = osutils.get_fs_type(self.test_dir)
        self.assertTrue(fs_type is None or isinstance(fs_type, str))

    def test_returns_most_specific(self):
        # Replace the module-level finder with a fixed set of mount points.
        finder = osutils.FilesystemFinder(
            [(b'/', 'ext4'), (b'/home', 'vfat'),
             (b'/home/jelmer', 'ext2')])
        self.overrideAttr(osutils, '_FILESYSTEM_FINDER', finder)
        # (path, expected type); the second entry passes the path as text
        # rather than bytes.
        expectations = [
            (b'/home/jelmer/blah', 'ext2'),
            ('/home/jelmer/blah', 'ext2'),
            (b'/home/jelmer', 'ext2'),
            (b'/home/martin', 'vfat'),
            (b'/home', 'vfat'),
            (b'/other', 'ext4'),
            ]
        for path, expected in expectations:
            self.assertEqual(osutils.get_fs_type(path), expected)

    def test_returns_none(self):
        # With no mount points registered, every lookup is expected to
        # give None.
        empty_finder = osutils.FilesystemFinder([])
        self.overrideAttr(osutils, '_FILESYSTEM_FINDER', empty_finder)
        for path in ('/home/jelmer/blah', b'/home/jelmer/blah',
                     '/home/jelmer'):
            self.assertIs(osutils.get_fs_type(path), None)
 
2079
        osutils.set_or_unset_env('LOGNAME', u'jrandom\xb6'.encode(ue))
 
2080
        self.assertEqual(u'jrandom\xb6', osutils.getuser_unicode())