/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_osutils.py

  • Committer: Jelmer Vernooij
  • Date: 2018-07-08 14:45:27 UTC
  • mto: This revision was merged to the branch mainline in revision 7036.
  • Revision ID: jelmer@jelmer.uk-20180708144527-codhlvdcdg9y0nji
Fix a bunch of merge tests.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the osutils wrapper."""
18
18
 
19
 
from cStringIO import StringIO
 
19
from __future__ import absolute_import, division
 
20
 
20
21
import errno
21
22
import os
22
23
import re
 
24
import select
23
25
import socket
24
 
import stat
25
26
import sys
 
27
import tempfile
26
28
import time
27
29
 
28
 
from bzrlib import (
 
30
from .. import (
29
31
    errors,
 
32
    lazy_regex,
30
33
    osutils,
 
34
    symbol_versioning,
31
35
    tests,
32
36
    trace,
33
37
    win32utils,
34
38
    )
35
 
from bzrlib.tests import (
 
39
from ..sixish import (
 
40
    BytesIO,
 
41
    text_type,
 
42
    )
 
43
from . import (
36
44
    features,
37
45
    file_utils,
38
46
    test__walkdirs_win32,
39
47
    )
40
 
 
41
 
 
42
 
class _UTF8DirReaderFeature(tests.Feature):
 
48
from .scenarios import load_tests_apply_scenarios
 
49
 
 
50
 
 
51
class _UTF8DirReaderFeature(features.ModuleAvailableFeature):
43
52
 
44
53
    def _probe(self):
45
54
        try:
46
 
            from bzrlib import _readdir_pyx
 
55
            from .. import _readdir_pyx
 
56
            self._module = _readdir_pyx
47
57
            self.reader = _readdir_pyx.UTF8DirReader
48
58
            return True
49
59
        except ImportError:
50
60
            return False
51
61
 
52
 
    def feature_name(self):
53
 
        return 'bzrlib._readdir_pyx'
54
 
 
55
 
UTF8DirReaderFeature = _UTF8DirReaderFeature()
56
 
 
57
 
term_ios_feature = tests.ModuleAvailableFeature('termios')
 
62
UTF8DirReaderFeature = _UTF8DirReaderFeature('breezy._readdir_pyx')
 
63
 
 
64
term_ios_feature = features.ModuleAvailableFeature('termios')
58
65
 
59
66
 
60
67
def _already_unicode(s):
78
85
    # Some DirReaders are platform specific and even there they may not be
79
86
    # available.
80
87
    if UTF8DirReaderFeature.available():
81
 
        from bzrlib import _readdir_pyx
 
88
        from .. import _readdir_pyx
82
89
        scenarios.append(('utf8',
83
90
                          dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
84
91
                               _native_to_unicode=_utf8_to_unicode)))
85
92
 
86
93
    if test__walkdirs_win32.win32_readdir_feature.available():
87
94
        try:
88
 
            from bzrlib import _walkdirs_win32
 
95
            from .. import _walkdirs_win32
89
96
            scenarios.append(
90
97
                ('win32',
91
98
                 dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
95
102
    return scenarios
96
103
 
97
104
 
98
 
def load_tests(basic_tests, module, loader):
99
 
    suite = loader.suiteClass()
100
 
    dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
101
 
        basic_tests, tests.condition_isinstance(TestDirReader))
102
 
    tests.multiply_tests(dir_reader_tests, dir_reader_scenarios(), suite)
103
 
    suite.addTest(remaining_tests)
104
 
    return suite
 
105
load_tests = load_tests_apply_scenarios
105
106
 
106
107
 
107
108
class TestContainsWhitespace(tests.TestCase):
108
109
 
109
110
    def test_contains_whitespace(self):
110
 
        self.failUnless(osutils.contains_whitespace(u' '))
111
 
        self.failUnless(osutils.contains_whitespace(u'hello there'))
112
 
        self.failUnless(osutils.contains_whitespace(u'hellothere\n'))
113
 
        self.failUnless(osutils.contains_whitespace(u'hello\nthere'))
114
 
        self.failUnless(osutils.contains_whitespace(u'hello\rthere'))
115
 
        self.failUnless(osutils.contains_whitespace(u'hello\tthere'))
 
111
        self.assertTrue(osutils.contains_whitespace(u' '))
 
112
        self.assertTrue(osutils.contains_whitespace(u'hello there'))
 
113
        self.assertTrue(osutils.contains_whitespace(u'hellothere\n'))
 
114
        self.assertTrue(osutils.contains_whitespace(u'hello\nthere'))
 
115
        self.assertTrue(osutils.contains_whitespace(u'hello\rthere'))
 
116
        self.assertTrue(osutils.contains_whitespace(u'hello\tthere'))
116
117
 
117
118
        # \xa0 is "Non-breaking-space" which on some python locales thinks it
118
119
        # is whitespace, but we do not.
119
 
        self.failIf(osutils.contains_whitespace(u''))
120
 
        self.failIf(osutils.contains_whitespace(u'hellothere'))
121
 
        self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
 
120
        self.assertFalse(osutils.contains_whitespace(u''))
 
121
        self.assertFalse(osutils.contains_whitespace(u'hellothere'))
 
122
        self.assertFalse(osutils.contains_whitespace(u'hello\xa0there'))
122
123
 
123
124
 
124
125
class TestRename(tests.TestCaseInTempDir):
136
137
 
137
138
    def test_fancy_rename(self):
138
139
        # This should work everywhere
139
 
        self.create_file('a', 'something in a\n')
 
140
        self.create_file('a', b'something in a\n')
140
141
        self._fancy_rename('a', 'b')
141
 
        self.failIfExists('a')
142
 
        self.failUnlessExists('b')
143
 
        self.check_file_contents('b', 'something in a\n')
 
142
        self.assertPathDoesNotExist('a')
 
143
        self.assertPathExists('b')
 
144
        self.check_file_contents('b', b'something in a\n')
144
145
 
145
 
        self.create_file('a', 'new something in a\n')
 
146
        self.create_file('a', b'new something in a\n')
146
147
        self._fancy_rename('b', 'a')
147
148
 
148
 
        self.check_file_contents('a', 'something in a\n')
 
149
        self.check_file_contents('a', b'something in a\n')
149
150
 
150
151
    def test_fancy_rename_fails_source_missing(self):
151
152
        # An exception should be raised, and the target should be left in place
152
 
        self.create_file('target', 'data in target\n')
 
153
        self.create_file('target', b'data in target\n')
153
154
        self.assertRaises((IOError, OSError), self._fancy_rename,
154
155
                          'missingsource', 'target')
155
 
        self.failUnlessExists('target')
156
 
        self.check_file_contents('target', 'data in target\n')
 
156
        self.assertPathExists('target')
 
157
        self.check_file_contents('target', b'data in target\n')
157
158
 
158
159
    def test_fancy_rename_fails_if_source_and_target_missing(self):
159
160
        self.assertRaises((IOError, OSError), self._fancy_rename,
161
162
 
162
163
    def test_rename(self):
163
164
        # Rename should be semi-atomic on all platforms
164
 
        self.create_file('a', 'something in a\n')
 
165
        self.create_file('a', b'something in a\n')
165
166
        osutils.rename('a', 'b')
166
 
        self.failIfExists('a')
167
 
        self.failUnlessExists('b')
168
 
        self.check_file_contents('b', 'something in a\n')
 
167
        self.assertPathDoesNotExist('a')
 
168
        self.assertPathExists('b')
 
169
        self.check_file_contents('b', b'something in a\n')
169
170
 
170
 
        self.create_file('a', 'new something in a\n')
 
171
        self.create_file('a', b'new something in a\n')
171
172
        osutils.rename('b', 'a')
172
173
 
173
 
        self.check_file_contents('a', 'something in a\n')
 
174
        self.check_file_contents('a', b'something in a\n')
174
175
 
175
176
    # TODO: test fancy_rename using a MemoryTransport
176
177
 
182
183
        # we can't use failUnlessExists on case-insensitive filesystem
183
184
        # so try to check shape of the tree
184
185
        shape = sorted(os.listdir('.'))
185
 
        self.assertEquals(['A', 'B'], shape)
 
186
        self.assertEqual(['A', 'B'], shape)
186
187
 
187
 
    def test_rename_error(self):
188
 
        # We wrap os.rename to make it give an error including the filenames
189
 
        # https://bugs.launchpad.net/bzr/+bug/491763
190
 
        err = self.assertRaises(OSError, osutils.rename,
191
 
            'nonexistent', 'target')
192
 
        self.assertContainsString(str(err), 'nonexistent')
 
188
    def test_rename_exception(self):
 
189
        try:
 
190
            osutils.rename('nonexistent_path', 'different_nonexistent_path')
 
191
        except OSError as e:
 
192
            self.assertEqual(e.old_filename, 'nonexistent_path')
 
193
            self.assertEqual(e.new_filename, 'different_nonexistent_path')
 
194
            self.assertTrue('nonexistent_path' in e.strerror)
 
195
            self.assertTrue('different_nonexistent_path' in e.strerror)
193
196
 
194
197
 
195
198
class TestRandChars(tests.TestCase):
222
225
                         (['src'], SRC_FOO_C),
223
226
                         (['src'], 'src'),
224
227
                         ]:
225
 
            self.assert_(osutils.is_inside_any(dirs, fn))
 
228
            self.assertTrue(osutils.is_inside_any(dirs, fn))
226
229
        for dirs, fn in [(['src'], 'srccontrol'),
227
230
                         (['src'], 'srccontrol/foo')]:
228
231
            self.assertFalse(osutils.is_inside_any(dirs, fn))
234
237
                         (['src/bar.c', 'bla/foo.c'], 'src'),
235
238
                         (['src'], 'src'),
236
239
                         ]:
237
 
            self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
 
240
            self.assertTrue(osutils.is_inside_or_parent_of_any(dirs, fn))
238
241
 
239
242
        for dirs, fn in [(['src'], 'srccontrol'),
240
243
                         (['srccontrol/foo.c'], 'src'),
242
245
            self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
243
246
 
244
247
 
 
248
class TestLstat(tests.TestCaseInTempDir):
 
249
 
 
250
    def test_lstat_matches_fstat(self):
 
251
        # On Windows, lstat and fstat don't always agree, primarily in the
 
252
        # 'st_ino' and 'st_dev' fields. So we force them to be '0' in our
 
253
        # custom implementation.
 
254
        if sys.platform == 'win32':
 
255
            # We only have special lstat/fstat if we have the extension.
 
256
            # Without it, we may end up re-reading content when we don't have
 
257
            # to, but otherwise it doesn't affect correctness.
 
258
            self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
259
        with open('test-file.txt', 'wb') as f:
 
260
            f.write(b'some content\n')
 
261
            f.flush()
 
262
            self.assertEqualStat(osutils.fstat(f.fileno()),
 
263
                                 osutils.lstat('test-file.txt'))
 
264
 
 
265
 
245
266
class TestRmTree(tests.TestCaseInTempDir):
246
267
 
247
268
    def test_rmtree(self):
248
269
        # Check to remove tree with read-only files/dirs
249
270
        os.mkdir('dir')
250
 
        f = file('dir/file', 'w')
251
 
        f.write('spam')
252
 
        f.close()
 
271
        with open('dir/file', 'w') as f:
 
272
            f.write('spam')
253
273
        # would like to also try making the directory readonly, but at the
254
274
        # moment python shutil.rmtree doesn't handle that properly - it would
255
275
        # need to chmod the directory before removing things inside it - deferred
259
279
 
260
280
        osutils.rmtree('dir')
261
281
 
262
 
        self.failIfExists('dir/file')
263
 
        self.failIfExists('dir')
 
282
        self.assertPathDoesNotExist('dir/file')
 
283
        self.assertPathDoesNotExist('dir')
264
284
 
265
285
 
266
286
class TestDeleteAny(tests.TestCaseInTempDir):
279
299
 
280
300
    def test_file_kind(self):
281
301
        self.build_tree(['file', 'dir/'])
282
 
        self.assertEquals('file', osutils.file_kind('file'))
283
 
        self.assertEquals('directory', osutils.file_kind('dir/'))
 
302
        self.assertEqual('file', osutils.file_kind('file'))
 
303
        self.assertEqual('directory', osutils.file_kind('dir/'))
284
304
        if osutils.has_symlinks():
285
305
            os.symlink('symlink', 'symlink')
286
 
            self.assertEquals('symlink', osutils.file_kind('symlink'))
 
306
            self.assertEqual('symlink', osutils.file_kind('symlink'))
287
307
 
288
308
        # TODO: jam 20060529 Test a block device
289
309
        try:
290
310
            os.lstat('/dev/null')
291
 
        except OSError, e:
 
311
        except OSError as e:
292
312
            if e.errno not in (errno.ENOENT,):
293
313
                raise
294
314
        else:
295
 
            self.assertEquals('chardev', osutils.file_kind('/dev/null'))
 
315
            self.assertEqual('chardev', osutils.file_kind('/dev/null'))
296
316
 
297
317
        mkfifo = getattr(os, 'mkfifo', None)
298
318
        if mkfifo:
299
319
            mkfifo('fifo')
300
320
            try:
301
 
                self.assertEquals('fifo', osutils.file_kind('fifo'))
 
321
                self.assertEqual('fifo', osutils.file_kind('fifo'))
302
322
            finally:
303
323
                os.remove('fifo')
304
324
 
307
327
            s = socket.socket(AF_UNIX)
308
328
            s.bind('socket')
309
329
            try:
310
 
                self.assertEquals('socket', osutils.file_kind('socket'))
 
330
                self.assertEqual('socket', osutils.file_kind('socket'))
311
331
            finally:
312
332
                os.remove('socket')
313
333
 
332
352
 
333
353
        orig_umask = osutils.get_umask()
334
354
        self.addCleanup(os.umask, orig_umask)
335
 
        os.umask(0222)
336
 
        self.assertEqual(0222, osutils.get_umask())
337
 
        os.umask(0022)
338
 
        self.assertEqual(0022, osutils.get_umask())
339
 
        os.umask(0002)
340
 
        self.assertEqual(0002, osutils.get_umask())
341
 
        os.umask(0027)
342
 
        self.assertEqual(0027, osutils.get_umask())
 
355
        os.umask(0o222)
 
356
        self.assertEqual(0o222, osutils.get_umask())
 
357
        os.umask(0o022)
 
358
        self.assertEqual(0o022, osutils.get_umask())
 
359
        os.umask(0o002)
 
360
        self.assertEqual(0o002, osutils.get_umask())
 
361
        os.umask(0o027)
 
362
        self.assertEqual(0o027, osutils.get_umask())
343
363
 
344
364
 
345
365
class TestDateTime(tests.TestCase):
381
401
        self.assertFormatedDelta('2 seconds in the future', -2)
382
402
 
383
403
    def test_format_date(self):
384
 
        self.assertRaises(errors.UnsupportedTimezoneFormat,
 
404
        self.assertRaises(osutils.UnsupportedTimezoneFormat,
385
405
            osutils.format_date, 0, timezone='foo')
386
406
        self.assertIsInstance(osutils.format_date(0), str)
387
 
        self.assertIsInstance(osutils.format_local_date(0), unicode)
 
407
        self.assertIsInstance(osutils.format_local_date(0), text_type)
388
408
        # Testing for the actual value of the local weekday without
389
409
        # duplicating the code from format_date is difficult.
390
410
        # Instead blackbox.test_locale should check for localized
418
438
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
419
439
 
420
440
 
 
441
class TestFdatasync(tests.TestCaseInTempDir):
 
442
 
 
443
    def do_fdatasync(self):
 
444
        f = tempfile.NamedTemporaryFile()
 
445
        osutils.fdatasync(f.fileno())
 
446
        f.close()
 
447
 
 
448
    @staticmethod
 
449
    def raise_eopnotsupp(*args, **kwargs):
 
450
        raise IOError(errno.EOPNOTSUPP, os.strerror(errno.EOPNOTSUPP))
 
451
 
 
452
    @staticmethod
 
453
    def raise_enotsup(*args, **kwargs):
 
454
        raise IOError(errno.ENOTSUP, os.strerror(errno.ENOTSUP))
 
455
 
 
456
    def test_fdatasync_handles_system_function(self):
 
457
        self.overrideAttr(os, "fdatasync")
 
458
        self.do_fdatasync()
 
459
 
 
460
    def test_fdatasync_handles_no_fdatasync_no_fsync(self):
 
461
        self.overrideAttr(os, "fdatasync")
 
462
        self.overrideAttr(os, "fsync")
 
463
        self.do_fdatasync()
 
464
 
 
465
    def test_fdatasync_handles_no_EOPNOTSUPP(self):
 
466
        self.overrideAttr(errno, "EOPNOTSUPP")
 
467
        self.do_fdatasync()
 
468
 
 
469
    def test_fdatasync_catches_ENOTSUP(self):
 
470
        enotsup = getattr(errno, "ENOTSUP", None)
 
471
        if enotsup is None:
 
472
            raise tests.TestNotApplicable("No ENOTSUP on this platform")
 
473
        self.overrideAttr(os, "fdatasync", self.raise_enotsup)
 
474
        self.do_fdatasync()
 
475
 
 
476
    def test_fdatasync_catches_EOPNOTSUPP(self):
 
477
        enotsup = getattr(errno, "EOPNOTSUPP", None)
 
478
        if enotsup is None:
 
479
            raise tests.TestNotApplicable("No EOPNOTSUPP on this platform")
 
480
        self.overrideAttr(os, "fdatasync", self.raise_eopnotsupp)
 
481
        self.do_fdatasync()
 
482
 
 
483
 
421
484
class TestLinks(tests.TestCaseInTempDir):
422
485
 
423
486
    def test_dereference_path(self):
424
 
        self.requireFeature(tests.SymlinkFeature)
 
487
        self.requireFeature(features.SymlinkFeature)
425
488
        cwd = osutils.realpath('.')
426
489
        os.mkdir('bar')
427
490
        bar_path = osutils.pathjoin(cwd, 'bar')
448
511
        self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
449
512
 
450
513
    def test_changing_access(self):
451
 
        f = file('file', 'w')
452
 
        f.write('monkey')
453
 
        f.close()
 
514
        with open('file', 'w') as f:
 
515
            f.write('monkey')
454
516
 
455
517
        # Make a file readonly
456
518
        osutils.make_readonly('file')
457
519
        mode = os.lstat('file').st_mode
458
 
        self.assertEqual(mode, mode & 0777555)
 
520
        self.assertEqual(mode, mode & 0o777555)
459
521
 
460
522
        # Make a file writable
461
523
        osutils.make_writable('file')
462
524
        mode = os.lstat('file').st_mode
463
 
        self.assertEqual(mode, mode | 0200)
 
525
        self.assertEqual(mode, mode | 0o200)
464
526
 
465
527
        if osutils.has_symlinks():
466
528
            # should not error when handed a symlink
474
536
 
475
537
class TestCanonicalRelPath(tests.TestCaseInTempDir):
476
538
 
477
 
    _test_needs_features = [tests.CaseInsCasePresFilenameFeature]
 
539
    _test_needs_features = [features.CaseInsCasePresFilenameFeature]
478
540
 
479
541
    def test_canonical_relpath_simple(self):
480
 
        f = file('MixedCaseName', 'w')
 
542
        f = open('MixedCaseName', 'w')
481
543
        f.close()
482
544
        actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
483
 
        self.failUnlessEqual('work/MixedCaseName', actual)
 
545
        self.assertEqual('work/MixedCaseName', actual)
484
546
 
485
547
    def test_canonical_relpath_missing_tail(self):
486
548
        os.mkdir('MixedCaseParent')
487
549
        actual = osutils.canonical_relpath(self.test_base_dir,
488
550
                                           'mixedcaseparent/nochild')
489
 
        self.failUnlessEqual('work/MixedCaseParent/nochild', actual)
 
551
        self.assertEqual('work/MixedCaseParent/nochild', actual)
490
552
 
491
553
 
492
554
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
536
598
    """Test pumpfile method."""
537
599
 
538
600
    def setUp(self):
539
 
        tests.TestCase.setUp(self)
 
601
        super(TestPumpFile, self).setUp()
540
602
        # create a test datablock
541
603
        self.block_size = 512
542
 
        pattern = '0123456789ABCDEF'
543
 
        self.test_data = pattern * (3 * self.block_size / len(pattern))
 
604
        pattern = b'0123456789ABCDEF'
 
605
        self.test_data = pattern * (3 * self.block_size // len(pattern))
544
606
        self.test_data_len = len(self.test_data)
545
607
 
546
608
    def test_bracket_block_size(self):
550
612
        self.assertTrue(self.test_data_len > self.block_size)
551
613
 
552
614
        from_file = file_utils.FakeReadFile(self.test_data)
553
 
        to_file = StringIO()
 
615
        to_file = BytesIO()
554
616
 
555
 
        # read (max / 2) bytes and verify read size wasn't affected
556
 
        num_bytes_to_read = self.block_size / 2
 
617
        # read (max // 2) bytes and verify read size wasn't affected
 
618
        num_bytes_to_read = self.block_size // 2
557
619
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
558
620
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
559
621
        self.assertEqual(from_file.get_read_count(), 1)
591
653
 
592
654
        # retrieve data in blocks
593
655
        from_file = file_utils.FakeReadFile(self.test_data)
594
 
        to_file = StringIO()
 
656
        to_file = BytesIO()
595
657
        osutils.pumpfile(from_file, to_file, self.test_data_len,
596
658
                         self.block_size)
597
659
 
615
677
 
616
678
        # retrieve data to EOF
617
679
        from_file = file_utils.FakeReadFile(self.test_data)
618
 
        to_file = StringIO()
 
680
        to_file = BytesIO()
619
681
        osutils.pumpfile(from_file, to_file, -1, self.block_size)
620
682
 
621
683
        # verify read size was equal to the maximum read size
635
697
        with this new version."""
636
698
        # retrieve data using default (old) pumpfile method
637
699
        from_file = file_utils.FakeReadFile(self.test_data)
638
 
        to_file = StringIO()
 
700
        to_file = BytesIO()
639
701
        osutils.pumpfile(from_file, to_file)
640
702
 
641
703
        # report error if the data wasn't equal (we only report the size due
649
711
        activity = []
650
712
        def log_activity(length, direction):
651
713
            activity.append((length, direction))
652
 
        from_file = StringIO(self.test_data)
653
 
        to_file = StringIO()
 
714
        from_file = BytesIO(self.test_data)
 
715
        to_file = BytesIO()
654
716
        osutils.pumpfile(from_file, to_file, buff_size=500,
655
717
                         report_activity=log_activity, direction='read')
656
718
        self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
657
719
                          (36, 'read')], activity)
658
720
 
659
 
        from_file = StringIO(self.test_data)
660
 
        to_file = StringIO()
 
721
        from_file = BytesIO(self.test_data)
 
722
        to_file = BytesIO()
661
723
        del activity[:]
662
724
        osutils.pumpfile(from_file, to_file, buff_size=500,
663
725
                         report_activity=log_activity, direction='write')
665
727
                          (36, 'write')], activity)
666
728
 
667
729
        # And with a limited amount of data
668
 
        from_file = StringIO(self.test_data)
669
 
        to_file = StringIO()
 
730
        from_file = BytesIO(self.test_data)
 
731
        to_file = BytesIO()
670
732
        del activity[:]
671
733
        osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
672
734
                         report_activity=log_activity, direction='read')
677
739
class TestPumpStringFile(tests.TestCase):
678
740
 
679
741
    def test_empty(self):
680
 
        output = StringIO()
681
 
        osutils.pump_string_file("", output)
682
 
        self.assertEqual("", output.getvalue())
 
742
        output = BytesIO()
 
743
        osutils.pump_string_file(b"", output)
 
744
        self.assertEqual(b"", output.getvalue())
683
745
 
684
746
    def test_more_than_segment_size(self):
685
 
        output = StringIO()
686
 
        osutils.pump_string_file("123456789", output, 2)
687
 
        self.assertEqual("123456789", output.getvalue())
 
747
        output = BytesIO()
 
748
        osutils.pump_string_file(b"123456789", output, 2)
 
749
        self.assertEqual(b"123456789", output.getvalue())
688
750
 
689
751
    def test_segment_size(self):
690
 
        output = StringIO()
691
 
        osutils.pump_string_file("12", output, 2)
692
 
        self.assertEqual("12", output.getvalue())
 
752
        output = BytesIO()
 
753
        osutils.pump_string_file(b"12", output, 2)
 
754
        self.assertEqual(b"12", output.getvalue())
693
755
 
694
756
    def test_segment_size_multiple(self):
695
 
        output = StringIO()
696
 
        osutils.pump_string_file("1234", output, 2)
697
 
        self.assertEqual("1234", output.getvalue())
 
757
        output = BytesIO()
 
758
        osutils.pump_string_file(b"1234", output, 2)
 
759
        self.assertEqual(b"1234", output.getvalue())
698
760
 
699
761
 
700
762
class TestRelpath(tests.TestCase):
763
825
        self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
764
826
 
765
827
    def test_from_unicode_string_ascii_contents(self):
766
 
        self.assertEqual('bargam',
767
 
                         osutils.safe_revision_id(u'bargam', warn=False))
768
 
 
769
 
    def test_from_unicode_deprecated(self):
770
 
        self.assertEqual('bargam',
771
 
            self.callDeprecated([osutils._revision_id_warning],
772
 
                                osutils.safe_revision_id, u'bargam'))
 
828
        self.assertRaises(TypeError,
 
829
                          osutils.safe_revision_id, u'bargam')
773
830
 
774
831
    def test_from_unicode_string_unicode_contents(self):
775
 
        self.assertEqual('bargam\xc2\xae',
776
 
                         osutils.safe_revision_id(u'bargam\xae', warn=False))
 
832
        self.assertRaises(TypeError,
 
833
                         osutils.safe_revision_id, u'bargam\xae')
777
834
 
778
835
    def test_from_utf8_string(self):
779
836
        self.assertEqual('foo\xc2\xae',
790
847
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))
791
848
 
792
849
    def test_from_unicode_string_ascii_contents(self):
793
 
        self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))
794
 
 
795
 
    def test_from_unicode_deprecated(self):
796
 
        self.assertEqual('bargam',
797
 
            self.callDeprecated([osutils._file_id_warning],
798
 
                                osutils.safe_file_id, u'bargam'))
 
850
        self.assertRaises(TypeError, osutils.safe_file_id, u'bargam')
799
851
 
800
852
    def test_from_unicode_string_unicode_contents(self):
801
 
        self.assertEqual('bargam\xc2\xae',
802
 
                         osutils.safe_file_id(u'bargam\xae', warn=False))
 
853
        self.assertRaises(TypeError,
 
854
                          osutils.safe_file_id, u'bargam\xae')
803
855
 
804
856
    def test_from_utf8_string(self):
805
857
        self.assertEqual('foo\xc2\xae',
810
862
        self.assertEqual(None, osutils.safe_file_id(None))
811
863
 
812
864
 
 
865
class TestSendAll(tests.TestCase):
 
866
 
 
867
    def test_send_with_disconnected_socket(self):
 
868
        class DisconnectedSocket(object):
 
869
            def __init__(self, err):
 
870
                self.err = err
 
871
            def send(self, content):
 
872
                raise self.err
 
873
            def close(self):
 
874
                pass
 
875
        # All of these should be treated as ConnectionReset
 
876
        errs = []
 
877
        for err_cls in (IOError, socket.error):
 
878
            for errnum in osutils._end_of_stream_errors:
 
879
                errs.append(err_cls(errnum))
 
880
        for err in errs:
 
881
            sock = DisconnectedSocket(err)
 
882
            self.assertRaises(errors.ConnectionReset,
 
883
                osutils.send_all, sock, b'some more content')
 
884
 
 
885
    def test_send_with_no_progress(self):
 
886
        # See https://bugs.launchpad.net/bzr/+bug/1047309
 
887
        # It seems that paramiko can get into a state where it doesn't error,
 
888
        # but it returns 0 bytes sent for requests over and over again.
 
889
        class NoSendingSocket(object):
 
890
            def __init__(self):
 
891
                self.call_count = 0
 
892
            def send(self, bytes):
 
893
                self.call_count += 1
 
894
                if self.call_count > 100:
 
895
                    # Prevent the test suite from hanging
 
896
                    raise RuntimeError('too many calls')
 
897
                return 0
 
898
        sock = NoSendingSocket()
 
899
        self.assertRaises(errors.ConnectionReset,
 
900
                          osutils.send_all, sock, b'content')
 
901
        self.assertEqual(1, sock.call_count)
 
902
 
 
903
 
 
904
class TestPosixFuncs(tests.TestCase):
 
905
    """Test that the posix version of normpath returns an appropriate path
 
906
       when used with 2 leading slashes."""
 
907
 
 
908
    def test_normpath(self):
 
909
        self.assertEqual('/etc/shadow', osutils._posix_normpath('/etc/shadow'))
 
910
        self.assertEqual('/etc/shadow', osutils._posix_normpath('//etc/shadow'))
 
911
        self.assertEqual('/etc/shadow', osutils._posix_normpath('///etc/shadow'))
 
912
 
 
913
 
813
914
class TestWin32Funcs(tests.TestCase):
814
915
    """Test that _win32 versions of os utilities return appropriate paths."""
815
916
 
816
917
    def test_abspath(self):
 
918
        self.requireFeature(features.win32_feature)
817
919
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
818
920
        self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
819
921
        self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
832
934
                         osutils._win32_pathjoin('path/to', 'C:/foo'))
833
935
        self.assertEqual('path/to/foo',
834
936
                         osutils._win32_pathjoin('path/to/', 'foo'))
835
 
        self.assertEqual('/foo',
 
937
 
 
938
    def test_pathjoin_late_bugfix(self):
 
939
        if sys.version_info < (2, 7, 6):
 
940
            expected = '/foo'
 
941
        else:
 
942
            expected = 'C:/foo'
 
943
        self.assertEqual(expected,
836
944
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
837
 
        self.assertEqual('/foo',
 
945
        self.assertEqual(expected,
838
946
                         osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
839
947
 
840
948
    def test_normpath(self):
845
953
 
846
954
    def test_getcwd(self):
847
955
        cwd = osutils._win32_getcwd()
848
 
        os_cwd = os.getcwdu()
 
956
        os_cwd = osutils._getcwd()
849
957
        self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
850
958
        # win32 is inconsistent whether it returns lower or upper case
851
959
        # and even if it was consistent the user might type the other
859
967
        self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
860
968
        self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
861
969
 
862
 
    def test_win98_abspath(self):
863
 
        # absolute path
864
 
        self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
865
 
        self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
866
 
        # UNC path
867
 
        self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
868
 
        self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
869
 
        # relative path
870
 
        cwd = osutils.getcwd().rstrip('/')
871
 
        drive = osutils._nt_splitdrive(cwd)[0]
872
 
        self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
873
 
        self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
874
 
        # unicode path
875
 
        u = u'\u1234'
876
 
        self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
877
 
 
878
970
 
879
971
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
880
972
    """Test win32 functions that create files."""
881
973
 
882
974
    def test_getcwd(self):
883
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
975
        self.requireFeature(features.UnicodeFilenameFeature)
884
976
        os.mkdir(u'mu-\xb5')
885
977
        os.chdir(u'mu-\xb5')
886
978
        # TODO: jam 20060427 This will probably fail on Mac OSX because
892
984
    def test_minimum_path_selection(self):
893
985
        self.assertEqual(set(),
894
986
            osutils.minimum_path_selection([]))
895
 
        self.assertEqual(set(['a']),
 
987
        self.assertEqual({'a'},
896
988
            osutils.minimum_path_selection(['a']))
897
 
        self.assertEqual(set(['a', 'b']),
 
989
        self.assertEqual({'a', 'b'},
898
990
            osutils.minimum_path_selection(['a', 'b']))
899
 
        self.assertEqual(set(['a/', 'b']),
 
991
        self.assertEqual({'a/', 'b'},
900
992
            osutils.minimum_path_selection(['a/', 'b']))
901
 
        self.assertEqual(set(['a/', 'b']),
 
993
        self.assertEqual({'a/', 'b'},
902
994
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
903
 
        self.assertEqual(set(['a-b', 'a', 'a0b']),
 
995
        self.assertEqual({'a-b', 'a', 'a0b'},
904
996
            osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
905
997
 
906
998
    def test_mkdtemp(self):
908
1000
        self.assertFalse('\\' in tmpdir)
909
1001
 
910
1002
    def test_rename(self):
911
 
        a = open('a', 'wb')
912
 
        a.write('foo\n')
913
 
        a.close()
914
 
        b = open('b', 'wb')
915
 
        b.write('baz\n')
916
 
        b.close()
 
1003
        with open('a', 'wb') as a:
 
1004
            a.write(b'foo\n')
 
1005
        with open('b', 'wb') as b:
 
1006
            b.write(b'baz\n')
917
1007
 
918
1008
        osutils._win32_rename('b', 'a')
919
 
        self.failUnlessExists('a')
920
 
        self.failIfExists('b')
921
 
        self.assertFileEqual('baz\n', 'a')
 
1009
        self.assertPathExists('a')
 
1010
        self.assertPathDoesNotExist('b')
 
1011
        self.assertFileEqual(b'baz\n', 'a')
922
1012
 
923
1013
    def test_rename_missing_file(self):
924
 
        a = open('a', 'wb')
925
 
        a.write('foo\n')
926
 
        a.close()
 
1014
        with open('a', 'wb') as a:
 
1015
            a.write(b'foo\n')
927
1016
 
928
1017
        try:
929
1018
            osutils._win32_rename('b', 'a')
930
 
        except (IOError, OSError), e:
 
1019
        except (IOError, OSError) as e:
931
1020
            self.assertEqual(errno.ENOENT, e.errno)
932
 
        self.assertFileEqual('foo\n', 'a')
 
1021
        self.assertFileEqual(b'foo\n', 'a')
933
1022
 
934
1023
    def test_rename_missing_dir(self):
935
1024
        os.mkdir('a')
936
1025
        try:
937
1026
            osutils._win32_rename('b', 'a')
938
 
        except (IOError, OSError), e:
 
1027
        except (IOError, OSError) as e:
939
1028
            self.assertEqual(errno.ENOENT, e.errno)
940
1029
 
941
1030
    def test_rename_current_dir(self):
947
1036
        # doesn't exist.
948
1037
        try:
949
1038
            osutils._win32_rename('b', '.')
950
 
        except (IOError, OSError), e:
 
1039
        except (IOError, OSError) as e:
951
1040
            self.assertEqual(errno.ENOENT, e.errno)
952
1041
 
953
1042
    def test_splitpath(self):
976
1065
    """Test mac special functions that require directories."""
977
1066
 
978
1067
    def test_getcwd(self):
979
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1068
        self.requireFeature(features.UnicodeFilenameFeature)
980
1069
        os.mkdir(u'B\xe5gfors')
981
1070
        os.chdir(u'B\xe5gfors')
982
1071
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
983
1072
 
984
1073
    def test_getcwd_nonnorm(self):
985
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1074
        self.requireFeature(features.UnicodeFilenameFeature)
986
1075
        # Test that _mac_getcwd() will normalize this path
987
1076
        os.mkdir(u'Ba\u030agfors')
988
1077
        os.chdir(u'Ba\u030agfors')
998
1087
                         osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
999
1088
 
1000
1089
    def test_osutils_binding(self):
1001
 
        from bzrlib.tests import test__chunks_to_lines
 
1090
        from . import test__chunks_to_lines
1002
1091
        if test__chunks_to_lines.compiled_chunkstolines_feature.available():
1003
 
            from bzrlib._chunks_to_lines_pyx import chunks_to_lines
 
1092
            from .._chunks_to_lines_pyx import chunks_to_lines
1004
1093
        else:
1005
 
            from bzrlib._chunks_to_lines_py import chunks_to_lines
 
1094
            from .._chunks_to_lines_py import chunks_to_lines
1006
1095
        self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
1007
1096
 
1008
1097
 
1071
1160
        self.assertExpectedBlocks(expected_dirblocks[1:], result)
1072
1161
 
1073
1162
    def test_walkdirs_os_error(self):
1074
 
        # <https://bugs.edge.launchpad.net/bzr/+bug/338653>
 
1163
        # <https://bugs.launchpad.net/bzr/+bug/338653>
1075
1164
        # Pyrex readdir didn't raise useful messages if it had an error
1076
1165
        # reading the directory
1077
1166
        if sys.platform == 'win32':
1078
1167
            raise tests.TestNotApplicable(
1079
1168
                "readdir IOError not tested on win32")
 
1169
        self.requireFeature(features.not_running_as_root)
1080
1170
        os.mkdir("test-unreadable")
1081
1171
        os.chmod("test-unreadable", 0000)
1082
1172
        # must chmod it back so that it can be removed
1083
 
        self.addCleanup(os.chmod, "test-unreadable", 0700)
 
1173
        self.addCleanup(os.chmod, "test-unreadable", 0o700)
1084
1174
        # The error is not raised until the generator is actually evaluated.
1085
1175
        # (It would be ok if it happened earlier but at the moment it
1086
1176
        # doesn't.)
1087
1177
        e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1088
 
        self.assertEquals('./test-unreadable', e.filename)
1089
 
        self.assertEquals(errno.EACCES, e.errno)
 
1178
        self.assertEqual('./test-unreadable', e.filename)
 
1179
        self.assertEqual(errno.EACCES, e.errno)
1090
1180
        # Ensure the message contains the file name
1091
 
        self.assertContainsRe(str(e), "\./test-unreadable")
 
1181
        self.assertContainsRe(str(e), "\\./test-unreadable")
 
1182
 
 
1183
 
 
1184
    def test_walkdirs_encoding_error(self):
 
1185
        # <https://bugs.launchpad.net/bzr/+bug/488519>
 
1186
        # walkdirs didn't raise a useful message when the filenames
 
1187
        # are not using the filesystem's encoding
 
1188
 
 
1189
        # require a bytestring based filesystem
 
1190
        self.requireFeature(features.ByteStringNamedFilesystem)
 
1191
 
 
1192
        tree = [
 
1193
            '.bzr',
 
1194
            '0file',
 
1195
            '1dir/',
 
1196
            '1dir/0file',
 
1197
            '1dir/1dir/',
 
1198
            '1file'
 
1199
            ]
 
1200
 
 
1201
        self.build_tree(tree)
 
1202
 
 
1203
        # rename the 1file to a latin-1 filename
 
1204
        os.rename("./1file", "\xe8file")
 
1205
        if "\xe8file" not in os.listdir("."):
 
1206
            self.skipTest("Lack filesystem that preserves arbitrary bytes")
 
1207
 
 
1208
        self._save_platform_info()
 
1209
        osutils._fs_enc = 'UTF-8'
 
1210
 
 
1211
        # this should raise on error
 
1212
        def attempt():
 
1213
            for dirdetail, dirblock in osutils.walkdirs('.'):
 
1214
                pass
 
1215
 
 
1216
        self.assertRaises(errors.BadFilenameEncoding, attempt)
1092
1217
 
1093
1218
    def test__walkdirs_utf8(self):
1094
1219
        tree = [
1145
1270
            dirblock[:] = new_dirblock
1146
1271
 
1147
1272
    def _save_platform_info(self):
1148
 
        self.overrideAttr(win32utils, 'winver')
1149
1273
        self.overrideAttr(osutils, '_fs_enc')
1150
1274
        self.overrideAttr(osutils, '_selected_dir_reader')
1151
1275
 
1160
1284
    def test_force_walkdirs_utf8_fs_utf8(self):
1161
1285
        self.requireFeature(UTF8DirReaderFeature)
1162
1286
        self._save_platform_info()
1163
 
        win32utils.winver = None # Avoid the win32 detection code
1164
 
        osutils._fs_enc = 'UTF-8'
1165
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
1287
        osutils._fs_enc = 'utf-8'
 
1288
        self.assertDirReaderIs(
 
1289
            UTF8DirReaderFeature.module.UTF8DirReader)
1166
1290
 
1167
1291
    def test_force_walkdirs_utf8_fs_ascii(self):
1168
1292
        self.requireFeature(UTF8DirReaderFeature)
1169
1293
        self._save_platform_info()
1170
 
        win32utils.winver = None # Avoid the win32 detection code
1171
 
        osutils._fs_enc = 'US-ASCII'
1172
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1173
 
 
1174
 
    def test_force_walkdirs_utf8_fs_ANSI(self):
1175
 
        self.requireFeature(UTF8DirReaderFeature)
1176
 
        self._save_platform_info()
1177
 
        win32utils.winver = None # Avoid the win32 detection code
1178
 
        osutils._fs_enc = 'ANSI_X3.4-1968'
1179
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
1294
        osutils._fs_enc = 'ascii'
 
1295
        self.assertDirReaderIs(
 
1296
            UTF8DirReaderFeature.module.UTF8DirReader)
1180
1297
 
1181
1298
    def test_force_walkdirs_utf8_fs_latin1(self):
1182
1299
        self._save_platform_info()
1183
 
        win32utils.winver = None # Avoid the win32 detection code
1184
 
        osutils._fs_enc = 'latin1'
 
1300
        osutils._fs_enc = 'iso-8859-1'
1185
1301
        self.assertDirReaderIs(osutils.UnicodeDirReader)
1186
1302
 
1187
1303
    def test_force_walkdirs_utf8_nt(self):
1188
1304
        # Disabled because the thunk of the whole walkdirs api is disabled.
1189
1305
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1190
1306
        self._save_platform_info()
1191
 
        win32utils.winver = 'Windows NT'
1192
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1307
        from .._walkdirs_win32 import Win32ReadDir
1193
1308
        self.assertDirReaderIs(Win32ReadDir)
1194
1309
 
1195
 
    def test_force_walkdirs_utf8_98(self):
1196
 
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1197
 
        self._save_platform_info()
1198
 
        win32utils.winver = 'Windows 98'
1199
 
        self.assertDirReaderIs(osutils.UnicodeDirReader)
1200
 
 
1201
1310
    def test_unicode_walkdirs(self):
1202
1311
        """Walkdirs should always return unicode paths."""
1203
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1312
        self.requireFeature(features.UnicodeFilenameFeature)
1204
1313
        name0 = u'0file-\xb6'
1205
1314
        name1 = u'1dir-\u062c\u0648'
1206
1315
        name2 = u'2file-\u0633'
1243
1352
 
1244
1353
        The abspath portion might be in unicode or utf-8
1245
1354
        """
1246
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1355
        self.requireFeature(features.UnicodeFilenameFeature)
1247
1356
        name0 = u'0file-\xb6'
1248
1357
        name1 = u'1dir-\u062c\u0648'
1249
1358
        name2 = u'2file-\u0633'
1283
1392
        # all abspaths are Unicode, and encode them back into utf8.
1284
1393
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1285
1394
            self.assertIsInstance(dirdetail[0], str)
1286
 
            if isinstance(dirdetail[1], unicode):
 
1395
            if isinstance(dirdetail[1], text_type):
1287
1396
                dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1288
1397
                dirblock = [list(info) for info in dirblock]
1289
1398
                for info in dirblock:
1290
 
                    self.assertIsInstance(info[4], unicode)
 
1399
                    self.assertIsInstance(info[4], text_type)
1291
1400
                    info[4] = info[4].encode('utf8')
1292
1401
            new_dirblock = []
1293
1402
            for info in dirblock:
1304
1413
 
1305
1414
        The abspath portion should be in unicode
1306
1415
        """
1307
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1416
        self.requireFeature(features.UnicodeFilenameFeature)
1308
1417
        # Use the unicode reader. TODO: split into driver-and-driven unit
1309
1418
        # tests.
1310
1419
        self._save_platform_info()
1351
1460
 
1352
1461
    def test__walkdirs_utf8_win32readdir(self):
1353
1462
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1354
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1355
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1463
        self.requireFeature(features.UnicodeFilenameFeature)
 
1464
        from .._walkdirs_win32 import Win32ReadDir
1356
1465
        self._save_platform_info()
1357
1466
        osutils._selected_dir_reader = Win32ReadDir()
1358
1467
        name0u = u'0file-\xb6'
1408
1517
    def test__walkdirs_utf_win32_find_file_stat_file(self):
1409
1518
        """make sure our Stat values are valid"""
1410
1519
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1411
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1412
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1520
        self.requireFeature(features.UnicodeFilenameFeature)
 
1521
        from .._walkdirs_win32 import Win32ReadDir
1413
1522
        name0u = u'0file-\xb6'
1414
1523
        name0 = name0u.encode('utf8')
1415
1524
        self.build_tree([name0u])
1416
1525
        # I hate to sleep() here, but I'm trying to make the ctime different
1417
1526
        # from the mtime
1418
1527
        time.sleep(2)
1419
 
        f = open(name0u, 'ab')
1420
 
        try:
1421
 
            f.write('just a small update')
1422
 
        finally:
1423
 
            f.close()
 
1528
        with open(name0u, 'ab') as f:
 
1529
            f.write(b'just a small update')
1424
1530
 
1425
1531
        result = Win32ReadDir().read_dir('', u'.')
1426
1532
        entry = result[0]
1432
1538
    def test__walkdirs_utf_win32_find_file_stat_directory(self):
1433
1539
        """make sure our Stat values are valid"""
1434
1540
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1435
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1436
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1541
        self.requireFeature(features.UnicodeFilenameFeature)
 
1542
        from .._walkdirs_win32 import Win32ReadDir
1437
1543
        name0u = u'0dir-\u062c\u0648'
1438
1544
        name0 = name0u.encode('utf8')
1439
1545
        self.build_tree([name0u + '/'])
1538
1644
        self.assertEqual(['c'], os.listdir('target/b'))
1539
1645
 
1540
1646
    def test_copy_tree_symlinks(self):
1541
 
        self.requireFeature(tests.SymlinkFeature)
 
1647
        self.requireFeature(features.SymlinkFeature)
1542
1648
        self.build_tree(['source/'])
1543
1649
        os.symlink('a/generic/path', 'source/lnk')
1544
1650
        osutils.copy_tree('source', 'target')
1554
1660
            processed_files.append(('d', from_path, to_path))
1555
1661
        def link_handler(from_path, to_path):
1556
1662
            processed_links.append((from_path, to_path))
1557
 
        handlers = {'file':file_handler,
1558
 
                    'directory':dir_handler,
1559
 
                    'symlink':link_handler,
 
1663
        handlers = {'file': file_handler,
 
1664
                    'directory': dir_handler,
 
1665
                    'symlink': link_handler,
1560
1666
                   }
1561
1667
 
1562
1668
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1569
1675
                          ('d', 'source/b', 'target/b'),
1570
1676
                          ('f', 'source/b/c', 'target/b/c'),
1571
1677
                         ], processed_files)
1572
 
        self.failIfExists('target')
 
1678
        self.assertPathDoesNotExist('target')
1573
1679
        if osutils.has_symlinks():
1574
1680
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1575
1681
 
1580
1686
    def setUp(self):
1581
1687
        super(TestSetUnsetEnv, self).setUp()
1582
1688
 
1583
 
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
 
1689
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'),
1584
1690
                         'Environment was not cleaned up properly.'
1585
 
                         ' Variable BZR_TEST_ENV_VAR should not exist.')
 
1691
                         ' Variable BRZ_TEST_ENV_VAR should not exist.')
1586
1692
        def cleanup():
1587
 
            if 'BZR_TEST_ENV_VAR' in os.environ:
1588
 
                del os.environ['BZR_TEST_ENV_VAR']
 
1693
            if 'BRZ_TEST_ENV_VAR' in os.environ:
 
1694
                del os.environ['BRZ_TEST_ENV_VAR']
1589
1695
        self.addCleanup(cleanup)
1590
1696
 
1591
1697
    def test_set(self):
1592
1698
        """Test that we can set an env variable"""
1593
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1699
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
1594
1700
        self.assertEqual(None, old)
1595
 
        self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
 
1701
        self.assertEqual('foo', os.environ.get('BRZ_TEST_ENV_VAR'))
1596
1702
 
1597
1703
    def test_double_set(self):
1598
1704
        """Test that we get the old value out"""
1599
 
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1600
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
 
1705
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1706
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'bar')
1601
1707
        self.assertEqual('foo', old)
1602
 
        self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
 
1708
        self.assertEqual('bar', os.environ.get('BRZ_TEST_ENV_VAR'))
1603
1709
 
1604
1710
    def test_unicode(self):
1605
1711
        """Environment can only contain plain strings
1612
1718
                'Cannot find a unicode character that works in encoding %s'
1613
1719
                % (osutils.get_user_encoding(),))
1614
1720
 
1615
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1616
 
        self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
 
1721
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', uni_val)
 
1722
        self.assertEqual(env_val, os.environ.get('BRZ_TEST_ENV_VAR'))
1617
1723
 
1618
1724
    def test_unset(self):
1619
1725
        """Test that passing None will remove the env var"""
1620
 
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1621
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
 
1726
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1727
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', None)
1622
1728
        self.assertEqual('foo', old)
1623
 
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
1624
 
        self.failIf('BZR_TEST_ENV_VAR' in os.environ)
 
1729
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'))
 
1730
        self.assertFalse('BRZ_TEST_ENV_VAR' in os.environ)
1625
1731
 
1626
1732
 
1627
1733
class TestSizeShaFile(tests.TestCaseInTempDir):
1628
1734
 
1629
1735
    def test_sha_empty(self):
1630
 
        self.build_tree_contents([('foo', '')])
 
1736
        self.build_tree_contents([('foo', b'')])
1631
1737
        expected_sha = osutils.sha_string('')
1632
1738
        f = open('foo')
1633
1739
        self.addCleanup(f.close)
1636
1742
        self.assertEqual(expected_sha, sha)
1637
1743
 
1638
1744
    def test_sha_mixed_endings(self):
1639
 
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
 
1745
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
1640
1746
        self.build_tree_contents([('foo', text)])
1641
1747
        expected_sha = osutils.sha_string(text)
1642
1748
        f = open('foo', 'rb')
1649
1755
class TestShaFileByName(tests.TestCaseInTempDir):
1650
1756
 
1651
1757
    def test_sha_empty(self):
1652
 
        self.build_tree_contents([('foo', '')])
 
1758
        self.build_tree_contents([('foo', b'')])
1653
1759
        expected_sha = osutils.sha_string('')
1654
1760
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1655
1761
 
1656
1762
    def test_sha_mixed_endings(self):
1657
 
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
 
1763
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
1658
1764
        self.build_tree_contents([('foo', text)])
1659
1765
        expected_sha = osutils.sha_string(text)
1660
1766
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1663
1769
class TestResourceLoading(tests.TestCaseInTempDir):
1664
1770
 
1665
1771
    def test_resource_string(self):
1666
 
        # test resource in bzrlib
1667
 
        text = osutils.resource_string('bzrlib', 'debug.py')
 
1772
        # test resource in breezy
 
1773
        text = osutils.resource_string('breezy', 'debug.py')
1668
1774
        self.assertContainsRe(text, "debug_flags = set()")
1669
 
        # test resource under bzrlib
1670
 
        text = osutils.resource_string('bzrlib.ui', 'text.py')
 
1775
        # test resource under breezy
 
1776
        text = osutils.resource_string('breezy.ui', 'text.py')
1671
1777
        self.assertContainsRe(text, "class TextUIFactory")
1672
1778
        # test unsupported package
1673
1779
        self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1674
1780
            'yyy.xx')
1675
1781
        # test unknown resource
1676
 
        self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
1677
 
 
1678
 
 
1679
 
class TestReCompile(tests.TestCase):
1680
 
 
1681
 
    def test_re_compile_checked(self):
1682
 
        r = osutils.re_compile_checked(r'A*', re.IGNORECASE)
1683
 
        self.assertTrue(r.match('aaaa'))
1684
 
        self.assertTrue(r.match('aAaA'))
1685
 
 
1686
 
    def test_re_compile_checked_error(self):
1687
 
        # like https://bugs.launchpad.net/bzr/+bug/251352
1688
 
        err = self.assertRaises(
1689
 
            errors.BzrCommandError,
1690
 
            osutils.re_compile_checked, '*', re.IGNORECASE, 'test case')
1691
 
        self.assertEqual(
1692
 
            "Invalid regular expression in test case: '*': "
1693
 
            "nothing to repeat",
1694
 
            str(err))
 
1782
        self.assertRaises(IOError, osutils.resource_string, 'breezy', 'yyy.xx')
1695
1783
 
1696
1784
 
1697
1785
class TestDirReader(tests.TestCaseInTempDir):
1698
1786
 
 
1787
    scenarios = dir_reader_scenarios()
 
1788
 
1699
1789
    # Set by load_tests
1700
1790
    _dir_reader_class = None
1701
1791
    _native_to_unicode = None
1702
1792
 
1703
1793
    def setUp(self):
1704
 
        tests.TestCaseInTempDir.setUp(self)
 
1794
        super(TestDirReader, self).setUp()
1705
1795
        self.overrideAttr(osutils,
1706
1796
                          '_selected_dir_reader', self._dir_reader_class())
1707
1797
 
1801
1891
        return filtered_dirblocks
1802
1892
 
1803
1893
    def test_walk_unicode_tree(self):
1804
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1894
        self.requireFeature(features.UnicodeFilenameFeature)
1805
1895
        tree, expected_dirblocks = self._get_unicode_tree()
1806
1896
        self.build_tree(tree)
1807
1897
        result = list(osutils._walkdirs_utf8('.'))
1808
1898
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1809
1899
 
1810
1900
    def test_symlink(self):
1811
 
        self.requireFeature(tests.SymlinkFeature)
1812
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1901
        self.requireFeature(features.SymlinkFeature)
 
1902
        self.requireFeature(features.UnicodeFilenameFeature)
1813
1903
        target = u'target\N{Euro Sign}'
1814
1904
        link_name = u'l\N{Euro Sign}nk'
1815
1905
        os.symlink(target, link_name)
1833
1923
    But prior python versions failed to properly encode the passed unicode
1834
1924
    string.
1835
1925
    """
1836
 
    _test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]
 
1926
    _test_needs_features = [features.SymlinkFeature, features.UnicodeFilenameFeature]
1837
1927
 
1838
1928
    def setUp(self):
1839
1929
        super(tests.TestCaseInTempDir, self).setUp()
1842
1932
        os.symlink(self.target, self.link)
1843
1933
 
1844
1934
    def test_os_readlink_link_encoding(self):
1845
 
        if sys.version_info < (2, 6):
1846
 
            self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
1847
 
        else:
1848
 
            self.assertEquals(self.target,  os.readlink(self.link))
 
1935
        self.assertEqual(self.target,  os.readlink(self.link))
1849
1936
 
1850
1937
    def test_os_readlink_link_decoding(self):
1851
 
        self.assertEquals(self.target.encode(osutils._fs_enc),
 
1938
        self.assertEqual(self.target.encode(osutils._fs_enc),
1852
1939
                          os.readlink(self.link.encode(osutils._fs_enc)))
1853
1940
 
1854
1941
 
1863
1950
        self.assertIsInstance(concurrency, int)
1864
1951
 
1865
1952
    def test_local_concurrency_environment_variable(self):
1866
 
        os.environ['BZR_CONCURRENCY'] = '2'
 
1953
        self.overrideEnv('BRZ_CONCURRENCY', '2')
1867
1954
        self.assertEqual(2, osutils.local_concurrency(use_cache=False))
1868
 
        os.environ['BZR_CONCURRENCY'] = '3'
 
1955
        self.overrideEnv('BRZ_CONCURRENCY', '3')
1869
1956
        self.assertEqual(3, osutils.local_concurrency(use_cache=False))
1870
 
        os.environ['BZR_CONCURRENCY'] = 'foo'
 
1957
        self.overrideEnv('BRZ_CONCURRENCY', 'foo')
1871
1958
        self.assertEqual(1, osutils.local_concurrency(use_cache=False))
1872
1959
 
1873
1960
    def test_option_concurrency(self):
1874
 
        os.environ['BZR_CONCURRENCY'] = '1'
 
1961
        self.overrideEnv('BRZ_CONCURRENCY', '1')
1875
1962
        self.run_bzr('rocks --concurrency 42')
1876
 
        # Command line overrides envrionment variable
1877
 
        self.assertEquals('42', os.environ['BZR_CONCURRENCY'])
1878
 
        self.assertEquals(42, osutils.local_concurrency(use_cache=False))
 
1963
        # Command line overrides environment variable
 
1964
        self.assertEqual('42', os.environ['BRZ_CONCURRENCY'])
 
1965
        self.assertEqual(42, osutils.local_concurrency(use_cache=False))
1879
1966
 
1880
1967
 
1881
1968
class TestFailedToLoadExtension(tests.TestCase):
1882
1969
 
1883
1970
    def _try_loading(self):
1884
1971
        try:
1885
 
            import bzrlib._fictional_extension_py
1886
 
        except ImportError, e:
 
1972
            import breezy._fictional_extension_py
 
1973
        except ImportError as e:
1887
1974
            osutils.failed_to_load_extension(e)
1888
1975
            return True
1889
1976
 
1894
1981
    def test_failure_to_load(self):
1895
1982
        self._try_loading()
1896
1983
        self.assertLength(1, osutils._extension_load_failures)
1897
 
        self.assertEquals(osutils._extension_load_failures[0],
 
1984
        self.assertEqual(osutils._extension_load_failures[0],
1898
1985
            "No module named _fictional_extension_py")
1899
1986
 
1900
1987
    def test_report_extension_load_failures_no_warning(self):
1904
1991
        self.assertLength(0, warnings)
1905
1992
 
1906
1993
    def test_report_extension_load_failures_message(self):
1907
 
        log = StringIO()
 
1994
        log = BytesIO()
1908
1995
        trace.push_log_file(log)
1909
1996
        self.assertTrue(self._try_loading())
1910
1997
        osutils.report_extension_load_failures()
1911
1998
        self.assertContainsRe(
1912
1999
            log.getvalue(),
1913
 
            r"bzr: warning: some compiled extensions could not be loaded; "
1914
 
            "see <https://answers\.launchpad\.net/bzr/\+faq/703>\n"
 
2000
            r"brz: warning: some compiled extensions could not be loaded; "
 
2001
            "see ``brz help missing-extensions``\n"
1915
2002
            )
1916
2003
 
1917
2004
 
1918
2005
class TestTerminalWidth(tests.TestCase):
1919
2006
 
 
2007
    def setUp(self):
 
2008
        super(TestTerminalWidth, self).setUp()
 
2009
        self._orig_terminal_size_state = osutils._terminal_size_state
 
2010
        self._orig_first_terminal_size = osutils._first_terminal_size
 
2011
        self.addCleanup(self.restore_osutils_globals)
 
2012
        osutils._terminal_size_state = 'no_data'
 
2013
        osutils._first_terminal_size = None
 
2014
 
 
2015
    def restore_osutils_globals(self):
 
2016
        osutils._terminal_size_state = self._orig_terminal_size_state
 
2017
        osutils._first_terminal_size = self._orig_first_terminal_size
 
2018
 
1920
2019
    def replace_stdout(self, new):
1921
2020
        self.overrideAttr(sys, 'stdout', new)
1922
2021
 
1934
2033
    def test_default_values(self):
1935
2034
        self.assertEqual(80, osutils.default_terminal_width)
1936
2035
 
1937
 
    def test_defaults_to_BZR_COLUMNS(self):
1938
 
        # BZR_COLUMNS is set by the test framework
1939
 
        self.assertNotEqual('12', os.environ['BZR_COLUMNS'])
1940
 
        os.environ['BZR_COLUMNS'] = '12'
 
2036
    def test_defaults_to_BRZ_COLUMNS(self):
 
2037
        # BRZ_COLUMNS is set by the test framework
 
2038
        self.assertNotEqual('12', os.environ['BRZ_COLUMNS'])
 
2039
        self.overrideEnv('BRZ_COLUMNS', '12')
1941
2040
        self.assertEqual(12, osutils.terminal_width())
1942
2041
 
 
2042
    def test_BRZ_COLUMNS_0_no_limit(self):
 
2043
        self.overrideEnv('BRZ_COLUMNS', '0')
 
2044
        self.assertEqual(None, osutils.terminal_width())
 
2045
 
1943
2046
    def test_falls_back_to_COLUMNS(self):
1944
 
        del os.environ['BZR_COLUMNS']
 
2047
        self.overrideEnv('BRZ_COLUMNS', None)
1945
2048
        self.assertNotEqual('42', os.environ['COLUMNS'])
1946
2049
        self.set_fake_tty()
1947
 
        os.environ['COLUMNS'] = '42'
 
2050
        self.overrideEnv('COLUMNS', '42')
1948
2051
        self.assertEqual(42, osutils.terminal_width())
1949
2052
 
1950
2053
    def test_tty_default_without_columns(self):
1951
 
        del os.environ['BZR_COLUMNS']
1952
 
        del os.environ['COLUMNS']
 
2054
        self.overrideEnv('BRZ_COLUMNS', None)
 
2055
        self.overrideEnv('COLUMNS', None)
1953
2056
 
1954
2057
        def terminal_size(w, h):
1955
2058
            return 42, 42
1962
2065
        self.assertEqual(42, osutils.terminal_width())
1963
2066
 
1964
2067
    def test_non_tty_default_without_columns(self):
1965
 
        del os.environ['BZR_COLUMNS']
1966
 
        del os.environ['COLUMNS']
 
2068
        self.overrideEnv('BRZ_COLUMNS', None)
 
2069
        self.overrideEnv('COLUMNS', None)
1967
2070
        self.replace_stdout(None)
1968
2071
        self.assertEqual(None, osutils.terminal_width())
1969
2072
 
1979
2082
        else:
1980
2083
            self.overrideAttr(termios, 'TIOCGWINSZ')
1981
2084
            del termios.TIOCGWINSZ
1982
 
        del os.environ['BZR_COLUMNS']
1983
 
        del os.environ['COLUMNS']
 
2085
        self.overrideEnv('BRZ_COLUMNS', None)
 
2086
        self.overrideEnv('COLUMNS', None)
1984
2087
        # Whatever the result is, if we don't raise an exception, it's ok.
1985
2088
        osutils.terminal_width()
1986
2089
 
 
2090
 
1987
2091
class TestCreationOps(tests.TestCaseInTempDir):
1988
2092
    _test_needs_features = [features.chown_feature]
1989
2093
 
1990
2094
    def setUp(self):
1991
 
        tests.TestCaseInTempDir.setUp(self)
 
2095
        super(TestCreationOps, self).setUp()
1992
2096
        self.overrideAttr(os, 'chown', self._dummy_chown)
1993
2097
 
1994
2098
        # params set by call to _dummy_chown
2004
2108
        osutils.copy_ownership_from_path('test_file', ownsrc)
2005
2109
 
2006
2110
        s = os.stat(ownsrc)
2007
 
        self.assertEquals(self.path, 'test_file')
2008
 
        self.assertEquals(self.uid, s.st_uid)
2009
 
        self.assertEquals(self.gid, s.st_gid)
 
2111
        self.assertEqual(self.path, 'test_file')
 
2112
        self.assertEqual(self.uid, s.st_uid)
 
2113
        self.assertEqual(self.gid, s.st_gid)
2010
2114
 
2011
2115
    def test_copy_ownership_nonesrc(self):
2012
2116
        """copy_ownership_from_path test with src=None."""
2015
2119
        osutils.copy_ownership_from_path('test_file')
2016
2120
 
2017
2121
        s = os.stat('..')
2018
 
        self.assertEquals(self.path, 'test_file')
2019
 
        self.assertEquals(self.uid, s.st_uid)
2020
 
        self.assertEquals(self.gid, s.st_gid)
 
2122
        self.assertEqual(self.path, 'test_file')
 
2123
        self.assertEqual(self.uid, s.st_uid)
 
2124
        self.assertEqual(self.gid, s.st_gid)
 
2125
 
 
2126
 
 
2127
class TestPathFromEnviron(tests.TestCase):
 
2128
 
 
2129
    def test_is_unicode(self):
 
2130
        self.overrideEnv('BRZ_TEST_PATH', './anywhere at all/')
 
2131
        path = osutils.path_from_environ('BRZ_TEST_PATH')
 
2132
        self.assertIsInstance(path, text_type)
 
2133
        self.assertEqual(u'./anywhere at all/', path)
 
2134
 
 
2135
    def test_posix_path_env_ascii(self):
 
2136
        self.overrideEnv('BRZ_TEST_PATH', '/tmp')
 
2137
        home = osutils._posix_path_from_environ('BRZ_TEST_PATH')
 
2138
        self.assertIsInstance(home, text_type)
 
2139
        self.assertEqual(u'/tmp', home)
 
2140
 
 
2141
    def test_posix_path_env_unicode(self):
 
2142
        self.requireFeature(features.ByteStringNamedFilesystem)
 
2143
        self.overrideEnv('BRZ_TEST_PATH', '/home/\xa7test')
 
2144
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
 
2145
        self.assertEqual(u'/home/\xa7test',
 
2146
            osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2147
        osutils._fs_enc = "iso8859-5"
 
2148
        self.assertEqual(u'/home/\u0407test',
 
2149
            osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2150
        osutils._fs_enc = "utf-8"
 
2151
        self.assertRaises(errors.BadFilenameEncoding,
 
2152
            osutils._posix_path_from_environ, 'BRZ_TEST_PATH')
 
2153
 
 
2154
 
 
2155
class TestGetHomeDir(tests.TestCase):
 
2156
 
 
2157
    def test_is_unicode(self):
 
2158
        home = osutils._get_home_dir()
 
2159
        self.assertIsInstance(home, text_type)
 
2160
 
 
2161
    def test_posix_homeless(self):
 
2162
        self.overrideEnv('HOME', None)
 
2163
        home = osutils._get_home_dir()
 
2164
        self.assertIsInstance(home, text_type)
 
2165
 
 
2166
    def test_posix_home_ascii(self):
 
2167
        self.overrideEnv('HOME', '/home/test')
 
2168
        home = osutils._posix_get_home_dir()
 
2169
        self.assertIsInstance(home, text_type)
 
2170
        self.assertEqual(u'/home/test', home)
 
2171
 
 
2172
    def test_posix_home_unicode(self):
 
2173
        self.requireFeature(features.ByteStringNamedFilesystem)
 
2174
        self.overrideEnv('HOME', '/home/\xa7test')
 
2175
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
 
2176
        self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
 
2177
        osutils._fs_enc = "iso8859-5"
 
2178
        self.assertEqual(u'/home/\u0407test', osutils._posix_get_home_dir())
 
2179
        osutils._fs_enc = "utf-8"
 
2180
        self.assertRaises(errors.BadFilenameEncoding,
 
2181
            osutils._posix_get_home_dir)
 
2182
 
 
2183
 
 
2184
class TestGetuserUnicode(tests.TestCase):
 
2185
 
 
2186
    def test_is_unicode(self):
 
2187
        user = osutils.getuser_unicode()
 
2188
        self.assertIsInstance(user, text_type)
 
2189
 
 
2190
    def envvar_to_override(self):
 
2191
        if sys.platform == "win32":
 
2192
            # Disable use of platform calls on windows so envvar is used
 
2193
            self.overrideAttr(win32utils, 'has_ctypes', False)
 
2194
            return 'USERNAME' # only variable used on windows
 
2195
        return 'LOGNAME' # first variable checked by getpass.getuser()
 
2196
 
 
2197
    def test_ascii_user(self):
 
2198
        self.overrideEnv(self.envvar_to_override(), 'jrandom')
 
2199
        self.assertEqual(u'jrandom', osutils.getuser_unicode())
 
2200
 
 
2201
    def test_unicode_user(self):
 
2202
        ue = osutils.get_user_encoding()
 
2203
        uni_val, env_val = tests.probe_unicode_in_user_encoding()
 
2204
        if uni_val is None:
 
2205
            raise tests.TestSkipped(
 
2206
                'Cannot find a unicode character that works in encoding %s'
 
2207
                % (osutils.get_user_encoding(),))
 
2208
        uni_username = u'jrandom' + uni_val
 
2209
        encoded_username = uni_username.encode(ue)
 
2210
        self.overrideEnv(self.envvar_to_override(), encoded_username)
 
2211
        self.assertEqual(uni_username, osutils.getuser_unicode())
 
2212
 
 
2213
 
 
2214
class TestBackupNames(tests.TestCase):
 
2215
 
 
2216
    def setUp(self):
 
2217
        super(TestBackupNames, self).setUp()
 
2218
        self.backups = []
 
2219
 
 
2220
    def backup_exists(self, name):
 
2221
        return name in self.backups
 
2222
 
 
2223
    def available_backup_name(self, name):
 
2224
        backup_name = osutils.available_backup_name(name, self.backup_exists)
 
2225
        self.backups.append(backup_name)
 
2226
        return backup_name
 
2227
 
 
2228
    def assertBackupName(self, expected, name):
 
2229
        self.assertEqual(expected, self.available_backup_name(name))
 
2230
 
 
2231
    def test_empty(self):
 
2232
        self.assertBackupName('file.~1~', 'file')
 
2233
 
 
2234
    def test_existing(self):
 
2235
        self.available_backup_name('file')
 
2236
        self.available_backup_name('file')
 
2237
        self.assertBackupName('file.~3~', 'file')
 
2238
        # Empty slots are found; this is not a strict requirement and may be
 
2239
        # revisited if we test against all implementations.
 
2240
        self.backups.remove('file.~2~')
 
2241
        self.assertBackupName('file.~2~', 'file')
 
2242
 
 
2243
 
 
2244
class TestFindExecutableInPath(tests.TestCase):
 
2245
 
 
2246
    def test_windows(self):
 
2247
        if sys.platform != 'win32':
 
2248
            raise tests.TestSkipped('test requires win32')
 
2249
        self.assertTrue(osutils.find_executable_on_path('explorer') is not None)
 
2250
        self.assertTrue(
 
2251
            osutils.find_executable_on_path('explorer.exe') is not None)
 
2252
        self.assertTrue(
 
2253
            osutils.find_executable_on_path('EXPLORER.EXE') is not None)
 
2254
        self.assertTrue(
 
2255
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
 
2256
        self.assertTrue(osutils.find_executable_on_path('file.txt') is None)
 
2257
        
 
2258
    def test_windows_app_path(self):
 
2259
        if sys.platform != 'win32':
 
2260
            raise tests.TestSkipped('test requires win32')
 
2261
        # Override PATH env var so that exe can only be found on App Path
 
2262
        self.overrideEnv('PATH', '')
 
2263
        # Internet Explorer is always registered in the App Path
 
2264
        self.assertTrue(osutils.find_executable_on_path('iexplore') is not None)
 
2265
 
 
2266
    def test_other(self):
 
2267
        if sys.platform == 'win32':
 
2268
            raise tests.TestSkipped('test requires non-win32')
 
2269
        self.assertTrue(osutils.find_executable_on_path('sh') is not None)
 
2270
        self.assertTrue(
 
2271
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
 
2272
 
 
2273
 
 
2274
class TestEnvironmentErrors(tests.TestCase):
 
2275
    """Test handling of environmental errors"""
 
2276
 
 
2277
    def test_is_oserror(self):
 
2278
        self.assertTrue(osutils.is_environment_error(
 
2279
            OSError(errno.EINVAL, "Invalid parameter")))
 
2280
 
 
2281
    def test_is_ioerror(self):
 
2282
        self.assertTrue(osutils.is_environment_error(
 
2283
            IOError(errno.EINVAL, "Invalid parameter")))
 
2284
 
 
2285
    def test_is_socket_error(self):
 
2286
        self.assertTrue(osutils.is_environment_error(
 
2287
            socket.error(errno.EINVAL, "Invalid parameter")))
 
2288
 
 
2289
    def test_is_select_error(self):
 
2290
        self.assertTrue(osutils.is_environment_error(
 
2291
            select.error(errno.EINVAL, "Invalid parameter")))
 
2292
 
 
2293
    def test_is_pywintypes_error(self):
 
2294
        self.requireFeature(features.pywintypes)
 
2295
        import pywintypes
 
2296
        self.assertTrue(osutils.is_environment_error(
 
2297
            pywintypes.error(errno.EINVAL, "Invalid parameter", "Caller")))