1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the osutils wrapper."""
19
from cStringIO import StringIO
35
from bzrlib.tests import (
41
class _UTF8DirReaderFeature(tests.Feature):
45
from bzrlib import _readdir_pyx
46
self.reader = _readdir_pyx.UTF8DirReader
51
def feature_name(self):
52
return 'bzrlib._readdir_pyx'
54
UTF8DirReaderFeature = _UTF8DirReaderFeature()
57
def _already_unicode(s):
61
def _fs_enc_to_unicode(s):
    """Decode the byte string s using the encoding in osutils._fs_enc."""
    fs_encoding = osutils._fs_enc
    return s.decode(fs_encoding)
65
def _utf8_to_unicode(s):
66
return s.decode('UTF-8')
69
def dir_reader_scenarios():
70
# For each dir reader we define:
72
# - native_to_unicode: a function converting the native_abspath as returned
73
# by DirReader.read_dir to its unicode representation
75
# UnicodeDirReader is the fallback, it should be tested on all platforms.
76
scenarios = [('unicode',
77
dict(_dir_reader_class=osutils.UnicodeDirReader,
78
_native_to_unicode=_already_unicode))]
79
# Some DirReaders are platform specific and even there they may not be
81
if UTF8DirReaderFeature.available():
82
from bzrlib import _readdir_pyx
83
scenarios.append(('utf8',
84
dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
85
_native_to_unicode=_utf8_to_unicode)))
87
if test__walkdirs_win32.Win32ReadDirFeature.available():
89
from bzrlib import _walkdirs_win32
90
# TODO: check on windows, it may be that we need to use/add
91
# safe_unicode instead of _fs_enc_to_unicode
94
dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
95
_native_to_unicode=_fs_enc_to_unicode)))
101
def load_tests(basic_tests, module, loader):
102
suite = loader.suiteClass()
103
dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
104
basic_tests, tests.condition_isinstance(TestDirReader))
105
tests.multiply_tests(dir_reader_tests, dir_reader_scenarios(), suite)
106
suite.addTest(remaining_tests)
110
class TestContainsWhitespace(tests.TestCase):
    """Tests for osutils.contains_whitespace."""

    def test_contains_whitespace(self):
        """Space, newline, CR and tab are whitespace; NBSP is not."""
        contains_whitespace = osutils.contains_whitespace
        with_whitespace = (u' ', u'hello there', u'hellothere\n',
                           u'hello\nthere', u'hello\rthere',
                           u'hello\tthere')
        for text in with_whitespace:
            # Pass the input as the message so a failure names the culprit.
            self.assertTrue(contains_whitespace(text), repr(text))

        # \xa0 is the non-breaking space.  Some Python locales classify it
        # as whitespace, but we do not.
        without_whitespace = (u'', u'hellothere', u'hello\xa0there')
        for text in without_whitespace:
            self.assertFalse(contains_whitespace(text), repr(text))
127
class TestRename(tests.TestCaseInTempDir):
129
def create_file(self, filename, content):
130
f = open(filename, 'wb')
136
def _fancy_rename(self, a, b):
    """Call osutils.fancy_rename(a, b) with os.rename and os.unlink."""
    os_primitives = dict(rename_func=os.rename, unlink_func=os.unlink)
    osutils.fancy_rename(a, b, **os_primitives)
140
def test_fancy_rename(self):
141
# This should work everywhere
142
self.create_file('a', 'something in a\n')
143
self._fancy_rename('a', 'b')
144
self.failIfExists('a')
145
self.failUnlessExists('b')
146
self.check_file_contents('b', 'something in a\n')
148
self.create_file('a', 'new something in a\n')
149
self._fancy_rename('b', 'a')
151
self.check_file_contents('a', 'something in a\n')
153
def test_fancy_rename_fails_source_missing(self):
    """A missing source raises and the existing target stays in place."""
    target_content = 'data in target\n'
    self.create_file('target', target_content)
    expected_errors = (IOError, OSError)
    self.assertRaises(expected_errors, self._fancy_rename,
                      'missingsource', 'target')
    # The failed rename must leave the target present and unmodified.
    self.failUnlessExists('target')
    self.check_file_contents('target', target_content)
161
def test_fancy_rename_fails_if_source_and_target_missing(self):
    """Renaming a missing source onto a missing target raises."""
    rename_args = ('missingsource', 'missingtarget')
    self.assertRaises((IOError, OSError), self._fancy_rename, *rename_args)
165
def test_rename(self):
166
# Rename should be semi-atomic on all platforms
167
self.create_file('a', 'something in a\n')
168
osutils.rename('a', 'b')
169
self.failIfExists('a')
170
self.failUnlessExists('b')
171
self.check_file_contents('b', 'something in a\n')
173
self.create_file('a', 'new something in a\n')
174
osutils.rename('b', 'a')
176
self.check_file_contents('a', 'something in a\n')
178
# TODO: test fancy_rename using a MemoryTransport
180
def test_rename_change_case(self):
    """osutils.rename can change only the case of a file or directory."""
    # on Windows we should be able to change filename case by rename
    self.build_tree(['a', 'b/'])
    for old_name, new_name in (('a', 'A'), ('b', 'B')):
        osutils.rename(old_name, new_name)
    # we can't use failUnlessExists on a case-insensitive filesystem,
    # so check the shape of the tree instead
    shape = sorted(os.listdir('.'))
    self.assertEqual(['A', 'B'], shape)
191
class TestRandChars(tests.TestCase):
    """Tests for osutils.rand_chars."""

    def test_01_rand_chars_empty(self):
        """Requesting zero characters yields the empty string."""
        self.assertEqual(osutils.rand_chars(0), '')

    def test_02_rand_chars_100(self):
        """A 100 character request yields a str of [a-z0-9] of that length."""
        chars = osutils.rand_chars(100)
        self.assertEqual(len(chars), 100)
        self.assertEqual(type(chars), str)
        self.assertContainsRe(chars, r'^[a-z0-9]{100}$')
204
class TestIsInside(tests.TestCase):
206
def test_is_inside(self):
    """Check is_inside against a table of (dir, path, expected) cases."""
    cases = [
        ('src', 'src/foo.c', True),
        # a shared string prefix is not containment
        ('src', 'srccontrol', False),
        ('src', 'src/a/a/a/foo.c', True),
        ('foo.c', 'foo.c', True),
        ('foo.c', '', False),
        ('', 'foo.c', True),
    ]
    for dirname, fname, expected in cases:
        outcome = osutils.is_inside(dirname, fname)
        if expected:
            self.assertTrue(outcome)
        else:
            self.assertFalse(outcome)
215
def test_is_inside_any(self):
216
SRC_FOO_C = osutils.pathjoin('src', 'foo.c')
217
for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
218
(['src'], SRC_FOO_C),
221
self.assert_(osutils.is_inside_any(dirs, fn))
222
for dirs, fn in [(['src'], 'srccontrol'),
223
(['src'], 'srccontrol/foo')]:
224
self.assertFalse(osutils.is_inside_any(dirs, fn))
226
def test_is_inside_or_parent_of_any(self):
227
for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
228
(['src'], 'src/foo.c'),
229
(['src/bar.c'], 'src'),
230
(['src/bar.c', 'bla/foo.c'], 'src'),
233
self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
235
for dirs, fn in [(['src'], 'srccontrol'),
236
(['srccontrol/foo.c'], 'src'),
237
(['src'], 'srccontrol/foo')]:
238
self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
241
class TestRmTree(tests.TestCaseInTempDir):
243
def test_rmtree(self):
244
# Check to remove tree with read-only files/dirs
246
f = file('dir/file', 'w')
249
# would like to also try making the directory readonly, but at the
250
# moment python shutil.rmtree doesn't handle that properly - it would
251
# need to chmod the directory before removing things inside it - deferred
252
# for now -- mbp 20060505
253
# osutils.make_readonly('dir')
254
osutils.make_readonly('dir/file')
256
osutils.rmtree('dir')
258
self.failIfExists('dir/file')
259
self.failIfExists('dir')
262
class TestDeleteAny(tests.TestCaseInTempDir):
    """Tests for osutils.delete_any."""

    def test_delete_any_readonly(self):
        """delete_any is called on a read-only file and directory."""
        # from <https://bugs.launchpad.net/bzr/+bug/218206>
        self.build_tree(['d/', 'f'])
        for path in ('d', 'f'):
            osutils.make_readonly(path)

        for path in ('f', 'd'):
            osutils.delete_any(path)
274
class TestKind(tests.TestCaseInTempDir):
276
def test_file_kind(self):
277
self.build_tree(['file', 'dir/'])
278
self.assertEquals('file', osutils.file_kind('file'))
279
self.assertEquals('directory', osutils.file_kind('dir/'))
280
if osutils.has_symlinks():
281
os.symlink('symlink', 'symlink')
282
self.assertEquals('symlink', osutils.file_kind('symlink'))
284
# TODO: jam 20060529 Test a block device
286
os.lstat('/dev/null')
288
if e.errno not in (errno.ENOENT,):
291
self.assertEquals('chardev', osutils.file_kind('/dev/null'))
293
mkfifo = getattr(os, 'mkfifo', None)
297
self.assertEquals('fifo', osutils.file_kind('fifo'))
301
AF_UNIX = getattr(socket, 'AF_UNIX', None)
303
s = socket.socket(AF_UNIX)
306
self.assertEquals('socket', osutils.file_kind('socket'))
310
def test_kind_marker(self):
    """kind_marker gives the marker per kind and rejects unknown kinds."""
    kind_marker = osutils.kind_marker
    expected_markers = [
        ("", "file"),
        ("/", 'directory'),
        ("/", osutils._directory_kind),
        ("@", "symlink"),
        ("+", "tree-reference"),
    ]
    for marker, kind in expected_markers:
        self.assertEqual(marker, kind_marker(kind))
    self.assertRaises(errors.BzrError, kind_marker, "unknown")
319
class TestUmask(tests.TestCaseInTempDir):
321
def test_get_umask(self):
322
if sys.platform == 'win32':
323
# umask always returns '0', no way to set it
324
self.assertEqual(0, osutils.get_umask())
327
orig_umask = osutils.get_umask()
328
self.addCleanup(os.umask, orig_umask)
330
self.assertEqual(0222, osutils.get_umask())
332
self.assertEqual(0022, osutils.get_umask())
334
self.assertEqual(0002, osutils.get_umask())
336
self.assertEqual(0027, osutils.get_umask())
339
class TestDateTime(tests.TestCase):
341
def assertFormatedDelta(self, expected, seconds):
    """Assert osutils.format_delta(seconds) equals expected."""
    self.assertEqual(expected, osutils.format_delta(seconds))
346
def test_format_delta(self):
    """Check format_delta output over second, minute and hour ranges."""
    past_cases = [
        ('0 seconds ago', 0),
        ('1 second ago', 1),
        ('10 seconds ago', 10),
        ('59 seconds ago', 59),
        ('89 seconds ago', 89),
        ('1 minute, 30 seconds ago', 90),
        ('3 minutes, 0 seconds ago', 180),
        ('3 minutes, 1 second ago', 181),
        ('10 minutes, 15 seconds ago', 615),
        ('30 minutes, 59 seconds ago', 1859),
        ('31 minutes, 0 seconds ago', 1860),
        ('60 minutes, 0 seconds ago', 3600),
        ('89 minutes, 59 seconds ago', 5399),
        ('1 hour, 30 minutes ago', 5400),
        ('2 hours, 30 minutes ago', 9017),
        ('10 hours, 0 minutes ago', 36000),
        ('24 hours, 0 minutes ago', 86400),
        ('35 hours, 59 minutes ago', 129599),
        ('36 hours, 0 minutes ago', 129600),
        ('36 hours, 0 minutes ago', 129601),
        ('36 hours, 1 minute ago', 129660),
        ('36 hours, 1 minute ago', 129661),
        ('84 hours, 10 minutes ago', 303002),
    ]
    for expected, seconds in past_cases:
        self.assertFormatedDelta(expected, seconds)

    # We handle when time steps the wrong direction because computers
    # don't have synchronized clocks.
    future_cases = [
        ('84 hours, 10 minutes in the future', -303002),
        ('1 second in the future', -1),
        ('2 seconds in the future', -2),
    ]
    for expected, seconds in future_cases:
        self.assertFormatedDelta(expected, seconds)
377
def test_format_date(self):
    """Check timezone validation and the return types of the formatters."""
    epoch = 0
    self.assertRaises(errors.UnsupportedTimezoneFormat,
                      osutils.format_date, epoch, timezone='foo')
    formatted = osutils.format_date(epoch)
    self.assertIsInstance(formatted, str)
    local_formatted = osutils.format_local_date(epoch)
    self.assertIsInstance(local_formatted, unicode)
    # Testing for the actual value of the local weekday without
    # duplicating the code from format_date is difficult.
    # Instead blackbox.test_locale should check for localized
    # dates once they do occur in output strings.
387
def test_local_time_offset(self):
    """Test that local_time_offset() returns a sane value."""
    offset = osutils.local_time_offset()
    # assertIsInstance reports the offending value on failure, unlike
    # assertTrue(isinstance(...)).
    self.assertIsInstance(offset, int)
    # Test that the offset is strictly within eighteen hours in either
    # direction.  Time zone handling is system specific, so it is
    # difficult to do more specific tests, but a value outside of this
    # range is suspect.
    eighteen_hours = 18 * 3600
    self.assertTrue(-eighteen_hours < offset < eighteen_hours)
399
def test_local_time_offset_with_timestamp(self):
    """Test that local_time_offset() works with a timestamp."""
    offset = osutils.local_time_offset(1000000000.1234567)
    # assertIsInstance reports the offending value on failure, unlike
    # assertTrue(isinstance(...)).
    self.assertIsInstance(offset, int)
    # The offset must be strictly within eighteen hours either way.
    eighteen_hours = 18 * 3600
    self.assertTrue(-eighteen_hours < offset < eighteen_hours)
407
class TestLinks(tests.TestCaseInTempDir):
409
def test_dereference_path(self):
410
self.requireFeature(tests.SymlinkFeature)
411
cwd = osutils.realpath('.')
413
bar_path = osutils.pathjoin(cwd, 'bar')
414
# Using './' to avoid bug #1213894 (first path component not
415
# dereferenced) in Python 2.4.1 and earlier
416
self.assertEqual(bar_path, osutils.realpath('./bar'))
417
os.symlink('bar', 'foo')
418
self.assertEqual(bar_path, osutils.realpath('./foo'))
420
# Does not dereference terminal symlinks
421
foo_path = osutils.pathjoin(cwd, 'foo')
422
self.assertEqual(foo_path, osutils.dereference_path('./foo'))
424
# Dereferences parent symlinks
426
baz_path = osutils.pathjoin(bar_path, 'baz')
427
self.assertEqual(baz_path, osutils.dereference_path('./foo/baz'))
429
# Dereferences parent symlinks that are the first path element
430
self.assertEqual(baz_path, osutils.dereference_path('foo/baz'))
432
# Dereferences parent symlinks in absolute paths
433
foo_baz_path = osutils.pathjoin(foo_path, 'baz')
434
self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
436
def test_changing_access(self):
437
f = file('file', 'w')
441
# Make a file readonly
442
osutils.make_readonly('file')
443
mode = os.lstat('file').st_mode
444
self.assertEqual(mode, mode & 0777555)
446
# Make a file writable
447
osutils.make_writable('file')
448
mode = os.lstat('file').st_mode
449
self.assertEqual(mode, mode | 0200)
451
if osutils.has_symlinks():
452
# should not error when handed a symlink
453
os.symlink('nonexistent', 'dangling')
454
osutils.make_readonly('dangling')
455
osutils.make_writable('dangling')
457
def test_host_os_dereferences_symlinks(self):
    """Smoke test: only checks that the call completes without raising."""
    probe = osutils.host_os_dereferences_symlinks
    probe()
461
class TestCanonicalRelPath(tests.TestCaseInTempDir):
463
_test_needs_features = [tests.CaseInsCasePresFilenameFeature]
465
def test_canonical_relpath_simple(self):
466
f = file('MixedCaseName', 'w')
468
actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
469
self.failUnlessEqual('work/MixedCaseName', actual)
471
def test_canonical_relpath_missing_tail(self):
    """The existing parent gets its on-disk case; the missing tail is kept."""
    os.mkdir('MixedCaseParent')
    actual = osutils.canonical_relpath(self.test_base_dir,
                                       'mixedcaseparent/nochild')
    self.assertEqual('work/MixedCaseParent/nochild', actual)
478
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
480
def assertRelpath(self, expected, base, path):
    """Assert osutils._cicp_canonical_relpath(base, path) equals expected."""
    self.assertEqual(expected, osutils._cicp_canonical_relpath(base, path))
484
def test_simple(self):
    """A wrongly-cased name resolves to the name as built on disk."""
    self.build_tree(['MixedCaseName'])
    local_root = self.get_transport('.').local_abspath('.')
    base = osutils.realpath(local_root)
    self.assertRelpath('MixedCaseName', base, 'mixedcAsename')
489
def test_subdir_missing_tail(self):
    """Existing children are case-corrected; a missing child is kept."""
    self.build_tree(['MixedCaseParent/', 'MixedCaseParent/a_child'])
    local_root = self.get_transport('.').local_abspath('.')
    base = osutils.realpath(local_root)
    cases = [
        ('MixedCaseParent/a_child', 'MixedCaseParent/a_child'),
        ('MixedCaseParent/a_child', 'MixedCaseParent/A_Child'),
        ('MixedCaseParent/not_child', 'MixedCaseParent/not_child'),
    ]
    for expected, path in cases:
        self.assertRelpath(expected, base, path)
499
def test_at_root_slash(self):
    """Resolve a path relative to the filesystem root '/'."""
    # We can't test this on Windows, because it has a 'MIN_ABS_PATHLENGTH'
    # greater than 1, so skip wherever that is the case.
    min_length = osutils.MIN_ABS_PATHLENGTH
    if min_length > 1:
        raise tests.TestSkipped('relpath requires %d chars' % min_length)
    self.assertRelpath('foo', '/', '/foo')
507
def test_at_root_drive(self):
508
if sys.platform != 'win32':
509
raise tests.TestNotApplicable('we can only test drive-letter relative'
510
' paths on Windows where we have drive'
513
# The specific issue is that when at the root of a drive, 'abspath'
514
# returns "C:/" or just "/". However, the code assumes that abspath
515
# always returns something like "C:/foo" or "/foo" (no trailing slash).
516
self.assertRelpath('foo', 'C:/', 'C:/foo')
517
self.assertRelpath('foo', 'X:/', 'X:/foo')
518
self.assertRelpath('foo', 'X:/', 'X://foo')
521
class TestPumpFile(tests.TestCase):
522
"""Test pumpfile method."""
525
tests.TestCase.setUp(self)
526
# create a test datablock
527
self.block_size = 512
528
pattern = '0123456789ABCDEF'
529
self.test_data = pattern * (3 * self.block_size / len(pattern))
530
self.test_data_len = len(self.test_data)
532
def test_bracket_block_size(self):
533
"""Read data in blocks with the requested read size bracketing the
535
# make sure test data is larger than max read size
536
self.assertTrue(self.test_data_len > self.block_size)
538
from_file = file_utils.FakeReadFile(self.test_data)
541
# read (max / 2) bytes and verify read size wasn't affected
542
num_bytes_to_read = self.block_size / 2
543
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
544
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
545
self.assertEqual(from_file.get_read_count(), 1)
547
# read (max) bytes and verify read size wasn't affected
548
num_bytes_to_read = self.block_size
549
from_file.reset_read_count()
550
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
551
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
552
self.assertEqual(from_file.get_read_count(), 1)
554
# read (max + 1) bytes and verify read size was limited
555
num_bytes_to_read = self.block_size + 1
556
from_file.reset_read_count()
557
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
558
self.assertEqual(from_file.get_max_read_size(), self.block_size)
559
self.assertEqual(from_file.get_read_count(), 2)
561
# finish reading the rest of the data
562
num_bytes_to_read = self.test_data_len - to_file.tell()
563
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
565
# report error if the data wasn't equal (we only report the size due
566
# to the length of the data)
567
response_data = to_file.getvalue()
568
if response_data != self.test_data:
569
message = "Data not equal. Expected %d bytes, received %d."
570
self.fail(message % (len(response_data), self.test_data_len))
572
def test_specified_size(self):
573
"""Request a transfer larger than the maximum block size and verify
574
that the maximum read doesn't exceed the block_size."""
575
# make sure test data is larger than max read size
576
self.assertTrue(self.test_data_len > self.block_size)
578
# retrieve data in blocks
579
from_file = file_utils.FakeReadFile(self.test_data)
581
osutils.pumpfile(from_file, to_file, self.test_data_len,
584
# verify read size was equal to the maximum read size
585
self.assertTrue(from_file.get_max_read_size() > 0)
586
self.assertEqual(from_file.get_max_read_size(), self.block_size)
587
self.assertEqual(from_file.get_read_count(), 3)
589
# report error if the data wasn't equal (we only report the size due
590
# to the length of the data)
591
response_data = to_file.getvalue()
592
if response_data != self.test_data:
593
message = "Data not equal. Expected %d bytes, received %d."
594
self.fail(message % (len(response_data), self.test_data_len))
596
def test_to_eof(self):
597
"""Read to end-of-file and verify that the reads are not larger than
598
the maximum read size."""
599
# make sure test data is larger than max read size
600
self.assertTrue(self.test_data_len > self.block_size)
602
# retrieve data to EOF
603
from_file = file_utils.FakeReadFile(self.test_data)
605
osutils.pumpfile(from_file, to_file, -1, self.block_size)
607
# verify read size was equal to the maximum read size
608
self.assertEqual(from_file.get_max_read_size(), self.block_size)
609
self.assertEqual(from_file.get_read_count(), 4)
611
# report error if the data wasn't equal (we only report the size due
612
# to the length of the data)
613
response_data = to_file.getvalue()
614
if response_data != self.test_data:
615
message = "Data not equal. Expected %d bytes, received %d."
616
self.fail(message % (len(response_data), self.test_data_len))
618
def test_defaults(self):
619
"""Verifies that the default arguments will read to EOF -- this
620
test verifies that any existing usages of pumpfile will not be broken
621
with this new version."""
622
# retrieve data using default (old) pumpfile method
623
from_file = file_utils.FakeReadFile(self.test_data)
625
osutils.pumpfile(from_file, to_file)
627
# report error if the data wasn't equal (we only report the size due
628
# to the length of the data)
629
response_data = to_file.getvalue()
630
if response_data != self.test_data:
631
message = "Data not equal. Expected %d bytes, received %d."
632
self.fail(message % (len(response_data), self.test_data_len))
634
def test_report_activity(self):
636
def log_activity(length, direction):
637
activity.append((length, direction))
638
from_file = StringIO(self.test_data)
640
osutils.pumpfile(from_file, to_file, buff_size=500,
641
report_activity=log_activity, direction='read')
642
self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
643
(36, 'read')], activity)
645
from_file = StringIO(self.test_data)
648
osutils.pumpfile(from_file, to_file, buff_size=500,
649
report_activity=log_activity, direction='write')
650
self.assertEqual([(500, 'write'), (500, 'write'), (500, 'write'),
651
(36, 'write')], activity)
653
# And with a limited amount of data
654
from_file = StringIO(self.test_data)
657
osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
658
report_activity=log_activity, direction='read')
659
self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
663
class TestPumpStringFile(tests.TestCase):
665
def test_empty(self):
667
osutils.pump_string_file("", output)
668
self.assertEqual("", output.getvalue())
670
def test_more_than_segment_size(self):
672
osutils.pump_string_file("123456789", output, 2)
673
self.assertEqual("123456789", output.getvalue())
675
def test_segment_size(self):
677
osutils.pump_string_file("12", output, 2)
678
self.assertEqual("12", output.getvalue())
680
def test_segment_size_multiple(self):
682
osutils.pump_string_file("1234", output, 2)
683
self.assertEqual("1234", output.getvalue())
686
class TestRelpath(tests.TestCase):
    """Tests for osutils.relpath."""

    def test_simple_relpath(self):
        """A direct child of the base is returned as a single component."""
        base = osutils.getcwd()
        child = base + '/subdir'
        self.assertEqual('subdir', osutils.relpath(base, child))

    def test_deep_relpath(self):
        """A nested descendant keeps its intermediate components."""
        base = osutils.getcwd()
        descendant = base + '/sub/subsubdir'
        self.assertEqual('sub/subsubdir', osutils.relpath(base, descendant))

    def test_not_relative(self):
        """A path under a different drive raises PathNotChild."""
        for base in ('C:/path', 'C:/'):
            self.assertRaises(errors.PathNotChild,
                              osutils.relpath, base, 'H:/path')
705
class TestSafeUnicode(tests.TestCase):
707
def test_from_ascii_string(self):
    """An ASCII byte string converts to the equal unicode string."""
    converted = osutils.safe_unicode('foobar')
    self.assertEqual(u'foobar', converted)
710
def test_from_unicode_string_ascii_contents(self):
    """A unicode string with ASCII contents compares equal after the call."""
    converted = osutils.safe_unicode(u'bargam')
    self.assertEqual(u'bargam', converted)
713
def test_from_unicode_string_unicode_contents(self):
    """A unicode string with non-ASCII contents compares equal afterwards."""
    text = u'bargam\xae'
    self.assertEqual(text, osutils.safe_unicode(u'bargam\xae'))
716
def test_from_utf8_string(self):
    """A UTF-8 encoded byte string is decoded to unicode."""
    converted = osutils.safe_unicode('foo\xc2\xae')
    self.assertEqual(u'foo\xae', converted)
719
def test_bad_utf8_string(self):
720
self.assertRaises(errors.BzrBadParameterNotUnicode,
721
osutils.safe_unicode,
725
class TestSafeUtf8(tests.TestCase):
727
def test_from_ascii_string(self):
729
self.assertEqual('foobar', osutils.safe_utf8(f))
731
def test_from_unicode_string_ascii_contents(self):
    """A unicode string with ASCII contents becomes the equal byte string."""
    encoded = osutils.safe_utf8(u'bargam')
    self.assertEqual('bargam', encoded)
734
def test_from_unicode_string_unicode_contents(self):
    """Non-ASCII unicode contents are encoded as UTF-8 bytes."""
    encoded = osutils.safe_utf8(u'bargam\xae')
    self.assertEqual('bargam\xc2\xae', encoded)
737
def test_from_utf8_string(self):
    """A byte string that is already UTF-8 compares equal afterwards."""
    encoded = osutils.safe_utf8('foo\xc2\xae')
    self.assertEqual('foo\xc2\xae', encoded)
740
def test_bad_utf8_string(self):
    """Bytes that are not valid UTF-8 raise BzrBadParameterNotUnicode."""
    invalid_utf8 = '\xbb\xbb'
    self.assertRaises(errors.BzrBadParameterNotUnicode,
                      osutils.safe_utf8, invalid_utf8)
745
class TestSafeRevisionId(tests.TestCase):
747
def test_from_ascii_string(self):
    """An ASCII byte string is accepted as a revision id as-is."""
    # this shouldn't give a warning because it's getting an ascii string
    revision_id = osutils.safe_revision_id('foobar')
    self.assertEqual('foobar', revision_id)
751
def test_from_unicode_string_ascii_contents(self):
    """With warn=False an ASCII unicode id converts to the byte string."""
    revision_id = osutils.safe_revision_id(u'bargam', warn=False)
    self.assertEqual('bargam', revision_id)
755
def test_from_unicode_deprecated(self):
    """A unicode revision id converts but emits the deprecation warning."""
    expected_warnings = [osutils._revision_id_warning]
    revision_id = self.callDeprecated(expected_warnings,
                                      osutils.safe_revision_id, u'bargam')
    self.assertEqual('bargam', revision_id)
760
def test_from_unicode_string_unicode_contents(self):
    """With warn=False a non-ASCII unicode id is encoded as UTF-8."""
    revision_id = osutils.safe_revision_id(u'bargam\xae', warn=False)
    self.assertEqual('bargam\xc2\xae', revision_id)
764
def test_from_utf8_string(self):
    """A UTF-8 byte string revision id compares equal after the call."""
    revision_id = osutils.safe_revision_id('foo\xc2\xae')
    self.assertEqual('foo\xc2\xae', revision_id)
769
"""Currently, None is a valid revision_id"""
770
self.assertEqual(None, osutils.safe_revision_id(None))
773
class TestSafeFileId(tests.TestCase):
775
def test_from_ascii_string(self):
    """An ASCII byte string is accepted as a file id as-is."""
    file_id = osutils.safe_file_id('foobar')
    self.assertEqual('foobar', file_id)
778
def test_from_unicode_string_ascii_contents(self):
    """With warn=False an ASCII unicode id converts to the byte string."""
    file_id = osutils.safe_file_id(u'bargam', warn=False)
    self.assertEqual('bargam', file_id)
781
def test_from_unicode_deprecated(self):
    """A unicode file id converts but emits the deprecation warning."""
    expected_warnings = [osutils._file_id_warning]
    file_id = self.callDeprecated(expected_warnings,
                                  osutils.safe_file_id, u'bargam')
    self.assertEqual('bargam', file_id)
786
def test_from_unicode_string_unicode_contents(self):
    """With warn=False a non-ASCII unicode id is encoded as UTF-8."""
    file_id = osutils.safe_file_id(u'bargam\xae', warn=False)
    self.assertEqual('bargam\xc2\xae', file_id)
790
def test_from_utf8_string(self):
    """A UTF-8 byte string file id compares equal after the call."""
    file_id = osutils.safe_file_id('foo\xc2\xae')
    self.assertEqual('foo\xc2\xae', file_id)
795
"""Currently, None is a valid file_id"""
796
self.assertEqual(None, osutils.safe_file_id(None))
799
class TestWin32Funcs(tests.TestCase):
800
"""Test that _win32 versions of os utilities return appropriate paths."""
802
def test_abspath(self):
    """_win32_abspath yields forward slashes for drive and UNC paths."""
    to_abspath = osutils._win32_abspath
    cases = [
        ('C:/foo', 'C:\\foo'),
        ('C:/foo', 'C:/foo'),
        ('//HOST/path', r'\\HOST\path'),
        ('//HOST/path', '//HOST/path'),
    ]
    for expected, path in cases:
        self.assertEqual(expected, to_abspath(path))
808
def test_realpath(self):
    """_win32_realpath yields forward slashes for either separator."""
    for path in ('C:\\foo', 'C:/foo'):
        self.assertEqual('C:/foo', osutils._win32_realpath(path))
812
def test_pathjoin(self):
    """Check _win32_pathjoin against a table of (expected, parts) cases."""
    join = osutils._win32_pathjoin
    cases = [
        ('path/to/foo', ('path', 'to', 'foo')),
        # a later absolute component replaces what came before it
        ('C:/foo', ('path\\to', 'C:\\foo')),
        ('C:/foo', ('path/to', 'C:/foo')),
        ('path/to/foo', ('path/to/', 'foo')),
        ('/foo', ('C:/path/to/', '/foo')),
        ('/foo', ('C:\\path\\to\\', '\\foo')),
    ]
    for expected, parts in cases:
        self.assertEqual(expected, join(*parts))
826
def test_normpath(self):
    """_win32_normpath collapses '.', '..' and doubled separators."""
    for messy_path in (r'path\\from\..\to\.\foo',
                       'path//from/../to/./foo'):
        self.assertEqual('path/to/foo', osutils._win32_normpath(messy_path))
832
def test_getcwd(self):
    """Compare _win32_getcwd with os.getcwdu, drive letter upper-cased."""
    win32_cwd = osutils._win32_getcwd()
    native_cwd = os.getcwdu()
    # Everything after the first character must match once backslashes
    # are turned into forward slashes.
    self.assertEqual(native_cwd[1:].replace('\\', '/'), win32_cwd[1:])
    # win32 is inconsistent whether it returns lower or upper case
    # and even if it was consistent the user might type the other
    # so we force it to uppercase
    # running python.exe under cmd.exe return capital C:\\
    # running win32 python inside a cygwin shell returns lowercase
    self.assertEqual(native_cwd[0].upper(), win32_cwd[0])
843
def test_fixdrive(self):
    """_win32_fixdrive upper-cases the drive letter and nothing else."""
    fixdrive = osutils._win32_fixdrive
    cases = [
        ('H:/foo', 'h:/foo'),
        ('H:/foo', 'H:/foo'),
        # separators are left untouched
        ('C:\\foo', 'c:\\foo'),
    ]
    for expected, path in cases:
        self.assertEqual(expected, fixdrive(path))
848
def test_win98_abspath(self):
850
self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
851
self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
853
self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
854
self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
856
cwd = osutils.getcwd().rstrip('/')
857
drive = osutils._nt_splitdrive(cwd)[0]
858
self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
859
self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
862
self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
865
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
866
"""Test win32 functions that create files."""
868
def test_getcwd(self):
869
self.requireFeature(tests.UnicodeFilenameFeature)
872
# TODO: jam 20060427 This will probably fail on Mac OSX because
873
# it will change the normalization of B\xe5gfors
874
# Consider using a different unicode character, or make
875
# osutils.getcwd() renormalize the path.
876
self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
878
def test_minimum_path_selection(self):
    """Paths inside another selected path are dropped from the result."""
    select = osutils.minimum_path_selection
    cases = [
        ([], []),
        (['a'], ['a']),
        (['a', 'b'], ['a', 'b']),
        (['a/', 'b'], ['a/', 'b']),
        (['a/', 'b'], ['a/c', 'a/', 'b']),
        (['a-b', 'a', 'a0b'], ['a-b', 'a/b', 'a0b', 'a']),
    ]
    for expected, paths in cases:
        self.assertEqual(set(expected), select(paths))
892
def test_mkdtemp(self):
    """The path returned by _win32_mkdtemp contains no backslashes."""
    tmpdir = osutils._win32_mkdtemp(dir='.')
    has_backslash = '\\' in tmpdir
    self.assertFalse(has_backslash)
896
def test_rename(self):
904
osutils._win32_rename('b', 'a')
905
self.failUnlessExists('a')
906
self.failIfExists('b')
907
self.assertFileEqual('baz\n', 'a')
909
def test_rename_missing_file(self):
915
osutils._win32_rename('b', 'a')
916
except (IOError, OSError), e:
917
self.assertEqual(errno.ENOENT, e.errno)
918
self.assertFileEqual('foo\n', 'a')
920
def test_rename_missing_dir(self):
923
osutils._win32_rename('b', 'a')
924
except (IOError, OSError), e:
925
self.assertEqual(errno.ENOENT, e.errno)
927
def test_rename_current_dir(self):
930
# You can't rename the working directory
931
# doing rename non-existent . usually
932
# just raises ENOENT, since non-existent
935
osutils._win32_rename('b', '.')
936
except (IOError, OSError), e:
937
self.assertEqual(errno.ENOENT, e.errno)
939
def test_splitpath(self):
940
def check(expected, path):
941
self.assertEqual(expected, osutils.splitpath(path))
944
check(['a', 'b'], 'a/b')
945
check(['a', 'b'], 'a/./b')
946
check(['a', '.b'], 'a/.b')
947
check(['a', '.b'], 'a\\.b')
949
self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
952
class TestParentDirectories(tests.TestCaseInTempDir):
    """Test osutils.parent_directories()"""

    def test_parent_directories(self):
        """Parents are listed from the nearest one outwards."""
        cases = [
            ([], 'a'),
            (['a'], 'a/b'),
            (['a/b', 'a'], 'a/b/c'),
        ]
        for expected, path in cases:
            self.assertEqual(expected, osutils.parent_directories(path))
961
class TestMacFuncsDirs(tests.TestCaseInTempDir):
    """Test mac special functions that require directories."""

    def test_getcwd(self):
        """_mac_getcwd ends with the unicode directory we changed into."""
        self.requireFeature(tests.UnicodeFilenameFeature)
        dirname = u'B\xe5gfors'
        os.mkdir(dirname)
        os.chdir(dirname)
        self.assertEndsWith(osutils._mac_getcwd(), dirname)

    def test_getcwd_nonnorm(self):
        """A decomposed directory name is reported in composed form."""
        self.requireFeature(tests.UnicodeFilenameFeature)
        # Test that _mac_getcwd() will normalize this path
        decomposed = u'Ba\u030agfors'
        os.mkdir(decomposed)
        os.chdir(decomposed)
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
978
class TestChunksToLines(tests.TestCase):
980
def test_smoketest(self):
    expected_lines = ['foo\n', 'bar\n', 'baz\n']
    # Chunk boundaries that do not coincide with the line boundaries.
    unaligned = osutils.chunks_to_lines(['foo\nbar', '\nbaz\n'])
    self.assertEqual(expected_lines, unaligned)
    # Chunks that already hold exactly one line each.
    aligned = osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n'])
    self.assertEqual(expected_lines, aligned)
986
def test_osutils_binding(self):
987
from bzrlib.tests import test__chunks_to_lines
988
if test__chunks_to_lines.CompiledChunksToLinesFeature.available():
989
from bzrlib._chunks_to_lines_pyx import chunks_to_lines
991
from bzrlib._chunks_to_lines_py import chunks_to_lines
992
self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
995
class TestSplitLines(tests.TestCase):
    """Tests for osutils.split_lines()."""

    def test_split_unicode(self):
        # Without a trailing newline the last line comes back unterminated.
        text = u'foo\nbar\xae'
        self.assertEqual([u'foo\n', u'bar\xae'], osutils.split_lines(text))
        # With one, every returned line keeps its newline.
        text = u'foo\nbar\xae\n'
        self.assertEqual([u'foo\n', u'bar\xae\n'], osutils.split_lines(text))

    def test_split_with_carriage_returns(self):
        # A bare '\r' must not be treated as a line break.
        text = 'foo\rbar\n'
        self.assertEqual(['foo\rbar\n'], osutils.split_lines(text))
1008
class TestWalkDirs(tests.TestCaseInTempDir):
1010
def assertExpectedBlocks(self, expected, result):
    """Compare walkdirs output to ``expected`` using three fields per entry.

    :param expected: list of (dirinfo, [entry, ...]) pairs whose entries
        hold only the leading three fields.
    :param result: iterable of (dirinfo, block) pairs; every entry of a
        block is cut down to ``entry[0:3]`` before the comparison.
    """
    trimmed = []
    for dirinfo, block in result:
        entries = []
        for line in block:
            entries.append(line[0:3])
        trimmed.append((dirinfo, entries))
    self.assertEqual(expected, trimmed)
1015
def test_walkdirs(self):
1024
self.build_tree(tree)
1025
expected_dirblocks = [
1027
[('0file', '0file', 'file'),
1028
('1dir', '1dir', 'directory'),
1029
('2file', '2file', 'file'),
1032
(('1dir', './1dir'),
1033
[('1dir/0file', '0file', 'file'),
1034
('1dir/1dir', '1dir', 'directory'),
1037
(('1dir/1dir', './1dir/1dir'),
1043
found_bzrdir = False
1044
for dirdetail, dirblock in osutils.walkdirs('.'):
1045
if len(dirblock) and dirblock[0][1] == '.bzr':
1046
# this tests the filtering of selected paths
1049
result.append((dirdetail, dirblock))
1051
self.assertTrue(found_bzrdir)
1052
self.assertExpectedBlocks(expected_dirblocks, result)
1053
# you can search a subdir only, with a supplied prefix.
1055
for dirblock in osutils.walkdirs('./1dir', '1dir'):
1056
result.append(dirblock)
1057
self.assertExpectedBlocks(expected_dirblocks[1:], result)
1059
def test_walkdirs_os_error(self):
    # <https://bugs.edge.launchpad.net/bzr/+bug/338653>
    # Pyrex readdir didn't raise useful messages if it had an error
    # reading the directory
    import stat
    if sys.platform == 'win32':
        raise tests.TestNotApplicable(
            "readdir IOError not tested on win32")
    unreadable = "test-unreadable"
    os.mkdir(unreadable)
    # Register the permission restore before access is removed, so that the
    # directory can always be deleted again during cleanup.
    self.addCleanup(os.chmod, unreadable, stat.S_IRWXU)
    os.chmod(unreadable, 0)
    # The walk object is created before assertRaises runs, so the OSError
    # has to surface only once list() actually evaluates the generator.
    e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
    self.assertEqual('./test-unreadable', e.filename)
    self.assertEqual(errno.EACCES, e.errno)
    # Ensure the message contains the file name.  The pattern is a raw
    # string so that '\.' reaches the regex engine as an escaped dot.
    self.assertContainsRe(str(e), r"\./test-unreadable")
1079
def test__walkdirs_utf8(self):
1088
self.build_tree(tree)
1089
expected_dirblocks = [
1091
[('0file', '0file', 'file'),
1092
('1dir', '1dir', 'directory'),
1093
('2file', '2file', 'file'),
1096
(('1dir', './1dir'),
1097
[('1dir/0file', '0file', 'file'),
1098
('1dir/1dir', '1dir', 'directory'),
1101
(('1dir/1dir', './1dir/1dir'),
1107
found_bzrdir = False
1108
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1109
if len(dirblock) and dirblock[0][1] == '.bzr':
1110
# this tests the filtering of selected paths
1113
result.append((dirdetail, dirblock))
1115
self.assertTrue(found_bzrdir)
1116
self.assertExpectedBlocks(expected_dirblocks, result)
1118
# you can search a subdir only, with a supplied prefix.
1120
for dirblock in osutils.walkdirs('./1dir', '1dir'):
1121
result.append(dirblock)
1122
self.assertExpectedBlocks(expected_dirblocks[1:], result)
1124
def _filter_out_stat(self, result):
1125
"""Filter out the stat value from the walkdirs result"""
1126
for dirdetail, dirblock in result:
1128
for info in dirblock:
1129
# Ignore info[3] which is the stat
1130
new_dirblock.append((info[0], info[1], info[2], info[4]))
1131
dirblock[:] = new_dirblock
1133
def _save_platform_info(self):
1134
cur_winver = win32utils.winver
1135
cur_fs_enc = osutils._fs_enc
1136
cur_dir_reader = osutils._selected_dir_reader
1138
win32utils.winver = cur_winver
1139
osutils._fs_enc = cur_fs_enc
1140
osutils._selected_dir_reader = cur_dir_reader
1141
self.addCleanup(restore)
1143
def assertDirReaderIs(self, expected):
    """Check which dir reader class _walkdirs_utf8 ends up selecting.

    :param expected: class that osutils._selected_dir_reader must be an
        instance of after one walk of the current directory.
    """
    # Drop any cached choice so that the walk below has to select again.
    osutils._selected_dir_reader = None
    # The walk is expected to report just one empty top-level directory.
    walked = list(osutils._walkdirs_utf8('.'))
    empty_top_level = [(('', '.'), [])]
    self.assertEqual(empty_top_level, walked)
    chosen = osutils._selected_dir_reader
    self.assertIsInstance(chosen, expected)
1151
def test_force_walkdirs_utf8_fs_utf8(self):
    # With a UTF-8 filesystem encoding the reader exposed by
    # UTF8DirReaderFeature is the one that should be selected.
    self.requireFeature(UTF8DirReaderFeature)
    self._save_platform_info()
    # No Windows version is reported, which avoids the win32 detection code.
    win32utils.winver = None
    osutils._fs_enc = 'UTF-8'
    expected_reader = UTF8DirReaderFeature.reader
    self.assertDirReaderIs(expected_reader)
1158
def test_force_walkdirs_utf8_fs_ascii(self):
    # A 'US-ASCII' filesystem encoding is expected to select the same
    # reader as UTF-8 does.
    self.requireFeature(UTF8DirReaderFeature)
    self._save_platform_info()
    # No Windows version is reported, which avoids the win32 detection code.
    win32utils.winver = None
    osutils._fs_enc = 'US-ASCII'
    expected_reader = UTF8DirReaderFeature.reader
    self.assertDirReaderIs(expected_reader)
1165
def test_force_walkdirs_utf8_fs_ANSI(self):
    # 'ANSI_X3.4-1968' is expected to select the same reader as UTF-8 does.
    self.requireFeature(UTF8DirReaderFeature)
    self._save_platform_info()
    # No Windows version is reported, which avoids the win32 detection code.
    win32utils.winver = None
    osutils._fs_enc = 'ANSI_X3.4-1968'
    expected_reader = UTF8DirReaderFeature.reader
    self.assertDirReaderIs(expected_reader)
1172
def test_force_walkdirs_utf8_fs_latin1(self):
    # With a 'latin1' filesystem encoding the selection is expected to land
    # on UnicodeDirReader.
    self._save_platform_info()
    # No Windows version is reported, which avoids the win32 detection code.
    win32utils.winver = None
    osutils._fs_enc = 'latin1'
    expected_reader = osutils.UnicodeDirReader
    self.assertDirReaderIs(expected_reader)
1178
def test_force_walkdirs_utf8_nt(self):
    # When the reported Windows version is 'Windows NT', Win32ReadDir is the
    # reader that should be selected.
    self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
    self._save_platform_info()
    win32utils.winver = 'Windows NT'
    # Imported only here, after the feature check above has passed.
    from bzrlib._walkdirs_win32 import Win32ReadDir
    expected_reader = Win32ReadDir
    self.assertDirReaderIs(expected_reader)
1186
def test_force_walkdirs_utf8_98(self):
    # When the reported Windows version is 'Windows 98', the selection is
    # expected to land on UnicodeDirReader.
    self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
    self._save_platform_info()
    win32utils.winver = 'Windows 98'
    expected_reader = osutils.UnicodeDirReader
    self.assertDirReaderIs(expected_reader)
1192
def test_unicode_walkdirs(self):
1193
"""Walkdirs should always return unicode paths."""
1194
self.requireFeature(tests.UnicodeFilenameFeature)
1195
name0 = u'0file-\xb6'
1196
name1 = u'1dir-\u062c\u0648'
1197
name2 = u'2file-\u0633'
1201
name1 + '/' + name0,
1202
name1 + '/' + name1 + '/',
1205
self.build_tree(tree)
1206
expected_dirblocks = [
1208
[(name0, name0, 'file', './' + name0),
1209
(name1, name1, 'directory', './' + name1),
1210
(name2, name2, 'file', './' + name2),
1213
((name1, './' + name1),
1214
[(name1 + '/' + name0, name0, 'file', './' + name1
1216
(name1 + '/' + name1, name1, 'directory', './' + name1
1220
((name1 + '/' + name1, './' + name1 + '/' + name1),
1225
result = list(osutils.walkdirs('.'))
1226
self._filter_out_stat(result)
1227
self.assertEqual(expected_dirblocks, result)
1228
result = list(osutils.walkdirs(u'./'+name1, name1))
1229
self._filter_out_stat(result)
1230
self.assertEqual(expected_dirblocks[1:], result)
1232
def test_unicode__walkdirs_utf8(self):
1233
"""Walkdirs_utf8 should always return utf8 paths.
1235
The abspath portion might be in unicode or utf-8
1237
self.requireFeature(tests.UnicodeFilenameFeature)
1238
name0 = u'0file-\xb6'
1239
name1 = u'1dir-\u062c\u0648'
1240
name2 = u'2file-\u0633'
1244
name1 + '/' + name0,
1245
name1 + '/' + name1 + '/',
1248
self.build_tree(tree)
1249
name0 = name0.encode('utf8')
1250
name1 = name1.encode('utf8')
1251
name2 = name2.encode('utf8')
1253
expected_dirblocks = [
1255
[(name0, name0, 'file', './' + name0),
1256
(name1, name1, 'directory', './' + name1),
1257
(name2, name2, 'file', './' + name2),
1260
((name1, './' + name1),
1261
[(name1 + '/' + name0, name0, 'file', './' + name1
1263
(name1 + '/' + name1, name1, 'directory', './' + name1
1267
((name1 + '/' + name1, './' + name1 + '/' + name1),
1273
# For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1274
# all abspaths are Unicode, and encode them back into utf8.
1275
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1276
self.assertIsInstance(dirdetail[0], str)
1277
if isinstance(dirdetail[1], unicode):
1278
dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1279
dirblock = [list(info) for info in dirblock]
1280
for info in dirblock:
1281
self.assertIsInstance(info[4], unicode)
1282
info[4] = info[4].encode('utf8')
1284
for info in dirblock:
1285
self.assertIsInstance(info[0], str)
1286
self.assertIsInstance(info[1], str)
1287
self.assertIsInstance(info[4], str)
1288
# Remove the stat information
1289
new_dirblock.append((info[0], info[1], info[2], info[4]))
1290
result.append((dirdetail, new_dirblock))
1291
self.assertEqual(expected_dirblocks, result)
1293
def test__walkdirs_utf8_with_unicode_fs(self):
1294
"""UnicodeDirReader should be a safe fallback everywhere
1296
The abspath portion should be in unicode
1298
self.requireFeature(tests.UnicodeFilenameFeature)
1299
# Use the unicode reader. TODO: split into driver-and-driven unit
1301
self._save_platform_info()
1302
osutils._selected_dir_reader = osutils.UnicodeDirReader()
1303
name0u = u'0file-\xb6'
1304
name1u = u'1dir-\u062c\u0648'
1305
name2u = u'2file-\u0633'
1309
name1u + '/' + name0u,
1310
name1u + '/' + name1u + '/',
1313
self.build_tree(tree)
1314
name0 = name0u.encode('utf8')
1315
name1 = name1u.encode('utf8')
1316
name2 = name2u.encode('utf8')
1318
# All of the abspaths should be in unicode, all of the relative paths
1320
expected_dirblocks = [
1322
[(name0, name0, 'file', './' + name0u),
1323
(name1, name1, 'directory', './' + name1u),
1324
(name2, name2, 'file', './' + name2u),
1327
((name1, './' + name1u),
1328
[(name1 + '/' + name0, name0, 'file', './' + name1u
1330
(name1 + '/' + name1, name1, 'directory', './' + name1u
1334
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1339
result = list(osutils._walkdirs_utf8('.'))
1340
self._filter_out_stat(result)
1341
self.assertEqual(expected_dirblocks, result)
1343
def test__walkdirs_utf8_win32readdir(self):
1344
self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1345
self.requireFeature(tests.UnicodeFilenameFeature)
1346
from bzrlib._walkdirs_win32 import Win32ReadDir
1347
self._save_platform_info()
1348
osutils._selected_dir_reader = Win32ReadDir()
1349
name0u = u'0file-\xb6'
1350
name1u = u'1dir-\u062c\u0648'
1351
name2u = u'2file-\u0633'
1355
name1u + '/' + name0u,
1356
name1u + '/' + name1u + '/',
1359
self.build_tree(tree)
1360
name0 = name0u.encode('utf8')
1361
name1 = name1u.encode('utf8')
1362
name2 = name2u.encode('utf8')
1364
# All of the abspaths should be in unicode, all of the relative paths
1366
expected_dirblocks = [
1368
[(name0, name0, 'file', './' + name0u),
1369
(name1, name1, 'directory', './' + name1u),
1370
(name2, name2, 'file', './' + name2u),
1373
((name1, './' + name1u),
1374
[(name1 + '/' + name0, name0, 'file', './' + name1u
1376
(name1 + '/' + name1, name1, 'directory', './' + name1u
1380
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1385
result = list(osutils._walkdirs_utf8(u'.'))
1386
self._filter_out_stat(result)
1387
self.assertEqual(expected_dirblocks, result)
1389
def assertStatIsCorrect(self, path, win32stat):
    """Check ``win32stat`` against what os.stat() reports for ``path``.

    :param path: path handed to os.stat() to obtain the reference values.
    :param win32stat: stat-like object under test; it must expose the
        st_size, st_mtime, st_ctime, st_atime, st_dev, st_ino and st_mode
        attributes.
    """
    reference = os.stat(path)
    self.assertEqual(reference.st_size, win32stat.st_size)
    # The timestamps only have to agree to four decimal places.
    for field in ('st_mtime', 'st_ctime', 'st_atime'):
        self.assertAlmostEqual(getattr(reference, field),
                               getattr(win32stat, field), places=4)
    # Everything else has to be exactly equal.
    for field in ('st_dev', 'st_ino', 'st_mode'):
        self.assertEqual(getattr(reference, field),
                         getattr(win32stat, field))
1399
def test__walkdirs_utf_win32_find_file_stat_file(self):
1400
"""make sure our Stat values are valid"""
1401
self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1402
self.requireFeature(tests.UnicodeFilenameFeature)
1403
from bzrlib._walkdirs_win32 import Win32ReadDir
1404
name0u = u'0file-\xb6'
1405
name0 = name0u.encode('utf8')
1406
self.build_tree([name0u])
1407
# I hate to sleep() here, but I'm trying to make the ctime different
1410
f = open(name0u, 'ab')
1412
f.write('just a small update')
1416
result = Win32ReadDir().read_dir('', u'.')
1418
self.assertEqual((name0, name0, 'file'), entry[:3])
1419
self.assertEqual(u'./' + name0u, entry[4])
1420
self.assertStatIsCorrect(entry[4], entry[3])
1421
self.assertNotEqual(entry[3].st_mtime, entry[3].st_ctime)
1423
def test__walkdirs_utf_win32_find_file_stat_directory(self):
1424
"""make sure our Stat values are valid"""
1425
self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1426
self.requireFeature(tests.UnicodeFilenameFeature)
1427
from bzrlib._walkdirs_win32 import Win32ReadDir
1428
name0u = u'0dir-\u062c\u0648'
1429
name0 = name0u.encode('utf8')
1430
self.build_tree([name0u + '/'])
1432
result = Win32ReadDir().read_dir('', u'.')
1434
self.assertEqual((name0, name0, 'directory'), entry[:3])
1435
self.assertEqual(u'./' + name0u, entry[4])
1436
self.assertStatIsCorrect(entry[4], entry[3])
1438
def assertPathCompare(self, path_less, path_greater):
    """Assert that path_less orders strictly before path_greater.

    osutils.compare_paths_prefix_order() must return 0 when either path is
    compared with itself, -1 for (path_less, path_greater) and 1 for the
    swapped arguments.
    """
    compare = osutils.compare_paths_prefix_order
    cases = [
        (0, path_less, path_less),
        (0, path_greater, path_greater),
        (-1, path_less, path_greater),
        (1, path_greater, path_less),
    ]
    for expected, left, right in cases:
        self.assertEqual(expected, compare(left, right))
1449
def test_compare_paths_prefix_order(self):
    # Every pair is (path that sorts first, path that sorts second).
    rooted = [
        # root before all else
        ("/", "/a"),
        # alpha within a dir
        ("/a", "/b"),
        ("/b", "/z"),
        # high dirs before lower.
        ("/z", "/a/a"),
        # except if the deeper dir should be output first
        ("/a/b/c", "/d/g"),
        # lexical between dirs of the same height
        ("/a/z", "/z/z"),
        ("/a/c/z", "/a/d/e"),
    ]
    # The ordering should also be consistent for paths without a leading
    # '/'; the cases mirror the rooted ones above.
    relative = [
        # root before all else
        ("", "a"),
        # alpha within a dir
        ("a", "b"),
        ("b", "z"),
        # high dirs before lower.
        ("z", "a/a"),
        # except if the deeper dir should be output first
        ("a/b/c", "d/g"),
        # lexical between dirs of the same height
        ("a/z", "z/z"),
        ("a/c/z", "a/d/e"),
    ]
    for lesser, greater in rooted + relative:
        self.assertPathCompare(lesser, greater)
1477
def test_path_prefix_sorting(self):
1478
"""Doing a sort on path prefix should match our sample data."""
1493
dir_sorted_paths = [
1509
sorted(original_paths, key=osutils.path_prefix_key))
1510
# using the comparison routine should work too:
1513
sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
1516
class TestCopyTree(tests.TestCaseInTempDir):
1518
def test_copy_basic_tree(self):
    # Only 'source' is built here; copy_tree has to produce 'target'.
    layout = ['source/', 'source/a', 'source/b/', 'source/b/c']
    self.build_tree(layout)
    osutils.copy_tree('source', 'target')
    top_level = os.listdir('target')
    top_level.sort()
    self.assertEqual(['a', 'b'], top_level)
    nested = os.listdir('target/b')
    self.assertEqual(['c'], nested)
1524
def test_copy_tree_target_exists(self):
1525
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
1527
osutils.copy_tree('source', 'target')
1528
self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
1529
self.assertEqual(['c'], os.listdir('target/b'))
1531
def test_copy_tree_symlinks(self):
    self.requireFeature(tests.SymlinkFeature)
    self.build_tree(['source/'])
    link_target = 'a/generic/path'
    os.symlink(link_target, 'source/lnk')
    osutils.copy_tree('source', 'target')
    # The copy must contain the link itself, still holding the same target
    # text.
    copied_names = os.listdir('target')
    self.assertEqual(['lnk'], copied_names)
    copied_target = os.readlink('target/lnk')
    self.assertEqual('a/generic/path', copied_target)
1539
def test_copy_tree_handlers(self):
1540
processed_files = []
1541
processed_links = []
1542
def file_handler(from_path, to_path):
1543
processed_files.append(('f', from_path, to_path))
1544
def dir_handler(from_path, to_path):
1545
processed_files.append(('d', from_path, to_path))
1546
def link_handler(from_path, to_path):
1547
processed_links.append((from_path, to_path))
1548
handlers = {'file':file_handler,
1549
'directory':dir_handler,
1550
'symlink':link_handler,
1553
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1554
if osutils.has_symlinks():
1555
os.symlink('a/generic/path', 'source/lnk')
1556
osutils.copy_tree('source', 'target', handlers=handlers)
1558
self.assertEqual([('d', 'source', 'target'),
1559
('f', 'source/a', 'target/a'),
1560
('d', 'source/b', 'target/b'),
1561
('f', 'source/b/c', 'target/b/c'),
1563
self.failIfExists('target')
1564
if osutils.has_symlinks():
1565
self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1568
class TestSetUnsetEnv(tests.TestCase):
1569
"""Test updating the environment"""
1572
super(TestSetUnsetEnv, self).setUp()
1574
self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
1575
'Environment was not cleaned up properly.'
1576
' Variable BZR_TEST_ENV_VAR should not exist.')
1578
if 'BZR_TEST_ENV_VAR' in os.environ:
1579
del os.environ['BZR_TEST_ENV_VAR']
1581
self.addCleanup(cleanup)
1584
"""Test that we can set an env variable"""
1585
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1586
self.assertEqual(None, old)
1587
self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
1589
def test_double_set(self):
    """Test that we get the old value out"""
    var_name = 'BZR_TEST_ENV_VAR'
    osutils.set_or_unset_env(var_name, 'foo')
    # The second call has to hand back what the first call stored.
    previous = osutils.set_or_unset_env(var_name, 'bar')
    self.assertEqual('foo', previous)
    current = os.environ.get(var_name)
    self.assertEqual('bar', current)
1596
def test_unicode(self):
1597
"""Environment can only contain plain strings
1599
So Unicode strings must be encoded.
1601
uni_val, env_val = tests.probe_unicode_in_user_encoding()
1603
raise tests.TestSkipped(
1604
'Cannot find a unicode character that works in encoding %s'
1605
% (osutils.get_user_encoding(),))
1607
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1608
self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
1610
def test_unset(self):
    """Test that passing None will remove the env var"""
    var_name = 'BZR_TEST_ENV_VAR'
    osutils.set_or_unset_env(var_name, 'foo')
    previous = osutils.set_or_unset_env(var_name, None)
    self.assertEqual('foo', previous)
    # The variable must be gone both for a lookup and for a membership test.
    self.assertEqual(None, os.environ.get(var_name))
    still_present = var_name in os.environ
    self.failIf(still_present)
1619
class TestSizeShaFile(tests.TestCaseInTempDir):
1621
def test_sha_empty(self):
1622
self.build_tree_contents([('foo', '')])
1623
expected_sha = osutils.sha_string('')
1625
self.addCleanup(f.close)
1626
size, sha = osutils.size_sha_file(f)
1627
self.assertEqual(0, size)
1628
self.assertEqual(expected_sha, sha)
1630
def test_sha_mixed_endings(self):
1631
text = 'test\r\nwith\nall\rpossible line endings\r\n'
1632
self.build_tree_contents([('foo', text)])
1633
expected_sha = osutils.sha_string(text)
1635
self.addCleanup(f.close)
1636
size, sha = osutils.size_sha_file(f)
1637
self.assertEqual(38, size)
1638
self.assertEqual(expected_sha, sha)
1641
class TestShaFileByName(tests.TestCaseInTempDir):
    """Compare osutils.sha_file_by_name() with osutils.sha_string()."""

    def _assert_file_sha_matches(self, content):
        # Write ``content`` to the file 'foo' and check that hashing the file
        # by name gives the same digest as hashing the string directly.
        self.build_tree_contents([('foo', content)])
        expected_sha = osutils.sha_string(content)
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))

    def test_sha_empty(self):
        self._assert_file_sha_matches('')

    def test_sha_mixed_endings(self):
        # '\r\n', '\n' and a bare '\r' all appear in this text.
        self._assert_file_sha_matches(
            'test\r\nwith\nall\rpossible line endings\r\n')
1655
class TestResourceLoading(tests.TestCaseInTempDir):
1657
def test_resource_string(self):
1658
# test resource in bzrlib
1659
text = osutils.resource_string('bzrlib', 'debug.py')
1660
self.assertContainsRe(text, "debug_flags = set()")
1661
# test resource under bzrlib
1662
text = osutils.resource_string('bzrlib.ui', 'text.py')
1663
self.assertContainsRe(text, "class TextUIFactory")
1664
# test unsupported package
1665
self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1667
# test unknown resource
1668
self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
1671
class TestReCompile(tests.TestCase):
1673
def test_re_compile_checked(self):
    r = osutils.re_compile_checked(r'A*', re.IGNORECASE)
    # 'A*' can match the empty string, so match() returns a match object
    # for any input at all.  Checking the matched text instead shows that
    # re.IGNORECASE was really applied to the compiled pattern.
    self.assertEqual('aaaa', r.match('aaaa').group())
    self.assertEqual('aAaA', r.match('aAaA').group())
1678
def test_re_compile_checked_error(self):
1679
# like https://bugs.launchpad.net/bzr/+bug/251352
1680
err = self.assertRaises(
1681
errors.BzrCommandError,
1682
osutils.re_compile_checked, '*', re.IGNORECASE, 'test case')
1684
"Invalid regular expression in test case: '*': "
1685
"nothing to repeat",
1689
class TestDirReader(tests.TestCaseInTempDir):
1692
_dir_reader_class = None
1693
_native_to_unicode = None
1696
tests.TestCaseInTempDir.setUp(self)
1698
# Save platform specific info and reset it
1699
cur_dir_reader = osutils._selected_dir_reader
1702
osutils._selected_dir_reader = cur_dir_reader
1703
self.addCleanup(restore)
1705
osutils._selected_dir_reader = self._dir_reader_class()
1707
def _get_ascii_tree(self):
1715
expected_dirblocks = [
1717
[('0file', '0file', 'file'),
1718
('1dir', '1dir', 'directory'),
1719
('2file', '2file', 'file'),
1722
(('1dir', './1dir'),
1723
[('1dir/0file', '0file', 'file'),
1724
('1dir/1dir', '1dir', 'directory'),
1727
(('1dir/1dir', './1dir/1dir'),
1732
return tree, expected_dirblocks
1734
def test_walk_cur_dir(self):
    tree, expected_dirblocks = self._get_ascii_tree()
    self.build_tree(tree)
    walked = list(osutils._walkdirs_utf8('.'))
    # Keep only the first three fields of every entry, which filters out
    # the stat value and the abspath.
    trimmed = []
    for dirinfo, block in walked:
        trimmed.append((dirinfo, [entry[0:3] for entry in block]))
    self.assertEqual(expected_dirblocks, trimmed)
1743
def test_walk_sub_dir(self):
    tree, expected_dirblocks = self._get_ascii_tree()
    self.build_tree(tree)
    # Walk only the subdirectory, handing over the matching prefix.
    walked = list(osutils._walkdirs_utf8('./1dir', '1dir'))
    # Keep only the first three fields of every entry, which filters out
    # the stat value and the abspath.
    trimmed = []
    for dirinfo, block in walked:
        trimmed.append((dirinfo, [entry[0:3] for entry in block]))
    # The first expected block describes the top level, which is not part
    # of this walk.
    self.assertEqual(expected_dirblocks[1:], trimmed)
1753
def _get_unicode_tree(self):
1754
name0u = u'0file-\xb6'
1755
name1u = u'1dir-\u062c\u0648'
1756
name2u = u'2file-\u0633'
1760
name1u + '/' + name0u,
1761
name1u + '/' + name1u + '/',
1764
name0 = name0u.encode('UTF-8')
1765
name1 = name1u.encode('UTF-8')
1766
name2 = name2u.encode('UTF-8')
1767
expected_dirblocks = [
1769
[(name0, name0, 'file', './' + name0u),
1770
(name1, name1, 'directory', './' + name1u),
1771
(name2, name2, 'file', './' + name2u),
1774
((name1, './' + name1u),
1775
[(name1 + '/' + name0, name0, 'file', './' + name1u
1777
(name1 + '/' + name1, name1, 'directory', './' + name1u
1781
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1786
return tree, expected_dirblocks
1788
def _filter_out(self, raw_dirblocks):
1789
"""Filter out a walkdirs_utf8 result.
1791
stat field is removed, all native paths are converted to unicode
1793
filtered_dirblocks = []
1794
for dirinfo, block in raw_dirblocks:
1795
dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
1798
details.append(line[0:3] + (self._native_to_unicode(line[4]), ))
1799
filtered_dirblocks.append((dirinfo, details))
1800
return filtered_dirblocks
1802
def test_walk_unicode_tree(self):
    self.requireFeature(tests.UnicodeFilenameFeature)
    layout, expected_dirblocks = self._get_unicode_tree()
    self.build_tree(layout)
    raw_dirblocks = list(osutils._walkdirs_utf8('.'))
    # _filter_out() prepares the raw walk result for the comparison.
    filtered = self._filter_out(raw_dirblocks)
    self.assertEqual(expected_dirblocks, filtered)
1809
def test_symlink(self):
1810
self.requireFeature(tests.SymlinkFeature)
1811
self.requireFeature(tests.UnicodeFilenameFeature)
1812
target = u'target\N{Euro Sign}'
1813
link_name = u'l\N{Euro Sign}nk'
1814
os.symlink(target, link_name)
1815
target_utf8 = target.encode('UTF-8')
1816
link_name_utf8 = link_name.encode('UTF-8')
1817
expected_dirblocks = [
1819
[(link_name_utf8, link_name_utf8,
1820
'symlink', './' + link_name),],
1822
result = list(osutils._walkdirs_utf8('.'))
1823
self.assertEqual(expected_dirblocks, self._filter_out(result))
1826
class TestReadLink(tests.TestCaseInTempDir):
1827
"""Exposes os.readlink() problems and the osutils solution.
1829
The only guarantee offered by os.readlink(), starting with 2.6, is that a
1830
unicode string will be returned if a unicode string is passed.
1832
But prior python versions failed to properly encode the passed unicode
1835
_test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]
1838
super(tests.TestCaseInTempDir, self).setUp()
1839
self.link = u'l\N{Euro Sign}ink'
1840
self.target = u'targe\N{Euro Sign}t'
1841
os.symlink(self.target, self.link)
1843
def test_os_readlink_link_encoding(self):
1844
if sys.version_info < (2, 6):
1845
self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
1847
self.assertEquals(self.target, os.readlink(self.link))
1849
def test_os_readlink_link_decoding(self):
    # Given the link name as a byte string in the filesystem encoding,
    # os.readlink() should answer with the target encoded the same way.
    fs_encoding = osutils._fs_enc
    expected_target = self.target.encode(fs_encoding)
    encoded_link = self.link.encode(fs_encoding)
    self.assertEquals(expected_target, os.readlink(encoded_link))
1854
class TestConcurrency(tests.TestCase):
    """Tests for osutils.local_concurrency()."""

    def test_local_concurrency(self):
        # Only the type of the answer is checked, not its value.
        self.assertIsInstance(osutils.local_concurrency(), int)
1861
class TestFailedToLoadExtension(tests.TestCase):
1863
def _try_loading(self):
1865
import bzrlib._fictional_extension_py
1866
except ImportError, e:
1867
osutils.failed_to_load_extension(e)
1871
super(TestFailedToLoadExtension, self).setUp()
1872
self.saved_failures = osutils._extension_load_failures[:]
1873
del osutils._extension_load_failures[:]
1874
self.addCleanup(self.restore_failures)
1876
def restore_failures(self):
    """Put back the load failures that were saved in self.saved_failures.

    The module-level list is refilled in place instead of being rebound,
    so the list object keeps its identity and any other reference to it
    sees the restored contents as well.
    """
    osutils._extension_load_failures[:] = self.saved_failures
1879
def test_failure_to_load(self):
1881
self.assertLength(1, osutils._extension_load_failures)
1882
self.assertEquals(osutils._extension_load_failures[0],
1883
"No module named _fictional_extension_py")
1885
def test_report_extension_load_failures_no_warning(self):
    attempted = self._try_loading()
    self.assertTrue(attempted)
    report = osutils.report_extension_load_failures
    caught, _unused_result = self.callCatchWarnings(report)
    # Reporting used to emit a Python warning; none is expected any more.
    self.assertLength(0, caught)
1891
def test_report_extension_load_failures_message(self):
1893
trace.push_log_file(log)
1894
self.assertTrue(self._try_loading())
1895
osutils.report_extension_load_failures()
1896
self.assertContainsRe(
1898
r"bzr: warning: some compiled extensions could not be loaded; "
1899
"see <https://answers\.launchpad\.net/bzr/\+faq/703>\n"