/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_osutils.py

  • Committer: Breezy landing bot
  • Author(s): Jelmer Vernooij
  • Date: 2017-06-02 11:26:27 UTC
  • mfrom: (6621.27.5 1089352-sni-support)
  • Revision ID: breezy.the.bot@gmail.com-20170602112627-jbvjcm9czx7gt3gb
Add SNI support.

Merged from https://code.launchpad.net/~jelmer/brz/sni-support/+merge/324979

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the osutils wrapper."""
18
18
 
19
 
from cStringIO import StringIO
 
19
from __future__ import absolute_import, division
 
20
 
20
21
import errno
21
22
import os
22
23
import re
 
24
import select
23
25
import socket
24
 
import stat
25
26
import sys
 
27
import tempfile
26
28
import time
27
29
 
28
 
from bzrlib import (
 
30
from .. import (
29
31
    errors,
 
32
    lazy_regex,
30
33
    osutils,
 
34
    symbol_versioning,
31
35
    tests,
32
36
    trace,
33
37
    win32utils,
34
38
    )
35
 
from bzrlib.tests import (
 
39
from ..sixish import (
 
40
    BytesIO,
 
41
    )
 
42
from . import (
36
43
    features,
37
44
    file_utils,
38
45
    test__walkdirs_win32,
39
46
    )
40
 
 
41
 
 
42
 
class _UTF8DirReaderFeature(tests.Feature):
 
47
from .scenarios import load_tests_apply_scenarios
 
48
 
 
49
 
 
50
class _UTF8DirReaderFeature(features.Feature):
43
51
 
44
52
    def _probe(self):
45
53
        try:
46
 
            from bzrlib import _readdir_pyx
 
54
            from .. import _readdir_pyx
47
55
            self.reader = _readdir_pyx.UTF8DirReader
48
56
            return True
49
57
        except ImportError:
50
58
            return False
51
59
 
52
60
    def feature_name(self):
53
 
        return 'bzrlib._readdir_pyx'
54
 
 
55
 
UTF8DirReaderFeature = _UTF8DirReaderFeature()
56
 
 
57
 
term_ios_feature = tests.ModuleAvailableFeature('termios')
 
61
        return 'breezy._readdir_pyx'
 
62
 
 
63
UTF8DirReaderFeature = features.ModuleAvailableFeature('breezy._readdir_pyx')
 
64
 
 
65
term_ios_feature = features.ModuleAvailableFeature('termios')
58
66
 
59
67
 
60
68
def _already_unicode(s):
78
86
    # Some DirReaders are platform specific and even there they may not be
79
87
    # available.
80
88
    if UTF8DirReaderFeature.available():
81
 
        from bzrlib import _readdir_pyx
 
89
        from .. import _readdir_pyx
82
90
        scenarios.append(('utf8',
83
91
                          dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
84
92
                               _native_to_unicode=_utf8_to_unicode)))
85
93
 
86
94
    if test__walkdirs_win32.win32_readdir_feature.available():
87
95
        try:
88
 
            from bzrlib import _walkdirs_win32
 
96
            from .. import _walkdirs_win32
89
97
            scenarios.append(
90
98
                ('win32',
91
99
                 dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
95
103
    return scenarios
96
104
 
97
105
 
98
 
def load_tests(basic_tests, module, loader):
99
 
    suite = loader.suiteClass()
100
 
    dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
101
 
        basic_tests, tests.condition_isinstance(TestDirReader))
102
 
    tests.multiply_tests(dir_reader_tests, dir_reader_scenarios(), suite)
103
 
    suite.addTest(remaining_tests)
104
 
    return suite
 
106
load_tests = load_tests_apply_scenarios
105
107
 
106
108
 
107
109
class TestContainsWhitespace(tests.TestCase):
108
110
 
109
111
    def test_contains_whitespace(self):
110
 
        self.failUnless(osutils.contains_whitespace(u' '))
111
 
        self.failUnless(osutils.contains_whitespace(u'hello there'))
112
 
        self.failUnless(osutils.contains_whitespace(u'hellothere\n'))
113
 
        self.failUnless(osutils.contains_whitespace(u'hello\nthere'))
114
 
        self.failUnless(osutils.contains_whitespace(u'hello\rthere'))
115
 
        self.failUnless(osutils.contains_whitespace(u'hello\tthere'))
 
112
        self.assertTrue(osutils.contains_whitespace(u' '))
 
113
        self.assertTrue(osutils.contains_whitespace(u'hello there'))
 
114
        self.assertTrue(osutils.contains_whitespace(u'hellothere\n'))
 
115
        self.assertTrue(osutils.contains_whitespace(u'hello\nthere'))
 
116
        self.assertTrue(osutils.contains_whitespace(u'hello\rthere'))
 
117
        self.assertTrue(osutils.contains_whitespace(u'hello\tthere'))
116
118
 
117
119
        # \xa0 is "Non-breaking-space" which on some python locales thinks it
118
120
        # is whitespace, but we do not.
119
 
        self.failIf(osutils.contains_whitespace(u''))
120
 
        self.failIf(osutils.contains_whitespace(u'hellothere'))
121
 
        self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
 
121
        self.assertFalse(osutils.contains_whitespace(u''))
 
122
        self.assertFalse(osutils.contains_whitespace(u'hellothere'))
 
123
        self.assertFalse(osutils.contains_whitespace(u'hello\xa0there'))
122
124
 
123
125
 
124
126
class TestRename(tests.TestCaseInTempDir):
138
140
        # This should work everywhere
139
141
        self.create_file('a', 'something in a\n')
140
142
        self._fancy_rename('a', 'b')
141
 
        self.failIfExists('a')
142
 
        self.failUnlessExists('b')
 
143
        self.assertPathDoesNotExist('a')
 
144
        self.assertPathExists('b')
143
145
        self.check_file_contents('b', 'something in a\n')
144
146
 
145
147
        self.create_file('a', 'new something in a\n')
152
154
        self.create_file('target', 'data in target\n')
153
155
        self.assertRaises((IOError, OSError), self._fancy_rename,
154
156
                          'missingsource', 'target')
155
 
        self.failUnlessExists('target')
 
157
        self.assertPathExists('target')
156
158
        self.check_file_contents('target', 'data in target\n')
157
159
 
158
160
    def test_fancy_rename_fails_if_source_and_target_missing(self):
163
165
        # Rename should be semi-atomic on all platforms
164
166
        self.create_file('a', 'something in a\n')
165
167
        osutils.rename('a', 'b')
166
 
        self.failIfExists('a')
167
 
        self.failUnlessExists('b')
 
168
        self.assertPathDoesNotExist('a')
 
169
        self.assertPathExists('b')
168
170
        self.check_file_contents('b', 'something in a\n')
169
171
 
170
172
        self.create_file('a', 'new something in a\n')
182
184
        # we can't use failUnlessExists on case-insensitive filesystem
183
185
        # so try to check shape of the tree
184
186
        shape = sorted(os.listdir('.'))
185
 
        self.assertEquals(['A', 'B'], shape)
 
187
        self.assertEqual(['A', 'B'], shape)
186
188
 
187
 
    def test_rename_error(self):
188
 
        # We wrap os.rename to make it give an error including the filenames
189
 
        # https://bugs.launchpad.net/bzr/+bug/491763
190
 
        err = self.assertRaises(OSError, osutils.rename,
191
 
            'nonexistent', 'target')
192
 
        self.assertContainsString(str(err), 'nonexistent')
 
189
    def test_rename_exception(self):
 
190
        try:
 
191
            osutils.rename('nonexistent_path', 'different_nonexistent_path')
 
192
        except OSError as e:
 
193
            self.assertEqual(e.old_filename, 'nonexistent_path')
 
194
            self.assertEqual(e.new_filename, 'different_nonexistent_path')
 
195
            self.assertTrue('nonexistent_path' in e.strerror)
 
196
            self.assertTrue('different_nonexistent_path' in e.strerror)
193
197
 
194
198
 
195
199
class TestRandChars(tests.TestCase):
222
226
                         (['src'], SRC_FOO_C),
223
227
                         (['src'], 'src'),
224
228
                         ]:
225
 
            self.assert_(osutils.is_inside_any(dirs, fn))
 
229
            self.assertTrue(osutils.is_inside_any(dirs, fn))
226
230
        for dirs, fn in [(['src'], 'srccontrol'),
227
231
                         (['src'], 'srccontrol/foo')]:
228
232
            self.assertFalse(osutils.is_inside_any(dirs, fn))
234
238
                         (['src/bar.c', 'bla/foo.c'], 'src'),
235
239
                         (['src'], 'src'),
236
240
                         ]:
237
 
            self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
 
241
            self.assertTrue(osutils.is_inside_or_parent_of_any(dirs, fn))
238
242
 
239
243
        for dirs, fn in [(['src'], 'srccontrol'),
240
244
                         (['srccontrol/foo.c'], 'src'),
242
246
            self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
243
247
 
244
248
 
 
249
class TestLstat(tests.TestCaseInTempDir):
 
250
 
 
251
    def test_lstat_matches_fstat(self):
 
252
        # On Windows, lstat and fstat don't always agree, primarily in the
 
253
        # 'st_ino' and 'st_dev' fields. So we force them to be '0' in our
 
254
        # custom implementation.
 
255
        if sys.platform == 'win32':
 
256
            # We only have special lstat/fstat if we have the extension.
 
257
            # Without it, we may end up re-reading content when we don't have
 
258
            # to, but otherwise it doesn't effect correctness.
 
259
            self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
260
        f = open('test-file.txt', 'wb')
 
261
        self.addCleanup(f.close)
 
262
        f.write('some content\n')
 
263
        f.flush()
 
264
        self.assertEqualStat(osutils.fstat(f.fileno()),
 
265
                             osutils.lstat('test-file.txt'))
 
266
 
 
267
 
245
268
class TestRmTree(tests.TestCaseInTempDir):
246
269
 
247
270
    def test_rmtree(self):
259
282
 
260
283
        osutils.rmtree('dir')
261
284
 
262
 
        self.failIfExists('dir/file')
263
 
        self.failIfExists('dir')
 
285
        self.assertPathDoesNotExist('dir/file')
 
286
        self.assertPathDoesNotExist('dir')
264
287
 
265
288
 
266
289
class TestDeleteAny(tests.TestCaseInTempDir):
279
302
 
280
303
    def test_file_kind(self):
281
304
        self.build_tree(['file', 'dir/'])
282
 
        self.assertEquals('file', osutils.file_kind('file'))
283
 
        self.assertEquals('directory', osutils.file_kind('dir/'))
 
305
        self.assertEqual('file', osutils.file_kind('file'))
 
306
        self.assertEqual('directory', osutils.file_kind('dir/'))
284
307
        if osutils.has_symlinks():
285
308
            os.symlink('symlink', 'symlink')
286
 
            self.assertEquals('symlink', osutils.file_kind('symlink'))
 
309
            self.assertEqual('symlink', osutils.file_kind('symlink'))
287
310
 
288
311
        # TODO: jam 20060529 Test a block device
289
312
        try:
290
313
            os.lstat('/dev/null')
291
 
        except OSError, e:
 
314
        except OSError as e:
292
315
            if e.errno not in (errno.ENOENT,):
293
316
                raise
294
317
        else:
295
 
            self.assertEquals('chardev', osutils.file_kind('/dev/null'))
 
318
            self.assertEqual('chardev', osutils.file_kind('/dev/null'))
296
319
 
297
320
        mkfifo = getattr(os, 'mkfifo', None)
298
321
        if mkfifo:
299
322
            mkfifo('fifo')
300
323
            try:
301
 
                self.assertEquals('fifo', osutils.file_kind('fifo'))
 
324
                self.assertEqual('fifo', osutils.file_kind('fifo'))
302
325
            finally:
303
326
                os.remove('fifo')
304
327
 
307
330
            s = socket.socket(AF_UNIX)
308
331
            s.bind('socket')
309
332
            try:
310
 
                self.assertEquals('socket', osutils.file_kind('socket'))
 
333
                self.assertEqual('socket', osutils.file_kind('socket'))
311
334
            finally:
312
335
                os.remove('socket')
313
336
 
332
355
 
333
356
        orig_umask = osutils.get_umask()
334
357
        self.addCleanup(os.umask, orig_umask)
335
 
        os.umask(0222)
336
 
        self.assertEqual(0222, osutils.get_umask())
337
 
        os.umask(0022)
338
 
        self.assertEqual(0022, osutils.get_umask())
339
 
        os.umask(0002)
340
 
        self.assertEqual(0002, osutils.get_umask())
341
 
        os.umask(0027)
342
 
        self.assertEqual(0027, osutils.get_umask())
 
358
        os.umask(0o222)
 
359
        self.assertEqual(0o222, osutils.get_umask())
 
360
        os.umask(0o022)
 
361
        self.assertEqual(0o022, osutils.get_umask())
 
362
        os.umask(0o002)
 
363
        self.assertEqual(0o002, osutils.get_umask())
 
364
        os.umask(0o027)
 
365
        self.assertEqual(0o027, osutils.get_umask())
343
366
 
344
367
 
345
368
class TestDateTime(tests.TestCase):
418
441
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
419
442
 
420
443
 
 
444
class TestFdatasync(tests.TestCaseInTempDir):
 
445
 
 
446
    def do_fdatasync(self):
 
447
        f = tempfile.NamedTemporaryFile()
 
448
        osutils.fdatasync(f.fileno())
 
449
        f.close()
 
450
 
 
451
    @staticmethod
 
452
    def raise_eopnotsupp(*args, **kwargs):
 
453
        raise IOError(errno.EOPNOTSUPP, os.strerror(errno.EOPNOTSUPP))
 
454
 
 
455
    @staticmethod
 
456
    def raise_enotsup(*args, **kwargs):
 
457
        raise IOError(errno.ENOTSUP, os.strerror(errno.ENOTSUP))
 
458
 
 
459
    def test_fdatasync_handles_system_function(self):
 
460
        self.overrideAttr(os, "fdatasync")
 
461
        self.do_fdatasync()
 
462
 
 
463
    def test_fdatasync_handles_no_fdatasync_no_fsync(self):
 
464
        self.overrideAttr(os, "fdatasync")
 
465
        self.overrideAttr(os, "fsync")
 
466
        self.do_fdatasync()
 
467
 
 
468
    def test_fdatasync_handles_no_EOPNOTSUPP(self):
 
469
        self.overrideAttr(errno, "EOPNOTSUPP")
 
470
        self.do_fdatasync()
 
471
 
 
472
    def test_fdatasync_catches_ENOTSUP(self):
 
473
        enotsup = getattr(errno, "ENOTSUP", None)
 
474
        if enotsup is None:
 
475
            raise tests.TestNotApplicable("No ENOTSUP on this platform")
 
476
        self.overrideAttr(os, "fdatasync", self.raise_enotsup)
 
477
        self.do_fdatasync()
 
478
 
 
479
    def test_fdatasync_catches_EOPNOTSUPP(self):
 
480
        enotsup = getattr(errno, "EOPNOTSUPP", None)
 
481
        if enotsup is None:
 
482
            raise tests.TestNotApplicable("No EOPNOTSUPP on this platform")
 
483
        self.overrideAttr(os, "fdatasync", self.raise_eopnotsupp)
 
484
        self.do_fdatasync()
 
485
 
 
486
 
421
487
class TestLinks(tests.TestCaseInTempDir):
422
488
 
423
489
    def test_dereference_path(self):
424
 
        self.requireFeature(tests.SymlinkFeature)
 
490
        self.requireFeature(features.SymlinkFeature)
425
491
        cwd = osutils.realpath('.')
426
492
        os.mkdir('bar')
427
493
        bar_path = osutils.pathjoin(cwd, 'bar')
455
521
        # Make a file readonly
456
522
        osutils.make_readonly('file')
457
523
        mode = os.lstat('file').st_mode
458
 
        self.assertEqual(mode, mode & 0777555)
 
524
        self.assertEqual(mode, mode & 0o777555)
459
525
 
460
526
        # Make a file writable
461
527
        osutils.make_writable('file')
462
528
        mode = os.lstat('file').st_mode
463
 
        self.assertEqual(mode, mode | 0200)
 
529
        self.assertEqual(mode, mode | 0o200)
464
530
 
465
531
        if osutils.has_symlinks():
466
532
            # should not error when handed a symlink
474
540
 
475
541
class TestCanonicalRelPath(tests.TestCaseInTempDir):
476
542
 
477
 
    _test_needs_features = [tests.CaseInsCasePresFilenameFeature]
 
543
    _test_needs_features = [features.CaseInsCasePresFilenameFeature]
478
544
 
479
545
    def test_canonical_relpath_simple(self):
480
546
        f = file('MixedCaseName', 'w')
481
547
        f.close()
482
548
        actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
483
 
        self.failUnlessEqual('work/MixedCaseName', actual)
 
549
        self.assertEqual('work/MixedCaseName', actual)
484
550
 
485
551
    def test_canonical_relpath_missing_tail(self):
486
552
        os.mkdir('MixedCaseParent')
487
553
        actual = osutils.canonical_relpath(self.test_base_dir,
488
554
                                           'mixedcaseparent/nochild')
489
 
        self.failUnlessEqual('work/MixedCaseParent/nochild', actual)
 
555
        self.assertEqual('work/MixedCaseParent/nochild', actual)
490
556
 
491
557
 
492
558
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
536
602
    """Test pumpfile method."""
537
603
 
538
604
    def setUp(self):
539
 
        tests.TestCase.setUp(self)
 
605
        super(TestPumpFile, self).setUp()
540
606
        # create a test datablock
541
607
        self.block_size = 512
542
 
        pattern = '0123456789ABCDEF'
543
 
        self.test_data = pattern * (3 * self.block_size / len(pattern))
 
608
        pattern = b'0123456789ABCDEF'
 
609
        self.test_data = pattern * (3 * self.block_size // len(pattern))
544
610
        self.test_data_len = len(self.test_data)
545
611
 
546
612
    def test_bracket_block_size(self):
550
616
        self.assertTrue(self.test_data_len > self.block_size)
551
617
 
552
618
        from_file = file_utils.FakeReadFile(self.test_data)
553
 
        to_file = StringIO()
 
619
        to_file = BytesIO()
554
620
 
555
 
        # read (max / 2) bytes and verify read size wasn't affected
556
 
        num_bytes_to_read = self.block_size / 2
 
621
        # read (max // 2) bytes and verify read size wasn't affected
 
622
        num_bytes_to_read = self.block_size // 2
557
623
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
558
624
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
559
625
        self.assertEqual(from_file.get_read_count(), 1)
591
657
 
592
658
        # retrieve data in blocks
593
659
        from_file = file_utils.FakeReadFile(self.test_data)
594
 
        to_file = StringIO()
 
660
        to_file = BytesIO()
595
661
        osutils.pumpfile(from_file, to_file, self.test_data_len,
596
662
                         self.block_size)
597
663
 
615
681
 
616
682
        # retrieve data to EOF
617
683
        from_file = file_utils.FakeReadFile(self.test_data)
618
 
        to_file = StringIO()
 
684
        to_file = BytesIO()
619
685
        osutils.pumpfile(from_file, to_file, -1, self.block_size)
620
686
 
621
687
        # verify read size was equal to the maximum read size
635
701
        with this new version."""
636
702
        # retrieve data using default (old) pumpfile method
637
703
        from_file = file_utils.FakeReadFile(self.test_data)
638
 
        to_file = StringIO()
 
704
        to_file = BytesIO()
639
705
        osutils.pumpfile(from_file, to_file)
640
706
 
641
707
        # report error if the data wasn't equal (we only report the size due
649
715
        activity = []
650
716
        def log_activity(length, direction):
651
717
            activity.append((length, direction))
652
 
        from_file = StringIO(self.test_data)
653
 
        to_file = StringIO()
 
718
        from_file = BytesIO(self.test_data)
 
719
        to_file = BytesIO()
654
720
        osutils.pumpfile(from_file, to_file, buff_size=500,
655
721
                         report_activity=log_activity, direction='read')
656
722
        self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
657
723
                          (36, 'read')], activity)
658
724
 
659
 
        from_file = StringIO(self.test_data)
660
 
        to_file = StringIO()
 
725
        from_file = BytesIO(self.test_data)
 
726
        to_file = BytesIO()
661
727
        del activity[:]
662
728
        osutils.pumpfile(from_file, to_file, buff_size=500,
663
729
                         report_activity=log_activity, direction='write')
665
731
                          (36, 'write')], activity)
666
732
 
667
733
        # And with a limited amount of data
668
 
        from_file = StringIO(self.test_data)
669
 
        to_file = StringIO()
 
734
        from_file = BytesIO(self.test_data)
 
735
        to_file = BytesIO()
670
736
        del activity[:]
671
737
        osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
672
738
                         report_activity=log_activity, direction='read')
677
743
class TestPumpStringFile(tests.TestCase):
678
744
 
679
745
    def test_empty(self):
680
 
        output = StringIO()
681
 
        osutils.pump_string_file("", output)
682
 
        self.assertEqual("", output.getvalue())
 
746
        output = BytesIO()
 
747
        osutils.pump_string_file(b"", output)
 
748
        self.assertEqual(b"", output.getvalue())
683
749
 
684
750
    def test_more_than_segment_size(self):
685
 
        output = StringIO()
686
 
        osutils.pump_string_file("123456789", output, 2)
687
 
        self.assertEqual("123456789", output.getvalue())
 
751
        output = BytesIO()
 
752
        osutils.pump_string_file(b"123456789", output, 2)
 
753
        self.assertEqual(b"123456789", output.getvalue())
688
754
 
689
755
    def test_segment_size(self):
690
 
        output = StringIO()
691
 
        osutils.pump_string_file("12", output, 2)
692
 
        self.assertEqual("12", output.getvalue())
 
756
        output = BytesIO()
 
757
        osutils.pump_string_file(b"12", output, 2)
 
758
        self.assertEqual(b"12", output.getvalue())
693
759
 
694
760
    def test_segment_size_multiple(self):
695
 
        output = StringIO()
696
 
        osutils.pump_string_file("1234", output, 2)
697
 
        self.assertEqual("1234", output.getvalue())
 
761
        output = BytesIO()
 
762
        osutils.pump_string_file(b"1234", output, 2)
 
763
        self.assertEqual(b"1234", output.getvalue())
698
764
 
699
765
 
700
766
class TestRelpath(tests.TestCase):
763
829
        self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
764
830
 
765
831
    def test_from_unicode_string_ascii_contents(self):
766
 
        self.assertEqual('bargam',
767
 
                         osutils.safe_revision_id(u'bargam', warn=False))
768
 
 
769
 
    def test_from_unicode_deprecated(self):
770
 
        self.assertEqual('bargam',
771
 
            self.callDeprecated([osutils._revision_id_warning],
772
 
                                osutils.safe_revision_id, u'bargam'))
 
832
        self.assertRaises(TypeError,
 
833
                          osutils.safe_revision_id, u'bargam')
773
834
 
774
835
    def test_from_unicode_string_unicode_contents(self):
775
 
        self.assertEqual('bargam\xc2\xae',
776
 
                         osutils.safe_revision_id(u'bargam\xae', warn=False))
 
836
        self.assertRaises(TypeError,
 
837
                         osutils.safe_revision_id, u'bargam\xae')
777
838
 
778
839
    def test_from_utf8_string(self):
779
840
        self.assertEqual('foo\xc2\xae',
790
851
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))
791
852
 
792
853
    def test_from_unicode_string_ascii_contents(self):
793
 
        self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))
794
 
 
795
 
    def test_from_unicode_deprecated(self):
796
 
        self.assertEqual('bargam',
797
 
            self.callDeprecated([osutils._file_id_warning],
798
 
                                osutils.safe_file_id, u'bargam'))
 
854
        self.assertRaises(TypeError, osutils.safe_file_id, u'bargam')
799
855
 
800
856
    def test_from_unicode_string_unicode_contents(self):
801
 
        self.assertEqual('bargam\xc2\xae',
802
 
                         osutils.safe_file_id(u'bargam\xae', warn=False))
 
857
        self.assertRaises(TypeError,
 
858
                          osutils.safe_file_id, u'bargam\xae')
803
859
 
804
860
    def test_from_utf8_string(self):
805
861
        self.assertEqual('foo\xc2\xae',
810
866
        self.assertEqual(None, osutils.safe_file_id(None))
811
867
 
812
868
 
 
869
class TestSendAll(tests.TestCase):
 
870
 
 
871
    def test_send_with_disconnected_socket(self):
 
872
        class DisconnectedSocket(object):
 
873
            def __init__(self, err):
 
874
                self.err = err
 
875
            def send(self, content):
 
876
                raise self.err
 
877
            def close(self):
 
878
                pass
 
879
        # All of these should be treated as ConnectionReset
 
880
        errs = []
 
881
        for err_cls in (IOError, socket.error):
 
882
            for errnum in osutils._end_of_stream_errors:
 
883
                errs.append(err_cls(errnum))
 
884
        for err in errs:
 
885
            sock = DisconnectedSocket(err)
 
886
            self.assertRaises(errors.ConnectionReset,
 
887
                osutils.send_all, sock, b'some more content')
 
888
 
 
889
    def test_send_with_no_progress(self):
 
890
        # See https://bugs.launchpad.net/bzr/+bug/1047309
 
891
        # It seems that paramiko can get into a state where it doesn't error,
 
892
        # but it returns 0 bytes sent for requests over and over again.
 
893
        class NoSendingSocket(object):
 
894
            def __init__(self):
 
895
                self.call_count = 0
 
896
            def send(self, bytes):
 
897
                self.call_count += 1
 
898
                if self.call_count > 100:
 
899
                    # Prevent the test suite from hanging
 
900
                    raise RuntimeError('too many calls')
 
901
                return 0
 
902
        sock = NoSendingSocket()
 
903
        self.assertRaises(errors.ConnectionReset,
 
904
                          osutils.send_all, sock, b'content')
 
905
        self.assertEqual(1, sock.call_count)
 
906
 
 
907
 
 
908
class TestPosixFuncs(tests.TestCase):
 
909
    """Test that the posix version of normpath returns an appropriate path
 
910
       when used with 2 leading slashes."""
 
911
 
 
912
    def test_normpath(self):
 
913
        self.assertEqual('/etc/shadow', osutils._posix_normpath('/etc/shadow'))
 
914
        self.assertEqual('/etc/shadow', osutils._posix_normpath('//etc/shadow'))
 
915
        self.assertEqual('/etc/shadow', osutils._posix_normpath('///etc/shadow'))
 
916
 
 
917
 
813
918
class TestWin32Funcs(tests.TestCase):
814
919
    """Test that _win32 versions of os utilities return appropriate paths."""
815
920
 
816
921
    def test_abspath(self):
 
922
        self.requireFeature(features.win32_feature)
817
923
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
818
924
        self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
819
925
        self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
832
938
                         osutils._win32_pathjoin('path/to', 'C:/foo'))
833
939
        self.assertEqual('path/to/foo',
834
940
                         osutils._win32_pathjoin('path/to/', 'foo'))
835
 
        self.assertEqual('/foo',
 
941
 
 
942
    def test_pathjoin_late_bugfix(self):
 
943
        if sys.version_info < (2, 7, 6):
 
944
            expected = '/foo'
 
945
        else:
 
946
            expected = 'C:/foo'
 
947
        self.assertEqual(expected,
836
948
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
837
 
        self.assertEqual('/foo',
 
949
        self.assertEqual(expected,
838
950
                         osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
839
951
 
840
952
    def test_normpath(self):
845
957
 
846
958
    def test_getcwd(self):
847
959
        cwd = osutils._win32_getcwd()
848
 
        os_cwd = os.getcwdu()
 
960
        os_cwd = osutils._getcwd()
849
961
        self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
850
962
        # win32 is inconsistent whether it returns lower or upper case
851
963
        # and even if it was consistent the user might type the other
860
972
        self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
861
973
 
862
974
    def test_win98_abspath(self):
 
975
        self.requireFeature(features.win32_feature)
863
976
        # absolute path
864
977
        self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
865
978
        self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
868
981
        self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
869
982
        # relative path
870
983
        cwd = osutils.getcwd().rstrip('/')
871
 
        drive = osutils._nt_splitdrive(cwd)[0]
 
984
        drive = osutils.ntpath.splitdrive(cwd)[0]
872
985
        self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
873
986
        self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
874
987
        # unicode path
880
993
    """Test win32 functions that create files."""
881
994
 
882
995
    def test_getcwd(self):
883
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
996
        self.requireFeature(features.UnicodeFilenameFeature)
884
997
        os.mkdir(u'mu-\xb5')
885
998
        os.chdir(u'mu-\xb5')
886
999
        # TODO: jam 20060427 This will probably fail on Mac OSX because
892
1005
    def test_minimum_path_selection(self):
893
1006
        self.assertEqual(set(),
894
1007
            osutils.minimum_path_selection([]))
895
 
        self.assertEqual(set(['a']),
 
1008
        self.assertEqual({'a'},
896
1009
            osutils.minimum_path_selection(['a']))
897
 
        self.assertEqual(set(['a', 'b']),
 
1010
        self.assertEqual({'a', 'b'},
898
1011
            osutils.minimum_path_selection(['a', 'b']))
899
 
        self.assertEqual(set(['a/', 'b']),
 
1012
        self.assertEqual({'a/', 'b'},
900
1013
            osutils.minimum_path_selection(['a/', 'b']))
901
 
        self.assertEqual(set(['a/', 'b']),
 
1014
        self.assertEqual({'a/', 'b'},
902
1015
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
903
 
        self.assertEqual(set(['a-b', 'a', 'a0b']),
 
1016
        self.assertEqual({'a-b', 'a', 'a0b'},
904
1017
            osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
905
1018
 
906
1019
    def test_mkdtemp(self):
916
1029
        b.close()
917
1030
 
918
1031
        osutils._win32_rename('b', 'a')
919
 
        self.failUnlessExists('a')
920
 
        self.failIfExists('b')
 
1032
        self.assertPathExists('a')
 
1033
        self.assertPathDoesNotExist('b')
921
1034
        self.assertFileEqual('baz\n', 'a')
922
1035
 
923
1036
    def test_rename_missing_file(self):
927
1040
 
928
1041
        try:
929
1042
            osutils._win32_rename('b', 'a')
930
 
        except (IOError, OSError), e:
 
1043
        except (IOError, OSError) as e:
931
1044
            self.assertEqual(errno.ENOENT, e.errno)
932
1045
        self.assertFileEqual('foo\n', 'a')
933
1046
 
935
1048
        os.mkdir('a')
936
1049
        try:
937
1050
            osutils._win32_rename('b', 'a')
938
 
        except (IOError, OSError), e:
 
1051
        except (IOError, OSError) as e:
939
1052
            self.assertEqual(errno.ENOENT, e.errno)
940
1053
 
941
1054
    def test_rename_current_dir(self):
947
1060
        # doesn't exist.
948
1061
        try:
949
1062
            osutils._win32_rename('b', '.')
950
 
        except (IOError, OSError), e:
 
1063
        except (IOError, OSError) as e:
951
1064
            self.assertEqual(errno.ENOENT, e.errno)
952
1065
 
953
1066
    def test_splitpath(self):
976
1089
    """Test mac special functions that require directories."""
977
1090
 
978
1091
    def test_getcwd(self):
979
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1092
        self.requireFeature(features.UnicodeFilenameFeature)
980
1093
        os.mkdir(u'B\xe5gfors')
981
1094
        os.chdir(u'B\xe5gfors')
982
1095
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
983
1096
 
984
1097
    def test_getcwd_nonnorm(self):
985
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1098
        self.requireFeature(features.UnicodeFilenameFeature)
986
1099
        # Test that _mac_getcwd() will normalize this path
987
1100
        os.mkdir(u'Ba\u030agfors')
988
1101
        os.chdir(u'Ba\u030agfors')
998
1111
                         osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
999
1112
 
1000
1113
    def test_osutils_binding(self):
1001
 
        from bzrlib.tests import test__chunks_to_lines
 
1114
        from . import test__chunks_to_lines
1002
1115
        if test__chunks_to_lines.compiled_chunkstolines_feature.available():
1003
 
            from bzrlib._chunks_to_lines_pyx import chunks_to_lines
 
1116
            from .._chunks_to_lines_pyx import chunks_to_lines
1004
1117
        else:
1005
 
            from bzrlib._chunks_to_lines_py import chunks_to_lines
 
1118
            from .._chunks_to_lines_py import chunks_to_lines
1006
1119
        self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
1007
1120
 
1008
1121
 
1071
1184
        self.assertExpectedBlocks(expected_dirblocks[1:], result)
1072
1185
 
1073
1186
    def test_walkdirs_os_error(self):
1074
 
        # <https://bugs.edge.launchpad.net/bzr/+bug/338653>
 
1187
        # <https://bugs.launchpad.net/bzr/+bug/338653>
1075
1188
        # Pyrex readdir didn't raise useful messages if it had an error
1076
1189
        # reading the directory
1077
1190
        if sys.platform == 'win32':
1078
1191
            raise tests.TestNotApplicable(
1079
1192
                "readdir IOError not tested on win32")
 
1193
        self.requireFeature(features.not_running_as_root)
1080
1194
        os.mkdir("test-unreadable")
1081
1195
        os.chmod("test-unreadable", 0000)
1082
1196
        # must chmod it back so that it can be removed
1083
 
        self.addCleanup(os.chmod, "test-unreadable", 0700)
 
1197
        self.addCleanup(os.chmod, "test-unreadable", 0o700)
1084
1198
        # The error is not raised until the generator is actually evaluated.
1085
1199
        # (It would be ok if it happened earlier but at the moment it
1086
1200
        # doesn't.)
1087
1201
        e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1088
 
        self.assertEquals('./test-unreadable', e.filename)
1089
 
        self.assertEquals(errno.EACCES, e.errno)
 
1202
        self.assertEqual('./test-unreadable', e.filename)
 
1203
        self.assertEqual(errno.EACCES, e.errno)
1090
1204
        # Ensure the message contains the file name
1091
1205
        self.assertContainsRe(str(e), "\./test-unreadable")
1092
1206
 
 
1207
 
 
1208
    def test_walkdirs_encoding_error(self):
 
1209
        # <https://bugs.launchpad.net/bzr/+bug/488519>
 
1210
        # walkdirs didn't raise a useful message when the filenames
 
1211
        # are not using the filesystem's encoding
 
1212
 
 
1213
        # require a bytestring based filesystem
 
1214
        self.requireFeature(features.ByteStringNamedFilesystem)
 
1215
 
 
1216
        tree = [
 
1217
            '.bzr',
 
1218
            '0file',
 
1219
            '1dir/',
 
1220
            '1dir/0file',
 
1221
            '1dir/1dir/',
 
1222
            '1file'
 
1223
            ]
 
1224
 
 
1225
        self.build_tree(tree)
 
1226
 
 
1227
        # rename the 1file to a latin-1 filename
 
1228
        os.rename("./1file", "\xe8file")
 
1229
        if "\xe8file" not in os.listdir("."):
 
1230
            self.skipTest("Lack filesystem that preserves arbitrary bytes")
 
1231
 
 
1232
        self._save_platform_info()
 
1233
        win32utils.winver = None # Avoid the win32 detection code
 
1234
        osutils._fs_enc = 'UTF-8'
 
1235
 
 
1236
        # this should raise on error
 
1237
        def attempt():
 
1238
            for dirdetail, dirblock in osutils.walkdirs('.'):
 
1239
                pass
 
1240
 
 
1241
        self.assertRaises(errors.BadFilenameEncoding, attempt)
 
1242
 
1093
1243
    def test__walkdirs_utf8(self):
1094
1244
        tree = [
1095
1245
            '.bzr',
1161
1311
        self.requireFeature(UTF8DirReaderFeature)
1162
1312
        self._save_platform_info()
1163
1313
        win32utils.winver = None # Avoid the win32 detection code
1164
 
        osutils._fs_enc = 'UTF-8'
1165
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
1314
        osutils._fs_enc = 'utf-8'
 
1315
        self.assertDirReaderIs(
 
1316
            UTF8DirReaderFeature.module.UTF8DirReader)
1166
1317
 
1167
1318
    def test_force_walkdirs_utf8_fs_ascii(self):
1168
1319
        self.requireFeature(UTF8DirReaderFeature)
1169
1320
        self._save_platform_info()
1170
1321
        win32utils.winver = None # Avoid the win32 detection code
1171
 
        osutils._fs_enc = 'US-ASCII'
1172
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1173
 
 
1174
 
    def test_force_walkdirs_utf8_fs_ANSI(self):
1175
 
        self.requireFeature(UTF8DirReaderFeature)
1176
 
        self._save_platform_info()
1177
 
        win32utils.winver = None # Avoid the win32 detection code
1178
 
        osutils._fs_enc = 'ANSI_X3.4-1968'
1179
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
1322
        osutils._fs_enc = 'ascii'
 
1323
        self.assertDirReaderIs(
 
1324
            UTF8DirReaderFeature.module.UTF8DirReader)
1180
1325
 
1181
1326
    def test_force_walkdirs_utf8_fs_latin1(self):
1182
1327
        self._save_platform_info()
1183
1328
        win32utils.winver = None # Avoid the win32 detection code
1184
 
        osutils._fs_enc = 'latin1'
 
1329
        osutils._fs_enc = 'iso-8859-1'
1185
1330
        self.assertDirReaderIs(osutils.UnicodeDirReader)
1186
1331
 
1187
1332
    def test_force_walkdirs_utf8_nt(self):
1189
1334
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1190
1335
        self._save_platform_info()
1191
1336
        win32utils.winver = 'Windows NT'
1192
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1337
        from .._walkdirs_win32 import Win32ReadDir
1193
1338
        self.assertDirReaderIs(Win32ReadDir)
1194
1339
 
1195
1340
    def test_force_walkdirs_utf8_98(self):
1200
1345
 
1201
1346
    def test_unicode_walkdirs(self):
1202
1347
        """Walkdirs should always return unicode paths."""
1203
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1348
        self.requireFeature(features.UnicodeFilenameFeature)
1204
1349
        name0 = u'0file-\xb6'
1205
1350
        name1 = u'1dir-\u062c\u0648'
1206
1351
        name2 = u'2file-\u0633'
1243
1388
 
1244
1389
        The abspath portion might be in unicode or utf-8
1245
1390
        """
1246
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1391
        self.requireFeature(features.UnicodeFilenameFeature)
1247
1392
        name0 = u'0file-\xb6'
1248
1393
        name1 = u'1dir-\u062c\u0648'
1249
1394
        name2 = u'2file-\u0633'
1304
1449
 
1305
1450
        The abspath portion should be in unicode
1306
1451
        """
1307
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1452
        self.requireFeature(features.UnicodeFilenameFeature)
1308
1453
        # Use the unicode reader. TODO: split into driver-and-driven unit
1309
1454
        # tests.
1310
1455
        self._save_platform_info()
1351
1496
 
1352
1497
    def test__walkdirs_utf8_win32readdir(self):
1353
1498
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1354
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1355
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1499
        self.requireFeature(features.UnicodeFilenameFeature)
 
1500
        from .._walkdirs_win32 import Win32ReadDir
1356
1501
        self._save_platform_info()
1357
1502
        osutils._selected_dir_reader = Win32ReadDir()
1358
1503
        name0u = u'0file-\xb6'
1408
1553
    def test__walkdirs_utf_win32_find_file_stat_file(self):
1409
1554
        """make sure our Stat values are valid"""
1410
1555
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1411
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1412
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1556
        self.requireFeature(features.UnicodeFilenameFeature)
 
1557
        from .._walkdirs_win32 import Win32ReadDir
1413
1558
        name0u = u'0file-\xb6'
1414
1559
        name0 = name0u.encode('utf8')
1415
1560
        self.build_tree([name0u])
1432
1577
    def test__walkdirs_utf_win32_find_file_stat_directory(self):
1433
1578
        """make sure our Stat values are valid"""
1434
1579
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1435
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1436
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1580
        self.requireFeature(features.UnicodeFilenameFeature)
 
1581
        from .._walkdirs_win32 import Win32ReadDir
1437
1582
        name0u = u'0dir-\u062c\u0648'
1438
1583
        name0 = name0u.encode('utf8')
1439
1584
        self.build_tree([name0u + '/'])
1538
1683
        self.assertEqual(['c'], os.listdir('target/b'))
1539
1684
 
1540
1685
    def test_copy_tree_symlinks(self):
1541
 
        self.requireFeature(tests.SymlinkFeature)
 
1686
        self.requireFeature(features.SymlinkFeature)
1542
1687
        self.build_tree(['source/'])
1543
1688
        os.symlink('a/generic/path', 'source/lnk')
1544
1689
        osutils.copy_tree('source', 'target')
1569
1714
                          ('d', 'source/b', 'target/b'),
1570
1715
                          ('f', 'source/b/c', 'target/b/c'),
1571
1716
                         ], processed_files)
1572
 
        self.failIfExists('target')
 
1717
        self.assertPathDoesNotExist('target')
1573
1718
        if osutils.has_symlinks():
1574
1719
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1575
1720
 
1580
1725
    def setUp(self):
1581
1726
        super(TestSetUnsetEnv, self).setUp()
1582
1727
 
1583
 
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
 
1728
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'),
1584
1729
                         'Environment was not cleaned up properly.'
1585
 
                         ' Variable BZR_TEST_ENV_VAR should not exist.')
 
1730
                         ' Variable BRZ_TEST_ENV_VAR should not exist.')
1586
1731
        def cleanup():
1587
 
            if 'BZR_TEST_ENV_VAR' in os.environ:
1588
 
                del os.environ['BZR_TEST_ENV_VAR']
 
1732
            if 'BRZ_TEST_ENV_VAR' in os.environ:
 
1733
                del os.environ['BRZ_TEST_ENV_VAR']
1589
1734
        self.addCleanup(cleanup)
1590
1735
 
1591
1736
    def test_set(self):
1592
1737
        """Test that we can set an env variable"""
1593
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1738
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
1594
1739
        self.assertEqual(None, old)
1595
 
        self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
 
1740
        self.assertEqual('foo', os.environ.get('BRZ_TEST_ENV_VAR'))
1596
1741
 
1597
1742
    def test_double_set(self):
1598
1743
        """Test that we get the old value out"""
1599
 
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1600
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
 
1744
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1745
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'bar')
1601
1746
        self.assertEqual('foo', old)
1602
 
        self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
 
1747
        self.assertEqual('bar', os.environ.get('BRZ_TEST_ENV_VAR'))
1603
1748
 
1604
1749
    def test_unicode(self):
1605
1750
        """Environment can only contain plain strings
1612
1757
                'Cannot find a unicode character that works in encoding %s'
1613
1758
                % (osutils.get_user_encoding(),))
1614
1759
 
1615
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1616
 
        self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
 
1760
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', uni_val)
 
1761
        self.assertEqual(env_val, os.environ.get('BRZ_TEST_ENV_VAR'))
1617
1762
 
1618
1763
    def test_unset(self):
1619
1764
        """Test that passing None will remove the env var"""
1620
 
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1621
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
 
1765
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1766
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', None)
1622
1767
        self.assertEqual('foo', old)
1623
 
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
1624
 
        self.failIf('BZR_TEST_ENV_VAR' in os.environ)
 
1768
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'))
 
1769
        self.assertFalse('BRZ_TEST_ENV_VAR' in os.environ)
1625
1770
 
1626
1771
 
1627
1772
class TestSizeShaFile(tests.TestCaseInTempDir):
1663
1808
class TestResourceLoading(tests.TestCaseInTempDir):
1664
1809
 
1665
1810
    def test_resource_string(self):
1666
 
        # test resource in bzrlib
1667
 
        text = osutils.resource_string('bzrlib', 'debug.py')
 
1811
        # test resource in breezy
 
1812
        text = osutils.resource_string('breezy', 'debug.py')
1668
1813
        self.assertContainsRe(text, "debug_flags = set()")
1669
 
        # test resource under bzrlib
1670
 
        text = osutils.resource_string('bzrlib.ui', 'text.py')
 
1814
        # test resource under breezy
 
1815
        text = osutils.resource_string('breezy.ui', 'text.py')
1671
1816
        self.assertContainsRe(text, "class TextUIFactory")
1672
1817
        # test unsupported package
1673
1818
        self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1674
1819
            'yyy.xx')
1675
1820
        # test unknown resource
1676
 
        self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
1677
 
 
1678
 
 
1679
 
class TestReCompile(tests.TestCase):
1680
 
 
1681
 
    def test_re_compile_checked(self):
1682
 
        r = osutils.re_compile_checked(r'A*', re.IGNORECASE)
1683
 
        self.assertTrue(r.match('aaaa'))
1684
 
        self.assertTrue(r.match('aAaA'))
1685
 
 
1686
 
    def test_re_compile_checked_error(self):
1687
 
        # like https://bugs.launchpad.net/bzr/+bug/251352
1688
 
        err = self.assertRaises(
1689
 
            errors.BzrCommandError,
1690
 
            osutils.re_compile_checked, '*', re.IGNORECASE, 'test case')
1691
 
        self.assertEqual(
1692
 
            "Invalid regular expression in test case: '*': "
1693
 
            "nothing to repeat",
1694
 
            str(err))
 
1821
        self.assertRaises(IOError, osutils.resource_string, 'breezy', 'yyy.xx')
1695
1822
 
1696
1823
 
1697
1824
class TestDirReader(tests.TestCaseInTempDir):
1698
1825
 
 
1826
    scenarios = dir_reader_scenarios()
 
1827
 
1699
1828
    # Set by load_tests
1700
1829
    _dir_reader_class = None
1701
1830
    _native_to_unicode = None
1702
1831
 
1703
1832
    def setUp(self):
1704
 
        tests.TestCaseInTempDir.setUp(self)
 
1833
        super(TestDirReader, self).setUp()
1705
1834
        self.overrideAttr(osutils,
1706
1835
                          '_selected_dir_reader', self._dir_reader_class())
1707
1836
 
1801
1930
        return filtered_dirblocks
1802
1931
 
1803
1932
    def test_walk_unicode_tree(self):
1804
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1933
        self.requireFeature(features.UnicodeFilenameFeature)
1805
1934
        tree, expected_dirblocks = self._get_unicode_tree()
1806
1935
        self.build_tree(tree)
1807
1936
        result = list(osutils._walkdirs_utf8('.'))
1808
1937
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1809
1938
 
1810
1939
    def test_symlink(self):
1811
 
        self.requireFeature(tests.SymlinkFeature)
1812
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1940
        self.requireFeature(features.SymlinkFeature)
 
1941
        self.requireFeature(features.UnicodeFilenameFeature)
1813
1942
        target = u'target\N{Euro Sign}'
1814
1943
        link_name = u'l\N{Euro Sign}nk'
1815
1944
        os.symlink(target, link_name)
1833
1962
    But prior python versions failed to properly encode the passed unicode
1834
1963
    string.
1835
1964
    """
1836
 
    _test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]
 
1965
    _test_needs_features = [features.SymlinkFeature, features.UnicodeFilenameFeature]
1837
1966
 
1838
1967
    def setUp(self):
1839
1968
        super(tests.TestCaseInTempDir, self).setUp()
1842
1971
        os.symlink(self.target, self.link)
1843
1972
 
1844
1973
    def test_os_readlink_link_encoding(self):
1845
 
        if sys.version_info < (2, 6):
1846
 
            self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
1847
 
        else:
1848
 
            self.assertEquals(self.target,  os.readlink(self.link))
 
1974
        self.assertEqual(self.target,  os.readlink(self.link))
1849
1975
 
1850
1976
    def test_os_readlink_link_decoding(self):
1851
 
        self.assertEquals(self.target.encode(osutils._fs_enc),
 
1977
        self.assertEqual(self.target.encode(osutils._fs_enc),
1852
1978
                          os.readlink(self.link.encode(osutils._fs_enc)))
1853
1979
 
1854
1980
 
1863
1989
        self.assertIsInstance(concurrency, int)
1864
1990
 
1865
1991
    def test_local_concurrency_environment_variable(self):
1866
 
        os.environ['BZR_CONCURRENCY'] = '2'
 
1992
        self.overrideEnv('BRZ_CONCURRENCY', '2')
1867
1993
        self.assertEqual(2, osutils.local_concurrency(use_cache=False))
1868
 
        os.environ['BZR_CONCURRENCY'] = '3'
 
1994
        self.overrideEnv('BRZ_CONCURRENCY', '3')
1869
1995
        self.assertEqual(3, osutils.local_concurrency(use_cache=False))
1870
 
        os.environ['BZR_CONCURRENCY'] = 'foo'
 
1996
        self.overrideEnv('BRZ_CONCURRENCY', 'foo')
1871
1997
        self.assertEqual(1, osutils.local_concurrency(use_cache=False))
1872
1998
 
1873
1999
    def test_option_concurrency(self):
1874
 
        os.environ['BZR_CONCURRENCY'] = '1'
 
2000
        self.overrideEnv('BRZ_CONCURRENCY', '1')
1875
2001
        self.run_bzr('rocks --concurrency 42')
1876
 
        # Command line overrides envrionment variable
1877
 
        self.assertEquals('42', os.environ['BZR_CONCURRENCY'])
1878
 
        self.assertEquals(42, osutils.local_concurrency(use_cache=False))
 
2002
        # Command line overrides environment variable
 
2003
        self.assertEqual('42', os.environ['BRZ_CONCURRENCY'])
 
2004
        self.assertEqual(42, osutils.local_concurrency(use_cache=False))
1879
2005
 
1880
2006
 
1881
2007
class TestFailedToLoadExtension(tests.TestCase):
1882
2008
 
1883
2009
    def _try_loading(self):
1884
2010
        try:
1885
 
            import bzrlib._fictional_extension_py
1886
 
        except ImportError, e:
 
2011
            import breezy._fictional_extension_py
 
2012
        except ImportError as e:
1887
2013
            osutils.failed_to_load_extension(e)
1888
2014
            return True
1889
2015
 
1894
2020
    def test_failure_to_load(self):
1895
2021
        self._try_loading()
1896
2022
        self.assertLength(1, osutils._extension_load_failures)
1897
 
        self.assertEquals(osutils._extension_load_failures[0],
 
2023
        self.assertEqual(osutils._extension_load_failures[0],
1898
2024
            "No module named _fictional_extension_py")
1899
2025
 
1900
2026
    def test_report_extension_load_failures_no_warning(self):
1904
2030
        self.assertLength(0, warnings)
1905
2031
 
1906
2032
    def test_report_extension_load_failures_message(self):
1907
 
        log = StringIO()
 
2033
        log = BytesIO()
1908
2034
        trace.push_log_file(log)
1909
2035
        self.assertTrue(self._try_loading())
1910
2036
        osutils.report_extension_load_failures()
1911
2037
        self.assertContainsRe(
1912
2038
            log.getvalue(),
1913
 
            r"bzr: warning: some compiled extensions could not be loaded; "
 
2039
            r"brz: warning: some compiled extensions could not be loaded; "
1914
2040
            "see <https://answers\.launchpad\.net/bzr/\+faq/703>\n"
1915
2041
            )
1916
2042
 
1917
2043
 
1918
2044
class TestTerminalWidth(tests.TestCase):
1919
2045
 
 
2046
    def setUp(self):
 
2047
        super(TestTerminalWidth, self).setUp()
 
2048
        self._orig_terminal_size_state = osutils._terminal_size_state
 
2049
        self._orig_first_terminal_size = osutils._first_terminal_size
 
2050
        self.addCleanup(self.restore_osutils_globals)
 
2051
        osutils._terminal_size_state = 'no_data'
 
2052
        osutils._first_terminal_size = None
 
2053
 
 
2054
    def restore_osutils_globals(self):
 
2055
        osutils._terminal_size_state = self._orig_terminal_size_state
 
2056
        osutils._first_terminal_size = self._orig_first_terminal_size
 
2057
 
1920
2058
    def replace_stdout(self, new):
1921
2059
        self.overrideAttr(sys, 'stdout', new)
1922
2060
 
1934
2072
    def test_default_values(self):
1935
2073
        self.assertEqual(80, osutils.default_terminal_width)
1936
2074
 
1937
 
    def test_defaults_to_BZR_COLUMNS(self):
1938
 
        # BZR_COLUMNS is set by the test framework
1939
 
        self.assertNotEqual('12', os.environ['BZR_COLUMNS'])
1940
 
        os.environ['BZR_COLUMNS'] = '12'
 
2075
    def test_defaults_to_BRZ_COLUMNS(self):
 
2076
        # BRZ_COLUMNS is set by the test framework
 
2077
        self.assertNotEqual('12', os.environ['BRZ_COLUMNS'])
 
2078
        self.overrideEnv('BRZ_COLUMNS', '12')
1941
2079
        self.assertEqual(12, osutils.terminal_width())
1942
2080
 
 
2081
    def test_BRZ_COLUMNS_0_no_limit(self):
 
2082
        self.overrideEnv('BRZ_COLUMNS', '0')
 
2083
        self.assertEqual(None, osutils.terminal_width())
 
2084
 
1943
2085
    def test_falls_back_to_COLUMNS(self):
1944
 
        del os.environ['BZR_COLUMNS']
 
2086
        self.overrideEnv('BRZ_COLUMNS', None)
1945
2087
        self.assertNotEqual('42', os.environ['COLUMNS'])
1946
2088
        self.set_fake_tty()
1947
 
        os.environ['COLUMNS'] = '42'
 
2089
        self.overrideEnv('COLUMNS', '42')
1948
2090
        self.assertEqual(42, osutils.terminal_width())
1949
2091
 
1950
2092
    def test_tty_default_without_columns(self):
1951
 
        del os.environ['BZR_COLUMNS']
1952
 
        del os.environ['COLUMNS']
 
2093
        self.overrideEnv('BRZ_COLUMNS', None)
 
2094
        self.overrideEnv('COLUMNS', None)
1953
2095
 
1954
2096
        def terminal_size(w, h):
1955
2097
            return 42, 42
1962
2104
        self.assertEqual(42, osutils.terminal_width())
1963
2105
 
1964
2106
    def test_non_tty_default_without_columns(self):
1965
 
        del os.environ['BZR_COLUMNS']
1966
 
        del os.environ['COLUMNS']
 
2107
        self.overrideEnv('BRZ_COLUMNS', None)
 
2108
        self.overrideEnv('COLUMNS', None)
1967
2109
        self.replace_stdout(None)
1968
2110
        self.assertEqual(None, osutils.terminal_width())
1969
2111
 
1979
2121
        else:
1980
2122
            self.overrideAttr(termios, 'TIOCGWINSZ')
1981
2123
            del termios.TIOCGWINSZ
1982
 
        del os.environ['BZR_COLUMNS']
1983
 
        del os.environ['COLUMNS']
 
2124
        self.overrideEnv('BRZ_COLUMNS', None)
 
2125
        self.overrideEnv('COLUMNS', None)
1984
2126
        # Whatever the result is, if we don't raise an exception, it's ok.
1985
2127
        osutils.terminal_width()
1986
2128
 
 
2129
 
1987
2130
class TestCreationOps(tests.TestCaseInTempDir):
1988
2131
    _test_needs_features = [features.chown_feature]
1989
2132
 
1990
2133
    def setUp(self):
1991
 
        tests.TestCaseInTempDir.setUp(self)
 
2134
        super(TestCreationOps, self).setUp()
1992
2135
        self.overrideAttr(os, 'chown', self._dummy_chown)
1993
2136
 
1994
2137
        # params set by call to _dummy_chown
2004
2147
        osutils.copy_ownership_from_path('test_file', ownsrc)
2005
2148
 
2006
2149
        s = os.stat(ownsrc)
2007
 
        self.assertEquals(self.path, 'test_file')
2008
 
        self.assertEquals(self.uid, s.st_uid)
2009
 
        self.assertEquals(self.gid, s.st_gid)
 
2150
        self.assertEqual(self.path, 'test_file')
 
2151
        self.assertEqual(self.uid, s.st_uid)
 
2152
        self.assertEqual(self.gid, s.st_gid)
2010
2153
 
2011
2154
    def test_copy_ownership_nonesrc(self):
2012
2155
        """copy_ownership_from_path test with src=None."""
2015
2158
        osutils.copy_ownership_from_path('test_file')
2016
2159
 
2017
2160
        s = os.stat('..')
2018
 
        self.assertEquals(self.path, 'test_file')
2019
 
        self.assertEquals(self.uid, s.st_uid)
2020
 
        self.assertEquals(self.gid, s.st_gid)
 
2161
        self.assertEqual(self.path, 'test_file')
 
2162
        self.assertEqual(self.uid, s.st_uid)
 
2163
        self.assertEqual(self.gid, s.st_gid)
 
2164
 
 
2165
 
 
2166
class TestPathFromEnviron(tests.TestCase):
 
2167
 
 
2168
    def test_is_unicode(self):
 
2169
        self.overrideEnv('BRZ_TEST_PATH', './anywhere at all/')
 
2170
        path = osutils.path_from_environ('BRZ_TEST_PATH')
 
2171
        self.assertIsInstance(path, unicode)
 
2172
        self.assertEqual(u'./anywhere at all/', path)
 
2173
 
 
2174
    def test_posix_path_env_ascii(self):
 
2175
        self.overrideEnv('BRZ_TEST_PATH', '/tmp')
 
2176
        home = osutils._posix_path_from_environ('BRZ_TEST_PATH')
 
2177
        self.assertIsInstance(home, unicode)
 
2178
        self.assertEqual(u'/tmp', home)
 
2179
 
 
2180
    def test_posix_path_env_unicode(self):
 
2181
        self.requireFeature(features.ByteStringNamedFilesystem)
 
2182
        self.overrideEnv('BRZ_TEST_PATH', '/home/\xa7test')
 
2183
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
 
2184
        self.assertEqual(u'/home/\xa7test',
 
2185
            osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2186
        osutils._fs_enc = "iso8859-5"
 
2187
        self.assertEqual(u'/home/\u0407test',
 
2188
            osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2189
        osutils._fs_enc = "utf-8"
 
2190
        self.assertRaises(errors.BadFilenameEncoding,
 
2191
            osutils._posix_path_from_environ, 'BRZ_TEST_PATH')
 
2192
 
 
2193
 
 
2194
class TestGetHomeDir(tests.TestCase):
 
2195
 
 
2196
    def test_is_unicode(self):
 
2197
        home = osutils._get_home_dir()
 
2198
        self.assertIsInstance(home, unicode)
 
2199
 
 
2200
    def test_posix_homeless(self):
 
2201
        self.overrideEnv('HOME', None)
 
2202
        home = osutils._get_home_dir()
 
2203
        self.assertIsInstance(home, unicode)
 
2204
 
 
2205
    def test_posix_home_ascii(self):
 
2206
        self.overrideEnv('HOME', '/home/test')
 
2207
        home = osutils._posix_get_home_dir()
 
2208
        self.assertIsInstance(home, unicode)
 
2209
        self.assertEqual(u'/home/test', home)
 
2210
 
 
2211
    def test_posix_home_unicode(self):
 
2212
        self.requireFeature(features.ByteStringNamedFilesystem)
 
2213
        self.overrideEnv('HOME', '/home/\xa7test')
 
2214
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
 
2215
        self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
 
2216
        osutils._fs_enc = "iso8859-5"
 
2217
        self.assertEqual(u'/home/\u0407test', osutils._posix_get_home_dir())
 
2218
        osutils._fs_enc = "utf-8"
 
2219
        self.assertRaises(errors.BadFilenameEncoding,
 
2220
            osutils._posix_get_home_dir)
 
2221
 
 
2222
 
 
2223
class TestGetuserUnicode(tests.TestCase):
 
2224
 
 
2225
    def test_is_unicode(self):
 
2226
        user = osutils.getuser_unicode()
 
2227
        self.assertIsInstance(user, unicode)
 
2228
 
 
2229
    def envvar_to_override(self):
 
2230
        if sys.platform == "win32":
 
2231
            # Disable use of platform calls on windows so envvar is used
 
2232
            self.overrideAttr(win32utils, 'has_ctypes', False)
 
2233
            return 'USERNAME' # only variable used on windows
 
2234
        return 'LOGNAME' # first variable checked by getpass.getuser()
 
2235
 
 
2236
    def test_ascii_user(self):
 
2237
        self.overrideEnv(self.envvar_to_override(), 'jrandom')
 
2238
        self.assertEqual(u'jrandom', osutils.getuser_unicode())
 
2239
 
 
2240
    def test_unicode_user(self):
 
2241
        ue = osutils.get_user_encoding()
 
2242
        uni_val, env_val = tests.probe_unicode_in_user_encoding()
 
2243
        if uni_val is None:
 
2244
            raise tests.TestSkipped(
 
2245
                'Cannot find a unicode character that works in encoding %s'
 
2246
                % (osutils.get_user_encoding(),))
 
2247
        uni_username = u'jrandom' + uni_val
 
2248
        encoded_username = uni_username.encode(ue)
 
2249
        self.overrideEnv(self.envvar_to_override(), encoded_username)
 
2250
        self.assertEqual(uni_username, osutils.getuser_unicode())
 
2251
 
 
2252
 
 
2253
class TestBackupNames(tests.TestCase):
 
2254
 
 
2255
    def setUp(self):
 
2256
        super(TestBackupNames, self).setUp()
 
2257
        self.backups = []
 
2258
 
 
2259
    def backup_exists(self, name):
 
2260
        return name in self.backups
 
2261
 
 
2262
    def available_backup_name(self, name):
 
2263
        backup_name = osutils.available_backup_name(name, self.backup_exists)
 
2264
        self.backups.append(backup_name)
 
2265
        return backup_name
 
2266
 
 
2267
    def assertBackupName(self, expected, name):
 
2268
        self.assertEqual(expected, self.available_backup_name(name))
 
2269
 
 
2270
    def test_empty(self):
 
2271
        self.assertBackupName('file.~1~', 'file')
 
2272
 
 
2273
    def test_existing(self):
 
2274
        self.available_backup_name('file')
 
2275
        self.available_backup_name('file')
 
2276
        self.assertBackupName('file.~3~', 'file')
 
2277
        # Empty slots are found, this is not a strict requirement and may be
 
2278
        # revisited if we test against all implementations.
 
2279
        self.backups.remove('file.~2~')
 
2280
        self.assertBackupName('file.~2~', 'file')
 
2281
 
 
2282
 
 
2283
class TestFindExecutableInPath(tests.TestCase):
 
2284
 
 
2285
    def test_windows(self):
 
2286
        if sys.platform != 'win32':
 
2287
            raise tests.TestSkipped('test requires win32')
 
2288
        self.assertTrue(osutils.find_executable_on_path('explorer') is not None)
 
2289
        self.assertTrue(
 
2290
            osutils.find_executable_on_path('explorer.exe') is not None)
 
2291
        self.assertTrue(
 
2292
            osutils.find_executable_on_path('EXPLORER.EXE') is not None)
 
2293
        self.assertTrue(
 
2294
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
 
2295
        self.assertTrue(osutils.find_executable_on_path('file.txt') is None)
 
2296
        
 
2297
    def test_windows_app_path(self):
 
2298
        if sys.platform != 'win32':
 
2299
            raise tests.TestSkipped('test requires win32')
 
2300
        # Override PATH env var so that exe can only be found on App Path
 
2301
        self.overrideEnv('PATH', '')
 
2302
        # Internt Explorer is always registered in the App Path
 
2303
        self.assertTrue(osutils.find_executable_on_path('iexplore') is not None)
 
2304
 
 
2305
    def test_other(self):
 
2306
        if sys.platform == 'win32':
 
2307
            raise tests.TestSkipped('test requires non-win32')
 
2308
        self.assertTrue(osutils.find_executable_on_path('sh') is not None)
 
2309
        self.assertTrue(
 
2310
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
 
2311
 
 
2312
 
 
2313
class TestEnvironmentErrors(tests.TestCase):
 
2314
    """Test handling of environmental errors"""
 
2315
 
 
2316
    def test_is_oserror(self):
 
2317
        self.assertTrue(osutils.is_environment_error(
 
2318
            OSError(errno.EINVAL, "Invalid parameter")))
 
2319
 
 
2320
    def test_is_ioerror(self):
 
2321
        self.assertTrue(osutils.is_environment_error(
 
2322
            IOError(errno.EINVAL, "Invalid parameter")))
 
2323
 
 
2324
    def test_is_socket_error(self):
 
2325
        self.assertTrue(osutils.is_environment_error(
 
2326
            socket.error(errno.EINVAL, "Invalid parameter")))
 
2327
 
 
2328
    def test_is_select_error(self):
 
2329
        self.assertTrue(osutils.is_environment_error(
 
2330
            select.error(errno.EINVAL, "Invalid parameter")))
 
2331
 
 
2332
    def test_is_pywintypes_error(self):
 
2333
        self.requireFeature(features.pywintypes)
 
2334
        import pywintypes
 
2335
        self.assertTrue(osutils.is_environment_error(
 
2336
            pywintypes.error(errno.EINVAL, "Invalid parameter", "Caller")))