/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_osutils.py

  • Committer: John Arbash Meinel
  • Date: 2009-12-03 20:34:03 UTC
  • mto: This revision was merged to the branch mainline in revision 4865.
  • Revision ID: john@arbash-meinel.com-20091203203403-cww6ez5nvp616c85
Found the failed-to-unlocked branches in test_remote.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the osutils wrapper."""
 
18
 
 
19
from cStringIO import StringIO
 
20
import errno
 
21
import os
 
22
import re
 
23
import socket
 
24
import stat
 
25
import sys
 
26
import time
 
27
 
 
28
from bzrlib import (
 
29
    errors,
 
30
    osutils,
 
31
    tests,
 
32
    trace,
 
33
    win32utils,
 
34
    )
 
35
from bzrlib.tests import (
 
36
    file_utils,
 
37
    test__walkdirs_win32,
 
38
    )
 
39
 
 
40
 
 
41
class _UTF8DirReaderFeature(tests.Feature):
 
42
 
 
43
    def _probe(self):
 
44
        try:
 
45
            from bzrlib import _readdir_pyx
 
46
            self.reader = _readdir_pyx.UTF8DirReader
 
47
            return True
 
48
        except ImportError:
 
49
            return False
 
50
 
 
51
    def feature_name(self):
 
52
        return 'bzrlib._readdir_pyx'
 
53
 
 
54
UTF8DirReaderFeature = _UTF8DirReaderFeature()
 
55
 
 
56
 
 
57
def _already_unicode(s):
 
58
    return s
 
59
 
 
60
 
 
61
def _utf8_to_unicode(s):
 
62
    return s.decode('UTF-8')
 
63
 
 
64
 
 
65
def dir_reader_scenarios():
 
66
    # For each dir reader we define:
 
67
 
 
68
    # - native_to_unicode: a function converting the native_abspath as returned
 
69
    #   by DirReader.read_dir to its unicode representation
 
70
 
 
71
    # UnicodeDirReader is the fallback, it should be tested on all platforms.
 
72
    scenarios = [('unicode',
 
73
                  dict(_dir_reader_class=osutils.UnicodeDirReader,
 
74
                       _native_to_unicode=_already_unicode))]
 
75
    # Some DirReaders are platform specific and even there they may not be
 
76
    # available.
 
77
    if UTF8DirReaderFeature.available():
 
78
        from bzrlib import _readdir_pyx
 
79
        scenarios.append(('utf8',
 
80
                          dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
 
81
                               _native_to_unicode=_utf8_to_unicode)))
 
82
 
 
83
    if test__walkdirs_win32.Win32ReadDirFeature.available():
 
84
        try:
 
85
            from bzrlib import _walkdirs_win32
 
86
            scenarios.append(
 
87
                ('win32',
 
88
                 dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
 
89
                      _native_to_unicode=_already_unicode)))
 
90
        except ImportError:
 
91
            pass
 
92
    return scenarios
 
93
 
 
94
 
 
95
def load_tests(basic_tests, module, loader):
 
96
    suite = loader.suiteClass()
 
97
    dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
 
98
        basic_tests, tests.condition_isinstance(TestDirReader))
 
99
    tests.multiply_tests(dir_reader_tests, dir_reader_scenarios(), suite)
 
100
    suite.addTest(remaining_tests)
 
101
    return suite
 
102
 
 
103
 
 
104
class TestContainsWhitespace(tests.TestCase):
 
105
 
 
106
    def test_contains_whitespace(self):
 
107
        self.failUnless(osutils.contains_whitespace(u' '))
 
108
        self.failUnless(osutils.contains_whitespace(u'hello there'))
 
109
        self.failUnless(osutils.contains_whitespace(u'hellothere\n'))
 
110
        self.failUnless(osutils.contains_whitespace(u'hello\nthere'))
 
111
        self.failUnless(osutils.contains_whitespace(u'hello\rthere'))
 
112
        self.failUnless(osutils.contains_whitespace(u'hello\tthere'))
 
113
 
 
114
        # \xa0 is "Non-breaking-space" which on some python locales thinks it
 
115
        # is whitespace, but we do not.
 
116
        self.failIf(osutils.contains_whitespace(u''))
 
117
        self.failIf(osutils.contains_whitespace(u'hellothere'))
 
118
        self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
 
119
 
 
120
 
 
121
class TestRename(tests.TestCaseInTempDir):
 
122
 
 
123
    def create_file(self, filename, content):
 
124
        f = open(filename, 'wb')
 
125
        try:
 
126
            f.write(content)
 
127
        finally:
 
128
            f.close()
 
129
 
 
130
    def _fancy_rename(self, a, b):
 
131
        osutils.fancy_rename(a, b, rename_func=os.rename,
 
132
                             unlink_func=os.unlink)
 
133
 
 
134
    def test_fancy_rename(self):
 
135
        # This should work everywhere
 
136
        self.create_file('a', 'something in a\n')
 
137
        self._fancy_rename('a', 'b')
 
138
        self.failIfExists('a')
 
139
        self.failUnlessExists('b')
 
140
        self.check_file_contents('b', 'something in a\n')
 
141
 
 
142
        self.create_file('a', 'new something in a\n')
 
143
        self._fancy_rename('b', 'a')
 
144
 
 
145
        self.check_file_contents('a', 'something in a\n')
 
146
 
 
147
    def test_fancy_rename_fails_source_missing(self):
 
148
        # An exception should be raised, and the target should be left in place
 
149
        self.create_file('target', 'data in target\n')
 
150
        self.assertRaises((IOError, OSError), self._fancy_rename,
 
151
                          'missingsource', 'target')
 
152
        self.failUnlessExists('target')
 
153
        self.check_file_contents('target', 'data in target\n')
 
154
 
 
155
    def test_fancy_rename_fails_if_source_and_target_missing(self):
 
156
        self.assertRaises((IOError, OSError), self._fancy_rename,
 
157
                          'missingsource', 'missingtarget')
 
158
 
 
159
    def test_rename(self):
 
160
        # Rename should be semi-atomic on all platforms
 
161
        self.create_file('a', 'something in a\n')
 
162
        osutils.rename('a', 'b')
 
163
        self.failIfExists('a')
 
164
        self.failUnlessExists('b')
 
165
        self.check_file_contents('b', 'something in a\n')
 
166
 
 
167
        self.create_file('a', 'new something in a\n')
 
168
        osutils.rename('b', 'a')
 
169
 
 
170
        self.check_file_contents('a', 'something in a\n')
 
171
 
 
172
    # TODO: test fancy_rename using a MemoryTransport
 
173
 
 
174
    def test_rename_change_case(self):
 
175
        # on Windows we should be able to change filename case by rename
 
176
        self.build_tree(['a', 'b/'])
 
177
        osutils.rename('a', 'A')
 
178
        osutils.rename('b', 'B')
 
179
        # we can't use failUnlessExists on case-insensitive filesystem
 
180
        # so try to check shape of the tree
 
181
        shape = sorted(os.listdir('.'))
 
182
        self.assertEquals(['A', 'B'], shape)
 
183
 
 
184
 
 
185
class TestRandChars(tests.TestCase):
 
186
 
 
187
    def test_01_rand_chars_empty(self):
 
188
        result = osutils.rand_chars(0)
 
189
        self.assertEqual(result, '')
 
190
 
 
191
    def test_02_rand_chars_100(self):
 
192
        result = osutils.rand_chars(100)
 
193
        self.assertEqual(len(result), 100)
 
194
        self.assertEqual(type(result), str)
 
195
        self.assertContainsRe(result, r'^[a-z0-9]{100}$')
 
196
 
 
197
 
 
198
class TestIsInside(tests.TestCase):
 
199
 
 
200
    def test_is_inside(self):
 
201
        is_inside = osutils.is_inside
 
202
        self.assertTrue(is_inside('src', 'src/foo.c'))
 
203
        self.assertFalse(is_inside('src', 'srccontrol'))
 
204
        self.assertTrue(is_inside('src', 'src/a/a/a/foo.c'))
 
205
        self.assertTrue(is_inside('foo.c', 'foo.c'))
 
206
        self.assertFalse(is_inside('foo.c', ''))
 
207
        self.assertTrue(is_inside('', 'foo.c'))
 
208
 
 
209
    def test_is_inside_any(self):
 
210
        SRC_FOO_C = osutils.pathjoin('src', 'foo.c')
 
211
        for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
 
212
                         (['src'], SRC_FOO_C),
 
213
                         (['src'], 'src'),
 
214
                         ]:
 
215
            self.assert_(osutils.is_inside_any(dirs, fn))
 
216
        for dirs, fn in [(['src'], 'srccontrol'),
 
217
                         (['src'], 'srccontrol/foo')]:
 
218
            self.assertFalse(osutils.is_inside_any(dirs, fn))
 
219
 
 
220
    def test_is_inside_or_parent_of_any(self):
 
221
        for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
 
222
                         (['src'], 'src/foo.c'),
 
223
                         (['src/bar.c'], 'src'),
 
224
                         (['src/bar.c', 'bla/foo.c'], 'src'),
 
225
                         (['src'], 'src'),
 
226
                         ]:
 
227
            self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
 
228
 
 
229
        for dirs, fn in [(['src'], 'srccontrol'),
 
230
                         (['srccontrol/foo.c'], 'src'),
 
231
                         (['src'], 'srccontrol/foo')]:
 
232
            self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
 
233
 
 
234
 
 
235
class TestRmTree(tests.TestCaseInTempDir):
 
236
 
 
237
    def test_rmtree(self):
 
238
        # Check to remove tree with read-only files/dirs
 
239
        os.mkdir('dir')
 
240
        f = file('dir/file', 'w')
 
241
        f.write('spam')
 
242
        f.close()
 
243
        # would like to also try making the directory readonly, but at the
 
244
        # moment python shutil.rmtree doesn't handle that properly - it would
 
245
        # need to chmod the directory before removing things inside it - deferred
 
246
        # for now -- mbp 20060505
 
247
        # osutils.make_readonly('dir')
 
248
        osutils.make_readonly('dir/file')
 
249
 
 
250
        osutils.rmtree('dir')
 
251
 
 
252
        self.failIfExists('dir/file')
 
253
        self.failIfExists('dir')
 
254
 
 
255
 
 
256
class TestDeleteAny(tests.TestCaseInTempDir):
 
257
 
 
258
    def test_delete_any_readonly(self):
 
259
        # from <https://bugs.launchpad.net/bzr/+bug/218206>
 
260
        self.build_tree(['d/', 'f'])
 
261
        osutils.make_readonly('d')
 
262
        osutils.make_readonly('f')
 
263
 
 
264
        osutils.delete_any('f')
 
265
        osutils.delete_any('d')
 
266
 
 
267
 
 
268
class TestKind(tests.TestCaseInTempDir):
 
269
 
 
270
    def test_file_kind(self):
 
271
        self.build_tree(['file', 'dir/'])
 
272
        self.assertEquals('file', osutils.file_kind('file'))
 
273
        self.assertEquals('directory', osutils.file_kind('dir/'))
 
274
        if osutils.has_symlinks():
 
275
            os.symlink('symlink', 'symlink')
 
276
            self.assertEquals('symlink', osutils.file_kind('symlink'))
 
277
 
 
278
        # TODO: jam 20060529 Test a block device
 
279
        try:
 
280
            os.lstat('/dev/null')
 
281
        except OSError, e:
 
282
            if e.errno not in (errno.ENOENT,):
 
283
                raise
 
284
        else:
 
285
            self.assertEquals('chardev', osutils.file_kind('/dev/null'))
 
286
 
 
287
        mkfifo = getattr(os, 'mkfifo', None)
 
288
        if mkfifo:
 
289
            mkfifo('fifo')
 
290
            try:
 
291
                self.assertEquals('fifo', osutils.file_kind('fifo'))
 
292
            finally:
 
293
                os.remove('fifo')
 
294
 
 
295
        AF_UNIX = getattr(socket, 'AF_UNIX', None)
 
296
        if AF_UNIX:
 
297
            s = socket.socket(AF_UNIX)
 
298
            s.bind('socket')
 
299
            try:
 
300
                self.assertEquals('socket', osutils.file_kind('socket'))
 
301
            finally:
 
302
                os.remove('socket')
 
303
 
 
304
    def test_kind_marker(self):
 
305
        self.assertEqual("", osutils.kind_marker("file"))
 
306
        self.assertEqual("/", osutils.kind_marker('directory'))
 
307
        self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
 
308
        self.assertEqual("@", osutils.kind_marker("symlink"))
 
309
        self.assertEqual("+", osutils.kind_marker("tree-reference"))
 
310
        self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
 
311
 
 
312
 
 
313
class TestUmask(tests.TestCaseInTempDir):
 
314
 
 
315
    def test_get_umask(self):
 
316
        if sys.platform == 'win32':
 
317
            # umask always returns '0', no way to set it
 
318
            self.assertEqual(0, osutils.get_umask())
 
319
            return
 
320
 
 
321
        orig_umask = osutils.get_umask()
 
322
        self.addCleanup(os.umask, orig_umask)
 
323
        os.umask(0222)
 
324
        self.assertEqual(0222, osutils.get_umask())
 
325
        os.umask(0022)
 
326
        self.assertEqual(0022, osutils.get_umask())
 
327
        os.umask(0002)
 
328
        self.assertEqual(0002, osutils.get_umask())
 
329
        os.umask(0027)
 
330
        self.assertEqual(0027, osutils.get_umask())
 
331
 
 
332
 
 
333
class TestDateTime(tests.TestCase):
 
334
 
 
335
    def assertFormatedDelta(self, expected, seconds):
 
336
        """Assert osutils.format_delta formats as expected"""
 
337
        actual = osutils.format_delta(seconds)
 
338
        self.assertEqual(expected, actual)
 
339
 
 
340
    def test_format_delta(self):
 
341
        self.assertFormatedDelta('0 seconds ago', 0)
 
342
        self.assertFormatedDelta('1 second ago', 1)
 
343
        self.assertFormatedDelta('10 seconds ago', 10)
 
344
        self.assertFormatedDelta('59 seconds ago', 59)
 
345
        self.assertFormatedDelta('89 seconds ago', 89)
 
346
        self.assertFormatedDelta('1 minute, 30 seconds ago', 90)
 
347
        self.assertFormatedDelta('3 minutes, 0 seconds ago', 180)
 
348
        self.assertFormatedDelta('3 minutes, 1 second ago', 181)
 
349
        self.assertFormatedDelta('10 minutes, 15 seconds ago', 615)
 
350
        self.assertFormatedDelta('30 minutes, 59 seconds ago', 1859)
 
351
        self.assertFormatedDelta('31 minutes, 0 seconds ago', 1860)
 
352
        self.assertFormatedDelta('60 minutes, 0 seconds ago', 3600)
 
353
        self.assertFormatedDelta('89 minutes, 59 seconds ago', 5399)
 
354
        self.assertFormatedDelta('1 hour, 30 minutes ago', 5400)
 
355
        self.assertFormatedDelta('2 hours, 30 minutes ago', 9017)
 
356
        self.assertFormatedDelta('10 hours, 0 minutes ago', 36000)
 
357
        self.assertFormatedDelta('24 hours, 0 minutes ago', 86400)
 
358
        self.assertFormatedDelta('35 hours, 59 minutes ago', 129599)
 
359
        self.assertFormatedDelta('36 hours, 0 minutes ago', 129600)
 
360
        self.assertFormatedDelta('36 hours, 0 minutes ago', 129601)
 
361
        self.assertFormatedDelta('36 hours, 1 minute ago', 129660)
 
362
        self.assertFormatedDelta('36 hours, 1 minute ago', 129661)
 
363
        self.assertFormatedDelta('84 hours, 10 minutes ago', 303002)
 
364
 
 
365
        # We handle when time steps the wrong direction because computers
 
366
        # don't have synchronized clocks.
 
367
        self.assertFormatedDelta('84 hours, 10 minutes in the future', -303002)
 
368
        self.assertFormatedDelta('1 second in the future', -1)
 
369
        self.assertFormatedDelta('2 seconds in the future', -2)
 
370
 
 
371
    def test_format_date(self):
 
372
        self.assertRaises(errors.UnsupportedTimezoneFormat,
 
373
            osutils.format_date, 0, timezone='foo')
 
374
        self.assertIsInstance(osutils.format_date(0), str)
 
375
        self.assertIsInstance(osutils.format_local_date(0), unicode)
 
376
        # Testing for the actual value of the local weekday without
 
377
        # duplicating the code from format_date is difficult.
 
378
        # Instead blackbox.test_locale should check for localized
 
379
        # dates once they do occur in output strings.
 
380
 
 
381
    def test_format_date_with_offset_in_original_timezone(self):
 
382
        self.assertEqual("Thu 1970-01-01 00:00:00 +0000",
 
383
            osutils.format_date_with_offset_in_original_timezone(0))
 
384
        self.assertEqual("Fri 1970-01-02 03:46:40 +0000",
 
385
            osutils.format_date_with_offset_in_original_timezone(100000))
 
386
        self.assertEqual("Fri 1970-01-02 05:46:40 +0200",
 
387
            osutils.format_date_with_offset_in_original_timezone(100000, 7200))
 
388
 
 
389
    def test_local_time_offset(self):
 
390
        """Test that local_time_offset() returns a sane value."""
 
391
        offset = osutils.local_time_offset()
 
392
        self.assertTrue(isinstance(offset, int))
 
393
        # Test that the offset is no more than a eighteen hours in
 
394
        # either direction.
 
395
        # Time zone handling is system specific, so it is difficult to
 
396
        # do more specific tests, but a value outside of this range is
 
397
        # probably wrong.
 
398
        eighteen_hours = 18 * 3600
 
399
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
 
400
 
 
401
    def test_local_time_offset_with_timestamp(self):
 
402
        """Test that local_time_offset() works with a timestamp."""
 
403
        offset = osutils.local_time_offset(1000000000.1234567)
 
404
        self.assertTrue(isinstance(offset, int))
 
405
        eighteen_hours = 18 * 3600
 
406
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
 
407
 
 
408
 
 
409
class TestLinks(tests.TestCaseInTempDir):
 
410
 
 
411
    def test_dereference_path(self):
 
412
        self.requireFeature(tests.SymlinkFeature)
 
413
        cwd = osutils.realpath('.')
 
414
        os.mkdir('bar')
 
415
        bar_path = osutils.pathjoin(cwd, 'bar')
 
416
        # Using './' to avoid bug #1213894 (first path component not
 
417
        # dereferenced) in Python 2.4.1 and earlier
 
418
        self.assertEqual(bar_path, osutils.realpath('./bar'))
 
419
        os.symlink('bar', 'foo')
 
420
        self.assertEqual(bar_path, osutils.realpath('./foo'))
 
421
 
 
422
        # Does not dereference terminal symlinks
 
423
        foo_path = osutils.pathjoin(cwd, 'foo')
 
424
        self.assertEqual(foo_path, osutils.dereference_path('./foo'))
 
425
 
 
426
        # Dereferences parent symlinks
 
427
        os.mkdir('bar/baz')
 
428
        baz_path = osutils.pathjoin(bar_path, 'baz')
 
429
        self.assertEqual(baz_path, osutils.dereference_path('./foo/baz'))
 
430
 
 
431
        # Dereferences parent symlinks that are the first path element
 
432
        self.assertEqual(baz_path, osutils.dereference_path('foo/baz'))
 
433
 
 
434
        # Dereferences parent symlinks in absolute paths
 
435
        foo_baz_path = osutils.pathjoin(foo_path, 'baz')
 
436
        self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
 
437
 
 
438
    def test_changing_access(self):
 
439
        f = file('file', 'w')
 
440
        f.write('monkey')
 
441
        f.close()
 
442
 
 
443
        # Make a file readonly
 
444
        osutils.make_readonly('file')
 
445
        mode = os.lstat('file').st_mode
 
446
        self.assertEqual(mode, mode & 0777555)
 
447
 
 
448
        # Make a file writable
 
449
        osutils.make_writable('file')
 
450
        mode = os.lstat('file').st_mode
 
451
        self.assertEqual(mode, mode | 0200)
 
452
 
 
453
        if osutils.has_symlinks():
 
454
            # should not error when handed a symlink
 
455
            os.symlink('nonexistent', 'dangling')
 
456
            osutils.make_readonly('dangling')
 
457
            osutils.make_writable('dangling')
 
458
 
 
459
    def test_host_os_dereferences_symlinks(self):
 
460
        osutils.host_os_dereferences_symlinks()
 
461
 
 
462
 
 
463
class TestCanonicalRelPath(tests.TestCaseInTempDir):
 
464
 
 
465
    _test_needs_features = [tests.CaseInsCasePresFilenameFeature]
 
466
 
 
467
    def test_canonical_relpath_simple(self):
 
468
        f = file('MixedCaseName', 'w')
 
469
        f.close()
 
470
        actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
 
471
        self.failUnlessEqual('work/MixedCaseName', actual)
 
472
 
 
473
    def test_canonical_relpath_missing_tail(self):
 
474
        os.mkdir('MixedCaseParent')
 
475
        actual = osutils.canonical_relpath(self.test_base_dir,
 
476
                                           'mixedcaseparent/nochild')
 
477
        self.failUnlessEqual('work/MixedCaseParent/nochild', actual)
 
478
 
 
479
 
 
480
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
 
481
 
 
482
    def assertRelpath(self, expected, base, path):
 
483
        actual = osutils._cicp_canonical_relpath(base, path)
 
484
        self.assertEqual(expected, actual)
 
485
 
 
486
    def test_simple(self):
 
487
        self.build_tree(['MixedCaseName'])
 
488
        base = osutils.realpath(self.get_transport('.').local_abspath('.'))
 
489
        self.assertRelpath('MixedCaseName', base, 'mixedcAsename')
 
490
 
 
491
    def test_subdir_missing_tail(self):
 
492
        self.build_tree(['MixedCaseParent/', 'MixedCaseParent/a_child'])
 
493
        base = osutils.realpath(self.get_transport('.').local_abspath('.'))
 
494
        self.assertRelpath('MixedCaseParent/a_child', base,
 
495
                           'MixedCaseParent/a_child')
 
496
        self.assertRelpath('MixedCaseParent/a_child', base,
 
497
                           'MixedCaseParent/A_Child')
 
498
        self.assertRelpath('MixedCaseParent/not_child', base,
 
499
                           'MixedCaseParent/not_child')
 
500
 
 
501
    def test_at_root_slash(self):
 
502
        # We can't test this on Windows, because it has a 'MIN_ABS_PATHLENGTH'
 
503
        # check...
 
504
        if osutils.MIN_ABS_PATHLENGTH > 1:
 
505
            raise tests.TestSkipped('relpath requires %d chars'
 
506
                                    % osutils.MIN_ABS_PATHLENGTH)
 
507
        self.assertRelpath('foo', '/', '/foo')
 
508
 
 
509
    def test_at_root_drive(self):
 
510
        if sys.platform != 'win32':
 
511
            raise tests.TestNotApplicable('we can only test drive-letter relative'
 
512
                                          ' paths on Windows where we have drive'
 
513
                                          ' letters.')
 
514
        # see bug #322807
 
515
        # The specific issue is that when at the root of a drive, 'abspath'
 
516
        # returns "C:/" or just "/". However, the code assumes that abspath
 
517
        # always returns something like "C:/foo" or "/foo" (no trailing slash).
 
518
        self.assertRelpath('foo', 'C:/', 'C:/foo')
 
519
        self.assertRelpath('foo', 'X:/', 'X:/foo')
 
520
        self.assertRelpath('foo', 'X:/', 'X://foo')
 
521
 
 
522
 
 
523
class TestPumpFile(tests.TestCase):
 
524
    """Test pumpfile method."""
 
525
 
 
526
    def setUp(self):
 
527
        tests.TestCase.setUp(self)
 
528
        # create a test datablock
 
529
        self.block_size = 512
 
530
        pattern = '0123456789ABCDEF'
 
531
        self.test_data = pattern * (3 * self.block_size / len(pattern))
 
532
        self.test_data_len = len(self.test_data)
 
533
 
 
534
    def test_bracket_block_size(self):
 
535
        """Read data in blocks with the requested read size bracketing the
 
536
        block size."""
 
537
        # make sure test data is larger than max read size
 
538
        self.assertTrue(self.test_data_len > self.block_size)
 
539
 
 
540
        from_file = file_utils.FakeReadFile(self.test_data)
 
541
        to_file = StringIO()
 
542
 
 
543
        # read (max / 2) bytes and verify read size wasn't affected
 
544
        num_bytes_to_read = self.block_size / 2
 
545
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
546
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
 
547
        self.assertEqual(from_file.get_read_count(), 1)
 
548
 
 
549
        # read (max) bytes and verify read size wasn't affected
 
550
        num_bytes_to_read = self.block_size
 
551
        from_file.reset_read_count()
 
552
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
553
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
 
554
        self.assertEqual(from_file.get_read_count(), 1)
 
555
 
 
556
        # read (max + 1) bytes and verify read size was limited
 
557
        num_bytes_to_read = self.block_size + 1
 
558
        from_file.reset_read_count()
 
559
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
560
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
 
561
        self.assertEqual(from_file.get_read_count(), 2)
 
562
 
 
563
        # finish reading the rest of the data
 
564
        num_bytes_to_read = self.test_data_len - to_file.tell()
 
565
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
566
 
 
567
        # report error if the data wasn't equal (we only report the size due
 
568
        # to the length of the data)
 
569
        response_data = to_file.getvalue()
 
570
        if response_data != self.test_data:
 
571
            message = "Data not equal.  Expected %d bytes, received %d."
 
572
            self.fail(message % (len(response_data), self.test_data_len))
 
573
 
 
574
    def test_specified_size(self):
 
575
        """Request a transfer larger than the maximum block size and verify
 
576
        that the maximum read doesn't exceed the block_size."""
 
577
        # make sure test data is larger than max read size
 
578
        self.assertTrue(self.test_data_len > self.block_size)
 
579
 
 
580
        # retrieve data in blocks
 
581
        from_file = file_utils.FakeReadFile(self.test_data)
 
582
        to_file = StringIO()
 
583
        osutils.pumpfile(from_file, to_file, self.test_data_len,
 
584
                         self.block_size)
 
585
 
 
586
        # verify read size was equal to the maximum read size
 
587
        self.assertTrue(from_file.get_max_read_size() > 0)
 
588
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
 
589
        self.assertEqual(from_file.get_read_count(), 3)
 
590
 
 
591
        # report error if the data wasn't equal (we only report the size due
 
592
        # to the length of the data)
 
593
        response_data = to_file.getvalue()
 
594
        if response_data != self.test_data:
 
595
            message = "Data not equal.  Expected %d bytes, received %d."
 
596
            self.fail(message % (len(response_data), self.test_data_len))
 
597
 
 
598
    def test_to_eof(self):
 
599
        """Read to end-of-file and verify that the reads are not larger than
 
600
        the maximum read size."""
 
601
        # make sure test data is larger than max read size
 
602
        self.assertTrue(self.test_data_len > self.block_size)
 
603
 
 
604
        # retrieve data to EOF
 
605
        from_file = file_utils.FakeReadFile(self.test_data)
 
606
        to_file = StringIO()
 
607
        osutils.pumpfile(from_file, to_file, -1, self.block_size)
 
608
 
 
609
        # verify read size was equal to the maximum read size
 
610
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
 
611
        self.assertEqual(from_file.get_read_count(), 4)
 
612
 
 
613
        # report error if the data wasn't equal (we only report the size due
 
614
        # to the length of the data)
 
615
        response_data = to_file.getvalue()
 
616
        if response_data != self.test_data:
 
617
            message = "Data not equal.  Expected %d bytes, received %d."
 
618
            self.fail(message % (len(response_data), self.test_data_len))
 
619
 
 
620
    def test_defaults(self):
 
621
        """Verifies that the default arguments will read to EOF -- this
 
622
        test verifies that any existing usages of pumpfile will not be broken
 
623
        with this new version."""
 
624
        # retrieve data using default (old) pumpfile method
 
625
        from_file = file_utils.FakeReadFile(self.test_data)
 
626
        to_file = StringIO()
 
627
        osutils.pumpfile(from_file, to_file)
 
628
 
 
629
        # report error if the data wasn't equal (we only report the size due
 
630
        # to the length of the data)
 
631
        response_data = to_file.getvalue()
 
632
        if response_data != self.test_data:
 
633
            message = "Data not equal.  Expected %d bytes, received %d."
 
634
            self.fail(message % (len(response_data), self.test_data_len))
 
635
 
 
636
    def test_report_activity(self):
 
637
        activity = []
 
638
        def log_activity(length, direction):
 
639
            activity.append((length, direction))
 
640
        from_file = StringIO(self.test_data)
 
641
        to_file = StringIO()
 
642
        osutils.pumpfile(from_file, to_file, buff_size=500,
 
643
                         report_activity=log_activity, direction='read')
 
644
        self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
 
645
                          (36, 'read')], activity)
 
646
 
 
647
        from_file = StringIO(self.test_data)
 
648
        to_file = StringIO()
 
649
        del activity[:]
 
650
        osutils.pumpfile(from_file, to_file, buff_size=500,
 
651
                         report_activity=log_activity, direction='write')
 
652
        self.assertEqual([(500, 'write'), (500, 'write'), (500, 'write'),
 
653
                          (36, 'write')], activity)
 
654
 
 
655
        # And with a limited amount of data
 
656
        from_file = StringIO(self.test_data)
 
657
        to_file = StringIO()
 
658
        del activity[:]
 
659
        osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
 
660
                         report_activity=log_activity, direction='read')
 
661
        self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
 
662
 
 
663
 
 
664
 
 
665
class TestPumpStringFile(tests.TestCase):
 
666
 
 
667
    def test_empty(self):
 
668
        output = StringIO()
 
669
        osutils.pump_string_file("", output)
 
670
        self.assertEqual("", output.getvalue())
 
671
 
 
672
    def test_more_than_segment_size(self):
 
673
        output = StringIO()
 
674
        osutils.pump_string_file("123456789", output, 2)
 
675
        self.assertEqual("123456789", output.getvalue())
 
676
 
 
677
    def test_segment_size(self):
 
678
        output = StringIO()
 
679
        osutils.pump_string_file("12", output, 2)
 
680
        self.assertEqual("12", output.getvalue())
 
681
 
 
682
    def test_segment_size_multiple(self):
 
683
        output = StringIO()
 
684
        osutils.pump_string_file("1234", output, 2)
 
685
        self.assertEqual("1234", output.getvalue())
 
686
 
 
687
 
 
688
class TestRelpath(tests.TestCase):
 
689
 
 
690
    def test_simple_relpath(self):
 
691
        cwd = osutils.getcwd()
 
692
        subdir = cwd + '/subdir'
 
693
        self.assertEqual('subdir', osutils.relpath(cwd, subdir))
 
694
 
 
695
    def test_deep_relpath(self):
 
696
        cwd = osutils.getcwd()
 
697
        subdir = cwd + '/sub/subsubdir'
 
698
        self.assertEqual('sub/subsubdir', osutils.relpath(cwd, subdir))
 
699
 
 
700
    def test_not_relative(self):
 
701
        self.assertRaises(errors.PathNotChild,
 
702
                          osutils.relpath, 'C:/path', 'H:/path')
 
703
        self.assertRaises(errors.PathNotChild,
 
704
                          osutils.relpath, 'C:/', 'H:/path')
 
705
 
 
706
 
 
707
class TestSafeUnicode(tests.TestCase):
 
708
 
 
709
    def test_from_ascii_string(self):
 
710
        self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
 
711
 
 
712
    def test_from_unicode_string_ascii_contents(self):
 
713
        self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
 
714
 
 
715
    def test_from_unicode_string_unicode_contents(self):
 
716
        self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
 
717
 
 
718
    def test_from_utf8_string(self):
 
719
        self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
 
720
 
 
721
    def test_bad_utf8_string(self):
 
722
        self.assertRaises(errors.BzrBadParameterNotUnicode,
 
723
                          osutils.safe_unicode,
 
724
                          '\xbb\xbb')
 
725
 
 
726
 
 
727
class TestSafeUtf8(tests.TestCase):
 
728
 
 
729
    def test_from_ascii_string(self):
 
730
        f = 'foobar'
 
731
        self.assertEqual('foobar', osutils.safe_utf8(f))
 
732
 
 
733
    def test_from_unicode_string_ascii_contents(self):
 
734
        self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
 
735
 
 
736
    def test_from_unicode_string_unicode_contents(self):
 
737
        self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
 
738
 
 
739
    def test_from_utf8_string(self):
 
740
        self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
 
741
 
 
742
    def test_bad_utf8_string(self):
 
743
        self.assertRaises(errors.BzrBadParameterNotUnicode,
 
744
                          osutils.safe_utf8, '\xbb\xbb')
 
745
 
 
746
 
 
747
class TestSafeRevisionId(tests.TestCase):
 
748
 
 
749
    def test_from_ascii_string(self):
 
750
        # this shouldn't give a warning because it's getting an ascii string
 
751
        self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
 
752
 
 
753
    def test_from_unicode_string_ascii_contents(self):
 
754
        self.assertEqual('bargam',
 
755
                         osutils.safe_revision_id(u'bargam', warn=False))
 
756
 
 
757
    def test_from_unicode_deprecated(self):
 
758
        self.assertEqual('bargam',
 
759
            self.callDeprecated([osutils._revision_id_warning],
 
760
                                osutils.safe_revision_id, u'bargam'))
 
761
 
 
762
    def test_from_unicode_string_unicode_contents(self):
 
763
        self.assertEqual('bargam\xc2\xae',
 
764
                         osutils.safe_revision_id(u'bargam\xae', warn=False))
 
765
 
 
766
    def test_from_utf8_string(self):
 
767
        self.assertEqual('foo\xc2\xae',
 
768
                         osutils.safe_revision_id('foo\xc2\xae'))
 
769
 
 
770
    def test_none(self):
 
771
        """Currently, None is a valid revision_id"""
 
772
        self.assertEqual(None, osutils.safe_revision_id(None))
 
773
 
 
774
 
 
775
class TestSafeFileId(tests.TestCase):
 
776
 
 
777
    def test_from_ascii_string(self):
 
778
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))
 
779
 
 
780
    def test_from_unicode_string_ascii_contents(self):
 
781
        self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))
 
782
 
 
783
    def test_from_unicode_deprecated(self):
 
784
        self.assertEqual('bargam',
 
785
            self.callDeprecated([osutils._file_id_warning],
 
786
                                osutils.safe_file_id, u'bargam'))
 
787
 
 
788
    def test_from_unicode_string_unicode_contents(self):
 
789
        self.assertEqual('bargam\xc2\xae',
 
790
                         osutils.safe_file_id(u'bargam\xae', warn=False))
 
791
 
 
792
    def test_from_utf8_string(self):
 
793
        self.assertEqual('foo\xc2\xae',
 
794
                         osutils.safe_file_id('foo\xc2\xae'))
 
795
 
 
796
    def test_none(self):
 
797
        """Currently, None is a valid revision_id"""
 
798
        self.assertEqual(None, osutils.safe_file_id(None))
 
799
 
 
800
 
 
801
class TestWin32Funcs(tests.TestCase):
 
802
    """Test that _win32 versions of os utilities return appropriate paths."""
 
803
 
 
804
    def test_abspath(self):
 
805
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
 
806
        self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
 
807
        self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
 
808
        self.assertEqual('//HOST/path', osutils._win32_abspath('//HOST/path'))
 
809
 
 
810
    def test_realpath(self):
 
811
        self.assertEqual('C:/foo', osutils._win32_realpath('C:\\foo'))
 
812
        self.assertEqual('C:/foo', osutils._win32_realpath('C:/foo'))
 
813
 
 
814
    def test_pathjoin(self):
 
815
        self.assertEqual('path/to/foo',
 
816
                         osutils._win32_pathjoin('path', 'to', 'foo'))
 
817
        self.assertEqual('C:/foo',
 
818
                         osutils._win32_pathjoin('path\\to', 'C:\\foo'))
 
819
        self.assertEqual('C:/foo',
 
820
                         osutils._win32_pathjoin('path/to', 'C:/foo'))
 
821
        self.assertEqual('path/to/foo',
 
822
                         osutils._win32_pathjoin('path/to/', 'foo'))
 
823
        self.assertEqual('/foo',
 
824
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
 
825
        self.assertEqual('/foo',
 
826
                         osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
 
827
 
 
828
    def test_normpath(self):
 
829
        self.assertEqual('path/to/foo',
 
830
                         osutils._win32_normpath(r'path\\from\..\to\.\foo'))
 
831
        self.assertEqual('path/to/foo',
 
832
                         osutils._win32_normpath('path//from/../to/./foo'))
 
833
 
 
834
    def test_getcwd(self):
 
835
        cwd = osutils._win32_getcwd()
 
836
        os_cwd = os.getcwdu()
 
837
        self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
 
838
        # win32 is inconsistent whether it returns lower or upper case
 
839
        # and even if it was consistent the user might type the other
 
840
        # so we force it to uppercase
 
841
        # running python.exe under cmd.exe return capital C:\\
 
842
        # running win32 python inside a cygwin shell returns lowercase
 
843
        self.assertEqual(os_cwd[0].upper(), cwd[0])
 
844
 
 
845
    def test_fixdrive(self):
 
846
        self.assertEqual('H:/foo', osutils._win32_fixdrive('h:/foo'))
 
847
        self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
 
848
        self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
 
849
 
 
850
    def test_win98_abspath(self):
 
851
        # absolute path
 
852
        self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
 
853
        self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
 
854
        # UNC path
 
855
        self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
 
856
        self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
 
857
        # relative path
 
858
        cwd = osutils.getcwd().rstrip('/')
 
859
        drive = osutils._nt_splitdrive(cwd)[0]
 
860
        self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
 
861
        self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
 
862
        # unicode path
 
863
        u = u'\u1234'
 
864
        self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
 
865
 
 
866
 
 
867
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
 
868
    """Test win32 functions that create files."""
 
869
 
 
870
    def test_getcwd(self):
 
871
        self.requireFeature(tests.UnicodeFilenameFeature)
 
872
        os.mkdir(u'mu-\xb5')
 
873
        os.chdir(u'mu-\xb5')
 
874
        # TODO: jam 20060427 This will probably fail on Mac OSX because
 
875
        #       it will change the normalization of B\xe5gfors
 
876
        #       Consider using a different unicode character, or make
 
877
        #       osutils.getcwd() renormalize the path.
 
878
        self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
 
879
 
 
880
    def test_minimum_path_selection(self):
 
881
        self.assertEqual(set(),
 
882
            osutils.minimum_path_selection([]))
 
883
        self.assertEqual(set(['a']),
 
884
            osutils.minimum_path_selection(['a']))
 
885
        self.assertEqual(set(['a', 'b']),
 
886
            osutils.minimum_path_selection(['a', 'b']))
 
887
        self.assertEqual(set(['a/', 'b']),
 
888
            osutils.minimum_path_selection(['a/', 'b']))
 
889
        self.assertEqual(set(['a/', 'b']),
 
890
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
 
891
        self.assertEqual(set(['a-b', 'a', 'a0b']),
 
892
            osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
 
893
 
 
894
    def test_mkdtemp(self):
 
895
        tmpdir = osutils._win32_mkdtemp(dir='.')
 
896
        self.assertFalse('\\' in tmpdir)
 
897
 
 
898
    def test_rename(self):
 
899
        a = open('a', 'wb')
 
900
        a.write('foo\n')
 
901
        a.close()
 
902
        b = open('b', 'wb')
 
903
        b.write('baz\n')
 
904
        b.close()
 
905
 
 
906
        osutils._win32_rename('b', 'a')
 
907
        self.failUnlessExists('a')
 
908
        self.failIfExists('b')
 
909
        self.assertFileEqual('baz\n', 'a')
 
910
 
 
911
    def test_rename_missing_file(self):
 
912
        a = open('a', 'wb')
 
913
        a.write('foo\n')
 
914
        a.close()
 
915
 
 
916
        try:
 
917
            osutils._win32_rename('b', 'a')
 
918
        except (IOError, OSError), e:
 
919
            self.assertEqual(errno.ENOENT, e.errno)
 
920
        self.assertFileEqual('foo\n', 'a')
 
921
 
 
922
    def test_rename_missing_dir(self):
 
923
        os.mkdir('a')
 
924
        try:
 
925
            osutils._win32_rename('b', 'a')
 
926
        except (IOError, OSError), e:
 
927
            self.assertEqual(errno.ENOENT, e.errno)
 
928
 
 
929
    def test_rename_current_dir(self):
 
930
        os.mkdir('a')
 
931
        os.chdir('a')
 
932
        # You can't rename the working directory
 
933
        # doing rename non-existant . usually
 
934
        # just raises ENOENT, since non-existant
 
935
        # doesn't exist.
 
936
        try:
 
937
            osutils._win32_rename('b', '.')
 
938
        except (IOError, OSError), e:
 
939
            self.assertEqual(errno.ENOENT, e.errno)
 
940
 
 
941
    def test_splitpath(self):
 
942
        def check(expected, path):
 
943
            self.assertEqual(expected, osutils.splitpath(path))
 
944
 
 
945
        check(['a'], 'a')
 
946
        check(['a', 'b'], 'a/b')
 
947
        check(['a', 'b'], 'a/./b')
 
948
        check(['a', '.b'], 'a/.b')
 
949
        check(['a', '.b'], 'a\\.b')
 
950
 
 
951
        self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
 
952
 
 
953
 
 
954
class TestParentDirectories(tests.TestCaseInTempDir):
 
955
    """Test osutils.parent_directories()"""
 
956
 
 
957
    def test_parent_directories(self):
 
958
        self.assertEqual([], osutils.parent_directories('a'))
 
959
        self.assertEqual(['a'], osutils.parent_directories('a/b'))
 
960
        self.assertEqual(['a/b', 'a'], osutils.parent_directories('a/b/c'))
 
961
 
 
962
 
 
963
class TestMacFuncsDirs(tests.TestCaseInTempDir):
 
964
    """Test mac special functions that require directories."""
 
965
 
 
966
    def test_getcwd(self):
 
967
        self.requireFeature(tests.UnicodeFilenameFeature)
 
968
        os.mkdir(u'B\xe5gfors')
 
969
        os.chdir(u'B\xe5gfors')
 
970
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
 
971
 
 
972
    def test_getcwd_nonnorm(self):
 
973
        self.requireFeature(tests.UnicodeFilenameFeature)
 
974
        # Test that _mac_getcwd() will normalize this path
 
975
        os.mkdir(u'Ba\u030agfors')
 
976
        os.chdir(u'Ba\u030agfors')
 
977
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
 
978
 
 
979
 
 
980
class TestChunksToLines(tests.TestCase):
 
981
 
 
982
    def test_smoketest(self):
 
983
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
 
984
                         osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
 
985
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
 
986
                         osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
 
987
 
 
988
    def test_osutils_binding(self):
 
989
        from bzrlib.tests import test__chunks_to_lines
 
990
        if test__chunks_to_lines.CompiledChunksToLinesFeature.available():
 
991
            from bzrlib._chunks_to_lines_pyx import chunks_to_lines
 
992
        else:
 
993
            from bzrlib._chunks_to_lines_py import chunks_to_lines
 
994
        self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
 
995
 
 
996
 
 
997
class TestSplitLines(tests.TestCase):
 
998
 
 
999
    def test_split_unicode(self):
 
1000
        self.assertEqual([u'foo\n', u'bar\xae'],
 
1001
                         osutils.split_lines(u'foo\nbar\xae'))
 
1002
        self.assertEqual([u'foo\n', u'bar\xae\n'],
 
1003
                         osutils.split_lines(u'foo\nbar\xae\n'))
 
1004
 
 
1005
    def test_split_with_carriage_returns(self):
 
1006
        self.assertEqual(['foo\rbar\n'],
 
1007
                         osutils.split_lines('foo\rbar\n'))
 
1008
 
 
1009
 
 
1010
class TestWalkDirs(tests.TestCaseInTempDir):
 
1011
 
 
1012
    def assertExpectedBlocks(self, expected, result):
 
1013
        self.assertEqual(expected,
 
1014
                         [(dirinfo, [line[0:3] for line in block])
 
1015
                          for dirinfo, block in result])
 
1016
 
 
1017
    def test_walkdirs(self):
 
1018
        tree = [
 
1019
            '.bzr',
 
1020
            '0file',
 
1021
            '1dir/',
 
1022
            '1dir/0file',
 
1023
            '1dir/1dir/',
 
1024
            '2file'
 
1025
            ]
 
1026
        self.build_tree(tree)
 
1027
        expected_dirblocks = [
 
1028
                (('', '.'),
 
1029
                 [('0file', '0file', 'file'),
 
1030
                  ('1dir', '1dir', 'directory'),
 
1031
                  ('2file', '2file', 'file'),
 
1032
                 ]
 
1033
                ),
 
1034
                (('1dir', './1dir'),
 
1035
                 [('1dir/0file', '0file', 'file'),
 
1036
                  ('1dir/1dir', '1dir', 'directory'),
 
1037
                 ]
 
1038
                ),
 
1039
                (('1dir/1dir', './1dir/1dir'),
 
1040
                 [
 
1041
                 ]
 
1042
                ),
 
1043
            ]
 
1044
        result = []
 
1045
        found_bzrdir = False
 
1046
        for dirdetail, dirblock in osutils.walkdirs('.'):
 
1047
            if len(dirblock) and dirblock[0][1] == '.bzr':
 
1048
                # this tests the filtering of selected paths
 
1049
                found_bzrdir = True
 
1050
                del dirblock[0]
 
1051
            result.append((dirdetail, dirblock))
 
1052
 
 
1053
        self.assertTrue(found_bzrdir)
 
1054
        self.assertExpectedBlocks(expected_dirblocks, result)
 
1055
        # you can search a subdir only, with a supplied prefix.
 
1056
        result = []
 
1057
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
 
1058
            result.append(dirblock)
 
1059
        self.assertExpectedBlocks(expected_dirblocks[1:], result)
 
1060
 
 
1061
    def test_walkdirs_os_error(self):
 
1062
        # <https://bugs.edge.launchpad.net/bzr/+bug/338653>
 
1063
        # Pyrex readdir didn't raise useful messages if it had an error
 
1064
        # reading the directory
 
1065
        if sys.platform == 'win32':
 
1066
            raise tests.TestNotApplicable(
 
1067
                "readdir IOError not tested on win32")
 
1068
        os.mkdir("test-unreadable")
 
1069
        os.chmod("test-unreadable", 0000)
 
1070
        # must chmod it back so that it can be removed
 
1071
        self.addCleanup(os.chmod, "test-unreadable", 0700)
 
1072
        # The error is not raised until the generator is actually evaluated.
 
1073
        # (It would be ok if it happened earlier but at the moment it
 
1074
        # doesn't.)
 
1075
        e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
 
1076
        self.assertEquals('./test-unreadable', e.filename)
 
1077
        self.assertEquals(errno.EACCES, e.errno)
 
1078
        # Ensure the message contains the file name
 
1079
        self.assertContainsRe(str(e), "\./test-unreadable")
 
1080
 
 
1081
    def test__walkdirs_utf8(self):
 
1082
        tree = [
 
1083
            '.bzr',
 
1084
            '0file',
 
1085
            '1dir/',
 
1086
            '1dir/0file',
 
1087
            '1dir/1dir/',
 
1088
            '2file'
 
1089
            ]
 
1090
        self.build_tree(tree)
 
1091
        expected_dirblocks = [
 
1092
                (('', '.'),
 
1093
                 [('0file', '0file', 'file'),
 
1094
                  ('1dir', '1dir', 'directory'),
 
1095
                  ('2file', '2file', 'file'),
 
1096
                 ]
 
1097
                ),
 
1098
                (('1dir', './1dir'),
 
1099
                 [('1dir/0file', '0file', 'file'),
 
1100
                  ('1dir/1dir', '1dir', 'directory'),
 
1101
                 ]
 
1102
                ),
 
1103
                (('1dir/1dir', './1dir/1dir'),
 
1104
                 [
 
1105
                 ]
 
1106
                ),
 
1107
            ]
 
1108
        result = []
 
1109
        found_bzrdir = False
 
1110
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
 
1111
            if len(dirblock) and dirblock[0][1] == '.bzr':
 
1112
                # this tests the filtering of selected paths
 
1113
                found_bzrdir = True
 
1114
                del dirblock[0]
 
1115
            result.append((dirdetail, dirblock))
 
1116
 
 
1117
        self.assertTrue(found_bzrdir)
 
1118
        self.assertExpectedBlocks(expected_dirblocks, result)
 
1119
 
 
1120
        # you can search a subdir only, with a supplied prefix.
 
1121
        result = []
 
1122
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
 
1123
            result.append(dirblock)
 
1124
        self.assertExpectedBlocks(expected_dirblocks[1:], result)
 
1125
 
 
1126
    def _filter_out_stat(self, result):
 
1127
        """Filter out the stat value from the walkdirs result"""
 
1128
        for dirdetail, dirblock in result:
 
1129
            new_dirblock = []
 
1130
            for info in dirblock:
 
1131
                # Ignore info[3] which is the stat
 
1132
                new_dirblock.append((info[0], info[1], info[2], info[4]))
 
1133
            dirblock[:] = new_dirblock
 
1134
 
 
1135
    def _save_platform_info(self):
 
1136
        cur_winver = win32utils.winver
 
1137
        cur_fs_enc = osutils._fs_enc
 
1138
        cur_dir_reader = osutils._selected_dir_reader
 
1139
        def restore():
 
1140
            win32utils.winver = cur_winver
 
1141
            osutils._fs_enc = cur_fs_enc
 
1142
            osutils._selected_dir_reader = cur_dir_reader
 
1143
        self.addCleanup(restore)
 
1144
 
 
1145
    def assertDirReaderIs(self, expected):
 
1146
        """Assert the right implementation for _walkdirs_utf8 is chosen."""
 
1147
        # Force it to redetect
 
1148
        osutils._selected_dir_reader = None
 
1149
        # Nothing to list, but should still trigger the selection logic
 
1150
        self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
 
1151
        self.assertIsInstance(osutils._selected_dir_reader, expected)
 
1152
 
 
1153
    def test_force_walkdirs_utf8_fs_utf8(self):
 
1154
        self.requireFeature(UTF8DirReaderFeature)
 
1155
        self._save_platform_info()
 
1156
        win32utils.winver = None # Avoid the win32 detection code
 
1157
        osutils._fs_enc = 'UTF-8'
 
1158
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
1159
 
 
1160
    def test_force_walkdirs_utf8_fs_ascii(self):
 
1161
        self.requireFeature(UTF8DirReaderFeature)
 
1162
        self._save_platform_info()
 
1163
        win32utils.winver = None # Avoid the win32 detection code
 
1164
        osutils._fs_enc = 'US-ASCII'
 
1165
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
1166
 
 
1167
    def test_force_walkdirs_utf8_fs_ANSI(self):
 
1168
        self.requireFeature(UTF8DirReaderFeature)
 
1169
        self._save_platform_info()
 
1170
        win32utils.winver = None # Avoid the win32 detection code
 
1171
        osutils._fs_enc = 'ANSI_X3.4-1968'
 
1172
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
1173
 
 
1174
    def test_force_walkdirs_utf8_fs_latin1(self):
 
1175
        self._save_platform_info()
 
1176
        win32utils.winver = None # Avoid the win32 detection code
 
1177
        osutils._fs_enc = 'latin1'
 
1178
        self.assertDirReaderIs(osutils.UnicodeDirReader)
 
1179
 
 
1180
    def test_force_walkdirs_utf8_nt(self):
 
1181
        # Disabled because the thunk of the whole walkdirs api is disabled.
 
1182
        self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
 
1183
        self._save_platform_info()
 
1184
        win32utils.winver = 'Windows NT'
 
1185
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1186
        self.assertDirReaderIs(Win32ReadDir)
 
1187
 
 
1188
    def test_force_walkdirs_utf8_98(self):
 
1189
        self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
 
1190
        self._save_platform_info()
 
1191
        win32utils.winver = 'Windows 98'
 
1192
        self.assertDirReaderIs(osutils.UnicodeDirReader)
 
1193
 
 
1194
    def test_unicode_walkdirs(self):
 
1195
        """Walkdirs should always return unicode paths."""
 
1196
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1197
        name0 = u'0file-\xb6'
 
1198
        name1 = u'1dir-\u062c\u0648'
 
1199
        name2 = u'2file-\u0633'
 
1200
        tree = [
 
1201
            name0,
 
1202
            name1 + '/',
 
1203
            name1 + '/' + name0,
 
1204
            name1 + '/' + name1 + '/',
 
1205
            name2,
 
1206
            ]
 
1207
        self.build_tree(tree)
 
1208
        expected_dirblocks = [
 
1209
                ((u'', u'.'),
 
1210
                 [(name0, name0, 'file', './' + name0),
 
1211
                  (name1, name1, 'directory', './' + name1),
 
1212
                  (name2, name2, 'file', './' + name2),
 
1213
                 ]
 
1214
                ),
 
1215
                ((name1, './' + name1),
 
1216
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
1217
                                                        + '/' + name0),
 
1218
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
1219
                                                            + '/' + name1),
 
1220
                 ]
 
1221
                ),
 
1222
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1223
                 [
 
1224
                 ]
 
1225
                ),
 
1226
            ]
 
1227
        result = list(osutils.walkdirs('.'))
 
1228
        self._filter_out_stat(result)
 
1229
        self.assertEqual(expected_dirblocks, result)
 
1230
        result = list(osutils.walkdirs(u'./'+name1, name1))
 
1231
        self._filter_out_stat(result)
 
1232
        self.assertEqual(expected_dirblocks[1:], result)
 
1233
 
 
1234
    def test_unicode__walkdirs_utf8(self):
 
1235
        """Walkdirs_utf8 should always return utf8 paths.
 
1236
 
 
1237
        The abspath portion might be in unicode or utf-8
 
1238
        """
 
1239
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1240
        name0 = u'0file-\xb6'
 
1241
        name1 = u'1dir-\u062c\u0648'
 
1242
        name2 = u'2file-\u0633'
 
1243
        tree = [
 
1244
            name0,
 
1245
            name1 + '/',
 
1246
            name1 + '/' + name0,
 
1247
            name1 + '/' + name1 + '/',
 
1248
            name2,
 
1249
            ]
 
1250
        self.build_tree(tree)
 
1251
        name0 = name0.encode('utf8')
 
1252
        name1 = name1.encode('utf8')
 
1253
        name2 = name2.encode('utf8')
 
1254
 
 
1255
        expected_dirblocks = [
 
1256
                (('', '.'),
 
1257
                 [(name0, name0, 'file', './' + name0),
 
1258
                  (name1, name1, 'directory', './' + name1),
 
1259
                  (name2, name2, 'file', './' + name2),
 
1260
                 ]
 
1261
                ),
 
1262
                ((name1, './' + name1),
 
1263
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
1264
                                                        + '/' + name0),
 
1265
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
1266
                                                            + '/' + name1),
 
1267
                 ]
 
1268
                ),
 
1269
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1270
                 [
 
1271
                 ]
 
1272
                ),
 
1273
            ]
 
1274
        result = []
 
1275
        # For ease in testing, if walkdirs_utf8 returns Unicode, assert that
 
1276
        # all abspaths are Unicode, and encode them back into utf8.
 
1277
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
 
1278
            self.assertIsInstance(dirdetail[0], str)
 
1279
            if isinstance(dirdetail[1], unicode):
 
1280
                dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
 
1281
                dirblock = [list(info) for info in dirblock]
 
1282
                for info in dirblock:
 
1283
                    self.assertIsInstance(info[4], unicode)
 
1284
                    info[4] = info[4].encode('utf8')
 
1285
            new_dirblock = []
 
1286
            for info in dirblock:
 
1287
                self.assertIsInstance(info[0], str)
 
1288
                self.assertIsInstance(info[1], str)
 
1289
                self.assertIsInstance(info[4], str)
 
1290
                # Remove the stat information
 
1291
                new_dirblock.append((info[0], info[1], info[2], info[4]))
 
1292
            result.append((dirdetail, new_dirblock))
 
1293
        self.assertEqual(expected_dirblocks, result)
 
1294
 
 
1295
    def test__walkdirs_utf8_with_unicode_fs(self):
 
1296
        """UnicodeDirReader should be a safe fallback everywhere
 
1297
 
 
1298
        The abspath portion should be in unicode
 
1299
        """
 
1300
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1301
        # Use the unicode reader. TODO: split into driver-and-driven unit
 
1302
        # tests.
 
1303
        self._save_platform_info()
 
1304
        osutils._selected_dir_reader = osutils.UnicodeDirReader()
 
1305
        name0u = u'0file-\xb6'
 
1306
        name1u = u'1dir-\u062c\u0648'
 
1307
        name2u = u'2file-\u0633'
 
1308
        tree = [
 
1309
            name0u,
 
1310
            name1u + '/',
 
1311
            name1u + '/' + name0u,
 
1312
            name1u + '/' + name1u + '/',
 
1313
            name2u,
 
1314
            ]
 
1315
        self.build_tree(tree)
 
1316
        name0 = name0u.encode('utf8')
 
1317
        name1 = name1u.encode('utf8')
 
1318
        name2 = name2u.encode('utf8')
 
1319
 
 
1320
        # All of the abspaths should be in unicode, all of the relative paths
 
1321
        # should be in utf8
 
1322
        expected_dirblocks = [
 
1323
                (('', '.'),
 
1324
                 [(name0, name0, 'file', './' + name0u),
 
1325
                  (name1, name1, 'directory', './' + name1u),
 
1326
                  (name2, name2, 'file', './' + name2u),
 
1327
                 ]
 
1328
                ),
 
1329
                ((name1, './' + name1u),
 
1330
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1331
                                                        + '/' + name0u),
 
1332
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1333
                                                            + '/' + name1u),
 
1334
                 ]
 
1335
                ),
 
1336
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1337
                 [
 
1338
                 ]
 
1339
                ),
 
1340
            ]
 
1341
        result = list(osutils._walkdirs_utf8('.'))
 
1342
        self._filter_out_stat(result)
 
1343
        self.assertEqual(expected_dirblocks, result)
 
1344
 
 
1345
    def test__walkdirs_utf8_win32readdir(self):
 
1346
        self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
 
1347
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1348
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1349
        self._save_platform_info()
 
1350
        osutils._selected_dir_reader = Win32ReadDir()
 
1351
        name0u = u'0file-\xb6'
 
1352
        name1u = u'1dir-\u062c\u0648'
 
1353
        name2u = u'2file-\u0633'
 
1354
        tree = [
 
1355
            name0u,
 
1356
            name1u + '/',
 
1357
            name1u + '/' + name0u,
 
1358
            name1u + '/' + name1u + '/',
 
1359
            name2u,
 
1360
            ]
 
1361
        self.build_tree(tree)
 
1362
        name0 = name0u.encode('utf8')
 
1363
        name1 = name1u.encode('utf8')
 
1364
        name2 = name2u.encode('utf8')
 
1365
 
 
1366
        # All of the abspaths should be in unicode, all of the relative paths
 
1367
        # should be in utf8
 
1368
        expected_dirblocks = [
 
1369
                (('', '.'),
 
1370
                 [(name0, name0, 'file', './' + name0u),
 
1371
                  (name1, name1, 'directory', './' + name1u),
 
1372
                  (name2, name2, 'file', './' + name2u),
 
1373
                 ]
 
1374
                ),
 
1375
                ((name1, './' + name1u),
 
1376
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1377
                                                        + '/' + name0u),
 
1378
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1379
                                                            + '/' + name1u),
 
1380
                 ]
 
1381
                ),
 
1382
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1383
                 [
 
1384
                 ]
 
1385
                ),
 
1386
            ]
 
1387
        result = list(osutils._walkdirs_utf8(u'.'))
 
1388
        self._filter_out_stat(result)
 
1389
        self.assertEqual(expected_dirblocks, result)
 
1390
 
 
1391
    def assertStatIsCorrect(self, path, win32stat):
 
1392
        os_stat = os.stat(path)
 
1393
        self.assertEqual(os_stat.st_size, win32stat.st_size)
 
1394
        self.assertAlmostEqual(os_stat.st_mtime, win32stat.st_mtime, places=4)
 
1395
        self.assertAlmostEqual(os_stat.st_ctime, win32stat.st_ctime, places=4)
 
1396
        self.assertAlmostEqual(os_stat.st_atime, win32stat.st_atime, places=4)
 
1397
        self.assertEqual(os_stat.st_dev, win32stat.st_dev)
 
1398
        self.assertEqual(os_stat.st_ino, win32stat.st_ino)
 
1399
        self.assertEqual(os_stat.st_mode, win32stat.st_mode)
 
1400
 
 
1401
    def test__walkdirs_utf_win32_find_file_stat_file(self):
 
1402
        """make sure our Stat values are valid"""
 
1403
        self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
 
1404
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1405
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1406
        name0u = u'0file-\xb6'
 
1407
        name0 = name0u.encode('utf8')
 
1408
        self.build_tree([name0u])
 
1409
        # I hate to sleep() here, but I'm trying to make the ctime different
 
1410
        # from the mtime
 
1411
        time.sleep(2)
 
1412
        f = open(name0u, 'ab')
 
1413
        try:
 
1414
            f.write('just a small update')
 
1415
        finally:
 
1416
            f.close()
 
1417
 
 
1418
        result = Win32ReadDir().read_dir('', u'.')
 
1419
        entry = result[0]
 
1420
        self.assertEqual((name0, name0, 'file'), entry[:3])
 
1421
        self.assertEqual(u'./' + name0u, entry[4])
 
1422
        self.assertStatIsCorrect(entry[4], entry[3])
 
1423
        self.assertNotEqual(entry[3].st_mtime, entry[3].st_ctime)
 
1424
 
 
1425
    def test__walkdirs_utf_win32_find_file_stat_directory(self):
 
1426
        """make sure our Stat values are valid"""
 
1427
        self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
 
1428
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1429
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1430
        name0u = u'0dir-\u062c\u0648'
 
1431
        name0 = name0u.encode('utf8')
 
1432
        self.build_tree([name0u + '/'])
 
1433
 
 
1434
        result = Win32ReadDir().read_dir('', u'.')
 
1435
        entry = result[0]
 
1436
        self.assertEqual((name0, name0, 'directory'), entry[:3])
 
1437
        self.assertEqual(u'./' + name0u, entry[4])
 
1438
        self.assertStatIsCorrect(entry[4], entry[3])
 
1439
 
 
1440
    def assertPathCompare(self, path_less, path_greater):
 
1441
        """check that path_less and path_greater compare correctly."""
 
1442
        self.assertEqual(0, osutils.compare_paths_prefix_order(
 
1443
            path_less, path_less))
 
1444
        self.assertEqual(0, osutils.compare_paths_prefix_order(
 
1445
            path_greater, path_greater))
 
1446
        self.assertEqual(-1, osutils.compare_paths_prefix_order(
 
1447
            path_less, path_greater))
 
1448
        self.assertEqual(1, osutils.compare_paths_prefix_order(
 
1449
            path_greater, path_less))
 
1450
 
 
1451
    def test_compare_paths_prefix_order(self):
 
1452
        # root before all else
 
1453
        self.assertPathCompare("/", "/a")
 
1454
        # alpha within a dir
 
1455
        self.assertPathCompare("/a", "/b")
 
1456
        self.assertPathCompare("/b", "/z")
 
1457
        # high dirs before lower.
 
1458
        self.assertPathCompare("/z", "/a/a")
 
1459
        # except if the deeper dir should be output first
 
1460
        self.assertPathCompare("/a/b/c", "/d/g")
 
1461
        # lexical betwen dirs of the same height
 
1462
        self.assertPathCompare("/a/z", "/z/z")
 
1463
        self.assertPathCompare("/a/c/z", "/a/d/e")
 
1464
 
 
1465
        # this should also be consistent for no leading / paths
 
1466
        # root before all else
 
1467
        self.assertPathCompare("", "a")
 
1468
        # alpha within a dir
 
1469
        self.assertPathCompare("a", "b")
 
1470
        self.assertPathCompare("b", "z")
 
1471
        # high dirs before lower.
 
1472
        self.assertPathCompare("z", "a/a")
 
1473
        # except if the deeper dir should be output first
 
1474
        self.assertPathCompare("a/b/c", "d/g")
 
1475
        # lexical betwen dirs of the same height
 
1476
        self.assertPathCompare("a/z", "z/z")
 
1477
        self.assertPathCompare("a/c/z", "a/d/e")
 
1478
 
 
1479
    def test_path_prefix_sorting(self):
 
1480
        """Doing a sort on path prefix should match our sample data."""
 
1481
        original_paths = [
 
1482
            'a',
 
1483
            'a/b',
 
1484
            'a/b/c',
 
1485
            'b',
 
1486
            'b/c',
 
1487
            'd',
 
1488
            'd/e',
 
1489
            'd/e/f',
 
1490
            'd/f',
 
1491
            'd/g',
 
1492
            'g',
 
1493
            ]
 
1494
 
 
1495
        dir_sorted_paths = [
 
1496
            'a',
 
1497
            'b',
 
1498
            'd',
 
1499
            'g',
 
1500
            'a/b',
 
1501
            'a/b/c',
 
1502
            'b/c',
 
1503
            'd/e',
 
1504
            'd/f',
 
1505
            'd/g',
 
1506
            'd/e/f',
 
1507
            ]
 
1508
 
 
1509
        self.assertEqual(
 
1510
            dir_sorted_paths,
 
1511
            sorted(original_paths, key=osutils.path_prefix_key))
 
1512
        # using the comparison routine shoudl work too:
 
1513
        self.assertEqual(
 
1514
            dir_sorted_paths,
 
1515
            sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
 
1516
 
 
1517
 
 
1518
class TestCopyTree(tests.TestCaseInTempDir):
 
1519
 
 
1520
    def test_copy_basic_tree(self):
 
1521
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
 
1522
        osutils.copy_tree('source', 'target')
 
1523
        self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
 
1524
        self.assertEqual(['c'], os.listdir('target/b'))
 
1525
 
 
1526
    def test_copy_tree_target_exists(self):
 
1527
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
 
1528
                         'target/'])
 
1529
        osutils.copy_tree('source', 'target')
 
1530
        self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
 
1531
        self.assertEqual(['c'], os.listdir('target/b'))
 
1532
 
 
1533
    def test_copy_tree_symlinks(self):
 
1534
        self.requireFeature(tests.SymlinkFeature)
 
1535
        self.build_tree(['source/'])
 
1536
        os.symlink('a/generic/path', 'source/lnk')
 
1537
        osutils.copy_tree('source', 'target')
 
1538
        self.assertEqual(['lnk'], os.listdir('target'))
 
1539
        self.assertEqual('a/generic/path', os.readlink('target/lnk'))
 
1540
 
 
1541
    def test_copy_tree_handlers(self):
 
1542
        processed_files = []
 
1543
        processed_links = []
 
1544
        def file_handler(from_path, to_path):
 
1545
            processed_files.append(('f', from_path, to_path))
 
1546
        def dir_handler(from_path, to_path):
 
1547
            processed_files.append(('d', from_path, to_path))
 
1548
        def link_handler(from_path, to_path):
 
1549
            processed_links.append((from_path, to_path))
 
1550
        handlers = {'file':file_handler,
 
1551
                    'directory':dir_handler,
 
1552
                    'symlink':link_handler,
 
1553
                   }
 
1554
 
 
1555
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
 
1556
        if osutils.has_symlinks():
 
1557
            os.symlink('a/generic/path', 'source/lnk')
 
1558
        osutils.copy_tree('source', 'target', handlers=handlers)
 
1559
 
 
1560
        self.assertEqual([('d', 'source', 'target'),
 
1561
                          ('f', 'source/a', 'target/a'),
 
1562
                          ('d', 'source/b', 'target/b'),
 
1563
                          ('f', 'source/b/c', 'target/b/c'),
 
1564
                         ], processed_files)
 
1565
        self.failIfExists('target')
 
1566
        if osutils.has_symlinks():
 
1567
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
 
1568
 
 
1569
 
 
1570
class TestSetUnsetEnv(tests.TestCase):
 
1571
    """Test updating the environment"""
 
1572
 
 
1573
    def setUp(self):
 
1574
        super(TestSetUnsetEnv, self).setUp()
 
1575
 
 
1576
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
 
1577
                         'Environment was not cleaned up properly.'
 
1578
                         ' Variable BZR_TEST_ENV_VAR should not exist.')
 
1579
        def cleanup():
 
1580
            if 'BZR_TEST_ENV_VAR' in os.environ:
 
1581
                del os.environ['BZR_TEST_ENV_VAR']
 
1582
 
 
1583
        self.addCleanup(cleanup)
 
1584
 
 
1585
    def test_set(self):
 
1586
        """Test that we can set an env variable"""
 
1587
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1588
        self.assertEqual(None, old)
 
1589
        self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
 
1590
 
 
1591
    def test_double_set(self):
 
1592
        """Test that we get the old value out"""
 
1593
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1594
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
 
1595
        self.assertEqual('foo', old)
 
1596
        self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
 
1597
 
 
1598
    def test_unicode(self):
 
1599
        """Environment can only contain plain strings
 
1600
 
 
1601
        So Unicode strings must be encoded.
 
1602
        """
 
1603
        uni_val, env_val = tests.probe_unicode_in_user_encoding()
 
1604
        if uni_val is None:
 
1605
            raise tests.TestSkipped(
 
1606
                'Cannot find a unicode character that works in encoding %s'
 
1607
                % (osutils.get_user_encoding(),))
 
1608
 
 
1609
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
 
1610
        self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
 
1611
 
 
1612
    def test_unset(self):
 
1613
        """Test that passing None will remove the env var"""
 
1614
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1615
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
 
1616
        self.assertEqual('foo', old)
 
1617
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
 
1618
        self.failIf('BZR_TEST_ENV_VAR' in os.environ)
 
1619
 
 
1620
 
 
1621
class TestSizeShaFile(tests.TestCaseInTempDir):
 
1622
 
 
1623
    def test_sha_empty(self):
 
1624
        self.build_tree_contents([('foo', '')])
 
1625
        expected_sha = osutils.sha_string('')
 
1626
        f = open('foo')
 
1627
        self.addCleanup(f.close)
 
1628
        size, sha = osutils.size_sha_file(f)
 
1629
        self.assertEqual(0, size)
 
1630
        self.assertEqual(expected_sha, sha)
 
1631
 
 
1632
    def test_sha_mixed_endings(self):
 
1633
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
 
1634
        self.build_tree_contents([('foo', text)])
 
1635
        expected_sha = osutils.sha_string(text)
 
1636
        f = open('foo', 'rb')
 
1637
        self.addCleanup(f.close)
 
1638
        size, sha = osutils.size_sha_file(f)
 
1639
        self.assertEqual(38, size)
 
1640
        self.assertEqual(expected_sha, sha)
 
1641
 
 
1642
 
 
1643
class TestShaFileByName(tests.TestCaseInTempDir):
 
1644
 
 
1645
    def test_sha_empty(self):
 
1646
        self.build_tree_contents([('foo', '')])
 
1647
        expected_sha = osutils.sha_string('')
 
1648
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
 
1649
 
 
1650
    def test_sha_mixed_endings(self):
 
1651
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
 
1652
        self.build_tree_contents([('foo', text)])
 
1653
        expected_sha = osutils.sha_string(text)
 
1654
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
 
1655
 
 
1656
 
 
1657
class TestResourceLoading(tests.TestCaseInTempDir):
 
1658
 
 
1659
    def test_resource_string(self):
 
1660
        # test resource in bzrlib
 
1661
        text = osutils.resource_string('bzrlib', 'debug.py')
 
1662
        self.assertContainsRe(text, "debug_flags = set()")
 
1663
        # test resource under bzrlib
 
1664
        text = osutils.resource_string('bzrlib.ui', 'text.py')
 
1665
        self.assertContainsRe(text, "class TextUIFactory")
 
1666
        # test unsupported package
 
1667
        self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
 
1668
            'yyy.xx')
 
1669
        # test unknown resource
 
1670
        self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
 
1671
 
 
1672
 
 
1673
class TestReCompile(tests.TestCase):
 
1674
 
 
1675
    def test_re_compile_checked(self):
 
1676
        r = osutils.re_compile_checked(r'A*', re.IGNORECASE)
 
1677
        self.assertTrue(r.match('aaaa'))
 
1678
        self.assertTrue(r.match('aAaA'))
 
1679
 
 
1680
    def test_re_compile_checked_error(self):
 
1681
        # like https://bugs.launchpad.net/bzr/+bug/251352
 
1682
        err = self.assertRaises(
 
1683
            errors.BzrCommandError,
 
1684
            osutils.re_compile_checked, '*', re.IGNORECASE, 'test case')
 
1685
        self.assertEqual(
 
1686
            "Invalid regular expression in test case: '*': "
 
1687
            "nothing to repeat",
 
1688
            str(err))
 
1689
 
 
1690
 
 
1691
class TestDirReader(tests.TestCaseInTempDir):
 
1692
 
 
1693
    # Set by load_tests
 
1694
    _dir_reader_class = None
 
1695
    _native_to_unicode = None
 
1696
 
 
1697
    def setUp(self):
 
1698
        tests.TestCaseInTempDir.setUp(self)
 
1699
 
 
1700
        # Save platform specific info and reset it
 
1701
        cur_dir_reader = osutils._selected_dir_reader
 
1702
 
 
1703
        def restore():
 
1704
            osutils._selected_dir_reader = cur_dir_reader
 
1705
        self.addCleanup(restore)
 
1706
 
 
1707
        osutils._selected_dir_reader = self._dir_reader_class()
 
1708
 
 
1709
    def _get_ascii_tree(self):
 
1710
        tree = [
 
1711
            '0file',
 
1712
            '1dir/',
 
1713
            '1dir/0file',
 
1714
            '1dir/1dir/',
 
1715
            '2file'
 
1716
            ]
 
1717
        expected_dirblocks = [
 
1718
                (('', '.'),
 
1719
                 [('0file', '0file', 'file'),
 
1720
                  ('1dir', '1dir', 'directory'),
 
1721
                  ('2file', '2file', 'file'),
 
1722
                 ]
 
1723
                ),
 
1724
                (('1dir', './1dir'),
 
1725
                 [('1dir/0file', '0file', 'file'),
 
1726
                  ('1dir/1dir', '1dir', 'directory'),
 
1727
                 ]
 
1728
                ),
 
1729
                (('1dir/1dir', './1dir/1dir'),
 
1730
                 [
 
1731
                 ]
 
1732
                ),
 
1733
            ]
 
1734
        return tree, expected_dirblocks
 
1735
 
 
1736
    def test_walk_cur_dir(self):
 
1737
        tree, expected_dirblocks = self._get_ascii_tree()
 
1738
        self.build_tree(tree)
 
1739
        result = list(osutils._walkdirs_utf8('.'))
 
1740
        # Filter out stat and abspath
 
1741
        self.assertEqual(expected_dirblocks,
 
1742
                         [(dirinfo, [line[0:3] for line in block])
 
1743
                          for dirinfo, block in result])
 
1744
 
 
1745
    def test_walk_sub_dir(self):
 
1746
        tree, expected_dirblocks = self._get_ascii_tree()
 
1747
        self.build_tree(tree)
 
1748
        # you can search a subdir only, with a supplied prefix.
 
1749
        result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
 
1750
        # Filter out stat and abspath
 
1751
        self.assertEqual(expected_dirblocks[1:],
 
1752
                         [(dirinfo, [line[0:3] for line in block])
 
1753
                          for dirinfo, block in result])
 
1754
 
 
1755
    def _get_unicode_tree(self):
 
1756
        name0u = u'0file-\xb6'
 
1757
        name1u = u'1dir-\u062c\u0648'
 
1758
        name2u = u'2file-\u0633'
 
1759
        tree = [
 
1760
            name0u,
 
1761
            name1u + '/',
 
1762
            name1u + '/' + name0u,
 
1763
            name1u + '/' + name1u + '/',
 
1764
            name2u,
 
1765
            ]
 
1766
        name0 = name0u.encode('UTF-8')
 
1767
        name1 = name1u.encode('UTF-8')
 
1768
        name2 = name2u.encode('UTF-8')
 
1769
        expected_dirblocks = [
 
1770
                (('', '.'),
 
1771
                 [(name0, name0, 'file', './' + name0u),
 
1772
                  (name1, name1, 'directory', './' + name1u),
 
1773
                  (name2, name2, 'file', './' + name2u),
 
1774
                 ]
 
1775
                ),
 
1776
                ((name1, './' + name1u),
 
1777
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1778
                                                        + '/' + name0u),
 
1779
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1780
                                                            + '/' + name1u),
 
1781
                 ]
 
1782
                ),
 
1783
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1784
                 [
 
1785
                 ]
 
1786
                ),
 
1787
            ]
 
1788
        return tree, expected_dirblocks
 
1789
 
 
1790
    def _filter_out(self, raw_dirblocks):
 
1791
        """Filter out a walkdirs_utf8 result.
 
1792
 
 
1793
        stat field is removed, all native paths are converted to unicode
 
1794
        """
 
1795
        filtered_dirblocks = []
 
1796
        for dirinfo, block in raw_dirblocks:
 
1797
            dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
 
1798
            details = []
 
1799
            for line in block:
 
1800
                details.append(line[0:3] + (self._native_to_unicode(line[4]), ))
 
1801
            filtered_dirblocks.append((dirinfo, details))
 
1802
        return filtered_dirblocks
 
1803
 
 
1804
    def test_walk_unicode_tree(self):
 
1805
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1806
        tree, expected_dirblocks = self._get_unicode_tree()
 
1807
        self.build_tree(tree)
 
1808
        result = list(osutils._walkdirs_utf8('.'))
 
1809
        self.assertEqual(expected_dirblocks, self._filter_out(result))
 
1810
 
 
1811
    def test_symlink(self):
 
1812
        self.requireFeature(tests.SymlinkFeature)
 
1813
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1814
        target = u'target\N{Euro Sign}'
 
1815
        link_name = u'l\N{Euro Sign}nk'
 
1816
        os.symlink(target, link_name)
 
1817
        target_utf8 = target.encode('UTF-8')
 
1818
        link_name_utf8 = link_name.encode('UTF-8')
 
1819
        expected_dirblocks = [
 
1820
                (('', '.'),
 
1821
                 [(link_name_utf8, link_name_utf8,
 
1822
                   'symlink', './' + link_name),],
 
1823
                 )]
 
1824
        result = list(osutils._walkdirs_utf8('.'))
 
1825
        self.assertEqual(expected_dirblocks, self._filter_out(result))
 
1826
 
 
1827
 
 
1828
class TestReadLink(tests.TestCaseInTempDir):
 
1829
    """Exposes os.readlink() problems and the osutils solution.
 
1830
 
 
1831
    The only guarantee offered by os.readlink(), starting with 2.6, is that a
 
1832
    unicode string will be returned if a unicode string is passed.
 
1833
 
 
1834
    But prior python versions failed to properly encode the passed unicode
 
1835
    string.
 
1836
    """
 
1837
    _test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]
 
1838
 
 
1839
    def setUp(self):
 
1840
        super(tests.TestCaseInTempDir, self).setUp()
 
1841
        self.link = u'l\N{Euro Sign}ink'
 
1842
        self.target = u'targe\N{Euro Sign}t'
 
1843
        os.symlink(self.target, self.link)
 
1844
 
 
1845
    def test_os_readlink_link_encoding(self):
 
1846
        if sys.version_info < (2, 6):
 
1847
            self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
 
1848
        else:
 
1849
            self.assertEquals(self.target,  os.readlink(self.link))
 
1850
 
 
1851
    def test_os_readlink_link_decoding(self):
 
1852
        self.assertEquals(self.target.encode(osutils._fs_enc),
 
1853
                          os.readlink(self.link.encode(osutils._fs_enc)))
 
1854
 
 
1855
 
 
1856
class TestConcurrency(tests.TestCase):
 
1857
 
 
1858
    def setUp(self):
 
1859
        super(TestConcurrency, self).setUp()
 
1860
        orig = osutils._cached_local_concurrency
 
1861
        def restore():
 
1862
            osutils._cached_local_concurrency = orig
 
1863
        self.addCleanup(restore)
 
1864
 
 
1865
    def test_local_concurrency(self):
 
1866
        concurrency = osutils.local_concurrency()
 
1867
        self.assertIsInstance(concurrency, int)
 
1868
 
 
1869
    def test_local_concurrency_environment_variable(self):
 
1870
        os.environ['BZR_CONCURRENCY'] = '2'
 
1871
        self.assertEqual(2, osutils.local_concurrency(use_cache=False))
 
1872
        os.environ['BZR_CONCURRENCY'] = '3'
 
1873
        self.assertEqual(3, osutils.local_concurrency(use_cache=False))
 
1874
        os.environ['BZR_CONCURRENCY'] = 'foo'
 
1875
        self.assertEqual(1, osutils.local_concurrency(use_cache=False))
 
1876
 
 
1877
    def test_option_concurrency(self):
 
1878
        os.environ['BZR_CONCURRENCY'] = '1'
 
1879
        self.run_bzr('rocks --concurrency 42')
 
1880
        # Command line overrides envrionment variable
 
1881
        self.assertEquals('42', os.environ['BZR_CONCURRENCY'])
 
1882
        self.assertEquals(42, osutils.local_concurrency(use_cache=False))
 
1883
 
 
1884
 
 
1885
class TestFailedToLoadExtension(tests.TestCase):
 
1886
 
 
1887
    def _try_loading(self):
 
1888
        try:
 
1889
            import bzrlib._fictional_extension_py
 
1890
        except ImportError, e:
 
1891
            osutils.failed_to_load_extension(e)
 
1892
            return True
 
1893
 
 
1894
    def setUp(self):
 
1895
        super(TestFailedToLoadExtension, self).setUp()
 
1896
        self.saved_failures = osutils._extension_load_failures[:]
 
1897
        del osutils._extension_load_failures[:]
 
1898
        self.addCleanup(self.restore_failures)
 
1899
 
 
1900
    def restore_failures(self):
 
1901
        osutils._extension_load_failures = self.saved_failures
 
1902
 
 
1903
    def test_failure_to_load(self):
 
1904
        self._try_loading()
 
1905
        self.assertLength(1, osutils._extension_load_failures)
 
1906
        self.assertEquals(osutils._extension_load_failures[0],
 
1907
            "No module named _fictional_extension_py")
 
1908
 
 
1909
    def test_report_extension_load_failures_no_warning(self):
 
1910
        self.assertTrue(self._try_loading())
 
1911
        warnings, result = self.callCatchWarnings(osutils.report_extension_load_failures)
 
1912
        # it used to give a Python warning; it no longer does
 
1913
        self.assertLength(0, warnings)
 
1914
 
 
1915
    def test_report_extension_load_failures_message(self):
 
1916
        log = StringIO()
 
1917
        trace.push_log_file(log)
 
1918
        self.assertTrue(self._try_loading())
 
1919
        osutils.report_extension_load_failures()
 
1920
        self.assertContainsRe(
 
1921
            log.getvalue(),
 
1922
            r"bzr: warning: some compiled extensions could not be loaded; "
 
1923
            "see <https://answers\.launchpad\.net/bzr/\+faq/703>\n"
 
1924
            )