1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the osutils wrapper."""
19
from cStringIO import StringIO
35
from bzrlib.tests import (
41
class _UTF8DirReaderFeature(tests.Feature):
45
from bzrlib import _readdir_pyx
46
self.reader = _readdir_pyx.UTF8DirReader
51
def feature_name(self):
52
return 'bzrlib._readdir_pyx'
54
UTF8DirReaderFeature = _UTF8DirReaderFeature()
57
def _already_unicode(s):
61
def _utf8_to_unicode(s):
62
return s.decode('UTF-8')
65
def dir_reader_scenarios():
66
# For each dir reader we define:
68
# - native_to_unicode: a function converting the native_abspath as returned
69
# by DirReader.read_dir to its unicode representation
71
# UnicodeDirReader is the fallback, it should be tested on all platforms.
72
scenarios = [('unicode',
73
dict(_dir_reader_class=osutils.UnicodeDirReader,
74
_native_to_unicode=_already_unicode))]
75
# Some DirReaders are platform specific and even there they may not be
77
if UTF8DirReaderFeature.available():
78
from bzrlib import _readdir_pyx
79
scenarios.append(('utf8',
80
dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
81
_native_to_unicode=_utf8_to_unicode)))
83
if test__walkdirs_win32.Win32ReadDirFeature.available():
85
from bzrlib import _walkdirs_win32
88
dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
89
_native_to_unicode=_already_unicode)))
95
def load_tests(basic_tests, module, loader):
96
suite = loader.suiteClass()
97
dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
98
basic_tests, tests.condition_isinstance(TestDirReader))
99
tests.multiply_tests(dir_reader_tests, dir_reader_scenarios(), suite)
100
suite.addTest(remaining_tests)
104
class TestContainsWhitespace(tests.TestCase):
    """Tests for osutils.contains_whitespace()."""

    def test_contains_whitespace(self):
        # Space, newline, carriage return and tab must be detected wherever
        # they occur in the string.
        self.assertTrue(osutils.contains_whitespace(u' '))
        self.assertTrue(osutils.contains_whitespace(u'hello there'))
        self.assertTrue(osutils.contains_whitespace(u'hellothere\n'))
        self.assertTrue(osutils.contains_whitespace(u'hello\nthere'))
        self.assertTrue(osutils.contains_whitespace(u'hello\rthere'))
        self.assertTrue(osutils.contains_whitespace(u'hello\tthere'))

        # \xa0 is "Non-breaking-space", which some python locales treat as
        # whitespace, but we do not.
        self.assertFalse(osutils.contains_whitespace(u''))
        self.assertFalse(osutils.contains_whitespace(u'hellothere'))
        self.assertFalse(osutils.contains_whitespace(u'hello\xa0there'))
121
class TestRename(tests.TestCaseInTempDir):
123
def create_file(self, filename, content):
124
f = open(filename, 'wb')
130
def _fancy_rename(self, a, b):
    # Call osutils.fancy_rename with the plain os-level rename and unlink
    # primitives, renaming path ``a`` to path ``b``.
    osutils.fancy_rename(a, b, rename_func=os.rename,
                         unlink_func=os.unlink)
134
def test_fancy_rename(self):
    # This should work everywhere
    self.create_file('a', 'something in a\n')
    self._fancy_rename('a', 'b')
    self.failIfExists('a')
    self.failUnlessExists('b')
    self.check_file_contents('b', 'something in a\n')

    # Renaming over an existing target must replace its content with the
    # content of the source.
    self.create_file('a', 'new something in a\n')
    self._fancy_rename('b', 'a')

    self.check_file_contents('a', 'something in a\n')
147
def test_fancy_rename_fails_source_missing(self):
    # An exception should be raised, and the target should be left in place
    self.create_file('target', 'data in target\n')
    self.assertRaises((IOError, OSError), self._fancy_rename,
                      'missingsource', 'target')
    self.failUnlessExists('target')
    self.check_file_contents('target', 'data in target\n')
155
def test_fancy_rename_fails_if_source_and_target_missing(self):
    # With neither path present the rename must still raise rather than
    # silently succeed.
    self.assertRaises((IOError, OSError), self._fancy_rename,
                      'missingsource', 'missingtarget')
159
def test_rename(self):
    # Rename should be semi-atomic on all platforms
    self.create_file('a', 'something in a\n')
    osutils.rename('a', 'b')
    self.failIfExists('a')
    self.failUnlessExists('b')
    self.check_file_contents('b', 'something in a\n')

    # Renaming onto an existing file must overwrite it.
    self.create_file('a', 'new something in a\n')
    osutils.rename('b', 'a')

    self.check_file_contents('a', 'something in a\n')
172
# TODO: test fancy_rename using a MemoryTransport
174
def test_rename_change_case(self):
    # on Windows we should be able to change filename case by rename
    self.build_tree(['a', 'b/'])
    osutils.rename('a', 'A')
    osutils.rename('b', 'B')
    # we can't use failUnlessExists on case-insensitive filesystem
    # so try to check shape of the tree
    shape = sorted(os.listdir('.'))
    self.assertEqual(['A', 'B'], shape)
185
class TestRandChars(tests.TestCase):
    """Tests for osutils.rand_chars()."""

    def test_01_rand_chars_empty(self):
        # Asking for zero characters yields the empty string.
        result = osutils.rand_chars(0)
        self.assertEqual(result, '')

    def test_02_rand_chars_100(self):
        # The result has exactly the requested length, is a plain str and
        # contains only lowercase ASCII letters and digits.
        result = osutils.rand_chars(100)
        self.assertEqual(len(result), 100)
        self.assertEqual(type(result), str)
        self.assertContainsRe(result, r'^[a-z0-9]{100}$')
198
class TestIsInside(tests.TestCase):
200
def test_is_inside(self):
    # is_inside(dir, fname) must match on whole path components only: a
    # shared string prefix ('src' vs 'srccontrol') is not containment, a
    # path is inside itself, and everything is inside ''.
    is_inside = osutils.is_inside
    self.assertTrue(is_inside('src', 'src/foo.c'))
    self.assertFalse(is_inside('src', 'srccontrol'))
    self.assertTrue(is_inside('src', 'src/a/a/a/foo.c'))
    self.assertTrue(is_inside('foo.c', 'foo.c'))
    self.assertFalse(is_inside('foo.c', ''))
    self.assertTrue(is_inside('', 'foo.c'))
209
def test_is_inside_any(self):
210
SRC_FOO_C = osutils.pathjoin('src', 'foo.c')
211
for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
212
(['src'], SRC_FOO_C),
215
self.assert_(osutils.is_inside_any(dirs, fn))
216
for dirs, fn in [(['src'], 'srccontrol'),
217
(['src'], 'srccontrol/foo')]:
218
self.assertFalse(osutils.is_inside_any(dirs, fn))
220
def test_is_inside_or_parent_of_any(self):
221
for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
222
(['src'], 'src/foo.c'),
223
(['src/bar.c'], 'src'),
224
(['src/bar.c', 'bla/foo.c'], 'src'),
227
self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
229
for dirs, fn in [(['src'], 'srccontrol'),
230
(['srccontrol/foo.c'], 'src'),
231
(['src'], 'srccontrol/foo')]:
232
self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
235
class TestRmTree(tests.TestCaseInTempDir):
237
def test_rmtree(self):
238
# Check to remove tree with read-only files/dirs
240
f = file('dir/file', 'w')
243
# would like to also try making the directory readonly, but at the
244
# moment python shutil.rmtree doesn't handle that properly - it would
245
# need to chmod the directory before removing things inside it - deferred
246
# for now -- mbp 20060505
247
# osutils.make_readonly('dir')
248
osutils.make_readonly('dir/file')
250
osutils.rmtree('dir')
252
self.failIfExists('dir/file')
253
self.failIfExists('dir')
256
class TestDeleteAny(tests.TestCaseInTempDir):
    """Tests for osutils.delete_any()."""

    def test_delete_any_readonly(self):
        # from <https://bugs.launchpad.net/bzr/+bug/218206>
        self.build_tree(['d/', 'f'])
        osutils.make_readonly('d')
        osutils.make_readonly('f')

        # Both the read-only file and the read-only directory must be
        # removable without raising.
        osutils.delete_any('f')
        osutils.delete_any('d')
268
class TestKind(tests.TestCaseInTempDir):
270
def test_file_kind(self):
271
self.build_tree(['file', 'dir/'])
272
self.assertEquals('file', osutils.file_kind('file'))
273
self.assertEquals('directory', osutils.file_kind('dir/'))
274
if osutils.has_symlinks():
275
os.symlink('symlink', 'symlink')
276
self.assertEquals('symlink', osutils.file_kind('symlink'))
278
# TODO: jam 20060529 Test a block device
280
os.lstat('/dev/null')
282
if e.errno not in (errno.ENOENT,):
285
self.assertEquals('chardev', osutils.file_kind('/dev/null'))
287
mkfifo = getattr(os, 'mkfifo', None)
291
self.assertEquals('fifo', osutils.file_kind('fifo'))
295
AF_UNIX = getattr(socket, 'AF_UNIX', None)
297
s = socket.socket(AF_UNIX)
300
self.assertEquals('socket', osutils.file_kind('socket'))
304
def test_kind_marker(self):
    # Each known kind maps to its one-character suffix; an unrecognised
    # kind raises BzrError.
    self.assertEqual("", osutils.kind_marker("file"))
    self.assertEqual("/", osutils.kind_marker('directory'))
    self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
    self.assertEqual("@", osutils.kind_marker("symlink"))
    self.assertEqual("+", osutils.kind_marker("tree-reference"))
    self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
313
class TestUmask(tests.TestCaseInTempDir):
315
def test_get_umask(self):
316
if sys.platform == 'win32':
317
# umask always returns '0', no way to set it
318
self.assertEqual(0, osutils.get_umask())
321
orig_umask = osutils.get_umask()
322
self.addCleanup(os.umask, orig_umask)
324
self.assertEqual(0222, osutils.get_umask())
326
self.assertEqual(0022, osutils.get_umask())
328
self.assertEqual(0002, osutils.get_umask())
330
self.assertEqual(0027, osutils.get_umask())
333
class TestDateTime(tests.TestCase):
    """Tests for the date, delta and timezone-offset helpers in osutils."""

    def assertFormatedDelta(self, expected, seconds):
        """Assert osutils.format_delta formats as expected.

        :param expected: the exact string format_delta should return.
        :param seconds: the delta, in seconds, passed to format_delta.
        """
        actual = osutils.format_delta(seconds)
        self.assertEqual(expected, actual)

    def test_format_delta(self):
        # Values are chosen around the points where the output switches
        # unit (seconds -> minutes at 90s, minutes -> hours at 5400s) and
        # around singular/plural wording.
        self.assertFormatedDelta('0 seconds ago', 0)
        self.assertFormatedDelta('1 second ago', 1)
        self.assertFormatedDelta('10 seconds ago', 10)
        self.assertFormatedDelta('59 seconds ago', 59)
        self.assertFormatedDelta('89 seconds ago', 89)
        self.assertFormatedDelta('1 minute, 30 seconds ago', 90)
        self.assertFormatedDelta('3 minutes, 0 seconds ago', 180)
        self.assertFormatedDelta('3 minutes, 1 second ago', 181)
        self.assertFormatedDelta('10 minutes, 15 seconds ago', 615)
        self.assertFormatedDelta('30 minutes, 59 seconds ago', 1859)
        self.assertFormatedDelta('31 minutes, 0 seconds ago', 1860)
        self.assertFormatedDelta('60 minutes, 0 seconds ago', 3600)
        self.assertFormatedDelta('89 minutes, 59 seconds ago', 5399)
        self.assertFormatedDelta('1 hour, 30 minutes ago', 5400)
        self.assertFormatedDelta('2 hours, 30 minutes ago', 9017)
        self.assertFormatedDelta('10 hours, 0 minutes ago', 36000)
        self.assertFormatedDelta('24 hours, 0 minutes ago', 86400)
        self.assertFormatedDelta('35 hours, 59 minutes ago', 129599)
        self.assertFormatedDelta('36 hours, 0 minutes ago', 129600)
        self.assertFormatedDelta('36 hours, 0 minutes ago', 129601)
        self.assertFormatedDelta('36 hours, 1 minute ago', 129660)
        self.assertFormatedDelta('36 hours, 1 minute ago', 129661)
        self.assertFormatedDelta('84 hours, 10 minutes ago', 303002)

        # We handle when time steps the wrong direction because computers
        # don't have synchronized clocks.
        self.assertFormatedDelta('84 hours, 10 minutes in the future', -303002)
        self.assertFormatedDelta('1 second in the future', -1)
        self.assertFormatedDelta('2 seconds in the future', -2)

    def test_format_date(self):
        self.assertRaises(errors.UnsupportedTimezoneFormat,
                          osutils.format_date, 0, timezone='foo')
        self.assertIsInstance(osutils.format_date(0), str)
        self.assertIsInstance(osutils.format_local_date(0), unicode)
        # Testing for the actual value of the local weekday without
        # duplicating the code from format_date is difficult.
        # Instead blackbox.test_locale should check for localized
        # dates once they do occur in output strings.

    def test_format_date_with_offset_in_original_timezone(self):
        # The optional second argument is the timezone offset in seconds:
        # 7200 shifts the rendered time by two hours and shows as +0200.
        self.assertEqual("Thu 1970-01-01 00:00:00 +0000",
            osutils.format_date_with_offset_in_original_timezone(0))
        self.assertEqual("Fri 1970-01-02 03:46:40 +0000",
            osutils.format_date_with_offset_in_original_timezone(100000))
        self.assertEqual("Fri 1970-01-02 05:46:40 +0200",
            osutils.format_date_with_offset_in_original_timezone(100000, 7200))

    def test_local_time_offset(self):
        """Test that local_time_offset() returns a sane value."""
        offset = osutils.local_time_offset()
        self.assertIsInstance(offset, int)
        # The offset must be strictly less than eighteen hours in either
        # direction.  Time zone handling is system specific, so it is
        # difficult to do more specific tests; this is only a sanity bound.
        eighteen_hours = 18 * 3600
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)

    def test_local_time_offset_with_timestamp(self):
        """Test that local_time_offset() works with a timestamp."""
        offset = osutils.local_time_offset(1000000000.1234567)
        self.assertIsInstance(offset, int)
        eighteen_hours = 18 * 3600
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
409
class TestLinks(tests.TestCaseInTempDir):
411
def test_dereference_path(self):
412
self.requireFeature(tests.SymlinkFeature)
413
cwd = osutils.realpath('.')
415
bar_path = osutils.pathjoin(cwd, 'bar')
416
# Using './' to avoid bug #1213894 (first path component not
417
# dereferenced) in Python 2.4.1 and earlier
418
self.assertEqual(bar_path, osutils.realpath('./bar'))
419
os.symlink('bar', 'foo')
420
self.assertEqual(bar_path, osutils.realpath('./foo'))
422
# Does not dereference terminal symlinks
423
foo_path = osutils.pathjoin(cwd, 'foo')
424
self.assertEqual(foo_path, osutils.dereference_path('./foo'))
426
# Dereferences parent symlinks
428
baz_path = osutils.pathjoin(bar_path, 'baz')
429
self.assertEqual(baz_path, osutils.dereference_path('./foo/baz'))
431
# Dereferences parent symlinks that are the first path element
432
self.assertEqual(baz_path, osutils.dereference_path('foo/baz'))
434
# Dereferences parent symlinks in absolute paths
435
foo_baz_path = osutils.pathjoin(foo_path, 'baz')
436
self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
438
def test_changing_access(self):
439
f = file('file', 'w')
443
# Make a file readonly
444
osutils.make_readonly('file')
445
mode = os.lstat('file').st_mode
446
self.assertEqual(mode, mode & 0777555)
448
# Make a file writable
449
osutils.make_writable('file')
450
mode = os.lstat('file').st_mode
451
self.assertEqual(mode, mode | 0200)
453
if osutils.has_symlinks():
454
# should not error when handed a symlink
455
os.symlink('nonexistent', 'dangling')
456
osutils.make_readonly('dangling')
457
osutils.make_writable('dangling')
459
def test_host_os_dereferences_symlinks(self):
    # Smoke test: only checks that the call completes without raising;
    # the returned value is not inspected.
    osutils.host_os_dereferences_symlinks()
463
class TestCanonicalRelPath(tests.TestCaseInTempDir):
465
_test_needs_features = [tests.CaseInsCasePresFilenameFeature]
467
def test_canonical_relpath_simple(self):
468
f = file('MixedCaseName', 'w')
470
actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
471
self.failUnlessEqual('work/MixedCaseName', actual)
473
def test_canonical_relpath_missing_tail(self):
    # The existing parent is returned with its on-disk case, while the
    # nonexistent final component is passed through as given.
    os.mkdir('MixedCaseParent')
    actual = osutils.canonical_relpath(self.test_base_dir,
                                       'mixedcaseparent/nochild')
    self.assertEqual('work/MixedCaseParent/nochild', actual)
480
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
482
def assertRelpath(self, expected, base, path):
    # Assert that osutils._cicp_canonical_relpath(base, path) returns
    # exactly ``expected``.
    actual = osutils._cicp_canonical_relpath(base, path)
    self.assertEqual(expected, actual)
486
def test_simple(self):
    # A differently-cased spelling of an existing file resolves to the
    # name as it was created on disk.
    self.build_tree(['MixedCaseName'])
    base = osutils.realpath(self.get_transport('.').local_abspath('.'))
    self.assertRelpath('MixedCaseName', base, 'mixedcAsename')
491
def test_subdir_missing_tail(self):
    self.build_tree(['MixedCaseParent/', 'MixedCaseParent/a_child'])
    base = osutils.realpath(self.get_transport('.').local_abspath('.'))
    # Exact spelling of an existing child.
    self.assertRelpath('MixedCaseParent/a_child', base,
                       'MixedCaseParent/a_child')
    # Wrong-case spelling of an existing child is canonicalised.
    self.assertRelpath('MixedCaseParent/a_child', base,
                       'MixedCaseParent/A_Child')
    # A child that does not exist is returned as given.
    self.assertRelpath('MixedCaseParent/not_child', base,
                       'MixedCaseParent/not_child')
501
def test_at_root_slash(self):
    # We can't test this on Windows, because it has a 'MIN_ABS_PATHLENGTH'
    # greater than one character, so '/' is not usable as a base there.
    if osutils.MIN_ABS_PATHLENGTH > 1:
        raise tests.TestSkipped('relpath requires %d chars'
                                % osutils.MIN_ABS_PATHLENGTH)
    self.assertRelpath('foo', '/', '/foo')
509
def test_at_root_drive(self):
510
if sys.platform != 'win32':
511
raise tests.TestNotApplicable('we can only test drive-letter relative'
512
' paths on Windows where we have drive'
515
# The specific issue is that when at the root of a drive, 'abspath'
516
# returns "C:/" or just "/". However, the code assumes that abspath
517
# always returns something like "C:/foo" or "/foo" (no trailing slash).
518
self.assertRelpath('foo', 'C:/', 'C:/foo')
519
self.assertRelpath('foo', 'X:/', 'X:/foo')
520
self.assertRelpath('foo', 'X:/', 'X://foo')
523
class TestPumpFile(tests.TestCase):
524
"""Test pumpfile method."""
527
tests.TestCase.setUp(self)
528
# create a test datablock
529
self.block_size = 512
530
pattern = '0123456789ABCDEF'
531
self.test_data = pattern * (3 * self.block_size / len(pattern))
532
self.test_data_len = len(self.test_data)
534
def test_bracket_block_size(self):
535
"""Read data in blocks with the requested read size bracketing the
537
# make sure test data is larger than max read size
538
self.assertTrue(self.test_data_len > self.block_size)
540
from_file = file_utils.FakeReadFile(self.test_data)
543
# read (max / 2) bytes and verify read size wasn't affected
544
num_bytes_to_read = self.block_size / 2
545
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
546
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
547
self.assertEqual(from_file.get_read_count(), 1)
549
# read (max) bytes and verify read size wasn't affected
550
num_bytes_to_read = self.block_size
551
from_file.reset_read_count()
552
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
553
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
554
self.assertEqual(from_file.get_read_count(), 1)
556
# read (max + 1) bytes and verify read size was limited
557
num_bytes_to_read = self.block_size + 1
558
from_file.reset_read_count()
559
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
560
self.assertEqual(from_file.get_max_read_size(), self.block_size)
561
self.assertEqual(from_file.get_read_count(), 2)
563
# finish reading the rest of the data
564
num_bytes_to_read = self.test_data_len - to_file.tell()
565
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
567
# report error if the data wasn't equal (we only report the size due
568
# to the length of the data)
569
response_data = to_file.getvalue()
570
if response_data != self.test_data:
571
message = "Data not equal. Expected %d bytes, received %d."
572
self.fail(message % (len(response_data), self.test_data_len))
574
def test_specified_size(self):
575
"""Request a transfer larger than the maximum block size and verify
576
that the maximum read doesn't exceed the block_size."""
577
# make sure test data is larger than max read size
578
self.assertTrue(self.test_data_len > self.block_size)
580
# retrieve data in blocks
581
from_file = file_utils.FakeReadFile(self.test_data)
583
osutils.pumpfile(from_file, to_file, self.test_data_len,
586
# verify read size was equal to the maximum read size
587
self.assertTrue(from_file.get_max_read_size() > 0)
588
self.assertEqual(from_file.get_max_read_size(), self.block_size)
589
self.assertEqual(from_file.get_read_count(), 3)
591
# report error if the data wasn't equal (we only report the size due
592
# to the length of the data)
593
response_data = to_file.getvalue()
594
if response_data != self.test_data:
595
message = "Data not equal. Expected %d bytes, received %d."
596
self.fail(message % (len(response_data), self.test_data_len))
598
def test_to_eof(self):
599
"""Read to end-of-file and verify that the reads are not larger than
600
the maximum read size."""
601
# make sure test data is larger than max read size
602
self.assertTrue(self.test_data_len > self.block_size)
604
# retrieve data to EOF
605
from_file = file_utils.FakeReadFile(self.test_data)
607
osutils.pumpfile(from_file, to_file, -1, self.block_size)
609
# verify read size was equal to the maximum read size
610
self.assertEqual(from_file.get_max_read_size(), self.block_size)
611
self.assertEqual(from_file.get_read_count(), 4)
613
# report error if the data wasn't equal (we only report the size due
614
# to the length of the data)
615
response_data = to_file.getvalue()
616
if response_data != self.test_data:
617
message = "Data not equal. Expected %d bytes, received %d."
618
self.fail(message % (len(response_data), self.test_data_len))
620
def test_defaults(self):
621
"""Verifies that the default arguments will read to EOF -- this
622
test verifies that any existing usages of pumpfile will not be broken
623
with this new version."""
624
# retrieve data using default (old) pumpfile method
625
from_file = file_utils.FakeReadFile(self.test_data)
627
osutils.pumpfile(from_file, to_file)
629
# report error if the data wasn't equal (we only report the size due
630
# to the length of the data)
631
response_data = to_file.getvalue()
632
if response_data != self.test_data:
633
message = "Data not equal. Expected %d bytes, received %d."
634
self.fail(message % (len(response_data), self.test_data_len))
636
def test_report_activity(self):
638
def log_activity(length, direction):
639
activity.append((length, direction))
640
from_file = StringIO(self.test_data)
642
osutils.pumpfile(from_file, to_file, buff_size=500,
643
report_activity=log_activity, direction='read')
644
self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
645
(36, 'read')], activity)
647
from_file = StringIO(self.test_data)
650
osutils.pumpfile(from_file, to_file, buff_size=500,
651
report_activity=log_activity, direction='write')
652
self.assertEqual([(500, 'write'), (500, 'write'), (500, 'write'),
653
(36, 'write')], activity)
655
# And with a limited amount of data
656
from_file = StringIO(self.test_data)
659
osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
660
report_activity=log_activity, direction='read')
661
self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
665
class TestPumpStringFile(tests.TestCase):
667
def test_empty(self):
669
osutils.pump_string_file("", output)
670
self.assertEqual("", output.getvalue())
672
def test_more_than_segment_size(self):
674
osutils.pump_string_file("123456789", output, 2)
675
self.assertEqual("123456789", output.getvalue())
677
def test_segment_size(self):
679
osutils.pump_string_file("12", output, 2)
680
self.assertEqual("12", output.getvalue())
682
def test_segment_size_multiple(self):
684
osutils.pump_string_file("1234", output, 2)
685
self.assertEqual("1234", output.getvalue())
688
class TestRelpath(tests.TestCase):
    """Tests for osutils.relpath()."""

    def test_simple_relpath(self):
        # A direct child of the base is returned as its bare name.
        cwd = osutils.getcwd()
        subdir = cwd + '/subdir'
        self.assertEqual('subdir', osutils.relpath(cwd, subdir))

    def test_deep_relpath(self):
        # A nested descendant keeps its intermediate components.
        cwd = osutils.getcwd()
        subdir = cwd + '/sub/subsubdir'
        self.assertEqual('sub/subsubdir', osutils.relpath(cwd, subdir))

    def test_not_relative(self):
        # Paths that are not below the base raise PathNotChild.
        self.assertRaises(errors.PathNotChild,
                          osutils.relpath, 'C:/path', 'H:/path')
        self.assertRaises(errors.PathNotChild,
                          osutils.relpath, 'C:/', 'H:/path')
707
class TestSafeUnicode(tests.TestCase):
709
def test_from_ascii_string(self):
    # A plain ASCII byte string comes back as the equal unicode string.
    self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
712
def test_from_unicode_string_ascii_contents(self):
    # Unicode input with ASCII-only content is returned unchanged.
    self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
715
def test_from_unicode_string_unicode_contents(self):
    # Unicode input with non-ASCII content is returned unchanged.
    self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
718
def test_from_utf8_string(self):
    # A UTF-8 encoded byte string is decoded to unicode.
    self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
721
def test_bad_utf8_string(self):
722
self.assertRaises(errors.BzrBadParameterNotUnicode,
723
osutils.safe_unicode,
727
class TestSafeUtf8(tests.TestCase):
729
def test_from_ascii_string(self):
731
self.assertEqual('foobar', osutils.safe_utf8(f))
733
def test_from_unicode_string_ascii_contents(self):
    # Unicode input with ASCII-only content encodes to the same bytes.
    self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
736
def test_from_unicode_string_unicode_contents(self):
    # Non-ASCII unicode input is encoded as UTF-8 bytes.
    self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
739
def test_from_utf8_string(self):
    # A byte string that is already valid UTF-8 is returned as-is.
    self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
742
def test_bad_utf8_string(self):
    # Bytes that are not valid UTF-8 raise BzrBadParameterNotUnicode.
    self.assertRaises(errors.BzrBadParameterNotUnicode,
                      osutils.safe_utf8, '\xbb\xbb')
747
class TestSafeRevisionId(tests.TestCase):
749
def test_from_ascii_string(self):
    # this shouldn't give a warning because it's getting an ascii string
    self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
753
def test_from_unicode_string_ascii_contents(self):
    # Called with warn=False, ASCII-only unicode input is converted to
    # the equal byte string.
    self.assertEqual('bargam',
                     osutils.safe_revision_id(u'bargam', warn=False))
757
def test_from_unicode_deprecated(self):
    # Without warn=False, passing unicode must emit the revision-id
    # deprecation warning while still returning the converted value.
    self.assertEqual('bargam',
        self.callDeprecated([osutils._revision_id_warning],
                            osutils.safe_revision_id, u'bargam'))
762
def test_from_unicode_string_unicode_contents(self):
    # Non-ASCII unicode input is encoded as UTF-8 bytes.
    self.assertEqual('bargam\xc2\xae',
                     osutils.safe_revision_id(u'bargam\xae', warn=False))
766
def test_from_utf8_string(self):
    # A UTF-8 byte string is returned as-is.
    self.assertEqual('foo\xc2\xae',
                     osutils.safe_revision_id('foo\xc2\xae'))
771
"""Currently, None is a valid revision_id"""
772
self.assertEqual(None, osutils.safe_revision_id(None))
775
class TestSafeFileId(tests.TestCase):
777
def test_from_ascii_string(self):
    # An ASCII byte string is returned as-is.
    self.assertEqual('foobar', osutils.safe_file_id('foobar'))
780
def test_from_unicode_string_ascii_contents(self):
    # Called with warn=False, ASCII-only unicode input is converted to
    # the equal byte string.
    self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))
783
def test_from_unicode_deprecated(self):
    # Without warn=False, passing unicode must emit the file-id
    # deprecation warning while still returning the converted value.
    self.assertEqual('bargam',
        self.callDeprecated([osutils._file_id_warning],
                            osutils.safe_file_id, u'bargam'))
788
def test_from_unicode_string_unicode_contents(self):
    # Non-ASCII unicode input is encoded as UTF-8 bytes.
    self.assertEqual('bargam\xc2\xae',
                     osutils.safe_file_id(u'bargam\xae', warn=False))
792
def test_from_utf8_string(self):
    # A UTF-8 byte string is returned as-is.
    self.assertEqual('foo\xc2\xae',
                     osutils.safe_file_id('foo\xc2\xae'))
797
"""Currently, None is a valid revision_id"""
798
self.assertEqual(None, osutils.safe_file_id(None))
801
class TestWin32Funcs(tests.TestCase):
802
"""Test that _win32 versions of os utilities return appropriate paths."""
804
def test_abspath(self):
    # Backslashes become forward slashes, for drive paths and for
    # //HOST/path style paths alike.
    self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
    self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
    self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
    self.assertEqual('//HOST/path', osutils._win32_abspath('//HOST/path'))
810
def test_realpath(self):
    # Both separator styles give the same forward-slash result.
    self.assertEqual('C:/foo', osutils._win32_realpath('C:\\foo'))
    self.assertEqual('C:/foo', osutils._win32_realpath('C:/foo'))
814
def test_pathjoin(self):
    # Relative components are joined with '/'.
    self.assertEqual('path/to/foo',
                     osutils._win32_pathjoin('path', 'to', 'foo'))
    # A later component carrying a drive letter replaces what came before.
    self.assertEqual('C:/foo',
                     osutils._win32_pathjoin('path\\to', 'C:\\foo'))
    self.assertEqual('C:/foo',
                     osutils._win32_pathjoin('path/to', 'C:/foo'))
    # A trailing separator on the left side is not doubled.
    self.assertEqual('path/to/foo',
                     osutils._win32_pathjoin('path/to/', 'foo'))
    # A rooted later component replaces what came before, drive included.
    self.assertEqual('/foo',
                     osutils._win32_pathjoin('C:/path/to/', '/foo'))
    self.assertEqual('/foo',
                     osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
828
def test_normpath(self):
    # Doubled separators, '.' and '..' are collapsed, with either
    # separator style on input.
    self.assertEqual('path/to/foo',
                     osutils._win32_normpath(r'path\\from\..\to\.\foo'))
    self.assertEqual('path/to/foo',
                     osutils._win32_normpath('path//from/../to/./foo'))
834
def test_getcwd(self):
    cwd = osutils._win32_getcwd()
    os_cwd = os.getcwdu()
    # Everything after the first character must match once separators
    # are normalised.
    self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
    # win32 is inconsistent whether it returns lower or upper case
    # and even if it was consistent the user might type the other
    # so we force it to uppercase
    # running python.exe under cmd.exe return capital C:\\
    # running win32 python inside a cygwin shell returns lowercase
    self.assertEqual(os_cwd[0].upper(), cwd[0])
845
def test_fixdrive(self):
    # Only the drive letter is uppercased; separators are left as given.
    self.assertEqual('H:/foo', osutils._win32_fixdrive('h:/foo'))
    self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
    self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
850
def test_win98_abspath(self):
852
self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
853
self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
855
self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
856
self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
858
cwd = osutils.getcwd().rstrip('/')
859
drive = osutils._nt_splitdrive(cwd)[0]
860
self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
861
self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
864
self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
867
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
868
"""Test win32 functions that create files."""
870
def test_getcwd(self):
871
self.requireFeature(tests.UnicodeFilenameFeature)
874
# TODO: jam 20060427 This will probably fail on Mac OSX because
875
# it will change the normalization of B\xe5gfors
876
# Consider using a different unicode character, or make
877
# osutils.getcwd() renormalize the path.
878
self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
880
def test_minimum_path_selection(self):
    # Paths contained in another selected path are dropped; names that
    # merely share a string prefix ('a-b', 'a0b' versus 'a') are kept.
    self.assertEqual(set(),
        osutils.minimum_path_selection([]))
    self.assertEqual(set(['a']),
        osutils.minimum_path_selection(['a']))
    self.assertEqual(set(['a', 'b']),
        osutils.minimum_path_selection(['a', 'b']))
    self.assertEqual(set(['a/', 'b']),
        osutils.minimum_path_selection(['a/', 'b']))
    self.assertEqual(set(['a/', 'b']),
        osutils.minimum_path_selection(['a/c', 'a/', 'b']))
    self.assertEqual(set(['a-b', 'a', 'a0b']),
        osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
894
def test_mkdtemp(self):
    # The returned path must not contain backslashes.
    tmpdir = osutils._win32_mkdtemp(dir='.')
    self.assertFalse('\\' in tmpdir)
898
def test_rename(self):
906
osutils._win32_rename('b', 'a')
907
self.failUnlessExists('a')
908
self.failIfExists('b')
909
self.assertFileEqual('baz\n', 'a')
911
def test_rename_missing_file(self):
917
osutils._win32_rename('b', 'a')
918
except (IOError, OSError), e:
919
self.assertEqual(errno.ENOENT, e.errno)
920
self.assertFileEqual('foo\n', 'a')
922
def test_rename_missing_dir(self):
925
osutils._win32_rename('b', 'a')
926
except (IOError, OSError), e:
927
self.assertEqual(errno.ENOENT, e.errno)
929
def test_rename_current_dir(self):
932
# You can't rename the working directory
933
# doing rename non-existant . usually
934
# just raises ENOENT, since non-existant
937
osutils._win32_rename('b', '.')
938
except (IOError, OSError), e:
939
self.assertEqual(errno.ENOENT, e.errno)
941
def test_splitpath(self):
942
def check(expected, path):
943
self.assertEqual(expected, osutils.splitpath(path))
946
check(['a', 'b'], 'a/b')
947
check(['a', 'b'], 'a/./b')
948
check(['a', '.b'], 'a/.b')
949
check(['a', '.b'], 'a\\.b')
951
self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
954
class TestParentDirectories(tests.TestCaseInTempDir):
    """Test osutils.parent_directories()"""

    def test_parent_directories(self):
        # Parents are listed nearest first; a single-component path has
        # none.
        self.assertEqual([], osutils.parent_directories('a'))
        self.assertEqual(['a'], osutils.parent_directories('a/b'))
        self.assertEqual(['a/b', 'a'], osutils.parent_directories('a/b/c'))
963
class TestMacFuncsDirs(tests.TestCaseInTempDir):
    """Test mac special functions that require directories."""

    def test_getcwd(self):
        self.requireFeature(tests.UnicodeFilenameFeature)
        # Directory name uses the precomposed character U+00E5.
        os.mkdir(u'B\xe5gfors')
        os.chdir(u'B\xe5gfors')
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')

    def test_getcwd_nonnorm(self):
        self.requireFeature(tests.UnicodeFilenameFeature)
        # Test that _mac_getcwd() will normalize this path: the directory
        # is created with 'a' + combining ring (U+030A) and must be
        # reported with the precomposed U+00E5.
        os.mkdir(u'Ba\u030agfors')
        os.chdir(u'Ba\u030agfors')
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
980
class TestChunksToLines(tests.TestCase):
    """Tests for osutils.chunks_to_lines()."""

    def test_smoketest(self):
        expected = ['foo\n', 'bar\n', 'baz\n']
        # Chunks cut in the middle of lines are regrouped on newlines.
        self.assertEqual(expected,
                         osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
        # Chunks that already are whole lines compare equal to the input.
        self.assertEqual(expected,
                         osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))

    def test_osutils_binding(self):
        """osutils.chunks_to_lines is the compiled version when available."""
        from bzrlib.tests import test__chunks_to_lines
        if test__chunks_to_lines.CompiledChunksToLinesFeature.available():
            from bzrlib._chunks_to_lines_pyx import chunks_to_lines
        else:
            # Only fall back to the pure-Python implementation when the
            # compiled extension is missing; importing it unconditionally
            # would shadow the compiled name checked below.
            from bzrlib._chunks_to_lines_py import chunks_to_lines
        self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
997
class TestSplitLines(tests.TestCase):
    """Tests for osutils.split_lines()."""

    def test_split_unicode(self):
        # Last line without, then with, a trailing newline.
        without_eol = osutils.split_lines(u'foo\nbar\xae')
        self.assertEqual([u'foo\n', u'bar\xae'], without_eol)
        with_eol = osutils.split_lines(u'foo\nbar\xae\n')
        self.assertEqual([u'foo\n', u'bar\xae\n'], with_eol)

    def test_split_with_carriage_returns(self):
        # A lone '\r' must not be treated as a line break.
        text = 'foo\rbar\n'
        self.assertEqual([text], osutils.split_lines(text))
1010
class TestWalkDirs(tests.TestCaseInTempDir):
1012
def assertExpectedBlocks(self, expected, result):
    """Compare `result` with `expected` using only the first three fields
    of every directory entry.
    """
    trimmed = []
    for dirinfo, block in result:
        trimmed.append((dirinfo, [line[0:3] for line in block]))
    self.assertEqual(expected, trimmed)
1017
def test_walkdirs(self):
1026
self.build_tree(tree)
1027
expected_dirblocks = [
1029
[('0file', '0file', 'file'),
1030
('1dir', '1dir', 'directory'),
1031
('2file', '2file', 'file'),
1034
(('1dir', './1dir'),
1035
[('1dir/0file', '0file', 'file'),
1036
('1dir/1dir', '1dir', 'directory'),
1039
(('1dir/1dir', './1dir/1dir'),
1045
found_bzrdir = False
1046
for dirdetail, dirblock in osutils.walkdirs('.'):
1047
if len(dirblock) and dirblock[0][1] == '.bzr':
1048
# this tests the filtering of selected paths
1051
result.append((dirdetail, dirblock))
1053
self.assertTrue(found_bzrdir)
1054
self.assertExpectedBlocks(expected_dirblocks, result)
1055
# you can search a subdir only, with a supplied prefix.
1057
for dirblock in osutils.walkdirs('./1dir', '1dir'):
1058
result.append(dirblock)
1059
self.assertExpectedBlocks(expected_dirblocks[1:], result)
1061
def test_walkdirs_os_error(self):
    """An unreadable directory makes _walkdirs_utf8 raise a useful OSError.

    See <https://bugs.edge.launchpad.net/bzr/+bug/338653>: the Pyrex
    readdir didn't raise useful messages if it had an error reading the
    directory.
    """
    import stat
    if sys.platform == 'win32':
        raise tests.TestNotApplicable(
            "readdir IOError not tested on win32")
    # Permission bits do not stop the superuser from listing a directory,
    # so the EACCES expected below can never happen when running as root.
    geteuid = getattr(os, 'geteuid', None)
    if geteuid is not None and geteuid() == 0:
        raise tests.TestNotApplicable(
            "unreadable directories cannot be simulated as root")
    os.mkdir("test-unreadable")
    os.chmod("test-unreadable", 0)
    # must chmod it back so that it can be removed
    self.addCleanup(os.chmod, "test-unreadable", stat.S_IRWXU)
    # The error is not raised until the generator is actually evaluated,
    # hence the list() call.  (It would be ok if it happened earlier.)
    e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
    self.assertEqual('./test-unreadable', e.filename)
    self.assertEqual(errno.EACCES, e.errno)
    # Ensure the message contains the file name
    self.assertContainsRe(str(e), r"\./test-unreadable")
1081
def test__walkdirs_utf8(self):
# Walks '.' with osutils._walkdirs_utf8 and compares the first three
# fields of every entry (see assertExpectedBlocks) with expected_dirblocks.
1090
self.build_tree(tree)
1091
expected_dirblocks = [
1093
[('0file', '0file', 'file'),
1094
('1dir', '1dir', 'directory'),
1095
('2file', '2file', 'file'),
1098
(('1dir', './1dir'),
1099
[('1dir/0file', '0file', 'file'),
1100
('1dir/1dir', '1dir', 'directory'),
1103
(('1dir/1dir', './1dir/1dir'),
1109
found_bzrdir = False
1110
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1111
if len(dirblock) and dirblock[0][1] == '.bzr':
1112
# this tests the filtering of selected paths
1115
result.append((dirdetail, dirblock))
1117
self.assertTrue(found_bzrdir)
1118
self.assertExpectedBlocks(expected_dirblocks, result)
1120
# you can search a subdir only, with a supplied prefix.
1122
# NOTE(review): this second pass calls osutils.walkdirs, not
# osutils._walkdirs_utf8, although the test is named for the latter --
# looks like a copy/paste from test_walkdirs; confirm and switch.
for dirblock in osutils.walkdirs('./1dir', '1dir'):
1123
result.append(dirblock)
1124
self.assertExpectedBlocks(expected_dirblocks[1:], result)
1126
def _filter_out_stat(self, result):
1127
"""Filter out the stat value from the walkdirs result"""
1128
for dirdetail, dirblock in result:
1130
for info in dirblock:
1131
# Ignore info[3] which is the stat
1132
new_dirblock.append((info[0], info[1], info[2], info[4]))
1133
dirblock[:] = new_dirblock
1135
def _save_platform_info(self):
    """Snapshot the platform-detection globals and restore them on cleanup.

    Saves win32utils.winver, osutils._fs_enc and
    osutils._selected_dir_reader, so the calling test may overwrite them.
    """
    cur_winver = win32utils.winver
    cur_fs_enc = osutils._fs_enc
    cur_dir_reader = osutils._selected_dir_reader
    # The restoring must be deferred to cleanup time, hence the closure.
    def restore():
        win32utils.winver = cur_winver
        osutils._fs_enc = cur_fs_enc
        osutils._selected_dir_reader = cur_dir_reader
    self.addCleanup(restore)
1145
def assertDirReaderIs(self, expected):
    """Assert the right implementation for _walkdirs_utf8 is chosen."""
    # Dropping the cached reader forces the selection logic to run again.
    osutils._selected_dir_reader = None
    # There is nothing to list, yet walking is enough to trigger the
    # selection.
    walked = list(osutils._walkdirs_utf8('.'))
    self.assertEqual([(('', '.'), [])], walked)
    self.assertIsInstance(osutils._selected_dir_reader, expected)
1153
def test_force_walkdirs_utf8_fs_utf8(self):
    """With _fs_enc 'UTF-8' the UTF8 dir reader is selected."""
    self.requireFeature(UTF8DirReaderFeature)
    self._save_platform_info()
    osutils._fs_enc = 'UTF-8'
    win32utils.winver = None # Avoid the win32 detection code
    expected_reader = UTF8DirReaderFeature.reader
    self.assertDirReaderIs(expected_reader)
1160
def test_force_walkdirs_utf8_fs_ascii(self):
    """With _fs_enc 'US-ASCII' the UTF8 dir reader is selected."""
    self.requireFeature(UTF8DirReaderFeature)
    self._save_platform_info()
    osutils._fs_enc = 'US-ASCII'
    win32utils.winver = None # Avoid the win32 detection code
    expected_reader = UTF8DirReaderFeature.reader
    self.assertDirReaderIs(expected_reader)
1167
def test_force_walkdirs_utf8_fs_ANSI(self):
    """With _fs_enc 'ANSI_X3.4-1968' the UTF8 dir reader is selected."""
    self.requireFeature(UTF8DirReaderFeature)
    self._save_platform_info()
    osutils._fs_enc = 'ANSI_X3.4-1968'
    win32utils.winver = None # Avoid the win32 detection code
    expected_reader = UTF8DirReaderFeature.reader
    self.assertDirReaderIs(expected_reader)
1174
def test_force_walkdirs_utf8_fs_latin1(self):
    """With _fs_enc 'latin1' the UnicodeDirReader fallback is selected."""
    self._save_platform_info()
    osutils._fs_enc = 'latin1'
    win32utils.winver = None # Avoid the win32 detection code
    expected_reader = osutils.UnicodeDirReader
    self.assertDirReaderIs(expected_reader)
1180
def test_force_walkdirs_utf8_nt(self):
    """With winver 'Windows NT' the Win32ReadDir reader is selected."""
    # NOTE(review): the historical comment here said "Disabled because the
    # thunk of the whole walkdirs api is disabled.", yet nothing in this
    # method skips the test -- confirm whether that remark still applies.
    self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
    self._save_platform_info()
    win32utils.winver = 'Windows NT'
    from bzrlib import _walkdirs_win32
    self.assertDirReaderIs(_walkdirs_win32.Win32ReadDir)
1188
def test_force_walkdirs_utf8_98(self):
    """With winver 'Windows 98' the UnicodeDirReader fallback is selected."""
    self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
    self._save_platform_info()
    win32utils.winver = 'Windows 98'
    expected_reader = osutils.UnicodeDirReader
    self.assertDirReaderIs(expected_reader)
1194
def test_unicode_walkdirs(self):
1195
"""Walkdirs should always return unicode paths."""
1196
self.requireFeature(tests.UnicodeFilenameFeature)
1197
name0 = u'0file-\xb6'
1198
name1 = u'1dir-\u062c\u0648'
1199
name2 = u'2file-\u0633'
1203
name1 + '/' + name0,
1204
name1 + '/' + name1 + '/',
1207
self.build_tree(tree)
1208
expected_dirblocks = [
1210
[(name0, name0, 'file', './' + name0),
1211
(name1, name1, 'directory', './' + name1),
1212
(name2, name2, 'file', './' + name2),
1215
((name1, './' + name1),
1216
[(name1 + '/' + name0, name0, 'file', './' + name1
1218
(name1 + '/' + name1, name1, 'directory', './' + name1
1222
((name1 + '/' + name1, './' + name1 + '/' + name1),
1227
result = list(osutils.walkdirs('.'))
1228
self._filter_out_stat(result)
1229
self.assertEqual(expected_dirblocks, result)
1230
result = list(osutils.walkdirs(u'./'+name1, name1))
1231
self._filter_out_stat(result)
1232
self.assertEqual(expected_dirblocks[1:], result)
1234
def test_unicode__walkdirs_utf8(self):
1235
"""Walkdirs_utf8 should always return utf8 paths.
1237
The abspath portion might be in unicode or utf-8
1239
self.requireFeature(tests.UnicodeFilenameFeature)
1240
name0 = u'0file-\xb6'
1241
name1 = u'1dir-\u062c\u0648'
1242
name2 = u'2file-\u0633'
1246
name1 + '/' + name0,
1247
name1 + '/' + name1 + '/',
1250
self.build_tree(tree)
1251
name0 = name0.encode('utf8')
1252
name1 = name1.encode('utf8')
1253
name2 = name2.encode('utf8')
1255
expected_dirblocks = [
1257
[(name0, name0, 'file', './' + name0),
1258
(name1, name1, 'directory', './' + name1),
1259
(name2, name2, 'file', './' + name2),
1262
((name1, './' + name1),
1263
[(name1 + '/' + name0, name0, 'file', './' + name1
1265
(name1 + '/' + name1, name1, 'directory', './' + name1
1269
((name1 + '/' + name1, './' + name1 + '/' + name1),
1275
# For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1276
# all abspaths are Unicode, and encode them back into utf8.
1277
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1278
self.assertIsInstance(dirdetail[0], str)
1279
if isinstance(dirdetail[1], unicode):
1280
dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1281
dirblock = [list(info) for info in dirblock]
1282
for info in dirblock:
1283
self.assertIsInstance(info[4], unicode)
1284
info[4] = info[4].encode('utf8')
1286
for info in dirblock:
1287
self.assertIsInstance(info[0], str)
1288
self.assertIsInstance(info[1], str)
1289
self.assertIsInstance(info[4], str)
1290
# Remove the stat information
1291
new_dirblock.append((info[0], info[1], info[2], info[4]))
1292
result.append((dirdetail, new_dirblock))
1293
self.assertEqual(expected_dirblocks, result)
1295
def test__walkdirs_utf8_with_unicode_fs(self):
1296
"""UnicodeDirReader should be a safe fallback everywhere
1298
The abspath portion should be in unicode
1300
self.requireFeature(tests.UnicodeFilenameFeature)
1301
# Use the unicode reader. TODO: split into driver-and-driven unit
1303
self._save_platform_info()
1304
osutils._selected_dir_reader = osutils.UnicodeDirReader()
1305
name0u = u'0file-\xb6'
1306
name1u = u'1dir-\u062c\u0648'
1307
name2u = u'2file-\u0633'
1311
name1u + '/' + name0u,
1312
name1u + '/' + name1u + '/',
1315
self.build_tree(tree)
1316
name0 = name0u.encode('utf8')
1317
name1 = name1u.encode('utf8')
1318
name2 = name2u.encode('utf8')
1320
# All of the abspaths should be in unicode, all of the relative paths
1322
expected_dirblocks = [
1324
[(name0, name0, 'file', './' + name0u),
1325
(name1, name1, 'directory', './' + name1u),
1326
(name2, name2, 'file', './' + name2u),
1329
((name1, './' + name1u),
1330
[(name1 + '/' + name0, name0, 'file', './' + name1u
1332
(name1 + '/' + name1, name1, 'directory', './' + name1u
1336
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1341
result = list(osutils._walkdirs_utf8('.'))
1342
self._filter_out_stat(result)
1343
self.assertEqual(expected_dirblocks, result)
1345
def test__walkdirs_utf8_win32readdir(self):
1346
self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1347
self.requireFeature(tests.UnicodeFilenameFeature)
1348
from bzrlib._walkdirs_win32 import Win32ReadDir
1349
self._save_platform_info()
1350
osutils._selected_dir_reader = Win32ReadDir()
1351
name0u = u'0file-\xb6'
1352
name1u = u'1dir-\u062c\u0648'
1353
name2u = u'2file-\u0633'
1357
name1u + '/' + name0u,
1358
name1u + '/' + name1u + '/',
1361
self.build_tree(tree)
1362
name0 = name0u.encode('utf8')
1363
name1 = name1u.encode('utf8')
1364
name2 = name2u.encode('utf8')
1366
# All of the abspaths should be in unicode, all of the relative paths
1368
expected_dirblocks = [
1370
[(name0, name0, 'file', './' + name0u),
1371
(name1, name1, 'directory', './' + name1u),
1372
(name2, name2, 'file', './' + name2u),
1375
((name1, './' + name1u),
1376
[(name1 + '/' + name0, name0, 'file', './' + name1u
1378
(name1 + '/' + name1, name1, 'directory', './' + name1u
1382
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1387
result = list(osutils._walkdirs_utf8(u'.'))
1388
self._filter_out_stat(result)
1389
self.assertEqual(expected_dirblocks, result)
1391
def assertStatIsCorrect(self, path, win32stat):
    """Check `win32stat` field by field against os.stat(path)."""
    os_stat = os.stat(path)
    self.assertEqual(os_stat.st_size, win32stat.st_size)
    # Timestamps only have to agree to 4 decimal places.
    for field in ('st_mtime', 'st_ctime', 'st_atime'):
        self.assertAlmostEqual(getattr(os_stat, field),
                               getattr(win32stat, field), places=4)
    # Everything else has to match exactly.
    for field in ('st_dev', 'st_ino', 'st_mode'):
        self.assertEqual(getattr(os_stat, field),
                         getattr(win32stat, field))
1401
def test__walkdirs_utf_win32_find_file_stat_file(self):
1402
"""make sure our Stat values are valid"""
1403
self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1404
self.requireFeature(tests.UnicodeFilenameFeature)
1405
from bzrlib._walkdirs_win32 import Win32ReadDir
1406
name0u = u'0file-\xb6'
1407
name0 = name0u.encode('utf8')
1408
self.build_tree([name0u])
1409
# I hate to sleep() here, but I'm trying to make the ctime different
1412
f = open(name0u, 'ab')
1414
f.write('just a small update')
1418
result = Win32ReadDir().read_dir('', u'.')
1420
self.assertEqual((name0, name0, 'file'), entry[:3])
1421
self.assertEqual(u'./' + name0u, entry[4])
1422
self.assertStatIsCorrect(entry[4], entry[3])
1423
self.assertNotEqual(entry[3].st_mtime, entry[3].st_ctime)
1425
def test__walkdirs_utf_win32_find_file_stat_directory(self):
1426
"""make sure our Stat values are valid"""
1427
self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1428
self.requireFeature(tests.UnicodeFilenameFeature)
1429
from bzrlib._walkdirs_win32 import Win32ReadDir
1430
name0u = u'0dir-\u062c\u0648'
1431
name0 = name0u.encode('utf8')
1432
self.build_tree([name0u + '/'])
1434
result = Win32ReadDir().read_dir('', u'.')
1436
self.assertEqual((name0, name0, 'directory'), entry[:3])
1437
self.assertEqual(u'./' + name0u, entry[4])
1438
self.assertStatIsCorrect(entry[4], entry[3])
1440
def assertPathCompare(self, path_less, path_greater):
    """check that path_less and path_greater compare correctly."""
    compare = osutils.compare_paths_prefix_order
    # Each path compares equal to itself ...
    self.assertEqual(0, compare(path_less, path_less))
    self.assertEqual(0, compare(path_greater, path_greater))
    # ... and the two orderings mirror each other.
    self.assertEqual(-1, compare(path_less, path_greater))
    self.assertEqual(1, compare(path_greater, path_less))
1451
def test_compare_paths_prefix_order(self):
    """Run a table of (lesser, greater) pairs through assertPathCompare."""
    ordered_pairs = [
        # root before all else
        ("/", "/a"),
        # alpha within a dir
        ("/a", "/b"),
        ("/b", "/z"),
        # high dirs before lower.
        ("/z", "/a/a"),
        # except if the deeper dir should be output first
        ("/a/b/c", "/d/g"),
        # lexical between dirs of the same height
        ("/a/z", "/z/z"),
        ("/a/c/z", "/a/d/e"),

        # this should also be consistent for no leading / paths
        # root before all else
        ("", "a"),
        # alpha within a dir
        ("a", "b"),
        ("b", "z"),
        # high dirs before lower.
        ("z", "a/a"),
        # except if the deeper dir should be output first
        ("a/b/c", "d/g"),
        # lexical between dirs of the same height
        ("a/z", "z/z"),
        ("a/c/z", "a/d/e"),
        ]
    for path_less, path_greater in ordered_pairs:
        self.assertPathCompare(path_less, path_greater)
1479
def test_path_prefix_sorting(self):
1480
"""Doing a sort on path prefix should match our sample data."""
1495
dir_sorted_paths = [
1511
sorted(original_paths, key=osutils.path_prefix_key))
1512
# using the comparison routine shoudl work too:
1515
sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
1518
class TestCopyTree(tests.TestCaseInTempDir):
1520
def test_copy_basic_tree(self):
    """copy_tree() reproduces files and a nested directory."""
    self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
    osutils.copy_tree('source', 'target')
    top_level = sorted(os.listdir('target'))
    self.assertEqual(['a', 'b'], top_level)
    nested = os.listdir('target/b')
    self.assertEqual(['c'], nested)
1526
def test_copy_tree_target_exists(self):
1527
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
1529
osutils.copy_tree('source', 'target')
1530
self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
1531
self.assertEqual(['c'], os.listdir('target/b'))
1533
def test_copy_tree_symlinks(self):
    """copy_tree() copies a symlink as a symlink with the same target."""
    self.requireFeature(tests.SymlinkFeature)
    link_target = 'a/generic/path'
    self.build_tree(['source/'])
    os.symlink(link_target, 'source/lnk')
    osutils.copy_tree('source', 'target')
    copied = os.listdir('target')
    self.assertEqual(['lnk'], copied)
    self.assertEqual(link_target, os.readlink('target/lnk'))
1541
def test_copy_tree_handlers(self):
    """Handlers passed to copy_tree() are called instead of copying.

    Each handler only records its arguments, which lets the test check
    both the call order and that nothing was written to 'target'.
    """
    processed_files = []
    processed_links = []
    def file_handler(from_path, to_path):
        processed_files.append(('f', from_path, to_path))
    def dir_handler(from_path, to_path):
        processed_files.append(('d', from_path, to_path))
    def link_handler(from_path, to_path):
        processed_links.append((from_path, to_path))
    handlers = {'file': file_handler,
                'directory': dir_handler,
                'symlink': link_handler,
               }

    self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
    if osutils.has_symlinks():
        os.symlink('a/generic/path', 'source/lnk')
    osutils.copy_tree('source', 'target', handlers=handlers)

    self.assertEqual([('d', 'source', 'target'),
                      ('f', 'source/a', 'target/a'),
                      ('d', 'source/b', 'target/b'),
                      ('f', 'source/b/c', 'target/b/c'),
                     ], processed_files)
    # The recording handlers create nothing on disk.
    self.failIfExists('target')
    if osutils.has_symlinks():
        self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1570
class TestSetUnsetEnv(tests.TestCase):
1571
"""Test updating the environment"""
1574
super(TestSetUnsetEnv, self).setUp()
1576
self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
1577
'Environment was not cleaned up properly.'
1578
' Variable BZR_TEST_ENV_VAR should not exist.')
1580
if 'BZR_TEST_ENV_VAR' in os.environ:
1581
del os.environ['BZR_TEST_ENV_VAR']
1583
self.addCleanup(cleanup)
1586
"""Test that we can set an env variable"""
1587
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1588
self.assertEqual(None, old)
1589
self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
1591
def test_double_set(self):
    """Test that we get the old value out"""
    osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
    previous = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
    self.assertEqual('foo', previous)
    current = os.environ.get('BZR_TEST_ENV_VAR')
    self.assertEqual('bar', current)
1598
def test_unicode(self):
1599
"""Environment can only contain plain strings
1601
So Unicode strings must be encoded.
1603
uni_val, env_val = tests.probe_unicode_in_user_encoding()
1605
raise tests.TestSkipped(
1606
'Cannot find a unicode character that works in encoding %s'
1607
% (osutils.get_user_encoding(),))
1609
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1610
self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
1612
def test_unset(self):
    """Test that passing None will remove the env var"""
    osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
    previous = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
    self.assertEqual('foo', previous)
    # Gone for both the lookup and the membership test.
    self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
    still_present = 'BZR_TEST_ENV_VAR' in os.environ
    self.failIf(still_present)
1621
class TestSizeShaFile(tests.TestCaseInTempDir):
    """Tests for osutils.size_sha_file()."""

    def test_sha_empty(self):
        self.build_tree_contents([('foo', '')])
        expected_sha = osutils.sha_string('')
        # The file has to be opened before its close can be scheduled;
        # binary mode matches test_sha_mixed_endings below.
        f = open('foo', 'rb')
        self.addCleanup(f.close)
        size, sha = osutils.size_sha_file(f)
        self.assertEqual(0, size)
        self.assertEqual(expected_sha, sha)

    def test_sha_mixed_endings(self):
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
        self.build_tree_contents([('foo', text)])
        expected_sha = osutils.sha_string(text)
        # Binary mode so that no line ending gets translated.
        f = open('foo', 'rb')
        self.addCleanup(f.close)
        size, sha = osutils.size_sha_file(f)
        self.assertEqual(38, size)
        self.assertEqual(expected_sha, sha)
1643
class TestShaFileByName(tests.TestCaseInTempDir):
    """Tests for osutils.sha_file_by_name()."""

    def assertShaByNameMatches(self, text):
        """Write `text` to 'foo' and compare its sha with sha_string(text)."""
        self.build_tree_contents([('foo', text)])
        expected_sha = osutils.sha_string(text)
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))

    def test_sha_empty(self):
        self.assertShaByNameMatches('')

    def test_sha_mixed_endings(self):
        self.assertShaByNameMatches(
            'test\r\nwith\nall\rpossible line endings\r\n')
1657
class TestResourceLoading(tests.TestCaseInTempDir):
    """Tests for osutils.resource_string()."""

    def test_resource_string(self):
        # test resource in bzrlib
        text = osutils.resource_string('bzrlib', 'debug.py')
        # The parentheses are escaped: left bare, "()" is an empty regex
        # group and the pattern would accept any "debug_flags = set" text.
        self.assertContainsRe(text, r"debug_flags = set\(\)")
        # test resource under bzrlib
        text = osutils.resource_string('bzrlib.ui', 'text.py')
        self.assertContainsRe(text, "class TextUIFactory")
        # test unsupported package
        self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
            'yyy.xx')
        # test unknown resource
        self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
1673
class TestReCompile(tests.TestCase):
1675
def test_re_compile_checked(self):
    """Flags given to re_compile_checked() apply to the compiled pattern."""
    r = osutils.re_compile_checked(r'A*', re.IGNORECASE)
    # 'A*' also matches the empty string, so a bare truth test on the match
    # object would pass even if IGNORECASE were ignored.  Compare the text
    # that was actually matched instead.
    self.assertEqual('aaaa', r.match('aaaa').group())
    self.assertEqual('aAaA', r.match('aAaA').group())
1680
def test_re_compile_checked_error(self):
1681
# like https://bugs.launchpad.net/bzr/+bug/251352
1682
err = self.assertRaises(
1683
errors.BzrCommandError,
1684
osutils.re_compile_checked, '*', re.IGNORECASE, 'test case')
1686
"Invalid regular expression in test case: '*': "
1687
"nothing to repeat",
1691
class TestDirReader(tests.TestCaseInTempDir):
1694
_dir_reader_class = None
1695
_native_to_unicode = None
1698
tests.TestCaseInTempDir.setUp(self)
1700
# Save platform specific info and reset it
1701
cur_dir_reader = osutils._selected_dir_reader
1704
osutils._selected_dir_reader = cur_dir_reader
1705
self.addCleanup(restore)
1707
osutils._selected_dir_reader = self._dir_reader_class()
1709
def _get_ascii_tree(self):
1717
expected_dirblocks = [
1719
[('0file', '0file', 'file'),
1720
('1dir', '1dir', 'directory'),
1721
('2file', '2file', 'file'),
1724
(('1dir', './1dir'),
1725
[('1dir/0file', '0file', 'file'),
1726
('1dir/1dir', '1dir', 'directory'),
1729
(('1dir/1dir', './1dir/1dir'),
1734
return tree, expected_dirblocks
1736
def test_walk_cur_dir(self):
    """Walking '.' yields every block of the ASCII sample tree."""
    tree, expected_dirblocks = self._get_ascii_tree()
    self.build_tree(tree)
    result = list(osutils._walkdirs_utf8('.'))
    # Filter out stat and abspath
    trimmed = []
    for dirinfo, block in result:
        trimmed.append((dirinfo, [line[0:3] for line in block]))
    self.assertEqual(expected_dirblocks, trimmed)
1745
def test_walk_sub_dir(self):
    """Walking './1dir' with prefix '1dir' yields all but the top block."""
    tree, expected_dirblocks = self._get_ascii_tree()
    self.build_tree(tree)
    # you can search a subdir only, with a supplied prefix.
    result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
    # Filter out stat and abspath
    trimmed = []
    for dirinfo, block in result:
        trimmed.append((dirinfo, [line[0:3] for line in block]))
    self.assertEqual(expected_dirblocks[1:], trimmed)
1755
def _get_unicode_tree(self):
1756
name0u = u'0file-\xb6'
1757
name1u = u'1dir-\u062c\u0648'
1758
name2u = u'2file-\u0633'
1762
name1u + '/' + name0u,
1763
name1u + '/' + name1u + '/',
1766
name0 = name0u.encode('UTF-8')
1767
name1 = name1u.encode('UTF-8')
1768
name2 = name2u.encode('UTF-8')
1769
expected_dirblocks = [
1771
[(name0, name0, 'file', './' + name0u),
1772
(name1, name1, 'directory', './' + name1u),
1773
(name2, name2, 'file', './' + name2u),
1776
((name1, './' + name1u),
1777
[(name1 + '/' + name0, name0, 'file', './' + name1u
1779
(name1 + '/' + name1, name1, 'directory', './' + name1u
1783
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1788
return tree, expected_dirblocks
1790
def _filter_out(self, raw_dirblocks):
1791
"""Filter out a walkdirs_utf8 result.
1793
stat field is removed, all native paths are converted to unicode
1795
filtered_dirblocks = []
1796
for dirinfo, block in raw_dirblocks:
1797
dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
1800
details.append(line[0:3] + (self._native_to_unicode(line[4]), ))
1801
filtered_dirblocks.append((dirinfo, details))
1802
return filtered_dirblocks
1804
def test_walk_unicode_tree(self):
    """Walking the unicode sample tree yields the expected blocks."""
    self.requireFeature(tests.UnicodeFilenameFeature)
    tree, expected_dirblocks = self._get_unicode_tree()
    self.build_tree(tree)
    raw_blocks = list(osutils._walkdirs_utf8('.'))
    filtered = self._filter_out(raw_blocks)
    self.assertEqual(expected_dirblocks, filtered)
1811
def test_symlink(self):
1812
self.requireFeature(tests.SymlinkFeature)
1813
self.requireFeature(tests.UnicodeFilenameFeature)
1814
target = u'target\N{Euro Sign}'
1815
link_name = u'l\N{Euro Sign}nk'
1816
os.symlink(target, link_name)
1817
target_utf8 = target.encode('UTF-8')
1818
link_name_utf8 = link_name.encode('UTF-8')
1819
expected_dirblocks = [
1821
[(link_name_utf8, link_name_utf8,
1822
'symlink', './' + link_name),],
1824
result = list(osutils._walkdirs_utf8('.'))
1825
self.assertEqual(expected_dirblocks, self._filter_out(result))
1828
class TestReadLink(tests.TestCaseInTempDir):
    """Exposes os.readlink() problems and the osutils solution.

    The only guarantee offered by os.readlink(), starting with 2.6, is that a
    unicode string will be returned if a unicode string is passed.

    But prior python versions failed to properly encode the passed unicode
    string.
    """

    _test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]

    def setUp(self):
        # super() must name this class: super(Base, self) starts the lookup
        # *after* Base, so passing tests.TestCaseInTempDir would skip any
        # setUp() that class defines itself.
        super(TestReadLink, self).setUp()
        self.link = u'l\N{Euro Sign}ink'
        self.target = u'targe\N{Euro Sign}t'
        os.symlink(self.target, self.link)

    def test_os_readlink_link_encoding(self):
        if sys.version_info < (2, 6):
            # Older interpreters fail to encode the unicode link name.
            self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
        else:
            self.assertEqual(self.target, os.readlink(self.link))

    def test_os_readlink_link_decoding(self):
        # A byte-string link name gives back a byte-string target.
        self.assertEqual(self.target.encode(osutils._fs_enc),
                         os.readlink(self.link.encode(osutils._fs_enc)))
1856
class TestConcurrency(tests.TestCase):
    """Tests for osutils.local_concurrency() and BZR_CONCURRENCY."""

    def setUp(self):
        super(TestConcurrency, self).setUp()
        orig = osutils._cached_local_concurrency
        # The tests below overwrite BZR_CONCURRENCY, directly or through
        # the --concurrency option; remember its state so the change cannot
        # leak into tests that run afterwards.
        orig_env = os.environ.get('BZR_CONCURRENCY')
        def restore():
            osutils._cached_local_concurrency = orig
            # Passing None removes the variable again.
            osutils.set_or_unset_env('BZR_CONCURRENCY', orig_env)
        self.addCleanup(restore)

    def test_local_concurrency(self):
        concurrency = osutils.local_concurrency()
        self.assertIsInstance(concurrency, int)

    def test_local_concurrency_environment_variable(self):
        os.environ['BZR_CONCURRENCY'] = '2'
        self.assertEqual(2, osutils.local_concurrency(use_cache=False))
        os.environ['BZR_CONCURRENCY'] = '3'
        self.assertEqual(3, osutils.local_concurrency(use_cache=False))
        # A value that is not a number gives a concurrency of 1.
        os.environ['BZR_CONCURRENCY'] = 'foo'
        self.assertEqual(1, osutils.local_concurrency(use_cache=False))

    def test_option_concurrency(self):
        os.environ['BZR_CONCURRENCY'] = '1'
        self.run_bzr('rocks --concurrency 42')
        # Command line overrides environment variable
        self.assertEqual('42', os.environ['BZR_CONCURRENCY'])
        self.assertEqual(42, osutils.local_concurrency(use_cache=False))
1885
class TestFailedToLoadExtension(tests.TestCase):
1887
def _try_loading(self):
1889
import bzrlib._fictional_extension_py
1890
except ImportError, e:
1891
osutils.failed_to_load_extension(e)
1895
super(TestFailedToLoadExtension, self).setUp()
1896
self.saved_failures = osutils._extension_load_failures[:]
1897
del osutils._extension_load_failures[:]
1898
self.addCleanup(self.restore_failures)
1900
def restore_failures(self):
    """Put back the failure messages saved in self.saved_failures.

    The module-level list is refilled in place rather than rebound, so
    osutils keeps using one list object before, during and after the test.
    """
    osutils._extension_load_failures[:] = self.saved_failures
1903
def test_failure_to_load(self):
1905
self.assertLength(1, osutils._extension_load_failures)
1906
self.assertEquals(osutils._extension_load_failures[0],
1907
"No module named _fictional_extension_py")
1909
def test_report_extension_load_failures_no_warning(self):
    """Reporting load failures does not emit any Python warning."""
    self.assertTrue(self._try_loading())
    caught, _ = self.callCatchWarnings(
        osutils.report_extension_load_failures)
    # it used to give a Python warning; it no longer does
    self.assertLength(0, caught)
1915
def test_report_extension_load_failures_message(self):
1917
trace.push_log_file(log)
1918
self.assertTrue(self._try_loading())
1919
osutils.report_extension_load_failures()
1920
self.assertContainsRe(
1922
r"bzr: warning: some compiled extensions could not be loaded; "
1923
"see <https://answers\.launchpad\.net/bzr/\+faq/703>\n"