/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_osutils.py

  • Committer: Andrew Bennetts
  • Date: 2008-09-08 12:59:00 UTC
  • mfrom: (3695 +trunk)
  • mto: This revision was merged to the branch mainline in revision 3756.
  • Revision ID: andrew.bennetts@canonical.com-20080908125900-8ywtsr7jqyyatjz0
Merge from bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for the osutils wrapper."""
 
18
 
 
19
from cStringIO import StringIO
 
20
import errno
 
21
import os
 
22
import socket
 
23
import stat
 
24
import sys
 
25
import time
 
26
 
 
27
from bzrlib import (
 
28
    errors,
 
29
    osutils,
 
30
    tests,
 
31
    win32utils,
 
32
    )
 
33
from bzrlib.errors import BzrBadParameterNotUnicode, InvalidURL
 
34
from bzrlib.osutils import (
 
35
        is_inside_any,
 
36
        is_inside_or_parent_of_any,
 
37
        pathjoin,
 
38
        pumpfile,
 
39
        pump_string_file,
 
40
        )
 
41
from bzrlib.tests import (
 
42
        adapt_tests,
 
43
        Feature,
 
44
        probe_unicode_in_user_encoding,
 
45
        split_suite_by_re,
 
46
        StringIOWrapper,
 
47
        SymlinkFeature,
 
48
        TestCase,
 
49
        TestCaseInTempDir,
 
50
        TestScenarioApplier,
 
51
        TestSkipped,
 
52
        )
 
53
from bzrlib.tests.file_utils import (
 
54
    FakeReadFile,
 
55
    )
 
56
from bzrlib.tests.test__walkdirs_win32 import WalkdirsWin32Feature
 
57
 
 
58
 
 
59
def load_tests(standard_tests, module, loader):
 
60
    """Parameterize readdir tests."""
 
61
    to_adapt, result = split_suite_by_re(standard_tests, "readdir")
 
62
    adapter = TestScenarioApplier()
 
63
    from bzrlib import _readdir_py
 
64
    adapter.scenarios = [('python', {'read_dir': _readdir_py.read_dir})]
 
65
    if ReadDirFeature.available():
 
66
        adapter.scenarios.append(('pyrex',
 
67
            {'read_dir': ReadDirFeature.read_dir}))
 
68
    adapt_tests(to_adapt, adapter, result)
 
69
    return result
 
70
 
 
71
 
 
72
class _ReadDirFeature(Feature):
 
73
 
 
74
    def _probe(self):
 
75
        try:
 
76
            from bzrlib import _readdir_pyx
 
77
            self.read_dir = _readdir_pyx.read_dir
 
78
            return True
 
79
        except ImportError:
 
80
            return False
 
81
 
 
82
    def feature_name(self):
 
83
        return 'bzrlib._readdir_pyx'
 
84
 
 
85
ReadDirFeature = _ReadDirFeature()
 
86
 
 
87
 
 
88
class TestOSUtils(TestCaseInTempDir):
 
89
 
 
90
    def test_contains_whitespace(self):
 
91
        self.failUnless(osutils.contains_whitespace(u' '))
 
92
        self.failUnless(osutils.contains_whitespace(u'hello there'))
 
93
        self.failUnless(osutils.contains_whitespace(u'hellothere\n'))
 
94
        self.failUnless(osutils.contains_whitespace(u'hello\nthere'))
 
95
        self.failUnless(osutils.contains_whitespace(u'hello\rthere'))
 
96
        self.failUnless(osutils.contains_whitespace(u'hello\tthere'))
 
97
 
 
98
        # \xa0 is "Non-breaking-space" which on some python locales thinks it
 
99
        # is whitespace, but we do not.
 
100
        self.failIf(osutils.contains_whitespace(u''))
 
101
        self.failIf(osutils.contains_whitespace(u'hellothere'))
 
102
        self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
 
103
 
 
104
    def test_fancy_rename(self):
 
105
        # This should work everywhere
 
106
        def rename(a, b):
 
107
            osutils.fancy_rename(a, b,
 
108
                    rename_func=os.rename,
 
109
                    unlink_func=os.unlink)
 
110
 
 
111
        open('a', 'wb').write('something in a\n')
 
112
        rename('a', 'b')
 
113
        self.failIfExists('a')
 
114
        self.failUnlessExists('b')
 
115
        self.check_file_contents('b', 'something in a\n')
 
116
 
 
117
        open('a', 'wb').write('new something in a\n')
 
118
        rename('b', 'a')
 
119
 
 
120
        self.check_file_contents('a', 'something in a\n')
 
121
 
 
122
    def test_rename(self):
 
123
        # Rename should be semi-atomic on all platforms
 
124
        open('a', 'wb').write('something in a\n')
 
125
        osutils.rename('a', 'b')
 
126
        self.failIfExists('a')
 
127
        self.failUnlessExists('b')
 
128
        self.check_file_contents('b', 'something in a\n')
 
129
 
 
130
        open('a', 'wb').write('new something in a\n')
 
131
        osutils.rename('b', 'a')
 
132
 
 
133
        self.check_file_contents('a', 'something in a\n')
 
134
 
 
135
    # TODO: test fancy_rename using a MemoryTransport
 
136
 
 
137
    def test_rename_change_case(self):
 
138
        # on Windows we should be able to change filename case by rename
 
139
        self.build_tree(['a', 'b/'])
 
140
        osutils.rename('a', 'A')
 
141
        osutils.rename('b', 'B')
 
142
        # we can't use failUnlessExists on case-insensitive filesystem
 
143
        # so try to check shape of the tree
 
144
        shape = sorted(os.listdir('.'))
 
145
        self.assertEquals(['A', 'B'], shape)
 
146
 
 
147
    def test_01_rand_chars_empty(self):
 
148
        result = osutils.rand_chars(0)
 
149
        self.assertEqual(result, '')
 
150
 
 
151
    def test_02_rand_chars_100(self):
 
152
        result = osutils.rand_chars(100)
 
153
        self.assertEqual(len(result), 100)
 
154
        self.assertEqual(type(result), str)
 
155
        self.assertContainsRe(result, r'^[a-z0-9]{100}$')
 
156
 
 
157
    def test_is_inside(self):
 
158
        is_inside = osutils.is_inside
 
159
        self.assertTrue(is_inside('src', 'src/foo.c'))
 
160
        self.assertFalse(is_inside('src', 'srccontrol'))
 
161
        self.assertTrue(is_inside('src', 'src/a/a/a/foo.c'))
 
162
        self.assertTrue(is_inside('foo.c', 'foo.c'))
 
163
        self.assertFalse(is_inside('foo.c', ''))
 
164
        self.assertTrue(is_inside('', 'foo.c'))
 
165
 
 
166
    def test_is_inside_any(self):
 
167
        SRC_FOO_C = pathjoin('src', 'foo.c')
 
168
        for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
 
169
                         (['src'], SRC_FOO_C),
 
170
                         (['src'], 'src'),
 
171
                         ]:
 
172
            self.assert_(is_inside_any(dirs, fn))
 
173
        for dirs, fn in [(['src'], 'srccontrol'),
 
174
                         (['src'], 'srccontrol/foo')]:
 
175
            self.assertFalse(is_inside_any(dirs, fn))
 
176
 
 
177
    def test_is_inside_or_parent_of_any(self):
 
178
        for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
 
179
                         (['src'], 'src/foo.c'),
 
180
                         (['src/bar.c'], 'src'),
 
181
                         (['src/bar.c', 'bla/foo.c'], 'src'),
 
182
                         (['src'], 'src'),
 
183
                         ]:
 
184
            self.assert_(is_inside_or_parent_of_any(dirs, fn))
 
185
            
 
186
        for dirs, fn in [(['src'], 'srccontrol'),
 
187
                         (['srccontrol/foo.c'], 'src'),
 
188
                         (['src'], 'srccontrol/foo')]:
 
189
            self.assertFalse(is_inside_or_parent_of_any(dirs, fn))
 
190
 
 
191
    def test_rmtree(self):
 
192
        # Check to remove tree with read-only files/dirs
 
193
        os.mkdir('dir')
 
194
        f = file('dir/file', 'w')
 
195
        f.write('spam')
 
196
        f.close()
 
197
        # would like to also try making the directory readonly, but at the
 
198
        # moment python shutil.rmtree doesn't handle that properly - it would
 
199
        # need to chmod the directory before removing things inside it - deferred
 
200
        # for now -- mbp 20060505
 
201
        # osutils.make_readonly('dir')
 
202
        osutils.make_readonly('dir/file')
 
203
 
 
204
        osutils.rmtree('dir')
 
205
 
 
206
        self.failIfExists('dir/file')
 
207
        self.failIfExists('dir')
 
208
 
 
209
    def test_file_kind(self):
 
210
        self.build_tree(['file', 'dir/'])
 
211
        self.assertEquals('file', osutils.file_kind('file'))
 
212
        self.assertEquals('directory', osutils.file_kind('dir/'))
 
213
        if osutils.has_symlinks():
 
214
            os.symlink('symlink', 'symlink')
 
215
            self.assertEquals('symlink', osutils.file_kind('symlink'))
 
216
        
 
217
        # TODO: jam 20060529 Test a block device
 
218
        try:
 
219
            os.lstat('/dev/null')
 
220
        except OSError, e:
 
221
            if e.errno not in (errno.ENOENT,):
 
222
                raise
 
223
        else:
 
224
            self.assertEquals('chardev', osutils.file_kind('/dev/null'))
 
225
 
 
226
        mkfifo = getattr(os, 'mkfifo', None)
 
227
        if mkfifo:
 
228
            mkfifo('fifo')
 
229
            try:
 
230
                self.assertEquals('fifo', osutils.file_kind('fifo'))
 
231
            finally:
 
232
                os.remove('fifo')
 
233
 
 
234
        AF_UNIX = getattr(socket, 'AF_UNIX', None)
 
235
        if AF_UNIX:
 
236
            s = socket.socket(AF_UNIX)
 
237
            s.bind('socket')
 
238
            try:
 
239
                self.assertEquals('socket', osutils.file_kind('socket'))
 
240
            finally:
 
241
                os.remove('socket')
 
242
 
 
243
    def test_kind_marker(self):
 
244
        self.assertEqual(osutils.kind_marker('file'), '')
 
245
        self.assertEqual(osutils.kind_marker('directory'), '/')
 
246
        self.assertEqual(osutils.kind_marker('symlink'), '@')
 
247
        self.assertEqual(osutils.kind_marker('tree-reference'), '+')
 
248
 
 
249
    def test_get_umask(self):
 
250
        if sys.platform == 'win32':
 
251
            # umask always returns '0', no way to set it
 
252
            self.assertEqual(0, osutils.get_umask())
 
253
            return
 
254
 
 
255
        orig_umask = osutils.get_umask()
 
256
        try:
 
257
            os.umask(0222)
 
258
            self.assertEqual(0222, osutils.get_umask())
 
259
            os.umask(0022)
 
260
            self.assertEqual(0022, osutils.get_umask())
 
261
            os.umask(0002)
 
262
            self.assertEqual(0002, osutils.get_umask())
 
263
            os.umask(0027)
 
264
            self.assertEqual(0027, osutils.get_umask())
 
265
        finally:
 
266
            os.umask(orig_umask)
 
267
 
 
268
    def assertFormatedDelta(self, expected, seconds):
 
269
        """Assert osutils.format_delta formats as expected"""
 
270
        actual = osutils.format_delta(seconds)
 
271
        self.assertEqual(expected, actual)
 
272
 
 
273
    def test_format_delta(self):
 
274
        self.assertFormatedDelta('0 seconds ago', 0)
 
275
        self.assertFormatedDelta('1 second ago', 1)
 
276
        self.assertFormatedDelta('10 seconds ago', 10)
 
277
        self.assertFormatedDelta('59 seconds ago', 59)
 
278
        self.assertFormatedDelta('89 seconds ago', 89)
 
279
        self.assertFormatedDelta('1 minute, 30 seconds ago', 90)
 
280
        self.assertFormatedDelta('3 minutes, 0 seconds ago', 180)
 
281
        self.assertFormatedDelta('3 minutes, 1 second ago', 181)
 
282
        self.assertFormatedDelta('10 minutes, 15 seconds ago', 615)
 
283
        self.assertFormatedDelta('30 minutes, 59 seconds ago', 1859)
 
284
        self.assertFormatedDelta('31 minutes, 0 seconds ago', 1860)
 
285
        self.assertFormatedDelta('60 minutes, 0 seconds ago', 3600)
 
286
        self.assertFormatedDelta('89 minutes, 59 seconds ago', 5399)
 
287
        self.assertFormatedDelta('1 hour, 30 minutes ago', 5400)
 
288
        self.assertFormatedDelta('2 hours, 30 minutes ago', 9017)
 
289
        self.assertFormatedDelta('10 hours, 0 minutes ago', 36000)
 
290
        self.assertFormatedDelta('24 hours, 0 minutes ago', 86400)
 
291
        self.assertFormatedDelta('35 hours, 59 minutes ago', 129599)
 
292
        self.assertFormatedDelta('36 hours, 0 minutes ago', 129600)
 
293
        self.assertFormatedDelta('36 hours, 0 minutes ago', 129601)
 
294
        self.assertFormatedDelta('36 hours, 1 minute ago', 129660)
 
295
        self.assertFormatedDelta('36 hours, 1 minute ago', 129661)
 
296
        self.assertFormatedDelta('84 hours, 10 minutes ago', 303002)
 
297
 
 
298
        # We handle when time steps the wrong direction because computers
 
299
        # don't have synchronized clocks.
 
300
        self.assertFormatedDelta('84 hours, 10 minutes in the future', -303002)
 
301
        self.assertFormatedDelta('1 second in the future', -1)
 
302
        self.assertFormatedDelta('2 seconds in the future', -2)
 
303
 
 
304
    def test_format_date(self):
 
305
        self.assertRaises(errors.UnsupportedTimezoneFormat,
 
306
            osutils.format_date, 0, timezone='foo')
 
307
 
 
308
    def test_dereference_path(self):
 
309
        self.requireFeature(SymlinkFeature)
 
310
        cwd = osutils.realpath('.')
 
311
        os.mkdir('bar')
 
312
        bar_path = osutils.pathjoin(cwd, 'bar')
 
313
        # Using './' to avoid bug #1213894 (first path component not
 
314
        # dereferenced) in Python 2.4.1 and earlier
 
315
        self.assertEqual(bar_path, osutils.realpath('./bar'))
 
316
        os.symlink('bar', 'foo')
 
317
        self.assertEqual(bar_path, osutils.realpath('./foo'))
 
318
        
 
319
        # Does not dereference terminal symlinks
 
320
        foo_path = osutils.pathjoin(cwd, 'foo')
 
321
        self.assertEqual(foo_path, osutils.dereference_path('./foo'))
 
322
 
 
323
        # Dereferences parent symlinks
 
324
        os.mkdir('bar/baz')
 
325
        baz_path = osutils.pathjoin(bar_path, 'baz')
 
326
        self.assertEqual(baz_path, osutils.dereference_path('./foo/baz'))
 
327
 
 
328
        # Dereferences parent symlinks that are the first path element
 
329
        self.assertEqual(baz_path, osutils.dereference_path('foo/baz'))
 
330
 
 
331
        # Dereferences parent symlinks in absolute paths
 
332
        foo_baz_path = osutils.pathjoin(foo_path, 'baz')
 
333
        self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
 
334
 
 
335
    def test_changing_access(self):
 
336
        f = file('file', 'w')
 
337
        f.write('monkey')
 
338
        f.close()
 
339
 
 
340
        # Make a file readonly
 
341
        osutils.make_readonly('file')
 
342
        mode = os.lstat('file').st_mode
 
343
        self.assertEqual(mode, mode & 0777555)
 
344
 
 
345
        # Make a file writable
 
346
        osutils.make_writable('file')
 
347
        mode = os.lstat('file').st_mode
 
348
        self.assertEqual(mode, mode | 0200)
 
349
 
 
350
        if osutils.has_symlinks():
 
351
            # should not error when handed a symlink
 
352
            os.symlink('nonexistent', 'dangling')
 
353
            osutils.make_readonly('dangling')
 
354
            osutils.make_writable('dangling')
 
355
 
 
356
    def test_kind_marker(self):
 
357
        self.assertEqual("", osutils.kind_marker("file"))
 
358
        self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
 
359
        self.assertEqual("@", osutils.kind_marker("symlink"))
 
360
        self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
 
361
 
 
362
    def test_host_os_dereferences_symlinks(self):
 
363
        osutils.host_os_dereferences_symlinks()
 
364
 
 
365
 
 
366
class TestPumpFile(TestCase):
 
367
    """Test pumpfile method."""
 
368
    def setUp(self):
 
369
        # create a test datablock
 
370
        self.block_size = 512
 
371
        pattern = '0123456789ABCDEF'
 
372
        self.test_data = pattern * (3 * self.block_size / len(pattern))
 
373
        self.test_data_len = len(self.test_data)
 
374
 
 
375
    def test_bracket_block_size(self):
 
376
        """Read data in blocks with the requested read size bracketing the
 
377
        block size."""
 
378
        # make sure test data is larger than max read size
 
379
        self.assertTrue(self.test_data_len > self.block_size)
 
380
 
 
381
        from_file = FakeReadFile(self.test_data)
 
382
        to_file = StringIO()
 
383
 
 
384
        # read (max / 2) bytes and verify read size wasn't affected
 
385
        num_bytes_to_read = self.block_size / 2
 
386
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
387
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
 
388
        self.assertEqual(from_file.get_read_count(), 1)
 
389
 
 
390
        # read (max) bytes and verify read size wasn't affected
 
391
        num_bytes_to_read = self.block_size
 
392
        from_file.reset_read_count()
 
393
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
394
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
 
395
        self.assertEqual(from_file.get_read_count(), 1)
 
396
 
 
397
        # read (max + 1) bytes and verify read size was limited
 
398
        num_bytes_to_read = self.block_size + 1
 
399
        from_file.reset_read_count()
 
400
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
401
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
 
402
        self.assertEqual(from_file.get_read_count(), 2)
 
403
 
 
404
        # finish reading the rest of the data
 
405
        num_bytes_to_read = self.test_data_len - to_file.tell()
 
406
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
407
 
 
408
        # report error if the data wasn't equal (we only report the size due
 
409
        # to the length of the data)
 
410
        response_data = to_file.getvalue()
 
411
        if response_data != self.test_data:
 
412
            message = "Data not equal.  Expected %d bytes, received %d."
 
413
            self.fail(message % (len(response_data), self.test_data_len))
 
414
 
 
415
    def test_specified_size(self):
 
416
        """Request a transfer larger than the maximum block size and verify
 
417
        that the maximum read doesn't exceed the block_size."""
 
418
        # make sure test data is larger than max read size
 
419
        self.assertTrue(self.test_data_len > self.block_size)
 
420
 
 
421
        # retrieve data in blocks
 
422
        from_file = FakeReadFile(self.test_data)
 
423
        to_file = StringIO()
 
424
        pumpfile(from_file, to_file, self.test_data_len, self.block_size)
 
425
 
 
426
        # verify read size was equal to the maximum read size
 
427
        self.assertTrue(from_file.get_max_read_size() > 0)
 
428
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
 
429
        self.assertEqual(from_file.get_read_count(), 3)
 
430
 
 
431
        # report error if the data wasn't equal (we only report the size due
 
432
        # to the length of the data)
 
433
        response_data = to_file.getvalue()
 
434
        if response_data != self.test_data:
 
435
            message = "Data not equal.  Expected %d bytes, received %d."
 
436
            self.fail(message % (len(response_data), self.test_data_len))
 
437
 
 
438
    def test_to_eof(self):
 
439
        """Read to end-of-file and verify that the reads are not larger than
 
440
        the maximum read size."""
 
441
        # make sure test data is larger than max read size
 
442
        self.assertTrue(self.test_data_len > self.block_size)
 
443
 
 
444
        # retrieve data to EOF
 
445
        from_file = FakeReadFile(self.test_data)
 
446
        to_file = StringIO()
 
447
        pumpfile(from_file, to_file, -1, self.block_size)
 
448
 
 
449
        # verify read size was equal to the maximum read size
 
450
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
 
451
        self.assertEqual(from_file.get_read_count(), 4)
 
452
 
 
453
        # report error if the data wasn't equal (we only report the size due
 
454
        # to the length of the data)
 
455
        response_data = to_file.getvalue()
 
456
        if response_data != self.test_data:
 
457
            message = "Data not equal.  Expected %d bytes, received %d."
 
458
            self.fail(message % (len(response_data), self.test_data_len))
 
459
 
 
460
    def test_defaults(self):
 
461
        """Verifies that the default arguments will read to EOF -- this
 
462
        test verifies that any existing usages of pumpfile will not be broken
 
463
        with this new version."""
 
464
        # retrieve data using default (old) pumpfile method
 
465
        from_file = FakeReadFile(self.test_data)
 
466
        to_file = StringIO()
 
467
        pumpfile(from_file, to_file)
 
468
 
 
469
        # report error if the data wasn't equal (we only report the size due
 
470
        # to the length of the data)
 
471
        response_data = to_file.getvalue()
 
472
        if response_data != self.test_data:
 
473
            message = "Data not equal.  Expected %d bytes, received %d."
 
474
            self.fail(message % (len(response_data), self.test_data_len))
 
475
 
 
476
 
 
477
class TestPumpStringFile(TestCase):
 
478
 
 
479
    def test_empty(self):
 
480
        output = StringIO()
 
481
        pump_string_file("", output)
 
482
        self.assertEqual("", output.getvalue())
 
483
 
 
484
    def test_more_than_segment_size(self):
 
485
        output = StringIO()
 
486
        pump_string_file("123456789", output, 2)
 
487
        self.assertEqual("123456789", output.getvalue())
 
488
 
 
489
    def test_segment_size(self):
 
490
        output = StringIO()
 
491
        pump_string_file("12", output, 2)
 
492
        self.assertEqual("12", output.getvalue())
 
493
 
 
494
    def test_segment_size_multiple(self):
 
495
        output = StringIO()
 
496
        pump_string_file("1234", output, 2)
 
497
        self.assertEqual("1234", output.getvalue())
 
498
 
 
499
 
 
500
class TestSafeUnicode(TestCase):
 
501
 
 
502
    def test_from_ascii_string(self):
 
503
        self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
 
504
 
 
505
    def test_from_unicode_string_ascii_contents(self):
 
506
        self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
 
507
 
 
508
    def test_from_unicode_string_unicode_contents(self):
 
509
        self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
 
510
 
 
511
    def test_from_utf8_string(self):
 
512
        self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
 
513
 
 
514
    def test_bad_utf8_string(self):
 
515
        self.assertRaises(BzrBadParameterNotUnicode,
 
516
                          osutils.safe_unicode,
 
517
                          '\xbb\xbb')
 
518
 
 
519
 
 
520
class TestSafeUtf8(TestCase):
 
521
 
 
522
    def test_from_ascii_string(self):
 
523
        f = 'foobar'
 
524
        self.assertEqual('foobar', osutils.safe_utf8(f))
 
525
 
 
526
    def test_from_unicode_string_ascii_contents(self):
 
527
        self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
 
528
 
 
529
    def test_from_unicode_string_unicode_contents(self):
 
530
        self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
 
531
 
 
532
    def test_from_utf8_string(self):
 
533
        self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
 
534
 
 
535
    def test_bad_utf8_string(self):
 
536
        self.assertRaises(BzrBadParameterNotUnicode,
 
537
                          osutils.safe_utf8, '\xbb\xbb')
 
538
 
 
539
 
 
540
class TestSafeRevisionId(TestCase):
 
541
 
 
542
    def test_from_ascii_string(self):
 
543
        # this shouldn't give a warning because it's getting an ascii string
 
544
        self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
 
545
 
 
546
    def test_from_unicode_string_ascii_contents(self):
 
547
        self.assertEqual('bargam',
 
548
                         osutils.safe_revision_id(u'bargam', warn=False))
 
549
 
 
550
    def test_from_unicode_deprecated(self):
 
551
        self.assertEqual('bargam',
 
552
            self.callDeprecated([osutils._revision_id_warning],
 
553
                                osutils.safe_revision_id, u'bargam'))
 
554
 
 
555
    def test_from_unicode_string_unicode_contents(self):
 
556
        self.assertEqual('bargam\xc2\xae',
 
557
                         osutils.safe_revision_id(u'bargam\xae', warn=False))
 
558
 
 
559
    def test_from_utf8_string(self):
 
560
        self.assertEqual('foo\xc2\xae',
 
561
                         osutils.safe_revision_id('foo\xc2\xae'))
 
562
 
 
563
    def test_none(self):
 
564
        """Currently, None is a valid revision_id"""
 
565
        self.assertEqual(None, osutils.safe_revision_id(None))
 
566
 
 
567
 
 
568
class TestSafeFileId(TestCase):
 
569
 
 
570
    def test_from_ascii_string(self):
 
571
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))
 
572
 
 
573
    def test_from_unicode_string_ascii_contents(self):
 
574
        self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))
 
575
 
 
576
    def test_from_unicode_deprecated(self):
 
577
        self.assertEqual('bargam',
 
578
            self.callDeprecated([osutils._file_id_warning],
 
579
                                osutils.safe_file_id, u'bargam'))
 
580
 
 
581
    def test_from_unicode_string_unicode_contents(self):
 
582
        self.assertEqual('bargam\xc2\xae',
 
583
                         osutils.safe_file_id(u'bargam\xae', warn=False))
 
584
 
 
585
    def test_from_utf8_string(self):
 
586
        self.assertEqual('foo\xc2\xae',
 
587
                         osutils.safe_file_id('foo\xc2\xae'))
 
588
 
 
589
    def test_none(self):
 
590
        """Currently, None is a valid revision_id"""
 
591
        self.assertEqual(None, osutils.safe_file_id(None))
 
592
 
 
593
 
 
594
class TestWin32Funcs(TestCase):
 
595
    """Test that the _win32 versions of os utilities return appropriate paths."""
 
596
 
 
597
    def test_abspath(self):
 
598
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
 
599
        self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
 
600
        self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
 
601
        self.assertEqual('//HOST/path', osutils._win32_abspath('//HOST/path'))
 
602
 
 
603
    def test_realpath(self):
 
604
        self.assertEqual('C:/foo', osutils._win32_realpath('C:\\foo'))
 
605
        self.assertEqual('C:/foo', osutils._win32_realpath('C:/foo'))
 
606
 
 
607
    def test_pathjoin(self):
 
608
        self.assertEqual('path/to/foo', osutils._win32_pathjoin('path', 'to', 'foo'))
 
609
        self.assertEqual('C:/foo', osutils._win32_pathjoin('path\\to', 'C:\\foo'))
 
610
        self.assertEqual('C:/foo', osutils._win32_pathjoin('path/to', 'C:/foo'))
 
611
        self.assertEqual('path/to/foo', osutils._win32_pathjoin('path/to/', 'foo'))
 
612
        self.assertEqual('/foo', osutils._win32_pathjoin('C:/path/to/', '/foo'))
 
613
        self.assertEqual('/foo', osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
 
614
 
 
615
    def test_normpath(self):
 
616
        self.assertEqual('path/to/foo', osutils._win32_normpath(r'path\\from\..\to\.\foo'))
 
617
        self.assertEqual('path/to/foo', osutils._win32_normpath('path//from/../to/./foo'))
 
618
 
 
619
    def test_getcwd(self):
 
620
        cwd = osutils._win32_getcwd()
 
621
        os_cwd = os.getcwdu()
 
622
        self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
 
623
        # win32 is inconsistent whether it returns lower or upper case
 
624
        # and even if it was consistent the user might type the other
 
625
        # so we force it to uppercase
 
626
        # running python.exe under cmd.exe return capital C:\\
 
627
        # running win32 python inside a cygwin shell returns lowercase
 
628
        self.assertEqual(os_cwd[0].upper(), cwd[0])
 
629
 
 
630
    def test_fixdrive(self):
 
631
        self.assertEqual('H:/foo', osutils._win32_fixdrive('h:/foo'))
 
632
        self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
 
633
        self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
 
634
 
 
635
    def test_win98_abspath(self):
 
636
        # absolute path
 
637
        self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
 
638
        self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
 
639
        # UNC path
 
640
        self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
 
641
        self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
 
642
        # relative path
 
643
        cwd = osutils.getcwd().rstrip('/')
 
644
        drive = osutils._nt_splitdrive(cwd)[0]
 
645
        self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
 
646
        self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
 
647
        # unicode path
 
648
        u = u'\u1234'
 
649
        self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
 
650
 
 
651
 
 
652
class TestWin32FuncsDirs(TestCaseInTempDir):
 
653
    """Test win32 functions that create files."""
 
654
    
 
655
    def test_getcwd(self):
 
656
        if win32utils.winver == 'Windows 98':
 
657
            raise TestSkipped('Windows 98 cannot handle unicode filenames')
 
658
        # Make sure getcwd can handle unicode filenames
 
659
        try:
 
660
            os.mkdir(u'mu-\xb5')
 
661
        except UnicodeError:
 
662
            raise TestSkipped("Unable to create Unicode filename")
 
663
 
 
664
        os.chdir(u'mu-\xb5')
 
665
        # TODO: jam 20060427 This will probably fail on Mac OSX because
 
666
        #       it will change the normalization of B\xe5gfors
 
667
        #       Consider using a different unicode character, or make
 
668
        #       osutils.getcwd() renormalize the path.
 
669
        self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
 
670
 
 
671
    def test_minimum_path_selection(self):
 
672
        self.assertEqual(set(),
 
673
            osutils.minimum_path_selection([]))
 
674
        self.assertEqual(set(['a', 'b']),
 
675
            osutils.minimum_path_selection(['a', 'b']))
 
676
        self.assertEqual(set(['a/', 'b']),
 
677
            osutils.minimum_path_selection(['a/', 'b']))
 
678
        self.assertEqual(set(['a/', 'b']),
 
679
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
 
680
 
 
681
    def test_mkdtemp(self):
 
682
        tmpdir = osutils._win32_mkdtemp(dir='.')
 
683
        self.assertFalse('\\' in tmpdir)
 
684
 
 
685
    def test_rename(self):
 
686
        a = open('a', 'wb')
 
687
        a.write('foo\n')
 
688
        a.close()
 
689
        b = open('b', 'wb')
 
690
        b.write('baz\n')
 
691
        b.close()
 
692
 
 
693
        osutils._win32_rename('b', 'a')
 
694
        self.failUnlessExists('a')
 
695
        self.failIfExists('b')
 
696
        self.assertFileEqual('baz\n', 'a')
 
697
 
 
698
    def test_rename_missing_file(self):
 
699
        a = open('a', 'wb')
 
700
        a.write('foo\n')
 
701
        a.close()
 
702
 
 
703
        try:
 
704
            osutils._win32_rename('b', 'a')
 
705
        except (IOError, OSError), e:
 
706
            self.assertEqual(errno.ENOENT, e.errno)
 
707
        self.assertFileEqual('foo\n', 'a')
 
708
 
 
709
    def test_rename_missing_dir(self):
 
710
        os.mkdir('a')
 
711
        try:
 
712
            osutils._win32_rename('b', 'a')
 
713
        except (IOError, OSError), e:
 
714
            self.assertEqual(errno.ENOENT, e.errno)
 
715
 
 
716
    def test_rename_current_dir(self):
 
717
        os.mkdir('a')
 
718
        os.chdir('a')
 
719
        # You can't rename the working directory
 
720
        # doing rename non-existant . usually
 
721
        # just raises ENOENT, since non-existant
 
722
        # doesn't exist.
 
723
        try:
 
724
            osutils._win32_rename('b', '.')
 
725
        except (IOError, OSError), e:
 
726
            self.assertEqual(errno.ENOENT, e.errno)
 
727
 
 
728
    def test_splitpath(self):
 
729
        def check(expected, path):
 
730
            self.assertEqual(expected, osutils.splitpath(path))
 
731
 
 
732
        check(['a'], 'a')
 
733
        check(['a', 'b'], 'a/b')
 
734
        check(['a', 'b'], 'a/./b')
 
735
        check(['a', '.b'], 'a/.b')
 
736
        check(['a', '.b'], 'a\\.b')
 
737
 
 
738
        self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
 
739
 
 
740
 
 
741
class TestMacFuncsDirs(TestCaseInTempDir):
 
742
    """Test mac special functions that require directories."""
 
743
 
 
744
    def test_getcwd(self):
 
745
        # On Mac, this will actually create Ba\u030agfors
 
746
        # but chdir will still work, because it accepts both paths
 
747
        try:
 
748
            os.mkdir(u'B\xe5gfors')
 
749
        except UnicodeError:
 
750
            raise TestSkipped("Unable to create Unicode filename")
 
751
 
 
752
        os.chdir(u'B\xe5gfors')
 
753
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
 
754
 
 
755
    def test_getcwd_nonnorm(self):
 
756
        # Test that _mac_getcwd() will normalize this path
 
757
        try:
 
758
            os.mkdir(u'Ba\u030agfors')
 
759
        except UnicodeError:
 
760
            raise TestSkipped("Unable to create Unicode filename")
 
761
 
 
762
        os.chdir(u'Ba\u030agfors')
 
763
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
 
764
 
 
765
 
 
766
class TestSplitLines(TestCase):
 
767
 
 
768
    def test_split_unicode(self):
 
769
        self.assertEqual([u'foo\n', u'bar\xae'],
 
770
                         osutils.split_lines(u'foo\nbar\xae'))
 
771
        self.assertEqual([u'foo\n', u'bar\xae\n'],
 
772
                         osutils.split_lines(u'foo\nbar\xae\n'))
 
773
 
 
774
    def test_split_with_carriage_returns(self):
 
775
        self.assertEqual(['foo\rbar\n'],
 
776
                         osutils.split_lines('foo\rbar\n'))
 
777
 
 
778
 
 
779
class TestWalkDirs(TestCaseInTempDir):
 
780
 
 
781
    def test_readdir(self):
 
782
        tree = [
 
783
            '.bzr/',
 
784
            '0file',
 
785
            '1dir/',
 
786
            '1dir/0file',
 
787
            '1dir/1dir/',
 
788
            '2file'
 
789
            ]
 
790
        self.build_tree(tree)
 
791
        expected_names = ['.bzr', '0file', '1dir', '2file']
 
792
        # read_dir returns pairs, which form a table with either None in all
 
793
        # the first columns, or a sort key to get best on-disk-read order, 
 
794
        # and the disk path name in utf-8 encoding in the second column.
 
795
        read_result = self.read_dir('.')
 
796
        # The second column is always the names, and every name except "." and
 
797
        # ".." should be present.
 
798
        names = sorted([row[1] for row in read_result])
 
799
        self.assertEqual(expected_names, names)
 
800
        expected_sort_key = None
 
801
        if read_result[0][0] is None:
 
802
            # No sort key returned - all keys must None
 
803
            operator = self.assertEqual
 
804
        else:
 
805
            # A sort key in the first row implies sort keys in the other rows.
 
806
            operator = self.assertNotEqual
 
807
        for row in read_result:
 
808
            operator(None, row[0])
 
809
 
 
810
    def test_compiled_extension_exists(self):
 
811
        self.requireFeature(ReadDirFeature)
 
812
        
 
813
    def test_walkdirs(self):
 
814
        tree = [
 
815
            '.bzr',
 
816
            '0file',
 
817
            '1dir/',
 
818
            '1dir/0file',
 
819
            '1dir/1dir/',
 
820
            '2file'
 
821
            ]
 
822
        self.build_tree(tree)
 
823
        expected_dirblocks = [
 
824
                (('', '.'),
 
825
                 [('0file', '0file', 'file'),
 
826
                  ('1dir', '1dir', 'directory'),
 
827
                  ('2file', '2file', 'file'),
 
828
                 ]
 
829
                ),
 
830
                (('1dir', './1dir'),
 
831
                 [('1dir/0file', '0file', 'file'),
 
832
                  ('1dir/1dir', '1dir', 'directory'),
 
833
                 ]
 
834
                ),
 
835
                (('1dir/1dir', './1dir/1dir'),
 
836
                 [
 
837
                 ]
 
838
                ),
 
839
            ]
 
840
        result = []
 
841
        found_bzrdir = False
 
842
        for dirdetail, dirblock in osutils.walkdirs('.'):
 
843
            if len(dirblock) and dirblock[0][1] == '.bzr':
 
844
                # this tests the filtering of selected paths
 
845
                found_bzrdir = True
 
846
                del dirblock[0]
 
847
            result.append((dirdetail, dirblock))
 
848
 
 
849
        self.assertTrue(found_bzrdir)
 
850
        self.assertEqual(expected_dirblocks,
 
851
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
 
852
        # you can search a subdir only, with a supplied prefix.
 
853
        result = []
 
854
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
 
855
            result.append(dirblock)
 
856
        self.assertEqual(expected_dirblocks[1:],
 
857
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
 
858
 
 
859
    def test__walkdirs_utf8(self):
 
860
        tree = [
 
861
            '.bzr',
 
862
            '0file',
 
863
            '1dir/',
 
864
            '1dir/0file',
 
865
            '1dir/1dir/',
 
866
            '2file'
 
867
            ]
 
868
        self.build_tree(tree)
 
869
        expected_dirblocks = [
 
870
                (('', '.'),
 
871
                 [('0file', '0file', 'file'),
 
872
                  ('1dir', '1dir', 'directory'),
 
873
                  ('2file', '2file', 'file'),
 
874
                 ]
 
875
                ),
 
876
                (('1dir', './1dir'),
 
877
                 [('1dir/0file', '0file', 'file'),
 
878
                  ('1dir/1dir', '1dir', 'directory'),
 
879
                 ]
 
880
                ),
 
881
                (('1dir/1dir', './1dir/1dir'),
 
882
                 [
 
883
                 ]
 
884
                ),
 
885
            ]
 
886
        result = []
 
887
        found_bzrdir = False
 
888
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
 
889
            if len(dirblock) and dirblock[0][1] == '.bzr':
 
890
                # this tests the filtering of selected paths
 
891
                found_bzrdir = True
 
892
                del dirblock[0]
 
893
            result.append((dirdetail, dirblock))
 
894
 
 
895
        self.assertTrue(found_bzrdir)
 
896
        self.assertEqual(expected_dirblocks,
 
897
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
 
898
        # you can search a subdir only, with a supplied prefix.
 
899
        result = []
 
900
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
 
901
            result.append(dirblock)
 
902
        self.assertEqual(expected_dirblocks[1:],
 
903
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
 
904
 
 
905
    def _filter_out_stat(self, result):
 
906
        """Filter out the stat value from the walkdirs result"""
 
907
        for dirdetail, dirblock in result:
 
908
            new_dirblock = []
 
909
            for info in dirblock:
 
910
                # Ignore info[3] which is the stat
 
911
                new_dirblock.append((info[0], info[1], info[2], info[4]))
 
912
            dirblock[:] = new_dirblock
 
913
 
 
914
    def test__walkdirs_utf8_selection(self):
 
915
        # Just trigger the function once, to make sure it has selected a real
 
916
        # implementation.
 
917
        list(osutils._walkdirs_utf8('.'))
 
918
        if WalkdirsWin32Feature.available():
 
919
            # If the compiled form is available, make sure it is used
 
920
            from bzrlib._walkdirs_win32 import _walkdirs_utf8_win32_find_file
 
921
            self.assertIs(_walkdirs_utf8_win32_find_file,
 
922
                          osutils._real_walkdirs_utf8)
 
923
        elif sys.platform == 'win32':
 
924
            self.assertIs(osutils._walkdirs_unicode_to_utf8,
 
925
                          osutils._real_walkdirs_utf8)
 
926
        elif osutils._fs_enc.upper() in ('UTF-8', 'ASCII', 'ANSI_X3.4-1968'): # ascii
 
927
            self.assertIs(osutils._walkdirs_fs_utf8,
 
928
                          osutils._real_walkdirs_utf8)
 
929
        else:
 
930
            self.assertIs(osutils._walkdirs_unicode_to_utf8,
 
931
                          osutils._real_walkdirs_utf8)
 
932
 
 
933
    def _save_platform_info(self):
 
934
        cur_winver = win32utils.winver
 
935
        cur_fs_enc = osutils._fs_enc
 
936
        cur_real_walkdirs_utf8 = osutils._real_walkdirs_utf8
 
937
        def restore():
 
938
            win32utils.winver = cur_winver
 
939
            osutils._fs_enc = cur_fs_enc
 
940
            osutils._real_walkdirs_utf8 = cur_real_walkdirs_utf8
 
941
        self.addCleanup(restore)
 
942
 
 
943
    def assertWalkdirsUtf8Is(self, expected):
 
944
        """Assert the right implementation for _walkdirs_utf8 is chosen."""
 
945
        # Force it to redetect
 
946
        osutils._real_walkdirs_utf8 = None
 
947
        # Nothing to list, but should still trigger the selection logic
 
948
        self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
 
949
        self.assertIs(expected, osutils._real_walkdirs_utf8)
 
950
 
 
951
    def test_force_walkdirs_utf8_fs_utf8(self):
 
952
        self._save_platform_info()
 
953
        win32utils.winver = None # Avoid the win32 detection code
 
954
        osutils._fs_enc = 'UTF-8'
 
955
        self.assertWalkdirsUtf8Is(osutils._walkdirs_fs_utf8)
 
956
 
 
957
    def test_force_walkdirs_utf8_fs_ascii(self):
 
958
        self._save_platform_info()
 
959
        win32utils.winver = None # Avoid the win32 detection code
 
960
        osutils._fs_enc = 'US-ASCII'
 
961
        self.assertWalkdirsUtf8Is(osutils._walkdirs_fs_utf8)
 
962
 
 
963
    def test_force_walkdirs_utf8_fs_ANSI(self):
 
964
        self._save_platform_info()
 
965
        win32utils.winver = None # Avoid the win32 detection code
 
966
        osutils._fs_enc = 'ANSI_X3.4-1968'
 
967
        self.assertWalkdirsUtf8Is(osutils._walkdirs_fs_utf8)
 
968
 
 
969
    def test_force_walkdirs_utf8_fs_latin1(self):
 
970
        self._save_platform_info()
 
971
        win32utils.winver = None # Avoid the win32 detection code
 
972
        osutils._fs_enc = 'latin1'
 
973
        self.assertWalkdirsUtf8Is(osutils._walkdirs_unicode_to_utf8)
 
974
 
 
975
    def test_force_walkdirs_utf8_nt(self):
 
976
        self.requireFeature(WalkdirsWin32Feature)
 
977
        self._save_platform_info()
 
978
        win32utils.winver = 'Windows NT'
 
979
        from bzrlib._walkdirs_win32 import _walkdirs_utf8_win32_find_file
 
980
        self.assertWalkdirsUtf8Is(_walkdirs_utf8_win32_find_file)
 
981
 
 
982
    def test_force_walkdirs_utf8_nt(self):
 
983
        self.requireFeature(WalkdirsWin32Feature)
 
984
        self._save_platform_info()
 
985
        win32utils.winver = 'Windows 98'
 
986
        self.assertWalkdirsUtf8Is(osutils._walkdirs_unicode_to_utf8)
 
987
 
 
988
    def test_unicode_walkdirs(self):
 
989
        """Walkdirs should always return unicode paths."""
 
990
        name0 = u'0file-\xb6'
 
991
        name1 = u'1dir-\u062c\u0648'
 
992
        name2 = u'2file-\u0633'
 
993
        tree = [
 
994
            name0,
 
995
            name1 + '/',
 
996
            name1 + '/' + name0,
 
997
            name1 + '/' + name1 + '/',
 
998
            name2,
 
999
            ]
 
1000
        try:
 
1001
            self.build_tree(tree)
 
1002
        except UnicodeError:
 
1003
            raise TestSkipped('Could not represent Unicode chars'
 
1004
                              ' in current encoding.')
 
1005
        expected_dirblocks = [
 
1006
                ((u'', u'.'),
 
1007
                 [(name0, name0, 'file', './' + name0),
 
1008
                  (name1, name1, 'directory', './' + name1),
 
1009
                  (name2, name2, 'file', './' + name2),
 
1010
                 ]
 
1011
                ),
 
1012
                ((name1, './' + name1),
 
1013
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
1014
                                                        + '/' + name0),
 
1015
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
1016
                                                            + '/' + name1),
 
1017
                 ]
 
1018
                ),
 
1019
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1020
                 [
 
1021
                 ]
 
1022
                ),
 
1023
            ]
 
1024
        result = list(osutils.walkdirs('.'))
 
1025
        self._filter_out_stat(result)
 
1026
        self.assertEqual(expected_dirblocks, result)
 
1027
        result = list(osutils.walkdirs(u'./'+name1, name1))
 
1028
        self._filter_out_stat(result)
 
1029
        self.assertEqual(expected_dirblocks[1:], result)
 
1030
 
 
1031
    def test_unicode__walkdirs_utf8(self):
 
1032
        """Walkdirs_utf8 should always return utf8 paths.
 
1033
 
 
1034
        The abspath portion might be in unicode or utf-8
 
1035
        """
 
1036
        name0 = u'0file-\xb6'
 
1037
        name1 = u'1dir-\u062c\u0648'
 
1038
        name2 = u'2file-\u0633'
 
1039
        tree = [
 
1040
            name0,
 
1041
            name1 + '/',
 
1042
            name1 + '/' + name0,
 
1043
            name1 + '/' + name1 + '/',
 
1044
            name2,
 
1045
            ]
 
1046
        try:
 
1047
            self.build_tree(tree)
 
1048
        except UnicodeError:
 
1049
            raise TestSkipped('Could not represent Unicode chars'
 
1050
                              ' in current encoding.')
 
1051
        name0 = name0.encode('utf8')
 
1052
        name1 = name1.encode('utf8')
 
1053
        name2 = name2.encode('utf8')
 
1054
 
 
1055
        expected_dirblocks = [
 
1056
                (('', '.'),
 
1057
                 [(name0, name0, 'file', './' + name0),
 
1058
                  (name1, name1, 'directory', './' + name1),
 
1059
                  (name2, name2, 'file', './' + name2),
 
1060
                 ]
 
1061
                ),
 
1062
                ((name1, './' + name1),
 
1063
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
1064
                                                        + '/' + name0),
 
1065
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
1066
                                                            + '/' + name1),
 
1067
                 ]
 
1068
                ),
 
1069
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1070
                 [
 
1071
                 ]
 
1072
                ),
 
1073
            ]
 
1074
        result = []
 
1075
        # For ease in testing, if walkdirs_utf8 returns Unicode, assert that
 
1076
        # all abspaths are Unicode, and encode them back into utf8.
 
1077
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
 
1078
            self.assertIsInstance(dirdetail[0], str)
 
1079
            if isinstance(dirdetail[1], unicode):
 
1080
                dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
 
1081
                dirblock = [list(info) for info in dirblock]
 
1082
                for info in dirblock:
 
1083
                    self.assertIsInstance(info[4], unicode)
 
1084
                    info[4] = info[4].encode('utf8')
 
1085
            new_dirblock = []
 
1086
            for info in dirblock:
 
1087
                self.assertIsInstance(info[0], str)
 
1088
                self.assertIsInstance(info[1], str)
 
1089
                self.assertIsInstance(info[4], str)
 
1090
                # Remove the stat information
 
1091
                new_dirblock.append((info[0], info[1], info[2], info[4]))
 
1092
            result.append((dirdetail, new_dirblock))
 
1093
        self.assertEqual(expected_dirblocks, result)
 
1094
 
 
1095
    def test_unicode__walkdirs_unicode_to_utf8(self):
 
1096
        """walkdirs_unicode_to_utf8 should be a safe fallback everywhere
 
1097
 
 
1098
        The abspath portion should be in unicode
 
1099
        """
 
1100
        name0u = u'0file-\xb6'
 
1101
        name1u = u'1dir-\u062c\u0648'
 
1102
        name2u = u'2file-\u0633'
 
1103
        tree = [
 
1104
            name0u,
 
1105
            name1u + '/',
 
1106
            name1u + '/' + name0u,
 
1107
            name1u + '/' + name1u + '/',
 
1108
            name2u,
 
1109
            ]
 
1110
        try:
 
1111
            self.build_tree(tree)
 
1112
        except UnicodeError:
 
1113
            raise TestSkipped('Could not represent Unicode chars'
 
1114
                              ' in current encoding.')
 
1115
        name0 = name0u.encode('utf8')
 
1116
        name1 = name1u.encode('utf8')
 
1117
        name2 = name2u.encode('utf8')
 
1118
 
 
1119
        # All of the abspaths should be in unicode, all of the relative paths
 
1120
        # should be in utf8
 
1121
        expected_dirblocks = [
 
1122
                (('', '.'),
 
1123
                 [(name0, name0, 'file', './' + name0u),
 
1124
                  (name1, name1, 'directory', './' + name1u),
 
1125
                  (name2, name2, 'file', './' + name2u),
 
1126
                 ]
 
1127
                ),
 
1128
                ((name1, './' + name1u),
 
1129
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1130
                                                        + '/' + name0u),
 
1131
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1132
                                                            + '/' + name1u),
 
1133
                 ]
 
1134
                ),
 
1135
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1136
                 [
 
1137
                 ]
 
1138
                ),
 
1139
            ]
 
1140
        result = list(osutils._walkdirs_unicode_to_utf8('.'))
 
1141
        self._filter_out_stat(result)
 
1142
        self.assertEqual(expected_dirblocks, result)
 
1143
 
 
1144
    def test__walkdirs_utf_win32_find_file(self):
 
1145
        self.requireFeature(WalkdirsWin32Feature)
 
1146
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1147
        from bzrlib._walkdirs_win32 import _walkdirs_utf8_win32_find_file
 
1148
        name0u = u'0file-\xb6'
 
1149
        name1u = u'1dir-\u062c\u0648'
 
1150
        name2u = u'2file-\u0633'
 
1151
        tree = [
 
1152
            name0u,
 
1153
            name1u + '/',
 
1154
            name1u + '/' + name0u,
 
1155
            name1u + '/' + name1u + '/',
 
1156
            name2u,
 
1157
            ]
 
1158
        self.build_tree(tree)
 
1159
        name0 = name0u.encode('utf8')
 
1160
        name1 = name1u.encode('utf8')
 
1161
        name2 = name2u.encode('utf8')
 
1162
 
 
1163
        # All of the abspaths should be in unicode, all of the relative paths
 
1164
        # should be in utf8
 
1165
        expected_dirblocks = [
 
1166
                (('', '.'),
 
1167
                 [(name0, name0, 'file', './' + name0u),
 
1168
                  (name1, name1, 'directory', './' + name1u),
 
1169
                  (name2, name2, 'file', './' + name2u),
 
1170
                 ]
 
1171
                ),
 
1172
                ((name1, './' + name1u),
 
1173
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1174
                                                        + '/' + name0u),
 
1175
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1176
                                                            + '/' + name1u),
 
1177
                 ]
 
1178
                ),
 
1179
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1180
                 [
 
1181
                 ]
 
1182
                ),
 
1183
            ]
 
1184
        result = list(_walkdirs_utf8_win32_find_file(u'.'))
 
1185
        self._filter_out_stat(result)
 
1186
        self.assertEqual(expected_dirblocks, result)
 
1187
 
 
1188
    def assertStatIsCorrect(self, path, win32stat):
 
1189
        os_stat = os.stat(path)
 
1190
        self.assertEqual(os_stat.st_size, win32stat.st_size)
 
1191
        self.assertAlmostEqual(os_stat.st_mtime, win32stat.st_mtime, places=4)
 
1192
        self.assertAlmostEqual(os_stat.st_ctime, win32stat.st_ctime, places=4)
 
1193
        self.assertAlmostEqual(os_stat.st_atime, win32stat.st_atime, places=4)
 
1194
        self.assertEqual(os_stat.st_dev, win32stat.st_dev)
 
1195
        self.assertEqual(os_stat.st_ino, win32stat.st_ino)
 
1196
        self.assertEqual(os_stat.st_mode, win32stat.st_mode)
 
1197
 
 
1198
    def test__walkdirs_utf_win32_find_file_stat_file(self):
 
1199
        """make sure our Stat values are valid"""
 
1200
        self.requireFeature(WalkdirsWin32Feature)
 
1201
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1202
        from bzrlib._walkdirs_win32 import _walkdirs_utf8_win32_find_file
 
1203
        name0u = u'0file-\xb6'
 
1204
        name0 = name0u.encode('utf8')
 
1205
        self.build_tree([name0u])
 
1206
        # I hate to sleep() here, but I'm trying to make the ctime different
 
1207
        # from the mtime
 
1208
        time.sleep(2)
 
1209
        f = open(name0u, 'ab')
 
1210
        try:
 
1211
            f.write('just a small update')
 
1212
        finally:
 
1213
            f.close()
 
1214
 
 
1215
        result = list(_walkdirs_utf8_win32_find_file(u'.'))
 
1216
        entry = result[0][1][0]
 
1217
        self.assertEqual((name0, name0, 'file'), entry[:3])
 
1218
        self.assertEqual(u'./' + name0u, entry[4])
 
1219
        self.assertStatIsCorrect(entry[4], entry[3])
 
1220
        self.assertNotEqual(entry[3].st_mtime, entry[3].st_ctime)
 
1221
 
 
1222
    def test__walkdirs_utf_win32_find_file_stat_directory(self):
 
1223
        """make sure our Stat values are valid"""
 
1224
        self.requireFeature(WalkdirsWin32Feature)
 
1225
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1226
        from bzrlib._walkdirs_win32 import _walkdirs_utf8_win32_find_file
 
1227
        name0u = u'0dir-\u062c\u0648'
 
1228
        name0 = name0u.encode('utf8')
 
1229
        self.build_tree([name0u + '/'])
 
1230
 
 
1231
        result = list(_walkdirs_utf8_win32_find_file(u'.'))
 
1232
        entry = result[0][1][0]
 
1233
        self.assertEqual((name0, name0, 'directory'), entry[:3])
 
1234
        self.assertEqual(u'./' + name0u, entry[4])
 
1235
        self.assertStatIsCorrect(entry[4], entry[3])
 
1236
 
 
1237
    def assertPathCompare(self, path_less, path_greater):
 
1238
        """check that path_less and path_greater compare correctly."""
 
1239
        self.assertEqual(0, osutils.compare_paths_prefix_order(
 
1240
            path_less, path_less))
 
1241
        self.assertEqual(0, osutils.compare_paths_prefix_order(
 
1242
            path_greater, path_greater))
 
1243
        self.assertEqual(-1, osutils.compare_paths_prefix_order(
 
1244
            path_less, path_greater))
 
1245
        self.assertEqual(1, osutils.compare_paths_prefix_order(
 
1246
            path_greater, path_less))
 
1247
 
 
1248
    def test_compare_paths_prefix_order(self):
 
1249
        # root before all else
 
1250
        self.assertPathCompare("/", "/a")
 
1251
        # alpha within a dir
 
1252
        self.assertPathCompare("/a", "/b")
 
1253
        self.assertPathCompare("/b", "/z")
 
1254
        # high dirs before lower.
 
1255
        self.assertPathCompare("/z", "/a/a")
 
1256
        # except if the deeper dir should be output first
 
1257
        self.assertPathCompare("/a/b/c", "/d/g")
 
1258
        # lexical betwen dirs of the same height
 
1259
        self.assertPathCompare("/a/z", "/z/z")
 
1260
        self.assertPathCompare("/a/c/z", "/a/d/e")
 
1261
 
 
1262
        # this should also be consistent for no leading / paths
 
1263
        # root before all else
 
1264
        self.assertPathCompare("", "a")
 
1265
        # alpha within a dir
 
1266
        self.assertPathCompare("a", "b")
 
1267
        self.assertPathCompare("b", "z")
 
1268
        # high dirs before lower.
 
1269
        self.assertPathCompare("z", "a/a")
 
1270
        # except if the deeper dir should be output first
 
1271
        self.assertPathCompare("a/b/c", "d/g")
 
1272
        # lexical betwen dirs of the same height
 
1273
        self.assertPathCompare("a/z", "z/z")
 
1274
        self.assertPathCompare("a/c/z", "a/d/e")
 
1275
 
 
1276
    def test_path_prefix_sorting(self):
 
1277
        """Doing a sort on path prefix should match our sample data."""
 
1278
        original_paths = [
 
1279
            'a',
 
1280
            'a/b',
 
1281
            'a/b/c',
 
1282
            'b',
 
1283
            'b/c',
 
1284
            'd',
 
1285
            'd/e',
 
1286
            'd/e/f',
 
1287
            'd/f',
 
1288
            'd/g',
 
1289
            'g',
 
1290
            ]
 
1291
 
 
1292
        dir_sorted_paths = [
 
1293
            'a',
 
1294
            'b',
 
1295
            'd',
 
1296
            'g',
 
1297
            'a/b',
 
1298
            'a/b/c',
 
1299
            'b/c',
 
1300
            'd/e',
 
1301
            'd/f',
 
1302
            'd/g',
 
1303
            'd/e/f',
 
1304
            ]
 
1305
 
 
1306
        self.assertEqual(
 
1307
            dir_sorted_paths,
 
1308
            sorted(original_paths, key=osutils.path_prefix_key))
 
1309
        # using the comparison routine shoudl work too:
 
1310
        self.assertEqual(
 
1311
            dir_sorted_paths,
 
1312
            sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
 
1313
 
 
1314
 
 
1315
class TestCopyTree(TestCaseInTempDir):
 
1316
    
 
1317
    def test_copy_basic_tree(self):
 
1318
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
 
1319
        osutils.copy_tree('source', 'target')
 
1320
        self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
 
1321
        self.assertEqual(['c'], os.listdir('target/b'))
 
1322
 
 
1323
    def test_copy_tree_target_exists(self):
 
1324
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
 
1325
                         'target/'])
 
1326
        osutils.copy_tree('source', 'target')
 
1327
        self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
 
1328
        self.assertEqual(['c'], os.listdir('target/b'))
 
1329
 
 
1330
    def test_copy_tree_symlinks(self):
 
1331
        self.requireFeature(SymlinkFeature)
 
1332
        self.build_tree(['source/'])
 
1333
        os.symlink('a/generic/path', 'source/lnk')
 
1334
        osutils.copy_tree('source', 'target')
 
1335
        self.assertEqual(['lnk'], os.listdir('target'))
 
1336
        self.assertEqual('a/generic/path', os.readlink('target/lnk'))
 
1337
 
 
1338
    def test_copy_tree_handlers(self):
 
1339
        processed_files = []
 
1340
        processed_links = []
 
1341
        def file_handler(from_path, to_path):
 
1342
            processed_files.append(('f', from_path, to_path))
 
1343
        def dir_handler(from_path, to_path):
 
1344
            processed_files.append(('d', from_path, to_path))
 
1345
        def link_handler(from_path, to_path):
 
1346
            processed_links.append((from_path, to_path))
 
1347
        handlers = {'file':file_handler,
 
1348
                    'directory':dir_handler,
 
1349
                    'symlink':link_handler,
 
1350
                   }
 
1351
 
 
1352
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
 
1353
        if osutils.has_symlinks():
 
1354
            os.symlink('a/generic/path', 'source/lnk')
 
1355
        osutils.copy_tree('source', 'target', handlers=handlers)
 
1356
 
 
1357
        self.assertEqual([('d', 'source', 'target'),
 
1358
                          ('f', 'source/a', 'target/a'),
 
1359
                          ('d', 'source/b', 'target/b'),
 
1360
                          ('f', 'source/b/c', 'target/b/c'),
 
1361
                         ], processed_files)
 
1362
        self.failIfExists('target')
 
1363
        if osutils.has_symlinks():
 
1364
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
 
1365
 
 
1366
 
 
1367
#class TestTerminalEncoding has been moved to test_osutils_encodings.py
 
1368
# [bialix] 2006/12/26
 
1369
 
 
1370
 
 
1371
class TestSetUnsetEnv(TestCase):
 
1372
    """Test updating the environment"""
 
1373
 
 
1374
    def setUp(self):
 
1375
        super(TestSetUnsetEnv, self).setUp()
 
1376
 
 
1377
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
 
1378
                         'Environment was not cleaned up properly.'
 
1379
                         ' Variable BZR_TEST_ENV_VAR should not exist.')
 
1380
        def cleanup():
 
1381
            if 'BZR_TEST_ENV_VAR' in os.environ:
 
1382
                del os.environ['BZR_TEST_ENV_VAR']
 
1383
 
 
1384
        self.addCleanup(cleanup)
 
1385
 
 
1386
    def test_set(self):
 
1387
        """Test that we can set an env variable"""
 
1388
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1389
        self.assertEqual(None, old)
 
1390
        self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
 
1391
 
 
1392
    def test_double_set(self):
 
1393
        """Test that we get the old value out"""
 
1394
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1395
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
 
1396
        self.assertEqual('foo', old)
 
1397
        self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
 
1398
 
 
1399
    def test_unicode(self):
 
1400
        """Environment can only contain plain strings
 
1401
        
 
1402
        So Unicode strings must be encoded.
 
1403
        """
 
1404
        uni_val, env_val = probe_unicode_in_user_encoding()
 
1405
        if uni_val is None:
 
1406
            raise TestSkipped('Cannot find a unicode character that works in'
 
1407
                              ' encoding %s' % (osutils.get_user_encoding(),))
 
1408
 
 
1409
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
 
1410
        self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
 
1411
 
 
1412
    def test_unset(self):
 
1413
        """Test that passing None will remove the env var"""
 
1414
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1415
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
 
1416
        self.assertEqual('foo', old)
 
1417
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
 
1418
        self.failIf('BZR_TEST_ENV_VAR' in os.environ)
 
1419
 
 
1420
 
 
1421
class TestLocalTimeOffset(TestCase):
 
1422
 
 
1423
    def test_local_time_offset(self):
 
1424
        """Test that local_time_offset() returns a sane value."""
 
1425
        offset = osutils.local_time_offset()
 
1426
        self.assertTrue(isinstance(offset, int))
 
1427
        # Test that the offset is no more than a eighteen hours in
 
1428
        # either direction.
 
1429
        # Time zone handling is system specific, so it is difficult to
 
1430
        # do more specific tests, but a value outside of this range is
 
1431
        # probably wrong.
 
1432
        eighteen_hours = 18 * 3600
 
1433
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
 
1434
 
 
1435
    def test_local_time_offset_with_timestamp(self):
 
1436
        """Test that local_time_offset() works with a timestamp."""
 
1437
        offset = osutils.local_time_offset(1000000000.1234567)
 
1438
        self.assertTrue(isinstance(offset, int))
 
1439
        eighteen_hours = 18 * 3600
 
1440
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
 
1441
 
 
1442
 
 
1443
class TestShaFileByName(TestCaseInTempDir):
 
1444
 
 
1445
    def test_sha_empty(self):
 
1446
        self.build_tree_contents([('foo', '')])
 
1447
        expected_sha = osutils.sha_string('')
 
1448
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
 
1449
 
 
1450
    def test_sha_mixed_endings(self):
 
1451
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
 
1452
        self.build_tree_contents([('foo', text)])
 
1453
        expected_sha = osutils.sha_string(text)
 
1454
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
 
1455
 
 
1456
 
 
1457
_debug_text = \
 
1458
r'''# Copyright (C) 2005, 2006 Canonical Ltd
 
1459
#
 
1460
# This program is free software; you can redistribute it and/or modify
 
1461
# it under the terms of the GNU General Public License as published by
 
1462
# the Free Software Foundation; either version 2 of the License, or
 
1463
# (at your option) any later version.
 
1464
#
 
1465
# This program is distributed in the hope that it will be useful,
 
1466
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
1467
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
1468
# GNU General Public License for more details.
 
1469
#
 
1470
# You should have received a copy of the GNU General Public License
 
1471
# along with this program; if not, write to the Free Software
 
1472
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
1473
 
 
1474
 
 
1475
# NOTE: If update these, please also update the help for global-options in
 
1476
#       bzrlib/help_topics/__init__.py
 
1477
 
 
1478
debug_flags = set()
 
1479
"""Set of flags that enable different debug behaviour.
 
1480
 
 
1481
These are set with eg ``-Dlock`` on the bzr command line.
 
1482
 
 
1483
Options include:
 
1484
 
 
1485
 * auth - show authentication sections used
 
1486
 * error - show stack traces for all top level exceptions
 
1487
 * evil - capture call sites that do expensive or badly-scaling operations.
 
1488
 * fetch - trace history copying between repositories
 
1489
 * graph - trace graph traversal information
 
1490
 * hashcache - log every time a working file is read to determine its hash
 
1491
 * hooks - trace hook execution
 
1492
 * hpss - trace smart protocol requests and responses
 
1493
 * http - trace http connections, requests and responses
 
1494
 * index - trace major index operations
 
1495
 * knit - trace knit operations
 
1496
 * lock - trace when lockdir locks are taken or released
 
1497
 * merge - emit information for debugging merges
 
1498
 * pack - emit information about pack operations
 
1499
 
 
1500
"""
 
1501
'''
 
1502
 
 
1503
 
 
1504
class TestResourceLoading(TestCaseInTempDir):
 
1505
 
 
1506
    def test_resource_string(self):
 
1507
        # test resource in bzrlib
 
1508
        text = osutils.resource_string('bzrlib', 'debug.py')
 
1509
        self.assertEquals(_debug_text, text)
 
1510
        # test resource under bzrlib
 
1511
        text = osutils.resource_string('bzrlib.ui', 'text.py')
 
1512
        self.assertContainsRe(text, "class TextUIFactory")
 
1513
        # test unsupported package
 
1514
        self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
 
1515
            'yyy.xx')
 
1516
        # test unknown resource
 
1517
        self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')