1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the osutils wrapper."""
19
from cStringIO import StringIO
34
from bzrlib.tests import (
40
class _UTF8DirReaderFeature(tests.Feature):
44
from bzrlib import _readdir_pyx
45
self.reader = _readdir_pyx.UTF8DirReader
50
def feature_name(self):
51
return 'bzrlib._readdir_pyx'
53
UTF8DirReaderFeature = _UTF8DirReaderFeature()
56
def _already_unicode(s):
60
def _fs_enc_to_unicode(s):
61
return s.decode(osutils._fs_enc)
64
def _utf8_to_unicode(s):
65
return s.decode('UTF-8')
68
def dir_reader_scenarios():
69
# For each dir reader we define:
71
# - native_to_unicode: a function converting the native_abspath as returned
72
# by DirReader.read_dir to its unicode representation
74
# UnicodeDirReader is the fallback, it should be tested on all platforms.
75
scenarios = [('unicode',
76
dict(_dir_reader_class=osutils.UnicodeDirReader,
77
_native_to_unicode=_already_unicode))]
78
# Some DirReaders are platform specific and even there they may not be
80
if UTF8DirReaderFeature.available():
81
from bzrlib import _readdir_pyx
82
scenarios.append(('utf8',
83
dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
84
_native_to_unicode=_utf8_to_unicode)))
86
if test__walkdirs_win32.Win32ReadDirFeature.available():
88
from bzrlib import _walkdirs_win32
89
# TODO: check on windows, it may be that we need to use/add
90
# safe_unicode instead of _fs_enc_to_unicode
93
dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
94
_native_to_unicode=_fs_enc_to_unicode)))
100
def load_tests(basic_tests, module, loader):
101
suite = loader.suiteClass()
102
dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
103
basic_tests, tests.condition_isinstance(TestDirReader))
104
tests.multiply_tests(dir_reader_tests, dir_reader_scenarios(), suite)
105
suite.addTest(remaining_tests)
109
class TestContainsWhitespace(tests.TestCase):
111
def test_contains_whitespace(self):
112
self.failUnless(osutils.contains_whitespace(u' '))
113
self.failUnless(osutils.contains_whitespace(u'hello there'))
114
self.failUnless(osutils.contains_whitespace(u'hellothere\n'))
115
self.failUnless(osutils.contains_whitespace(u'hello\nthere'))
116
self.failUnless(osutils.contains_whitespace(u'hello\rthere'))
117
self.failUnless(osutils.contains_whitespace(u'hello\tthere'))
119
# \xa0 is "Non-breaking-space" which on some python locales thinks it
120
# is whitespace, but we do not.
121
self.failIf(osutils.contains_whitespace(u''))
122
self.failIf(osutils.contains_whitespace(u'hellothere'))
123
self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
126
class TestRename(tests.TestCaseInTempDir):
128
def test_fancy_rename(self):
129
# This should work everywhere
131
osutils.fancy_rename(a, b,
132
rename_func=os.rename,
133
unlink_func=os.unlink)
135
open('a', 'wb').write('something in a\n')
137
self.failIfExists('a')
138
self.failUnlessExists('b')
139
self.check_file_contents('b', 'something in a\n')
141
open('a', 'wb').write('new something in a\n')
144
self.check_file_contents('a', 'something in a\n')
146
def test_rename(self):
147
# Rename should be semi-atomic on all platforms
148
open('a', 'wb').write('something in a\n')
149
osutils.rename('a', 'b')
150
self.failIfExists('a')
151
self.failUnlessExists('b')
152
self.check_file_contents('b', 'something in a\n')
154
open('a', 'wb').write('new something in a\n')
155
osutils.rename('b', 'a')
157
self.check_file_contents('a', 'something in a\n')
159
# TODO: test fancy_rename using a MemoryTransport
161
def test_rename_change_case(self):
162
# on Windows we should be able to change filename case by rename
163
self.build_tree(['a', 'b/'])
164
osutils.rename('a', 'A')
165
osutils.rename('b', 'B')
166
# we can't use failUnlessExists on case-insensitive filesystem
167
# so try to check shape of the tree
168
shape = sorted(os.listdir('.'))
169
self.assertEquals(['A', 'B'], shape)
172
class TestRandChars(tests.TestCase):
174
def test_01_rand_chars_empty(self):
175
result = osutils.rand_chars(0)
176
self.assertEqual(result, '')
178
def test_02_rand_chars_100(self):
179
result = osutils.rand_chars(100)
180
self.assertEqual(len(result), 100)
181
self.assertEqual(type(result), str)
182
self.assertContainsRe(result, r'^[a-z0-9]{100}$')
185
class TestIsInside(tests.TestCase):
187
def test_is_inside(self):
188
is_inside = osutils.is_inside
189
self.assertTrue(is_inside('src', 'src/foo.c'))
190
self.assertFalse(is_inside('src', 'srccontrol'))
191
self.assertTrue(is_inside('src', 'src/a/a/a/foo.c'))
192
self.assertTrue(is_inside('foo.c', 'foo.c'))
193
self.assertFalse(is_inside('foo.c', ''))
194
self.assertTrue(is_inside('', 'foo.c'))
196
def test_is_inside_any(self):
197
SRC_FOO_C = osutils.pathjoin('src', 'foo.c')
198
for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
199
(['src'], SRC_FOO_C),
202
self.assert_(osutils.is_inside_any(dirs, fn))
203
for dirs, fn in [(['src'], 'srccontrol'),
204
(['src'], 'srccontrol/foo')]:
205
self.assertFalse(osutils.is_inside_any(dirs, fn))
207
def test_is_inside_or_parent_of_any(self):
208
for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
209
(['src'], 'src/foo.c'),
210
(['src/bar.c'], 'src'),
211
(['src/bar.c', 'bla/foo.c'], 'src'),
214
self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
216
for dirs, fn in [(['src'], 'srccontrol'),
217
(['srccontrol/foo.c'], 'src'),
218
(['src'], 'srccontrol/foo')]:
219
self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
222
class TestRmTree(tests.TestCaseInTempDir):
224
def test_rmtree(self):
225
# Check to remove tree with read-only files/dirs
227
f = file('dir/file', 'w')
230
# would like to also try making the directory readonly, but at the
231
# moment python shutil.rmtree doesn't handle that properly - it would
232
# need to chmod the directory before removing things inside it - deferred
233
# for now -- mbp 20060505
234
# osutils.make_readonly('dir')
235
osutils.make_readonly('dir/file')
237
osutils.rmtree('dir')
239
self.failIfExists('dir/file')
240
self.failIfExists('dir')
243
class TestDeleteAny(tests.TestCaseInTempDir):
245
def test_delete_any_readonly(self):
246
# from <https://bugs.launchpad.net/bzr/+bug/218206>
247
self.build_tree(['d/', 'f'])
248
osutils.make_readonly('d')
249
osutils.make_readonly('f')
251
osutils.delete_any('f')
252
osutils.delete_any('d')
255
class TestKind(tests.TestCaseInTempDir):
257
def test_file_kind(self):
258
self.build_tree(['file', 'dir/'])
259
self.assertEquals('file', osutils.file_kind('file'))
260
self.assertEquals('directory', osutils.file_kind('dir/'))
261
if osutils.has_symlinks():
262
os.symlink('symlink', 'symlink')
263
self.assertEquals('symlink', osutils.file_kind('symlink'))
265
# TODO: jam 20060529 Test a block device
267
os.lstat('/dev/null')
269
if e.errno not in (errno.ENOENT,):
272
self.assertEquals('chardev', osutils.file_kind('/dev/null'))
274
mkfifo = getattr(os, 'mkfifo', None)
278
self.assertEquals('fifo', osutils.file_kind('fifo'))
282
AF_UNIX = getattr(socket, 'AF_UNIX', None)
284
s = socket.socket(AF_UNIX)
287
self.assertEquals('socket', osutils.file_kind('socket'))
291
def test_kind_marker(self):
292
self.assertEqual("", osutils.kind_marker("file"))
293
self.assertEqual("/", osutils.kind_marker('directory'))
294
self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
295
self.assertEqual("@", osutils.kind_marker("symlink"))
296
self.assertEqual("+", osutils.kind_marker("tree-reference"))
297
self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
300
class TestUmask(tests.TestCaseInTempDir):
302
def test_get_umask(self):
303
if sys.platform == 'win32':
304
# umask always returns '0', no way to set it
305
self.assertEqual(0, osutils.get_umask())
308
orig_umask = osutils.get_umask()
309
self.addCleanup(os.umask, orig_umask)
311
self.assertEqual(0222, osutils.get_umask())
313
self.assertEqual(0022, osutils.get_umask())
315
self.assertEqual(0002, osutils.get_umask())
317
self.assertEqual(0027, osutils.get_umask())
320
class TestDateTime(tests.TestCase):
322
def assertFormatedDelta(self, expected, seconds):
323
"""Assert osutils.format_delta formats as expected"""
324
actual = osutils.format_delta(seconds)
325
self.assertEqual(expected, actual)
327
def test_format_delta(self):
328
self.assertFormatedDelta('0 seconds ago', 0)
329
self.assertFormatedDelta('1 second ago', 1)
330
self.assertFormatedDelta('10 seconds ago', 10)
331
self.assertFormatedDelta('59 seconds ago', 59)
332
self.assertFormatedDelta('89 seconds ago', 89)
333
self.assertFormatedDelta('1 minute, 30 seconds ago', 90)
334
self.assertFormatedDelta('3 minutes, 0 seconds ago', 180)
335
self.assertFormatedDelta('3 minutes, 1 second ago', 181)
336
self.assertFormatedDelta('10 minutes, 15 seconds ago', 615)
337
self.assertFormatedDelta('30 minutes, 59 seconds ago', 1859)
338
self.assertFormatedDelta('31 minutes, 0 seconds ago', 1860)
339
self.assertFormatedDelta('60 minutes, 0 seconds ago', 3600)
340
self.assertFormatedDelta('89 minutes, 59 seconds ago', 5399)
341
self.assertFormatedDelta('1 hour, 30 minutes ago', 5400)
342
self.assertFormatedDelta('2 hours, 30 minutes ago', 9017)
343
self.assertFormatedDelta('10 hours, 0 minutes ago', 36000)
344
self.assertFormatedDelta('24 hours, 0 minutes ago', 86400)
345
self.assertFormatedDelta('35 hours, 59 minutes ago', 129599)
346
self.assertFormatedDelta('36 hours, 0 minutes ago', 129600)
347
self.assertFormatedDelta('36 hours, 0 minutes ago', 129601)
348
self.assertFormatedDelta('36 hours, 1 minute ago', 129660)
349
self.assertFormatedDelta('36 hours, 1 minute ago', 129661)
350
self.assertFormatedDelta('84 hours, 10 minutes ago', 303002)
352
# We handle when time steps the wrong direction because computers
353
# don't have synchronized clocks.
354
self.assertFormatedDelta('84 hours, 10 minutes in the future', -303002)
355
self.assertFormatedDelta('1 second in the future', -1)
356
self.assertFormatedDelta('2 seconds in the future', -2)
358
def test_format_date(self):
359
self.assertRaises(errors.UnsupportedTimezoneFormat,
360
osutils.format_date, 0, timezone='foo')
361
self.assertIsInstance(osutils.format_date(0), str)
362
self.assertIsInstance(osutils.format_local_date(0), unicode)
363
# Testing for the actual value of the local weekday without
364
# duplicating the code from format_date is difficult.
365
# Instead blackbox.test_locale should check for localized
366
# dates once they do occur in output strings.
368
def test_local_time_offset(self):
369
"""Test that local_time_offset() returns a sane value."""
370
offset = osutils.local_time_offset()
371
self.assertTrue(isinstance(offset, int))
372
# Test that the offset is no more than a eighteen hours in
374
# Time zone handling is system specific, so it is difficult to
375
# do more specific tests, but a value outside of this range is
377
eighteen_hours = 18 * 3600
378
self.assertTrue(-eighteen_hours < offset < eighteen_hours)
380
def test_local_time_offset_with_timestamp(self):
381
"""Test that local_time_offset() works with a timestamp."""
382
offset = osutils.local_time_offset(1000000000.1234567)
383
self.assertTrue(isinstance(offset, int))
384
eighteen_hours = 18 * 3600
385
self.assertTrue(-eighteen_hours < offset < eighteen_hours)
388
class TestLinks(tests.TestCaseInTempDir):
390
def test_dereference_path(self):
391
self.requireFeature(tests.SymlinkFeature)
392
cwd = osutils.realpath('.')
394
bar_path = osutils.pathjoin(cwd, 'bar')
395
# Using './' to avoid bug #1213894 (first path component not
396
# dereferenced) in Python 2.4.1 and earlier
397
self.assertEqual(bar_path, osutils.realpath('./bar'))
398
os.symlink('bar', 'foo')
399
self.assertEqual(bar_path, osutils.realpath('./foo'))
401
# Does not dereference terminal symlinks
402
foo_path = osutils.pathjoin(cwd, 'foo')
403
self.assertEqual(foo_path, osutils.dereference_path('./foo'))
405
# Dereferences parent symlinks
407
baz_path = osutils.pathjoin(bar_path, 'baz')
408
self.assertEqual(baz_path, osutils.dereference_path('./foo/baz'))
410
# Dereferences parent symlinks that are the first path element
411
self.assertEqual(baz_path, osutils.dereference_path('foo/baz'))
413
# Dereferences parent symlinks in absolute paths
414
foo_baz_path = osutils.pathjoin(foo_path, 'baz')
415
self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
417
def test_changing_access(self):
418
f = file('file', 'w')
422
# Make a file readonly
423
osutils.make_readonly('file')
424
mode = os.lstat('file').st_mode
425
self.assertEqual(mode, mode & 0777555)
427
# Make a file writable
428
osutils.make_writable('file')
429
mode = os.lstat('file').st_mode
430
self.assertEqual(mode, mode | 0200)
432
if osutils.has_symlinks():
433
# should not error when handed a symlink
434
os.symlink('nonexistent', 'dangling')
435
osutils.make_readonly('dangling')
436
osutils.make_writable('dangling')
438
def test_host_os_dereferences_symlinks(self):
439
osutils.host_os_dereferences_symlinks()
442
class TestCanonicalRelPath(tests.TestCaseInTempDir):
444
_test_needs_features = [tests.CaseInsCasePresFilenameFeature]
446
def test_canonical_relpath_simple(self):
447
f = file('MixedCaseName', 'w')
449
# Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
450
real_base_dir = osutils.realpath(self.test_base_dir)
451
actual = osutils.canonical_relpath(real_base_dir, 'mixedcasename')
452
self.failUnlessEqual('work/MixedCaseName', actual)
454
def test_canonical_relpath_missing_tail(self):
455
os.mkdir('MixedCaseParent')
456
# Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
457
real_base_dir = osutils.realpath(self.test_base_dir)
458
actual = osutils.canonical_relpath(real_base_dir,
459
'mixedcaseparent/nochild')
460
self.failUnlessEqual('work/MixedCaseParent/nochild', actual)
463
class TestPumpFile(tests.TestCase):
464
"""Test pumpfile method."""
467
tests.TestCase.setUp(self)
468
# create a test datablock
469
self.block_size = 512
470
pattern = '0123456789ABCDEF'
471
self.test_data = pattern * (3 * self.block_size / len(pattern))
472
self.test_data_len = len(self.test_data)
474
def test_bracket_block_size(self):
475
"""Read data in blocks with the requested read size bracketing the
477
# make sure test data is larger than max read size
478
self.assertTrue(self.test_data_len > self.block_size)
480
from_file = file_utils.FakeReadFile(self.test_data)
483
# read (max / 2) bytes and verify read size wasn't affected
484
num_bytes_to_read = self.block_size / 2
485
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
486
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
487
self.assertEqual(from_file.get_read_count(), 1)
489
# read (max) bytes and verify read size wasn't affected
490
num_bytes_to_read = self.block_size
491
from_file.reset_read_count()
492
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
493
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
494
self.assertEqual(from_file.get_read_count(), 1)
496
# read (max + 1) bytes and verify read size was limited
497
num_bytes_to_read = self.block_size + 1
498
from_file.reset_read_count()
499
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
500
self.assertEqual(from_file.get_max_read_size(), self.block_size)
501
self.assertEqual(from_file.get_read_count(), 2)
503
# finish reading the rest of the data
504
num_bytes_to_read = self.test_data_len - to_file.tell()
505
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
507
# report error if the data wasn't equal (we only report the size due
508
# to the length of the data)
509
response_data = to_file.getvalue()
510
if response_data != self.test_data:
511
message = "Data not equal. Expected %d bytes, received %d."
512
self.fail(message % (len(response_data), self.test_data_len))
514
def test_specified_size(self):
515
"""Request a transfer larger than the maximum block size and verify
516
that the maximum read doesn't exceed the block_size."""
517
# make sure test data is larger than max read size
518
self.assertTrue(self.test_data_len > self.block_size)
520
# retrieve data in blocks
521
from_file = file_utils.FakeReadFile(self.test_data)
523
osutils.pumpfile(from_file, to_file, self.test_data_len,
526
# verify read size was equal to the maximum read size
527
self.assertTrue(from_file.get_max_read_size() > 0)
528
self.assertEqual(from_file.get_max_read_size(), self.block_size)
529
self.assertEqual(from_file.get_read_count(), 3)
531
# report error if the data wasn't equal (we only report the size due
532
# to the length of the data)
533
response_data = to_file.getvalue()
534
if response_data != self.test_data:
535
message = "Data not equal. Expected %d bytes, received %d."
536
self.fail(message % (len(response_data), self.test_data_len))
538
def test_to_eof(self):
539
"""Read to end-of-file and verify that the reads are not larger than
540
the maximum read size."""
541
# make sure test data is larger than max read size
542
self.assertTrue(self.test_data_len > self.block_size)
544
# retrieve data to EOF
545
from_file = file_utils.FakeReadFile(self.test_data)
547
osutils.pumpfile(from_file, to_file, -1, self.block_size)
549
# verify read size was equal to the maximum read size
550
self.assertEqual(from_file.get_max_read_size(), self.block_size)
551
self.assertEqual(from_file.get_read_count(), 4)
553
# report error if the data wasn't equal (we only report the size due
554
# to the length of the data)
555
response_data = to_file.getvalue()
556
if response_data != self.test_data:
557
message = "Data not equal. Expected %d bytes, received %d."
558
self.fail(message % (len(response_data), self.test_data_len))
560
def test_defaults(self):
561
"""Verifies that the default arguments will read to EOF -- this
562
test verifies that any existing usages of pumpfile will not be broken
563
with this new version."""
564
# retrieve data using default (old) pumpfile method
565
from_file = file_utils.FakeReadFile(self.test_data)
567
osutils.pumpfile(from_file, to_file)
569
# report error if the data wasn't equal (we only report the size due
570
# to the length of the data)
571
response_data = to_file.getvalue()
572
if response_data != self.test_data:
573
message = "Data not equal. Expected %d bytes, received %d."
574
self.fail(message % (len(response_data), self.test_data_len))
576
def test_report_activity(self):
578
def log_activity(length, direction):
579
activity.append((length, direction))
580
from_file = StringIO(self.test_data)
582
osutils.pumpfile(from_file, to_file, buff_size=500,
583
report_activity=log_activity, direction='read')
584
self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
585
(36, 'read')], activity)
587
from_file = StringIO(self.test_data)
590
osutils.pumpfile(from_file, to_file, buff_size=500,
591
report_activity=log_activity, direction='write')
592
self.assertEqual([(500, 'write'), (500, 'write'), (500, 'write'),
593
(36, 'write')], activity)
595
# And with a limited amount of data
596
from_file = StringIO(self.test_data)
599
osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
600
report_activity=log_activity, direction='read')
601
self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
605
class TestPumpStringFile(tests.TestCase):
607
def test_empty(self):
609
osutils.pump_string_file("", output)
610
self.assertEqual("", output.getvalue())
612
def test_more_than_segment_size(self):
614
osutils.pump_string_file("123456789", output, 2)
615
self.assertEqual("123456789", output.getvalue())
617
def test_segment_size(self):
619
osutils.pump_string_file("12", output, 2)
620
self.assertEqual("12", output.getvalue())
622
def test_segment_size_multiple(self):
624
osutils.pump_string_file("1234", output, 2)
625
self.assertEqual("1234", output.getvalue())
628
class TestRelpath(tests.TestCase):
630
def test_simple_relpath(self):
631
cwd = osutils.getcwd()
632
subdir = cwd + '/subdir'
633
self.assertEqual('subdir', osutils.relpath(cwd, subdir))
635
def test_not_relative(self):
636
self.assertRaises(errors.PathNotChild,
637
osutils.relpath, 'C:/path', 'H:/path')
638
self.assertRaises(errors.PathNotChild,
639
osutils.relpath, 'C:/', 'H:/path')
642
class TestSafeUnicode(tests.TestCase):
644
def test_from_ascii_string(self):
645
self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
647
def test_from_unicode_string_ascii_contents(self):
648
self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
650
def test_from_unicode_string_unicode_contents(self):
651
self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
653
def test_from_utf8_string(self):
654
self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
656
def test_bad_utf8_string(self):
657
self.assertRaises(errors.BzrBadParameterNotUnicode,
658
osutils.safe_unicode,
662
class TestSafeUtf8(tests.TestCase):
664
def test_from_ascii_string(self):
666
self.assertEqual('foobar', osutils.safe_utf8(f))
668
def test_from_unicode_string_ascii_contents(self):
669
self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
671
def test_from_unicode_string_unicode_contents(self):
672
self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
674
def test_from_utf8_string(self):
675
self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
677
def test_bad_utf8_string(self):
678
self.assertRaises(errors.BzrBadParameterNotUnicode,
679
osutils.safe_utf8, '\xbb\xbb')
682
class TestSafeRevisionId(tests.TestCase):
684
def test_from_ascii_string(self):
685
# this shouldn't give a warning because it's getting an ascii string
686
self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
688
def test_from_unicode_string_ascii_contents(self):
689
self.assertEqual('bargam',
690
osutils.safe_revision_id(u'bargam', warn=False))
692
def test_from_unicode_deprecated(self):
693
self.assertEqual('bargam',
694
self.callDeprecated([osutils._revision_id_warning],
695
osutils.safe_revision_id, u'bargam'))
697
def test_from_unicode_string_unicode_contents(self):
698
self.assertEqual('bargam\xc2\xae',
699
osutils.safe_revision_id(u'bargam\xae', warn=False))
701
def test_from_utf8_string(self):
702
self.assertEqual('foo\xc2\xae',
703
osutils.safe_revision_id('foo\xc2\xae'))
706
"""Currently, None is a valid revision_id"""
707
self.assertEqual(None, osutils.safe_revision_id(None))
710
class TestSafeFileId(tests.TestCase):
712
def test_from_ascii_string(self):
713
self.assertEqual('foobar', osutils.safe_file_id('foobar'))
715
def test_from_unicode_string_ascii_contents(self):
716
self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))
718
def test_from_unicode_deprecated(self):
719
self.assertEqual('bargam',
720
self.callDeprecated([osutils._file_id_warning],
721
osutils.safe_file_id, u'bargam'))
723
def test_from_unicode_string_unicode_contents(self):
724
self.assertEqual('bargam\xc2\xae',
725
osutils.safe_file_id(u'bargam\xae', warn=False))
727
def test_from_utf8_string(self):
728
self.assertEqual('foo\xc2\xae',
729
osutils.safe_file_id('foo\xc2\xae'))
732
"""Currently, None is a valid revision_id"""
733
self.assertEqual(None, osutils.safe_file_id(None))
736
class TestWin32Funcs(tests.TestCase):
737
"""Test that _win32 versions of os utilities return appropriate paths."""
739
def test_abspath(self):
740
self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
741
self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
742
self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
743
self.assertEqual('//HOST/path', osutils._win32_abspath('//HOST/path'))
745
def test_realpath(self):
746
self.assertEqual('C:/foo', osutils._win32_realpath('C:\\foo'))
747
self.assertEqual('C:/foo', osutils._win32_realpath('C:/foo'))
749
def test_pathjoin(self):
750
self.assertEqual('path/to/foo',
751
osutils._win32_pathjoin('path', 'to', 'foo'))
752
self.assertEqual('C:/foo',
753
osutils._win32_pathjoin('path\\to', 'C:\\foo'))
754
self.assertEqual('C:/foo',
755
osutils._win32_pathjoin('path/to', 'C:/foo'))
756
self.assertEqual('path/to/foo',
757
osutils._win32_pathjoin('path/to/', 'foo'))
758
self.assertEqual('/foo',
759
osutils._win32_pathjoin('C:/path/to/', '/foo'))
760
self.assertEqual('/foo',
761
osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
763
def test_normpath(self):
764
self.assertEqual('path/to/foo',
765
osutils._win32_normpath(r'path\\from\..\to\.\foo'))
766
self.assertEqual('path/to/foo',
767
osutils._win32_normpath('path//from/../to/./foo'))
769
def test_getcwd(self):
770
cwd = osutils._win32_getcwd()
771
os_cwd = os.getcwdu()
772
self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
773
# win32 is inconsistent whether it returns lower or upper case
774
# and even if it was consistent the user might type the other
775
# so we force it to uppercase
776
# running python.exe under cmd.exe return capital C:\\
777
# running win32 python inside a cygwin shell returns lowercase
778
self.assertEqual(os_cwd[0].upper(), cwd[0])
780
def test_fixdrive(self):
781
self.assertEqual('H:/foo', osutils._win32_fixdrive('h:/foo'))
782
self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
783
self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
785
def test_win98_abspath(self):
787
self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
788
self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
790
self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
791
self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
793
cwd = osutils.getcwd().rstrip('/')
794
drive = osutils._nt_splitdrive(cwd)[0]
795
self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
796
self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
799
self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
802
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
803
"""Test win32 functions that create files."""
805
def test_getcwd(self):
806
self.requireFeature(tests.UnicodeFilenameFeature)
809
# TODO: jam 20060427 This will probably fail on Mac OSX because
810
# it will change the normalization of B\xe5gfors
811
# Consider using a different unicode character, or make
812
# osutils.getcwd() renormalize the path.
813
self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
815
def test_minimum_path_selection(self):
816
self.assertEqual(set(),
817
osutils.minimum_path_selection([]))
818
self.assertEqual(set(['a']),
819
osutils.minimum_path_selection(['a']))
820
self.assertEqual(set(['a', 'b']),
821
osutils.minimum_path_selection(['a', 'b']))
822
self.assertEqual(set(['a/', 'b']),
823
osutils.minimum_path_selection(['a/', 'b']))
824
self.assertEqual(set(['a/', 'b']),
825
osutils.minimum_path_selection(['a/c', 'a/', 'b']))
826
self.assertEqual(set(['a-b', 'a', 'a0b']),
827
osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
829
def test_mkdtemp(self):
830
tmpdir = osutils._win32_mkdtemp(dir='.')
831
self.assertFalse('\\' in tmpdir)
833
def test_rename(self):
841
osutils._win32_rename('b', 'a')
842
self.failUnlessExists('a')
843
self.failIfExists('b')
844
self.assertFileEqual('baz\n', 'a')
846
def test_rename_missing_file(self):
852
osutils._win32_rename('b', 'a')
853
except (IOError, OSError), e:
854
self.assertEqual(errno.ENOENT, e.errno)
855
self.assertFileEqual('foo\n', 'a')
857
def test_rename_missing_dir(self):
860
osutils._win32_rename('b', 'a')
861
except (IOError, OSError), e:
862
self.assertEqual(errno.ENOENT, e.errno)
864
def test_rename_current_dir(self):
867
# You can't rename the working directory
868
# doing rename non-existant . usually
869
# just raises ENOENT, since non-existant
872
osutils._win32_rename('b', '.')
873
except (IOError, OSError), e:
874
self.assertEqual(errno.ENOENT, e.errno)
876
def test_splitpath(self):
877
def check(expected, path):
878
self.assertEqual(expected, osutils.splitpath(path))
881
check(['a', 'b'], 'a/b')
882
check(['a', 'b'], 'a/./b')
883
check(['a', '.b'], 'a/.b')
884
check(['a', '.b'], 'a\\.b')
886
self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
889
class TestParentDirectories(tests.TestCaseInTempDir):
890
"""Test osutils.parent_directories()"""
892
def test_parent_directories(self):
893
self.assertEqual([], osutils.parent_directories('a'))
894
self.assertEqual(['a'], osutils.parent_directories('a/b'))
895
self.assertEqual(['a/b', 'a'], osutils.parent_directories('a/b/c'))
898
class TestMacFuncsDirs(tests.TestCaseInTempDir):
899
"""Test mac special functions that require directories."""
901
def test_getcwd(self):
902
self.requireFeature(tests.UnicodeFilenameFeature)
903
os.mkdir(u'B\xe5gfors')
904
os.chdir(u'B\xe5gfors')
905
self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
907
def test_getcwd_nonnorm(self):
908
self.requireFeature(tests.UnicodeFilenameFeature)
909
# Test that _mac_getcwd() will normalize this path
910
os.mkdir(u'Ba\u030agfors')
911
os.chdir(u'Ba\u030agfors')
912
self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
915
class TestChunksToLines(tests.TestCase):
917
def test_smoketest(self):
918
self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
919
osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
920
self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
921
osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
923
def test_osutils_binding(self):
924
from bzrlib.tests import test__chunks_to_lines
925
if test__chunks_to_lines.CompiledChunksToLinesFeature.available():
926
from bzrlib._chunks_to_lines_pyx import chunks_to_lines
928
from bzrlib._chunks_to_lines_py import chunks_to_lines
929
self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
932
class TestSplitLines(tests.TestCase):
934
def test_split_unicode(self):
935
self.assertEqual([u'foo\n', u'bar\xae'],
936
osutils.split_lines(u'foo\nbar\xae'))
937
self.assertEqual([u'foo\n', u'bar\xae\n'],
938
osutils.split_lines(u'foo\nbar\xae\n'))
940
def test_split_with_carriage_returns(self):
941
self.assertEqual(['foo\rbar\n'],
942
osutils.split_lines('foo\rbar\n'))
945
class TestWalkDirs(tests.TestCaseInTempDir):
947
def assertExpectedBlocks(self, expected, result):
948
self.assertEqual(expected,
949
[(dirinfo, [line[0:3] for line in block])
950
for dirinfo, block in result])
952
def test_walkdirs(self):
961
self.build_tree(tree)
962
expected_dirblocks = [
964
[('0file', '0file', 'file'),
965
('1dir', '1dir', 'directory'),
966
('2file', '2file', 'file'),
970
[('1dir/0file', '0file', 'file'),
971
('1dir/1dir', '1dir', 'directory'),
974
(('1dir/1dir', './1dir/1dir'),
981
for dirdetail, dirblock in osutils.walkdirs('.'):
982
if len(dirblock) and dirblock[0][1] == '.bzr':
983
# this tests the filtering of selected paths
986
result.append((dirdetail, dirblock))
988
self.assertTrue(found_bzrdir)
989
self.assertExpectedBlocks(expected_dirblocks, result)
990
# you can search a subdir only, with a supplied prefix.
992
for dirblock in osutils.walkdirs('./1dir', '1dir'):
993
result.append(dirblock)
994
self.assertExpectedBlocks(expected_dirblocks[1:], result)
996
def test_walkdirs_os_error(self):
997
# <https://bugs.edge.launchpad.net/bzr/+bug/338653>
998
# Pyrex readdir didn't raise useful messages if it had an error
999
# reading the directory
1000
if sys.platform == 'win32':
1001
raise tests.TestNotApplicable(
1002
"readdir IOError not tested on win32")
1003
os.mkdir("test-unreadable")
1004
os.chmod("test-unreadable", 0000)
1005
# must chmod it back so that it can be removed
1006
self.addCleanup(os.chmod, "test-unreadable", 0700)
1007
# The error is not raised until the generator is actually evaluated.
1008
# (It would be ok if it happened earlier but at the moment it
1010
e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1011
self.assertEquals('./test-unreadable', e.filename)
1012
self.assertEquals(errno.EACCES, e.errno)
1013
# Ensure the message contains the file name
1014
self.assertContainsRe(str(e), "\./test-unreadable")
1016
def test__walkdirs_utf8(self):
1025
self.build_tree(tree)
1026
expected_dirblocks = [
1028
[('0file', '0file', 'file'),
1029
('1dir', '1dir', 'directory'),
1030
('2file', '2file', 'file'),
1033
(('1dir', './1dir'),
1034
[('1dir/0file', '0file', 'file'),
1035
('1dir/1dir', '1dir', 'directory'),
1038
(('1dir/1dir', './1dir/1dir'),
1044
found_bzrdir = False
1045
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1046
if len(dirblock) and dirblock[0][1] == '.bzr':
1047
# this tests the filtering of selected paths
1050
result.append((dirdetail, dirblock))
1052
self.assertTrue(found_bzrdir)
1053
self.assertExpectedBlocks(expected_dirblocks, result)
1055
# you can search a subdir only, with a supplied prefix.
1057
for dirblock in osutils.walkdirs('./1dir', '1dir'):
1058
result.append(dirblock)
1059
self.assertExpectedBlocks(expected_dirblocks[1:], result)
1061
def _filter_out_stat(self, result):
1062
"""Filter out the stat value from the walkdirs result"""
1063
for dirdetail, dirblock in result:
1065
for info in dirblock:
1066
# Ignore info[3] which is the stat
1067
new_dirblock.append((info[0], info[1], info[2], info[4]))
1068
dirblock[:] = new_dirblock
1070
def _save_platform_info(self):
1071
cur_winver = win32utils.winver
1072
cur_fs_enc = osutils._fs_enc
1073
cur_dir_reader = osutils._selected_dir_reader
1075
win32utils.winver = cur_winver
1076
osutils._fs_enc = cur_fs_enc
1077
osutils._selected_dir_reader = cur_dir_reader
1078
self.addCleanup(restore)
1080
def assertDirReaderIs(self, expected):
1081
"""Assert the right implementation for _walkdirs_utf8 is chosen."""
1082
# Force it to redetect
1083
osutils._selected_dir_reader = None
1084
# Nothing to list, but should still trigger the selection logic
1085
self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
1086
self.assertIsInstance(osutils._selected_dir_reader, expected)
1088
def test_force_walkdirs_utf8_fs_utf8(self):
1089
self.requireFeature(UTF8DirReaderFeature)
1090
self._save_platform_info()
1091
win32utils.winver = None # Avoid the win32 detection code
1092
osutils._fs_enc = 'UTF-8'
1093
self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1095
def test_force_walkdirs_utf8_fs_ascii(self):
1096
self.requireFeature(UTF8DirReaderFeature)
1097
self._save_platform_info()
1098
win32utils.winver = None # Avoid the win32 detection code
1099
osutils._fs_enc = 'US-ASCII'
1100
self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1102
def test_force_walkdirs_utf8_fs_ANSI(self):
1103
self.requireFeature(UTF8DirReaderFeature)
1104
self._save_platform_info()
1105
win32utils.winver = None # Avoid the win32 detection code
1106
osutils._fs_enc = 'ANSI_X3.4-1968'
1107
self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1109
def test_force_walkdirs_utf8_fs_latin1(self):
1110
self._save_platform_info()
1111
win32utils.winver = None # Avoid the win32 detection code
1112
osutils._fs_enc = 'latin1'
1113
self.assertDirReaderIs(osutils.UnicodeDirReader)
1115
def test_force_walkdirs_utf8_nt(self):
1116
# Disabled because the thunk of the whole walkdirs api is disabled.
1117
self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1118
self._save_platform_info()
1119
win32utils.winver = 'Windows NT'
1120
from bzrlib._walkdirs_win32 import Win32ReadDir
1121
self.assertDirReaderIs(Win32ReadDir)
1123
def test_force_walkdirs_utf8_98(self):
1124
self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1125
self._save_platform_info()
1126
win32utils.winver = 'Windows 98'
1127
self.assertDirReaderIs(osutils.UnicodeDirReader)
1129
def test_unicode_walkdirs(self):
1130
"""Walkdirs should always return unicode paths."""
1131
self.requireFeature(tests.UnicodeFilenameFeature)
1132
name0 = u'0file-\xb6'
1133
name1 = u'1dir-\u062c\u0648'
1134
name2 = u'2file-\u0633'
1138
name1 + '/' + name0,
1139
name1 + '/' + name1 + '/',
1142
self.build_tree(tree)
1143
expected_dirblocks = [
1145
[(name0, name0, 'file', './' + name0),
1146
(name1, name1, 'directory', './' + name1),
1147
(name2, name2, 'file', './' + name2),
1150
((name1, './' + name1),
1151
[(name1 + '/' + name0, name0, 'file', './' + name1
1153
(name1 + '/' + name1, name1, 'directory', './' + name1
1157
((name1 + '/' + name1, './' + name1 + '/' + name1),
1162
result = list(osutils.walkdirs('.'))
1163
self._filter_out_stat(result)
1164
self.assertEqual(expected_dirblocks, result)
1165
result = list(osutils.walkdirs(u'./'+name1, name1))
1166
self._filter_out_stat(result)
1167
self.assertEqual(expected_dirblocks[1:], result)
1169
def test_unicode__walkdirs_utf8(self):
1170
"""Walkdirs_utf8 should always return utf8 paths.
1172
The abspath portion might be in unicode or utf-8
1174
self.requireFeature(tests.UnicodeFilenameFeature)
1175
name0 = u'0file-\xb6'
1176
name1 = u'1dir-\u062c\u0648'
1177
name2 = u'2file-\u0633'
1181
name1 + '/' + name0,
1182
name1 + '/' + name1 + '/',
1185
self.build_tree(tree)
1186
name0 = name0.encode('utf8')
1187
name1 = name1.encode('utf8')
1188
name2 = name2.encode('utf8')
1190
expected_dirblocks = [
1192
[(name0, name0, 'file', './' + name0),
1193
(name1, name1, 'directory', './' + name1),
1194
(name2, name2, 'file', './' + name2),
1197
((name1, './' + name1),
1198
[(name1 + '/' + name0, name0, 'file', './' + name1
1200
(name1 + '/' + name1, name1, 'directory', './' + name1
1204
((name1 + '/' + name1, './' + name1 + '/' + name1),
1210
# For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1211
# all abspaths are Unicode, and encode them back into utf8.
1212
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1213
self.assertIsInstance(dirdetail[0], str)
1214
if isinstance(dirdetail[1], unicode):
1215
dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1216
dirblock = [list(info) for info in dirblock]
1217
for info in dirblock:
1218
self.assertIsInstance(info[4], unicode)
1219
info[4] = info[4].encode('utf8')
1221
for info in dirblock:
1222
self.assertIsInstance(info[0], str)
1223
self.assertIsInstance(info[1], str)
1224
self.assertIsInstance(info[4], str)
1225
# Remove the stat information
1226
new_dirblock.append((info[0], info[1], info[2], info[4]))
1227
result.append((dirdetail, new_dirblock))
1228
self.assertEqual(expected_dirblocks, result)
1230
def test__walkdirs_utf8_with_unicode_fs(self):
1231
"""UnicodeDirReader should be a safe fallback everywhere
1233
The abspath portion should be in unicode
1235
self.requireFeature(tests.UnicodeFilenameFeature)
1236
# Use the unicode reader. TODO: split into driver-and-driven unit
1238
self._save_platform_info()
1239
osutils._selected_dir_reader = osutils.UnicodeDirReader()
1240
name0u = u'0file-\xb6'
1241
name1u = u'1dir-\u062c\u0648'
1242
name2u = u'2file-\u0633'
1246
name1u + '/' + name0u,
1247
name1u + '/' + name1u + '/',
1250
self.build_tree(tree)
1251
name0 = name0u.encode('utf8')
1252
name1 = name1u.encode('utf8')
1253
name2 = name2u.encode('utf8')
1255
# All of the abspaths should be in unicode, all of the relative paths
1257
expected_dirblocks = [
1259
[(name0, name0, 'file', './' + name0u),
1260
(name1, name1, 'directory', './' + name1u),
1261
(name2, name2, 'file', './' + name2u),
1264
((name1, './' + name1u),
1265
[(name1 + '/' + name0, name0, 'file', './' + name1u
1267
(name1 + '/' + name1, name1, 'directory', './' + name1u
1271
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1276
result = list(osutils._walkdirs_utf8('.'))
1277
self._filter_out_stat(result)
1278
self.assertEqual(expected_dirblocks, result)
1280
def test__walkdirs_utf8_win32readdir(self):
1281
self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1282
self.requireFeature(tests.UnicodeFilenameFeature)
1283
from bzrlib._walkdirs_win32 import Win32ReadDir
1284
self._save_platform_info()
1285
osutils._selected_dir_reader = Win32ReadDir()
1286
name0u = u'0file-\xb6'
1287
name1u = u'1dir-\u062c\u0648'
1288
name2u = u'2file-\u0633'
1292
name1u + '/' + name0u,
1293
name1u + '/' + name1u + '/',
1296
self.build_tree(tree)
1297
name0 = name0u.encode('utf8')
1298
name1 = name1u.encode('utf8')
1299
name2 = name2u.encode('utf8')
1301
# All of the abspaths should be in unicode, all of the relative paths
1303
expected_dirblocks = [
1305
[(name0, name0, 'file', './' + name0u),
1306
(name1, name1, 'directory', './' + name1u),
1307
(name2, name2, 'file', './' + name2u),
1310
((name1, './' + name1u),
1311
[(name1 + '/' + name0, name0, 'file', './' + name1u
1313
(name1 + '/' + name1, name1, 'directory', './' + name1u
1317
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1322
result = list(osutils._walkdirs_utf8(u'.'))
1323
self._filter_out_stat(result)
1324
self.assertEqual(expected_dirblocks, result)
1326
def assertStatIsCorrect(self, path, win32stat):
1327
os_stat = os.stat(path)
1328
self.assertEqual(os_stat.st_size, win32stat.st_size)
1329
self.assertAlmostEqual(os_stat.st_mtime, win32stat.st_mtime, places=4)
1330
self.assertAlmostEqual(os_stat.st_ctime, win32stat.st_ctime, places=4)
1331
self.assertAlmostEqual(os_stat.st_atime, win32stat.st_atime, places=4)
1332
self.assertEqual(os_stat.st_dev, win32stat.st_dev)
1333
self.assertEqual(os_stat.st_ino, win32stat.st_ino)
1334
self.assertEqual(os_stat.st_mode, win32stat.st_mode)
1336
def test__walkdirs_utf_win32_find_file_stat_file(self):
1337
"""make sure our Stat values are valid"""
1338
self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1339
self.requireFeature(tests.UnicodeFilenameFeature)
1340
from bzrlib._walkdirs_win32 import Win32ReadDir
1341
name0u = u'0file-\xb6'
1342
name0 = name0u.encode('utf8')
1343
self.build_tree([name0u])
1344
# I hate to sleep() here, but I'm trying to make the ctime different
1347
f = open(name0u, 'ab')
1349
f.write('just a small update')
1353
result = Win32ReadDir().read_dir('', u'.')
1355
self.assertEqual((name0, name0, 'file'), entry[:3])
1356
self.assertEqual(u'./' + name0u, entry[4])
1357
self.assertStatIsCorrect(entry[4], entry[3])
1358
self.assertNotEqual(entry[3].st_mtime, entry[3].st_ctime)
1360
def test__walkdirs_utf_win32_find_file_stat_directory(self):
1361
"""make sure our Stat values are valid"""
1362
self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1363
self.requireFeature(tests.UnicodeFilenameFeature)
1364
from bzrlib._walkdirs_win32 import Win32ReadDir
1365
name0u = u'0dir-\u062c\u0648'
1366
name0 = name0u.encode('utf8')
1367
self.build_tree([name0u + '/'])
1369
result = Win32ReadDir().read_dir('', u'.')
1371
self.assertEqual((name0, name0, 'directory'), entry[:3])
1372
self.assertEqual(u'./' + name0u, entry[4])
1373
self.assertStatIsCorrect(entry[4], entry[3])
1375
def assertPathCompare(self, path_less, path_greater):
1376
"""check that path_less and path_greater compare correctly."""
1377
self.assertEqual(0, osutils.compare_paths_prefix_order(
1378
path_less, path_less))
1379
self.assertEqual(0, osutils.compare_paths_prefix_order(
1380
path_greater, path_greater))
1381
self.assertEqual(-1, osutils.compare_paths_prefix_order(
1382
path_less, path_greater))
1383
self.assertEqual(1, osutils.compare_paths_prefix_order(
1384
path_greater, path_less))
1386
def test_compare_paths_prefix_order(self):
1387
# root before all else
1388
self.assertPathCompare("/", "/a")
1389
# alpha within a dir
1390
self.assertPathCompare("/a", "/b")
1391
self.assertPathCompare("/b", "/z")
1392
# high dirs before lower.
1393
self.assertPathCompare("/z", "/a/a")
1394
# except if the deeper dir should be output first
1395
self.assertPathCompare("/a/b/c", "/d/g")
1396
# lexical betwen dirs of the same height
1397
self.assertPathCompare("/a/z", "/z/z")
1398
self.assertPathCompare("/a/c/z", "/a/d/e")
1400
# this should also be consistent for no leading / paths
1401
# root before all else
1402
self.assertPathCompare("", "a")
1403
# alpha within a dir
1404
self.assertPathCompare("a", "b")
1405
self.assertPathCompare("b", "z")
1406
# high dirs before lower.
1407
self.assertPathCompare("z", "a/a")
1408
# except if the deeper dir should be output first
1409
self.assertPathCompare("a/b/c", "d/g")
1410
# lexical betwen dirs of the same height
1411
self.assertPathCompare("a/z", "z/z")
1412
self.assertPathCompare("a/c/z", "a/d/e")
1414
def test_path_prefix_sorting(self):
1415
"""Doing a sort on path prefix should match our sample data."""
1430
dir_sorted_paths = [
1446
sorted(original_paths, key=osutils.path_prefix_key))
1447
# using the comparison routine shoudl work too:
1450
sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
1453
class TestCopyTree(tests.TestCaseInTempDir):
1455
def test_copy_basic_tree(self):
1456
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1457
osutils.copy_tree('source', 'target')
1458
self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
1459
self.assertEqual(['c'], os.listdir('target/b'))
1461
def test_copy_tree_target_exists(self):
1462
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
1464
osutils.copy_tree('source', 'target')
1465
self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
1466
self.assertEqual(['c'], os.listdir('target/b'))
1468
def test_copy_tree_symlinks(self):
1469
self.requireFeature(tests.SymlinkFeature)
1470
self.build_tree(['source/'])
1471
os.symlink('a/generic/path', 'source/lnk')
1472
osutils.copy_tree('source', 'target')
1473
self.assertEqual(['lnk'], os.listdir('target'))
1474
self.assertEqual('a/generic/path', os.readlink('target/lnk'))
1476
def test_copy_tree_handlers(self):
1477
processed_files = []
1478
processed_links = []
1479
def file_handler(from_path, to_path):
1480
processed_files.append(('f', from_path, to_path))
1481
def dir_handler(from_path, to_path):
1482
processed_files.append(('d', from_path, to_path))
1483
def link_handler(from_path, to_path):
1484
processed_links.append((from_path, to_path))
1485
handlers = {'file':file_handler,
1486
'directory':dir_handler,
1487
'symlink':link_handler,
1490
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1491
if osutils.has_symlinks():
1492
os.symlink('a/generic/path', 'source/lnk')
1493
osutils.copy_tree('source', 'target', handlers=handlers)
1495
self.assertEqual([('d', 'source', 'target'),
1496
('f', 'source/a', 'target/a'),
1497
('d', 'source/b', 'target/b'),
1498
('f', 'source/b/c', 'target/b/c'),
1500
self.failIfExists('target')
1501
if osutils.has_symlinks():
1502
self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1505
class TestSetUnsetEnv(tests.TestCase):
1506
"""Test updating the environment"""
1509
super(TestSetUnsetEnv, self).setUp()
1511
self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
1512
'Environment was not cleaned up properly.'
1513
' Variable BZR_TEST_ENV_VAR should not exist.')
1515
if 'BZR_TEST_ENV_VAR' in os.environ:
1516
del os.environ['BZR_TEST_ENV_VAR']
1518
self.addCleanup(cleanup)
1521
"""Test that we can set an env variable"""
1522
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1523
self.assertEqual(None, old)
1524
self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
1526
def test_double_set(self):
1527
"""Test that we get the old value out"""
1528
osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1529
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
1530
self.assertEqual('foo', old)
1531
self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
1533
def test_unicode(self):
1534
"""Environment can only contain plain strings
1536
So Unicode strings must be encoded.
1538
uni_val, env_val = tests.probe_unicode_in_user_encoding()
1540
raise tests.TestSkipped(
1541
'Cannot find a unicode character that works in encoding %s'
1542
% (osutils.get_user_encoding(),))
1544
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1545
self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
1547
def test_unset(self):
1548
"""Test that passing None will remove the env var"""
1549
osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1550
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
1551
self.assertEqual('foo', old)
1552
self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
1553
self.failIf('BZR_TEST_ENV_VAR' in os.environ)
1556
class TestSizeShaFile(tests.TestCaseInTempDir):
1558
def test_sha_empty(self):
1559
self.build_tree_contents([('foo', '')])
1560
expected_sha = osutils.sha_string('')
1562
self.addCleanup(f.close)
1563
size, sha = osutils.size_sha_file(f)
1564
self.assertEqual(0, size)
1565
self.assertEqual(expected_sha, sha)
1567
def test_sha_mixed_endings(self):
1568
text = 'test\r\nwith\nall\rpossible line endings\r\n'
1569
self.build_tree_contents([('foo', text)])
1570
expected_sha = osutils.sha_string(text)
1572
self.addCleanup(f.close)
1573
size, sha = osutils.size_sha_file(f)
1574
self.assertEqual(38, size)
1575
self.assertEqual(expected_sha, sha)
1578
class TestShaFileByName(tests.TestCaseInTempDir):
1580
def test_sha_empty(self):
1581
self.build_tree_contents([('foo', '')])
1582
expected_sha = osutils.sha_string('')
1583
self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1585
def test_sha_mixed_endings(self):
1586
text = 'test\r\nwith\nall\rpossible line endings\r\n'
1587
self.build_tree_contents([('foo', text)])
1588
expected_sha = osutils.sha_string(text)
1589
self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1592
class TestResourceLoading(tests.TestCaseInTempDir):
1594
def test_resource_string(self):
1595
# test resource in bzrlib
1596
text = osutils.resource_string('bzrlib', 'debug.py')
1597
self.assertContainsRe(text, "debug_flags = set()")
1598
# test resource under bzrlib
1599
text = osutils.resource_string('bzrlib.ui', 'text.py')
1600
self.assertContainsRe(text, "class TextUIFactory")
1601
# test unsupported package
1602
self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1604
# test unknown resource
1605
self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
1608
class TestReCompile(tests.TestCase):
1610
def test_re_compile_checked(self):
1611
r = osutils.re_compile_checked(r'A*', re.IGNORECASE)
1612
self.assertTrue(r.match('aaaa'))
1613
self.assertTrue(r.match('aAaA'))
1615
def test_re_compile_checked_error(self):
1616
# like https://bugs.launchpad.net/bzr/+bug/251352
1617
err = self.assertRaises(
1618
errors.BzrCommandError,
1619
osutils.re_compile_checked, '*', re.IGNORECASE, 'test case')
1621
"Invalid regular expression in test case: '*': "
1622
"nothing to repeat",
1626
class TestDirReader(tests.TestCaseInTempDir):
1629
_dir_reader_class = None
1630
_native_to_unicode = None
1633
tests.TestCaseInTempDir.setUp(self)
1635
# Save platform specific info and reset it
1636
cur_dir_reader = osutils._selected_dir_reader
1639
osutils._selected_dir_reader = cur_dir_reader
1640
self.addCleanup(restore)
1642
osutils._selected_dir_reader = self._dir_reader_class()
1644
def _get_ascii_tree(self):
1652
expected_dirblocks = [
1654
[('0file', '0file', 'file'),
1655
('1dir', '1dir', 'directory'),
1656
('2file', '2file', 'file'),
1659
(('1dir', './1dir'),
1660
[('1dir/0file', '0file', 'file'),
1661
('1dir/1dir', '1dir', 'directory'),
1664
(('1dir/1dir', './1dir/1dir'),
1669
return tree, expected_dirblocks
1671
def test_walk_cur_dir(self):
1672
tree, expected_dirblocks = self._get_ascii_tree()
1673
self.build_tree(tree)
1674
result = list(osutils._walkdirs_utf8('.'))
1675
# Filter out stat and abspath
1676
self.assertEqual(expected_dirblocks,
1677
[(dirinfo, [line[0:3] for line in block])
1678
for dirinfo, block in result])
1680
def test_walk_sub_dir(self):
1681
tree, expected_dirblocks = self._get_ascii_tree()
1682
self.build_tree(tree)
1683
# you can search a subdir only, with a supplied prefix.
1684
result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
1685
# Filter out stat and abspath
1686
self.assertEqual(expected_dirblocks[1:],
1687
[(dirinfo, [line[0:3] for line in block])
1688
for dirinfo, block in result])
1690
def _get_unicode_tree(self):
1691
name0u = u'0file-\xb6'
1692
name1u = u'1dir-\u062c\u0648'
1693
name2u = u'2file-\u0633'
1697
name1u + '/' + name0u,
1698
name1u + '/' + name1u + '/',
1701
name0 = name0u.encode('UTF-8')
1702
name1 = name1u.encode('UTF-8')
1703
name2 = name2u.encode('UTF-8')
1704
expected_dirblocks = [
1706
[(name0, name0, 'file', './' + name0u),
1707
(name1, name1, 'directory', './' + name1u),
1708
(name2, name2, 'file', './' + name2u),
1711
((name1, './' + name1u),
1712
[(name1 + '/' + name0, name0, 'file', './' + name1u
1714
(name1 + '/' + name1, name1, 'directory', './' + name1u
1718
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1723
return tree, expected_dirblocks
1725
def _filter_out(self, raw_dirblocks):
1726
"""Filter out a walkdirs_utf8 result.
1728
stat field is removed, all native paths are converted to unicode
1730
filtered_dirblocks = []
1731
for dirinfo, block in raw_dirblocks:
1732
dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
1735
details.append(line[0:3] + (self._native_to_unicode(line[4]), ))
1736
filtered_dirblocks.append((dirinfo, details))
1737
return filtered_dirblocks
1739
def test_walk_unicode_tree(self):
1740
self.requireFeature(tests.UnicodeFilenameFeature)
1741
tree, expected_dirblocks = self._get_unicode_tree()
1742
self.build_tree(tree)
1743
result = list(osutils._walkdirs_utf8('.'))
1744
self.assertEqual(expected_dirblocks, self._filter_out(result))
1746
def test_symlink(self):
1747
self.requireFeature(tests.SymlinkFeature)
1748
self.requireFeature(tests.UnicodeFilenameFeature)
1749
target = u'target\N{Euro Sign}'
1750
link_name = u'l\N{Euro Sign}nk'
1751
os.symlink(target, link_name)
1752
target_utf8 = target.encode('UTF-8')
1753
link_name_utf8 = link_name.encode('UTF-8')
1754
expected_dirblocks = [
1756
[(link_name_utf8, link_name_utf8,
1757
'symlink', './' + link_name),],
1759
result = list(osutils._walkdirs_utf8('.'))
1760
self.assertEqual(expected_dirblocks, self._filter_out(result))
1763
class TestReadLink(tests.TestCaseInTempDir):
1764
"""Exposes os.readlink() problems and the osutils solution.
1766
The only guarantee offered by os.readlink(), starting with 2.6, is that a
1767
unicode string will be returned if a unicode string is passed.
1769
But prior python versions failed to properly encode the passed unicode
1772
_test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]
1775
super(tests.TestCaseInTempDir, self).setUp()
1776
self.link = u'l\N{Euro Sign}ink'
1777
self.target = u'targe\N{Euro Sign}t'
1778
os.symlink(self.target, self.link)
1780
def test_os_readlink_link_encoding(self):
1781
if sys.version_info < (2, 6):
1782
self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
1784
self.assertEquals(self.target, os.readlink(self.link))
1786
def test_os_readlink_link_decoding(self):
1787
self.assertEquals(self.target.encode(osutils._fs_enc),
1788
os.readlink(self.link.encode(osutils._fs_enc)))
1791
class TestConcurrency(tests.TestCase):
1793
def test_local_concurrency(self):
1794
concurrency = osutils.local_concurrency()
1795
self.assertIsInstance(concurrency, int)