1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the osutils wrapper."""
19
from cStringIO import StringIO
35
from bzrlib.tests import (
41
class _UTF8DirReaderFeature(tests.Feature):
45
from bzrlib import _readdir_pyx
46
self.reader = _readdir_pyx.UTF8DirReader
51
def feature_name(self):
52
return 'bzrlib._readdir_pyx'
54
UTF8DirReaderFeature = _UTF8DirReaderFeature()
57
def _already_unicode(s):
61
def _fs_enc_to_unicode(s):
62
return s.decode(osutils._fs_enc)
65
def _utf8_to_unicode(s):
66
return s.decode('UTF-8')
69
def dir_reader_scenarios():
70
# For each dir reader we define:
72
# - native_to_unicode: a function converting the native_abspath as returned
73
# by DirReader.read_dir to its unicode representation
75
# UnicodeDirReader is the fallback, it should be tested on all platforms.
76
scenarios = [('unicode',
77
dict(_dir_reader_class=osutils.UnicodeDirReader,
78
_native_to_unicode=_already_unicode))]
79
# Some DirReaders are platform specific and even there they may not be
81
if UTF8DirReaderFeature.available():
82
from bzrlib import _readdir_pyx
83
scenarios.append(('utf8',
84
dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
85
_native_to_unicode=_utf8_to_unicode)))
87
if test__walkdirs_win32.Win32ReadDirFeature.available():
89
from bzrlib import _walkdirs_win32
90
# TODO: check on windows, it may be that we need to use/add
91
# safe_unicode instead of _fs_enc_to_unicode
94
dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
95
_native_to_unicode=_fs_enc_to_unicode)))
101
def load_tests(basic_tests, module, loader):
102
suite = loader.suiteClass()
103
dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
104
basic_tests, tests.condition_isinstance(TestDirReader))
105
tests.multiply_tests(dir_reader_tests, dir_reader_scenarios(), suite)
106
suite.addTest(remaining_tests)
110
class TestContainsWhitespace(tests.TestCase):
112
def test_contains_whitespace(self):
113
self.failUnless(osutils.contains_whitespace(u' '))
114
self.failUnless(osutils.contains_whitespace(u'hello there'))
115
self.failUnless(osutils.contains_whitespace(u'hellothere\n'))
116
self.failUnless(osutils.contains_whitespace(u'hello\nthere'))
117
self.failUnless(osutils.contains_whitespace(u'hello\rthere'))
118
self.failUnless(osutils.contains_whitespace(u'hello\tthere'))
120
# \xa0 is "Non-breaking-space" which on some python locales thinks it
121
# is whitespace, but we do not.
122
self.failIf(osutils.contains_whitespace(u''))
123
self.failIf(osutils.contains_whitespace(u'hellothere'))
124
self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
127
class TestRename(tests.TestCaseInTempDir):
129
def test_fancy_rename(self):
130
# This should work everywhere
132
osutils.fancy_rename(a, b,
133
rename_func=os.rename,
134
unlink_func=os.unlink)
136
open('a', 'wb').write('something in a\n')
138
self.failIfExists('a')
139
self.failUnlessExists('b')
140
self.check_file_contents('b', 'something in a\n')
142
open('a', 'wb').write('new something in a\n')
145
self.check_file_contents('a', 'something in a\n')
147
def test_rename(self):
148
# Rename should be semi-atomic on all platforms
149
open('a', 'wb').write('something in a\n')
150
osutils.rename('a', 'b')
151
self.failIfExists('a')
152
self.failUnlessExists('b')
153
self.check_file_contents('b', 'something in a\n')
155
open('a', 'wb').write('new something in a\n')
156
osutils.rename('b', 'a')
158
self.check_file_contents('a', 'something in a\n')
160
# TODO: test fancy_rename using a MemoryTransport
162
def test_rename_change_case(self):
163
# on Windows we should be able to change filename case by rename
164
self.build_tree(['a', 'b/'])
165
osutils.rename('a', 'A')
166
osutils.rename('b', 'B')
167
# we can't use failUnlessExists on case-insensitive filesystem
168
# so try to check shape of the tree
169
shape = sorted(os.listdir('.'))
170
self.assertEquals(['A', 'B'], shape)
173
class TestRandChars(tests.TestCase):
175
def test_01_rand_chars_empty(self):
176
result = osutils.rand_chars(0)
177
self.assertEqual(result, '')
179
def test_02_rand_chars_100(self):
180
result = osutils.rand_chars(100)
181
self.assertEqual(len(result), 100)
182
self.assertEqual(type(result), str)
183
self.assertContainsRe(result, r'^[a-z0-9]{100}$')
186
class TestIsInside(tests.TestCase):
188
def test_is_inside(self):
189
is_inside = osutils.is_inside
190
self.assertTrue(is_inside('src', 'src/foo.c'))
191
self.assertFalse(is_inside('src', 'srccontrol'))
192
self.assertTrue(is_inside('src', 'src/a/a/a/foo.c'))
193
self.assertTrue(is_inside('foo.c', 'foo.c'))
194
self.assertFalse(is_inside('foo.c', ''))
195
self.assertTrue(is_inside('', 'foo.c'))
197
def test_is_inside_any(self):
198
SRC_FOO_C = osutils.pathjoin('src', 'foo.c')
199
for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
200
(['src'], SRC_FOO_C),
203
self.assert_(osutils.is_inside_any(dirs, fn))
204
for dirs, fn in [(['src'], 'srccontrol'),
205
(['src'], 'srccontrol/foo')]:
206
self.assertFalse(osutils.is_inside_any(dirs, fn))
208
def test_is_inside_or_parent_of_any(self):
209
for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
210
(['src'], 'src/foo.c'),
211
(['src/bar.c'], 'src'),
212
(['src/bar.c', 'bla/foo.c'], 'src'),
215
self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
217
for dirs, fn in [(['src'], 'srccontrol'),
218
(['srccontrol/foo.c'], 'src'),
219
(['src'], 'srccontrol/foo')]:
220
self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
223
class TestRmTree(tests.TestCaseInTempDir):
225
def test_rmtree(self):
226
# Check to remove tree with read-only files/dirs
228
f = file('dir/file', 'w')
231
# would like to also try making the directory readonly, but at the
232
# moment python shutil.rmtree doesn't handle that properly - it would
233
# need to chmod the directory before removing things inside it - deferred
234
# for now -- mbp 20060505
235
# osutils.make_readonly('dir')
236
osutils.make_readonly('dir/file')
238
osutils.rmtree('dir')
240
self.failIfExists('dir/file')
241
self.failIfExists('dir')
244
class TestDeleteAny(tests.TestCaseInTempDir):
246
def test_delete_any_readonly(self):
247
# from <https://bugs.launchpad.net/bzr/+bug/218206>
248
self.build_tree(['d/', 'f'])
249
osutils.make_readonly('d')
250
osutils.make_readonly('f')
252
osutils.delete_any('f')
253
osutils.delete_any('d')
256
class TestKind(tests.TestCaseInTempDir):
258
def test_file_kind(self):
259
self.build_tree(['file', 'dir/'])
260
self.assertEquals('file', osutils.file_kind('file'))
261
self.assertEquals('directory', osutils.file_kind('dir/'))
262
if osutils.has_symlinks():
263
os.symlink('symlink', 'symlink')
264
self.assertEquals('symlink', osutils.file_kind('symlink'))
266
# TODO: jam 20060529 Test a block device
268
os.lstat('/dev/null')
270
if e.errno not in (errno.ENOENT,):
273
self.assertEquals('chardev', osutils.file_kind('/dev/null'))
275
mkfifo = getattr(os, 'mkfifo', None)
279
self.assertEquals('fifo', osutils.file_kind('fifo'))
283
AF_UNIX = getattr(socket, 'AF_UNIX', None)
285
s = socket.socket(AF_UNIX)
288
self.assertEquals('socket', osutils.file_kind('socket'))
292
def test_kind_marker(self):
293
self.assertEqual("", osutils.kind_marker("file"))
294
self.assertEqual("/", osutils.kind_marker('directory'))
295
self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
296
self.assertEqual("@", osutils.kind_marker("symlink"))
297
self.assertEqual("+", osutils.kind_marker("tree-reference"))
298
self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
301
class TestUmask(tests.TestCaseInTempDir):
303
def test_get_umask(self):
304
if sys.platform == 'win32':
305
# umask always returns '0', no way to set it
306
self.assertEqual(0, osutils.get_umask())
309
orig_umask = osutils.get_umask()
310
self.addCleanup(os.umask, orig_umask)
312
self.assertEqual(0222, osutils.get_umask())
314
self.assertEqual(0022, osutils.get_umask())
316
self.assertEqual(0002, osutils.get_umask())
318
self.assertEqual(0027, osutils.get_umask())
321
class TestDateTime(tests.TestCase):
323
def assertFormatedDelta(self, expected, seconds):
324
"""Assert osutils.format_delta formats as expected"""
325
actual = osutils.format_delta(seconds)
326
self.assertEqual(expected, actual)
328
def test_format_delta(self):
329
self.assertFormatedDelta('0 seconds ago', 0)
330
self.assertFormatedDelta('1 second ago', 1)
331
self.assertFormatedDelta('10 seconds ago', 10)
332
self.assertFormatedDelta('59 seconds ago', 59)
333
self.assertFormatedDelta('89 seconds ago', 89)
334
self.assertFormatedDelta('1 minute, 30 seconds ago', 90)
335
self.assertFormatedDelta('3 minutes, 0 seconds ago', 180)
336
self.assertFormatedDelta('3 minutes, 1 second ago', 181)
337
self.assertFormatedDelta('10 minutes, 15 seconds ago', 615)
338
self.assertFormatedDelta('30 minutes, 59 seconds ago', 1859)
339
self.assertFormatedDelta('31 minutes, 0 seconds ago', 1860)
340
self.assertFormatedDelta('60 minutes, 0 seconds ago', 3600)
341
self.assertFormatedDelta('89 minutes, 59 seconds ago', 5399)
342
self.assertFormatedDelta('1 hour, 30 minutes ago', 5400)
343
self.assertFormatedDelta('2 hours, 30 minutes ago', 9017)
344
self.assertFormatedDelta('10 hours, 0 minutes ago', 36000)
345
self.assertFormatedDelta('24 hours, 0 minutes ago', 86400)
346
self.assertFormatedDelta('35 hours, 59 minutes ago', 129599)
347
self.assertFormatedDelta('36 hours, 0 minutes ago', 129600)
348
self.assertFormatedDelta('36 hours, 0 minutes ago', 129601)
349
self.assertFormatedDelta('36 hours, 1 minute ago', 129660)
350
self.assertFormatedDelta('36 hours, 1 minute ago', 129661)
351
self.assertFormatedDelta('84 hours, 10 minutes ago', 303002)
353
# We handle when time steps the wrong direction because computers
354
# don't have synchronized clocks.
355
self.assertFormatedDelta('84 hours, 10 minutes in the future', -303002)
356
self.assertFormatedDelta('1 second in the future', -1)
357
self.assertFormatedDelta('2 seconds in the future', -2)
359
def test_format_date(self):
360
self.assertRaises(errors.UnsupportedTimezoneFormat,
361
osutils.format_date, 0, timezone='foo')
362
self.assertIsInstance(osutils.format_date(0), str)
363
self.assertIsInstance(osutils.format_local_date(0), unicode)
364
# Testing for the actual value of the local weekday without
365
# duplicating the code from format_date is difficult.
366
# Instead blackbox.test_locale should check for localized
367
# dates once they do occur in output strings.
369
def test_local_time_offset(self):
370
"""Test that local_time_offset() returns a sane value."""
371
offset = osutils.local_time_offset()
372
self.assertTrue(isinstance(offset, int))
373
# Test that the offset is no more than a eighteen hours in
375
# Time zone handling is system specific, so it is difficult to
376
# do more specific tests, but a value outside of this range is
378
eighteen_hours = 18 * 3600
379
self.assertTrue(-eighteen_hours < offset < eighteen_hours)
381
def test_local_time_offset_with_timestamp(self):
382
"""Test that local_time_offset() works with a timestamp."""
383
offset = osutils.local_time_offset(1000000000.1234567)
384
self.assertTrue(isinstance(offset, int))
385
eighteen_hours = 18 * 3600
386
self.assertTrue(-eighteen_hours < offset < eighteen_hours)
389
class TestLinks(tests.TestCaseInTempDir):
391
def test_dereference_path(self):
392
self.requireFeature(tests.SymlinkFeature)
393
cwd = osutils.realpath('.')
395
bar_path = osutils.pathjoin(cwd, 'bar')
396
# Using './' to avoid bug #1213894 (first path component not
397
# dereferenced) in Python 2.4.1 and earlier
398
self.assertEqual(bar_path, osutils.realpath('./bar'))
399
os.symlink('bar', 'foo')
400
self.assertEqual(bar_path, osutils.realpath('./foo'))
402
# Does not dereference terminal symlinks
403
foo_path = osutils.pathjoin(cwd, 'foo')
404
self.assertEqual(foo_path, osutils.dereference_path('./foo'))
406
# Dereferences parent symlinks
408
baz_path = osutils.pathjoin(bar_path, 'baz')
409
self.assertEqual(baz_path, osutils.dereference_path('./foo/baz'))
411
# Dereferences parent symlinks that are the first path element
412
self.assertEqual(baz_path, osutils.dereference_path('foo/baz'))
414
# Dereferences parent symlinks in absolute paths
415
foo_baz_path = osutils.pathjoin(foo_path, 'baz')
416
self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
418
def test_changing_access(self):
419
f = file('file', 'w')
423
# Make a file readonly
424
osutils.make_readonly('file')
425
mode = os.lstat('file').st_mode
426
self.assertEqual(mode, mode & 0777555)
428
# Make a file writable
429
osutils.make_writable('file')
430
mode = os.lstat('file').st_mode
431
self.assertEqual(mode, mode | 0200)
433
if osutils.has_symlinks():
434
# should not error when handed a symlink
435
os.symlink('nonexistent', 'dangling')
436
osutils.make_readonly('dangling')
437
osutils.make_writable('dangling')
439
def test_host_os_dereferences_symlinks(self):
440
osutils.host_os_dereferences_symlinks()
443
class TestCanonicalRelPath(tests.TestCaseInTempDir):
445
_test_needs_features = [tests.CaseInsCasePresFilenameFeature]
447
def test_canonical_relpath_simple(self):
448
f = file('MixedCaseName', 'w')
450
# Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
451
real_base_dir = osutils.realpath(self.test_base_dir)
452
actual = osutils.canonical_relpath(real_base_dir, 'mixedcasename')
453
self.failUnlessEqual('work/MixedCaseName', actual)
455
def test_canonical_relpath_missing_tail(self):
456
os.mkdir('MixedCaseParent')
457
# Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
458
real_base_dir = osutils.realpath(self.test_base_dir)
459
actual = osutils.canonical_relpath(real_base_dir,
460
'mixedcaseparent/nochild')
461
self.failUnlessEqual('work/MixedCaseParent/nochild', actual)
464
class TestPumpFile(tests.TestCase):
465
"""Test pumpfile method."""
468
tests.TestCase.setUp(self)
469
# create a test datablock
470
self.block_size = 512
471
pattern = '0123456789ABCDEF'
472
self.test_data = pattern * (3 * self.block_size / len(pattern))
473
self.test_data_len = len(self.test_data)
475
def test_bracket_block_size(self):
476
"""Read data in blocks with the requested read size bracketing the
478
# make sure test data is larger than max read size
479
self.assertTrue(self.test_data_len > self.block_size)
481
from_file = file_utils.FakeReadFile(self.test_data)
484
# read (max / 2) bytes and verify read size wasn't affected
485
num_bytes_to_read = self.block_size / 2
486
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
487
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
488
self.assertEqual(from_file.get_read_count(), 1)
490
# read (max) bytes and verify read size wasn't affected
491
num_bytes_to_read = self.block_size
492
from_file.reset_read_count()
493
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
494
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
495
self.assertEqual(from_file.get_read_count(), 1)
497
# read (max + 1) bytes and verify read size was limited
498
num_bytes_to_read = self.block_size + 1
499
from_file.reset_read_count()
500
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
501
self.assertEqual(from_file.get_max_read_size(), self.block_size)
502
self.assertEqual(from_file.get_read_count(), 2)
504
# finish reading the rest of the data
505
num_bytes_to_read = self.test_data_len - to_file.tell()
506
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
508
# report error if the data wasn't equal (we only report the size due
509
# to the length of the data)
510
response_data = to_file.getvalue()
511
if response_data != self.test_data:
512
message = "Data not equal. Expected %d bytes, received %d."
513
self.fail(message % (len(response_data), self.test_data_len))
515
def test_specified_size(self):
516
"""Request a transfer larger than the maximum block size and verify
517
that the maximum read doesn't exceed the block_size."""
518
# make sure test data is larger than max read size
519
self.assertTrue(self.test_data_len > self.block_size)
521
# retrieve data in blocks
522
from_file = file_utils.FakeReadFile(self.test_data)
524
osutils.pumpfile(from_file, to_file, self.test_data_len,
527
# verify read size was equal to the maximum read size
528
self.assertTrue(from_file.get_max_read_size() > 0)
529
self.assertEqual(from_file.get_max_read_size(), self.block_size)
530
self.assertEqual(from_file.get_read_count(), 3)
532
# report error if the data wasn't equal (we only report the size due
533
# to the length of the data)
534
response_data = to_file.getvalue()
535
if response_data != self.test_data:
536
message = "Data not equal. Expected %d bytes, received %d."
537
self.fail(message % (len(response_data), self.test_data_len))
539
def test_to_eof(self):
540
"""Read to end-of-file and verify that the reads are not larger than
541
the maximum read size."""
542
# make sure test data is larger than max read size
543
self.assertTrue(self.test_data_len > self.block_size)
545
# retrieve data to EOF
546
from_file = file_utils.FakeReadFile(self.test_data)
548
osutils.pumpfile(from_file, to_file, -1, self.block_size)
550
# verify read size was equal to the maximum read size
551
self.assertEqual(from_file.get_max_read_size(), self.block_size)
552
self.assertEqual(from_file.get_read_count(), 4)
554
# report error if the data wasn't equal (we only report the size due
555
# to the length of the data)
556
response_data = to_file.getvalue()
557
if response_data != self.test_data:
558
message = "Data not equal. Expected %d bytes, received %d."
559
self.fail(message % (len(response_data), self.test_data_len))
561
def test_defaults(self):
562
"""Verifies that the default arguments will read to EOF -- this
563
test verifies that any existing usages of pumpfile will not be broken
564
with this new version."""
565
# retrieve data using default (old) pumpfile method
566
from_file = file_utils.FakeReadFile(self.test_data)
568
osutils.pumpfile(from_file, to_file)
570
# report error if the data wasn't equal (we only report the size due
571
# to the length of the data)
572
response_data = to_file.getvalue()
573
if response_data != self.test_data:
574
message = "Data not equal. Expected %d bytes, received %d."
575
self.fail(message % (len(response_data), self.test_data_len))
577
def test_report_activity(self):
579
def log_activity(length, direction):
580
activity.append((length, direction))
581
from_file = StringIO(self.test_data)
583
osutils.pumpfile(from_file, to_file, buff_size=500,
584
report_activity=log_activity, direction='read')
585
self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
586
(36, 'read')], activity)
588
from_file = StringIO(self.test_data)
591
osutils.pumpfile(from_file, to_file, buff_size=500,
592
report_activity=log_activity, direction='write')
593
self.assertEqual([(500, 'write'), (500, 'write'), (500, 'write'),
594
(36, 'write')], activity)
596
# And with a limited amount of data
597
from_file = StringIO(self.test_data)
600
osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
601
report_activity=log_activity, direction='read')
602
self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
606
class TestPumpStringFile(tests.TestCase):
608
def test_empty(self):
610
osutils.pump_string_file("", output)
611
self.assertEqual("", output.getvalue())
613
def test_more_than_segment_size(self):
615
osutils.pump_string_file("123456789", output, 2)
616
self.assertEqual("123456789", output.getvalue())
618
def test_segment_size(self):
620
osutils.pump_string_file("12", output, 2)
621
self.assertEqual("12", output.getvalue())
623
def test_segment_size_multiple(self):
625
osutils.pump_string_file("1234", output, 2)
626
self.assertEqual("1234", output.getvalue())
629
class TestRelpath(tests.TestCase):
631
def test_simple_relpath(self):
632
cwd = osutils.getcwd()
633
subdir = cwd + '/subdir'
634
self.assertEqual('subdir', osutils.relpath(cwd, subdir))
636
def test_deep_relpath(self):
637
cwd = osutils.getcwd()
638
subdir = cwd + '/sub/subsubdir'
639
self.assertEqual('sub/subsubdir', osutils.relpath(cwd, subdir))
641
def test_not_relative(self):
642
self.assertRaises(errors.PathNotChild,
643
osutils.relpath, 'C:/path', 'H:/path')
644
self.assertRaises(errors.PathNotChild,
645
osutils.relpath, 'C:/', 'H:/path')
648
class TestSafeUnicode(tests.TestCase):
650
def test_from_ascii_string(self):
651
self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
653
def test_from_unicode_string_ascii_contents(self):
654
self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
656
def test_from_unicode_string_unicode_contents(self):
657
self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
659
def test_from_utf8_string(self):
660
self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
662
def test_bad_utf8_string(self):
663
self.assertRaises(errors.BzrBadParameterNotUnicode,
664
osutils.safe_unicode,
668
class TestSafeUtf8(tests.TestCase):
670
def test_from_ascii_string(self):
672
self.assertEqual('foobar', osutils.safe_utf8(f))
674
def test_from_unicode_string_ascii_contents(self):
675
self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
677
def test_from_unicode_string_unicode_contents(self):
678
self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
680
def test_from_utf8_string(self):
681
self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
683
def test_bad_utf8_string(self):
684
self.assertRaises(errors.BzrBadParameterNotUnicode,
685
osutils.safe_utf8, '\xbb\xbb')
688
class TestSafeRevisionId(tests.TestCase):
690
def test_from_ascii_string(self):
691
# this shouldn't give a warning because it's getting an ascii string
692
self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
694
def test_from_unicode_string_ascii_contents(self):
695
self.assertEqual('bargam',
696
osutils.safe_revision_id(u'bargam', warn=False))
698
def test_from_unicode_deprecated(self):
699
self.assertEqual('bargam',
700
self.callDeprecated([osutils._revision_id_warning],
701
osutils.safe_revision_id, u'bargam'))
703
def test_from_unicode_string_unicode_contents(self):
704
self.assertEqual('bargam\xc2\xae',
705
osutils.safe_revision_id(u'bargam\xae', warn=False))
707
def test_from_utf8_string(self):
708
self.assertEqual('foo\xc2\xae',
709
osutils.safe_revision_id('foo\xc2\xae'))
712
"""Currently, None is a valid revision_id"""
713
self.assertEqual(None, osutils.safe_revision_id(None))
716
class TestSafeFileId(tests.TestCase):
718
def test_from_ascii_string(self):
719
self.assertEqual('foobar', osutils.safe_file_id('foobar'))
721
def test_from_unicode_string_ascii_contents(self):
722
self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))
724
def test_from_unicode_deprecated(self):
725
self.assertEqual('bargam',
726
self.callDeprecated([osutils._file_id_warning],
727
osutils.safe_file_id, u'bargam'))
729
def test_from_unicode_string_unicode_contents(self):
730
self.assertEqual('bargam\xc2\xae',
731
osutils.safe_file_id(u'bargam\xae', warn=False))
733
def test_from_utf8_string(self):
734
self.assertEqual('foo\xc2\xae',
735
osutils.safe_file_id('foo\xc2\xae'))
738
"""Currently, None is a valid revision_id"""
739
self.assertEqual(None, osutils.safe_file_id(None))
742
class TestWin32Funcs(tests.TestCase):
743
"""Test that _win32 versions of os utilities return appropriate paths."""
745
def test_abspath(self):
746
self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
747
self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
748
self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
749
self.assertEqual('//HOST/path', osutils._win32_abspath('//HOST/path'))
751
def test_realpath(self):
752
self.assertEqual('C:/foo', osutils._win32_realpath('C:\\foo'))
753
self.assertEqual('C:/foo', osutils._win32_realpath('C:/foo'))
755
def test_pathjoin(self):
756
self.assertEqual('path/to/foo',
757
osutils._win32_pathjoin('path', 'to', 'foo'))
758
self.assertEqual('C:/foo',
759
osutils._win32_pathjoin('path\\to', 'C:\\foo'))
760
self.assertEqual('C:/foo',
761
osutils._win32_pathjoin('path/to', 'C:/foo'))
762
self.assertEqual('path/to/foo',
763
osutils._win32_pathjoin('path/to/', 'foo'))
764
self.assertEqual('/foo',
765
osutils._win32_pathjoin('C:/path/to/', '/foo'))
766
self.assertEqual('/foo',
767
osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
769
def test_normpath(self):
770
self.assertEqual('path/to/foo',
771
osutils._win32_normpath(r'path\\from\..\to\.\foo'))
772
self.assertEqual('path/to/foo',
773
osutils._win32_normpath('path//from/../to/./foo'))
775
def test_getcwd(self):
776
cwd = osutils._win32_getcwd()
777
os_cwd = os.getcwdu()
778
self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
779
# win32 is inconsistent whether it returns lower or upper case
780
# and even if it was consistent the user might type the other
781
# so we force it to uppercase
782
# running python.exe under cmd.exe return capital C:\\
783
# running win32 python inside a cygwin shell returns lowercase
784
self.assertEqual(os_cwd[0].upper(), cwd[0])
786
def test_fixdrive(self):
787
self.assertEqual('H:/foo', osutils._win32_fixdrive('h:/foo'))
788
self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
789
self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
791
def test_win98_abspath(self):
793
self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
794
self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
796
self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
797
self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
799
cwd = osutils.getcwd().rstrip('/')
800
drive = osutils._nt_splitdrive(cwd)[0]
801
self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
802
self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
805
self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
808
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
809
"""Test win32 functions that create files."""
811
def test_getcwd(self):
812
self.requireFeature(tests.UnicodeFilenameFeature)
815
# TODO: jam 20060427 This will probably fail on Mac OSX because
816
# it will change the normalization of B\xe5gfors
817
# Consider using a different unicode character, or make
818
# osutils.getcwd() renormalize the path.
819
self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
821
def test_minimum_path_selection(self):
822
self.assertEqual(set(),
823
osutils.minimum_path_selection([]))
824
self.assertEqual(set(['a']),
825
osutils.minimum_path_selection(['a']))
826
self.assertEqual(set(['a', 'b']),
827
osutils.minimum_path_selection(['a', 'b']))
828
self.assertEqual(set(['a/', 'b']),
829
osutils.minimum_path_selection(['a/', 'b']))
830
self.assertEqual(set(['a/', 'b']),
831
osutils.minimum_path_selection(['a/c', 'a/', 'b']))
832
self.assertEqual(set(['a-b', 'a', 'a0b']),
833
osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
835
def test_mkdtemp(self):
836
tmpdir = osutils._win32_mkdtemp(dir='.')
837
self.assertFalse('\\' in tmpdir)
839
def test_rename(self):
847
osutils._win32_rename('b', 'a')
848
self.failUnlessExists('a')
849
self.failIfExists('b')
850
self.assertFileEqual('baz\n', 'a')
852
def test_rename_missing_file(self):
858
osutils._win32_rename('b', 'a')
859
except (IOError, OSError), e:
860
self.assertEqual(errno.ENOENT, e.errno)
861
self.assertFileEqual('foo\n', 'a')
863
def test_rename_missing_dir(self):
866
osutils._win32_rename('b', 'a')
867
except (IOError, OSError), e:
868
self.assertEqual(errno.ENOENT, e.errno)
870
def test_rename_current_dir(self):
873
# You can't rename the working directory
874
# doing rename non-existant . usually
875
# just raises ENOENT, since non-existant
878
osutils._win32_rename('b', '.')
879
except (IOError, OSError), e:
880
self.assertEqual(errno.ENOENT, e.errno)
882
def test_splitpath(self):
883
def check(expected, path):
884
self.assertEqual(expected, osutils.splitpath(path))
887
check(['a', 'b'], 'a/b')
888
check(['a', 'b'], 'a/./b')
889
check(['a', '.b'], 'a/.b')
890
check(['a', '.b'], 'a\\.b')
892
self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
895
class TestParentDirectories(tests.TestCaseInTempDir):
896
"""Test osutils.parent_directories()"""
898
def test_parent_directories(self):
899
self.assertEqual([], osutils.parent_directories('a'))
900
self.assertEqual(['a'], osutils.parent_directories('a/b'))
901
self.assertEqual(['a/b', 'a'], osutils.parent_directories('a/b/c'))
904
class TestMacFuncsDirs(tests.TestCaseInTempDir):
905
"""Test mac special functions that require directories."""
907
def test_getcwd(self):
908
self.requireFeature(tests.UnicodeFilenameFeature)
909
os.mkdir(u'B\xe5gfors')
910
os.chdir(u'B\xe5gfors')
911
self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
913
def test_getcwd_nonnorm(self):
914
self.requireFeature(tests.UnicodeFilenameFeature)
915
# Test that _mac_getcwd() will normalize this path
916
os.mkdir(u'Ba\u030agfors')
917
os.chdir(u'Ba\u030agfors')
918
self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
921
class TestChunksToLines(tests.TestCase):
923
def test_smoketest(self):
924
self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
925
osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
926
self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
927
osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
929
def test_osutils_binding(self):
930
from bzrlib.tests import test__chunks_to_lines
931
if test__chunks_to_lines.CompiledChunksToLinesFeature.available():
932
from bzrlib._chunks_to_lines_pyx import chunks_to_lines
934
from bzrlib._chunks_to_lines_py import chunks_to_lines
935
self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
938
class TestSplitLines(tests.TestCase):
940
def test_split_unicode(self):
941
self.assertEqual([u'foo\n', u'bar\xae'],
942
osutils.split_lines(u'foo\nbar\xae'))
943
self.assertEqual([u'foo\n', u'bar\xae\n'],
944
osutils.split_lines(u'foo\nbar\xae\n'))
946
def test_split_with_carriage_returns(self):
947
self.assertEqual(['foo\rbar\n'],
948
osutils.split_lines('foo\rbar\n'))
951
class TestWalkDirs(tests.TestCaseInTempDir):
953
def assertExpectedBlocks(self, expected, result):
954
self.assertEqual(expected,
955
[(dirinfo, [line[0:3] for line in block])
956
for dirinfo, block in result])
958
def test_walkdirs(self):
967
self.build_tree(tree)
968
expected_dirblocks = [
970
[('0file', '0file', 'file'),
971
('1dir', '1dir', 'directory'),
972
('2file', '2file', 'file'),
976
[('1dir/0file', '0file', 'file'),
977
('1dir/1dir', '1dir', 'directory'),
980
(('1dir/1dir', './1dir/1dir'),
987
for dirdetail, dirblock in osutils.walkdirs('.'):
988
if len(dirblock) and dirblock[0][1] == '.bzr':
989
# this tests the filtering of selected paths
992
result.append((dirdetail, dirblock))
994
self.assertTrue(found_bzrdir)
995
self.assertExpectedBlocks(expected_dirblocks, result)
996
# you can search a subdir only, with a supplied prefix.
998
for dirblock in osutils.walkdirs('./1dir', '1dir'):
999
result.append(dirblock)
1000
self.assertExpectedBlocks(expected_dirblocks[1:], result)
1002
def test_walkdirs_os_error(self):
1003
# <https://bugs.edge.launchpad.net/bzr/+bug/338653>
1004
# Pyrex readdir didn't raise useful messages if it had an error
1005
# reading the directory
1006
if sys.platform == 'win32':
1007
raise tests.TestNotApplicable(
1008
"readdir IOError not tested on win32")
1009
os.mkdir("test-unreadable")
1010
os.chmod("test-unreadable", 0000)
1011
# must chmod it back so that it can be removed
1012
self.addCleanup(os.chmod, "test-unreadable", 0700)
1013
# The error is not raised until the generator is actually evaluated.
1014
# (It would be ok if it happened earlier but at the moment it
1016
e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1017
self.assertEquals('./test-unreadable', e.filename)
1018
self.assertEquals(errno.EACCES, e.errno)
1019
# Ensure the message contains the file name
1020
self.assertContainsRe(str(e), "\./test-unreadable")
1022
def test__walkdirs_utf8(self):
1031
self.build_tree(tree)
1032
expected_dirblocks = [
1034
[('0file', '0file', 'file'),
1035
('1dir', '1dir', 'directory'),
1036
('2file', '2file', 'file'),
1039
(('1dir', './1dir'),
1040
[('1dir/0file', '0file', 'file'),
1041
('1dir/1dir', '1dir', 'directory'),
1044
(('1dir/1dir', './1dir/1dir'),
1050
found_bzrdir = False
1051
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1052
if len(dirblock) and dirblock[0][1] == '.bzr':
1053
# this tests the filtering of selected paths
1056
result.append((dirdetail, dirblock))
1058
self.assertTrue(found_bzrdir)
1059
self.assertExpectedBlocks(expected_dirblocks, result)
1061
# you can search a subdir only, with a supplied prefix.
1063
for dirblock in osutils.walkdirs('./1dir', '1dir'):
1064
result.append(dirblock)
1065
self.assertExpectedBlocks(expected_dirblocks[1:], result)
1067
def _filter_out_stat(self, result):
1068
"""Filter out the stat value from the walkdirs result"""
1069
for dirdetail, dirblock in result:
1071
for info in dirblock:
1072
# Ignore info[3] which is the stat
1073
new_dirblock.append((info[0], info[1], info[2], info[4]))
1074
dirblock[:] = new_dirblock
1076
def _save_platform_info(self):
1077
cur_winver = win32utils.winver
1078
cur_fs_enc = osutils._fs_enc
1079
cur_dir_reader = osutils._selected_dir_reader
1081
win32utils.winver = cur_winver
1082
osutils._fs_enc = cur_fs_enc
1083
osutils._selected_dir_reader = cur_dir_reader
1084
self.addCleanup(restore)
1086
def assertDirReaderIs(self, expected):
1087
"""Assert the right implementation for _walkdirs_utf8 is chosen."""
1088
# Force it to redetect
1089
osutils._selected_dir_reader = None
1090
# Nothing to list, but should still trigger the selection logic
1091
self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
1092
self.assertIsInstance(osutils._selected_dir_reader, expected)
1094
def test_force_walkdirs_utf8_fs_utf8(self):
1095
self.requireFeature(UTF8DirReaderFeature)
1096
self._save_platform_info()
1097
win32utils.winver = None # Avoid the win32 detection code
1098
osutils._fs_enc = 'UTF-8'
1099
self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1101
def test_force_walkdirs_utf8_fs_ascii(self):
1102
self.requireFeature(UTF8DirReaderFeature)
1103
self._save_platform_info()
1104
win32utils.winver = None # Avoid the win32 detection code
1105
osutils._fs_enc = 'US-ASCII'
1106
self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1108
def test_force_walkdirs_utf8_fs_ANSI(self):
1109
self.requireFeature(UTF8DirReaderFeature)
1110
self._save_platform_info()
1111
win32utils.winver = None # Avoid the win32 detection code
1112
osutils._fs_enc = 'ANSI_X3.4-1968'
1113
self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1115
def test_force_walkdirs_utf8_fs_latin1(self):
1116
self._save_platform_info()
1117
win32utils.winver = None # Avoid the win32 detection code
1118
osutils._fs_enc = 'latin1'
1119
self.assertDirReaderIs(osutils.UnicodeDirReader)
1121
def test_force_walkdirs_utf8_nt(self):
1122
# Disabled because the thunk of the whole walkdirs api is disabled.
1123
self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1124
self._save_platform_info()
1125
win32utils.winver = 'Windows NT'
1126
from bzrlib._walkdirs_win32 import Win32ReadDir
1127
self.assertDirReaderIs(Win32ReadDir)
1129
def test_force_walkdirs_utf8_98(self):
1130
self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1131
self._save_platform_info()
1132
win32utils.winver = 'Windows 98'
1133
self.assertDirReaderIs(osutils.UnicodeDirReader)
1135
def test_unicode_walkdirs(self):
1136
"""Walkdirs should always return unicode paths."""
1137
self.requireFeature(tests.UnicodeFilenameFeature)
1138
name0 = u'0file-\xb6'
1139
name1 = u'1dir-\u062c\u0648'
1140
name2 = u'2file-\u0633'
1144
name1 + '/' + name0,
1145
name1 + '/' + name1 + '/',
1148
self.build_tree(tree)
1149
expected_dirblocks = [
1151
[(name0, name0, 'file', './' + name0),
1152
(name1, name1, 'directory', './' + name1),
1153
(name2, name2, 'file', './' + name2),
1156
((name1, './' + name1),
1157
[(name1 + '/' + name0, name0, 'file', './' + name1
1159
(name1 + '/' + name1, name1, 'directory', './' + name1
1163
((name1 + '/' + name1, './' + name1 + '/' + name1),
1168
result = list(osutils.walkdirs('.'))
1169
self._filter_out_stat(result)
1170
self.assertEqual(expected_dirblocks, result)
1171
result = list(osutils.walkdirs(u'./'+name1, name1))
1172
self._filter_out_stat(result)
1173
self.assertEqual(expected_dirblocks[1:], result)
1175
def test_unicode__walkdirs_utf8(self):
1176
"""Walkdirs_utf8 should always return utf8 paths.
1178
The abspath portion might be in unicode or utf-8
1180
self.requireFeature(tests.UnicodeFilenameFeature)
1181
name0 = u'0file-\xb6'
1182
name1 = u'1dir-\u062c\u0648'
1183
name2 = u'2file-\u0633'
1187
name1 + '/' + name0,
1188
name1 + '/' + name1 + '/',
1191
self.build_tree(tree)
1192
name0 = name0.encode('utf8')
1193
name1 = name1.encode('utf8')
1194
name2 = name2.encode('utf8')
1196
expected_dirblocks = [
1198
[(name0, name0, 'file', './' + name0),
1199
(name1, name1, 'directory', './' + name1),
1200
(name2, name2, 'file', './' + name2),
1203
((name1, './' + name1),
1204
[(name1 + '/' + name0, name0, 'file', './' + name1
1206
(name1 + '/' + name1, name1, 'directory', './' + name1
1210
((name1 + '/' + name1, './' + name1 + '/' + name1),
1216
# For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1217
# all abspaths are Unicode, and encode them back into utf8.
1218
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1219
self.assertIsInstance(dirdetail[0], str)
1220
if isinstance(dirdetail[1], unicode):
1221
dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1222
dirblock = [list(info) for info in dirblock]
1223
for info in dirblock:
1224
self.assertIsInstance(info[4], unicode)
1225
info[4] = info[4].encode('utf8')
1227
for info in dirblock:
1228
self.assertIsInstance(info[0], str)
1229
self.assertIsInstance(info[1], str)
1230
self.assertIsInstance(info[4], str)
1231
# Remove the stat information
1232
new_dirblock.append((info[0], info[1], info[2], info[4]))
1233
result.append((dirdetail, new_dirblock))
1234
self.assertEqual(expected_dirblocks, result)
1236
def test__walkdirs_utf8_with_unicode_fs(self):
1237
"""UnicodeDirReader should be a safe fallback everywhere
1239
The abspath portion should be in unicode
1241
self.requireFeature(tests.UnicodeFilenameFeature)
1242
# Use the unicode reader. TODO: split into driver-and-driven unit
1244
self._save_platform_info()
1245
osutils._selected_dir_reader = osutils.UnicodeDirReader()
1246
name0u = u'0file-\xb6'
1247
name1u = u'1dir-\u062c\u0648'
1248
name2u = u'2file-\u0633'
1252
name1u + '/' + name0u,
1253
name1u + '/' + name1u + '/',
1256
self.build_tree(tree)
1257
name0 = name0u.encode('utf8')
1258
name1 = name1u.encode('utf8')
1259
name2 = name2u.encode('utf8')
1261
# All of the abspaths should be in unicode, all of the relative paths
1263
expected_dirblocks = [
1265
[(name0, name0, 'file', './' + name0u),
1266
(name1, name1, 'directory', './' + name1u),
1267
(name2, name2, 'file', './' + name2u),
1270
((name1, './' + name1u),
1271
[(name1 + '/' + name0, name0, 'file', './' + name1u
1273
(name1 + '/' + name1, name1, 'directory', './' + name1u
1277
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1282
result = list(osutils._walkdirs_utf8('.'))
1283
self._filter_out_stat(result)
1284
self.assertEqual(expected_dirblocks, result)
1286
def test__walkdirs_utf8_win32readdir(self):
1287
self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1288
self.requireFeature(tests.UnicodeFilenameFeature)
1289
from bzrlib._walkdirs_win32 import Win32ReadDir
1290
self._save_platform_info()
1291
osutils._selected_dir_reader = Win32ReadDir()
1292
name0u = u'0file-\xb6'
1293
name1u = u'1dir-\u062c\u0648'
1294
name2u = u'2file-\u0633'
1298
name1u + '/' + name0u,
1299
name1u + '/' + name1u + '/',
1302
self.build_tree(tree)
1303
name0 = name0u.encode('utf8')
1304
name1 = name1u.encode('utf8')
1305
name2 = name2u.encode('utf8')
1307
# All of the abspaths should be in unicode, all of the relative paths
1309
expected_dirblocks = [
1311
[(name0, name0, 'file', './' + name0u),
1312
(name1, name1, 'directory', './' + name1u),
1313
(name2, name2, 'file', './' + name2u),
1316
((name1, './' + name1u),
1317
[(name1 + '/' + name0, name0, 'file', './' + name1u
1319
(name1 + '/' + name1, name1, 'directory', './' + name1u
1323
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1328
result = list(osutils._walkdirs_utf8(u'.'))
1329
self._filter_out_stat(result)
1330
self.assertEqual(expected_dirblocks, result)
1332
def assertStatIsCorrect(self, path, win32stat):
1333
os_stat = os.stat(path)
1334
self.assertEqual(os_stat.st_size, win32stat.st_size)
1335
self.assertAlmostEqual(os_stat.st_mtime, win32stat.st_mtime, places=4)
1336
self.assertAlmostEqual(os_stat.st_ctime, win32stat.st_ctime, places=4)
1337
self.assertAlmostEqual(os_stat.st_atime, win32stat.st_atime, places=4)
1338
self.assertEqual(os_stat.st_dev, win32stat.st_dev)
1339
self.assertEqual(os_stat.st_ino, win32stat.st_ino)
1340
self.assertEqual(os_stat.st_mode, win32stat.st_mode)
1342
def test__walkdirs_utf_win32_find_file_stat_file(self):
1343
"""make sure our Stat values are valid"""
1344
self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1345
self.requireFeature(tests.UnicodeFilenameFeature)
1346
from bzrlib._walkdirs_win32 import Win32ReadDir
1347
name0u = u'0file-\xb6'
1348
name0 = name0u.encode('utf8')
1349
self.build_tree([name0u])
1350
# I hate to sleep() here, but I'm trying to make the ctime different
1353
f = open(name0u, 'ab')
1355
f.write('just a small update')
1359
result = Win32ReadDir().read_dir('', u'.')
1361
self.assertEqual((name0, name0, 'file'), entry[:3])
1362
self.assertEqual(u'./' + name0u, entry[4])
1363
self.assertStatIsCorrect(entry[4], entry[3])
1364
self.assertNotEqual(entry[3].st_mtime, entry[3].st_ctime)
1366
def test__walkdirs_utf_win32_find_file_stat_directory(self):
1367
"""make sure our Stat values are valid"""
1368
self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1369
self.requireFeature(tests.UnicodeFilenameFeature)
1370
from bzrlib._walkdirs_win32 import Win32ReadDir
1371
name0u = u'0dir-\u062c\u0648'
1372
name0 = name0u.encode('utf8')
1373
self.build_tree([name0u + '/'])
1375
result = Win32ReadDir().read_dir('', u'.')
1377
self.assertEqual((name0, name0, 'directory'), entry[:3])
1378
self.assertEqual(u'./' + name0u, entry[4])
1379
self.assertStatIsCorrect(entry[4], entry[3])
1381
def assertPathCompare(self, path_less, path_greater):
1382
"""check that path_less and path_greater compare correctly."""
1383
self.assertEqual(0, osutils.compare_paths_prefix_order(
1384
path_less, path_less))
1385
self.assertEqual(0, osutils.compare_paths_prefix_order(
1386
path_greater, path_greater))
1387
self.assertEqual(-1, osutils.compare_paths_prefix_order(
1388
path_less, path_greater))
1389
self.assertEqual(1, osutils.compare_paths_prefix_order(
1390
path_greater, path_less))
1392
def test_compare_paths_prefix_order(self):
1393
# root before all else
1394
self.assertPathCompare("/", "/a")
1395
# alpha within a dir
1396
self.assertPathCompare("/a", "/b")
1397
self.assertPathCompare("/b", "/z")
1398
# high dirs before lower.
1399
self.assertPathCompare("/z", "/a/a")
1400
# except if the deeper dir should be output first
1401
self.assertPathCompare("/a/b/c", "/d/g")
1402
# lexical betwen dirs of the same height
1403
self.assertPathCompare("/a/z", "/z/z")
1404
self.assertPathCompare("/a/c/z", "/a/d/e")
1406
# this should also be consistent for no leading / paths
1407
# root before all else
1408
self.assertPathCompare("", "a")
1409
# alpha within a dir
1410
self.assertPathCompare("a", "b")
1411
self.assertPathCompare("b", "z")
1412
# high dirs before lower.
1413
self.assertPathCompare("z", "a/a")
1414
# except if the deeper dir should be output first
1415
self.assertPathCompare("a/b/c", "d/g")
1416
# lexical betwen dirs of the same height
1417
self.assertPathCompare("a/z", "z/z")
1418
self.assertPathCompare("a/c/z", "a/d/e")
1420
def test_path_prefix_sorting(self):
1421
"""Doing a sort on path prefix should match our sample data."""
1436
dir_sorted_paths = [
1452
sorted(original_paths, key=osutils.path_prefix_key))
1453
# using the comparison routine shoudl work too:
1456
sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
1459
class TestCopyTree(tests.TestCaseInTempDir):
1461
def test_copy_basic_tree(self):
1462
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1463
osutils.copy_tree('source', 'target')
1464
self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
1465
self.assertEqual(['c'], os.listdir('target/b'))
1467
def test_copy_tree_target_exists(self):
1468
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
1470
osutils.copy_tree('source', 'target')
1471
self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
1472
self.assertEqual(['c'], os.listdir('target/b'))
1474
def test_copy_tree_symlinks(self):
1475
self.requireFeature(tests.SymlinkFeature)
1476
self.build_tree(['source/'])
1477
os.symlink('a/generic/path', 'source/lnk')
1478
osutils.copy_tree('source', 'target')
1479
self.assertEqual(['lnk'], os.listdir('target'))
1480
self.assertEqual('a/generic/path', os.readlink('target/lnk'))
1482
def test_copy_tree_handlers(self):
1483
processed_files = []
1484
processed_links = []
1485
def file_handler(from_path, to_path):
1486
processed_files.append(('f', from_path, to_path))
1487
def dir_handler(from_path, to_path):
1488
processed_files.append(('d', from_path, to_path))
1489
def link_handler(from_path, to_path):
1490
processed_links.append((from_path, to_path))
1491
handlers = {'file':file_handler,
1492
'directory':dir_handler,
1493
'symlink':link_handler,
1496
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1497
if osutils.has_symlinks():
1498
os.symlink('a/generic/path', 'source/lnk')
1499
osutils.copy_tree('source', 'target', handlers=handlers)
1501
self.assertEqual([('d', 'source', 'target'),
1502
('f', 'source/a', 'target/a'),
1503
('d', 'source/b', 'target/b'),
1504
('f', 'source/b/c', 'target/b/c'),
1506
self.failIfExists('target')
1507
if osutils.has_symlinks():
1508
self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1511
class TestSetUnsetEnv(tests.TestCase):
1512
"""Test updating the environment"""
1515
super(TestSetUnsetEnv, self).setUp()
1517
self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
1518
'Environment was not cleaned up properly.'
1519
' Variable BZR_TEST_ENV_VAR should not exist.')
1521
if 'BZR_TEST_ENV_VAR' in os.environ:
1522
del os.environ['BZR_TEST_ENV_VAR']
1524
self.addCleanup(cleanup)
1527
"""Test that we can set an env variable"""
1528
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1529
self.assertEqual(None, old)
1530
self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
1532
def test_double_set(self):
1533
"""Test that we get the old value out"""
1534
osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1535
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
1536
self.assertEqual('foo', old)
1537
self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
1539
def test_unicode(self):
1540
"""Environment can only contain plain strings
1542
So Unicode strings must be encoded.
1544
uni_val, env_val = tests.probe_unicode_in_user_encoding()
1546
raise tests.TestSkipped(
1547
'Cannot find a unicode character that works in encoding %s'
1548
% (osutils.get_user_encoding(),))
1550
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1551
self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
1553
def test_unset(self):
1554
"""Test that passing None will remove the env var"""
1555
osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1556
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
1557
self.assertEqual('foo', old)
1558
self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
1559
self.failIf('BZR_TEST_ENV_VAR' in os.environ)
1562
class TestSizeShaFile(tests.TestCaseInTempDir):
1564
def test_sha_empty(self):
1565
self.build_tree_contents([('foo', '')])
1566
expected_sha = osutils.sha_string('')
1568
self.addCleanup(f.close)
1569
size, sha = osutils.size_sha_file(f)
1570
self.assertEqual(0, size)
1571
self.assertEqual(expected_sha, sha)
1573
def test_sha_mixed_endings(self):
1574
text = 'test\r\nwith\nall\rpossible line endings\r\n'
1575
self.build_tree_contents([('foo', text)])
1576
expected_sha = osutils.sha_string(text)
1578
self.addCleanup(f.close)
1579
size, sha = osutils.size_sha_file(f)
1580
self.assertEqual(38, size)
1581
self.assertEqual(expected_sha, sha)
1584
class TestShaFileByName(tests.TestCaseInTempDir):
1586
def test_sha_empty(self):
1587
self.build_tree_contents([('foo', '')])
1588
expected_sha = osutils.sha_string('')
1589
self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1591
def test_sha_mixed_endings(self):
1592
text = 'test\r\nwith\nall\rpossible line endings\r\n'
1593
self.build_tree_contents([('foo', text)])
1594
expected_sha = osutils.sha_string(text)
1595
self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1598
class TestResourceLoading(tests.TestCaseInTempDir):
1600
def test_resource_string(self):
1601
# test resource in bzrlib
1602
text = osutils.resource_string('bzrlib', 'debug.py')
1603
self.assertContainsRe(text, "debug_flags = set()")
1604
# test resource under bzrlib
1605
text = osutils.resource_string('bzrlib.ui', 'text.py')
1606
self.assertContainsRe(text, "class TextUIFactory")
1607
# test unsupported package
1608
self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1610
# test unknown resource
1611
self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
1614
class TestReCompile(tests.TestCase):
1616
def test_re_compile_checked(self):
1617
r = osutils.re_compile_checked(r'A*', re.IGNORECASE)
1618
self.assertTrue(r.match('aaaa'))
1619
self.assertTrue(r.match('aAaA'))
1621
def test_re_compile_checked_error(self):
1622
# like https://bugs.launchpad.net/bzr/+bug/251352
1623
err = self.assertRaises(
1624
errors.BzrCommandError,
1625
osutils.re_compile_checked, '*', re.IGNORECASE, 'test case')
1627
"Invalid regular expression in test case: '*': "
1628
"nothing to repeat",
1632
class TestDirReader(tests.TestCaseInTempDir):
1635
_dir_reader_class = None
1636
_native_to_unicode = None
1639
tests.TestCaseInTempDir.setUp(self)
1641
# Save platform specific info and reset it
1642
cur_dir_reader = osutils._selected_dir_reader
1645
osutils._selected_dir_reader = cur_dir_reader
1646
self.addCleanup(restore)
1648
osutils._selected_dir_reader = self._dir_reader_class()
1650
def _get_ascii_tree(self):
1658
expected_dirblocks = [
1660
[('0file', '0file', 'file'),
1661
('1dir', '1dir', 'directory'),
1662
('2file', '2file', 'file'),
1665
(('1dir', './1dir'),
1666
[('1dir/0file', '0file', 'file'),
1667
('1dir/1dir', '1dir', 'directory'),
1670
(('1dir/1dir', './1dir/1dir'),
1675
return tree, expected_dirblocks
1677
def test_walk_cur_dir(self):
1678
tree, expected_dirblocks = self._get_ascii_tree()
1679
self.build_tree(tree)
1680
result = list(osutils._walkdirs_utf8('.'))
1681
# Filter out stat and abspath
1682
self.assertEqual(expected_dirblocks,
1683
[(dirinfo, [line[0:3] for line in block])
1684
for dirinfo, block in result])
1686
def test_walk_sub_dir(self):
1687
tree, expected_dirblocks = self._get_ascii_tree()
1688
self.build_tree(tree)
1689
# you can search a subdir only, with a supplied prefix.
1690
result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
1691
# Filter out stat and abspath
1692
self.assertEqual(expected_dirblocks[1:],
1693
[(dirinfo, [line[0:3] for line in block])
1694
for dirinfo, block in result])
1696
def _get_unicode_tree(self):
1697
name0u = u'0file-\xb6'
1698
name1u = u'1dir-\u062c\u0648'
1699
name2u = u'2file-\u0633'
1703
name1u + '/' + name0u,
1704
name1u + '/' + name1u + '/',
1707
name0 = name0u.encode('UTF-8')
1708
name1 = name1u.encode('UTF-8')
1709
name2 = name2u.encode('UTF-8')
1710
expected_dirblocks = [
1712
[(name0, name0, 'file', './' + name0u),
1713
(name1, name1, 'directory', './' + name1u),
1714
(name2, name2, 'file', './' + name2u),
1717
((name1, './' + name1u),
1718
[(name1 + '/' + name0, name0, 'file', './' + name1u
1720
(name1 + '/' + name1, name1, 'directory', './' + name1u
1724
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1729
return tree, expected_dirblocks
1731
def _filter_out(self, raw_dirblocks):
1732
"""Filter out a walkdirs_utf8 result.
1734
stat field is removed, all native paths are converted to unicode
1736
filtered_dirblocks = []
1737
for dirinfo, block in raw_dirblocks:
1738
dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
1741
details.append(line[0:3] + (self._native_to_unicode(line[4]), ))
1742
filtered_dirblocks.append((dirinfo, details))
1743
return filtered_dirblocks
1745
def test_walk_unicode_tree(self):
1746
self.requireFeature(tests.UnicodeFilenameFeature)
1747
tree, expected_dirblocks = self._get_unicode_tree()
1748
self.build_tree(tree)
1749
result = list(osutils._walkdirs_utf8('.'))
1750
self.assertEqual(expected_dirblocks, self._filter_out(result))
1752
def test_symlink(self):
1753
self.requireFeature(tests.SymlinkFeature)
1754
self.requireFeature(tests.UnicodeFilenameFeature)
1755
target = u'target\N{Euro Sign}'
1756
link_name = u'l\N{Euro Sign}nk'
1757
os.symlink(target, link_name)
1758
target_utf8 = target.encode('UTF-8')
1759
link_name_utf8 = link_name.encode('UTF-8')
1760
expected_dirblocks = [
1762
[(link_name_utf8, link_name_utf8,
1763
'symlink', './' + link_name),],
1765
result = list(osutils._walkdirs_utf8('.'))
1766
self.assertEqual(expected_dirblocks, self._filter_out(result))
1769
class TestReadLink(tests.TestCaseInTempDir):
1770
"""Exposes os.readlink() problems and the osutils solution.
1772
The only guarantee offered by os.readlink(), starting with 2.6, is that a
1773
unicode string will be returned if a unicode string is passed.
1775
But prior python versions failed to properly encode the passed unicode
1778
_test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]
1781
super(tests.TestCaseInTempDir, self).setUp()
1782
self.link = u'l\N{Euro Sign}ink'
1783
self.target = u'targe\N{Euro Sign}t'
1784
os.symlink(self.target, self.link)
1786
def test_os_readlink_link_encoding(self):
1787
if sys.version_info < (2, 6):
1788
self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
1790
self.assertEquals(self.target, os.readlink(self.link))
1792
def test_os_readlink_link_decoding(self):
1793
self.assertEquals(self.target.encode(osutils._fs_enc),
1794
os.readlink(self.link.encode(osutils._fs_enc)))
1797
class TestConcurrency(tests.TestCase):
1799
def test_local_concurrency(self):
1800
concurrency = osutils.local_concurrency()
1801
self.assertIsInstance(concurrency, int)
1804
class TestFailedToLoadExtension(tests.TestCase):
1806
def _try_loading(self):
1808
import bzrlib._fictional_extension_py
1809
except ImportError, e:
1810
osutils.failed_to_load_extension(e)
1814
super(TestFailedToLoadExtension, self).setUp()
1815
self.saved_failures = osutils._extension_load_failures[:]
1816
del osutils._extension_load_failures[:]
1817
self.addCleanup(self.restore_failures)
1819
def restore_failures(self):
1820
osutils._extension_load_failures = self.saved_failures
1822
def test_failure_to_load(self):
1824
self.assertLength(1, osutils._extension_load_failures)
1825
self.assertEquals(osutils._extension_load_failures[0],
1826
"No module named _fictional_extension_py")
1828
def test_report_extension_load_failures_no_warning(self):
1829
self.assertTrue(self._try_loading())
1830
warnings, result = self.callCatchWarnings(osutils.report_extension_load_failures)
1831
# it used to give a Python warning; it no longer does
1832
self.assertLength(0, warnings)
1834
def test_report_extension_load_failures_message(self):
1836
trace.push_log_file(log)
1837
self.assertTrue(self._try_loading())
1838
osutils.report_extension_load_failures()
1839
self.assertContainsRe(
1841
r"bzr: warning: some compiled extensions could not be loaded; "
1842
"see <https://answers\.launchpad\.net/bzr/\+faq/703>\n"