1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the osutils wrapper."""
19
from cStringIO import StringIO
35
from bzrlib.tests import (
41
class _UTF8DirReaderFeature(tests.Feature):
45
from bzrlib import _readdir_pyx
46
self.reader = _readdir_pyx.UTF8DirReader
51
def feature_name(self):
52
return 'bzrlib._readdir_pyx'
54
UTF8DirReaderFeature = _UTF8DirReaderFeature()
57
def _already_unicode(s):
61
def _fs_enc_to_unicode(s):
    """Decode the byte string ``s`` using ``osutils._fs_enc``.

    :param s: A byte string encoded in the file system encoding.
    :return: The decoded unicode string.
    """
    return s.decode(osutils._fs_enc)
65
def _utf8_to_unicode(s):
66
return s.decode('UTF-8')
69
def dir_reader_scenarios():
70
# For each dir reader we define:
72
# - native_to_unicode: a function converting the native_abspath as returned
73
# by DirReader.read_dir to its unicode representation
75
# UnicodeDirReader is the fallback, it should be tested on all platforms.
76
scenarios = [('unicode',
77
dict(_dir_reader_class=osutils.UnicodeDirReader,
78
_native_to_unicode=_already_unicode))]
79
# Some DirReaders are platform specific and even there they may not be
81
if UTF8DirReaderFeature.available():
82
from bzrlib import _readdir_pyx
83
scenarios.append(('utf8',
84
dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
85
_native_to_unicode=_utf8_to_unicode)))
87
if test__walkdirs_win32.Win32ReadDirFeature.available():
89
from bzrlib import _walkdirs_win32
90
# TODO: check on windows, it may be that we need to use/add
91
# safe_unicode instead of _fs_enc_to_unicode
94
dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
95
_native_to_unicode=_fs_enc_to_unicode)))
101
def load_tests(basic_tests, module, loader):
    """Build the test suite for this module.

    Tests that are instances of TestDirReader are multiplied by the
    scenarios from dir_reader_scenarios(); all other tests are added
    unchanged.

    :param basic_tests: The tests already loaded from this module.
    :param module: The module being loaded (unused here).
    :param loader: The test loader; its suiteClass builds the result.
    :return: The suite holding the multiplied and the remaining tests.
    """
    suite = loader.suiteClass()
    dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
        basic_tests, tests.condition_isinstance(TestDirReader))
    tests.multiply_tests(dir_reader_tests, dir_reader_scenarios(), suite)
    suite.addTest(remaining_tests)
    # The loader uses the returned suite; without this the tests are lost.
    return suite
110
class TestContainsWhitespace(tests.TestCase):
    """Tests for osutils.contains_whitespace."""

    def test_contains_whitespace(self):
        self.failUnless(osutils.contains_whitespace(u' '))
        self.failUnless(osutils.contains_whitespace(u'hello there'))
        self.failUnless(osutils.contains_whitespace(u'hellothere\n'))
        self.failUnless(osutils.contains_whitespace(u'hello\nthere'))
        self.failUnless(osutils.contains_whitespace(u'hello\rthere'))
        self.failUnless(osutils.contains_whitespace(u'hello\tthere'))

        # \xa0 is "Non-breaking-space" which on some python locales thinks it
        # is whitespace, but we do not.
        self.failIf(osutils.contains_whitespace(u''))
        self.failIf(osutils.contains_whitespace(u'hellothere'))
        self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
127
class TestRename(tests.TestCaseInTempDir):
    """Tests for osutils.rename and osutils.fancy_rename."""

    def test_fancy_rename(self):
        # This should work everywhere
        def rename(a, b):
            # Drive fancy_rename with the plain os primitives.
            osutils.fancy_rename(a, b,
                                 rename_func=os.rename,
                                 unlink_func=os.unlink)

        open('a', 'wb').write('something in a\n')
        rename('a', 'b')
        self.failIfExists('a')
        self.failUnlessExists('b')
        self.check_file_contents('b', 'something in a\n')

        # Renaming back must replace the freshly written 'a'.
        open('a', 'wb').write('new something in a\n')
        rename('b', 'a')

        self.check_file_contents('a', 'something in a\n')

    def test_rename(self):
        # Rename should be semi-atomic on all platforms
        open('a', 'wb').write('something in a\n')
        osutils.rename('a', 'b')
        self.failIfExists('a')
        self.failUnlessExists('b')
        self.check_file_contents('b', 'something in a\n')

        # Renaming back must replace the freshly written 'a'.
        open('a', 'wb').write('new something in a\n')
        osutils.rename('b', 'a')

        self.check_file_contents('a', 'something in a\n')

    # TODO: test fancy_rename using a MemoryTransport

    def test_rename_change_case(self):
        # on Windows we should be able to change filename case by rename
        self.build_tree(['a', 'b/'])
        osutils.rename('a', 'A')
        osutils.rename('b', 'B')
        # we can't use failUnlessExists on case-insensitive filesystem
        # so try to check shape of the tree
        shape = sorted(os.listdir('.'))
        self.assertEquals(['A', 'B'], shape)
173
class TestRandChars(tests.TestCase):
    """Tests for osutils.rand_chars."""

    def test_01_rand_chars_empty(self):
        result = osutils.rand_chars(0)
        self.assertEqual(result, '')

    def test_02_rand_chars_100(self):
        result = osutils.rand_chars(100)
        self.assertEqual(len(result), 100)
        self.assertEqual(type(result), str)
        # Only lowercase letters and digits are expected.
        self.assertContainsRe(result, r'^[a-z0-9]{100}$')
186
class TestIsInside(tests.TestCase):
    """Tests for osutils.is_inside and its *_any variants."""

    def test_is_inside(self):
        is_inside = osutils.is_inside
        self.assertTrue(is_inside('src', 'src/foo.c'))
        self.assertFalse(is_inside('src', 'srccontrol'))
        self.assertTrue(is_inside('src', 'src/a/a/a/foo.c'))
        self.assertTrue(is_inside('foo.c', 'foo.c'))
        self.assertFalse(is_inside('foo.c', ''))
        self.assertTrue(is_inside('', 'foo.c'))

    def test_is_inside_any(self):
        SRC_FOO_C = osutils.pathjoin('src', 'foo.c')
        for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
                         (['src'], SRC_FOO_C),
                         ]:
            self.assertTrue(osutils.is_inside_any(dirs, fn))
        # A shared name prefix alone does not make a path "inside".
        for dirs, fn in [(['src'], 'srccontrol'),
                         (['src'], 'srccontrol/foo')]:
            self.assertFalse(osutils.is_inside_any(dirs, fn))

    def test_is_inside_or_parent_of_any(self):
        for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
                         (['src'], 'src/foo.c'),
                         (['src/bar.c'], 'src'),
                         (['src/bar.c', 'bla/foo.c'], 'src'),
                         ]:
            self.assertTrue(osutils.is_inside_or_parent_of_any(dirs, fn))

        # A shared name prefix alone does not count in either direction.
        for dirs, fn in [(['src'], 'srccontrol'),
                         (['srccontrol/foo.c'], 'src'),
                         (['src'], 'srccontrol/foo')]:
            self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
223
class TestRmTree(tests.TestCaseInTempDir):
    """Tests for osutils.rmtree."""

    def test_rmtree(self):
        # Check to remove tree with read-only files/dirs
        # The directory must exist before a file can be created in it.
        os.mkdir('dir')
        f = file('dir/file', 'w')
        # Close the handle so the file can be removed on every platform.
        f.close()
        # would like to also try making the directory readonly, but at the
        # moment python shutil.rmtree doesn't handle that properly - it would
        # need to chmod the directory before removing things inside it - deferred
        # for now -- mbp 20060505
        # osutils.make_readonly('dir')
        osutils.make_readonly('dir/file')

        osutils.rmtree('dir')

        self.failIfExists('dir/file')
        self.failIfExists('dir')
244
class TestDeleteAny(tests.TestCaseInTempDir):
    """Tests for osutils.delete_any."""

    def test_delete_any_readonly(self):
        # from <https://bugs.launchpad.net/bzr/+bug/218206>
        self.build_tree(['d/', 'f'])
        osutils.make_readonly('d')
        osutils.make_readonly('f')

        # Neither call should raise even though both paths are read-only.
        osutils.delete_any('f')
        osutils.delete_any('d')
256
class TestKind(tests.TestCaseInTempDir):
258
def test_file_kind(self):
259
self.build_tree(['file', 'dir/'])
260
self.assertEquals('file', osutils.file_kind('file'))
261
self.assertEquals('directory', osutils.file_kind('dir/'))
262
if osutils.has_symlinks():
263
os.symlink('symlink', 'symlink')
264
self.assertEquals('symlink', osutils.file_kind('symlink'))
266
# TODO: jam 20060529 Test a block device
268
os.lstat('/dev/null')
270
if e.errno not in (errno.ENOENT,):
273
self.assertEquals('chardev', osutils.file_kind('/dev/null'))
275
mkfifo = getattr(os, 'mkfifo', None)
279
self.assertEquals('fifo', osutils.file_kind('fifo'))
283
AF_UNIX = getattr(socket, 'AF_UNIX', None)
285
s = socket.socket(AF_UNIX)
288
self.assertEquals('socket', osutils.file_kind('socket'))
292
def test_kind_marker(self):
293
self.assertEqual("", osutils.kind_marker("file"))
294
self.assertEqual("/", osutils.kind_marker('directory'))
295
self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
296
self.assertEqual("@", osutils.kind_marker("symlink"))
297
self.assertEqual("+", osutils.kind_marker("tree-reference"))
298
self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
301
class TestUmask(tests.TestCaseInTempDir):
303
def test_get_umask(self):
304
if sys.platform == 'win32':
305
# umask always returns '0', no way to set it
306
self.assertEqual(0, osutils.get_umask())
309
orig_umask = osutils.get_umask()
310
self.addCleanup(os.umask, orig_umask)
312
self.assertEqual(0222, osutils.get_umask())
314
self.assertEqual(0022, osutils.get_umask())
316
self.assertEqual(0002, osutils.get_umask())
318
self.assertEqual(0027, osutils.get_umask())
321
class TestDateTime(tests.TestCase):
    """Tests for the date and time helpers in osutils."""

    def assertFormatedDelta(self, expected, seconds):
        """Assert osutils.format_delta formats as expected"""
        actual = osutils.format_delta(seconds)
        self.assertEqual(expected, actual)

    def test_format_delta(self):
        self.assertFormatedDelta('0 seconds ago', 0)
        self.assertFormatedDelta('1 second ago', 1)
        self.assertFormatedDelta('10 seconds ago', 10)
        self.assertFormatedDelta('59 seconds ago', 59)
        self.assertFormatedDelta('89 seconds ago', 89)
        self.assertFormatedDelta('1 minute, 30 seconds ago', 90)
        self.assertFormatedDelta('3 minutes, 0 seconds ago', 180)
        self.assertFormatedDelta('3 minutes, 1 second ago', 181)
        self.assertFormatedDelta('10 minutes, 15 seconds ago', 615)
        self.assertFormatedDelta('30 minutes, 59 seconds ago', 1859)
        self.assertFormatedDelta('31 minutes, 0 seconds ago', 1860)
        self.assertFormatedDelta('60 minutes, 0 seconds ago', 3600)
        self.assertFormatedDelta('89 minutes, 59 seconds ago', 5399)
        self.assertFormatedDelta('1 hour, 30 minutes ago', 5400)
        self.assertFormatedDelta('2 hours, 30 minutes ago', 9017)
        self.assertFormatedDelta('10 hours, 0 minutes ago', 36000)
        self.assertFormatedDelta('24 hours, 0 minutes ago', 86400)
        self.assertFormatedDelta('35 hours, 59 minutes ago', 129599)
        self.assertFormatedDelta('36 hours, 0 minutes ago', 129600)
        self.assertFormatedDelta('36 hours, 0 minutes ago', 129601)
        self.assertFormatedDelta('36 hours, 1 minute ago', 129660)
        self.assertFormatedDelta('36 hours, 1 minute ago', 129661)
        self.assertFormatedDelta('84 hours, 10 minutes ago', 303002)

        # We handle when time steps the wrong direction because computers
        # don't have synchronized clocks.
        self.assertFormatedDelta('84 hours, 10 minutes in the future', -303002)
        self.assertFormatedDelta('1 second in the future', -1)
        self.assertFormatedDelta('2 seconds in the future', -2)

    def test_format_date(self):
        self.assertRaises(errors.UnsupportedTimezoneFormat,
                          osutils.format_date, 0, timezone='foo')
        self.assertIsInstance(osutils.format_date(0), str)
        self.assertIsInstance(osutils.format_local_date(0), unicode)
        # Testing for the actual value of the local weekday without
        # duplicating the code from format_date is difficult.
        # Instead blackbox.test_locale should check for localized
        # dates once they do occur in output strings.

    def test_local_time_offset(self):
        """Test that local_time_offset() returns a sane value."""
        offset = osutils.local_time_offset()
        self.assertTrue(isinstance(offset, int))
        # Test that the offset is no more than eighteen hours in either
        # direction.
        # Time zone handling is system specific, so it is difficult to
        # do more specific tests, but a value outside of this range is
        # not expected.
        eighteen_hours = 18 * 3600
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)

    def test_local_time_offset_with_timestamp(self):
        """Test that local_time_offset() works with a timestamp."""
        offset = osutils.local_time_offset(1000000000.1234567)
        self.assertTrue(isinstance(offset, int))
        eighteen_hours = 18 * 3600
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
389
class TestLinks(tests.TestCaseInTempDir):
391
def test_dereference_path(self):
392
self.requireFeature(tests.SymlinkFeature)
393
cwd = osutils.realpath('.')
395
bar_path = osutils.pathjoin(cwd, 'bar')
396
# Using './' to avoid bug #1213894 (first path component not
397
# dereferenced) in Python 2.4.1 and earlier
398
self.assertEqual(bar_path, osutils.realpath('./bar'))
399
os.symlink('bar', 'foo')
400
self.assertEqual(bar_path, osutils.realpath('./foo'))
402
# Does not dereference terminal symlinks
403
foo_path = osutils.pathjoin(cwd, 'foo')
404
self.assertEqual(foo_path, osutils.dereference_path('./foo'))
406
# Dereferences parent symlinks
408
baz_path = osutils.pathjoin(bar_path, 'baz')
409
self.assertEqual(baz_path, osutils.dereference_path('./foo/baz'))
411
# Dereferences parent symlinks that are the first path element
412
self.assertEqual(baz_path, osutils.dereference_path('foo/baz'))
414
# Dereferences parent symlinks in absolute paths
415
foo_baz_path = osutils.pathjoin(foo_path, 'baz')
416
self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
418
def test_changing_access(self):
419
f = file('file', 'w')
423
# Make a file readonly
424
osutils.make_readonly('file')
425
mode = os.lstat('file').st_mode
426
self.assertEqual(mode, mode & 0777555)
428
# Make a file writable
429
osutils.make_writable('file')
430
mode = os.lstat('file').st_mode
431
self.assertEqual(mode, mode | 0200)
433
if osutils.has_symlinks():
434
# should not error when handed a symlink
435
os.symlink('nonexistent', 'dangling')
436
osutils.make_readonly('dangling')
437
osutils.make_writable('dangling')
439
def test_host_os_dereferences_symlinks(self):
440
osutils.host_os_dereferences_symlinks()
443
class TestCanonicalRelPath(tests.TestCaseInTempDir):
    """Tests for osutils.canonical_relpath."""

    _test_needs_features = [tests.CaseInsCasePresFilenameFeature]

    def test_canonical_relpath_simple(self):
        f = file('MixedCaseName', 'w')
        # Only the file's existence matters; close the handle right away.
        f.close()
        actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
        self.failUnlessEqual('work/MixedCaseName', actual)

    def test_canonical_relpath_missing_tail(self):
        os.mkdir('MixedCaseParent')
        # The missing last component is expected back as given.
        actual = osutils.canonical_relpath(self.test_base_dir,
                                           'mixedcaseparent/nochild')
        self.failUnlessEqual('work/MixedCaseParent/nochild', actual)
460
class TestPumpFile(tests.TestCase):
461
"""Test pumpfile method."""
464
tests.TestCase.setUp(self)
465
# create a test datablock
466
self.block_size = 512
467
pattern = '0123456789ABCDEF'
468
self.test_data = pattern * (3 * self.block_size / len(pattern))
469
self.test_data_len = len(self.test_data)
471
def test_bracket_block_size(self):
472
"""Read data in blocks with the requested read size bracketing the
474
# make sure test data is larger than max read size
475
self.assertTrue(self.test_data_len > self.block_size)
477
from_file = file_utils.FakeReadFile(self.test_data)
480
# read (max / 2) bytes and verify read size wasn't affected
481
num_bytes_to_read = self.block_size / 2
482
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
483
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
484
self.assertEqual(from_file.get_read_count(), 1)
486
# read (max) bytes and verify read size wasn't affected
487
num_bytes_to_read = self.block_size
488
from_file.reset_read_count()
489
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
490
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
491
self.assertEqual(from_file.get_read_count(), 1)
493
# read (max + 1) bytes and verify read size was limited
494
num_bytes_to_read = self.block_size + 1
495
from_file.reset_read_count()
496
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
497
self.assertEqual(from_file.get_max_read_size(), self.block_size)
498
self.assertEqual(from_file.get_read_count(), 2)
500
# finish reading the rest of the data
501
num_bytes_to_read = self.test_data_len - to_file.tell()
502
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
504
# report error if the data wasn't equal (we only report the size due
505
# to the length of the data)
506
response_data = to_file.getvalue()
507
if response_data != self.test_data:
508
message = "Data not equal. Expected %d bytes, received %d."
509
self.fail(message % (len(response_data), self.test_data_len))
511
def test_specified_size(self):
512
"""Request a transfer larger than the maximum block size and verify
513
that the maximum read doesn't exceed the block_size."""
514
# make sure test data is larger than max read size
515
self.assertTrue(self.test_data_len > self.block_size)
517
# retrieve data in blocks
518
from_file = file_utils.FakeReadFile(self.test_data)
520
osutils.pumpfile(from_file, to_file, self.test_data_len,
523
# verify read size was equal to the maximum read size
524
self.assertTrue(from_file.get_max_read_size() > 0)
525
self.assertEqual(from_file.get_max_read_size(), self.block_size)
526
self.assertEqual(from_file.get_read_count(), 3)
528
# report error if the data wasn't equal (we only report the size due
529
# to the length of the data)
530
response_data = to_file.getvalue()
531
if response_data != self.test_data:
532
message = "Data not equal. Expected %d bytes, received %d."
533
self.fail(message % (len(response_data), self.test_data_len))
535
def test_to_eof(self):
536
"""Read to end-of-file and verify that the reads are not larger than
537
the maximum read size."""
538
# make sure test data is larger than max read size
539
self.assertTrue(self.test_data_len > self.block_size)
541
# retrieve data to EOF
542
from_file = file_utils.FakeReadFile(self.test_data)
544
osutils.pumpfile(from_file, to_file, -1, self.block_size)
546
# verify read size was equal to the maximum read size
547
self.assertEqual(from_file.get_max_read_size(), self.block_size)
548
self.assertEqual(from_file.get_read_count(), 4)
550
# report error if the data wasn't equal (we only report the size due
551
# to the length of the data)
552
response_data = to_file.getvalue()
553
if response_data != self.test_data:
554
message = "Data not equal. Expected %d bytes, received %d."
555
self.fail(message % (len(response_data), self.test_data_len))
557
def test_defaults(self):
558
"""Verifies that the default arguments will read to EOF -- this
559
test verifies that any existing usages of pumpfile will not be broken
560
with this new version."""
561
# retrieve data using default (old) pumpfile method
562
from_file = file_utils.FakeReadFile(self.test_data)
564
osutils.pumpfile(from_file, to_file)
566
# report error if the data wasn't equal (we only report the size due
567
# to the length of the data)
568
response_data = to_file.getvalue()
569
if response_data != self.test_data:
570
message = "Data not equal. Expected %d bytes, received %d."
571
self.fail(message % (len(response_data), self.test_data_len))
573
def test_report_activity(self):
575
def log_activity(length, direction):
576
activity.append((length, direction))
577
from_file = StringIO(self.test_data)
579
osutils.pumpfile(from_file, to_file, buff_size=500,
580
report_activity=log_activity, direction='read')
581
self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
582
(36, 'read')], activity)
584
from_file = StringIO(self.test_data)
587
osutils.pumpfile(from_file, to_file, buff_size=500,
588
report_activity=log_activity, direction='write')
589
self.assertEqual([(500, 'write'), (500, 'write'), (500, 'write'),
590
(36, 'write')], activity)
592
# And with a limited amount of data
593
from_file = StringIO(self.test_data)
596
osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
597
report_activity=log_activity, direction='read')
598
self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
602
class TestPumpStringFile(tests.TestCase):
    """Tests for osutils.pump_string_file."""

    def test_empty(self):
        output = StringIO()
        osutils.pump_string_file("", output)
        self.assertEqual("", output.getvalue())

    def test_more_than_segment_size(self):
        output = StringIO()
        osutils.pump_string_file("123456789", output, 2)
        self.assertEqual("123456789", output.getvalue())

    def test_segment_size(self):
        output = StringIO()
        osutils.pump_string_file("12", output, 2)
        self.assertEqual("12", output.getvalue())

    def test_segment_size_multiple(self):
        output = StringIO()
        osutils.pump_string_file("1234", output, 2)
        self.assertEqual("1234", output.getvalue())
625
class TestRelpath(tests.TestCase):
    """Tests for osutils.relpath."""

    def test_simple_relpath(self):
        cwd = osutils.getcwd()
        subdir = cwd + '/subdir'
        self.assertEqual('subdir', osutils.relpath(cwd, subdir))

    def test_deep_relpath(self):
        cwd = osutils.getcwd()
        subdir = cwd + '/sub/subsubdir'
        self.assertEqual('sub/subsubdir', osutils.relpath(cwd, subdir))

    def test_not_relative(self):
        # Paths that do not share the base must be rejected.
        self.assertRaises(errors.PathNotChild,
                          osutils.relpath, 'C:/path', 'H:/path')
        self.assertRaises(errors.PathNotChild,
                          osutils.relpath, 'C:/', 'H:/path')
644
class TestSafeUnicode(tests.TestCase):
    """Tests for osutils.safe_unicode."""

    def test_from_ascii_string(self):
        self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))

    def test_from_unicode_string_ascii_contents(self):
        self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))

    def test_from_unicode_string_unicode_contents(self):
        self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))

    def test_from_utf8_string(self):
        self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))

    def test_bad_utf8_string(self):
        # '\xbb\xbb' is not valid UTF-8.
        self.assertRaises(errors.BzrBadParameterNotUnicode,
                          osutils.safe_unicode,
                          '\xbb\xbb')
664
class TestSafeUtf8(tests.TestCase):
    """Tests for osutils.safe_utf8."""

    def test_from_ascii_string(self):
        self.assertEqual('foobar', osutils.safe_utf8('foobar'))

    def test_from_unicode_string_ascii_contents(self):
        self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))

    def test_from_unicode_string_unicode_contents(self):
        self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))

    def test_from_utf8_string(self):
        self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))

    def test_bad_utf8_string(self):
        # '\xbb\xbb' is not valid UTF-8.
        self.assertRaises(errors.BzrBadParameterNotUnicode,
                          osutils.safe_utf8, '\xbb\xbb')
684
class TestSafeRevisionId(tests.TestCase):
    """Tests for osutils.safe_revision_id."""

    def test_from_ascii_string(self):
        # this shouldn't give a warning because it's getting an ascii string
        self.assertEqual('foobar', osutils.safe_revision_id('foobar'))

    def test_from_unicode_string_ascii_contents(self):
        self.assertEqual('bargam',
                         osutils.safe_revision_id(u'bargam', warn=False))

    def test_from_unicode_deprecated(self):
        # Passing unicode without warn=False must emit the deprecation
        # warning.
        self.assertEqual('bargam',
            self.callDeprecated([osutils._revision_id_warning],
                                osutils.safe_revision_id, u'bargam'))

    def test_from_unicode_string_unicode_contents(self):
        self.assertEqual('bargam\xc2\xae',
                         osutils.safe_revision_id(u'bargam\xae', warn=False))

    def test_from_utf8_string(self):
        self.assertEqual('foo\xc2\xae',
                         osutils.safe_revision_id('foo\xc2\xae'))

    def test_none(self):
        """Currently, None is a valid revision_id"""
        self.assertEqual(None, osutils.safe_revision_id(None))
712
class TestSafeFileId(tests.TestCase):
    """Tests for osutils.safe_file_id."""

    def test_from_ascii_string(self):
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))

    def test_from_unicode_string_ascii_contents(self):
        self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))

    def test_from_unicode_deprecated(self):
        # Passing unicode without warn=False must emit the deprecation
        # warning.
        self.assertEqual('bargam',
            self.callDeprecated([osutils._file_id_warning],
                                osutils.safe_file_id, u'bargam'))

    def test_from_unicode_string_unicode_contents(self):
        self.assertEqual('bargam\xc2\xae',
                         osutils.safe_file_id(u'bargam\xae', warn=False))

    def test_from_utf8_string(self):
        self.assertEqual('foo\xc2\xae',
                         osutils.safe_file_id('foo\xc2\xae'))

    def test_none(self):
        """Currently, None is a valid file_id"""
        self.assertEqual(None, osutils.safe_file_id(None))
738
class TestWin32Funcs(tests.TestCase):
    """Test that _win32 versions of os utilities return appropriate paths."""

    def test_abspath(self):
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
        self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
        self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
        self.assertEqual('//HOST/path', osutils._win32_abspath('//HOST/path'))

    def test_realpath(self):
        self.assertEqual('C:/foo', osutils._win32_realpath('C:\\foo'))
        self.assertEqual('C:/foo', osutils._win32_realpath('C:/foo'))

    def test_pathjoin(self):
        self.assertEqual('path/to/foo',
                         osutils._win32_pathjoin('path', 'to', 'foo'))
        self.assertEqual('C:/foo',
                         osutils._win32_pathjoin('path\\to', 'C:\\foo'))
        self.assertEqual('C:/foo',
                         osutils._win32_pathjoin('path/to', 'C:/foo'))
        self.assertEqual('path/to/foo',
                         osutils._win32_pathjoin('path/to/', 'foo'))
        self.assertEqual('/foo',
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
        self.assertEqual('/foo',
                         osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))

    def test_normpath(self):
        self.assertEqual('path/to/foo',
                         osutils._win32_normpath(r'path\\from\..\to\.\foo'))
        self.assertEqual('path/to/foo',
                         osutils._win32_normpath('path//from/../to/./foo'))

    def test_getcwd(self):
        cwd = osutils._win32_getcwd()
        os_cwd = os.getcwdu()
        self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
        # win32 is inconsistent whether it returns lower or upper case
        # and even if it was consistent the user might type the other
        # so we force it to uppercase
        # running python.exe under cmd.exe return capital C:\\
        # running win32 python inside a cygwin shell returns lowercase
        self.assertEqual(os_cwd[0].upper(), cwd[0])

    def test_fixdrive(self):
        self.assertEqual('H:/foo', osutils._win32_fixdrive('h:/foo'))
        self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
        self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))

    def test_win98_abspath(self):
        # absolute paths
        self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
        self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
        # UNC paths
        self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
        self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
        # relative paths
        cwd = osutils.getcwd().rstrip('/')
        drive = osutils._nt_splitdrive(cwd)[0]
        self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
        self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
        # unicode relative path
        # NOTE(review): the value of u is an assumption; any non-ASCII
        # relative name satisfies the assertion below -- confirm.
        u = u'\u1234'
        self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
804
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
805
"""Test win32 functions that create files."""
807
def test_getcwd(self):
808
self.requireFeature(tests.UnicodeFilenameFeature)
811
# TODO: jam 20060427 This will probably fail on Mac OSX because
812
# it will change the normalization of B\xe5gfors
813
# Consider using a different unicode character, or make
814
# osutils.getcwd() renormalize the path.
815
self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
817
def test_minimum_path_selection(self):
818
self.assertEqual(set(),
819
osutils.minimum_path_selection([]))
820
self.assertEqual(set(['a']),
821
osutils.minimum_path_selection(['a']))
822
self.assertEqual(set(['a', 'b']),
823
osutils.minimum_path_selection(['a', 'b']))
824
self.assertEqual(set(['a/', 'b']),
825
osutils.minimum_path_selection(['a/', 'b']))
826
self.assertEqual(set(['a/', 'b']),
827
osutils.minimum_path_selection(['a/c', 'a/', 'b']))
828
self.assertEqual(set(['a-b', 'a', 'a0b']),
829
osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
831
def test_mkdtemp(self):
832
tmpdir = osutils._win32_mkdtemp(dir='.')
833
self.assertFalse('\\' in tmpdir)
835
def test_rename(self):
843
osutils._win32_rename('b', 'a')
844
self.failUnlessExists('a')
845
self.failIfExists('b')
846
self.assertFileEqual('baz\n', 'a')
848
def test_rename_missing_file(self):
854
osutils._win32_rename('b', 'a')
855
except (IOError, OSError), e:
856
self.assertEqual(errno.ENOENT, e.errno)
857
self.assertFileEqual('foo\n', 'a')
859
def test_rename_missing_dir(self):
862
osutils._win32_rename('b', 'a')
863
except (IOError, OSError), e:
864
self.assertEqual(errno.ENOENT, e.errno)
866
def test_rename_current_dir(self):
869
# You can't rename the working directory
870
# doing rename non-existant . usually
871
# just raises ENOENT, since non-existant
874
osutils._win32_rename('b', '.')
875
except (IOError, OSError), e:
876
self.assertEqual(errno.ENOENT, e.errno)
878
def test_splitpath(self):
879
def check(expected, path):
880
self.assertEqual(expected, osutils.splitpath(path))
883
check(['a', 'b'], 'a/b')
884
check(['a', 'b'], 'a/./b')
885
check(['a', '.b'], 'a/.b')
886
check(['a', '.b'], 'a\\.b')
888
self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
891
class TestParentDirectories(tests.TestCaseInTempDir):
    """Test osutils.parent_directories()"""

    def test_parent_directories(self):
        # Parents are listed from the deepest one upwards.
        self.assertEqual([], osutils.parent_directories('a'))
        self.assertEqual(['a'], osutils.parent_directories('a/b'))
        self.assertEqual(['a/b', 'a'], osutils.parent_directories('a/b/c'))
900
class TestMacFuncsDirs(tests.TestCaseInTempDir):
    """Test mac special functions that require directories."""

    def test_getcwd(self):
        self.requireFeature(tests.UnicodeFilenameFeature)
        os.mkdir(u'B\xe5gfors')
        os.chdir(u'B\xe5gfors')
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')

    def test_getcwd_nonnorm(self):
        self.requireFeature(tests.UnicodeFilenameFeature)
        # Test that _mac_getcwd() will normalize this path
        # (the directory name uses 'a' + combining ring, U+030A)
        os.mkdir(u'Ba\u030agfors')
        os.chdir(u'Ba\u030agfors')
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
917
class TestChunksToLines(tests.TestCase):
    """Tests for osutils.chunks_to_lines."""

    def test_smoketest(self):
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
                         osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
                         osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))

    def test_osutils_binding(self):
        from bzrlib.tests import test__chunks_to_lines
        # osutils should expose the compiled implementation when it is
        # available and the pure-Python one otherwise.
        if test__chunks_to_lines.CompiledChunksToLinesFeature.available():
            from bzrlib._chunks_to_lines_pyx import chunks_to_lines
        else:
            from bzrlib._chunks_to_lines_py import chunks_to_lines
        self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
934
class TestSplitLines(tests.TestCase):
    """Tests for osutils.split_lines."""

    def test_split_unicode(self):
        self.assertEqual([u'foo\n', u'bar\xae'],
                         osutils.split_lines(u'foo\nbar\xae'))
        self.assertEqual([u'foo\n', u'bar\xae\n'],
                         osutils.split_lines(u'foo\nbar\xae\n'))

    def test_split_with_carriage_returns(self):
        # A lone '\r' must not be treated as a line separator.
        self.assertEqual(['foo\rbar\n'],
                         osutils.split_lines('foo\rbar\n'))
947
class TestWalkDirs(tests.TestCaseInTempDir):
949
def assertExpectedBlocks(self, expected, result):
950
self.assertEqual(expected,
951
[(dirinfo, [line[0:3] for line in block])
952
for dirinfo, block in result])
954
def test_walkdirs(self):
963
self.build_tree(tree)
964
expected_dirblocks = [
966
[('0file', '0file', 'file'),
967
('1dir', '1dir', 'directory'),
968
('2file', '2file', 'file'),
972
[('1dir/0file', '0file', 'file'),
973
('1dir/1dir', '1dir', 'directory'),
976
(('1dir/1dir', './1dir/1dir'),
983
for dirdetail, dirblock in osutils.walkdirs('.'):
984
if len(dirblock) and dirblock[0][1] == '.bzr':
985
# this tests the filtering of selected paths
988
result.append((dirdetail, dirblock))
990
self.assertTrue(found_bzrdir)
991
self.assertExpectedBlocks(expected_dirblocks, result)
992
# you can search a subdir only, with a supplied prefix.
994
for dirblock in osutils.walkdirs('./1dir', '1dir'):
995
result.append(dirblock)
996
self.assertExpectedBlocks(expected_dirblocks[1:], result)
998
def test_walkdirs_os_error(self):
999
# <https://bugs.edge.launchpad.net/bzr/+bug/338653>
1000
# Pyrex readdir didn't raise useful messages if it had an error
1001
# reading the directory
1002
if sys.platform == 'win32':
1003
raise tests.TestNotApplicable(
1004
"readdir IOError not tested on win32")
1005
os.mkdir("test-unreadable")
1006
os.chmod("test-unreadable", 0000)
1007
# must chmod it back so that it can be removed
1008
self.addCleanup(os.chmod, "test-unreadable", 0700)
1009
# The error is not raised until the generator is actually evaluated.
1010
# (It would be ok if it happened earlier but at the moment it
1012
e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1013
self.assertEquals('./test-unreadable', e.filename)
1014
self.assertEquals(errno.EACCES, e.errno)
1015
# Ensure the message contains the file name
1016
self.assertContainsRe(str(e), "\./test-unreadable")
1018
def test__walkdirs_utf8(self):
1027
self.build_tree(tree)
1028
expected_dirblocks = [
1030
[('0file', '0file', 'file'),
1031
('1dir', '1dir', 'directory'),
1032
('2file', '2file', 'file'),
1035
(('1dir', './1dir'),
1036
[('1dir/0file', '0file', 'file'),
1037
('1dir/1dir', '1dir', 'directory'),
1040
(('1dir/1dir', './1dir/1dir'),
1046
found_bzrdir = False
1047
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1048
if len(dirblock) and dirblock[0][1] == '.bzr':
1049
# this tests the filtering of selected paths
1052
result.append((dirdetail, dirblock))
1054
self.assertTrue(found_bzrdir)
1055
self.assertExpectedBlocks(expected_dirblocks, result)
1057
# you can search a subdir only, with a supplied prefix.
1059
for dirblock in osutils.walkdirs('./1dir', '1dir'):
1060
result.append(dirblock)
1061
self.assertExpectedBlocks(expected_dirblocks[1:], result)
1063
def _filter_out_stat(self, result):
1064
"""Filter out the stat value from the walkdirs result"""
1065
for dirdetail, dirblock in result:
1067
for info in dirblock:
1068
# Ignore info[3] which is the stat
1069
new_dirblock.append((info[0], info[1], info[2], info[4]))
1070
dirblock[:] = new_dirblock
1072
def _save_platform_info(self):
1073
cur_winver = win32utils.winver
1074
cur_fs_enc = osutils._fs_enc
1075
cur_dir_reader = osutils._selected_dir_reader
1077
win32utils.winver = cur_winver
1078
osutils._fs_enc = cur_fs_enc
1079
osutils._selected_dir_reader = cur_dir_reader
1080
self.addCleanup(restore)
1082
def assertDirReaderIs(self, expected):
    """Check which dir reader _walkdirs_utf8 ends up selecting.

    The cached osutils._selected_dir_reader is cleared, '.' is walked once,
    and the reader left in place afterwards must be an instance of expected.
    """
    # Drop the cached reader so that one has to be chosen again
    osutils._selected_dir_reader = None
    # The walk is expected to find nothing, but it still has to pick a reader
    walked = list(osutils._walkdirs_utf8('.'))
    self.assertEqual([(('', '.'), [])], walked)
    self.assertIsInstance(osutils._selected_dir_reader, expected)
1090
def test_force_walkdirs_utf8_fs_utf8(self):
    # With _fs_enc set to 'UTF-8' the UTF8 dir reader must be selected.
    self.requireFeature(UTF8DirReaderFeature)
    self._save_platform_info()
    # Avoid the win32 detection code
    win32utils.winver = None
    osutils._fs_enc = 'UTF-8'
    expected_reader = UTF8DirReaderFeature.reader
    self.assertDirReaderIs(expected_reader)
1097
def test_force_walkdirs_utf8_fs_ascii(self):
    # With _fs_enc set to 'US-ASCII' the UTF8 dir reader must be selected.
    self.requireFeature(UTF8DirReaderFeature)
    self._save_platform_info()
    # Avoid the win32 detection code
    win32utils.winver = None
    osutils._fs_enc = 'US-ASCII'
    expected_reader = UTF8DirReaderFeature.reader
    self.assertDirReaderIs(expected_reader)
1104
def test_force_walkdirs_utf8_fs_ANSI(self):
    # With _fs_enc set to 'ANSI_X3.4-1968' the UTF8 dir reader must be
    # selected.
    self.requireFeature(UTF8DirReaderFeature)
    self._save_platform_info()
    # Avoid the win32 detection code
    win32utils.winver = None
    osutils._fs_enc = 'ANSI_X3.4-1968'
    expected_reader = UTF8DirReaderFeature.reader
    self.assertDirReaderIs(expected_reader)
1111
def test_force_walkdirs_utf8_fs_latin1(self):
    # With _fs_enc set to 'latin1' the unicode dir reader must be selected.
    self._save_platform_info()
    # Avoid the win32 detection code
    win32utils.winver = None
    osutils._fs_enc = 'latin1'
    expected_reader = osutils.UnicodeDirReader
    self.assertDirReaderIs(expected_reader)
1117
def test_force_walkdirs_utf8_nt(self):
    # With winver set to 'Windows NT' the Win32ReadDir reader must be
    # selected.
    # NOTE(review): the previous comment here described this test as
    # disabled; nothing in the body disables it apart from the feature
    # requirement below -- confirm whether that remark is still relevant.
    self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
    self._save_platform_info()
    win32utils.winver = 'Windows NT'
    # Imported only once the feature requirement has been checked
    from bzrlib import _walkdirs_win32
    self.assertDirReaderIs(_walkdirs_win32.Win32ReadDir)
1125
def test_force_walkdirs_utf8_98(self):
    # With winver set to 'Windows 98' the unicode dir reader must be
    # selected.
    self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
    self._save_platform_info()
    win32utils.winver = 'Windows 98'
    expected_reader = osutils.UnicodeDirReader
    self.assertDirReaderIs(expected_reader)
1131
def test_unicode_walkdirs(self):
1132
"""Walkdirs should always return unicode paths."""
1133
self.requireFeature(tests.UnicodeFilenameFeature)
1134
name0 = u'0file-\xb6'
1135
name1 = u'1dir-\u062c\u0648'
1136
name2 = u'2file-\u0633'
1140
name1 + '/' + name0,
1141
name1 + '/' + name1 + '/',
1144
self.build_tree(tree)
1145
expected_dirblocks = [
1147
[(name0, name0, 'file', './' + name0),
1148
(name1, name1, 'directory', './' + name1),
1149
(name2, name2, 'file', './' + name2),
1152
((name1, './' + name1),
1153
[(name1 + '/' + name0, name0, 'file', './' + name1
1155
(name1 + '/' + name1, name1, 'directory', './' + name1
1159
((name1 + '/' + name1, './' + name1 + '/' + name1),
1164
result = list(osutils.walkdirs('.'))
1165
self._filter_out_stat(result)
1166
self.assertEqual(expected_dirblocks, result)
1167
result = list(osutils.walkdirs(u'./'+name1, name1))
1168
self._filter_out_stat(result)
1169
self.assertEqual(expected_dirblocks[1:], result)
1171
def test_unicode__walkdirs_utf8(self):
1172
"""Walkdirs_utf8 should always return utf8 paths.
1174
The abspath portion might be in unicode or utf-8
1176
self.requireFeature(tests.UnicodeFilenameFeature)
1177
name0 = u'0file-\xb6'
1178
name1 = u'1dir-\u062c\u0648'
1179
name2 = u'2file-\u0633'
1183
name1 + '/' + name0,
1184
name1 + '/' + name1 + '/',
1187
self.build_tree(tree)
1188
name0 = name0.encode('utf8')
1189
name1 = name1.encode('utf8')
1190
name2 = name2.encode('utf8')
1192
expected_dirblocks = [
1194
[(name0, name0, 'file', './' + name0),
1195
(name1, name1, 'directory', './' + name1),
1196
(name2, name2, 'file', './' + name2),
1199
((name1, './' + name1),
1200
[(name1 + '/' + name0, name0, 'file', './' + name1
1202
(name1 + '/' + name1, name1, 'directory', './' + name1
1206
((name1 + '/' + name1, './' + name1 + '/' + name1),
1212
# For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1213
# all abspaths are Unicode, and encode them back into utf8.
1214
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1215
self.assertIsInstance(dirdetail[0], str)
1216
if isinstance(dirdetail[1], unicode):
1217
dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1218
dirblock = [list(info) for info in dirblock]
1219
for info in dirblock:
1220
self.assertIsInstance(info[4], unicode)
1221
info[4] = info[4].encode('utf8')
1223
for info in dirblock:
1224
self.assertIsInstance(info[0], str)
1225
self.assertIsInstance(info[1], str)
1226
self.assertIsInstance(info[4], str)
1227
# Remove the stat information
1228
new_dirblock.append((info[0], info[1], info[2], info[4]))
1229
result.append((dirdetail, new_dirblock))
1230
self.assertEqual(expected_dirblocks, result)
1232
def test__walkdirs_utf8_with_unicode_fs(self):
1233
"""UnicodeDirReader should be a safe fallback everywhere
1235
The abspath portion should be in unicode
1237
self.requireFeature(tests.UnicodeFilenameFeature)
1238
# Use the unicode reader. TODO: split into driver-and-driven unit
1240
self._save_platform_info()
1241
osutils._selected_dir_reader = osutils.UnicodeDirReader()
1242
name0u = u'0file-\xb6'
1243
name1u = u'1dir-\u062c\u0648'
1244
name2u = u'2file-\u0633'
1248
name1u + '/' + name0u,
1249
name1u + '/' + name1u + '/',
1252
self.build_tree(tree)
1253
name0 = name0u.encode('utf8')
1254
name1 = name1u.encode('utf8')
1255
name2 = name2u.encode('utf8')
1257
# All of the abspaths should be in unicode, all of the relative paths
1259
expected_dirblocks = [
1261
[(name0, name0, 'file', './' + name0u),
1262
(name1, name1, 'directory', './' + name1u),
1263
(name2, name2, 'file', './' + name2u),
1266
((name1, './' + name1u),
1267
[(name1 + '/' + name0, name0, 'file', './' + name1u
1269
(name1 + '/' + name1, name1, 'directory', './' + name1u
1273
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1278
result = list(osutils._walkdirs_utf8('.'))
1279
self._filter_out_stat(result)
1280
self.assertEqual(expected_dirblocks, result)
1282
def test__walkdirs_utf8_win32readdir(self):
1283
self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1284
self.requireFeature(tests.UnicodeFilenameFeature)
1285
from bzrlib._walkdirs_win32 import Win32ReadDir
1286
self._save_platform_info()
1287
osutils._selected_dir_reader = Win32ReadDir()
1288
name0u = u'0file-\xb6'
1289
name1u = u'1dir-\u062c\u0648'
1290
name2u = u'2file-\u0633'
1294
name1u + '/' + name0u,
1295
name1u + '/' + name1u + '/',
1298
self.build_tree(tree)
1299
name0 = name0u.encode('utf8')
1300
name1 = name1u.encode('utf8')
1301
name2 = name2u.encode('utf8')
1303
# All of the abspaths should be in unicode, all of the relative paths
1305
expected_dirblocks = [
1307
[(name0, name0, 'file', './' + name0u),
1308
(name1, name1, 'directory', './' + name1u),
1309
(name2, name2, 'file', './' + name2u),
1312
((name1, './' + name1u),
1313
[(name1 + '/' + name0, name0, 'file', './' + name1u
1315
(name1 + '/' + name1, name1, 'directory', './' + name1u
1319
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1324
result = list(osutils._walkdirs_utf8(u'.'))
1325
self._filter_out_stat(result)
1326
self.assertEqual(expected_dirblocks, result)
1328
def assertStatIsCorrect(self, path, win32stat):
    """Check win32stat against a fresh os.stat() of path.

    Size, device, inode and mode must be equal; the three timestamps are
    compared to 4 decimal places.
    """
    expected = os.stat(path)
    self.assertEqual(expected.st_size, win32stat.st_size)
    for field in ('st_mtime', 'st_ctime', 'st_atime'):
        self.assertAlmostEqual(getattr(expected, field),
                               getattr(win32stat, field), places=4)
    for field in ('st_dev', 'st_ino', 'st_mode'):
        self.assertEqual(getattr(expected, field),
                         getattr(win32stat, field))
1338
def test__walkdirs_utf_win32_find_file_stat_file(self):
1339
"""make sure our Stat values are valid"""
1340
self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1341
self.requireFeature(tests.UnicodeFilenameFeature)
1342
from bzrlib._walkdirs_win32 import Win32ReadDir
1343
name0u = u'0file-\xb6'
1344
name0 = name0u.encode('utf8')
1345
self.build_tree([name0u])
1346
# I hate to sleep() here, but I'm trying to make the ctime different
1349
f = open(name0u, 'ab')
1351
f.write('just a small update')
1355
result = Win32ReadDir().read_dir('', u'.')
1357
self.assertEqual((name0, name0, 'file'), entry[:3])
1358
self.assertEqual(u'./' + name0u, entry[4])
1359
self.assertStatIsCorrect(entry[4], entry[3])
1360
self.assertNotEqual(entry[3].st_mtime, entry[3].st_ctime)
1362
def test__walkdirs_utf_win32_find_file_stat_directory(self):
1363
"""make sure our Stat values are valid"""
1364
self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1365
self.requireFeature(tests.UnicodeFilenameFeature)
1366
from bzrlib._walkdirs_win32 import Win32ReadDir
1367
name0u = u'0dir-\u062c\u0648'
1368
name0 = name0u.encode('utf8')
1369
self.build_tree([name0u + '/'])
1371
result = Win32ReadDir().read_dir('', u'.')
1373
self.assertEqual((name0, name0, 'directory'), entry[:3])
1374
self.assertEqual(u'./' + name0u, entry[4])
1375
self.assertStatIsCorrect(entry[4], entry[3])
1377
def assertPathCompare(self, path_less, path_greater):
    """Check that path_less orders strictly before path_greater.

    Each path must compare equal to itself, and the two paths must compare
    as -1 and 1 depending on the order in which they are passed.
    """
    compare = osutils.compare_paths_prefix_order
    self.assertEqual(0, compare(path_less, path_less))
    self.assertEqual(0, compare(path_greater, path_greater))
    self.assertEqual(-1, compare(path_less, path_greater))
    self.assertEqual(1, compare(path_greater, path_less))
1388
def test_compare_paths_prefix_order(self):
    # Every pair is (path ordered first, path ordered second).
    rooted_pairs = [
        # root before all else
        ("/", "/a"),
        # alpha within a dir
        ("/a", "/b"),
        ("/b", "/z"),
        # high dirs before lower.
        ("/z", "/a/a"),
        # except if the deeper dir should be output first
        ("/a/b/c", "/d/g"),
        # lexical between dirs of the same height
        ("/a/z", "/z/z"),
        ("/a/c/z", "/a/d/e"),
        ]
    # this should also be consistent for no leading / paths
    unrooted_pairs = [
        # root before all else
        ("", "a"),
        # alpha within a dir
        ("a", "b"),
        ("b", "z"),
        # high dirs before lower.
        ("z", "a/a"),
        # except if the deeper dir should be output first
        ("a/b/c", "d/g"),
        # lexical between dirs of the same height
        ("a/z", "z/z"),
        ("a/c/z", "a/d/e"),
        ]
    for path_less, path_greater in rooted_pairs + unrooted_pairs:
        self.assertPathCompare(path_less, path_greater)
1416
def test_path_prefix_sorting(self):
1417
"""Doing a sort on path prefix should match our sample data."""
1432
dir_sorted_paths = [
1448
sorted(original_paths, key=osutils.path_prefix_key))
1449
# using the comparison routine should work too:
1452
sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
1455
class TestCopyTree(tests.TestCaseInTempDir):
1457
def test_copy_basic_tree(self):
    # Copy a small tree and check the entries that appear under 'target'.
    source_layout = ['source/', 'source/a', 'source/b/', 'source/b/c']
    self.build_tree(source_layout)
    osutils.copy_tree('source', 'target')
    top_level = sorted(os.listdir('target'))
    self.assertEqual(['a', 'b'], top_level)
    nested = os.listdir('target/b')
    self.assertEqual(['c'], nested)
1463
def test_copy_tree_target_exists(self):
1464
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
1466
osutils.copy_tree('source', 'target')
1467
self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
1468
self.assertEqual(['c'], os.listdir('target/b'))
1470
def test_copy_tree_symlinks(self):
    # The copied link must still be readable as a link to the same target.
    self.requireFeature(tests.SymlinkFeature)
    self.build_tree(['source/'])
    link_target = 'a/generic/path'
    os.symlink(link_target, 'source/lnk')
    osutils.copy_tree('source', 'target')
    copied_names = os.listdir('target')
    self.assertEqual(['lnk'], copied_names)
    self.assertEqual(link_target, os.readlink('target/lnk'))
1478
def test_copy_tree_handlers(self):
1479
processed_files = []
1480
processed_links = []
1481
def file_handler(from_path, to_path):
1482
processed_files.append(('f', from_path, to_path))
1483
def dir_handler(from_path, to_path):
1484
processed_files.append(('d', from_path, to_path))
1485
def link_handler(from_path, to_path):
1486
processed_links.append((from_path, to_path))
1487
handlers = {'file':file_handler,
1488
'directory':dir_handler,
1489
'symlink':link_handler,
1492
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1493
if osutils.has_symlinks():
1494
os.symlink('a/generic/path', 'source/lnk')
1495
osutils.copy_tree('source', 'target', handlers=handlers)
1497
self.assertEqual([('d', 'source', 'target'),
1498
('f', 'source/a', 'target/a'),
1499
('d', 'source/b', 'target/b'),
1500
('f', 'source/b/c', 'target/b/c'),
1502
self.failIfExists('target')
1503
if osutils.has_symlinks():
1504
self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1507
class TestSetUnsetEnv(tests.TestCase):
1508
"""Test updating the environment"""
1511
super(TestSetUnsetEnv, self).setUp()
1513
self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
1514
'Environment was not cleaned up properly.'
1515
' Variable BZR_TEST_ENV_VAR should not exist.')
1517
if 'BZR_TEST_ENV_VAR' in os.environ:
1518
del os.environ['BZR_TEST_ENV_VAR']
1520
self.addCleanup(cleanup)
1523
"""Test that we can set an env variable"""
1524
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1525
self.assertEqual(None, old)
1526
self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
1528
def test_double_set(self):
    """Setting a variable twice returns the first value from the second call"""
    name = 'BZR_TEST_ENV_VAR'
    osutils.set_or_unset_env(name, 'foo')
    previous = osutils.set_or_unset_env(name, 'bar')
    self.assertEqual('foo', previous)
    self.assertEqual('bar', os.environ.get(name))
1535
def test_unicode(self):
1536
"""Environment can only contain plain strings
1538
So Unicode strings must be encoded.
1540
uni_val, env_val = tests.probe_unicode_in_user_encoding()
1542
raise tests.TestSkipped(
1543
'Cannot find a unicode character that works in encoding %s'
1544
% (osutils.get_user_encoding(),))
1546
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1547
self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
1549
def test_unset(self):
    """Passing None as the value removes the variable from os.environ"""
    name = 'BZR_TEST_ENV_VAR'
    osutils.set_or_unset_env(name, 'foo')
    previous = osutils.set_or_unset_env(name, None)
    self.assertEqual('foo', previous)
    self.assertEqual(None, os.environ.get(name))
    still_present = name in os.environ
    self.failIf(still_present)
1558
class TestSizeShaFile(tests.TestCaseInTempDir):
1560
def test_sha_empty(self):
1561
self.build_tree_contents([('foo', '')])
1562
expected_sha = osutils.sha_string('')
1564
self.addCleanup(f.close)
1565
size, sha = osutils.size_sha_file(f)
1566
self.assertEqual(0, size)
1567
self.assertEqual(expected_sha, sha)
1569
def test_sha_mixed_endings(self):
1570
text = 'test\r\nwith\nall\rpossible line endings\r\n'
1571
self.build_tree_contents([('foo', text)])
1572
expected_sha = osutils.sha_string(text)
1574
self.addCleanup(f.close)
1575
size, sha = osutils.size_sha_file(f)
1576
self.assertEqual(38, size)
1577
self.assertEqual(expected_sha, sha)
1580
class TestShaFileByName(tests.TestCaseInTempDir):
    """sha_file_by_name() must agree with sha_string() on the same content."""

    def test_sha_empty(self):
        # An empty file
        content = ''
        self.build_tree_contents([('foo', content)])
        self.assertEqual(osutils.sha_string(content),
                         osutils.sha_file_by_name('foo'))

    def test_sha_mixed_endings(self):
        # Content mixing \r\n, \n and \r line endings
        content = 'test\r\nwith\nall\rpossible line endings\r\n'
        self.build_tree_contents([('foo', content)])
        self.assertEqual(osutils.sha_string(content),
                         osutils.sha_file_by_name('foo'))
1594
class TestResourceLoading(tests.TestCaseInTempDir):
1596
def test_resource_string(self):
1597
# test resource in bzrlib
1598
text = osutils.resource_string('bzrlib', 'debug.py')
1599
self.assertContainsRe(text, "debug_flags = set()")
1600
# test resource under bzrlib
1601
text = osutils.resource_string('bzrlib.ui', 'text.py')
1602
self.assertContainsRe(text, "class TextUIFactory")
1603
# test unsupported package
1604
self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1606
# test unknown resource
1607
self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
1610
class TestReCompile(tests.TestCase):
1612
def test_re_compile_checked(self):
    # A valid pattern compiles, and the IGNORECASE flag is honoured.
    compiled = osutils.re_compile_checked(r'A*', re.IGNORECASE)
    for candidate in ('aaaa', 'aAaA'):
        self.assertTrue(compiled.match(candidate))
1617
def test_re_compile_checked_error(self):
1618
# like https://bugs.launchpad.net/bzr/+bug/251352
1619
err = self.assertRaises(
1620
errors.BzrCommandError,
1621
osutils.re_compile_checked, '*', re.IGNORECASE, 'test case')
1623
"Invalid regular expression in test case: '*': "
1624
"nothing to repeat",
1628
class TestDirReader(tests.TestCaseInTempDir):
1631
_dir_reader_class = None
1632
_native_to_unicode = None
1635
tests.TestCaseInTempDir.setUp(self)
1637
# Save platform specific info and reset it
1638
cur_dir_reader = osutils._selected_dir_reader
1641
osutils._selected_dir_reader = cur_dir_reader
1642
self.addCleanup(restore)
1644
osutils._selected_dir_reader = self._dir_reader_class()
1646
def _get_ascii_tree(self):
1654
expected_dirblocks = [
1656
[('0file', '0file', 'file'),
1657
('1dir', '1dir', 'directory'),
1658
('2file', '2file', 'file'),
1661
(('1dir', './1dir'),
1662
[('1dir/0file', '0file', 'file'),
1663
('1dir/1dir', '1dir', 'directory'),
1666
(('1dir/1dir', './1dir/1dir'),
1671
return tree, expected_dirblocks
1673
def test_walk_cur_dir(self):
    tree, expected_dirblocks = self._get_ascii_tree()
    self.build_tree(tree)
    walked = list(osutils._walkdirs_utf8('.'))
    # Keep only the first three fields of each entry, which leaves out
    # stat and abspath
    trimmed = []
    for dirinfo, block in walked:
        trimmed.append((dirinfo, [entry[0:3] for entry in block]))
    self.assertEqual(expected_dirblocks, trimmed)
1682
def test_walk_sub_dir(self):
    tree, expected_dirblocks = self._get_ascii_tree()
    self.build_tree(tree)
    # Walk only a subdirectory, passing the matching prefix.
    walked = list(osutils._walkdirs_utf8('./1dir', '1dir'))
    # Keep only the first three fields of each entry, which leaves out
    # stat and abspath
    trimmed = []
    for dirinfo, block in walked:
        trimmed.append((dirinfo, [entry[0:3] for entry in block]))
    # The first expected block describes the top directory, so it is left
    # out of the comparison.
    self.assertEqual(expected_dirblocks[1:], trimmed)
1692
def _get_unicode_tree(self):
1693
name0u = u'0file-\xb6'
1694
name1u = u'1dir-\u062c\u0648'
1695
name2u = u'2file-\u0633'
1699
name1u + '/' + name0u,
1700
name1u + '/' + name1u + '/',
1703
name0 = name0u.encode('UTF-8')
1704
name1 = name1u.encode('UTF-8')
1705
name2 = name2u.encode('UTF-8')
1706
expected_dirblocks = [
1708
[(name0, name0, 'file', './' + name0u),
1709
(name1, name1, 'directory', './' + name1u),
1710
(name2, name2, 'file', './' + name2u),
1713
((name1, './' + name1u),
1714
[(name1 + '/' + name0, name0, 'file', './' + name1u
1716
(name1 + '/' + name1, name1, 'directory', './' + name1u
1720
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1725
return tree, expected_dirblocks
1727
def _filter_out(self, raw_dirblocks):
1728
"""Filter out a walkdirs_utf8 result.
1730
stat field is removed, all native paths are converted to unicode
1732
filtered_dirblocks = []
1733
for dirinfo, block in raw_dirblocks:
1734
dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
1737
details.append(line[0:3] + (self._native_to_unicode(line[4]), ))
1738
filtered_dirblocks.append((dirinfo, details))
1739
return filtered_dirblocks
1741
def test_walk_unicode_tree(self):
    self.requireFeature(tests.UnicodeFilenameFeature)
    tree, expected_dirblocks = self._get_unicode_tree()
    self.build_tree(tree)
    walked = list(osutils._walkdirs_utf8('.'))
    # _filter_out() removes the stat field and converts native paths to
    # unicode, so the comparison does not depend on the reader in use.
    filtered = self._filter_out(walked)
    self.assertEqual(expected_dirblocks, filtered)
1748
def test_symlink(self):
1749
self.requireFeature(tests.SymlinkFeature)
1750
self.requireFeature(tests.UnicodeFilenameFeature)
1751
target = u'target\N{Euro Sign}'
1752
link_name = u'l\N{Euro Sign}nk'
1753
os.symlink(target, link_name)
1754
target_utf8 = target.encode('UTF-8')
1755
link_name_utf8 = link_name.encode('UTF-8')
1756
expected_dirblocks = [
1758
[(link_name_utf8, link_name_utf8,
1759
'symlink', './' + link_name),],
1761
result = list(osutils._walkdirs_utf8('.'))
1762
self.assertEqual(expected_dirblocks, self._filter_out(result))
1765
class TestReadLink(tests.TestCaseInTempDir):
    """Exposes os.readlink() problems and the osutils solution.

    The only guarantee offered by os.readlink(), starting with 2.6, is that a
    unicode string will be returned if a unicode string is passed.

    But prior python versions failed to properly encode the passed unicode
    string.
    """
    _test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]

    def setUp(self):
        # super() must be given this class: naming tests.TestCaseInTempDir
        # starts the method lookup *after* that class and so skips its
        # setUp() altogether.
        super(TestReadLink, self).setUp()
        self.link = u'l\N{Euro Sign}ink'
        self.target = u'targe\N{Euro Sign}t'
        os.symlink(self.target, self.link)

    def test_os_readlink_link_encoding(self):
        # Before 2.6 a unicode link name is expected to fail to encode;
        # from 2.6 on the unicode target is expected back.
        if sys.version_info < (2, 6):
            self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
        else:
            self.assertEqual(self.target, os.readlink(self.link))

    def test_os_readlink_link_decoding(self):
        # A link name encoded in osutils._fs_enc gives back the target in
        # that same encoding.
        self.assertEqual(self.target.encode(osutils._fs_enc),
                         os.readlink(self.link.encode(osutils._fs_enc)))
1793
class TestConcurrency(tests.TestCase):
    """Checks on osutils.local_concurrency()."""

    def test_local_concurrency(self):
        # The reported value must be an int.
        reported = osutils.local_concurrency()
        self.assertIsInstance(reported, int)
1800
class TestFailedToLoadExtension(tests.TestCase):
1802
def _try_loading(self):
1804
import bzrlib._fictional_extension_py
1805
except ImportError, e:
1806
osutils.failed_to_load_extension(e)
1810
super(TestFailedToLoadExtension, self).setUp()
1811
self.saved_failures = osutils._extension_load_failures[:]
1812
del osutils._extension_load_failures[:]
1813
self.addCleanup(self.restore_failures)
1815
def restore_failures(self):
    """Put back the extension load failures saved by setUp().

    The module-level list is refilled in place rather than rebound, so the
    same list object that setUp() emptied gets its saved contents back and
    any other reference to that list sees them too.
    """
    osutils._extension_load_failures[:] = self.saved_failures
1818
def test_failure_to_load(self):
1820
self.assertLength(1, osutils._extension_load_failures)
1821
self.assertEquals(osutils._extension_load_failures[0],
1822
"No module named _fictional_extension_py")
1824
def test_report_extension_load_failures_no_warning(self):
    loaded = self._try_loading()
    self.assertTrue(loaded)
    caught = self.callCatchWarnings(osutils.report_extension_load_failures)
    caught_warnings, result = caught
    # it used to give a Python warning; it no longer does
    self.assertLength(0, caught_warnings)
1830
def test_report_extension_load_failures_message(self):
1832
trace.push_log_file(log)
1833
self.assertTrue(self._try_loading())
1834
osutils.report_extension_load_failures()
1835
self.assertContainsRe(
1837
r"bzr: warning: some compiled extensions could not be loaded; "
1838
"see <https://answers\.launchpad\.net/bzr/\+faq/703>\n"