/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_osutils.py

  • Committer: Martin Pool
  • Date: 2009-06-05 23:21:51 UTC
  • mfrom: (4415 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4416.
  • Revision ID: mbp@sourcefrog.net-20090605232151-luwmyyl95siraqyz
merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for the osutils wrapper."""
18
18
 
19
19
from cStringIO import StringIO
20
20
import errno
21
21
import os
 
22
import re
22
23
import socket
23
24
import stat
24
25
import sys
30
31
    tests,
31
32
    win32utils,
32
33
    )
33
 
from bzrlib.errors import BzrBadParameterNotUnicode, InvalidURL
34
 
from bzrlib.osutils import (
35
 
        is_inside_any,
36
 
        is_inside_or_parent_of_any,
37
 
        pathjoin,
38
 
        pumpfile,
39
 
        pump_string_file,
40
 
        canonical_relpath,
41
 
        )
42
34
from bzrlib.tests import (
43
 
        Feature,
44
 
        probe_unicode_in_user_encoding,
45
 
        StringIOWrapper,
46
 
        SymlinkFeature,
47
 
        CaseInsCasePresFilenameFeature,
48
 
        TestCase,
49
 
        TestCaseInTempDir,
50
 
        TestSkipped,
51
 
        )
52
 
from bzrlib.tests.file_utils import (
53
 
    FakeReadFile,
 
35
    file_utils,
 
36
    test__walkdirs_win32,
54
37
    )
55
 
from bzrlib.tests.test__walkdirs_win32 import Win32ReadDirFeature
56
 
 
57
 
 
58
 
class _UTF8DirReaderFeature(Feature):
 
38
 
 
39
 
 
40
class _UTF8DirReaderFeature(tests.Feature):
59
41
 
60
42
    def _probe(self):
61
43
        try:
71
53
UTF8DirReaderFeature = _UTF8DirReaderFeature()
72
54
 
73
55
 
74
 
class TestOSUtils(TestCaseInTempDir):
 
56
def _already_unicode(s):
 
57
    return s
 
58
 
 
59
 
 
60
def _fs_enc_to_unicode(s):
 
61
    return s.decode(osutils._fs_enc)
 
62
 
 
63
 
 
64
def _utf8_to_unicode(s):
 
65
    return s.decode('UTF-8')
 
66
 
 
67
 
 
68
def dir_reader_scenarios():
 
69
    # For each dir reader we define:
 
70
 
 
71
    # - native_to_unicode: a function converting the native_abspath as returned
 
72
    #   by DirReader.read_dir to its unicode representation
 
73
 
 
74
    # UnicodeDirReader is the fallback, it should be tested on all platforms.
 
75
    scenarios = [('unicode',
 
76
                  dict(_dir_reader_class=osutils.UnicodeDirReader,
 
77
                       _native_to_unicode=_already_unicode))]
 
78
    # Some DirReaders are platform specific and even there they may not be
 
79
    # available.
 
80
    if UTF8DirReaderFeature.available():
 
81
        from bzrlib import _readdir_pyx
 
82
        scenarios.append(('utf8',
 
83
                          dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
 
84
                               _native_to_unicode=_utf8_to_unicode)))
 
85
 
 
86
    if test__walkdirs_win32.Win32ReadDirFeature.available():
 
87
        try:
 
88
            from bzrlib import _walkdirs_win32
 
89
            # TODO: check on windows, it may be that we need to use/add
 
90
            # safe_unicode instead of _fs_enc_to_unicode
 
91
            scenarios.append(
 
92
                ('win32',
 
93
                 dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
 
94
                      _native_to_unicode=_fs_enc_to_unicode)))
 
95
        except ImportError:
 
96
            pass
 
97
    return scenarios
 
98
 
 
99
 
 
100
def load_tests(basic_tests, module, loader):
 
101
    suite = loader.suiteClass()
 
102
    dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
 
103
        basic_tests, tests.condition_isinstance(TestDirReader))
 
104
    tests.multiply_tests(dir_reader_tests, dir_reader_scenarios(), suite)
 
105
    suite.addTest(remaining_tests)
 
106
    return suite
 
107
 
 
108
 
 
109
class TestContainsWhitespace(tests.TestCase):
75
110
 
76
111
    def test_contains_whitespace(self):
77
112
        self.failUnless(osutils.contains_whitespace(u' '))
87
122
        self.failIf(osutils.contains_whitespace(u'hellothere'))
88
123
        self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
89
124
 
 
125
 
 
126
class TestRename(tests.TestCaseInTempDir):
 
127
 
90
128
    def test_fancy_rename(self):
91
129
        # This should work everywhere
92
130
        def rename(a, b):
130
168
        shape = sorted(os.listdir('.'))
131
169
        self.assertEquals(['A', 'B'], shape)
132
170
 
 
171
 
 
172
class TestRandChars(tests.TestCase):
 
173
 
133
174
    def test_01_rand_chars_empty(self):
134
175
        result = osutils.rand_chars(0)
135
176
        self.assertEqual(result, '')
140
181
        self.assertEqual(type(result), str)
141
182
        self.assertContainsRe(result, r'^[a-z0-9]{100}$')
142
183
 
 
184
 
 
185
class TestIsInside(tests.TestCase):
 
186
 
143
187
    def test_is_inside(self):
144
188
        is_inside = osutils.is_inside
145
189
        self.assertTrue(is_inside('src', 'src/foo.c'))
150
194
        self.assertTrue(is_inside('', 'foo.c'))
151
195
 
152
196
    def test_is_inside_any(self):
153
 
        SRC_FOO_C = pathjoin('src', 'foo.c')
 
197
        SRC_FOO_C = osutils.pathjoin('src', 'foo.c')
154
198
        for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
155
199
                         (['src'], SRC_FOO_C),
156
200
                         (['src'], 'src'),
157
201
                         ]:
158
 
            self.assert_(is_inside_any(dirs, fn))
 
202
            self.assert_(osutils.is_inside_any(dirs, fn))
159
203
        for dirs, fn in [(['src'], 'srccontrol'),
160
204
                         (['src'], 'srccontrol/foo')]:
161
 
            self.assertFalse(is_inside_any(dirs, fn))
 
205
            self.assertFalse(osutils.is_inside_any(dirs, fn))
162
206
 
163
207
    def test_is_inside_or_parent_of_any(self):
164
208
        for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
167
211
                         (['src/bar.c', 'bla/foo.c'], 'src'),
168
212
                         (['src'], 'src'),
169
213
                         ]:
170
 
            self.assert_(is_inside_or_parent_of_any(dirs, fn))
 
214
            self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
171
215
 
172
216
        for dirs, fn in [(['src'], 'srccontrol'),
173
217
                         (['srccontrol/foo.c'], 'src'),
174
218
                         (['src'], 'srccontrol/foo')]:
175
 
            self.assertFalse(is_inside_or_parent_of_any(dirs, fn))
 
219
            self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
 
220
 
 
221
 
 
222
class TestRmTree(tests.TestCaseInTempDir):
176
223
 
177
224
    def test_rmtree(self):
178
225
        # Check to remove tree with read-only files/dirs
192
239
        self.failIfExists('dir/file')
193
240
        self.failIfExists('dir')
194
241
 
 
242
 
 
243
class TestKind(tests.TestCaseInTempDir):
 
244
 
195
245
    def test_file_kind(self):
196
246
        self.build_tree(['file', 'dir/'])
197
247
        self.assertEquals('file', osutils.file_kind('file'))
227
277
                os.remove('socket')
228
278
 
229
279
    def test_kind_marker(self):
230
 
        self.assertEqual(osutils.kind_marker('file'), '')
231
 
        self.assertEqual(osutils.kind_marker('directory'), '/')
232
 
        self.assertEqual(osutils.kind_marker('symlink'), '@')
233
 
        self.assertEqual(osutils.kind_marker('tree-reference'), '+')
 
280
        self.assertEqual("", osutils.kind_marker("file"))
 
281
        self.assertEqual("/", osutils.kind_marker('directory'))
 
282
        self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
 
283
        self.assertEqual("@", osutils.kind_marker("symlink"))
 
284
        self.assertEqual("+", osutils.kind_marker("tree-reference"))
 
285
        self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
 
286
 
 
287
 
 
288
class TestUmask(tests.TestCaseInTempDir):
234
289
 
235
290
    def test_get_umask(self):
236
291
        if sys.platform == 'win32':
239
294
            return
240
295
 
241
296
        orig_umask = osutils.get_umask()
242
 
        try:
243
 
            os.umask(0222)
244
 
            self.assertEqual(0222, osutils.get_umask())
245
 
            os.umask(0022)
246
 
            self.assertEqual(0022, osutils.get_umask())
247
 
            os.umask(0002)
248
 
            self.assertEqual(0002, osutils.get_umask())
249
 
            os.umask(0027)
250
 
            self.assertEqual(0027, osutils.get_umask())
251
 
        finally:
252
 
            os.umask(orig_umask)
 
297
        self.addCleanup(os.umask, orig_umask)
 
298
        os.umask(0222)
 
299
        self.assertEqual(0222, osutils.get_umask())
 
300
        os.umask(0022)
 
301
        self.assertEqual(0022, osutils.get_umask())
 
302
        os.umask(0002)
 
303
        self.assertEqual(0002, osutils.get_umask())
 
304
        os.umask(0027)
 
305
        self.assertEqual(0027, osutils.get_umask())
 
306
 
 
307
 
 
308
class TestDateTime(tests.TestCase):
253
309
 
254
310
    def assertFormatedDelta(self, expected, seconds):
255
311
        """Assert osutils.format_delta formats as expected"""
297
353
        # Instead blackbox.test_locale should check for localized
298
354
        # dates once they do occur in output strings.
299
355
 
 
356
    def test_local_time_offset(self):
 
357
        """Test that local_time_offset() returns a sane value."""
 
358
        offset = osutils.local_time_offset()
 
359
        self.assertTrue(isinstance(offset, int))
 
360
        # Test that the offset is no more than a eighteen hours in
 
361
        # either direction.
 
362
        # Time zone handling is system specific, so it is difficult to
 
363
        # do more specific tests, but a value outside of this range is
 
364
        # probably wrong.
 
365
        eighteen_hours = 18 * 3600
 
366
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
 
367
 
 
368
    def test_local_time_offset_with_timestamp(self):
 
369
        """Test that local_time_offset() works with a timestamp."""
 
370
        offset = osutils.local_time_offset(1000000000.1234567)
 
371
        self.assertTrue(isinstance(offset, int))
 
372
        eighteen_hours = 18 * 3600
 
373
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
 
374
 
 
375
 
 
376
class TestLinks(tests.TestCaseInTempDir):
 
377
 
300
378
    def test_dereference_path(self):
301
 
        self.requireFeature(SymlinkFeature)
 
379
        self.requireFeature(tests.SymlinkFeature)
302
380
        cwd = osutils.realpath('.')
303
381
        os.mkdir('bar')
304
382
        bar_path = osutils.pathjoin(cwd, 'bar')
345
423
            osutils.make_readonly('dangling')
346
424
            osutils.make_writable('dangling')
347
425
 
348
 
    def test_kind_marker(self):
349
 
        self.assertEqual("", osutils.kind_marker("file"))
350
 
        self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
351
 
        self.assertEqual("@", osutils.kind_marker("symlink"))
352
 
        self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
353
 
 
354
426
    def test_host_os_dereferences_symlinks(self):
355
427
        osutils.host_os_dereferences_symlinks()
356
428
 
357
429
 
358
 
class TestCanonicalRelPath(TestCaseInTempDir):
 
430
class TestCanonicalRelPath(tests.TestCaseInTempDir):
359
431
 
360
 
    _test_needs_features = [CaseInsCasePresFilenameFeature]
 
432
    _test_needs_features = [tests.CaseInsCasePresFilenameFeature]
361
433
 
362
434
    def test_canonical_relpath_simple(self):
363
435
        f = file('MixedCaseName', 'w')
364
436
        f.close()
365
 
        self.failUnlessEqual(
366
 
            canonical_relpath(self.test_base_dir, 'mixedcasename'),
367
 
            'work/MixedCaseName')
 
437
        # Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
 
438
        real_base_dir = osutils.realpath(self.test_base_dir)
 
439
        actual = osutils.canonical_relpath(real_base_dir, 'mixedcasename')
 
440
        self.failUnlessEqual('work/MixedCaseName', actual)
368
441
 
369
442
    def test_canonical_relpath_missing_tail(self):
370
443
        os.mkdir('MixedCaseParent')
371
 
        self.failUnlessEqual(
372
 
            canonical_relpath(self.test_base_dir, 'mixedcaseparent/nochild'),
373
 
            'work/MixedCaseParent/nochild')
374
 
 
375
 
 
376
 
class TestPumpFile(TestCase):
 
444
        # Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
 
445
        real_base_dir = osutils.realpath(self.test_base_dir)
 
446
        actual = osutils.canonical_relpath(real_base_dir,
 
447
                                           'mixedcaseparent/nochild')
 
448
        self.failUnlessEqual('work/MixedCaseParent/nochild', actual)
 
449
 
 
450
 
 
451
class TestPumpFile(tests.TestCase):
377
452
    """Test pumpfile method."""
 
453
 
378
454
    def setUp(self):
 
455
        tests.TestCase.setUp(self)
379
456
        # create a test datablock
380
457
        self.block_size = 512
381
458
        pattern = '0123456789ABCDEF'
388
465
        # make sure test data is larger than max read size
389
466
        self.assertTrue(self.test_data_len > self.block_size)
390
467
 
391
 
        from_file = FakeReadFile(self.test_data)
 
468
        from_file = file_utils.FakeReadFile(self.test_data)
392
469
        to_file = StringIO()
393
470
 
394
471
        # read (max / 2) bytes and verify read size wasn't affected
395
472
        num_bytes_to_read = self.block_size / 2
396
 
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
473
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
397
474
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
398
475
        self.assertEqual(from_file.get_read_count(), 1)
399
476
 
400
477
        # read (max) bytes and verify read size wasn't affected
401
478
        num_bytes_to_read = self.block_size
402
479
        from_file.reset_read_count()
403
 
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
480
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
404
481
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
405
482
        self.assertEqual(from_file.get_read_count(), 1)
406
483
 
407
484
        # read (max + 1) bytes and verify read size was limited
408
485
        num_bytes_to_read = self.block_size + 1
409
486
        from_file.reset_read_count()
410
 
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
487
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
411
488
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
412
489
        self.assertEqual(from_file.get_read_count(), 2)
413
490
 
414
491
        # finish reading the rest of the data
415
492
        num_bytes_to_read = self.test_data_len - to_file.tell()
416
 
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
493
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
417
494
 
418
495
        # report error if the data wasn't equal (we only report the size due
419
496
        # to the length of the data)
429
506
        self.assertTrue(self.test_data_len > self.block_size)
430
507
 
431
508
        # retrieve data in blocks
432
 
        from_file = FakeReadFile(self.test_data)
 
509
        from_file = file_utils.FakeReadFile(self.test_data)
433
510
        to_file = StringIO()
434
 
        pumpfile(from_file, to_file, self.test_data_len, self.block_size)
 
511
        osutils.pumpfile(from_file, to_file, self.test_data_len,
 
512
                         self.block_size)
435
513
 
436
514
        # verify read size was equal to the maximum read size
437
515
        self.assertTrue(from_file.get_max_read_size() > 0)
452
530
        self.assertTrue(self.test_data_len > self.block_size)
453
531
 
454
532
        # retrieve data to EOF
455
 
        from_file = FakeReadFile(self.test_data)
 
533
        from_file = file_utils.FakeReadFile(self.test_data)
456
534
        to_file = StringIO()
457
 
        pumpfile(from_file, to_file, -1, self.block_size)
 
535
        osutils.pumpfile(from_file, to_file, -1, self.block_size)
458
536
 
459
537
        # verify read size was equal to the maximum read size
460
538
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
472
550
        test verifies that any existing usages of pumpfile will not be broken
473
551
        with this new version."""
474
552
        # retrieve data using default (old) pumpfile method
475
 
        from_file = FakeReadFile(self.test_data)
 
553
        from_file = file_utils.FakeReadFile(self.test_data)
476
554
        to_file = StringIO()
477
 
        pumpfile(from_file, to_file)
 
555
        osutils.pumpfile(from_file, to_file)
478
556
 
479
557
        # report error if the data wasn't equal (we only report the size due
480
558
        # to the length of the data)
489
567
            activity.append((length, direction))
490
568
        from_file = StringIO(self.test_data)
491
569
        to_file = StringIO()
492
 
        pumpfile(from_file, to_file, buff_size=500,
493
 
                 report_activity=log_activity, direction='read')
 
570
        osutils.pumpfile(from_file, to_file, buff_size=500,
 
571
                         report_activity=log_activity, direction='read')
494
572
        self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
495
573
                          (36, 'read')], activity)
496
574
 
497
575
        from_file = StringIO(self.test_data)
498
576
        to_file = StringIO()
499
577
        del activity[:]
500
 
        pumpfile(from_file, to_file, buff_size=500,
501
 
                 report_activity=log_activity, direction='write')
 
578
        osutils.pumpfile(from_file, to_file, buff_size=500,
 
579
                         report_activity=log_activity, direction='write')
502
580
        self.assertEqual([(500, 'write'), (500, 'write'), (500, 'write'),
503
581
                          (36, 'write')], activity)
504
582
 
506
584
        from_file = StringIO(self.test_data)
507
585
        to_file = StringIO()
508
586
        del activity[:]
509
 
        pumpfile(from_file, to_file, buff_size=500, read_length=1028,
510
 
                 report_activity=log_activity, direction='read')
 
587
        osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
 
588
                         report_activity=log_activity, direction='read')
511
589
        self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
512
590
 
513
591
 
514
592
 
515
 
class TestPumpStringFile(TestCase):
 
593
class TestPumpStringFile(tests.TestCase):
516
594
 
517
595
    def test_empty(self):
518
596
        output = StringIO()
519
 
        pump_string_file("", output)
 
597
        osutils.pump_string_file("", output)
520
598
        self.assertEqual("", output.getvalue())
521
599
 
522
600
    def test_more_than_segment_size(self):
523
601
        output = StringIO()
524
 
        pump_string_file("123456789", output, 2)
 
602
        osutils.pump_string_file("123456789", output, 2)
525
603
        self.assertEqual("123456789", output.getvalue())
526
604
 
527
605
    def test_segment_size(self):
528
606
        output = StringIO()
529
 
        pump_string_file("12", output, 2)
 
607
        osutils.pump_string_file("12", output, 2)
530
608
        self.assertEqual("12", output.getvalue())
531
609
 
532
610
    def test_segment_size_multiple(self):
533
611
        output = StringIO()
534
 
        pump_string_file("1234", output, 2)
 
612
        osutils.pump_string_file("1234", output, 2)
535
613
        self.assertEqual("1234", output.getvalue())
536
614
 
537
615
 
538
 
class TestSafeUnicode(TestCase):
 
616
class TestSafeUnicode(tests.TestCase):
539
617
 
540
618
    def test_from_ascii_string(self):
541
619
        self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
550
628
        self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
551
629
 
552
630
    def test_bad_utf8_string(self):
553
 
        self.assertRaises(BzrBadParameterNotUnicode,
 
631
        self.assertRaises(errors.BzrBadParameterNotUnicode,
554
632
                          osutils.safe_unicode,
555
633
                          '\xbb\xbb')
556
634
 
557
635
 
558
 
class TestSafeUtf8(TestCase):
 
636
class TestSafeUtf8(tests.TestCase):
559
637
 
560
638
    def test_from_ascii_string(self):
561
639
        f = 'foobar'
571
649
        self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
572
650
 
573
651
    def test_bad_utf8_string(self):
574
 
        self.assertRaises(BzrBadParameterNotUnicode,
 
652
        self.assertRaises(errors.BzrBadParameterNotUnicode,
575
653
                          osutils.safe_utf8, '\xbb\xbb')
576
654
 
577
655
 
578
 
class TestSafeRevisionId(TestCase):
 
656
class TestSafeRevisionId(tests.TestCase):
579
657
 
580
658
    def test_from_ascii_string(self):
581
659
        # this shouldn't give a warning because it's getting an ascii string
603
681
        self.assertEqual(None, osutils.safe_revision_id(None))
604
682
 
605
683
 
606
 
class TestSafeFileId(TestCase):
 
684
class TestSafeFileId(tests.TestCase):
607
685
 
608
686
    def test_from_ascii_string(self):
609
687
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))
629
707
        self.assertEqual(None, osutils.safe_file_id(None))
630
708
 
631
709
 
632
 
class TestWin32Funcs(TestCase):
633
 
    """Test that the _win32 versions of os utilities return appropriate paths."""
 
710
class TestWin32Funcs(tests.TestCase):
 
711
    """Test that _win32 versions of os utilities return appropriate paths."""
634
712
 
635
713
    def test_abspath(self):
636
714
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
643
721
        self.assertEqual('C:/foo', osutils._win32_realpath('C:/foo'))
644
722
 
645
723
    def test_pathjoin(self):
646
 
        self.assertEqual('path/to/foo', osutils._win32_pathjoin('path', 'to', 'foo'))
647
 
        self.assertEqual('C:/foo', osutils._win32_pathjoin('path\\to', 'C:\\foo'))
648
 
        self.assertEqual('C:/foo', osutils._win32_pathjoin('path/to', 'C:/foo'))
649
 
        self.assertEqual('path/to/foo', osutils._win32_pathjoin('path/to/', 'foo'))
650
 
        self.assertEqual('/foo', osutils._win32_pathjoin('C:/path/to/', '/foo'))
651
 
        self.assertEqual('/foo', osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
 
724
        self.assertEqual('path/to/foo',
 
725
                         osutils._win32_pathjoin('path', 'to', 'foo'))
 
726
        self.assertEqual('C:/foo',
 
727
                         osutils._win32_pathjoin('path\\to', 'C:\\foo'))
 
728
        self.assertEqual('C:/foo',
 
729
                         osutils._win32_pathjoin('path/to', 'C:/foo'))
 
730
        self.assertEqual('path/to/foo',
 
731
                         osutils._win32_pathjoin('path/to/', 'foo'))
 
732
        self.assertEqual('/foo',
 
733
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
 
734
        self.assertEqual('/foo',
 
735
                         osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
652
736
 
653
737
    def test_normpath(self):
654
 
        self.assertEqual('path/to/foo', osutils._win32_normpath(r'path\\from\..\to\.\foo'))
655
 
        self.assertEqual('path/to/foo', osutils._win32_normpath('path//from/../to/./foo'))
 
738
        self.assertEqual('path/to/foo',
 
739
                         osutils._win32_normpath(r'path\\from\..\to\.\foo'))
 
740
        self.assertEqual('path/to/foo',
 
741
                         osutils._win32_normpath('path//from/../to/./foo'))
656
742
 
657
743
    def test_getcwd(self):
658
744
        cwd = osutils._win32_getcwd()
687
773
        self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
688
774
 
689
775
 
690
 
class TestWin32FuncsDirs(TestCaseInTempDir):
 
776
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
691
777
    """Test win32 functions that create files."""
692
778
 
693
779
    def test_getcwd(self):
694
 
        if win32utils.winver == 'Windows 98':
695
 
            raise TestSkipped('Windows 98 cannot handle unicode filenames')
696
 
        # Make sure getcwd can handle unicode filenames
697
 
        try:
698
 
            os.mkdir(u'mu-\xb5')
699
 
        except UnicodeError:
700
 
            raise TestSkipped("Unable to create Unicode filename")
701
 
 
 
780
        self.requireFeature(tests.UnicodeFilenameFeature)
 
781
        os.mkdir(u'mu-\xb5')
702
782
        os.chdir(u'mu-\xb5')
703
783
        # TODO: jam 20060427 This will probably fail on Mac OSX because
704
784
        #       it will change the normalization of B\xe5gfors
709
789
    def test_minimum_path_selection(self):
710
790
        self.assertEqual(set(),
711
791
            osutils.minimum_path_selection([]))
 
792
        self.assertEqual(set(['a']),
 
793
            osutils.minimum_path_selection(['a']))
712
794
        self.assertEqual(set(['a', 'b']),
713
795
            osutils.minimum_path_selection(['a', 'b']))
714
796
        self.assertEqual(set(['a/', 'b']),
715
797
            osutils.minimum_path_selection(['a/', 'b']))
716
798
        self.assertEqual(set(['a/', 'b']),
717
799
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
 
800
        self.assertEqual(set(['a-b', 'a', 'a0b']),
 
801
            osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
718
802
 
719
803
    def test_mkdtemp(self):
720
804
        tmpdir = osutils._win32_mkdtemp(dir='.')
776
860
        self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
777
861
 
778
862
 
779
 
class TestMacFuncsDirs(TestCaseInTempDir):
 
863
class TestParentDirectories(tests.TestCaseInTempDir):
 
864
    """Test osutils.parent_directories()"""
 
865
 
 
866
    def test_parent_directories(self):
 
867
        self.assertEqual([], osutils.parent_directories('a'))
 
868
        self.assertEqual(['a'], osutils.parent_directories('a/b'))
 
869
        self.assertEqual(['a/b', 'a'], osutils.parent_directories('a/b/c'))
 
870
 
 
871
 
 
872
class TestMacFuncsDirs(tests.TestCaseInTempDir):
780
873
    """Test mac special functions that require directories."""
781
874
 
782
875
    def test_getcwd(self):
783
 
        # On Mac, this will actually create Ba\u030agfors
784
 
        # but chdir will still work, because it accepts both paths
785
 
        try:
786
 
            os.mkdir(u'B\xe5gfors')
787
 
        except UnicodeError:
788
 
            raise TestSkipped("Unable to create Unicode filename")
789
 
 
 
876
        self.requireFeature(tests.UnicodeFilenameFeature)
 
877
        os.mkdir(u'B\xe5gfors')
790
878
        os.chdir(u'B\xe5gfors')
791
879
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
792
880
 
793
881
    def test_getcwd_nonnorm(self):
 
882
        self.requireFeature(tests.UnicodeFilenameFeature)
794
883
        # Test that _mac_getcwd() will normalize this path
795
 
        try:
796
 
            os.mkdir(u'Ba\u030agfors')
797
 
        except UnicodeError:
798
 
            raise TestSkipped("Unable to create Unicode filename")
799
 
 
 
884
        os.mkdir(u'Ba\u030agfors')
800
885
        os.chdir(u'Ba\u030agfors')
801
886
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
802
887
 
803
888
 
804
 
class TestChunksToLines(TestCase):
 
889
class TestChunksToLines(tests.TestCase):
805
890
 
806
891
    def test_smoketest(self):
807
892
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
818
903
        self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
819
904
 
820
905
 
821
 
class TestSplitLines(TestCase):
 
906
class TestSplitLines(tests.TestCase):
822
907
 
823
908
    def test_split_unicode(self):
824
909
        self.assertEqual([u'foo\n', u'bar\xae'],
831
916
                         osutils.split_lines('foo\rbar\n'))
832
917
 
833
918
 
834
 
class TestWalkDirs(TestCaseInTempDir):
 
919
class TestWalkDirs(tests.TestCaseInTempDir):
 
920
 
 
921
    def assertExpectedBlocks(self, expected, result):
 
922
        self.assertEqual(expected,
 
923
                         [(dirinfo, [line[0:3] for line in block])
 
924
                          for dirinfo, block in result])
835
925
 
836
926
    def test_walkdirs(self):
837
927
        tree = [
870
960
            result.append((dirdetail, dirblock))
871
961
 
872
962
        self.assertTrue(found_bzrdir)
873
 
        self.assertEqual(expected_dirblocks,
874
 
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
 
963
        self.assertExpectedBlocks(expected_dirblocks, result)
875
964
        # you can search a subdir only, with a supplied prefix.
876
965
        result = []
877
966
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
878
967
            result.append(dirblock)
879
 
        self.assertEqual(expected_dirblocks[1:],
880
 
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
 
968
        self.assertExpectedBlocks(expected_dirblocks[1:], result)
881
969
 
882
970
    def test_walkdirs_os_error(self):
883
971
        # <https://bugs.edge.launchpad.net/bzr/+bug/338653>
889
977
        os.mkdir("test-unreadable")
890
978
        os.chmod("test-unreadable", 0000)
891
979
        # must chmod it back so that it can be removed
892
 
        self.addCleanup(lambda: os.chmod("test-unreadable", 0700))
 
980
        self.addCleanup(os.chmod, "test-unreadable", 0700)
893
981
        # The error is not raised until the generator is actually evaluated.
894
982
        # (It would be ok if it happened earlier but at the moment it
895
983
        # doesn't.)
896
 
        e = self.assertRaises(OSError, list,
897
 
            osutils._walkdirs_utf8("."))
898
 
        self.assertEquals(e.filename, './test-unreadable')
899
 
        self.assertEquals(str(e),
900
 
            "[Errno 13] chdir: Permission denied: './test-unreadable'")
 
984
        e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
 
985
        self.assertEquals('./test-unreadable', e.filename)
 
986
        self.assertEquals(errno.EACCES, e.errno)
 
987
        # Ensure the message contains the file name
 
988
        self.assertContainsRe(str(e), "\./test-unreadable")
901
989
 
902
990
    def test__walkdirs_utf8(self):
903
991
        tree = [
936
1024
            result.append((dirdetail, dirblock))
937
1025
 
938
1026
        self.assertTrue(found_bzrdir)
939
 
        self.assertEqual(expected_dirblocks,
940
 
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
 
1027
        self.assertExpectedBlocks(expected_dirblocks, result)
 
1028
 
941
1029
        # you can search a subdir only, with a supplied prefix.
942
1030
        result = []
943
1031
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
944
1032
            result.append(dirblock)
945
 
        self.assertEqual(expected_dirblocks[1:],
946
 
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
 
1033
        self.assertExpectedBlocks(expected_dirblocks[1:], result)
947
1034
 
948
1035
    def _filter_out_stat(self, result):
949
1036
        """Filter out the stat value from the walkdirs result"""
964
1051
            osutils._selected_dir_reader = cur_dir_reader
965
1052
        self.addCleanup(restore)
966
1053
 
967
 
    def assertReadFSDirIs(self, expected):
 
1054
    def assertDirReaderIs(self, expected):
968
1055
        """Assert the right implementation for _walkdirs_utf8 is chosen."""
969
1056
        # Force it to redetect
970
1057
        osutils._selected_dir_reader = None
977
1064
        self._save_platform_info()
978
1065
        win32utils.winver = None # Avoid the win32 detection code
979
1066
        osutils._fs_enc = 'UTF-8'
980
 
        self.assertReadFSDirIs(UTF8DirReaderFeature.reader)
 
1067
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
981
1068
 
982
1069
    def test_force_walkdirs_utf8_fs_ascii(self):
983
1070
        self.requireFeature(UTF8DirReaderFeature)
984
1071
        self._save_platform_info()
985
1072
        win32utils.winver = None # Avoid the win32 detection code
986
1073
        osutils._fs_enc = 'US-ASCII'
987
 
        self.assertReadFSDirIs(UTF8DirReaderFeature.reader)
 
1074
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
988
1075
 
989
1076
    def test_force_walkdirs_utf8_fs_ANSI(self):
990
1077
        self.requireFeature(UTF8DirReaderFeature)
991
1078
        self._save_platform_info()
992
1079
        win32utils.winver = None # Avoid the win32 detection code
993
1080
        osutils._fs_enc = 'ANSI_X3.4-1968'
994
 
        self.assertReadFSDirIs(UTF8DirReaderFeature.reader)
 
1081
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
995
1082
 
996
1083
    def test_force_walkdirs_utf8_fs_latin1(self):
997
1084
        self._save_platform_info()
998
1085
        win32utils.winver = None # Avoid the win32 detection code
999
1086
        osutils._fs_enc = 'latin1'
1000
 
        self.assertReadFSDirIs(osutils.UnicodeDirReader)
 
1087
        self.assertDirReaderIs(osutils.UnicodeDirReader)
1001
1088
 
1002
1089
    def test_force_walkdirs_utf8_nt(self):
1003
1090
        # Disabled because the thunk of the whole walkdirs api is disabled.
1004
 
        self.requireFeature(Win32ReadDirFeature)
 
1091
        self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1005
1092
        self._save_platform_info()
1006
1093
        win32utils.winver = 'Windows NT'
1007
1094
        from bzrlib._walkdirs_win32 import Win32ReadDir
1008
 
        self.assertReadFSDirIs(Win32ReadDir)
 
1095
        self.assertDirReaderIs(Win32ReadDir)
1009
1096
 
1010
1097
    def test_force_walkdirs_utf8_98(self):
1011
 
        self.requireFeature(Win32ReadDirFeature)
 
1098
        self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1012
1099
        self._save_platform_info()
1013
1100
        win32utils.winver = 'Windows 98'
1014
 
        self.assertReadFSDirIs(osutils.UnicodeDirReader)
 
1101
        self.assertDirReaderIs(osutils.UnicodeDirReader)
1015
1102
 
1016
1103
    def test_unicode_walkdirs(self):
1017
1104
        """Walkdirs should always return unicode paths."""
 
1105
        self.requireFeature(tests.UnicodeFilenameFeature)
1018
1106
        name0 = u'0file-\xb6'
1019
1107
        name1 = u'1dir-\u062c\u0648'
1020
1108
        name2 = u'2file-\u0633'
1025
1113
            name1 + '/' + name1 + '/',
1026
1114
            name2,
1027
1115
            ]
1028
 
        try:
1029
 
            self.build_tree(tree)
1030
 
        except UnicodeError:
1031
 
            raise TestSkipped('Could not represent Unicode chars'
1032
 
                              ' in current encoding.')
 
1116
        self.build_tree(tree)
1033
1117
        expected_dirblocks = [
1034
1118
                ((u'', u'.'),
1035
1119
                 [(name0, name0, 'file', './' + name0),
1061
1145
 
1062
1146
        The abspath portion might be in unicode or utf-8
1063
1147
        """
 
1148
        self.requireFeature(tests.UnicodeFilenameFeature)
1064
1149
        name0 = u'0file-\xb6'
1065
1150
        name1 = u'1dir-\u062c\u0648'
1066
1151
        name2 = u'2file-\u0633'
1071
1156
            name1 + '/' + name1 + '/',
1072
1157
            name2,
1073
1158
            ]
1074
 
        try:
1075
 
            self.build_tree(tree)
1076
 
        except UnicodeError:
1077
 
            raise TestSkipped('Could not represent Unicode chars'
1078
 
                              ' in current encoding.')
 
1159
        self.build_tree(tree)
1079
1160
        name0 = name0.encode('utf8')
1080
1161
        name1 = name1.encode('utf8')
1081
1162
        name2 = name2.encode('utf8')
1125
1206
 
1126
1207
        The abspath portion should be in unicode
1127
1208
        """
 
1209
        self.requireFeature(tests.UnicodeFilenameFeature)
1128
1210
        # Use the unicode reader. TODO: split into driver-and-driven unit
1129
1211
        # tests.
1130
1212
        self._save_platform_info()
1139
1221
            name1u + '/' + name1u + '/',
1140
1222
            name2u,
1141
1223
            ]
1142
 
        try:
1143
 
            self.build_tree(tree)
1144
 
        except UnicodeError:
1145
 
            raise TestSkipped('Could not represent Unicode chars'
1146
 
                              ' in current encoding.')
 
1224
        self.build_tree(tree)
1147
1225
        name0 = name0u.encode('utf8')
1148
1226
        name1 = name1u.encode('utf8')
1149
1227
        name2 = name2u.encode('utf8')
1174
1252
        self.assertEqual(expected_dirblocks, result)
1175
1253
 
1176
1254
    def test__walkdirs_utf8_win32readdir(self):
1177
 
        self.requireFeature(Win32ReadDirFeature)
 
1255
        self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1178
1256
        self.requireFeature(tests.UnicodeFilenameFeature)
1179
1257
        from bzrlib._walkdirs_win32 import Win32ReadDir
1180
1258
        self._save_platform_info()
1231
1309
 
1232
1310
    def test__walkdirs_utf_win32_find_file_stat_file(self):
1233
1311
        """make sure our Stat values are valid"""
1234
 
        self.requireFeature(Win32ReadDirFeature)
 
1312
        self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1235
1313
        self.requireFeature(tests.UnicodeFilenameFeature)
1236
1314
        from bzrlib._walkdirs_win32 import Win32ReadDir
1237
1315
        name0u = u'0file-\xb6'
1255
1333
 
1256
1334
    def test__walkdirs_utf_win32_find_file_stat_directory(self):
1257
1335
        """make sure our Stat values are valid"""
1258
 
        self.requireFeature(Win32ReadDirFeature)
 
1336
        self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1259
1337
        self.requireFeature(tests.UnicodeFilenameFeature)
1260
1338
        from bzrlib._walkdirs_win32 import Win32ReadDir
1261
1339
        name0u = u'0dir-\u062c\u0648'
1346
1424
            sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
1347
1425
 
1348
1426
 
1349
 
class TestCopyTree(TestCaseInTempDir):
 
1427
class TestCopyTree(tests.TestCaseInTempDir):
1350
1428
 
1351
1429
    def test_copy_basic_tree(self):
1352
1430
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1362
1440
        self.assertEqual(['c'], os.listdir('target/b'))
1363
1441
 
1364
1442
    def test_copy_tree_symlinks(self):
1365
 
        self.requireFeature(SymlinkFeature)
 
1443
        self.requireFeature(tests.SymlinkFeature)
1366
1444
        self.build_tree(['source/'])
1367
1445
        os.symlink('a/generic/path', 'source/lnk')
1368
1446
        osutils.copy_tree('source', 'target')
1398
1476
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1399
1477
 
1400
1478
 
1401
 
#class TestTerminalEncoding has been moved to test_osutils_encodings.py
1402
 
# [bialix] 2006/12/26
1403
 
 
1404
 
 
1405
 
class TestSetUnsetEnv(TestCase):
 
1479
class TestSetUnsetEnv(tests.TestCase):
1406
1480
    """Test updating the environment"""
1407
1481
 
1408
1482
    def setUp(self):
1435
1509
 
1436
1510
        So Unicode strings must be encoded.
1437
1511
        """
1438
 
        uni_val, env_val = probe_unicode_in_user_encoding()
 
1512
        uni_val, env_val = tests.probe_unicode_in_user_encoding()
1439
1513
        if uni_val is None:
1440
 
            raise TestSkipped('Cannot find a unicode character that works in'
1441
 
                              ' encoding %s' % (osutils.get_user_encoding(),))
 
1514
            raise tests.TestSkipped(
 
1515
                'Cannot find a unicode character that works in encoding %s'
 
1516
                % (osutils.get_user_encoding(),))
1442
1517
 
1443
1518
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1444
1519
        self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
1452
1527
        self.failIf('BZR_TEST_ENV_VAR' in os.environ)
1453
1528
 
1454
1529
 
1455
 
class TestLocalTimeOffset(TestCase):
1456
 
 
1457
 
    def test_local_time_offset(self):
1458
 
        """Test that local_time_offset() returns a sane value."""
1459
 
        offset = osutils.local_time_offset()
1460
 
        self.assertTrue(isinstance(offset, int))
1461
 
        # Test that the offset is no more than a eighteen hours in
1462
 
        # either direction.
1463
 
        # Time zone handling is system specific, so it is difficult to
1464
 
        # do more specific tests, but a value outside of this range is
1465
 
        # probably wrong.
1466
 
        eighteen_hours = 18 * 3600
1467
 
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
1468
 
 
1469
 
    def test_local_time_offset_with_timestamp(self):
1470
 
        """Test that local_time_offset() works with a timestamp."""
1471
 
        offset = osutils.local_time_offset(1000000000.1234567)
1472
 
        self.assertTrue(isinstance(offset, int))
1473
 
        eighteen_hours = 18 * 3600
1474
 
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
1475
 
 
1476
 
 
1477
 
class TestShaFileByName(TestCaseInTempDir):
1478
 
 
1479
 
    def test_sha_empty(self):
1480
 
        self.build_tree_contents([('foo', '')])
1481
 
        expected_sha = osutils.sha_string('')
1482
 
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1483
 
 
1484
 
    def test_sha_mixed_endings(self):
1485
 
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
1486
 
        self.build_tree_contents([('foo', text)])
1487
 
        expected_sha = osutils.sha_string(text)
1488
 
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1489
 
 
1490
 
 
1491
 
class TestResourceLoading(TestCaseInTempDir):
 
1530
class TestSizeShaFile(tests.TestCaseInTempDir):
 
1531
 
 
1532
    def test_sha_empty(self):
 
1533
        self.build_tree_contents([('foo', '')])
 
1534
        expected_sha = osutils.sha_string('')
 
1535
        f = open('foo')
 
1536
        self.addCleanup(f.close)
 
1537
        size, sha = osutils.size_sha_file(f)
 
1538
        self.assertEqual(0, size)
 
1539
        self.assertEqual(expected_sha, sha)
 
1540
 
 
1541
    def test_sha_mixed_endings(self):
 
1542
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
 
1543
        self.build_tree_contents([('foo', text)])
 
1544
        expected_sha = osutils.sha_string(text)
 
1545
        f = open('foo')
 
1546
        self.addCleanup(f.close)
 
1547
        size, sha = osutils.size_sha_file(f)
 
1548
        self.assertEqual(38, size)
 
1549
        self.assertEqual(expected_sha, sha)
 
1550
 
 
1551
 
 
1552
class TestShaFileByName(tests.TestCaseInTempDir):
 
1553
 
 
1554
    def test_sha_empty(self):
 
1555
        self.build_tree_contents([('foo', '')])
 
1556
        expected_sha = osutils.sha_string('')
 
1557
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
 
1558
 
 
1559
    def test_sha_mixed_endings(self):
 
1560
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
 
1561
        self.build_tree_contents([('foo', text)])
 
1562
        expected_sha = osutils.sha_string(text)
 
1563
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
 
1564
 
 
1565
 
 
1566
class TestResourceLoading(tests.TestCaseInTempDir):
1492
1567
 
1493
1568
    def test_resource_string(self):
1494
1569
        # test resource in bzrlib
1502
1577
            'yyy.xx')
1503
1578
        # test unknown resource
1504
1579
        self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
 
1580
 
 
1581
 
 
1582
class TestReCompile(tests.TestCase):
 
1583
 
 
1584
    def test_re_compile_checked(self):
 
1585
        r = osutils.re_compile_checked(r'A*', re.IGNORECASE)
 
1586
        self.assertTrue(r.match('aaaa'))
 
1587
        self.assertTrue(r.match('aAaA'))
 
1588
 
 
1589
    def test_re_compile_checked_error(self):
 
1590
        # like https://bugs.launchpad.net/bzr/+bug/251352
 
1591
        err = self.assertRaises(
 
1592
            errors.BzrCommandError,
 
1593
            osutils.re_compile_checked, '*', re.IGNORECASE, 'test case')
 
1594
        self.assertEqual(
 
1595
            "Invalid regular expression in test case: '*': "
 
1596
            "nothing to repeat",
 
1597
            str(err))
 
1598
 
 
1599
 
 
1600
class TestDirReader(tests.TestCaseInTempDir):
 
1601
 
 
1602
    # Set by load_tests
 
1603
    _dir_reader_class = None
 
1604
    _native_to_unicode = None
 
1605
 
 
1606
    def setUp(self):
 
1607
        tests.TestCaseInTempDir.setUp(self)
 
1608
 
 
1609
        # Save platform specific info and reset it
 
1610
        cur_dir_reader = osutils._selected_dir_reader
 
1611
 
 
1612
        def restore():
 
1613
            osutils._selected_dir_reader = cur_dir_reader
 
1614
        self.addCleanup(restore)
 
1615
 
 
1616
        osutils._selected_dir_reader = self._dir_reader_class()
 
1617
 
 
1618
    def _get_ascii_tree(self):
 
1619
        tree = [
 
1620
            '0file',
 
1621
            '1dir/',
 
1622
            '1dir/0file',
 
1623
            '1dir/1dir/',
 
1624
            '2file'
 
1625
            ]
 
1626
        expected_dirblocks = [
 
1627
                (('', '.'),
 
1628
                 [('0file', '0file', 'file'),
 
1629
                  ('1dir', '1dir', 'directory'),
 
1630
                  ('2file', '2file', 'file'),
 
1631
                 ]
 
1632
                ),
 
1633
                (('1dir', './1dir'),
 
1634
                 [('1dir/0file', '0file', 'file'),
 
1635
                  ('1dir/1dir', '1dir', 'directory'),
 
1636
                 ]
 
1637
                ),
 
1638
                (('1dir/1dir', './1dir/1dir'),
 
1639
                 [
 
1640
                 ]
 
1641
                ),
 
1642
            ]
 
1643
        return tree, expected_dirblocks
 
1644
 
 
1645
    def test_walk_cur_dir(self):
 
1646
        tree, expected_dirblocks = self._get_ascii_tree()
 
1647
        self.build_tree(tree)
 
1648
        result = list(osutils._walkdirs_utf8('.'))
 
1649
        # Filter out stat and abspath
 
1650
        self.assertEqual(expected_dirblocks,
 
1651
                         [(dirinfo, [line[0:3] for line in block])
 
1652
                          for dirinfo, block in result])
 
1653
 
 
1654
    def test_walk_sub_dir(self):
 
1655
        tree, expected_dirblocks = self._get_ascii_tree()
 
1656
        self.build_tree(tree)
 
1657
        # you can search a subdir only, with a supplied prefix.
 
1658
        result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
 
1659
        # Filter out stat and abspath
 
1660
        self.assertEqual(expected_dirblocks[1:],
 
1661
                         [(dirinfo, [line[0:3] for line in block])
 
1662
                          for dirinfo, block in result])
 
1663
 
 
1664
    def _get_unicode_tree(self):
 
1665
        name0u = u'0file-\xb6'
 
1666
        name1u = u'1dir-\u062c\u0648'
 
1667
        name2u = u'2file-\u0633'
 
1668
        tree = [
 
1669
            name0u,
 
1670
            name1u + '/',
 
1671
            name1u + '/' + name0u,
 
1672
            name1u + '/' + name1u + '/',
 
1673
            name2u,
 
1674
            ]
 
1675
        name0 = name0u.encode('UTF-8')
 
1676
        name1 = name1u.encode('UTF-8')
 
1677
        name2 = name2u.encode('UTF-8')
 
1678
        expected_dirblocks = [
 
1679
                (('', '.'),
 
1680
                 [(name0, name0, 'file', './' + name0u),
 
1681
                  (name1, name1, 'directory', './' + name1u),
 
1682
                  (name2, name2, 'file', './' + name2u),
 
1683
                 ]
 
1684
                ),
 
1685
                ((name1, './' + name1u),
 
1686
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1687
                                                        + '/' + name0u),
 
1688
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1689
                                                            + '/' + name1u),
 
1690
                 ]
 
1691
                ),
 
1692
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1693
                 [
 
1694
                 ]
 
1695
                ),
 
1696
            ]
 
1697
        return tree, expected_dirblocks
 
1698
 
 
1699
    def _filter_out(self, raw_dirblocks):
 
1700
        """Filter out a walkdirs_utf8 result.
 
1701
 
 
1702
        stat field is removed, all native paths are converted to unicode
 
1703
        """
 
1704
        filtered_dirblocks = []
 
1705
        for dirinfo, block in raw_dirblocks:
 
1706
            dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
 
1707
            details = []
 
1708
            for line in block:
 
1709
                details.append(line[0:3] + (self._native_to_unicode(line[4]), ))
 
1710
            filtered_dirblocks.append((dirinfo, details))
 
1711
        return filtered_dirblocks
 
1712
 
 
1713
    def test_walk_unicode_tree(self):
 
1714
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1715
        tree, expected_dirblocks = self._get_unicode_tree()
 
1716
        self.build_tree(tree)
 
1717
        result = list(osutils._walkdirs_utf8('.'))
 
1718
        self.assertEqual(expected_dirblocks, self._filter_out(result))
 
1719
 
 
1720
    def test_symlink(self):
 
1721
        self.requireFeature(tests.SymlinkFeature)
 
1722
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1723
        target = u'target\N{Euro Sign}'
 
1724
        link_name = u'l\N{Euro Sign}nk'
 
1725
        os.symlink(target, link_name)
 
1726
        target_utf8 = target.encode('UTF-8')
 
1727
        link_name_utf8 = link_name.encode('UTF-8')
 
1728
        expected_dirblocks = [
 
1729
                (('', '.'),
 
1730
                 [(link_name_utf8, link_name_utf8,
 
1731
                   'symlink', './' + link_name),],
 
1732
                 )]
 
1733
        result = list(osutils._walkdirs_utf8('.'))
 
1734
        self.assertEqual(expected_dirblocks, self._filter_out(result))
 
1735
 
 
1736
 
 
1737
class TestReadLink(tests.TestCaseInTempDir):
 
1738
    """Exposes os.readlink() problems and the osutils solution.
 
1739
 
 
1740
    The only guarantee offered by os.readlink(), starting with 2.6, is that a
 
1741
    unicode string will be returned if a unicode string is passed.
 
1742
 
 
1743
    But prior python versions failed to properly encode the passed unicode
 
1744
    string.
 
1745
    """
 
1746
    _test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]
 
1747
 
 
1748
    def setUp(self):
 
1749
        super(tests.TestCaseInTempDir, self).setUp()
 
1750
        self.link = u'l\N{Euro Sign}ink'
 
1751
        self.target = u'targe\N{Euro Sign}t'
 
1752
        os.symlink(self.target, self.link)
 
1753
 
 
1754
    def test_os_readlink_link_encoding(self):
 
1755
        if sys.version_info < (2, 6):
 
1756
            self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
 
1757
        else:
 
1758
            self.assertEquals(self.target,  os.readlink(self.link))
 
1759
 
 
1760
    def test_os_readlink_link_decoding(self):
 
1761
        self.assertEquals(self.target.encode(osutils._fs_enc),
 
1762
                          os.readlink(self.link.encode(osutils._fs_enc)))
 
1763
 
 
1764
 
 
1765
class TestConcurrency(tests.TestCase):
 
1766
 
 
1767
    def test_local_concurrency(self):
 
1768
        concurrency = osutils.local_concurrency()
 
1769
        self.assertIsInstance(concurrency, int)