/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_osutils.py

  • Committer: Jelmer Vernooij
  • Date: 2017-08-07 11:49:46 UTC
  • mto: (6747.3.4 avoid-set-revid-3)
  • mto: This revision was merged to the branch mainline in revision 6750.
  • Revision ID: jelmer@jelmer.uk-20170807114946-luclmxuawyzhpiot
Avoid setting revision_ids.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2016 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the osutils wrapper."""
 
18
 
 
19
from __future__ import absolute_import, division
 
20
 
 
21
import errno
 
22
import os
 
23
import re
 
24
import select
 
25
import socket
 
26
import sys
 
27
import tempfile
 
28
import time
 
29
 
 
30
from .. import (
 
31
    errors,
 
32
    lazy_regex,
 
33
    osutils,
 
34
    symbol_versioning,
 
35
    tests,
 
36
    trace,
 
37
    win32utils,
 
38
    )
 
39
from ..sixish import (
 
40
    BytesIO,
 
41
    )
 
42
from . import (
 
43
    features,
 
44
    file_utils,
 
45
    test__walkdirs_win32,
 
46
    )
 
47
from .scenarios import load_tests_apply_scenarios
 
48
 
 
49
 
 
50
class _UTF8DirReaderFeature(features.Feature):
 
51
 
 
52
    def _probe(self):
 
53
        try:
 
54
            from .. import _readdir_pyx
 
55
            self.reader = _readdir_pyx.UTF8DirReader
 
56
            return True
 
57
        except ImportError:
 
58
            return False
 
59
 
 
60
    def feature_name(self):
 
61
        return 'breezy._readdir_pyx'
 
62
 
 
63
UTF8DirReaderFeature = features.ModuleAvailableFeature('breezy._readdir_pyx')
 
64
 
 
65
term_ios_feature = features.ModuleAvailableFeature('termios')
 
66
 
 
67
 
 
68
def _already_unicode(s):
 
69
    return s
 
70
 
 
71
 
 
72
def _utf8_to_unicode(s):
 
73
    return s.decode('UTF-8')
 
74
 
 
75
 
 
76
def dir_reader_scenarios():
 
77
    # For each dir reader we define:
 
78
 
 
79
    # - native_to_unicode: a function converting the native_abspath as returned
 
80
    #   by DirReader.read_dir to its unicode representation
 
81
 
 
82
    # UnicodeDirReader is the fallback, it should be tested on all platforms.
 
83
    scenarios = [('unicode',
 
84
                  dict(_dir_reader_class=osutils.UnicodeDirReader,
 
85
                       _native_to_unicode=_already_unicode))]
 
86
    # Some DirReaders are platform specific and even there they may not be
 
87
    # available.
 
88
    if UTF8DirReaderFeature.available():
 
89
        from .. import _readdir_pyx
 
90
        scenarios.append(('utf8',
 
91
                          dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
 
92
                               _native_to_unicode=_utf8_to_unicode)))
 
93
 
 
94
    if test__walkdirs_win32.win32_readdir_feature.available():
 
95
        try:
 
96
            from .. import _walkdirs_win32
 
97
            scenarios.append(
 
98
                ('win32',
 
99
                 dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
 
100
                      _native_to_unicode=_already_unicode)))
 
101
        except ImportError:
 
102
            pass
 
103
    return scenarios
 
104
 
 
105
 
 
106
load_tests = load_tests_apply_scenarios
 
107
 
 
108
 
 
109
class TestContainsWhitespace(tests.TestCase):
 
110
 
 
111
    def test_contains_whitespace(self):
 
112
        self.assertTrue(osutils.contains_whitespace(u' '))
 
113
        self.assertTrue(osutils.contains_whitespace(u'hello there'))
 
114
        self.assertTrue(osutils.contains_whitespace(u'hellothere\n'))
 
115
        self.assertTrue(osutils.contains_whitespace(u'hello\nthere'))
 
116
        self.assertTrue(osutils.contains_whitespace(u'hello\rthere'))
 
117
        self.assertTrue(osutils.contains_whitespace(u'hello\tthere'))
 
118
 
 
119
        # \xa0 is "Non-breaking-space" which on some python locales thinks it
 
120
        # is whitespace, but we do not.
 
121
        self.assertFalse(osutils.contains_whitespace(u''))
 
122
        self.assertFalse(osutils.contains_whitespace(u'hellothere'))
 
123
        self.assertFalse(osutils.contains_whitespace(u'hello\xa0there'))
 
124
 
 
125
 
 
126
class TestRename(tests.TestCaseInTempDir):
 
127
 
 
128
    def create_file(self, filename, content):
 
129
        f = open(filename, 'wb')
 
130
        try:
 
131
            f.write(content)
 
132
        finally:
 
133
            f.close()
 
134
 
 
135
    def _fancy_rename(self, a, b):
 
136
        osutils.fancy_rename(a, b, rename_func=os.rename,
 
137
                             unlink_func=os.unlink)
 
138
 
 
139
    def test_fancy_rename(self):
 
140
        # This should work everywhere
 
141
        self.create_file('a', 'something in a\n')
 
142
        self._fancy_rename('a', 'b')
 
143
        self.assertPathDoesNotExist('a')
 
144
        self.assertPathExists('b')
 
145
        self.check_file_contents('b', 'something in a\n')
 
146
 
 
147
        self.create_file('a', 'new something in a\n')
 
148
        self._fancy_rename('b', 'a')
 
149
 
 
150
        self.check_file_contents('a', 'something in a\n')
 
151
 
 
152
    def test_fancy_rename_fails_source_missing(self):
 
153
        # An exception should be raised, and the target should be left in place
 
154
        self.create_file('target', 'data in target\n')
 
155
        self.assertRaises((IOError, OSError), self._fancy_rename,
 
156
                          'missingsource', 'target')
 
157
        self.assertPathExists('target')
 
158
        self.check_file_contents('target', 'data in target\n')
 
159
 
 
160
    def test_fancy_rename_fails_if_source_and_target_missing(self):
 
161
        self.assertRaises((IOError, OSError), self._fancy_rename,
 
162
                          'missingsource', 'missingtarget')
 
163
 
 
164
    def test_rename(self):
 
165
        # Rename should be semi-atomic on all platforms
 
166
        self.create_file('a', 'something in a\n')
 
167
        osutils.rename('a', 'b')
 
168
        self.assertPathDoesNotExist('a')
 
169
        self.assertPathExists('b')
 
170
        self.check_file_contents('b', 'something in a\n')
 
171
 
 
172
        self.create_file('a', 'new something in a\n')
 
173
        osutils.rename('b', 'a')
 
174
 
 
175
        self.check_file_contents('a', 'something in a\n')
 
176
 
 
177
    # TODO: test fancy_rename using a MemoryTransport
 
178
 
 
179
    def test_rename_change_case(self):
 
180
        # on Windows we should be able to change filename case by rename
 
181
        self.build_tree(['a', 'b/'])
 
182
        osutils.rename('a', 'A')
 
183
        osutils.rename('b', 'B')
 
184
        # we can't use failUnlessExists on case-insensitive filesystem
 
185
        # so try to check shape of the tree
 
186
        shape = sorted(os.listdir('.'))
 
187
        self.assertEqual(['A', 'B'], shape)
 
188
 
 
189
    def test_rename_exception(self):
 
190
        try:
 
191
            osutils.rename('nonexistent_path', 'different_nonexistent_path')
 
192
        except OSError as e:
 
193
            self.assertEqual(e.old_filename, 'nonexistent_path')
 
194
            self.assertEqual(e.new_filename, 'different_nonexistent_path')
 
195
            self.assertTrue('nonexistent_path' in e.strerror)
 
196
            self.assertTrue('different_nonexistent_path' in e.strerror)
 
197
 
 
198
 
 
199
class TestRandChars(tests.TestCase):
 
200
 
 
201
    def test_01_rand_chars_empty(self):
 
202
        result = osutils.rand_chars(0)
 
203
        self.assertEqual(result, '')
 
204
 
 
205
    def test_02_rand_chars_100(self):
 
206
        result = osutils.rand_chars(100)
 
207
        self.assertEqual(len(result), 100)
 
208
        self.assertEqual(type(result), str)
 
209
        self.assertContainsRe(result, r'^[a-z0-9]{100}$')
 
210
 
 
211
 
 
212
class TestIsInside(tests.TestCase):
 
213
 
 
214
    def test_is_inside(self):
 
215
        is_inside = osutils.is_inside
 
216
        self.assertTrue(is_inside('src', 'src/foo.c'))
 
217
        self.assertFalse(is_inside('src', 'srccontrol'))
 
218
        self.assertTrue(is_inside('src', 'src/a/a/a/foo.c'))
 
219
        self.assertTrue(is_inside('foo.c', 'foo.c'))
 
220
        self.assertFalse(is_inside('foo.c', ''))
 
221
        self.assertTrue(is_inside('', 'foo.c'))
 
222
 
 
223
    def test_is_inside_any(self):
 
224
        SRC_FOO_C = osutils.pathjoin('src', 'foo.c')
 
225
        for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
 
226
                         (['src'], SRC_FOO_C),
 
227
                         (['src'], 'src'),
 
228
                         ]:
 
229
            self.assertTrue(osutils.is_inside_any(dirs, fn))
 
230
        for dirs, fn in [(['src'], 'srccontrol'),
 
231
                         (['src'], 'srccontrol/foo')]:
 
232
            self.assertFalse(osutils.is_inside_any(dirs, fn))
 
233
 
 
234
    def test_is_inside_or_parent_of_any(self):
 
235
        for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
 
236
                         (['src'], 'src/foo.c'),
 
237
                         (['src/bar.c'], 'src'),
 
238
                         (['src/bar.c', 'bla/foo.c'], 'src'),
 
239
                         (['src'], 'src'),
 
240
                         ]:
 
241
            self.assertTrue(osutils.is_inside_or_parent_of_any(dirs, fn))
 
242
 
 
243
        for dirs, fn in [(['src'], 'srccontrol'),
 
244
                         (['srccontrol/foo.c'], 'src'),
 
245
                         (['src'], 'srccontrol/foo')]:
 
246
            self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
 
247
 
 
248
 
 
249
class TestLstat(tests.TestCaseInTempDir):
 
250
 
 
251
    def test_lstat_matches_fstat(self):
 
252
        # On Windows, lstat and fstat don't always agree, primarily in the
 
253
        # 'st_ino' and 'st_dev' fields. So we force them to be '0' in our
 
254
        # custom implementation.
 
255
        if sys.platform == 'win32':
 
256
            # We only have special lstat/fstat if we have the extension.
 
257
            # Without it, we may end up re-reading content when we don't have
 
258
            # to, but otherwise it doesn't effect correctness.
 
259
            self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
260
        f = open('test-file.txt', 'wb')
 
261
        self.addCleanup(f.close)
 
262
        f.write('some content\n')
 
263
        f.flush()
 
264
        self.assertEqualStat(osutils.fstat(f.fileno()),
 
265
                             osutils.lstat('test-file.txt'))
 
266
 
 
267
 
 
268
class TestRmTree(tests.TestCaseInTempDir):
 
269
 
 
270
    def test_rmtree(self):
 
271
        # Check to remove tree with read-only files/dirs
 
272
        os.mkdir('dir')
 
273
        f = file('dir/file', 'w')
 
274
        f.write('spam')
 
275
        f.close()
 
276
        # would like to also try making the directory readonly, but at the
 
277
        # moment python shutil.rmtree doesn't handle that properly - it would
 
278
        # need to chmod the directory before removing things inside it - deferred
 
279
        # for now -- mbp 20060505
 
280
        # osutils.make_readonly('dir')
 
281
        osutils.make_readonly('dir/file')
 
282
 
 
283
        osutils.rmtree('dir')
 
284
 
 
285
        self.assertPathDoesNotExist('dir/file')
 
286
        self.assertPathDoesNotExist('dir')
 
287
 
 
288
 
 
289
class TestDeleteAny(tests.TestCaseInTempDir):
 
290
 
 
291
    def test_delete_any_readonly(self):
 
292
        # from <https://bugs.launchpad.net/bzr/+bug/218206>
 
293
        self.build_tree(['d/', 'f'])
 
294
        osutils.make_readonly('d')
 
295
        osutils.make_readonly('f')
 
296
 
 
297
        osutils.delete_any('f')
 
298
        osutils.delete_any('d')
 
299
 
 
300
 
 
301
class TestKind(tests.TestCaseInTempDir):
 
302
 
 
303
    def test_file_kind(self):
 
304
        self.build_tree(['file', 'dir/'])
 
305
        self.assertEqual('file', osutils.file_kind('file'))
 
306
        self.assertEqual('directory', osutils.file_kind('dir/'))
 
307
        if osutils.has_symlinks():
 
308
            os.symlink('symlink', 'symlink')
 
309
            self.assertEqual('symlink', osutils.file_kind('symlink'))
 
310
 
 
311
        # TODO: jam 20060529 Test a block device
 
312
        try:
 
313
            os.lstat('/dev/null')
 
314
        except OSError as e:
 
315
            if e.errno not in (errno.ENOENT,):
 
316
                raise
 
317
        else:
 
318
            self.assertEqual('chardev', osutils.file_kind('/dev/null'))
 
319
 
 
320
        mkfifo = getattr(os, 'mkfifo', None)
 
321
        if mkfifo:
 
322
            mkfifo('fifo')
 
323
            try:
 
324
                self.assertEqual('fifo', osutils.file_kind('fifo'))
 
325
            finally:
 
326
                os.remove('fifo')
 
327
 
 
328
        AF_UNIX = getattr(socket, 'AF_UNIX', None)
 
329
        if AF_UNIX:
 
330
            s = socket.socket(AF_UNIX)
 
331
            s.bind('socket')
 
332
            try:
 
333
                self.assertEqual('socket', osutils.file_kind('socket'))
 
334
            finally:
 
335
                os.remove('socket')
 
336
 
 
337
    def test_kind_marker(self):
 
338
        self.assertEqual("", osutils.kind_marker("file"))
 
339
        self.assertEqual("/", osutils.kind_marker('directory'))
 
340
        self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
 
341
        self.assertEqual("@", osutils.kind_marker("symlink"))
 
342
        self.assertEqual("+", osutils.kind_marker("tree-reference"))
 
343
        self.assertEqual("", osutils.kind_marker("fifo"))
 
344
        self.assertEqual("", osutils.kind_marker("socket"))
 
345
        self.assertEqual("", osutils.kind_marker("unknown"))
 
346
 
 
347
 
 
348
class TestUmask(tests.TestCaseInTempDir):
 
349
 
 
350
    def test_get_umask(self):
 
351
        if sys.platform == 'win32':
 
352
            # umask always returns '0', no way to set it
 
353
            self.assertEqual(0, osutils.get_umask())
 
354
            return
 
355
 
 
356
        orig_umask = osutils.get_umask()
 
357
        self.addCleanup(os.umask, orig_umask)
 
358
        os.umask(0o222)
 
359
        self.assertEqual(0o222, osutils.get_umask())
 
360
        os.umask(0o022)
 
361
        self.assertEqual(0o022, osutils.get_umask())
 
362
        os.umask(0o002)
 
363
        self.assertEqual(0o002, osutils.get_umask())
 
364
        os.umask(0o027)
 
365
        self.assertEqual(0o027, osutils.get_umask())
 
366
 
 
367
 
 
368
class TestDateTime(tests.TestCase):
 
369
 
 
370
    def assertFormatedDelta(self, expected, seconds):
 
371
        """Assert osutils.format_delta formats as expected"""
 
372
        actual = osutils.format_delta(seconds)
 
373
        self.assertEqual(expected, actual)
 
374
 
 
375
    def test_format_delta(self):
 
376
        self.assertFormatedDelta('0 seconds ago', 0)
 
377
        self.assertFormatedDelta('1 second ago', 1)
 
378
        self.assertFormatedDelta('10 seconds ago', 10)
 
379
        self.assertFormatedDelta('59 seconds ago', 59)
 
380
        self.assertFormatedDelta('89 seconds ago', 89)
 
381
        self.assertFormatedDelta('1 minute, 30 seconds ago', 90)
 
382
        self.assertFormatedDelta('3 minutes, 0 seconds ago', 180)
 
383
        self.assertFormatedDelta('3 minutes, 1 second ago', 181)
 
384
        self.assertFormatedDelta('10 minutes, 15 seconds ago', 615)
 
385
        self.assertFormatedDelta('30 minutes, 59 seconds ago', 1859)
 
386
        self.assertFormatedDelta('31 minutes, 0 seconds ago', 1860)
 
387
        self.assertFormatedDelta('60 minutes, 0 seconds ago', 3600)
 
388
        self.assertFormatedDelta('89 minutes, 59 seconds ago', 5399)
 
389
        self.assertFormatedDelta('1 hour, 30 minutes ago', 5400)
 
390
        self.assertFormatedDelta('2 hours, 30 minutes ago', 9017)
 
391
        self.assertFormatedDelta('10 hours, 0 minutes ago', 36000)
 
392
        self.assertFormatedDelta('24 hours, 0 minutes ago', 86400)
 
393
        self.assertFormatedDelta('35 hours, 59 minutes ago', 129599)
 
394
        self.assertFormatedDelta('36 hours, 0 minutes ago', 129600)
 
395
        self.assertFormatedDelta('36 hours, 0 minutes ago', 129601)
 
396
        self.assertFormatedDelta('36 hours, 1 minute ago', 129660)
 
397
        self.assertFormatedDelta('36 hours, 1 minute ago', 129661)
 
398
        self.assertFormatedDelta('84 hours, 10 minutes ago', 303002)
 
399
 
 
400
        # We handle when time steps the wrong direction because computers
 
401
        # don't have synchronized clocks.
 
402
        self.assertFormatedDelta('84 hours, 10 minutes in the future', -303002)
 
403
        self.assertFormatedDelta('1 second in the future', -1)
 
404
        self.assertFormatedDelta('2 seconds in the future', -2)
 
405
 
 
406
    def test_format_date(self):
 
407
        self.assertRaises(osutils.UnsupportedTimezoneFormat,
 
408
            osutils.format_date, 0, timezone='foo')
 
409
        self.assertIsInstance(osutils.format_date(0), str)
 
410
        self.assertIsInstance(osutils.format_local_date(0), unicode)
 
411
        # Testing for the actual value of the local weekday without
 
412
        # duplicating the code from format_date is difficult.
 
413
        # Instead blackbox.test_locale should check for localized
 
414
        # dates once they do occur in output strings.
 
415
 
 
416
    def test_format_date_with_offset_in_original_timezone(self):
 
417
        self.assertEqual("Thu 1970-01-01 00:00:00 +0000",
 
418
            osutils.format_date_with_offset_in_original_timezone(0))
 
419
        self.assertEqual("Fri 1970-01-02 03:46:40 +0000",
 
420
            osutils.format_date_with_offset_in_original_timezone(100000))
 
421
        self.assertEqual("Fri 1970-01-02 05:46:40 +0200",
 
422
            osutils.format_date_with_offset_in_original_timezone(100000, 7200))
 
423
 
 
424
    def test_local_time_offset(self):
 
425
        """Test that local_time_offset() returns a sane value."""
 
426
        offset = osutils.local_time_offset()
 
427
        self.assertTrue(isinstance(offset, int))
 
428
        # Test that the offset is no more than a eighteen hours in
 
429
        # either direction.
 
430
        # Time zone handling is system specific, so it is difficult to
 
431
        # do more specific tests, but a value outside of this range is
 
432
        # probably wrong.
 
433
        eighteen_hours = 18 * 3600
 
434
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
 
435
 
 
436
    def test_local_time_offset_with_timestamp(self):
 
437
        """Test that local_time_offset() works with a timestamp."""
 
438
        offset = osutils.local_time_offset(1000000000.1234567)
 
439
        self.assertTrue(isinstance(offset, int))
 
440
        eighteen_hours = 18 * 3600
 
441
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
 
442
 
 
443
 
 
444
class TestFdatasync(tests.TestCaseInTempDir):
 
445
 
 
446
    def do_fdatasync(self):
 
447
        f = tempfile.NamedTemporaryFile()
 
448
        osutils.fdatasync(f.fileno())
 
449
        f.close()
 
450
 
 
451
    @staticmethod
 
452
    def raise_eopnotsupp(*args, **kwargs):
 
453
        raise IOError(errno.EOPNOTSUPP, os.strerror(errno.EOPNOTSUPP))
 
454
 
 
455
    @staticmethod
 
456
    def raise_enotsup(*args, **kwargs):
 
457
        raise IOError(errno.ENOTSUP, os.strerror(errno.ENOTSUP))
 
458
 
 
459
    def test_fdatasync_handles_system_function(self):
 
460
        self.overrideAttr(os, "fdatasync")
 
461
        self.do_fdatasync()
 
462
 
 
463
    def test_fdatasync_handles_no_fdatasync_no_fsync(self):
 
464
        self.overrideAttr(os, "fdatasync")
 
465
        self.overrideAttr(os, "fsync")
 
466
        self.do_fdatasync()
 
467
 
 
468
    def test_fdatasync_handles_no_EOPNOTSUPP(self):
 
469
        self.overrideAttr(errno, "EOPNOTSUPP")
 
470
        self.do_fdatasync()
 
471
 
 
472
    def test_fdatasync_catches_ENOTSUP(self):
 
473
        enotsup = getattr(errno, "ENOTSUP", None)
 
474
        if enotsup is None:
 
475
            raise tests.TestNotApplicable("No ENOTSUP on this platform")
 
476
        self.overrideAttr(os, "fdatasync", self.raise_enotsup)
 
477
        self.do_fdatasync()
 
478
 
 
479
    def test_fdatasync_catches_EOPNOTSUPP(self):
 
480
        enotsup = getattr(errno, "EOPNOTSUPP", None)
 
481
        if enotsup is None:
 
482
            raise tests.TestNotApplicable("No EOPNOTSUPP on this platform")
 
483
        self.overrideAttr(os, "fdatasync", self.raise_eopnotsupp)
 
484
        self.do_fdatasync()
 
485
 
 
486
 
 
487
class TestLinks(tests.TestCaseInTempDir):
 
488
 
 
489
    def test_dereference_path(self):
 
490
        self.requireFeature(features.SymlinkFeature)
 
491
        cwd = osutils.realpath('.')
 
492
        os.mkdir('bar')
 
493
        bar_path = osutils.pathjoin(cwd, 'bar')
 
494
        # Using './' to avoid bug #1213894 (first path component not
 
495
        # dereferenced) in Python 2.4.1 and earlier
 
496
        self.assertEqual(bar_path, osutils.realpath('./bar'))
 
497
        os.symlink('bar', 'foo')
 
498
        self.assertEqual(bar_path, osutils.realpath('./foo'))
 
499
 
 
500
        # Does not dereference terminal symlinks
 
501
        foo_path = osutils.pathjoin(cwd, 'foo')
 
502
        self.assertEqual(foo_path, osutils.dereference_path('./foo'))
 
503
 
 
504
        # Dereferences parent symlinks
 
505
        os.mkdir('bar/baz')
 
506
        baz_path = osutils.pathjoin(bar_path, 'baz')
 
507
        self.assertEqual(baz_path, osutils.dereference_path('./foo/baz'))
 
508
 
 
509
        # Dereferences parent symlinks that are the first path element
 
510
        self.assertEqual(baz_path, osutils.dereference_path('foo/baz'))
 
511
 
 
512
        # Dereferences parent symlinks in absolute paths
 
513
        foo_baz_path = osutils.pathjoin(foo_path, 'baz')
 
514
        self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
 
515
 
 
516
    def test_changing_access(self):
 
517
        f = file('file', 'w')
 
518
        f.write('monkey')
 
519
        f.close()
 
520
 
 
521
        # Make a file readonly
 
522
        osutils.make_readonly('file')
 
523
        mode = os.lstat('file').st_mode
 
524
        self.assertEqual(mode, mode & 0o777555)
 
525
 
 
526
        # Make a file writable
 
527
        osutils.make_writable('file')
 
528
        mode = os.lstat('file').st_mode
 
529
        self.assertEqual(mode, mode | 0o200)
 
530
 
 
531
        if osutils.has_symlinks():
 
532
            # should not error when handed a symlink
 
533
            os.symlink('nonexistent', 'dangling')
 
534
            osutils.make_readonly('dangling')
 
535
            osutils.make_writable('dangling')
 
536
 
 
537
    def test_host_os_dereferences_symlinks(self):
 
538
        osutils.host_os_dereferences_symlinks()
 
539
 
 
540
 
 
541
class TestCanonicalRelPath(tests.TestCaseInTempDir):
 
542
 
 
543
    _test_needs_features = [features.CaseInsCasePresFilenameFeature]
 
544
 
 
545
    def test_canonical_relpath_simple(self):
 
546
        f = file('MixedCaseName', 'w')
 
547
        f.close()
 
548
        actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
 
549
        self.assertEqual('work/MixedCaseName', actual)
 
550
 
 
551
    def test_canonical_relpath_missing_tail(self):
 
552
        os.mkdir('MixedCaseParent')
 
553
        actual = osutils.canonical_relpath(self.test_base_dir,
 
554
                                           'mixedcaseparent/nochild')
 
555
        self.assertEqual('work/MixedCaseParent/nochild', actual)
 
556
 
 
557
 
 
558
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
 
559
 
 
560
    def assertRelpath(self, expected, base, path):
 
561
        actual = osutils._cicp_canonical_relpath(base, path)
 
562
        self.assertEqual(expected, actual)
 
563
 
 
564
    def test_simple(self):
 
565
        self.build_tree(['MixedCaseName'])
 
566
        base = osutils.realpath(self.get_transport('.').local_abspath('.'))
 
567
        self.assertRelpath('MixedCaseName', base, 'mixedcAsename')
 
568
 
 
569
    def test_subdir_missing_tail(self):
 
570
        self.build_tree(['MixedCaseParent/', 'MixedCaseParent/a_child'])
 
571
        base = osutils.realpath(self.get_transport('.').local_abspath('.'))
 
572
        self.assertRelpath('MixedCaseParent/a_child', base,
 
573
                           'MixedCaseParent/a_child')
 
574
        self.assertRelpath('MixedCaseParent/a_child', base,
 
575
                           'MixedCaseParent/A_Child')
 
576
        self.assertRelpath('MixedCaseParent/not_child', base,
 
577
                           'MixedCaseParent/not_child')
 
578
 
 
579
    def test_at_root_slash(self):
 
580
        # We can't test this on Windows, because it has a 'MIN_ABS_PATHLENGTH'
 
581
        # check...
 
582
        if osutils.MIN_ABS_PATHLENGTH > 1:
 
583
            raise tests.TestSkipped('relpath requires %d chars'
 
584
                                    % osutils.MIN_ABS_PATHLENGTH)
 
585
        self.assertRelpath('foo', '/', '/foo')
 
586
 
 
587
    def test_at_root_drive(self):
 
588
        if sys.platform != 'win32':
 
589
            raise tests.TestNotApplicable('we can only test drive-letter relative'
 
590
                                          ' paths on Windows where we have drive'
 
591
                                          ' letters.')
 
592
        # see bug #322807
 
593
        # The specific issue is that when at the root of a drive, 'abspath'
 
594
        # returns "C:/" or just "/". However, the code assumes that abspath
 
595
        # always returns something like "C:/foo" or "/foo" (no trailing slash).
 
596
        self.assertRelpath('foo', 'C:/', 'C:/foo')
 
597
        self.assertRelpath('foo', 'X:/', 'X:/foo')
 
598
        self.assertRelpath('foo', 'X:/', 'X://foo')
 
599
 
 
600
 
 
601
class TestPumpFile(tests.TestCase):
 
602
    """Test pumpfile method."""
 
603
 
 
604
    def setUp(self):
 
605
        super(TestPumpFile, self).setUp()
 
606
        # create a test datablock
 
607
        self.block_size = 512
 
608
        pattern = b'0123456789ABCDEF'
 
609
        self.test_data = pattern * (3 * self.block_size // len(pattern))
 
610
        self.test_data_len = len(self.test_data)
 
611
 
 
612
    def test_bracket_block_size(self):
 
613
        """Read data in blocks with the requested read size bracketing the
 
614
        block size."""
 
615
        # make sure test data is larger than max read size
 
616
        self.assertTrue(self.test_data_len > self.block_size)
 
617
 
 
618
        from_file = file_utils.FakeReadFile(self.test_data)
 
619
        to_file = BytesIO()
 
620
 
 
621
        # read (max // 2) bytes and verify read size wasn't affected
 
622
        num_bytes_to_read = self.block_size // 2
 
623
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
624
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
 
625
        self.assertEqual(from_file.get_read_count(), 1)
 
626
 
 
627
        # read (max) bytes and verify read size wasn't affected
 
628
        num_bytes_to_read = self.block_size
 
629
        from_file.reset_read_count()
 
630
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
631
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
 
632
        self.assertEqual(from_file.get_read_count(), 1)
 
633
 
 
634
        # read (max + 1) bytes and verify read size was limited
 
635
        num_bytes_to_read = self.block_size + 1
 
636
        from_file.reset_read_count()
 
637
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
638
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
 
639
        self.assertEqual(from_file.get_read_count(), 2)
 
640
 
 
641
        # finish reading the rest of the data
 
642
        num_bytes_to_read = self.test_data_len - to_file.tell()
 
643
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
644
 
 
645
        # report error if the data wasn't equal (we only report the size due
 
646
        # to the length of the data)
 
647
        response_data = to_file.getvalue()
 
648
        if response_data != self.test_data:
 
649
            message = "Data not equal.  Expected %d bytes, received %d."
 
650
            self.fail(message % (len(response_data), self.test_data_len))
 
651
 
 
652
    def test_specified_size(self):
 
653
        """Request a transfer larger than the maximum block size and verify
 
654
        that the maximum read doesn't exceed the block_size."""
 
655
        # make sure test data is larger than max read size
 
656
        self.assertTrue(self.test_data_len > self.block_size)
 
657
 
 
658
        # retrieve data in blocks
 
659
        from_file = file_utils.FakeReadFile(self.test_data)
 
660
        to_file = BytesIO()
 
661
        osutils.pumpfile(from_file, to_file, self.test_data_len,
 
662
                         self.block_size)
 
663
 
 
664
        # verify read size was equal to the maximum read size
 
665
        self.assertTrue(from_file.get_max_read_size() > 0)
 
666
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
 
667
        self.assertEqual(from_file.get_read_count(), 3)
 
668
 
 
669
        # report error if the data wasn't equal (we only report the size due
 
670
        # to the length of the data)
 
671
        response_data = to_file.getvalue()
 
672
        if response_data != self.test_data:
 
673
            message = "Data not equal.  Expected %d bytes, received %d."
 
674
            self.fail(message % (len(response_data), self.test_data_len))
 
675
 
 
676
    def test_to_eof(self):
 
677
        """Read to end-of-file and verify that the reads are not larger than
 
678
        the maximum read size."""
 
679
        # make sure test data is larger than max read size
 
680
        self.assertTrue(self.test_data_len > self.block_size)
 
681
 
 
682
        # retrieve data to EOF
 
683
        from_file = file_utils.FakeReadFile(self.test_data)
 
684
        to_file = BytesIO()
 
685
        osutils.pumpfile(from_file, to_file, -1, self.block_size)
 
686
 
 
687
        # verify read size was equal to the maximum read size
 
688
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
 
689
        self.assertEqual(from_file.get_read_count(), 4)
 
690
 
 
691
        # report error if the data wasn't equal (we only report the size due
 
692
        # to the length of the data)
 
693
        response_data = to_file.getvalue()
 
694
        if response_data != self.test_data:
 
695
            message = "Data not equal.  Expected %d bytes, received %d."
 
696
            self.fail(message % (len(response_data), self.test_data_len))
 
697
 
 
698
    def test_defaults(self):
 
699
        """Verifies that the default arguments will read to EOF -- this
 
700
        test verifies that any existing usages of pumpfile will not be broken
 
701
        with this new version."""
 
702
        # retrieve data using default (old) pumpfile method
 
703
        from_file = file_utils.FakeReadFile(self.test_data)
 
704
        to_file = BytesIO()
 
705
        osutils.pumpfile(from_file, to_file)
 
706
 
 
707
        # report error if the data wasn't equal (we only report the size due
 
708
        # to the length of the data)
 
709
        response_data = to_file.getvalue()
 
710
        if response_data != self.test_data:
 
711
            message = "Data not equal.  Expected %d bytes, received %d."
 
712
            self.fail(message % (len(response_data), self.test_data_len))
 
713
 
 
714
    def test_report_activity(self):
 
715
        activity = []
 
716
        def log_activity(length, direction):
 
717
            activity.append((length, direction))
 
718
        from_file = BytesIO(self.test_data)
 
719
        to_file = BytesIO()
 
720
        osutils.pumpfile(from_file, to_file, buff_size=500,
 
721
                         report_activity=log_activity, direction='read')
 
722
        self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
 
723
                          (36, 'read')], activity)
 
724
 
 
725
        from_file = BytesIO(self.test_data)
 
726
        to_file = BytesIO()
 
727
        del activity[:]
 
728
        osutils.pumpfile(from_file, to_file, buff_size=500,
 
729
                         report_activity=log_activity, direction='write')
 
730
        self.assertEqual([(500, 'write'), (500, 'write'), (500, 'write'),
 
731
                          (36, 'write')], activity)
 
732
 
 
733
        # And with a limited amount of data
 
734
        from_file = BytesIO(self.test_data)
 
735
        to_file = BytesIO()
 
736
        del activity[:]
 
737
        osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
 
738
                         report_activity=log_activity, direction='read')
 
739
        self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
 
740
 
 
741
 
 
742
 
 
743
class TestPumpStringFile(tests.TestCase):
 
744
 
 
745
    def test_empty(self):
 
746
        output = BytesIO()
 
747
        osutils.pump_string_file(b"", output)
 
748
        self.assertEqual(b"", output.getvalue())
 
749
 
 
750
    def test_more_than_segment_size(self):
 
751
        output = BytesIO()
 
752
        osutils.pump_string_file(b"123456789", output, 2)
 
753
        self.assertEqual(b"123456789", output.getvalue())
 
754
 
 
755
    def test_segment_size(self):
 
756
        output = BytesIO()
 
757
        osutils.pump_string_file(b"12", output, 2)
 
758
        self.assertEqual(b"12", output.getvalue())
 
759
 
 
760
    def test_segment_size_multiple(self):
 
761
        output = BytesIO()
 
762
        osutils.pump_string_file(b"1234", output, 2)
 
763
        self.assertEqual(b"1234", output.getvalue())
 
764
 
 
765
 
 
766
class TestRelpath(tests.TestCase):
 
767
 
 
768
    def test_simple_relpath(self):
 
769
        cwd = osutils.getcwd()
 
770
        subdir = cwd + '/subdir'
 
771
        self.assertEqual('subdir', osutils.relpath(cwd, subdir))
 
772
 
 
773
    def test_deep_relpath(self):
 
774
        cwd = osutils.getcwd()
 
775
        subdir = cwd + '/sub/subsubdir'
 
776
        self.assertEqual('sub/subsubdir', osutils.relpath(cwd, subdir))
 
777
 
 
778
    def test_not_relative(self):
 
779
        self.assertRaises(errors.PathNotChild,
 
780
                          osutils.relpath, 'C:/path', 'H:/path')
 
781
        self.assertRaises(errors.PathNotChild,
 
782
                          osutils.relpath, 'C:/', 'H:/path')
 
783
 
 
784
 
 
785
class TestSafeUnicode(tests.TestCase):
 
786
 
 
787
    def test_from_ascii_string(self):
 
788
        self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
 
789
 
 
790
    def test_from_unicode_string_ascii_contents(self):
 
791
        self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
 
792
 
 
793
    def test_from_unicode_string_unicode_contents(self):
 
794
        self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
 
795
 
 
796
    def test_from_utf8_string(self):
 
797
        self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
 
798
 
 
799
    def test_bad_utf8_string(self):
 
800
        self.assertRaises(errors.BzrBadParameterNotUnicode,
 
801
                          osutils.safe_unicode,
 
802
                          '\xbb\xbb')
 
803
 
 
804
 
 
805
class TestSafeUtf8(tests.TestCase):
 
806
 
 
807
    def test_from_ascii_string(self):
 
808
        f = 'foobar'
 
809
        self.assertEqual('foobar', osutils.safe_utf8(f))
 
810
 
 
811
    def test_from_unicode_string_ascii_contents(self):
 
812
        self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
 
813
 
 
814
    def test_from_unicode_string_unicode_contents(self):
 
815
        self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
 
816
 
 
817
    def test_from_utf8_string(self):
 
818
        self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
 
819
 
 
820
    def test_bad_utf8_string(self):
 
821
        self.assertRaises(errors.BzrBadParameterNotUnicode,
 
822
                          osutils.safe_utf8, '\xbb\xbb')
 
823
 
 
824
 
 
825
class TestSafeRevisionId(tests.TestCase):
 
826
 
 
827
    def test_from_ascii_string(self):
 
828
        # this shouldn't give a warning because it's getting an ascii string
 
829
        self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
 
830
 
 
831
    def test_from_unicode_string_ascii_contents(self):
 
832
        self.assertRaises(TypeError,
 
833
                          osutils.safe_revision_id, u'bargam')
 
834
 
 
835
    def test_from_unicode_string_unicode_contents(self):
 
836
        self.assertRaises(TypeError,
 
837
                         osutils.safe_revision_id, u'bargam\xae')
 
838
 
 
839
    def test_from_utf8_string(self):
 
840
        self.assertEqual('foo\xc2\xae',
 
841
                         osutils.safe_revision_id('foo\xc2\xae'))
 
842
 
 
843
    def test_none(self):
 
844
        """Currently, None is a valid revision_id"""
 
845
        self.assertEqual(None, osutils.safe_revision_id(None))
 
846
 
 
847
 
 
848
class TestSafeFileId(tests.TestCase):
 
849
 
 
850
    def test_from_ascii_string(self):
 
851
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))
 
852
 
 
853
    def test_from_unicode_string_ascii_contents(self):
 
854
        self.assertRaises(TypeError, osutils.safe_file_id, u'bargam')
 
855
 
 
856
    def test_from_unicode_string_unicode_contents(self):
 
857
        self.assertRaises(TypeError,
 
858
                          osutils.safe_file_id, u'bargam\xae')
 
859
 
 
860
    def test_from_utf8_string(self):
 
861
        self.assertEqual('foo\xc2\xae',
 
862
                         osutils.safe_file_id('foo\xc2\xae'))
 
863
 
 
864
    def test_none(self):
 
865
        """Currently, None is a valid revision_id"""
 
866
        self.assertEqual(None, osutils.safe_file_id(None))
 
867
 
 
868
 
 
869
class TestSendAll(tests.TestCase):
 
870
 
 
871
    def test_send_with_disconnected_socket(self):
 
872
        class DisconnectedSocket(object):
 
873
            def __init__(self, err):
 
874
                self.err = err
 
875
            def send(self, content):
 
876
                raise self.err
 
877
            def close(self):
 
878
                pass
 
879
        # All of these should be treated as ConnectionReset
 
880
        errs = []
 
881
        for err_cls in (IOError, socket.error):
 
882
            for errnum in osutils._end_of_stream_errors:
 
883
                errs.append(err_cls(errnum))
 
884
        for err in errs:
 
885
            sock = DisconnectedSocket(err)
 
886
            self.assertRaises(errors.ConnectionReset,
 
887
                osutils.send_all, sock, b'some more content')
 
888
 
 
889
    def test_send_with_no_progress(self):
 
890
        # See https://bugs.launchpad.net/bzr/+bug/1047309
 
891
        # It seems that paramiko can get into a state where it doesn't error,
 
892
        # but it returns 0 bytes sent for requests over and over again.
 
893
        class NoSendingSocket(object):
 
894
            def __init__(self):
 
895
                self.call_count = 0
 
896
            def send(self, bytes):
 
897
                self.call_count += 1
 
898
                if self.call_count > 100:
 
899
                    # Prevent the test suite from hanging
 
900
                    raise RuntimeError('too many calls')
 
901
                return 0
 
902
        sock = NoSendingSocket()
 
903
        self.assertRaises(errors.ConnectionReset,
 
904
                          osutils.send_all, sock, b'content')
 
905
        self.assertEqual(1, sock.call_count)
 
906
 
 
907
 
 
908
class TestPosixFuncs(tests.TestCase):
 
909
    """Test that the posix version of normpath returns an appropriate path
 
910
       when used with 2 leading slashes."""
 
911
 
 
912
    def test_normpath(self):
 
913
        self.assertEqual('/etc/shadow', osutils._posix_normpath('/etc/shadow'))
 
914
        self.assertEqual('/etc/shadow', osutils._posix_normpath('//etc/shadow'))
 
915
        self.assertEqual('/etc/shadow', osutils._posix_normpath('///etc/shadow'))
 
916
 
 
917
 
 
918
class TestWin32Funcs(tests.TestCase):
 
919
    """Test that _win32 versions of os utilities return appropriate paths."""
 
920
 
 
921
    def test_abspath(self):
 
922
        self.requireFeature(features.win32_feature)
 
923
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
 
924
        self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
 
925
        self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
 
926
        self.assertEqual('//HOST/path', osutils._win32_abspath('//HOST/path'))
 
927
 
 
928
    def test_realpath(self):
 
929
        self.assertEqual('C:/foo', osutils._win32_realpath('C:\\foo'))
 
930
        self.assertEqual('C:/foo', osutils._win32_realpath('C:/foo'))
 
931
 
 
932
    def test_pathjoin(self):
 
933
        self.assertEqual('path/to/foo',
 
934
                         osutils._win32_pathjoin('path', 'to', 'foo'))
 
935
        self.assertEqual('C:/foo',
 
936
                         osutils._win32_pathjoin('path\\to', 'C:\\foo'))
 
937
        self.assertEqual('C:/foo',
 
938
                         osutils._win32_pathjoin('path/to', 'C:/foo'))
 
939
        self.assertEqual('path/to/foo',
 
940
                         osutils._win32_pathjoin('path/to/', 'foo'))
 
941
 
 
942
    def test_pathjoin_late_bugfix(self):
 
943
        if sys.version_info < (2, 7, 6):
 
944
            expected = '/foo'
 
945
        else:
 
946
            expected = 'C:/foo'
 
947
        self.assertEqual(expected,
 
948
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
 
949
        self.assertEqual(expected,
 
950
                         osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
 
951
 
 
952
    def test_normpath(self):
 
953
        self.assertEqual('path/to/foo',
 
954
                         osutils._win32_normpath(r'path\\from\..\to\.\foo'))
 
955
        self.assertEqual('path/to/foo',
 
956
                         osutils._win32_normpath('path//from/../to/./foo'))
 
957
 
 
958
    def test_getcwd(self):
 
959
        cwd = osutils._win32_getcwd()
 
960
        os_cwd = osutils._getcwd()
 
961
        self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
 
962
        # win32 is inconsistent whether it returns lower or upper case
 
963
        # and even if it was consistent the user might type the other
 
964
        # so we force it to uppercase
 
965
        # running python.exe under cmd.exe return capital C:\\
 
966
        # running win32 python inside a cygwin shell returns lowercase
 
967
        self.assertEqual(os_cwd[0].upper(), cwd[0])
 
968
 
 
969
    def test_fixdrive(self):
 
970
        self.assertEqual('H:/foo', osutils._win32_fixdrive('h:/foo'))
 
971
        self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
 
972
        self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
 
973
 
 
974
 
 
975
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
 
976
    """Test win32 functions that create files."""
 
977
 
 
978
    def test_getcwd(self):
 
979
        self.requireFeature(features.UnicodeFilenameFeature)
 
980
        os.mkdir(u'mu-\xb5')
 
981
        os.chdir(u'mu-\xb5')
 
982
        # TODO: jam 20060427 This will probably fail on Mac OSX because
 
983
        #       it will change the normalization of B\xe5gfors
 
984
        #       Consider using a different unicode character, or make
 
985
        #       osutils.getcwd() renormalize the path.
 
986
        self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
 
987
 
 
988
    def test_minimum_path_selection(self):
 
989
        self.assertEqual(set(),
 
990
            osutils.minimum_path_selection([]))
 
991
        self.assertEqual({'a'},
 
992
            osutils.minimum_path_selection(['a']))
 
993
        self.assertEqual({'a', 'b'},
 
994
            osutils.minimum_path_selection(['a', 'b']))
 
995
        self.assertEqual({'a/', 'b'},
 
996
            osutils.minimum_path_selection(['a/', 'b']))
 
997
        self.assertEqual({'a/', 'b'},
 
998
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
 
999
        self.assertEqual({'a-b', 'a', 'a0b'},
 
1000
            osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
 
1001
 
 
1002
    def test_mkdtemp(self):
 
1003
        tmpdir = osutils._win32_mkdtemp(dir='.')
 
1004
        self.assertFalse('\\' in tmpdir)
 
1005
 
 
1006
    def test_rename(self):
 
1007
        a = open('a', 'wb')
 
1008
        a.write('foo\n')
 
1009
        a.close()
 
1010
        b = open('b', 'wb')
 
1011
        b.write('baz\n')
 
1012
        b.close()
 
1013
 
 
1014
        osutils._win32_rename('b', 'a')
 
1015
        self.assertPathExists('a')
 
1016
        self.assertPathDoesNotExist('b')
 
1017
        self.assertFileEqual('baz\n', 'a')
 
1018
 
 
1019
    def test_rename_missing_file(self):
 
1020
        a = open('a', 'wb')
 
1021
        a.write('foo\n')
 
1022
        a.close()
 
1023
 
 
1024
        try:
 
1025
            osutils._win32_rename('b', 'a')
 
1026
        except (IOError, OSError) as e:
 
1027
            self.assertEqual(errno.ENOENT, e.errno)
 
1028
        self.assertFileEqual('foo\n', 'a')
 
1029
 
 
1030
    def test_rename_missing_dir(self):
 
1031
        os.mkdir('a')
 
1032
        try:
 
1033
            osutils._win32_rename('b', 'a')
 
1034
        except (IOError, OSError) as e:
 
1035
            self.assertEqual(errno.ENOENT, e.errno)
 
1036
 
 
1037
    def test_rename_current_dir(self):
 
1038
        os.mkdir('a')
 
1039
        os.chdir('a')
 
1040
        # You can't rename the working directory
 
1041
        # doing rename non-existant . usually
 
1042
        # just raises ENOENT, since non-existant
 
1043
        # doesn't exist.
 
1044
        try:
 
1045
            osutils._win32_rename('b', '.')
 
1046
        except (IOError, OSError) as e:
 
1047
            self.assertEqual(errno.ENOENT, e.errno)
 
1048
 
 
1049
    def test_splitpath(self):
 
1050
        def check(expected, path):
 
1051
            self.assertEqual(expected, osutils.splitpath(path))
 
1052
 
 
1053
        check(['a'], 'a')
 
1054
        check(['a', 'b'], 'a/b')
 
1055
        check(['a', 'b'], 'a/./b')
 
1056
        check(['a', '.b'], 'a/.b')
 
1057
        check(['a', '.b'], 'a\\.b')
 
1058
 
 
1059
        self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
 
1060
 
 
1061
 
 
1062
class TestParentDirectories(tests.TestCaseInTempDir):
 
1063
    """Test osutils.parent_directories()"""
 
1064
 
 
1065
    def test_parent_directories(self):
 
1066
        self.assertEqual([], osutils.parent_directories('a'))
 
1067
        self.assertEqual(['a'], osutils.parent_directories('a/b'))
 
1068
        self.assertEqual(['a/b', 'a'], osutils.parent_directories('a/b/c'))
 
1069
 
 
1070
 
 
1071
class TestMacFuncsDirs(tests.TestCaseInTempDir):
 
1072
    """Test mac special functions that require directories."""
 
1073
 
 
1074
    def test_getcwd(self):
 
1075
        self.requireFeature(features.UnicodeFilenameFeature)
 
1076
        os.mkdir(u'B\xe5gfors')
 
1077
        os.chdir(u'B\xe5gfors')
 
1078
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
 
1079
 
 
1080
    def test_getcwd_nonnorm(self):
 
1081
        self.requireFeature(features.UnicodeFilenameFeature)
 
1082
        # Test that _mac_getcwd() will normalize this path
 
1083
        os.mkdir(u'Ba\u030agfors')
 
1084
        os.chdir(u'Ba\u030agfors')
 
1085
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
 
1086
 
 
1087
 
 
1088
class TestChunksToLines(tests.TestCase):
 
1089
 
 
1090
    def test_smoketest(self):
 
1091
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
 
1092
                         osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
 
1093
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
 
1094
                         osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
 
1095
 
 
1096
    def test_osutils_binding(self):
 
1097
        from . import test__chunks_to_lines
 
1098
        if test__chunks_to_lines.compiled_chunkstolines_feature.available():
 
1099
            from .._chunks_to_lines_pyx import chunks_to_lines
 
1100
        else:
 
1101
            from .._chunks_to_lines_py import chunks_to_lines
 
1102
        self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
 
1103
 
 
1104
 
 
1105
class TestSplitLines(tests.TestCase):
 
1106
 
 
1107
    def test_split_unicode(self):
 
1108
        self.assertEqual([u'foo\n', u'bar\xae'],
 
1109
                         osutils.split_lines(u'foo\nbar\xae'))
 
1110
        self.assertEqual([u'foo\n', u'bar\xae\n'],
 
1111
                         osutils.split_lines(u'foo\nbar\xae\n'))
 
1112
 
 
1113
    def test_split_with_carriage_returns(self):
 
1114
        self.assertEqual(['foo\rbar\n'],
 
1115
                         osutils.split_lines('foo\rbar\n'))
 
1116
 
 
1117
 
 
1118
class TestWalkDirs(tests.TestCaseInTempDir):
 
1119
 
 
1120
    def assertExpectedBlocks(self, expected, result):
 
1121
        self.assertEqual(expected,
 
1122
                         [(dirinfo, [line[0:3] for line in block])
 
1123
                          for dirinfo, block in result])
 
1124
 
 
1125
    def test_walkdirs(self):
 
1126
        tree = [
 
1127
            '.bzr',
 
1128
            '0file',
 
1129
            '1dir/',
 
1130
            '1dir/0file',
 
1131
            '1dir/1dir/',
 
1132
            '2file'
 
1133
            ]
 
1134
        self.build_tree(tree)
 
1135
        expected_dirblocks = [
 
1136
                (('', '.'),
 
1137
                 [('0file', '0file', 'file'),
 
1138
                  ('1dir', '1dir', 'directory'),
 
1139
                  ('2file', '2file', 'file'),
 
1140
                 ]
 
1141
                ),
 
1142
                (('1dir', './1dir'),
 
1143
                 [('1dir/0file', '0file', 'file'),
 
1144
                  ('1dir/1dir', '1dir', 'directory'),
 
1145
                 ]
 
1146
                ),
 
1147
                (('1dir/1dir', './1dir/1dir'),
 
1148
                 [
 
1149
                 ]
 
1150
                ),
 
1151
            ]
 
1152
        result = []
 
1153
        found_bzrdir = False
 
1154
        for dirdetail, dirblock in osutils.walkdirs('.'):
 
1155
            if len(dirblock) and dirblock[0][1] == '.bzr':
 
1156
                # this tests the filtering of selected paths
 
1157
                found_bzrdir = True
 
1158
                del dirblock[0]
 
1159
            result.append((dirdetail, dirblock))
 
1160
 
 
1161
        self.assertTrue(found_bzrdir)
 
1162
        self.assertExpectedBlocks(expected_dirblocks, result)
 
1163
        # you can search a subdir only, with a supplied prefix.
 
1164
        result = []
 
1165
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
 
1166
            result.append(dirblock)
 
1167
        self.assertExpectedBlocks(expected_dirblocks[1:], result)
 
1168
 
 
1169
    def test_walkdirs_os_error(self):
 
1170
        # <https://bugs.launchpad.net/bzr/+bug/338653>
 
1171
        # Pyrex readdir didn't raise useful messages if it had an error
 
1172
        # reading the directory
 
1173
        if sys.platform == 'win32':
 
1174
            raise tests.TestNotApplicable(
 
1175
                "readdir IOError not tested on win32")
 
1176
        self.requireFeature(features.not_running_as_root)
 
1177
        os.mkdir("test-unreadable")
 
1178
        os.chmod("test-unreadable", 0000)
 
1179
        # must chmod it back so that it can be removed
 
1180
        self.addCleanup(os.chmod, "test-unreadable", 0o700)
 
1181
        # The error is not raised until the generator is actually evaluated.
 
1182
        # (It would be ok if it happened earlier but at the moment it
 
1183
        # doesn't.)
 
1184
        e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
 
1185
        self.assertEqual('./test-unreadable', e.filename)
 
1186
        self.assertEqual(errno.EACCES, e.errno)
 
1187
        # Ensure the message contains the file name
 
1188
        self.assertContainsRe(str(e), "\./test-unreadable")
 
1189
 
 
1190
 
 
1191
    def test_walkdirs_encoding_error(self):
 
1192
        # <https://bugs.launchpad.net/bzr/+bug/488519>
 
1193
        # walkdirs didn't raise a useful message when the filenames
 
1194
        # are not using the filesystem's encoding
 
1195
 
 
1196
        # require a bytestring based filesystem
 
1197
        self.requireFeature(features.ByteStringNamedFilesystem)
 
1198
 
 
1199
        tree = [
 
1200
            '.bzr',
 
1201
            '0file',
 
1202
            '1dir/',
 
1203
            '1dir/0file',
 
1204
            '1dir/1dir/',
 
1205
            '1file'
 
1206
            ]
 
1207
 
 
1208
        self.build_tree(tree)
 
1209
 
 
1210
        # rename the 1file to a latin-1 filename
 
1211
        os.rename("./1file", "\xe8file")
 
1212
        if "\xe8file" not in os.listdir("."):
 
1213
            self.skipTest("Lack filesystem that preserves arbitrary bytes")
 
1214
 
 
1215
        self._save_platform_info()
 
1216
        osutils._fs_enc = 'UTF-8'
 
1217
 
 
1218
        # this should raise on error
 
1219
        def attempt():
 
1220
            for dirdetail, dirblock in osutils.walkdirs('.'):
 
1221
                pass
 
1222
 
 
1223
        self.assertRaises(errors.BadFilenameEncoding, attempt)
 
1224
 
 
1225
    def test__walkdirs_utf8(self):
 
1226
        tree = [
 
1227
            '.bzr',
 
1228
            '0file',
 
1229
            '1dir/',
 
1230
            '1dir/0file',
 
1231
            '1dir/1dir/',
 
1232
            '2file'
 
1233
            ]
 
1234
        self.build_tree(tree)
 
1235
        expected_dirblocks = [
 
1236
                (('', '.'),
 
1237
                 [('0file', '0file', 'file'),
 
1238
                  ('1dir', '1dir', 'directory'),
 
1239
                  ('2file', '2file', 'file'),
 
1240
                 ]
 
1241
                ),
 
1242
                (('1dir', './1dir'),
 
1243
                 [('1dir/0file', '0file', 'file'),
 
1244
                  ('1dir/1dir', '1dir', 'directory'),
 
1245
                 ]
 
1246
                ),
 
1247
                (('1dir/1dir', './1dir/1dir'),
 
1248
                 [
 
1249
                 ]
 
1250
                ),
 
1251
            ]
 
1252
        result = []
 
1253
        found_bzrdir = False
 
1254
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
 
1255
            if len(dirblock) and dirblock[0][1] == '.bzr':
 
1256
                # this tests the filtering of selected paths
 
1257
                found_bzrdir = True
 
1258
                del dirblock[0]
 
1259
            result.append((dirdetail, dirblock))
 
1260
 
 
1261
        self.assertTrue(found_bzrdir)
 
1262
        self.assertExpectedBlocks(expected_dirblocks, result)
 
1263
 
 
1264
        # you can search a subdir only, with a supplied prefix.
 
1265
        result = []
 
1266
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
 
1267
            result.append(dirblock)
 
1268
        self.assertExpectedBlocks(expected_dirblocks[1:], result)
 
1269
 
 
1270
    def _filter_out_stat(self, result):
 
1271
        """Filter out the stat value from the walkdirs result"""
 
1272
        for dirdetail, dirblock in result:
 
1273
            new_dirblock = []
 
1274
            for info in dirblock:
 
1275
                # Ignore info[3] which is the stat
 
1276
                new_dirblock.append((info[0], info[1], info[2], info[4]))
 
1277
            dirblock[:] = new_dirblock
 
1278
 
 
1279
    def _save_platform_info(self):
 
1280
        self.overrideAttr(osutils, '_fs_enc')
 
1281
        self.overrideAttr(osutils, '_selected_dir_reader')
 
1282
 
 
1283
    def assertDirReaderIs(self, expected):
 
1284
        """Assert the right implementation for _walkdirs_utf8 is chosen."""
 
1285
        # Force it to redetect
 
1286
        osutils._selected_dir_reader = None
 
1287
        # Nothing to list, but should still trigger the selection logic
 
1288
        self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
 
1289
        self.assertIsInstance(osutils._selected_dir_reader, expected)
 
1290
 
 
1291
    def test_force_walkdirs_utf8_fs_utf8(self):
 
1292
        self.requireFeature(UTF8DirReaderFeature)
 
1293
        self._save_platform_info()
 
1294
        osutils._fs_enc = 'utf-8'
 
1295
        self.assertDirReaderIs(
 
1296
            UTF8DirReaderFeature.module.UTF8DirReader)
 
1297
 
 
1298
    def test_force_walkdirs_utf8_fs_ascii(self):
 
1299
        self.requireFeature(UTF8DirReaderFeature)
 
1300
        self._save_platform_info()
 
1301
        osutils._fs_enc = 'ascii'
 
1302
        self.assertDirReaderIs(
 
1303
            UTF8DirReaderFeature.module.UTF8DirReader)
 
1304
 
 
1305
    def test_force_walkdirs_utf8_fs_latin1(self):
 
1306
        self._save_platform_info()
 
1307
        osutils._fs_enc = 'iso-8859-1'
 
1308
        self.assertDirReaderIs(osutils.UnicodeDirReader)
 
1309
 
 
1310
    def test_force_walkdirs_utf8_nt(self):
 
1311
        # Disabled because the thunk of the whole walkdirs api is disabled.
 
1312
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
1313
        self._save_platform_info()
 
1314
        from .._walkdirs_win32 import Win32ReadDir
 
1315
        self.assertDirReaderIs(Win32ReadDir)
 
1316
 
 
1317
    def test_unicode_walkdirs(self):
 
1318
        """Walkdirs should always return unicode paths."""
 
1319
        self.requireFeature(features.UnicodeFilenameFeature)
 
1320
        name0 = u'0file-\xb6'
 
1321
        name1 = u'1dir-\u062c\u0648'
 
1322
        name2 = u'2file-\u0633'
 
1323
        tree = [
 
1324
            name0,
 
1325
            name1 + '/',
 
1326
            name1 + '/' + name0,
 
1327
            name1 + '/' + name1 + '/',
 
1328
            name2,
 
1329
            ]
 
1330
        self.build_tree(tree)
 
1331
        expected_dirblocks = [
 
1332
                ((u'', u'.'),
 
1333
                 [(name0, name0, 'file', './' + name0),
 
1334
                  (name1, name1, 'directory', './' + name1),
 
1335
                  (name2, name2, 'file', './' + name2),
 
1336
                 ]
 
1337
                ),
 
1338
                ((name1, './' + name1),
 
1339
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
1340
                                                        + '/' + name0),
 
1341
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
1342
                                                            + '/' + name1),
 
1343
                 ]
 
1344
                ),
 
1345
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1346
                 [
 
1347
                 ]
 
1348
                ),
 
1349
            ]
 
1350
        result = list(osutils.walkdirs('.'))
 
1351
        self._filter_out_stat(result)
 
1352
        self.assertEqual(expected_dirblocks, result)
 
1353
        result = list(osutils.walkdirs(u'./'+name1, name1))
 
1354
        self._filter_out_stat(result)
 
1355
        self.assertEqual(expected_dirblocks[1:], result)
 
1356
 
 
1357
    def test_unicode__walkdirs_utf8(self):
 
1358
        """Walkdirs_utf8 should always return utf8 paths.
 
1359
 
 
1360
        The abspath portion might be in unicode or utf-8
 
1361
        """
 
1362
        self.requireFeature(features.UnicodeFilenameFeature)
 
1363
        name0 = u'0file-\xb6'
 
1364
        name1 = u'1dir-\u062c\u0648'
 
1365
        name2 = u'2file-\u0633'
 
1366
        tree = [
 
1367
            name0,
 
1368
            name1 + '/',
 
1369
            name1 + '/' + name0,
 
1370
            name1 + '/' + name1 + '/',
 
1371
            name2,
 
1372
            ]
 
1373
        self.build_tree(tree)
 
1374
        name0 = name0.encode('utf8')
 
1375
        name1 = name1.encode('utf8')
 
1376
        name2 = name2.encode('utf8')
 
1377
 
 
1378
        expected_dirblocks = [
 
1379
                (('', '.'),
 
1380
                 [(name0, name0, 'file', './' + name0),
 
1381
                  (name1, name1, 'directory', './' + name1),
 
1382
                  (name2, name2, 'file', './' + name2),
 
1383
                 ]
 
1384
                ),
 
1385
                ((name1, './' + name1),
 
1386
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
1387
                                                        + '/' + name0),
 
1388
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
1389
                                                            + '/' + name1),
 
1390
                 ]
 
1391
                ),
 
1392
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1393
                 [
 
1394
                 ]
 
1395
                ),
 
1396
            ]
 
1397
        result = []
 
1398
        # For ease in testing, if walkdirs_utf8 returns Unicode, assert that
 
1399
        # all abspaths are Unicode, and encode them back into utf8.
 
1400
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
 
1401
            self.assertIsInstance(dirdetail[0], str)
 
1402
            if isinstance(dirdetail[1], unicode):
 
1403
                dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
 
1404
                dirblock = [list(info) for info in dirblock]
 
1405
                for info in dirblock:
 
1406
                    self.assertIsInstance(info[4], unicode)
 
1407
                    info[4] = info[4].encode('utf8')
 
1408
            new_dirblock = []
 
1409
            for info in dirblock:
 
1410
                self.assertIsInstance(info[0], str)
 
1411
                self.assertIsInstance(info[1], str)
 
1412
                self.assertIsInstance(info[4], str)
 
1413
                # Remove the stat information
 
1414
                new_dirblock.append((info[0], info[1], info[2], info[4]))
 
1415
            result.append((dirdetail, new_dirblock))
 
1416
        self.assertEqual(expected_dirblocks, result)
 
1417
 
 
1418
    def test__walkdirs_utf8_with_unicode_fs(self):
 
1419
        """UnicodeDirReader should be a safe fallback everywhere
 
1420
 
 
1421
        The abspath portion should be in unicode
 
1422
        """
 
1423
        self.requireFeature(features.UnicodeFilenameFeature)
 
1424
        # Use the unicode reader. TODO: split into driver-and-driven unit
 
1425
        # tests.
 
1426
        self._save_platform_info()
 
1427
        osutils._selected_dir_reader = osutils.UnicodeDirReader()
 
1428
        name0u = u'0file-\xb6'
 
1429
        name1u = u'1dir-\u062c\u0648'
 
1430
        name2u = u'2file-\u0633'
 
1431
        tree = [
 
1432
            name0u,
 
1433
            name1u + '/',
 
1434
            name1u + '/' + name0u,
 
1435
            name1u + '/' + name1u + '/',
 
1436
            name2u,
 
1437
            ]
 
1438
        self.build_tree(tree)
 
1439
        name0 = name0u.encode('utf8')
 
1440
        name1 = name1u.encode('utf8')
 
1441
        name2 = name2u.encode('utf8')
 
1442
 
 
1443
        # All of the abspaths should be in unicode, all of the relative paths
 
1444
        # should be in utf8
 
1445
        expected_dirblocks = [
 
1446
                (('', '.'),
 
1447
                 [(name0, name0, 'file', './' + name0u),
 
1448
                  (name1, name1, 'directory', './' + name1u),
 
1449
                  (name2, name2, 'file', './' + name2u),
 
1450
                 ]
 
1451
                ),
 
1452
                ((name1, './' + name1u),
 
1453
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1454
                                                        + '/' + name0u),
 
1455
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1456
                                                            + '/' + name1u),
 
1457
                 ]
 
1458
                ),
 
1459
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1460
                 [
 
1461
                 ]
 
1462
                ),
 
1463
            ]
 
1464
        result = list(osutils._walkdirs_utf8('.'))
 
1465
        self._filter_out_stat(result)
 
1466
        self.assertEqual(expected_dirblocks, result)
 
1467
 
 
1468
    def test__walkdirs_utf8_win32readdir(self):
 
1469
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
1470
        self.requireFeature(features.UnicodeFilenameFeature)
 
1471
        from .._walkdirs_win32 import Win32ReadDir
 
1472
        self._save_platform_info()
 
1473
        osutils._selected_dir_reader = Win32ReadDir()
 
1474
        name0u = u'0file-\xb6'
 
1475
        name1u = u'1dir-\u062c\u0648'
 
1476
        name2u = u'2file-\u0633'
 
1477
        tree = [
 
1478
            name0u,
 
1479
            name1u + '/',
 
1480
            name1u + '/' + name0u,
 
1481
            name1u + '/' + name1u + '/',
 
1482
            name2u,
 
1483
            ]
 
1484
        self.build_tree(tree)
 
1485
        name0 = name0u.encode('utf8')
 
1486
        name1 = name1u.encode('utf8')
 
1487
        name2 = name2u.encode('utf8')
 
1488
 
 
1489
        # All of the abspaths should be in unicode, all of the relative paths
 
1490
        # should be in utf8
 
1491
        expected_dirblocks = [
 
1492
                (('', '.'),
 
1493
                 [(name0, name0, 'file', './' + name0u),
 
1494
                  (name1, name1, 'directory', './' + name1u),
 
1495
                  (name2, name2, 'file', './' + name2u),
 
1496
                 ]
 
1497
                ),
 
1498
                ((name1, './' + name1u),
 
1499
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1500
                                                        + '/' + name0u),
 
1501
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1502
                                                            + '/' + name1u),
 
1503
                 ]
 
1504
                ),
 
1505
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1506
                 [
 
1507
                 ]
 
1508
                ),
 
1509
            ]
 
1510
        result = list(osutils._walkdirs_utf8(u'.'))
 
1511
        self._filter_out_stat(result)
 
1512
        self.assertEqual(expected_dirblocks, result)
 
1513
 
 
1514
    def assertStatIsCorrect(self, path, win32stat):
 
1515
        os_stat = os.stat(path)
 
1516
        self.assertEqual(os_stat.st_size, win32stat.st_size)
 
1517
        self.assertAlmostEqual(os_stat.st_mtime, win32stat.st_mtime, places=4)
 
1518
        self.assertAlmostEqual(os_stat.st_ctime, win32stat.st_ctime, places=4)
 
1519
        self.assertAlmostEqual(os_stat.st_atime, win32stat.st_atime, places=4)
 
1520
        self.assertEqual(os_stat.st_dev, win32stat.st_dev)
 
1521
        self.assertEqual(os_stat.st_ino, win32stat.st_ino)
 
1522
        self.assertEqual(os_stat.st_mode, win32stat.st_mode)
 
1523
 
 
1524
    def test__walkdirs_utf_win32_find_file_stat_file(self):
 
1525
        """make sure our Stat values are valid"""
 
1526
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
1527
        self.requireFeature(features.UnicodeFilenameFeature)
 
1528
        from .._walkdirs_win32 import Win32ReadDir
 
1529
        name0u = u'0file-\xb6'
 
1530
        name0 = name0u.encode('utf8')
 
1531
        self.build_tree([name0u])
 
1532
        # I hate to sleep() here, but I'm trying to make the ctime different
 
1533
        # from the mtime
 
1534
        time.sleep(2)
 
1535
        f = open(name0u, 'ab')
 
1536
        try:
 
1537
            f.write('just a small update')
 
1538
        finally:
 
1539
            f.close()
 
1540
 
 
1541
        result = Win32ReadDir().read_dir('', u'.')
 
1542
        entry = result[0]
 
1543
        self.assertEqual((name0, name0, 'file'), entry[:3])
 
1544
        self.assertEqual(u'./' + name0u, entry[4])
 
1545
        self.assertStatIsCorrect(entry[4], entry[3])
 
1546
        self.assertNotEqual(entry[3].st_mtime, entry[3].st_ctime)
 
1547
 
 
1548
    def test__walkdirs_utf_win32_find_file_stat_directory(self):
 
1549
        """make sure our Stat values are valid"""
 
1550
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
1551
        self.requireFeature(features.UnicodeFilenameFeature)
 
1552
        from .._walkdirs_win32 import Win32ReadDir
 
1553
        name0u = u'0dir-\u062c\u0648'
 
1554
        name0 = name0u.encode('utf8')
 
1555
        self.build_tree([name0u + '/'])
 
1556
 
 
1557
        result = Win32ReadDir().read_dir('', u'.')
 
1558
        entry = result[0]
 
1559
        self.assertEqual((name0, name0, 'directory'), entry[:3])
 
1560
        self.assertEqual(u'./' + name0u, entry[4])
 
1561
        self.assertStatIsCorrect(entry[4], entry[3])
 
1562
 
 
1563
    def assertPathCompare(self, path_less, path_greater):
 
1564
        """check that path_less and path_greater compare correctly."""
 
1565
        self.assertEqual(0, osutils.compare_paths_prefix_order(
 
1566
            path_less, path_less))
 
1567
        self.assertEqual(0, osutils.compare_paths_prefix_order(
 
1568
            path_greater, path_greater))
 
1569
        self.assertEqual(-1, osutils.compare_paths_prefix_order(
 
1570
            path_less, path_greater))
 
1571
        self.assertEqual(1, osutils.compare_paths_prefix_order(
 
1572
            path_greater, path_less))
 
1573
 
 
1574
    def test_compare_paths_prefix_order(self):
 
1575
        # root before all else
 
1576
        self.assertPathCompare("/", "/a")
 
1577
        # alpha within a dir
 
1578
        self.assertPathCompare("/a", "/b")
 
1579
        self.assertPathCompare("/b", "/z")
 
1580
        # high dirs before lower.
 
1581
        self.assertPathCompare("/z", "/a/a")
 
1582
        # except if the deeper dir should be output first
 
1583
        self.assertPathCompare("/a/b/c", "/d/g")
 
1584
        # lexical betwen dirs of the same height
 
1585
        self.assertPathCompare("/a/z", "/z/z")
 
1586
        self.assertPathCompare("/a/c/z", "/a/d/e")
 
1587
 
 
1588
        # this should also be consistent for no leading / paths
 
1589
        # root before all else
 
1590
        self.assertPathCompare("", "a")
 
1591
        # alpha within a dir
 
1592
        self.assertPathCompare("a", "b")
 
1593
        self.assertPathCompare("b", "z")
 
1594
        # high dirs before lower.
 
1595
        self.assertPathCompare("z", "a/a")
 
1596
        # except if the deeper dir should be output first
 
1597
        self.assertPathCompare("a/b/c", "d/g")
 
1598
        # lexical betwen dirs of the same height
 
1599
        self.assertPathCompare("a/z", "z/z")
 
1600
        self.assertPathCompare("a/c/z", "a/d/e")
 
1601
 
 
1602
    def test_path_prefix_sorting(self):
 
1603
        """Doing a sort on path prefix should match our sample data."""
 
1604
        original_paths = [
 
1605
            'a',
 
1606
            'a/b',
 
1607
            'a/b/c',
 
1608
            'b',
 
1609
            'b/c',
 
1610
            'd',
 
1611
            'd/e',
 
1612
            'd/e/f',
 
1613
            'd/f',
 
1614
            'd/g',
 
1615
            'g',
 
1616
            ]
 
1617
 
 
1618
        dir_sorted_paths = [
 
1619
            'a',
 
1620
            'b',
 
1621
            'd',
 
1622
            'g',
 
1623
            'a/b',
 
1624
            'a/b/c',
 
1625
            'b/c',
 
1626
            'd/e',
 
1627
            'd/f',
 
1628
            'd/g',
 
1629
            'd/e/f',
 
1630
            ]
 
1631
 
 
1632
        self.assertEqual(
 
1633
            dir_sorted_paths,
 
1634
            sorted(original_paths, key=osutils.path_prefix_key))
 
1635
        # using the comparison routine shoudl work too:
 
1636
        self.assertEqual(
 
1637
            dir_sorted_paths,
 
1638
            sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
 
1639
 
 
1640
 
 
1641
class TestCopyTree(tests.TestCaseInTempDir):
 
1642
 
 
1643
    def test_copy_basic_tree(self):
 
1644
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
 
1645
        osutils.copy_tree('source', 'target')
 
1646
        self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
 
1647
        self.assertEqual(['c'], os.listdir('target/b'))
 
1648
 
 
1649
    def test_copy_tree_target_exists(self):
 
1650
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
 
1651
                         'target/'])
 
1652
        osutils.copy_tree('source', 'target')
 
1653
        self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
 
1654
        self.assertEqual(['c'], os.listdir('target/b'))
 
1655
 
 
1656
    def test_copy_tree_symlinks(self):
 
1657
        self.requireFeature(features.SymlinkFeature)
 
1658
        self.build_tree(['source/'])
 
1659
        os.symlink('a/generic/path', 'source/lnk')
 
1660
        osutils.copy_tree('source', 'target')
 
1661
        self.assertEqual(['lnk'], os.listdir('target'))
 
1662
        self.assertEqual('a/generic/path', os.readlink('target/lnk'))
 
1663
 
 
1664
    def test_copy_tree_handlers(self):
 
1665
        processed_files = []
 
1666
        processed_links = []
 
1667
        def file_handler(from_path, to_path):
 
1668
            processed_files.append(('f', from_path, to_path))
 
1669
        def dir_handler(from_path, to_path):
 
1670
            processed_files.append(('d', from_path, to_path))
 
1671
        def link_handler(from_path, to_path):
 
1672
            processed_links.append((from_path, to_path))
 
1673
        handlers = {'file':file_handler,
 
1674
                    'directory':dir_handler,
 
1675
                    'symlink':link_handler,
 
1676
                   }
 
1677
 
 
1678
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
 
1679
        if osutils.has_symlinks():
 
1680
            os.symlink('a/generic/path', 'source/lnk')
 
1681
        osutils.copy_tree('source', 'target', handlers=handlers)
 
1682
 
 
1683
        self.assertEqual([('d', 'source', 'target'),
 
1684
                          ('f', 'source/a', 'target/a'),
 
1685
                          ('d', 'source/b', 'target/b'),
 
1686
                          ('f', 'source/b/c', 'target/b/c'),
 
1687
                         ], processed_files)
 
1688
        self.assertPathDoesNotExist('target')
 
1689
        if osutils.has_symlinks():
 
1690
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
 
1691
 
 
1692
 
 
1693
class TestSetUnsetEnv(tests.TestCase):
 
1694
    """Test updating the environment"""
 
1695
 
 
1696
    def setUp(self):
 
1697
        super(TestSetUnsetEnv, self).setUp()
 
1698
 
 
1699
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'),
 
1700
                         'Environment was not cleaned up properly.'
 
1701
                         ' Variable BRZ_TEST_ENV_VAR should not exist.')
 
1702
        def cleanup():
 
1703
            if 'BRZ_TEST_ENV_VAR' in os.environ:
 
1704
                del os.environ['BRZ_TEST_ENV_VAR']
 
1705
        self.addCleanup(cleanup)
 
1706
 
 
1707
    def test_set(self):
 
1708
        """Test that we can set an env variable"""
 
1709
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1710
        self.assertEqual(None, old)
 
1711
        self.assertEqual('foo', os.environ.get('BRZ_TEST_ENV_VAR'))
 
1712
 
 
1713
    def test_double_set(self):
 
1714
        """Test that we get the old value out"""
 
1715
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1716
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'bar')
 
1717
        self.assertEqual('foo', old)
 
1718
        self.assertEqual('bar', os.environ.get('BRZ_TEST_ENV_VAR'))
 
1719
 
 
1720
    def test_unicode(self):
 
1721
        """Environment can only contain plain strings
 
1722
 
 
1723
        So Unicode strings must be encoded.
 
1724
        """
 
1725
        uni_val, env_val = tests.probe_unicode_in_user_encoding()
 
1726
        if uni_val is None:
 
1727
            raise tests.TestSkipped(
 
1728
                'Cannot find a unicode character that works in encoding %s'
 
1729
                % (osutils.get_user_encoding(),))
 
1730
 
 
1731
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', uni_val)
 
1732
        self.assertEqual(env_val, os.environ.get('BRZ_TEST_ENV_VAR'))
 
1733
 
 
1734
    def test_unset(self):
 
1735
        """Test that passing None will remove the env var"""
 
1736
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1737
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', None)
 
1738
        self.assertEqual('foo', old)
 
1739
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'))
 
1740
        self.assertFalse('BRZ_TEST_ENV_VAR' in os.environ)
 
1741
 
 
1742
 
 
1743
class TestSizeShaFile(tests.TestCaseInTempDir):
 
1744
 
 
1745
    def test_sha_empty(self):
 
1746
        self.build_tree_contents([('foo', '')])
 
1747
        expected_sha = osutils.sha_string('')
 
1748
        f = open('foo')
 
1749
        self.addCleanup(f.close)
 
1750
        size, sha = osutils.size_sha_file(f)
 
1751
        self.assertEqual(0, size)
 
1752
        self.assertEqual(expected_sha, sha)
 
1753
 
 
1754
    def test_sha_mixed_endings(self):
 
1755
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
 
1756
        self.build_tree_contents([('foo', text)])
 
1757
        expected_sha = osutils.sha_string(text)
 
1758
        f = open('foo', 'rb')
 
1759
        self.addCleanup(f.close)
 
1760
        size, sha = osutils.size_sha_file(f)
 
1761
        self.assertEqual(38, size)
 
1762
        self.assertEqual(expected_sha, sha)
 
1763
 
 
1764
 
 
1765
class TestShaFileByName(tests.TestCaseInTempDir):
 
1766
 
 
1767
    def test_sha_empty(self):
 
1768
        self.build_tree_contents([('foo', '')])
 
1769
        expected_sha = osutils.sha_string('')
 
1770
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
 
1771
 
 
1772
    def test_sha_mixed_endings(self):
 
1773
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
 
1774
        self.build_tree_contents([('foo', text)])
 
1775
        expected_sha = osutils.sha_string(text)
 
1776
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
 
1777
 
 
1778
 
 
1779
class TestResourceLoading(tests.TestCaseInTempDir):
 
1780
 
 
1781
    def test_resource_string(self):
 
1782
        # test resource in breezy
 
1783
        text = osutils.resource_string('breezy', 'debug.py')
 
1784
        self.assertContainsRe(text, "debug_flags = set()")
 
1785
        # test resource under breezy
 
1786
        text = osutils.resource_string('breezy.ui', 'text.py')
 
1787
        self.assertContainsRe(text, "class TextUIFactory")
 
1788
        # test unsupported package
 
1789
        self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
 
1790
            'yyy.xx')
 
1791
        # test unknown resource
 
1792
        self.assertRaises(IOError, osutils.resource_string, 'breezy', 'yyy.xx')
 
1793
 
 
1794
 
 
1795
class TestDirReader(tests.TestCaseInTempDir):
 
1796
 
 
1797
    scenarios = dir_reader_scenarios()
 
1798
 
 
1799
    # Set by load_tests
 
1800
    _dir_reader_class = None
 
1801
    _native_to_unicode = None
 
1802
 
 
1803
    def setUp(self):
 
1804
        super(TestDirReader, self).setUp()
 
1805
        self.overrideAttr(osutils,
 
1806
                          '_selected_dir_reader', self._dir_reader_class())
 
1807
 
 
1808
    def _get_ascii_tree(self):
 
1809
        tree = [
 
1810
            '0file',
 
1811
            '1dir/',
 
1812
            '1dir/0file',
 
1813
            '1dir/1dir/',
 
1814
            '2file'
 
1815
            ]
 
1816
        expected_dirblocks = [
 
1817
                (('', '.'),
 
1818
                 [('0file', '0file', 'file'),
 
1819
                  ('1dir', '1dir', 'directory'),
 
1820
                  ('2file', '2file', 'file'),
 
1821
                 ]
 
1822
                ),
 
1823
                (('1dir', './1dir'),
 
1824
                 [('1dir/0file', '0file', 'file'),
 
1825
                  ('1dir/1dir', '1dir', 'directory'),
 
1826
                 ]
 
1827
                ),
 
1828
                (('1dir/1dir', './1dir/1dir'),
 
1829
                 [
 
1830
                 ]
 
1831
                ),
 
1832
            ]
 
1833
        return tree, expected_dirblocks
 
1834
 
 
1835
    def test_walk_cur_dir(self):
 
1836
        tree, expected_dirblocks = self._get_ascii_tree()
 
1837
        self.build_tree(tree)
 
1838
        result = list(osutils._walkdirs_utf8('.'))
 
1839
        # Filter out stat and abspath
 
1840
        self.assertEqual(expected_dirblocks,
 
1841
                         [(dirinfo, [line[0:3] for line in block])
 
1842
                          for dirinfo, block in result])
 
1843
 
 
1844
    def test_walk_sub_dir(self):
 
1845
        tree, expected_dirblocks = self._get_ascii_tree()
 
1846
        self.build_tree(tree)
 
1847
        # you can search a subdir only, with a supplied prefix.
 
1848
        result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
 
1849
        # Filter out stat and abspath
 
1850
        self.assertEqual(expected_dirblocks[1:],
 
1851
                         [(dirinfo, [line[0:3] for line in block])
 
1852
                          for dirinfo, block in result])
 
1853
 
 
1854
    def _get_unicode_tree(self):
 
1855
        name0u = u'0file-\xb6'
 
1856
        name1u = u'1dir-\u062c\u0648'
 
1857
        name2u = u'2file-\u0633'
 
1858
        tree = [
 
1859
            name0u,
 
1860
            name1u + '/',
 
1861
            name1u + '/' + name0u,
 
1862
            name1u + '/' + name1u + '/',
 
1863
            name2u,
 
1864
            ]
 
1865
        name0 = name0u.encode('UTF-8')
 
1866
        name1 = name1u.encode('UTF-8')
 
1867
        name2 = name2u.encode('UTF-8')
 
1868
        expected_dirblocks = [
 
1869
                (('', '.'),
 
1870
                 [(name0, name0, 'file', './' + name0u),
 
1871
                  (name1, name1, 'directory', './' + name1u),
 
1872
                  (name2, name2, 'file', './' + name2u),
 
1873
                 ]
 
1874
                ),
 
1875
                ((name1, './' + name1u),
 
1876
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1877
                                                        + '/' + name0u),
 
1878
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1879
                                                            + '/' + name1u),
 
1880
                 ]
 
1881
                ),
 
1882
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1883
                 [
 
1884
                 ]
 
1885
                ),
 
1886
            ]
 
1887
        return tree, expected_dirblocks
 
1888
 
 
1889
    def _filter_out(self, raw_dirblocks):
 
1890
        """Filter out a walkdirs_utf8 result.
 
1891
 
 
1892
        stat field is removed, all native paths are converted to unicode
 
1893
        """
 
1894
        filtered_dirblocks = []
 
1895
        for dirinfo, block in raw_dirblocks:
 
1896
            dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
 
1897
            details = []
 
1898
            for line in block:
 
1899
                details.append(line[0:3] + (self._native_to_unicode(line[4]), ))
 
1900
            filtered_dirblocks.append((dirinfo, details))
 
1901
        return filtered_dirblocks
 
1902
 
 
1903
    def test_walk_unicode_tree(self):
 
1904
        self.requireFeature(features.UnicodeFilenameFeature)
 
1905
        tree, expected_dirblocks = self._get_unicode_tree()
 
1906
        self.build_tree(tree)
 
1907
        result = list(osutils._walkdirs_utf8('.'))
 
1908
        self.assertEqual(expected_dirblocks, self._filter_out(result))
 
1909
 
 
1910
    def test_symlink(self):
 
1911
        self.requireFeature(features.SymlinkFeature)
 
1912
        self.requireFeature(features.UnicodeFilenameFeature)
 
1913
        target = u'target\N{Euro Sign}'
 
1914
        link_name = u'l\N{Euro Sign}nk'
 
1915
        os.symlink(target, link_name)
 
1916
        target_utf8 = target.encode('UTF-8')
 
1917
        link_name_utf8 = link_name.encode('UTF-8')
 
1918
        expected_dirblocks = [
 
1919
                (('', '.'),
 
1920
                 [(link_name_utf8, link_name_utf8,
 
1921
                   'symlink', './' + link_name),],
 
1922
                 )]
 
1923
        result = list(osutils._walkdirs_utf8('.'))
 
1924
        self.assertEqual(expected_dirblocks, self._filter_out(result))
 
1925
 
 
1926
 
 
1927
class TestReadLink(tests.TestCaseInTempDir):
 
1928
    """Exposes os.readlink() problems and the osutils solution.
 
1929
 
 
1930
    The only guarantee offered by os.readlink(), starting with 2.6, is that a
 
1931
    unicode string will be returned if a unicode string is passed.
 
1932
 
 
1933
    But prior python versions failed to properly encode the passed unicode
 
1934
    string.
 
1935
    """
 
1936
    _test_needs_features = [features.SymlinkFeature, features.UnicodeFilenameFeature]
 
1937
 
 
1938
    def setUp(self):
 
1939
        super(tests.TestCaseInTempDir, self).setUp()
 
1940
        self.link = u'l\N{Euro Sign}ink'
 
1941
        self.target = u'targe\N{Euro Sign}t'
 
1942
        os.symlink(self.target, self.link)
 
1943
 
 
1944
    def test_os_readlink_link_encoding(self):
 
1945
        self.assertEqual(self.target,  os.readlink(self.link))
 
1946
 
 
1947
    def test_os_readlink_link_decoding(self):
 
1948
        self.assertEqual(self.target.encode(osutils._fs_enc),
 
1949
                          os.readlink(self.link.encode(osutils._fs_enc)))
 
1950
 
 
1951
 
 
1952
class TestConcurrency(tests.TestCase):
 
1953
 
 
1954
    def setUp(self):
 
1955
        super(TestConcurrency, self).setUp()
 
1956
        self.overrideAttr(osutils, '_cached_local_concurrency')
 
1957
 
 
1958
    def test_local_concurrency(self):
 
1959
        concurrency = osutils.local_concurrency()
 
1960
        self.assertIsInstance(concurrency, int)
 
1961
 
 
1962
    def test_local_concurrency_environment_variable(self):
 
1963
        self.overrideEnv('BRZ_CONCURRENCY', '2')
 
1964
        self.assertEqual(2, osutils.local_concurrency(use_cache=False))
 
1965
        self.overrideEnv('BRZ_CONCURRENCY', '3')
 
1966
        self.assertEqual(3, osutils.local_concurrency(use_cache=False))
 
1967
        self.overrideEnv('BRZ_CONCURRENCY', 'foo')
 
1968
        self.assertEqual(1, osutils.local_concurrency(use_cache=False))
 
1969
 
 
1970
    def test_option_concurrency(self):
 
1971
        self.overrideEnv('BRZ_CONCURRENCY', '1')
 
1972
        self.run_bzr('rocks --concurrency 42')
 
1973
        # Command line overrides environment variable
 
1974
        self.assertEqual('42', os.environ['BRZ_CONCURRENCY'])
 
1975
        self.assertEqual(42, osutils.local_concurrency(use_cache=False))
 
1976
 
 
1977
 
 
1978
class TestFailedToLoadExtension(tests.TestCase):
 
1979
 
 
1980
    def _try_loading(self):
 
1981
        try:
 
1982
            import breezy._fictional_extension_py
 
1983
        except ImportError as e:
 
1984
            osutils.failed_to_load_extension(e)
 
1985
            return True
 
1986
 
 
1987
    def setUp(self):
 
1988
        super(TestFailedToLoadExtension, self).setUp()
 
1989
        self.overrideAttr(osutils, '_extension_load_failures', [])
 
1990
 
 
1991
    def test_failure_to_load(self):
 
1992
        self._try_loading()
 
1993
        self.assertLength(1, osutils._extension_load_failures)
 
1994
        self.assertEqual(osutils._extension_load_failures[0],
 
1995
            "No module named _fictional_extension_py")
 
1996
 
 
1997
    def test_report_extension_load_failures_no_warning(self):
 
1998
        self.assertTrue(self._try_loading())
 
1999
        warnings, result = self.callCatchWarnings(osutils.report_extension_load_failures)
 
2000
        # it used to give a Python warning; it no longer does
 
2001
        self.assertLength(0, warnings)
 
2002
 
 
2003
    def test_report_extension_load_failures_message(self):
 
2004
        log = BytesIO()
 
2005
        trace.push_log_file(log)
 
2006
        self.assertTrue(self._try_loading())
 
2007
        osutils.report_extension_load_failures()
 
2008
        self.assertContainsRe(
 
2009
            log.getvalue(),
 
2010
            r"brz: warning: some compiled extensions could not be loaded; "
 
2011
            "see ``brz help missing-extensions``\n"
 
2012
            )
 
2013
 
 
2014
 
 
2015
class TestTerminalWidth(tests.TestCase):
 
2016
 
 
2017
    def setUp(self):
 
2018
        super(TestTerminalWidth, self).setUp()
 
2019
        self._orig_terminal_size_state = osutils._terminal_size_state
 
2020
        self._orig_first_terminal_size = osutils._first_terminal_size
 
2021
        self.addCleanup(self.restore_osutils_globals)
 
2022
        osutils._terminal_size_state = 'no_data'
 
2023
        osutils._first_terminal_size = None
 
2024
 
 
2025
    def restore_osutils_globals(self):
 
2026
        osutils._terminal_size_state = self._orig_terminal_size_state
 
2027
        osutils._first_terminal_size = self._orig_first_terminal_size
 
2028
 
 
2029
    def replace_stdout(self, new):
 
2030
        self.overrideAttr(sys, 'stdout', new)
 
2031
 
 
2032
    def replace__terminal_size(self, new):
 
2033
        self.overrideAttr(osutils, '_terminal_size', new)
 
2034
 
 
2035
    def set_fake_tty(self):
 
2036
 
 
2037
        class I_am_a_tty(object):
 
2038
            def isatty(self):
 
2039
                return True
 
2040
 
 
2041
        self.replace_stdout(I_am_a_tty())
 
2042
 
 
2043
    def test_default_values(self):
 
2044
        self.assertEqual(80, osutils.default_terminal_width)
 
2045
 
 
2046
    def test_defaults_to_BRZ_COLUMNS(self):
 
2047
        # BRZ_COLUMNS is set by the test framework
 
2048
        self.assertNotEqual('12', os.environ['BRZ_COLUMNS'])
 
2049
        self.overrideEnv('BRZ_COLUMNS', '12')
 
2050
        self.assertEqual(12, osutils.terminal_width())
 
2051
 
 
2052
    def test_BRZ_COLUMNS_0_no_limit(self):
 
2053
        self.overrideEnv('BRZ_COLUMNS', '0')
 
2054
        self.assertEqual(None, osutils.terminal_width())
 
2055
 
 
2056
    def test_falls_back_to_COLUMNS(self):
 
2057
        self.overrideEnv('BRZ_COLUMNS', None)
 
2058
        self.assertNotEqual('42', os.environ['COLUMNS'])
 
2059
        self.set_fake_tty()
 
2060
        self.overrideEnv('COLUMNS', '42')
 
2061
        self.assertEqual(42, osutils.terminal_width())
 
2062
 
 
2063
    def test_tty_default_without_columns(self):
 
2064
        self.overrideEnv('BRZ_COLUMNS', None)
 
2065
        self.overrideEnv('COLUMNS', None)
 
2066
 
 
2067
        def terminal_size(w, h):
 
2068
            return 42, 42
 
2069
 
 
2070
        self.set_fake_tty()
 
2071
        # We need to override the osutils definition as it depends on the
 
2072
        # running environment that we can't control (PQM running without a
 
2073
        # controlling terminal is one example).
 
2074
        self.replace__terminal_size(terminal_size)
 
2075
        self.assertEqual(42, osutils.terminal_width())
 
2076
 
 
2077
    def test_non_tty_default_without_columns(self):
 
2078
        self.overrideEnv('BRZ_COLUMNS', None)
 
2079
        self.overrideEnv('COLUMNS', None)
 
2080
        self.replace_stdout(None)
 
2081
        self.assertEqual(None, osutils.terminal_width())
 
2082
 
 
2083
    def test_no_TIOCGWINSZ(self):
 
2084
        self.requireFeature(term_ios_feature)
 
2085
        termios = term_ios_feature.module
 
2086
        # bug 63539 is about a termios without TIOCGWINSZ attribute
 
2087
        try:
 
2088
            orig = termios.TIOCGWINSZ
 
2089
        except AttributeError:
 
2090
            # We won't remove TIOCGWINSZ, because it doesn't exist anyway :)
 
2091
            pass
 
2092
        else:
 
2093
            self.overrideAttr(termios, 'TIOCGWINSZ')
 
2094
            del termios.TIOCGWINSZ
 
2095
        self.overrideEnv('BRZ_COLUMNS', None)
 
2096
        self.overrideEnv('COLUMNS', None)
 
2097
        # Whatever the result is, if we don't raise an exception, it's ok.
 
2098
        osutils.terminal_width()
 
2099
 
 
2100
 
 
2101
class TestCreationOps(tests.TestCaseInTempDir):
 
2102
    _test_needs_features = [features.chown_feature]
 
2103
 
 
2104
    def setUp(self):
 
2105
        super(TestCreationOps, self).setUp()
 
2106
        self.overrideAttr(os, 'chown', self._dummy_chown)
 
2107
 
 
2108
        # params set by call to _dummy_chown
 
2109
        self.path = self.uid = self.gid = None
 
2110
 
 
2111
    def _dummy_chown(self, path, uid, gid):
 
2112
        self.path, self.uid, self.gid = path, uid, gid
 
2113
 
 
2114
    def test_copy_ownership_from_path(self):
 
2115
        """copy_ownership_from_path test with specified src."""
 
2116
        ownsrc = '/'
 
2117
        f = open('test_file', 'wt')
 
2118
        osutils.copy_ownership_from_path('test_file', ownsrc)
 
2119
 
 
2120
        s = os.stat(ownsrc)
 
2121
        self.assertEqual(self.path, 'test_file')
 
2122
        self.assertEqual(self.uid, s.st_uid)
 
2123
        self.assertEqual(self.gid, s.st_gid)
 
2124
 
 
2125
    def test_copy_ownership_nonesrc(self):
 
2126
        """copy_ownership_from_path test with src=None."""
 
2127
        f = open('test_file', 'wt')
 
2128
        # should use parent dir for permissions
 
2129
        osutils.copy_ownership_from_path('test_file')
 
2130
 
 
2131
        s = os.stat('..')
 
2132
        self.assertEqual(self.path, 'test_file')
 
2133
        self.assertEqual(self.uid, s.st_uid)
 
2134
        self.assertEqual(self.gid, s.st_gid)
 
2135
 
 
2136
 
 
2137
class TestPathFromEnviron(tests.TestCase):
 
2138
 
 
2139
    def test_is_unicode(self):
 
2140
        self.overrideEnv('BRZ_TEST_PATH', './anywhere at all/')
 
2141
        path = osutils.path_from_environ('BRZ_TEST_PATH')
 
2142
        self.assertIsInstance(path, unicode)
 
2143
        self.assertEqual(u'./anywhere at all/', path)
 
2144
 
 
2145
    def test_posix_path_env_ascii(self):
 
2146
        self.overrideEnv('BRZ_TEST_PATH', '/tmp')
 
2147
        home = osutils._posix_path_from_environ('BRZ_TEST_PATH')
 
2148
        self.assertIsInstance(home, unicode)
 
2149
        self.assertEqual(u'/tmp', home)
 
2150
 
 
2151
    def test_posix_path_env_unicode(self):
 
2152
        self.requireFeature(features.ByteStringNamedFilesystem)
 
2153
        self.overrideEnv('BRZ_TEST_PATH', '/home/\xa7test')
 
2154
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
 
2155
        self.assertEqual(u'/home/\xa7test',
 
2156
            osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2157
        osutils._fs_enc = "iso8859-5"
 
2158
        self.assertEqual(u'/home/\u0407test',
 
2159
            osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2160
        osutils._fs_enc = "utf-8"
 
2161
        self.assertRaises(errors.BadFilenameEncoding,
 
2162
            osutils._posix_path_from_environ, 'BRZ_TEST_PATH')
 
2163
 
 
2164
 
 
2165
class TestGetHomeDir(tests.TestCase):
 
2166
 
 
2167
    def test_is_unicode(self):
 
2168
        home = osutils._get_home_dir()
 
2169
        self.assertIsInstance(home, unicode)
 
2170
 
 
2171
    def test_posix_homeless(self):
 
2172
        self.overrideEnv('HOME', None)
 
2173
        home = osutils._get_home_dir()
 
2174
        self.assertIsInstance(home, unicode)
 
2175
 
 
2176
    def test_posix_home_ascii(self):
 
2177
        self.overrideEnv('HOME', '/home/test')
 
2178
        home = osutils._posix_get_home_dir()
 
2179
        self.assertIsInstance(home, unicode)
 
2180
        self.assertEqual(u'/home/test', home)
 
2181
 
 
2182
    def test_posix_home_unicode(self):
 
2183
        self.requireFeature(features.ByteStringNamedFilesystem)
 
2184
        self.overrideEnv('HOME', '/home/\xa7test')
 
2185
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
 
2186
        self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
 
2187
        osutils._fs_enc = "iso8859-5"
 
2188
        self.assertEqual(u'/home/\u0407test', osutils._posix_get_home_dir())
 
2189
        osutils._fs_enc = "utf-8"
 
2190
        self.assertRaises(errors.BadFilenameEncoding,
 
2191
            osutils._posix_get_home_dir)
 
2192
 
 
2193
 
 
2194
class TestGetuserUnicode(tests.TestCase):
 
2195
 
 
2196
    def test_is_unicode(self):
 
2197
        user = osutils.getuser_unicode()
 
2198
        self.assertIsInstance(user, unicode)
 
2199
 
 
2200
    def envvar_to_override(self):
 
2201
        if sys.platform == "win32":
 
2202
            # Disable use of platform calls on windows so envvar is used
 
2203
            self.overrideAttr(win32utils, 'has_ctypes', False)
 
2204
            return 'USERNAME' # only variable used on windows
 
2205
        return 'LOGNAME' # first variable checked by getpass.getuser()
 
2206
 
 
2207
    def test_ascii_user(self):
 
2208
        self.overrideEnv(self.envvar_to_override(), 'jrandom')
 
2209
        self.assertEqual(u'jrandom', osutils.getuser_unicode())
 
2210
 
 
2211
    def test_unicode_user(self):
 
2212
        ue = osutils.get_user_encoding()
 
2213
        uni_val, env_val = tests.probe_unicode_in_user_encoding()
 
2214
        if uni_val is None:
 
2215
            raise tests.TestSkipped(
 
2216
                'Cannot find a unicode character that works in encoding %s'
 
2217
                % (osutils.get_user_encoding(),))
 
2218
        uni_username = u'jrandom' + uni_val
 
2219
        encoded_username = uni_username.encode(ue)
 
2220
        self.overrideEnv(self.envvar_to_override(), encoded_username)
 
2221
        self.assertEqual(uni_username, osutils.getuser_unicode())
 
2222
 
 
2223
 
 
2224
class TestBackupNames(tests.TestCase):
 
2225
 
 
2226
    def setUp(self):
 
2227
        super(TestBackupNames, self).setUp()
 
2228
        self.backups = []
 
2229
 
 
2230
    def backup_exists(self, name):
 
2231
        return name in self.backups
 
2232
 
 
2233
    def available_backup_name(self, name):
 
2234
        backup_name = osutils.available_backup_name(name, self.backup_exists)
 
2235
        self.backups.append(backup_name)
 
2236
        return backup_name
 
2237
 
 
2238
    def assertBackupName(self, expected, name):
 
2239
        self.assertEqual(expected, self.available_backup_name(name))
 
2240
 
 
2241
    def test_empty(self):
 
2242
        self.assertBackupName('file.~1~', 'file')
 
2243
 
 
2244
    def test_existing(self):
 
2245
        self.available_backup_name('file')
 
2246
        self.available_backup_name('file')
 
2247
        self.assertBackupName('file.~3~', 'file')
 
2248
        # Empty slots are found, this is not a strict requirement and may be
 
2249
        # revisited if we test against all implementations.
 
2250
        self.backups.remove('file.~2~')
 
2251
        self.assertBackupName('file.~2~', 'file')
 
2252
 
 
2253
 
 
2254
class TestFindExecutableInPath(tests.TestCase):
 
2255
 
 
2256
    def test_windows(self):
 
2257
        if sys.platform != 'win32':
 
2258
            raise tests.TestSkipped('test requires win32')
 
2259
        self.assertTrue(osutils.find_executable_on_path('explorer') is not None)
 
2260
        self.assertTrue(
 
2261
            osutils.find_executable_on_path('explorer.exe') is not None)
 
2262
        self.assertTrue(
 
2263
            osutils.find_executable_on_path('EXPLORER.EXE') is not None)
 
2264
        self.assertTrue(
 
2265
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
 
2266
        self.assertTrue(osutils.find_executable_on_path('file.txt') is None)
 
2267
        
 
2268
    def test_windows_app_path(self):
 
2269
        if sys.platform != 'win32':
 
2270
            raise tests.TestSkipped('test requires win32')
 
2271
        # Override PATH env var so that exe can only be found on App Path
 
2272
        self.overrideEnv('PATH', '')
 
2273
        # Internt Explorer is always registered in the App Path
 
2274
        self.assertTrue(osutils.find_executable_on_path('iexplore') is not None)
 
2275
 
 
2276
    def test_other(self):
 
2277
        if sys.platform == 'win32':
 
2278
            raise tests.TestSkipped('test requires non-win32')
 
2279
        self.assertTrue(osutils.find_executable_on_path('sh') is not None)
 
2280
        self.assertTrue(
 
2281
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
 
2282
 
 
2283
 
 
2284
class TestEnvironmentErrors(tests.TestCase):
 
2285
    """Test handling of environmental errors"""
 
2286
 
 
2287
    def test_is_oserror(self):
 
2288
        self.assertTrue(osutils.is_environment_error(
 
2289
            OSError(errno.EINVAL, "Invalid parameter")))
 
2290
 
 
2291
    def test_is_ioerror(self):
 
2292
        self.assertTrue(osutils.is_environment_error(
 
2293
            IOError(errno.EINVAL, "Invalid parameter")))
 
2294
 
 
2295
    def test_is_socket_error(self):
 
2296
        self.assertTrue(osutils.is_environment_error(
 
2297
            socket.error(errno.EINVAL, "Invalid parameter")))
 
2298
 
 
2299
    def test_is_select_error(self):
 
2300
        self.assertTrue(osutils.is_environment_error(
 
2301
            select.error(errno.EINVAL, "Invalid parameter")))
 
2302
 
 
2303
    def test_is_pywintypes_error(self):
 
2304
        self.requireFeature(features.pywintypes)
 
2305
        import pywintypes
 
2306
        self.assertTrue(osutils.is_environment_error(
 
2307
            pywintypes.error(errno.EINVAL, "Invalid parameter", "Caller")))