/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_osutils.py

  • Committer: Jelmer Vernooij
  • Date: 2020-02-07 02:14:30 UTC
  • mto: This revision was merged to the branch mainline in revision 7492.
  • Revision ID: jelmer@jelmer.uk-20200207021430-m49iq3x4x8xlib6x
Drop python2 support.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2016 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the osutils wrapper."""
 
18
 
 
19
from __future__ import absolute_import, division
 
20
 
 
21
import errno
 
22
from io import BytesIO
 
23
import os
 
24
import select
 
25
import socket
 
26
import sys
 
27
import tempfile
 
28
import time
 
29
 
 
30
from .. import (
 
31
    errors,
 
32
    osutils,
 
33
    tests,
 
34
    trace,
 
35
    win32utils,
 
36
    )
 
37
from . import (
 
38
    features,
 
39
    file_utils,
 
40
    test__walkdirs_win32,
 
41
    )
 
42
from .scenarios import load_tests_apply_scenarios
 
43
 
 
44
 
 
45
class _UTF8DirReaderFeature(features.ModuleAvailableFeature):
 
46
 
 
47
    def _probe(self):
 
48
        try:
 
49
            from .. import _readdir_pyx
 
50
            self._module = _readdir_pyx
 
51
            self.reader = _readdir_pyx.UTF8DirReader
 
52
            return True
 
53
        except ImportError:
 
54
            return False
 
55
 
 
56
 
 
57
UTF8DirReaderFeature = _UTF8DirReaderFeature('breezy._readdir_pyx')
 
58
 
 
59
term_ios_feature = features.ModuleAvailableFeature('termios')
 
60
 
 
61
 
 
62
def _already_unicode(s):
 
63
    return s
 
64
 
 
65
 
 
66
def _utf8_to_unicode(s):
 
67
    return s.decode('UTF-8')
 
68
 
 
69
 
 
70
def dir_reader_scenarios():
 
71
    # For each dir reader we define:
 
72
 
 
73
    # - native_to_unicode: a function converting the native_abspath as returned
 
74
    #   by DirReader.read_dir to its unicode representation
 
75
 
 
76
    # UnicodeDirReader is the fallback, it should be tested on all platforms.
 
77
    scenarios = [('unicode',
 
78
                  dict(_dir_reader_class=osutils.UnicodeDirReader,
 
79
                       _native_to_unicode=_already_unicode))]
 
80
    # Some DirReaders are platform specific and even there they may not be
 
81
    # available.
 
82
    if UTF8DirReaderFeature.available():
 
83
        from .. import _readdir_pyx
 
84
        scenarios.append(('utf8',
 
85
                          dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
 
86
                               _native_to_unicode=_utf8_to_unicode)))
 
87
 
 
88
    if test__walkdirs_win32.win32_readdir_feature.available():
 
89
        try:
 
90
            from .. import _walkdirs_win32
 
91
            scenarios.append(
 
92
                ('win32',
 
93
                 dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
 
94
                      _native_to_unicode=_already_unicode)))
 
95
        except ImportError:
 
96
            pass
 
97
    return scenarios
 
98
 
 
99
 
 
100
load_tests = load_tests_apply_scenarios
 
101
 
 
102
 
 
103
class TestContainsWhitespace(tests.TestCase):
 
104
 
 
105
    def test_contains_whitespace(self):
 
106
        self.assertTrue(osutils.contains_whitespace(u' '))
 
107
        self.assertTrue(osutils.contains_whitespace(u'hello there'))
 
108
        self.assertTrue(osutils.contains_whitespace(u'hellothere\n'))
 
109
        self.assertTrue(osutils.contains_whitespace(u'hello\nthere'))
 
110
        self.assertTrue(osutils.contains_whitespace(u'hello\rthere'))
 
111
        self.assertTrue(osutils.contains_whitespace(u'hello\tthere'))
 
112
 
 
113
        # \xa0 is "Non-breaking-space" which on some python locales thinks it
 
114
        # is whitespace, but we do not.
 
115
        self.assertFalse(osutils.contains_whitespace(u''))
 
116
        self.assertFalse(osutils.contains_whitespace(u'hellothere'))
 
117
        self.assertFalse(osutils.contains_whitespace(u'hello\xa0there'))
 
118
 
 
119
 
 
120
class TestRename(tests.TestCaseInTempDir):
 
121
 
 
122
    def create_file(self, filename, content):
 
123
        f = open(filename, 'wb')
 
124
        try:
 
125
            f.write(content)
 
126
        finally:
 
127
            f.close()
 
128
 
 
129
    def _fancy_rename(self, a, b):
 
130
        osutils.fancy_rename(a, b, rename_func=os.rename,
 
131
                             unlink_func=os.unlink)
 
132
 
 
133
    def test_fancy_rename(self):
 
134
        # This should work everywhere
 
135
        self.create_file('a', b'something in a\n')
 
136
        self._fancy_rename('a', 'b')
 
137
        self.assertPathDoesNotExist('a')
 
138
        self.assertPathExists('b')
 
139
        self.check_file_contents('b', b'something in a\n')
 
140
 
 
141
        self.create_file('a', b'new something in a\n')
 
142
        self._fancy_rename('b', 'a')
 
143
 
 
144
        self.check_file_contents('a', b'something in a\n')
 
145
 
 
146
    def test_fancy_rename_fails_source_missing(self):
 
147
        # An exception should be raised, and the target should be left in place
 
148
        self.create_file('target', b'data in target\n')
 
149
        self.assertRaises((IOError, OSError), self._fancy_rename,
 
150
                          'missingsource', 'target')
 
151
        self.assertPathExists('target')
 
152
        self.check_file_contents('target', b'data in target\n')
 
153
 
 
154
    def test_fancy_rename_fails_if_source_and_target_missing(self):
 
155
        self.assertRaises((IOError, OSError), self._fancy_rename,
 
156
                          'missingsource', 'missingtarget')
 
157
 
 
158
    def test_rename(self):
 
159
        # Rename should be semi-atomic on all platforms
 
160
        self.create_file('a', b'something in a\n')
 
161
        osutils.rename('a', 'b')
 
162
        self.assertPathDoesNotExist('a')
 
163
        self.assertPathExists('b')
 
164
        self.check_file_contents('b', b'something in a\n')
 
165
 
 
166
        self.create_file('a', b'new something in a\n')
 
167
        osutils.rename('b', 'a')
 
168
 
 
169
        self.check_file_contents('a', b'something in a\n')
 
170
 
 
171
    # TODO: test fancy_rename using a MemoryTransport
 
172
 
 
173
    def test_rename_change_case(self):
 
174
        # on Windows we should be able to change filename case by rename
 
175
        self.build_tree(['a', 'b/'])
 
176
        osutils.rename('a', 'A')
 
177
        osutils.rename('b', 'B')
 
178
        # we can't use failUnlessExists on case-insensitive filesystem
 
179
        # so try to check shape of the tree
 
180
        shape = sorted(os.listdir('.'))
 
181
        self.assertEqual(['A', 'B'], shape)
 
182
 
 
183
    def test_rename_exception(self):
 
184
        try:
 
185
            osutils.rename('nonexistent_path', 'different_nonexistent_path')
 
186
        except OSError as e:
 
187
            self.assertEqual(e.old_filename, 'nonexistent_path')
 
188
            self.assertEqual(e.new_filename, 'different_nonexistent_path')
 
189
            self.assertTrue('nonexistent_path' in e.strerror)
 
190
            self.assertTrue('different_nonexistent_path' in e.strerror)
 
191
 
 
192
 
 
193
class TestRandChars(tests.TestCase):
 
194
 
 
195
    def test_01_rand_chars_empty(self):
 
196
        result = osutils.rand_chars(0)
 
197
        self.assertEqual(result, '')
 
198
 
 
199
    def test_02_rand_chars_100(self):
 
200
        result = osutils.rand_chars(100)
 
201
        self.assertEqual(len(result), 100)
 
202
        self.assertEqual(type(result), str)
 
203
        self.assertContainsRe(result, r'^[a-z0-9]{100}$')
 
204
 
 
205
 
 
206
class TestIsInside(tests.TestCase):
 
207
 
 
208
    def test_is_inside(self):
 
209
        is_inside = osutils.is_inside
 
210
        self.assertTrue(is_inside('src', 'src/foo.c'))
 
211
        self.assertFalse(is_inside('src', 'srccontrol'))
 
212
        self.assertTrue(is_inside('src', 'src/a/a/a/foo.c'))
 
213
        self.assertTrue(is_inside('foo.c', 'foo.c'))
 
214
        self.assertFalse(is_inside('foo.c', ''))
 
215
        self.assertTrue(is_inside('', 'foo.c'))
 
216
 
 
217
    def test_is_inside_any(self):
 
218
        SRC_FOO_C = osutils.pathjoin('src', 'foo.c')
 
219
        for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
 
220
                         (['src'], SRC_FOO_C),
 
221
                         (['src'], 'src'),
 
222
                         ]:
 
223
            self.assertTrue(osutils.is_inside_any(dirs, fn))
 
224
        for dirs, fn in [(['src'], 'srccontrol'),
 
225
                         (['src'], 'srccontrol/foo')]:
 
226
            self.assertFalse(osutils.is_inside_any(dirs, fn))
 
227
 
 
228
    def test_is_inside_or_parent_of_any(self):
 
229
        for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
 
230
                         (['src'], 'src/foo.c'),
 
231
                         (['src/bar.c'], 'src'),
 
232
                         (['src/bar.c', 'bla/foo.c'], 'src'),
 
233
                         (['src'], 'src'),
 
234
                         ]:
 
235
            self.assertTrue(osutils.is_inside_or_parent_of_any(dirs, fn))
 
236
 
 
237
        for dirs, fn in [(['src'], 'srccontrol'),
 
238
                         (['srccontrol/foo.c'], 'src'),
 
239
                         (['src'], 'srccontrol/foo')]:
 
240
            self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
 
241
 
 
242
 
 
243
class TestLstat(tests.TestCaseInTempDir):
 
244
 
 
245
    def test_lstat_matches_fstat(self):
 
246
        # On Windows, lstat and fstat don't always agree, primarily in the
 
247
        # 'st_ino' and 'st_dev' fields. So we force them to be '0' in our
 
248
        # custom implementation.
 
249
        if sys.platform == 'win32':
 
250
            # We only have special lstat/fstat if we have the extension.
 
251
            # Without it, we may end up re-reading content when we don't have
 
252
            # to, but otherwise it doesn't effect correctness.
 
253
            self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
254
        with open('test-file.txt', 'wb') as f:
 
255
            f.write(b'some content\n')
 
256
            f.flush()
 
257
            self.assertEqualStat(osutils.fstat(f.fileno()),
 
258
                                 osutils.lstat('test-file.txt'))
 
259
 
 
260
 
 
261
class TestRmTree(tests.TestCaseInTempDir):
 
262
 
 
263
    def test_rmtree(self):
 
264
        # Check to remove tree with read-only files/dirs
 
265
        os.mkdir('dir')
 
266
        with open('dir/file', 'w') as f:
 
267
            f.write('spam')
 
268
        # would like to also try making the directory readonly, but at the
 
269
        # moment python shutil.rmtree doesn't handle that properly - it would
 
270
        # need to chmod the directory before removing things inside it - deferred
 
271
        # for now -- mbp 20060505
 
272
        # osutils.make_readonly('dir')
 
273
        osutils.make_readonly('dir/file')
 
274
 
 
275
        osutils.rmtree('dir')
 
276
 
 
277
        self.assertPathDoesNotExist('dir/file')
 
278
        self.assertPathDoesNotExist('dir')
 
279
 
 
280
 
 
281
class TestDeleteAny(tests.TestCaseInTempDir):
 
282
 
 
283
    def test_delete_any_readonly(self):
 
284
        # from <https://bugs.launchpad.net/bzr/+bug/218206>
 
285
        self.build_tree(['d/', 'f'])
 
286
        osutils.make_readonly('d')
 
287
        osutils.make_readonly('f')
 
288
 
 
289
        osutils.delete_any('f')
 
290
        osutils.delete_any('d')
 
291
 
 
292
 
 
293
class TestKind(tests.TestCaseInTempDir):
 
294
 
 
295
    def test_file_kind(self):
 
296
        self.build_tree(['file', 'dir/'])
 
297
        self.assertEqual('file', osutils.file_kind('file'))
 
298
        self.assertEqual('directory', osutils.file_kind('dir/'))
 
299
        if osutils.has_symlinks():
 
300
            os.symlink('symlink', 'symlink')
 
301
            self.assertEqual('symlink', osutils.file_kind('symlink'))
 
302
 
 
303
        # TODO: jam 20060529 Test a block device
 
304
        try:
 
305
            os.lstat('/dev/null')
 
306
        except OSError as e:
 
307
            if e.errno not in (errno.ENOENT,):
 
308
                raise
 
309
        else:
 
310
            self.assertEqual(
 
311
                'chardev',
 
312
                osutils.file_kind(os.path.realpath('/dev/null')))
 
313
 
 
314
        mkfifo = getattr(os, 'mkfifo', None)
 
315
        if mkfifo:
 
316
            mkfifo('fifo')
 
317
            try:
 
318
                self.assertEqual('fifo', osutils.file_kind('fifo'))
 
319
            finally:
 
320
                os.remove('fifo')
 
321
 
 
322
        AF_UNIX = getattr(socket, 'AF_UNIX', None)
 
323
        if AF_UNIX:
 
324
            s = socket.socket(AF_UNIX)
 
325
            s.bind('socket')
 
326
            try:
 
327
                self.assertEqual('socket', osutils.file_kind('socket'))
 
328
            finally:
 
329
                os.remove('socket')
 
330
 
 
331
    def test_kind_marker(self):
 
332
        self.assertEqual("", osutils.kind_marker("file"))
 
333
        self.assertEqual("/", osutils.kind_marker('directory'))
 
334
        self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
 
335
        self.assertEqual("@", osutils.kind_marker("symlink"))
 
336
        self.assertEqual("+", osutils.kind_marker("tree-reference"))
 
337
        self.assertEqual("", osutils.kind_marker("fifo"))
 
338
        self.assertEqual("", osutils.kind_marker("socket"))
 
339
        self.assertEqual("", osutils.kind_marker("unknown"))
 
340
 
 
341
 
 
342
class TestUmask(tests.TestCaseInTempDir):
 
343
 
 
344
    def test_get_umask(self):
 
345
        if sys.platform == 'win32':
 
346
            # umask always returns '0', no way to set it
 
347
            self.assertEqual(0, osutils.get_umask())
 
348
            return
 
349
 
 
350
        orig_umask = osutils.get_umask()
 
351
        self.addCleanup(os.umask, orig_umask)
 
352
        os.umask(0o222)
 
353
        self.assertEqual(0o222, osutils.get_umask())
 
354
        os.umask(0o022)
 
355
        self.assertEqual(0o022, osutils.get_umask())
 
356
        os.umask(0o002)
 
357
        self.assertEqual(0o002, osutils.get_umask())
 
358
        os.umask(0o027)
 
359
        self.assertEqual(0o027, osutils.get_umask())
 
360
 
 
361
 
 
362
class TestDateTime(tests.TestCase):
 
363
 
 
364
    def assertFormatedDelta(self, expected, seconds):
 
365
        """Assert osutils.format_delta formats as expected"""
 
366
        actual = osutils.format_delta(seconds)
 
367
        self.assertEqual(expected, actual)
 
368
 
 
369
    def test_format_delta(self):
 
370
        self.assertFormatedDelta('0 seconds ago', 0)
 
371
        self.assertFormatedDelta('1 second ago', 1)
 
372
        self.assertFormatedDelta('10 seconds ago', 10)
 
373
        self.assertFormatedDelta('59 seconds ago', 59)
 
374
        self.assertFormatedDelta('89 seconds ago', 89)
 
375
        self.assertFormatedDelta('1 minute, 30 seconds ago', 90)
 
376
        self.assertFormatedDelta('3 minutes, 0 seconds ago', 180)
 
377
        self.assertFormatedDelta('3 minutes, 1 second ago', 181)
 
378
        self.assertFormatedDelta('10 minutes, 15 seconds ago', 615)
 
379
        self.assertFormatedDelta('30 minutes, 59 seconds ago', 1859)
 
380
        self.assertFormatedDelta('31 minutes, 0 seconds ago', 1860)
 
381
        self.assertFormatedDelta('60 minutes, 0 seconds ago', 3600)
 
382
        self.assertFormatedDelta('89 minutes, 59 seconds ago', 5399)
 
383
        self.assertFormatedDelta('1 hour, 30 minutes ago', 5400)
 
384
        self.assertFormatedDelta('2 hours, 30 minutes ago', 9017)
 
385
        self.assertFormatedDelta('10 hours, 0 minutes ago', 36000)
 
386
        self.assertFormatedDelta('24 hours, 0 minutes ago', 86400)
 
387
        self.assertFormatedDelta('35 hours, 59 minutes ago', 129599)
 
388
        self.assertFormatedDelta('36 hours, 0 minutes ago', 129600)
 
389
        self.assertFormatedDelta('36 hours, 0 minutes ago', 129601)
 
390
        self.assertFormatedDelta('36 hours, 1 minute ago', 129660)
 
391
        self.assertFormatedDelta('36 hours, 1 minute ago', 129661)
 
392
        self.assertFormatedDelta('84 hours, 10 minutes ago', 303002)
 
393
 
 
394
        # We handle when time steps the wrong direction because computers
 
395
        # don't have synchronized clocks.
 
396
        self.assertFormatedDelta('84 hours, 10 minutes in the future', -303002)
 
397
        self.assertFormatedDelta('1 second in the future', -1)
 
398
        self.assertFormatedDelta('2 seconds in the future', -2)
 
399
 
 
400
    def test_format_date(self):
 
401
        self.assertRaises(osutils.UnsupportedTimezoneFormat,
 
402
                          osutils.format_date, 0, timezone='foo')
 
403
        self.assertIsInstance(osutils.format_date(0), str)
 
404
        self.assertIsInstance(osutils.format_local_date(0), str)
 
405
        # Testing for the actual value of the local weekday without
 
406
        # duplicating the code from format_date is difficult.
 
407
        # Instead blackbox.test_locale should check for localized
 
408
        # dates once they do occur in output strings.
 
409
 
 
410
    def test_format_date_with_offset_in_original_timezone(self):
 
411
        self.assertEqual("Thu 1970-01-01 00:00:00 +0000",
 
412
                         osutils.format_date_with_offset_in_original_timezone(0))
 
413
        self.assertEqual("Fri 1970-01-02 03:46:40 +0000",
 
414
                         osutils.format_date_with_offset_in_original_timezone(100000))
 
415
        self.assertEqual("Fri 1970-01-02 05:46:40 +0200",
 
416
                         osutils.format_date_with_offset_in_original_timezone(100000, 7200))
 
417
 
 
418
    def test_local_time_offset(self):
 
419
        """Test that local_time_offset() returns a sane value."""
 
420
        offset = osutils.local_time_offset()
 
421
        self.assertTrue(isinstance(offset, int))
 
422
        # Test that the offset is no more than a eighteen hours in
 
423
        # either direction.
 
424
        # Time zone handling is system specific, so it is difficult to
 
425
        # do more specific tests, but a value outside of this range is
 
426
        # probably wrong.
 
427
        eighteen_hours = 18 * 3600
 
428
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
 
429
 
 
430
    def test_local_time_offset_with_timestamp(self):
 
431
        """Test that local_time_offset() works with a timestamp."""
 
432
        offset = osutils.local_time_offset(1000000000.1234567)
 
433
        self.assertTrue(isinstance(offset, int))
 
434
        eighteen_hours = 18 * 3600
 
435
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
 
436
 
 
437
 
 
438
class TestFdatasync(tests.TestCaseInTempDir):
 
439
 
 
440
    def do_fdatasync(self):
 
441
        f = tempfile.NamedTemporaryFile()
 
442
        osutils.fdatasync(f.fileno())
 
443
        f.close()
 
444
 
 
445
    @staticmethod
 
446
    def raise_eopnotsupp(*args, **kwargs):
 
447
        raise IOError(errno.EOPNOTSUPP, os.strerror(errno.EOPNOTSUPP))
 
448
 
 
449
    @staticmethod
 
450
    def raise_enotsup(*args, **kwargs):
 
451
        raise IOError(errno.ENOTSUP, os.strerror(errno.ENOTSUP))
 
452
 
 
453
    def test_fdatasync_handles_system_function(self):
 
454
        self.overrideAttr(os, "fdatasync")
 
455
        self.do_fdatasync()
 
456
 
 
457
    def test_fdatasync_handles_no_fdatasync_no_fsync(self):
 
458
        self.overrideAttr(os, "fdatasync")
 
459
        self.overrideAttr(os, "fsync")
 
460
        self.do_fdatasync()
 
461
 
 
462
    def test_fdatasync_handles_no_EOPNOTSUPP(self):
 
463
        self.overrideAttr(errno, "EOPNOTSUPP")
 
464
        self.do_fdatasync()
 
465
 
 
466
    def test_fdatasync_catches_ENOTSUP(self):
 
467
        enotsup = getattr(errno, "ENOTSUP", None)
 
468
        if enotsup is None:
 
469
            raise tests.TestNotApplicable("No ENOTSUP on this platform")
 
470
        self.overrideAttr(os, "fdatasync", self.raise_enotsup)
 
471
        self.do_fdatasync()
 
472
 
 
473
    def test_fdatasync_catches_EOPNOTSUPP(self):
 
474
        enotsup = getattr(errno, "EOPNOTSUPP", None)
 
475
        if enotsup is None:
 
476
            raise tests.TestNotApplicable("No EOPNOTSUPP on this platform")
 
477
        self.overrideAttr(os, "fdatasync", self.raise_eopnotsupp)
 
478
        self.do_fdatasync()
 
479
 
 
480
 
 
481
class TestLinks(tests.TestCaseInTempDir):
 
482
 
 
483
    def test_dereference_path(self):
 
484
        self.requireFeature(features.SymlinkFeature)
 
485
        cwd = osutils.realpath('.')
 
486
        os.mkdir('bar')
 
487
        bar_path = osutils.pathjoin(cwd, 'bar')
 
488
        # Using './' to avoid bug #1213894 (first path component not
 
489
        # dereferenced) in Python 2.4.1 and earlier
 
490
        self.assertEqual(bar_path, osutils.realpath('./bar'))
 
491
        os.symlink('bar', 'foo')
 
492
        self.assertEqual(bar_path, osutils.realpath('./foo'))
 
493
 
 
494
        # Does not dereference terminal symlinks
 
495
        foo_path = osutils.pathjoin(cwd, 'foo')
 
496
        self.assertEqual(foo_path, osutils.dereference_path('./foo'))
 
497
 
 
498
        # Dereferences parent symlinks
 
499
        os.mkdir('bar/baz')
 
500
        baz_path = osutils.pathjoin(bar_path, 'baz')
 
501
        self.assertEqual(baz_path, osutils.dereference_path('./foo/baz'))
 
502
 
 
503
        # Dereferences parent symlinks that are the first path element
 
504
        self.assertEqual(baz_path, osutils.dereference_path('foo/baz'))
 
505
 
 
506
        # Dereferences parent symlinks in absolute paths
 
507
        foo_baz_path = osutils.pathjoin(foo_path, 'baz')
 
508
        self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
 
509
 
 
510
    def test_changing_access(self):
 
511
        with open('file', 'w') as f:
 
512
            f.write('monkey')
 
513
 
 
514
        # Make a file readonly
 
515
        osutils.make_readonly('file')
 
516
        mode = os.lstat('file').st_mode
 
517
        self.assertEqual(mode, mode & 0o777555)
 
518
 
 
519
        # Make a file writable
 
520
        osutils.make_writable('file')
 
521
        mode = os.lstat('file').st_mode
 
522
        self.assertEqual(mode, mode | 0o200)
 
523
 
 
524
        if osutils.has_symlinks():
 
525
            # should not error when handed a symlink
 
526
            os.symlink('nonexistent', 'dangling')
 
527
            osutils.make_readonly('dangling')
 
528
            osutils.make_writable('dangling')
 
529
 
 
530
    def test_host_os_dereferences_symlinks(self):
 
531
        osutils.host_os_dereferences_symlinks()
 
532
 
 
533
 
 
534
class TestCanonicalRelPath(tests.TestCaseInTempDir):
 
535
 
 
536
    _test_needs_features = [features.CaseInsCasePresFilenameFeature]
 
537
 
 
538
    def test_canonical_relpath_simple(self):
 
539
        f = open('MixedCaseName', 'w')
 
540
        f.close()
 
541
        actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
 
542
        self.assertEqual('work/MixedCaseName', actual)
 
543
 
 
544
    def test_canonical_relpath_missing_tail(self):
 
545
        os.mkdir('MixedCaseParent')
 
546
        actual = osutils.canonical_relpath(self.test_base_dir,
 
547
                                           'mixedcaseparent/nochild')
 
548
        self.assertEqual('work/MixedCaseParent/nochild', actual)
 
549
 
 
550
 
 
551
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
 
552
 
 
553
    def assertRelpath(self, expected, base, path):
 
554
        actual = osutils._cicp_canonical_relpath(base, path)
 
555
        self.assertEqual(expected, actual)
 
556
 
 
557
    def test_simple(self):
 
558
        self.build_tree(['MixedCaseName'])
 
559
        base = osutils.realpath(self.get_transport('.').local_abspath('.'))
 
560
        self.assertRelpath('MixedCaseName', base, 'mixedcAsename')
 
561
 
 
562
    def test_subdir_missing_tail(self):
 
563
        self.build_tree(['MixedCaseParent/', 'MixedCaseParent/a_child'])
 
564
        base = osutils.realpath(self.get_transport('.').local_abspath('.'))
 
565
        self.assertRelpath('MixedCaseParent/a_child', base,
 
566
                           'MixedCaseParent/a_child')
 
567
        self.assertRelpath('MixedCaseParent/a_child', base,
 
568
                           'MixedCaseParent/A_Child')
 
569
        self.assertRelpath('MixedCaseParent/not_child', base,
 
570
                           'MixedCaseParent/not_child')
 
571
 
 
572
    def test_at_root_slash(self):
 
573
        # We can't test this on Windows, because it has a 'MIN_ABS_PATHLENGTH'
 
574
        # check...
 
575
        if osutils.MIN_ABS_PATHLENGTH > 1:
 
576
            raise tests.TestSkipped('relpath requires %d chars'
 
577
                                    % osutils.MIN_ABS_PATHLENGTH)
 
578
        self.assertRelpath('foo', '/', '/foo')
 
579
 
 
580
    def test_at_root_drive(self):
 
581
        if sys.platform != 'win32':
 
582
            raise tests.TestNotApplicable('we can only test drive-letter relative'
 
583
                                          ' paths on Windows where we have drive'
 
584
                                          ' letters.')
 
585
        # see bug #322807
 
586
        # The specific issue is that when at the root of a drive, 'abspath'
 
587
        # returns "C:/" or just "/". However, the code assumes that abspath
 
588
        # always returns something like "C:/foo" or "/foo" (no trailing slash).
 
589
        self.assertRelpath('foo', 'C:/', 'C:/foo')
 
590
        self.assertRelpath('foo', 'X:/', 'X:/foo')
 
591
        self.assertRelpath('foo', 'X:/', 'X://foo')
 
592
 
 
593
 
 
594
class TestPumpFile(tests.TestCase):
 
595
    """Test pumpfile method."""
 
596
 
 
597
    def setUp(self):
 
598
        super(TestPumpFile, self).setUp()
 
599
        # create a test datablock
 
600
        self.block_size = 512
 
601
        pattern = b'0123456789ABCDEF'
 
602
        self.test_data = pattern * (3 * self.block_size // len(pattern))
 
603
        self.test_data_len = len(self.test_data)
 
604
 
 
605
    def test_bracket_block_size(self):
 
606
        """Read data in blocks with the requested read size bracketing the
 
607
        block size."""
 
608
        # make sure test data is larger than max read size
 
609
        self.assertTrue(self.test_data_len > self.block_size)
 
610
 
 
611
        from_file = file_utils.FakeReadFile(self.test_data)
 
612
        to_file = BytesIO()
 
613
 
 
614
        # read (max // 2) bytes and verify read size wasn't affected
 
615
        num_bytes_to_read = self.block_size // 2
 
616
        osutils.pumpfile(from_file, to_file,
 
617
                         num_bytes_to_read, self.block_size)
 
618
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
 
619
        self.assertEqual(from_file.get_read_count(), 1)
 
620
 
 
621
        # read (max) bytes and verify read size wasn't affected
 
622
        num_bytes_to_read = self.block_size
 
623
        from_file.reset_read_count()
 
624
        osutils.pumpfile(from_file, to_file,
 
625
                         num_bytes_to_read, self.block_size)
 
626
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
 
627
        self.assertEqual(from_file.get_read_count(), 1)
 
628
 
 
629
        # read (max + 1) bytes and verify read size was limited
 
630
        num_bytes_to_read = self.block_size + 1
 
631
        from_file.reset_read_count()
 
632
        osutils.pumpfile(from_file, to_file,
 
633
                         num_bytes_to_read, self.block_size)
 
634
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
 
635
        self.assertEqual(from_file.get_read_count(), 2)
 
636
 
 
637
        # finish reading the rest of the data
 
638
        num_bytes_to_read = self.test_data_len - to_file.tell()
 
639
        osutils.pumpfile(from_file, to_file,
 
640
                         num_bytes_to_read, self.block_size)
 
641
 
 
642
        # report error if the data wasn't equal (we only report the size due
 
643
        # to the length of the data)
 
644
        response_data = to_file.getvalue()
 
645
        if response_data != self.test_data:
 
646
            message = "Data not equal.  Expected %d bytes, received %d."
 
647
            self.fail(message % (len(response_data), self.test_data_len))
 
648
 
 
649
    def test_specified_size(self):
 
650
        """Request a transfer larger than the maximum block size and verify
 
651
        that the maximum read doesn't exceed the block_size."""
 
652
        # make sure test data is larger than max read size
 
653
        self.assertTrue(self.test_data_len > self.block_size)
 
654
 
 
655
        # retrieve data in blocks
 
656
        from_file = file_utils.FakeReadFile(self.test_data)
 
657
        to_file = BytesIO()
 
658
        osutils.pumpfile(from_file, to_file, self.test_data_len,
 
659
                         self.block_size)
 
660
 
 
661
        # verify read size was equal to the maximum read size
 
662
        self.assertTrue(from_file.get_max_read_size() > 0)
 
663
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
 
664
        self.assertEqual(from_file.get_read_count(), 3)
 
665
 
 
666
        # report error if the data wasn't equal (we only report the size due
 
667
        # to the length of the data)
 
668
        response_data = to_file.getvalue()
 
669
        if response_data != self.test_data:
 
670
            message = "Data not equal.  Expected %d bytes, received %d."
 
671
            self.fail(message % (len(response_data), self.test_data_len))
 
672
 
 
673
    def test_to_eof(self):
 
674
        """Read to end-of-file and verify that the reads are not larger than
 
675
        the maximum read size."""
 
676
        # make sure test data is larger than max read size
 
677
        self.assertTrue(self.test_data_len > self.block_size)
 
678
 
 
679
        # retrieve data to EOF
 
680
        from_file = file_utils.FakeReadFile(self.test_data)
 
681
        to_file = BytesIO()
 
682
        osutils.pumpfile(from_file, to_file, -1, self.block_size)
 
683
 
 
684
        # verify read size was equal to the maximum read size
 
685
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
 
686
        self.assertEqual(from_file.get_read_count(), 4)
 
687
 
 
688
        # report error if the data wasn't equal (we only report the size due
 
689
        # to the length of the data)
 
690
        response_data = to_file.getvalue()
 
691
        if response_data != self.test_data:
 
692
            message = "Data not equal.  Expected %d bytes, received %d."
 
693
            self.fail(message % (len(response_data), self.test_data_len))
 
694
 
 
695
    def test_defaults(self):
 
696
        """Verifies that the default arguments will read to EOF -- this
 
697
        test verifies that any existing usages of pumpfile will not be broken
 
698
        with this new version."""
 
699
        # retrieve data using default (old) pumpfile method
 
700
        from_file = file_utils.FakeReadFile(self.test_data)
 
701
        to_file = BytesIO()
 
702
        osutils.pumpfile(from_file, to_file)
 
703
 
 
704
        # report error if the data wasn't equal (we only report the size due
 
705
        # to the length of the data)
 
706
        response_data = to_file.getvalue()
 
707
        if response_data != self.test_data:
 
708
            message = "Data not equal.  Expected %d bytes, received %d."
 
709
            self.fail(message % (len(response_data), self.test_data_len))
 
710
 
 
711
    def test_report_activity(self):
 
712
        activity = []
 
713
 
 
714
        def log_activity(length, direction):
 
715
            activity.append((length, direction))
 
716
        from_file = BytesIO(self.test_data)
 
717
        to_file = BytesIO()
 
718
        osutils.pumpfile(from_file, to_file, buff_size=500,
 
719
                         report_activity=log_activity, direction='read')
 
720
        self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
 
721
                          (36, 'read')], activity)
 
722
 
 
723
        from_file = BytesIO(self.test_data)
 
724
        to_file = BytesIO()
 
725
        del activity[:]
 
726
        osutils.pumpfile(from_file, to_file, buff_size=500,
 
727
                         report_activity=log_activity, direction='write')
 
728
        self.assertEqual([(500, 'write'), (500, 'write'), (500, 'write'),
 
729
                          (36, 'write')], activity)
 
730
 
 
731
        # And with a limited amount of data
 
732
        from_file = BytesIO(self.test_data)
 
733
        to_file = BytesIO()
 
734
        del activity[:]
 
735
        osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
 
736
                         report_activity=log_activity, direction='read')
 
737
        self.assertEqual(
 
738
            [(500, 'read'), (500, 'read'), (28, 'read')], activity)
 
739
 
 
740
 
 
741
class TestPumpStringFile(tests.TestCase):
 
742
 
 
743
    def test_empty(self):
 
744
        output = BytesIO()
 
745
        osutils.pump_string_file(b"", output)
 
746
        self.assertEqual(b"", output.getvalue())
 
747
 
 
748
    def test_more_than_segment_size(self):
 
749
        output = BytesIO()
 
750
        osutils.pump_string_file(b"123456789", output, 2)
 
751
        self.assertEqual(b"123456789", output.getvalue())
 
752
 
 
753
    def test_segment_size(self):
 
754
        output = BytesIO()
 
755
        osutils.pump_string_file(b"12", output, 2)
 
756
        self.assertEqual(b"12", output.getvalue())
 
757
 
 
758
    def test_segment_size_multiple(self):
 
759
        output = BytesIO()
 
760
        osutils.pump_string_file(b"1234", output, 2)
 
761
        self.assertEqual(b"1234", output.getvalue())
 
762
 
 
763
 
 
764
class TestRelpath(tests.TestCase):
 
765
 
 
766
    def test_simple_relpath(self):
 
767
        cwd = osutils.getcwd()
 
768
        subdir = cwd + '/subdir'
 
769
        self.assertEqual('subdir', osutils.relpath(cwd, subdir))
 
770
 
 
771
    def test_deep_relpath(self):
 
772
        cwd = osutils.getcwd()
 
773
        subdir = cwd + '/sub/subsubdir'
 
774
        self.assertEqual('sub/subsubdir', osutils.relpath(cwd, subdir))
 
775
 
 
776
    def test_not_relative(self):
 
777
        self.assertRaises(errors.PathNotChild,
 
778
                          osutils.relpath, 'C:/path', 'H:/path')
 
779
        self.assertRaises(errors.PathNotChild,
 
780
                          osutils.relpath, 'C:/', 'H:/path')
 
781
 
 
782
 
 
783
class TestSafeUnicode(tests.TestCase):
 
784
 
 
785
    def test_from_ascii_string(self):
 
786
        self.assertEqual(u'foobar', osutils.safe_unicode(b'foobar'))
 
787
 
 
788
    def test_from_unicode_string_ascii_contents(self):
 
789
        self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
 
790
 
 
791
    def test_from_unicode_string_unicode_contents(self):
 
792
        self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
 
793
 
 
794
    def test_from_utf8_string(self):
 
795
        self.assertEqual(u'foo\xae', osutils.safe_unicode(b'foo\xc2\xae'))
 
796
 
 
797
    def test_bad_utf8_string(self):
 
798
        self.assertRaises(errors.BzrBadParameterNotUnicode,
 
799
                          osutils.safe_unicode,
 
800
                          b'\xbb\xbb')
 
801
 
 
802
 
 
803
class TestSafeUtf8(tests.TestCase):
 
804
 
 
805
    def test_from_ascii_string(self):
 
806
        f = b'foobar'
 
807
        self.assertEqual(b'foobar', osutils.safe_utf8(f))
 
808
 
 
809
    def test_from_unicode_string_ascii_contents(self):
 
810
        self.assertEqual(b'bargam', osutils.safe_utf8(u'bargam'))
 
811
 
 
812
    def test_from_unicode_string_unicode_contents(self):
 
813
        self.assertEqual(b'bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
 
814
 
 
815
    def test_from_utf8_string(self):
 
816
        self.assertEqual(b'foo\xc2\xae', osutils.safe_utf8(b'foo\xc2\xae'))
 
817
 
 
818
    def test_bad_utf8_string(self):
 
819
        self.assertRaises(errors.BzrBadParameterNotUnicode,
 
820
                          osutils.safe_utf8, b'\xbb\xbb')
 
821
 
 
822
 
 
823
class TestSafeRevisionId(tests.TestCase):
 
824
 
 
825
    def test_from_ascii_string(self):
 
826
        # this shouldn't give a warning because it's getting an ascii string
 
827
        self.assertEqual(b'foobar', osutils.safe_revision_id(b'foobar'))
 
828
 
 
829
    def test_from_unicode_string_ascii_contents(self):
 
830
        self.assertRaises(TypeError,
 
831
                          osutils.safe_revision_id, u'bargam')
 
832
 
 
833
    def test_from_unicode_string_unicode_contents(self):
 
834
        self.assertRaises(TypeError,
 
835
                          osutils.safe_revision_id, u'bargam\xae')
 
836
 
 
837
    def test_from_utf8_string(self):
 
838
        self.assertEqual(b'foo\xc2\xae',
 
839
                         osutils.safe_revision_id(b'foo\xc2\xae'))
 
840
 
 
841
    def test_none(self):
 
842
        """Currently, None is a valid revision_id"""
 
843
        self.assertEqual(None, osutils.safe_revision_id(None))
 
844
 
 
845
 
 
846
class TestSafeFileId(tests.TestCase):
 
847
 
 
848
    def test_from_ascii_string(self):
 
849
        self.assertEqual(b'foobar', osutils.safe_file_id(b'foobar'))
 
850
 
 
851
    def test_from_unicode_string_ascii_contents(self):
 
852
        self.assertRaises(TypeError, osutils.safe_file_id, u'bargam')
 
853
 
 
854
    def test_from_unicode_string_unicode_contents(self):
 
855
        self.assertRaises(TypeError,
 
856
                          osutils.safe_file_id, u'bargam\xae')
 
857
 
 
858
    def test_from_utf8_string(self):
 
859
        self.assertEqual(b'foo\xc2\xae',
 
860
                         osutils.safe_file_id(b'foo\xc2\xae'))
 
861
 
 
862
    def test_none(self):
 
863
        """Currently, None is a valid revision_id"""
 
864
        self.assertEqual(None, osutils.safe_file_id(None))
 
865
 
 
866
 
 
867
class TestSendAll(tests.TestCase):
 
868
 
 
869
    def test_send_with_disconnected_socket(self):
 
870
        class DisconnectedSocket(object):
 
871
            def __init__(self, err):
 
872
                self.err = err
 
873
 
 
874
            def send(self, content):
 
875
                raise self.err
 
876
 
 
877
            def close(self):
 
878
                pass
 
879
        # All of these should be treated as ConnectionReset
 
880
        errs = []
 
881
        for err_cls in (IOError, socket.error):
 
882
            for errnum in osutils._end_of_stream_errors:
 
883
                errs.append(err_cls(errnum))
 
884
        for err in errs:
 
885
            sock = DisconnectedSocket(err)
 
886
            self.assertRaises(errors.ConnectionReset,
 
887
                              osutils.send_all, sock, b'some more content')
 
888
 
 
889
    def test_send_with_no_progress(self):
 
890
        # See https://bugs.launchpad.net/bzr/+bug/1047309
 
891
        # It seems that paramiko can get into a state where it doesn't error,
 
892
        # but it returns 0 bytes sent for requests over and over again.
 
893
        class NoSendingSocket(object):
 
894
            def __init__(self):
 
895
                self.call_count = 0
 
896
 
 
897
            def send(self, bytes):
 
898
                self.call_count += 1
 
899
                if self.call_count > 100:
 
900
                    # Prevent the test suite from hanging
 
901
                    raise RuntimeError('too many calls')
 
902
                return 0
 
903
        sock = NoSendingSocket()
 
904
        self.assertRaises(errors.ConnectionReset,
 
905
                          osutils.send_all, sock, b'content')
 
906
        self.assertEqual(1, sock.call_count)
 
907
 
 
908
 
 
909
class TestPosixFuncs(tests.TestCase):
 
910
    """Test that the posix version of normpath returns an appropriate path
 
911
       when used with 2 leading slashes."""
 
912
 
 
913
    def test_normpath(self):
 
914
        self.assertEqual('/etc/shadow', osutils._posix_normpath('/etc/shadow'))
 
915
        self.assertEqual(
 
916
            '/etc/shadow', osutils._posix_normpath('//etc/shadow'))
 
917
        self.assertEqual(
 
918
            '/etc/shadow', osutils._posix_normpath('///etc/shadow'))
 
919
 
 
920
 
 
921
class TestWin32Funcs(tests.TestCase):
 
922
    """Test that _win32 versions of os utilities return appropriate paths."""
 
923
 
 
924
    def test_abspath(self):
 
925
        self.requireFeature(features.win32_feature)
 
926
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
 
927
        self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
 
928
        self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
 
929
        self.assertEqual('//HOST/path', osutils._win32_abspath('//HOST/path'))
 
930
 
 
931
    def test_realpath(self):
 
932
        self.assertEqual('C:/foo', osutils._win32_realpath('C:\\foo'))
 
933
        self.assertEqual('C:/foo', osutils._win32_realpath('C:/foo'))
 
934
 
 
935
    def test_pathjoin(self):
 
936
        self.assertEqual('path/to/foo',
 
937
                         osutils._win32_pathjoin('path', 'to', 'foo'))
 
938
        self.assertEqual('C:/foo',
 
939
                         osutils._win32_pathjoin('path\\to', 'C:\\foo'))
 
940
        self.assertEqual('C:/foo',
 
941
                         osutils._win32_pathjoin('path/to', 'C:/foo'))
 
942
        self.assertEqual('path/to/foo',
 
943
                         osutils._win32_pathjoin('path/to/', 'foo'))
 
944
 
 
945
    def test_pathjoin_late_bugfix(self):
 
946
        expected = 'C:/foo'
 
947
        self.assertEqual(expected,
 
948
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
 
949
        self.assertEqual(expected,
 
950
                         osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
 
951
 
 
952
    def test_normpath(self):
 
953
        self.assertEqual('path/to/foo',
 
954
                         osutils._win32_normpath(r'path\\from\..\to\.\foo'))
 
955
        self.assertEqual('path/to/foo',
 
956
                         osutils._win32_normpath('path//from/../to/./foo'))
 
957
 
 
958
    def test_getcwd(self):
 
959
        cwd = osutils._win32_getcwd()
 
960
        os_cwd = osutils._getcwd()
 
961
        self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
 
962
        # win32 is inconsistent whether it returns lower or upper case
 
963
        # and even if it was consistent the user might type the other
 
964
        # so we force it to uppercase
 
965
        # running python.exe under cmd.exe return capital C:\\
 
966
        # running win32 python inside a cygwin shell returns lowercase
 
967
        self.assertEqual(os_cwd[0].upper(), cwd[0])
 
968
 
 
969
    def test_fixdrive(self):
 
970
        self.assertEqual('H:/foo', osutils._win32_fixdrive('h:/foo'))
 
971
        self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
 
972
        self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
 
973
 
 
974
 
 
975
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
 
976
    """Test win32 functions that create files."""
 
977
 
 
978
    def test_getcwd(self):
 
979
        self.requireFeature(features.UnicodeFilenameFeature)
 
980
        os.mkdir(u'mu-\xb5')
 
981
        os.chdir(u'mu-\xb5')
 
982
        # TODO: jam 20060427 This will probably fail on Mac OSX because
 
983
        #       it will change the normalization of B\xe5gfors
 
984
        #       Consider using a different unicode character, or make
 
985
        #       osutils.getcwd() renormalize the path.
 
986
        self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
 
987
 
 
988
    def test_minimum_path_selection(self):
 
989
        self.assertEqual(set(),
 
990
                         osutils.minimum_path_selection([]))
 
991
        self.assertEqual({'a'},
 
992
                         osutils.minimum_path_selection(['a']))
 
993
        self.assertEqual({'a', 'b'},
 
994
                         osutils.minimum_path_selection(['a', 'b']))
 
995
        self.assertEqual({'a/', 'b'},
 
996
                         osutils.minimum_path_selection(['a/', 'b']))
 
997
        self.assertEqual({'a/', 'b'},
 
998
                         osutils.minimum_path_selection(['a/c', 'a/', 'b']))
 
999
        self.assertEqual({'a-b', 'a', 'a0b'},
 
1000
                         osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
 
1001
 
 
1002
    def test_mkdtemp(self):
 
1003
        tmpdir = osutils._win32_mkdtemp(dir='.')
 
1004
        self.assertFalse('\\' in tmpdir)
 
1005
 
 
1006
    def test_rename(self):
 
1007
        with open('a', 'wb') as a:
 
1008
            a.write(b'foo\n')
 
1009
        with open('b', 'wb') as b:
 
1010
            b.write(b'baz\n')
 
1011
 
 
1012
        osutils._win32_rename('b', 'a')
 
1013
        self.assertPathExists('a')
 
1014
        self.assertPathDoesNotExist('b')
 
1015
        self.assertFileEqual(b'baz\n', 'a')
 
1016
 
 
1017
    def test_rename_missing_file(self):
 
1018
        with open('a', 'wb') as a:
 
1019
            a.write(b'foo\n')
 
1020
 
 
1021
        try:
 
1022
            osutils._win32_rename('b', 'a')
 
1023
        except (IOError, OSError) as e:
 
1024
            self.assertEqual(errno.ENOENT, e.errno)
 
1025
        self.assertFileEqual(b'foo\n', 'a')
 
1026
 
 
1027
    def test_rename_missing_dir(self):
 
1028
        os.mkdir('a')
 
1029
        try:
 
1030
            osutils._win32_rename('b', 'a')
 
1031
        except (IOError, OSError) as e:
 
1032
            self.assertEqual(errno.ENOENT, e.errno)
 
1033
 
 
1034
    def test_rename_current_dir(self):
 
1035
        os.mkdir('a')
 
1036
        os.chdir('a')
 
1037
        # You can't rename the working directory
 
1038
        # doing rename non-existant . usually
 
1039
        # just raises ENOENT, since non-existant
 
1040
        # doesn't exist.
 
1041
        try:
 
1042
            osutils._win32_rename('b', '.')
 
1043
        except (IOError, OSError) as e:
 
1044
            self.assertEqual(errno.ENOENT, e.errno)
 
1045
 
 
1046
    def test_splitpath(self):
 
1047
        def check(expected, path):
 
1048
            self.assertEqual(expected, osutils.splitpath(path))
 
1049
 
 
1050
        check(['a'], 'a')
 
1051
        check(['a', 'b'], 'a/b')
 
1052
        check(['a', 'b'], 'a/./b')
 
1053
        check(['a', '.b'], 'a/.b')
 
1054
        if os.path.sep == '\\':
 
1055
            check(['a', '.b'], 'a\\.b')
 
1056
        else:
 
1057
            check(['a\\.b'], 'a\\.b')
 
1058
 
 
1059
        self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
 
1060
 
 
1061
 
 
1062
class TestParentDirectories(tests.TestCaseInTempDir):
 
1063
    """Test osutils.parent_directories()"""
 
1064
 
 
1065
    def test_parent_directories(self):
 
1066
        self.assertEqual([], osutils.parent_directories('a'))
 
1067
        self.assertEqual(['a'], osutils.parent_directories('a/b'))
 
1068
        self.assertEqual(['a/b', 'a'], osutils.parent_directories('a/b/c'))
 
1069
 
 
1070
 
 
1071
class TestMacFuncsDirs(tests.TestCaseInTempDir):
 
1072
    """Test mac special functions that require directories."""
 
1073
 
 
1074
    def test_getcwd(self):
 
1075
        self.requireFeature(features.UnicodeFilenameFeature)
 
1076
        os.mkdir(u'B\xe5gfors')
 
1077
        os.chdir(u'B\xe5gfors')
 
1078
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
 
1079
 
 
1080
    def test_getcwd_nonnorm(self):
 
1081
        self.requireFeature(features.UnicodeFilenameFeature)
 
1082
        # Test that _mac_getcwd() will normalize this path
 
1083
        os.mkdir(u'Ba\u030agfors')
 
1084
        os.chdir(u'Ba\u030agfors')
 
1085
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
 
1086
 
 
1087
 
 
1088
class TestChunksToLines(tests.TestCase):
 
1089
 
 
1090
    def test_smoketest(self):
 
1091
        self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
 
1092
                         osutils.chunks_to_lines([b'foo\nbar', b'\nbaz\n']))
 
1093
        self.assertEqual([b'foo\n', b'bar\n', b'baz\n'],
 
1094
                         osutils.chunks_to_lines([b'foo\n', b'bar\n', b'baz\n']))
 
1095
 
 
1096
    def test_osutils_binding(self):
 
1097
        from . import test__chunks_to_lines
 
1098
        if test__chunks_to_lines.compiled_chunkstolines_feature.available():
 
1099
            from .._chunks_to_lines_pyx import chunks_to_lines
 
1100
        else:
 
1101
            from .._chunks_to_lines_py import chunks_to_lines
 
1102
        self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
 
1103
 
 
1104
 
 
1105
class TestSplitLines(tests.TestCase):
 
1106
 
 
1107
    def test_split_unicode(self):
 
1108
        self.assertEqual([u'foo\n', u'bar\xae'],
 
1109
                         osutils.split_lines(u'foo\nbar\xae'))
 
1110
        self.assertEqual([u'foo\n', u'bar\xae\n'],
 
1111
                         osutils.split_lines(u'foo\nbar\xae\n'))
 
1112
 
 
1113
    def test_split_with_carriage_returns(self):
 
1114
        self.assertEqual([b'foo\rbar\n'],
 
1115
                         osutils.split_lines(b'foo\rbar\n'))
 
1116
 
 
1117
 
 
1118
class TestWalkDirs(tests.TestCaseInTempDir):
 
1119
 
 
1120
    def assertExpectedBlocks(self, expected, result):
 
1121
        self.assertEqual(expected,
 
1122
                         [(dirinfo, [line[0:3] for line in block])
 
1123
                          for dirinfo, block in result])
 
1124
 
 
1125
    def test_walkdirs(self):
 
1126
        tree = [
 
1127
            '.bzr',
 
1128
            '0file',
 
1129
            '1dir/',
 
1130
            '1dir/0file',
 
1131
            '1dir/1dir/',
 
1132
            '2file'
 
1133
            ]
 
1134
        self.build_tree(tree)
 
1135
        expected_dirblocks = [
 
1136
            (('', '.'),
 
1137
             [('0file', '0file', 'file'),
 
1138
              ('1dir', '1dir', 'directory'),
 
1139
              ('2file', '2file', 'file'),
 
1140
              ]
 
1141
             ),
 
1142
            (('1dir', './1dir'),
 
1143
             [('1dir/0file', '0file', 'file'),
 
1144
              ('1dir/1dir', '1dir', 'directory'),
 
1145
              ]
 
1146
             ),
 
1147
            (('1dir/1dir', './1dir/1dir'),
 
1148
             [
 
1149
                ]
 
1150
             ),
 
1151
            ]
 
1152
        result = []
 
1153
        found_bzrdir = False
 
1154
        for dirdetail, dirblock in osutils.walkdirs('.'):
 
1155
            if len(dirblock) and dirblock[0][1] == '.bzr':
 
1156
                # this tests the filtering of selected paths
 
1157
                found_bzrdir = True
 
1158
                del dirblock[0]
 
1159
            result.append((dirdetail, dirblock))
 
1160
 
 
1161
        self.assertTrue(found_bzrdir)
 
1162
        self.assertExpectedBlocks(expected_dirblocks, result)
 
1163
        # you can search a subdir only, with a supplied prefix.
 
1164
        result = []
 
1165
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
 
1166
            result.append(dirblock)
 
1167
        self.assertExpectedBlocks(expected_dirblocks[1:], result)
 
1168
 
 
1169
    def test_walkdirs_os_error(self):
 
1170
        # <https://bugs.launchpad.net/bzr/+bug/338653>
 
1171
        # Pyrex readdir didn't raise useful messages if it had an error
 
1172
        # reading the directory
 
1173
        if sys.platform == 'win32':
 
1174
            raise tests.TestNotApplicable(
 
1175
                "readdir IOError not tested on win32")
 
1176
        self.requireFeature(features.not_running_as_root)
 
1177
        os.mkdir("test-unreadable")
 
1178
        os.chmod("test-unreadable", 0000)
 
1179
        # must chmod it back so that it can be removed
 
1180
        self.addCleanup(os.chmod, "test-unreadable", 0o700)
 
1181
        # The error is not raised until the generator is actually evaluated.
 
1182
        # (It would be ok if it happened earlier but at the moment it
 
1183
        # doesn't.)
 
1184
        e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
 
1185
        self.assertEqual('./test-unreadable', osutils.safe_unicode(e.filename))
 
1186
        self.assertEqual(errno.EACCES, e.errno)
 
1187
        # Ensure the message contains the file name
 
1188
        self.assertContainsRe(str(e), "\\./test-unreadable")
 
1189
 
 
1190
    def test_walkdirs_encoding_error(self):
 
1191
        # <https://bugs.launchpad.net/bzr/+bug/488519>
 
1192
        # walkdirs didn't raise a useful message when the filenames
 
1193
        # are not using the filesystem's encoding
 
1194
 
 
1195
        # require a bytestring based filesystem
 
1196
        self.requireFeature(features.ByteStringNamedFilesystem)
 
1197
 
 
1198
        tree = [
 
1199
            '.bzr',
 
1200
            '0file',
 
1201
            '1dir/',
 
1202
            '1dir/0file',
 
1203
            '1dir/1dir/',
 
1204
            '1file'
 
1205
            ]
 
1206
 
 
1207
        self.build_tree(tree)
 
1208
 
 
1209
        # rename the 1file to a latin-1 filename
 
1210
        os.rename(b"./1file", b"\xe8file")
 
1211
        if b"\xe8file" not in os.listdir("."):
 
1212
            self.skipTest("Lack filesystem that preserves arbitrary bytes")
 
1213
 
 
1214
        self._save_platform_info()
 
1215
        osutils._fs_enc = 'UTF-8'
 
1216
 
 
1217
        # this should raise on error
 
1218
        def attempt():
 
1219
            for dirdetail, dirblock in osutils.walkdirs(b'.'):
 
1220
                pass
 
1221
 
 
1222
        self.assertRaises(errors.BadFilenameEncoding, attempt)
 
1223
 
 
1224
    def test__walkdirs_utf8(self):
 
1225
        tree = [
 
1226
            '.bzr',
 
1227
            '0file',
 
1228
            '1dir/',
 
1229
            '1dir/0file',
 
1230
            '1dir/1dir/',
 
1231
            '2file'
 
1232
            ]
 
1233
        self.build_tree(tree)
 
1234
        expected_dirblocks = [
 
1235
            (('', '.'),
 
1236
             [('0file', '0file', 'file'),
 
1237
              ('1dir', '1dir', 'directory'),
 
1238
              ('2file', '2file', 'file'),
 
1239
              ]
 
1240
             ),
 
1241
            (('1dir', './1dir'),
 
1242
             [('1dir/0file', '0file', 'file'),
 
1243
              ('1dir/1dir', '1dir', 'directory'),
 
1244
              ]
 
1245
             ),
 
1246
            (('1dir/1dir', './1dir/1dir'),
 
1247
             [
 
1248
                ]
 
1249
             ),
 
1250
            ]
 
1251
        result = []
 
1252
        found_bzrdir = False
 
1253
        for dirdetail, dirblock in osutils._walkdirs_utf8(b'.'):
 
1254
            if len(dirblock) and dirblock[0][1] == b'.bzr':
 
1255
                # this tests the filtering of selected paths
 
1256
                found_bzrdir = True
 
1257
                del dirblock[0]
 
1258
            dirdetail = (dirdetail[0].decode('utf-8'),
 
1259
                         osutils.safe_unicode(dirdetail[1]))
 
1260
            dirblock = [
 
1261
                (entry[0].decode('utf-8'), entry[1].decode('utf-8'), entry[2])
 
1262
                for entry in dirblock]
 
1263
            result.append((dirdetail, dirblock))
 
1264
 
 
1265
        self.assertTrue(found_bzrdir)
 
1266
        self.assertExpectedBlocks(expected_dirblocks, result)
 
1267
 
 
1268
        # you can search a subdir only, with a supplied prefix.
 
1269
        result = []
 
1270
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
 
1271
            result.append(dirblock)
 
1272
        self.assertExpectedBlocks(expected_dirblocks[1:], result)
 
1273
 
 
1274
    def _filter_out_stat(self, result):
 
1275
        """Filter out the stat value from the walkdirs result"""
 
1276
        for dirdetail, dirblock in result:
 
1277
            new_dirblock = []
 
1278
            for info in dirblock:
 
1279
                # Ignore info[3] which is the stat
 
1280
                new_dirblock.append((info[0], info[1], info[2], info[4]))
 
1281
            dirblock[:] = new_dirblock
 
1282
 
 
1283
    def _save_platform_info(self):
 
1284
        self.overrideAttr(osutils, '_fs_enc')
 
1285
        self.overrideAttr(osutils, '_selected_dir_reader')
 
1286
 
 
1287
    def assertDirReaderIs(self, expected, top):
 
1288
        """Assert the right implementation for _walkdirs_utf8 is chosen."""
 
1289
        # Force it to redetect
 
1290
        osutils._selected_dir_reader = None
 
1291
        # Nothing to list, but should still trigger the selection logic
 
1292
        self.assertEqual([((b'', top), [])], list(osutils._walkdirs_utf8('.')))
 
1293
        self.assertIsInstance(osutils._selected_dir_reader, expected)
 
1294
 
 
1295
    def test_force_walkdirs_utf8_fs_utf8(self):
 
1296
        self.requireFeature(UTF8DirReaderFeature)
 
1297
        self._save_platform_info()
 
1298
        osutils._fs_enc = 'utf-8'
 
1299
        self.assertDirReaderIs(UTF8DirReaderFeature.module.UTF8DirReader, b".")
 
1300
 
 
1301
    def test_force_walkdirs_utf8_fs_ascii(self):
 
1302
        self.requireFeature(UTF8DirReaderFeature)
 
1303
        self._save_platform_info()
 
1304
        osutils._fs_enc = 'ascii'
 
1305
        self.assertDirReaderIs(
 
1306
            UTF8DirReaderFeature.module.UTF8DirReader, b".")
 
1307
 
 
1308
    def test_force_walkdirs_utf8_fs_latin1(self):
 
1309
        self._save_platform_info()
 
1310
        osutils._fs_enc = 'iso-8859-1'
 
1311
        self.assertDirReaderIs(osutils.UnicodeDirReader, ".")
 
1312
 
 
1313
    def test_force_walkdirs_utf8_nt(self):
 
1314
        # Disabled because the thunk of the whole walkdirs api is disabled.
 
1315
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
1316
        self._save_platform_info()
 
1317
        from .._walkdirs_win32 import Win32ReadDir
 
1318
        self.assertDirReaderIs(Win32ReadDir, ".")
 
1319
 
 
1320
    def test_unicode_walkdirs(self):
 
1321
        """Walkdirs should always return unicode paths."""
 
1322
        self.requireFeature(features.UnicodeFilenameFeature)
 
1323
        name0 = u'0file-\xb6'
 
1324
        name1 = u'1dir-\u062c\u0648'
 
1325
        name2 = u'2file-\u0633'
 
1326
        tree = [
 
1327
            name0,
 
1328
            name1 + '/',
 
1329
            name1 + '/' + name0,
 
1330
            name1 + '/' + name1 + '/',
 
1331
            name2,
 
1332
            ]
 
1333
        self.build_tree(tree)
 
1334
        expected_dirblocks = [
 
1335
            ((u'', u'.'),
 
1336
             [(name0, name0, 'file', './' + name0),
 
1337
              (name1, name1, 'directory', './' + name1),
 
1338
              (name2, name2, 'file', './' + name2),
 
1339
              ]
 
1340
             ),
 
1341
            ((name1, './' + name1),
 
1342
             [(name1 + '/' + name0, name0, 'file', './' + name1
 
1343
               + '/' + name0),
 
1344
              (name1 + '/' + name1, name1, 'directory', './' + name1
 
1345
               + '/' + name1),
 
1346
              ]
 
1347
             ),
 
1348
            ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1349
             [
 
1350
                ]
 
1351
             ),
 
1352
            ]
 
1353
        result = list(osutils.walkdirs('.'))
 
1354
        self._filter_out_stat(result)
 
1355
        self.assertEqual(expected_dirblocks, result)
 
1356
        result = list(osutils.walkdirs(u'./' + name1, name1))
 
1357
        self._filter_out_stat(result)
 
1358
        self.assertEqual(expected_dirblocks[1:], result)
 
1359
 
 
1360
    def test_unicode__walkdirs_utf8(self):
 
1361
        """Walkdirs_utf8 should always return utf8 paths.
 
1362
 
 
1363
        The abspath portion might be in unicode or utf-8
 
1364
        """
 
1365
        self.requireFeature(features.UnicodeFilenameFeature)
 
1366
        name0 = u'0file-\xb6'
 
1367
        name1 = u'1dir-\u062c\u0648'
 
1368
        name2 = u'2file-\u0633'
 
1369
        tree = [
 
1370
            name0,
 
1371
            name1 + '/',
 
1372
            name1 + '/' + name0,
 
1373
            name1 + '/' + name1 + '/',
 
1374
            name2,
 
1375
            ]
 
1376
        self.build_tree(tree)
 
1377
        name0 = name0.encode('utf8')
 
1378
        name1 = name1.encode('utf8')
 
1379
        name2 = name2.encode('utf8')
 
1380
 
 
1381
        expected_dirblocks = [
 
1382
            ((b'', b'.'),
 
1383
             [(name0, name0, 'file', b'./' + name0),
 
1384
              (name1, name1, 'directory', b'./' + name1),
 
1385
              (name2, name2, 'file', b'./' + name2),
 
1386
              ]
 
1387
             ),
 
1388
            ((name1, b'./' + name1),
 
1389
             [(name1 + b'/' + name0, name0, 'file', b'./' + name1
 
1390
               + b'/' + name0),
 
1391
              (name1 + b'/' + name1, name1, 'directory', b'./' + name1
 
1392
               + b'/' + name1),
 
1393
              ]
 
1394
             ),
 
1395
            ((name1 + b'/' + name1, b'./' + name1 + b'/' + name1),
 
1396
             [
 
1397
                ]
 
1398
             ),
 
1399
            ]
 
1400
        result = []
 
1401
        # For ease in testing, if walkdirs_utf8 returns Unicode, assert that
 
1402
        # all abspaths are Unicode, and encode them back into utf8.
 
1403
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
 
1404
            self.assertIsInstance(dirdetail[0], bytes)
 
1405
            if isinstance(dirdetail[1], str):
 
1406
                dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
 
1407
                dirblock = [list(info) for info in dirblock]
 
1408
                for info in dirblock:
 
1409
                    self.assertIsInstance(info[4], str)
 
1410
                    info[4] = info[4].encode('utf8')
 
1411
            new_dirblock = []
 
1412
            for info in dirblock:
 
1413
                self.assertIsInstance(info[0], bytes)
 
1414
                self.assertIsInstance(info[1], bytes)
 
1415
                self.assertIsInstance(info[4], bytes)
 
1416
                # Remove the stat information
 
1417
                new_dirblock.append((info[0], info[1], info[2], info[4]))
 
1418
            result.append((dirdetail, new_dirblock))
 
1419
        self.assertEqual(expected_dirblocks, result)
 
1420
 
 
1421
    def test__walkdirs_utf8_with_unicode_fs(self):
 
1422
        """UnicodeDirReader should be a safe fallback everywhere
 
1423
 
 
1424
        The abspath portion should be in unicode
 
1425
        """
 
1426
        self.requireFeature(features.UnicodeFilenameFeature)
 
1427
        # Use the unicode reader. TODO: split into driver-and-driven unit
 
1428
        # tests.
 
1429
        self._save_platform_info()
 
1430
        osutils._selected_dir_reader = osutils.UnicodeDirReader()
 
1431
        name0u = u'0file-\xb6'
 
1432
        name1u = u'1dir-\u062c\u0648'
 
1433
        name2u = u'2file-\u0633'
 
1434
        tree = [
 
1435
            name0u,
 
1436
            name1u + '/',
 
1437
            name1u + '/' + name0u,
 
1438
            name1u + '/' + name1u + '/',
 
1439
            name2u,
 
1440
            ]
 
1441
        self.build_tree(tree)
 
1442
        name0 = name0u.encode('utf8')
 
1443
        name1 = name1u.encode('utf8')
 
1444
        name2 = name2u.encode('utf8')
 
1445
 
 
1446
        # All of the abspaths should be in unicode, all of the relative paths
 
1447
        # should be in utf8
 
1448
        expected_dirblocks = [
 
1449
            ((b'', '.'),
 
1450
             [(name0, name0, 'file', './' + name0u),
 
1451
              (name1, name1, 'directory', './' + name1u),
 
1452
              (name2, name2, 'file', './' + name2u),
 
1453
              ]
 
1454
             ),
 
1455
            ((name1, './' + name1u),
 
1456
             [(name1 + b'/' + name0, name0, 'file', './' + name1u
 
1457
               + '/' + name0u),
 
1458
              (name1 + b'/' + name1, name1, 'directory', './' + name1u
 
1459
               + '/' + name1u),
 
1460
              ]
 
1461
             ),
 
1462
            ((name1 + b'/' + name1, './' + name1u + '/' + name1u),
 
1463
             [
 
1464
                ]
 
1465
             ),
 
1466
            ]
 
1467
        result = list(osutils._walkdirs_utf8('.'))
 
1468
        self._filter_out_stat(result)
 
1469
        self.assertEqual(expected_dirblocks, result)
 
1470
 
 
1471
    def test__walkdirs_utf8_win32readdir(self):
 
1472
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
1473
        self.requireFeature(features.UnicodeFilenameFeature)
 
1474
        from .._walkdirs_win32 import Win32ReadDir
 
1475
        self._save_platform_info()
 
1476
        osutils._selected_dir_reader = Win32ReadDir()
 
1477
        name0u = u'0file-\xb6'
 
1478
        name1u = u'1dir-\u062c\u0648'
 
1479
        name2u = u'2file-\u0633'
 
1480
        tree = [
 
1481
            name0u,
 
1482
            name1u + '/',
 
1483
            name1u + '/' + name0u,
 
1484
            name1u + '/' + name1u + '/',
 
1485
            name2u,
 
1486
            ]
 
1487
        self.build_tree(tree)
 
1488
        name0 = name0u.encode('utf8')
 
1489
        name1 = name1u.encode('utf8')
 
1490
        name2 = name2u.encode('utf8')
 
1491
 
 
1492
        # All of the abspaths should be in unicode, all of the relative paths
 
1493
        # should be in utf8
 
1494
        expected_dirblocks = [
 
1495
            (('', '.'),
 
1496
             [(name0, name0, 'file', './' + name0u),
 
1497
              (name1, name1, 'directory', './' + name1u),
 
1498
              (name2, name2, 'file', './' + name2u),
 
1499
              ]
 
1500
             ),
 
1501
            ((name1, './' + name1u),
 
1502
             [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1503
               + '/' + name0u),
 
1504
              (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1505
               + '/' + name1u),
 
1506
              ]
 
1507
             ),
 
1508
            ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1509
             [
 
1510
                ]
 
1511
             ),
 
1512
            ]
 
1513
        result = list(osutils._walkdirs_utf8(u'.'))
 
1514
        self._filter_out_stat(result)
 
1515
        self.assertEqual(expected_dirblocks, result)
 
1516
 
 
1517
    def assertStatIsCorrect(self, path, win32stat):
 
1518
        os_stat = os.stat(path)
 
1519
        self.assertEqual(os_stat.st_size, win32stat.st_size)
 
1520
        self.assertAlmostEqual(os_stat.st_mtime, win32stat.st_mtime, places=4)
 
1521
        self.assertAlmostEqual(os_stat.st_ctime, win32stat.st_ctime, places=4)
 
1522
        self.assertAlmostEqual(os_stat.st_atime, win32stat.st_atime, places=4)
 
1523
        self.assertEqual(os_stat.st_dev, win32stat.st_dev)
 
1524
        self.assertEqual(os_stat.st_ino, win32stat.st_ino)
 
1525
        self.assertEqual(os_stat.st_mode, win32stat.st_mode)
 
1526
 
 
1527
    def test__walkdirs_utf_win32_find_file_stat_file(self):
 
1528
        """make sure our Stat values are valid"""
 
1529
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
1530
        self.requireFeature(features.UnicodeFilenameFeature)
 
1531
        from .._walkdirs_win32 import Win32ReadDir
 
1532
        name0u = u'0file-\xb6'
 
1533
        name0 = name0u.encode('utf8')
 
1534
        self.build_tree([name0u])
 
1535
        # I hate to sleep() here, but I'm trying to make the ctime different
 
1536
        # from the mtime
 
1537
        time.sleep(2)
 
1538
        with open(name0u, 'ab') as f:
 
1539
            f.write(b'just a small update')
 
1540
 
 
1541
        result = Win32ReadDir().read_dir('', u'.')
 
1542
        entry = result[0]
 
1543
        self.assertEqual((name0, name0, 'file'), entry[:3])
 
1544
        self.assertEqual(u'./' + name0u, entry[4])
 
1545
        self.assertStatIsCorrect(entry[4], entry[3])
 
1546
        self.assertNotEqual(entry[3].st_mtime, entry[3].st_ctime)
 
1547
 
 
1548
    def test__walkdirs_utf_win32_find_file_stat_directory(self):
 
1549
        """make sure our Stat values are valid"""
 
1550
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
1551
        self.requireFeature(features.UnicodeFilenameFeature)
 
1552
        from .._walkdirs_win32 import Win32ReadDir
 
1553
        name0u = u'0dir-\u062c\u0648'
 
1554
        name0 = name0u.encode('utf8')
 
1555
        self.build_tree([name0u + '/'])
 
1556
 
 
1557
        result = Win32ReadDir().read_dir('', u'.')
 
1558
        entry = result[0]
 
1559
        self.assertEqual((name0, name0, 'directory'), entry[:3])
 
1560
        self.assertEqual(u'./' + name0u, entry[4])
 
1561
        self.assertStatIsCorrect(entry[4], entry[3])
 
1562
 
 
1563
    def assertPathCompare(self, path_less, path_greater):
 
1564
        """check that path_less and path_greater compare correctly."""
 
1565
        self.assertEqual(0, osutils.compare_paths_prefix_order(
 
1566
            path_less, path_less))
 
1567
        self.assertEqual(0, osutils.compare_paths_prefix_order(
 
1568
            path_greater, path_greater))
 
1569
        self.assertEqual(-1, osutils.compare_paths_prefix_order(
 
1570
            path_less, path_greater))
 
1571
        self.assertEqual(1, osutils.compare_paths_prefix_order(
 
1572
            path_greater, path_less))
 
1573
 
 
1574
    def test_compare_paths_prefix_order(self):
 
1575
        # root before all else
 
1576
        self.assertPathCompare("/", "/a")
 
1577
        # alpha within a dir
 
1578
        self.assertPathCompare("/a", "/b")
 
1579
        self.assertPathCompare("/b", "/z")
 
1580
        # high dirs before lower.
 
1581
        self.assertPathCompare("/z", "/a/a")
 
1582
        # except if the deeper dir should be output first
 
1583
        self.assertPathCompare("/a/b/c", "/d/g")
 
1584
        # lexical betwen dirs of the same height
 
1585
        self.assertPathCompare("/a/z", "/z/z")
 
1586
        self.assertPathCompare("/a/c/z", "/a/d/e")
 
1587
 
 
1588
        # this should also be consistent for no leading / paths
 
1589
        # root before all else
 
1590
        self.assertPathCompare("", "a")
 
1591
        # alpha within a dir
 
1592
        self.assertPathCompare("a", "b")
 
1593
        self.assertPathCompare("b", "z")
 
1594
        # high dirs before lower.
 
1595
        self.assertPathCompare("z", "a/a")
 
1596
        # except if the deeper dir should be output first
 
1597
        self.assertPathCompare("a/b/c", "d/g")
 
1598
        # lexical betwen dirs of the same height
 
1599
        self.assertPathCompare("a/z", "z/z")
 
1600
        self.assertPathCompare("a/c/z", "a/d/e")
 
1601
 
 
1602
    def test_path_prefix_sorting(self):
 
1603
        """Doing a sort on path prefix should match our sample data."""
 
1604
        original_paths = [
 
1605
            'a',
 
1606
            'a/b',
 
1607
            'a/b/c',
 
1608
            'b',
 
1609
            'b/c',
 
1610
            'd',
 
1611
            'd/e',
 
1612
            'd/e/f',
 
1613
            'd/f',
 
1614
            'd/g',
 
1615
            'g',
 
1616
            ]
 
1617
 
 
1618
        dir_sorted_paths = [
 
1619
            'a',
 
1620
            'b',
 
1621
            'd',
 
1622
            'g',
 
1623
            'a/b',
 
1624
            'a/b/c',
 
1625
            'b/c',
 
1626
            'd/e',
 
1627
            'd/f',
 
1628
            'd/g',
 
1629
            'd/e/f',
 
1630
            ]
 
1631
 
 
1632
        self.assertEqual(
 
1633
            dir_sorted_paths,
 
1634
            sorted(original_paths, key=osutils.path_prefix_key))
 
1635
        # using the comparison routine shoudl work too:
 
1636
        self.assertEqual(
 
1637
            dir_sorted_paths,
 
1638
            sorted(original_paths, key=osutils.path_prefix_key))
 
1639
 
 
1640
 
 
1641
class TestCopyTree(tests.TestCaseInTempDir):
 
1642
 
 
1643
    def test_copy_basic_tree(self):
 
1644
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
 
1645
        osutils.copy_tree('source', 'target')
 
1646
        self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
 
1647
        self.assertEqual(['c'], os.listdir('target/b'))
 
1648
 
 
1649
    def test_copy_tree_target_exists(self):
 
1650
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
 
1651
                         'target/'])
 
1652
        osutils.copy_tree('source', 'target')
 
1653
        self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
 
1654
        self.assertEqual(['c'], os.listdir('target/b'))
 
1655
 
 
1656
    def test_copy_tree_symlinks(self):
 
1657
        self.requireFeature(features.SymlinkFeature)
 
1658
        self.build_tree(['source/'])
 
1659
        os.symlink('a/generic/path', 'source/lnk')
 
1660
        osutils.copy_tree('source', 'target')
 
1661
        self.assertEqual(['lnk'], os.listdir('target'))
 
1662
        self.assertEqual('a/generic/path', os.readlink('target/lnk'))
 
1663
 
 
1664
    def test_copy_tree_handlers(self):
 
1665
        processed_files = []
 
1666
        processed_links = []
 
1667
 
 
1668
        def file_handler(from_path, to_path):
 
1669
            processed_files.append(('f', from_path, to_path))
 
1670
 
 
1671
        def dir_handler(from_path, to_path):
 
1672
            processed_files.append(('d', from_path, to_path))
 
1673
 
 
1674
        def link_handler(from_path, to_path):
 
1675
            processed_links.append((from_path, to_path))
 
1676
        handlers = {'file': file_handler,
 
1677
                    'directory': dir_handler,
 
1678
                    'symlink': link_handler,
 
1679
                    }
 
1680
 
 
1681
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
 
1682
        if osutils.has_symlinks():
 
1683
            os.symlink('a/generic/path', 'source/lnk')
 
1684
        osutils.copy_tree('source', 'target', handlers=handlers)
 
1685
 
 
1686
        self.assertEqual([('d', 'source', 'target'),
 
1687
                          ('f', 'source/a', 'target/a'),
 
1688
                          ('d', 'source/b', 'target/b'),
 
1689
                          ('f', 'source/b/c', 'target/b/c'),
 
1690
                          ], processed_files)
 
1691
        self.assertPathDoesNotExist('target')
 
1692
        if osutils.has_symlinks():
 
1693
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
 
1694
 
 
1695
 
 
1696
class TestSetUnsetEnv(tests.TestCase):
 
1697
    """Test updating the environment"""
 
1698
 
 
1699
    def setUp(self):
 
1700
        super(TestSetUnsetEnv, self).setUp()
 
1701
 
 
1702
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'),
 
1703
                         'Environment was not cleaned up properly.'
 
1704
                         ' Variable BRZ_TEST_ENV_VAR should not exist.')
 
1705
 
 
1706
        def cleanup():
 
1707
            if 'BRZ_TEST_ENV_VAR' in os.environ:
 
1708
                del os.environ['BRZ_TEST_ENV_VAR']
 
1709
        self.addCleanup(cleanup)
 
1710
 
 
1711
    def test_set(self):
 
1712
        """Test that we can set an env variable"""
 
1713
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1714
        self.assertEqual(None, old)
 
1715
        self.assertEqual('foo', os.environ.get('BRZ_TEST_ENV_VAR'))
 
1716
 
 
1717
    def test_double_set(self):
 
1718
        """Test that we get the old value out"""
 
1719
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1720
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'bar')
 
1721
        self.assertEqual('foo', old)
 
1722
        self.assertEqual('bar', os.environ.get('BRZ_TEST_ENV_VAR'))
 
1723
 
 
1724
    def test_unicode(self):
 
1725
        """Environment can only contain plain strings
 
1726
 
 
1727
        So Unicode strings must be encoded.
 
1728
        """
 
1729
        uni_val, env_val = tests.probe_unicode_in_user_encoding()
 
1730
        if uni_val is None:
 
1731
            raise tests.TestSkipped(
 
1732
                'Cannot find a unicode character that works in encoding %s'
 
1733
                % (osutils.get_user_encoding(),))
 
1734
 
 
1735
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', uni_val)
 
1736
        self.assertEqual(uni_val, os.environ.get('BRZ_TEST_ENV_VAR'))
 
1737
 
 
1738
    def test_unset(self):
 
1739
        """Test that passing None will remove the env var"""
 
1740
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1741
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', None)
 
1742
        self.assertEqual('foo', old)
 
1743
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'))
 
1744
        self.assertNotIn('BRZ_TEST_ENV_VAR', os.environ)
 
1745
 
 
1746
 
 
1747
class TestSizeShaFile(tests.TestCaseInTempDir):
 
1748
 
 
1749
    def test_sha_empty(self):
 
1750
        self.build_tree_contents([('foo', b'')])
 
1751
        expected_sha = osutils.sha_string(b'')
 
1752
        f = open('foo')
 
1753
        self.addCleanup(f.close)
 
1754
        size, sha = osutils.size_sha_file(f)
 
1755
        self.assertEqual(0, size)
 
1756
        self.assertEqual(expected_sha, sha)
 
1757
 
 
1758
    def test_sha_mixed_endings(self):
 
1759
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
 
1760
        self.build_tree_contents([('foo', text)])
 
1761
        expected_sha = osutils.sha_string(text)
 
1762
        f = open('foo', 'rb')
 
1763
        self.addCleanup(f.close)
 
1764
        size, sha = osutils.size_sha_file(f)
 
1765
        self.assertEqual(38, size)
 
1766
        self.assertEqual(expected_sha, sha)
 
1767
 
 
1768
 
 
1769
class TestShaFileByName(tests.TestCaseInTempDir):
 
1770
 
 
1771
    def test_sha_empty(self):
 
1772
        self.build_tree_contents([('foo', b'')])
 
1773
        expected_sha = osutils.sha_string(b'')
 
1774
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
 
1775
 
 
1776
    def test_sha_mixed_endings(self):
 
1777
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
 
1778
        self.build_tree_contents([('foo', text)])
 
1779
        expected_sha = osutils.sha_string(text)
 
1780
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
 
1781
 
 
1782
 
 
1783
class TestResourceLoading(tests.TestCaseInTempDir):
 
1784
 
 
1785
    def test_resource_string(self):
 
1786
        # test resource in breezy
 
1787
        text = osutils.resource_string('breezy', 'debug.py')
 
1788
        self.assertContainsRe(text, "debug_flags = set()")
 
1789
        # test resource under breezy
 
1790
        text = osutils.resource_string('breezy.ui', 'text.py')
 
1791
        self.assertContainsRe(text, "class TextUIFactory")
 
1792
        # test unsupported package
 
1793
        self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
 
1794
                          'yyy.xx')
 
1795
        # test unknown resource
 
1796
        self.assertRaises(IOError, osutils.resource_string, 'breezy', 'yyy.xx')
 
1797
 
 
1798
 
 
1799
class TestDirReader(tests.TestCaseInTempDir):
 
1800
 
 
1801
    scenarios = dir_reader_scenarios()
 
1802
 
 
1803
    # Set by load_tests
 
1804
    _dir_reader_class = None
 
1805
    _native_to_unicode = None
 
1806
 
 
1807
    def setUp(self):
 
1808
        super(TestDirReader, self).setUp()
 
1809
        self.overrideAttr(osutils,
 
1810
                          '_selected_dir_reader', self._dir_reader_class())
 
1811
 
 
1812
    def _get_ascii_tree(self):
 
1813
        tree = [
 
1814
            '0file',
 
1815
            '1dir/',
 
1816
            '1dir/0file',
 
1817
            '1dir/1dir/',
 
1818
            '2file'
 
1819
            ]
 
1820
        expected_dirblocks = [
 
1821
            ((b'', '.'),
 
1822
             [(b'0file', b'0file', 'file', './0file'),
 
1823
              (b'1dir', b'1dir', 'directory', './1dir'),
 
1824
              (b'2file', b'2file', 'file', './2file'),
 
1825
              ]
 
1826
             ),
 
1827
            ((b'1dir', './1dir'),
 
1828
             [(b'1dir/0file', b'0file', 'file', './1dir/0file'),
 
1829
              (b'1dir/1dir', b'1dir', 'directory', './1dir/1dir'),
 
1830
              ]
 
1831
             ),
 
1832
            ((b'1dir/1dir', './1dir/1dir'),
 
1833
             [
 
1834
                ]
 
1835
             ),
 
1836
            ]
 
1837
        return tree, expected_dirblocks
 
1838
 
 
1839
    def test_walk_cur_dir(self):
 
1840
        tree, expected_dirblocks = self._get_ascii_tree()
 
1841
        self.build_tree(tree)
 
1842
        result = list(osutils._walkdirs_utf8('.'))
 
1843
        # Filter out stat and abspath
 
1844
        self.assertEqual(expected_dirblocks,
 
1845
                         self._filter_out(result))
 
1846
 
 
1847
    def test_walk_sub_dir(self):
 
1848
        tree, expected_dirblocks = self._get_ascii_tree()
 
1849
        self.build_tree(tree)
 
1850
        # you can search a subdir only, with a supplied prefix.
 
1851
        result = list(osutils._walkdirs_utf8(b'./1dir', b'1dir'))
 
1852
        # Filter out stat and abspath
 
1853
        self.assertEqual(expected_dirblocks[1:],
 
1854
                         self._filter_out(result))
 
1855
 
 
1856
    def _get_unicode_tree(self):
 
1857
        name0u = u'0file-\xb6'
 
1858
        name1u = u'1dir-\u062c\u0648'
 
1859
        name2u = u'2file-\u0633'
 
1860
        tree = [
 
1861
            name0u,
 
1862
            name1u + '/',
 
1863
            name1u + '/' + name0u,
 
1864
            name1u + '/' + name1u + '/',
 
1865
            name2u,
 
1866
            ]
 
1867
        name0 = name0u.encode('UTF-8')
 
1868
        name1 = name1u.encode('UTF-8')
 
1869
        name2 = name2u.encode('UTF-8')
 
1870
        expected_dirblocks = [
 
1871
            ((b'', '.'),
 
1872
             [(name0, name0, 'file', './' + name0u),
 
1873
              (name1, name1, 'directory', './' + name1u),
 
1874
              (name2, name2, 'file', './' + name2u),
 
1875
              ]
 
1876
             ),
 
1877
            ((name1, './' + name1u),
 
1878
             [(name1 + b'/' + name0, name0, 'file', './' + name1u
 
1879
               + '/' + name0u),
 
1880
              (name1 + b'/' + name1, name1, 'directory', './' + name1u
 
1881
               + '/' + name1u),
 
1882
              ]
 
1883
             ),
 
1884
            ((name1 + b'/' + name1, './' + name1u + '/' + name1u),
 
1885
             [
 
1886
                ]
 
1887
             ),
 
1888
            ]
 
1889
        return tree, expected_dirblocks
 
1890
 
 
1891
    def _filter_out(self, raw_dirblocks):
 
1892
        """Filter out a walkdirs_utf8 result.
 
1893
 
 
1894
        stat field is removed, all native paths are converted to unicode
 
1895
        """
 
1896
        filtered_dirblocks = []
 
1897
        for dirinfo, block in raw_dirblocks:
 
1898
            dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
 
1899
            details = []
 
1900
            for line in block:
 
1901
                details.append(
 
1902
                    line[0:3] + (self._native_to_unicode(line[4]), ))
 
1903
            filtered_dirblocks.append((dirinfo, details))
 
1904
        return filtered_dirblocks
 
1905
 
 
1906
    def test_walk_unicode_tree(self):
 
1907
        self.requireFeature(features.UnicodeFilenameFeature)
 
1908
        tree, expected_dirblocks = self._get_unicode_tree()
 
1909
        self.build_tree(tree)
 
1910
        result = list(osutils._walkdirs_utf8('.'))
 
1911
        self.assertEqual(expected_dirblocks, self._filter_out(result))
 
1912
 
 
1913
    def test_symlink(self):
 
1914
        self.requireFeature(features.SymlinkFeature)
 
1915
        self.requireFeature(features.UnicodeFilenameFeature)
 
1916
        target = u'target\N{Euro Sign}'
 
1917
        link_name = u'l\N{Euro Sign}nk'
 
1918
        os.symlink(target, link_name)
 
1919
        link_name_utf8 = link_name.encode('UTF-8')
 
1920
        expected_dirblocks = [
 
1921
            ((b'', '.'),
 
1922
             [(link_name_utf8, link_name_utf8,
 
1923
               'symlink', './' + link_name), ],
 
1924
             )]
 
1925
        result = list(osutils._walkdirs_utf8('.'))
 
1926
        self.assertEqual(expected_dirblocks, self._filter_out(result))
 
1927
 
 
1928
 
 
1929
class TestReadLink(tests.TestCaseInTempDir):
 
1930
    """Exposes os.readlink() problems and the osutils solution.
 
1931
 
 
1932
    The only guarantee offered by os.readlink(), starting with 2.6, is that a
 
1933
    unicode string will be returned if a unicode string is passed.
 
1934
 
 
1935
    But prior python versions failed to properly encode the passed unicode
 
1936
    string.
 
1937
    """
 
1938
    _test_needs_features = [features.SymlinkFeature,
 
1939
                            features.UnicodeFilenameFeature]
 
1940
 
 
1941
    def setUp(self):
 
1942
        super(tests.TestCaseInTempDir, self).setUp()
 
1943
        self.link = u'l\N{Euro Sign}ink'
 
1944
        self.target = u'targe\N{Euro Sign}t'
 
1945
        os.symlink(self.target, self.link)
 
1946
 
 
1947
    def test_os_readlink_link_encoding(self):
 
1948
        self.assertEqual(self.target, os.readlink(self.link))
 
1949
 
 
1950
    def test_os_readlink_link_decoding(self):
 
1951
        self.assertEqual(self.target.encode(osutils._fs_enc),
 
1952
                         os.readlink(self.link.encode(osutils._fs_enc)))
 
1953
 
 
1954
 
 
1955
class TestConcurrency(tests.TestCase):
 
1956
 
 
1957
    def setUp(self):
 
1958
        super(TestConcurrency, self).setUp()
 
1959
        self.overrideAttr(osutils, '_cached_local_concurrency')
 
1960
 
 
1961
    def test_local_concurrency(self):
 
1962
        concurrency = osutils.local_concurrency()
 
1963
        self.assertIsInstance(concurrency, int)
 
1964
 
 
1965
    def test_local_concurrency_environment_variable(self):
 
1966
        self.overrideEnv('BRZ_CONCURRENCY', '2')
 
1967
        self.assertEqual(2, osutils.local_concurrency(use_cache=False))
 
1968
        self.overrideEnv('BRZ_CONCURRENCY', '3')
 
1969
        self.assertEqual(3, osutils.local_concurrency(use_cache=False))
 
1970
        self.overrideEnv('BRZ_CONCURRENCY', 'foo')
 
1971
        self.assertEqual(1, osutils.local_concurrency(use_cache=False))
 
1972
 
 
1973
    def test_option_concurrency(self):
 
1974
        self.overrideEnv('BRZ_CONCURRENCY', '1')
 
1975
        self.run_bzr('rocks --concurrency 42')
 
1976
        # Command line overrides environment variable
 
1977
        self.assertEqual('42', os.environ['BRZ_CONCURRENCY'])
 
1978
        self.assertEqual(42, osutils.local_concurrency(use_cache=False))
 
1979
 
 
1980
 
 
1981
class TestFailedToLoadExtension(tests.TestCase):
 
1982
 
 
1983
    def _try_loading(self):
 
1984
        try:
 
1985
            import breezy._fictional_extension_py  # noqa: F401
 
1986
        except ImportError as e:
 
1987
            osutils.failed_to_load_extension(e)
 
1988
            return True
 
1989
 
 
1990
    def setUp(self):
 
1991
        super(TestFailedToLoadExtension, self).setUp()
 
1992
        self.overrideAttr(osutils, '_extension_load_failures', [])
 
1993
 
 
1994
    def test_failure_to_load(self):
 
1995
        self._try_loading()
 
1996
        self.assertLength(1, osutils._extension_load_failures)
 
1997
        self.assertEqual(
 
1998
            osutils._extension_load_failures[0],
 
1999
            "No module named 'breezy._fictional_extension_py'")
 
2000
 
 
2001
    def test_report_extension_load_failures_no_warning(self):
 
2002
        self.assertTrue(self._try_loading())
 
2003
        warnings, result = self.callCatchWarnings(
 
2004
            osutils.report_extension_load_failures)
 
2005
        # it used to give a Python warning; it no longer does
 
2006
        self.assertLength(0, warnings)
 
2007
 
 
2008
    def test_report_extension_load_failures_message(self):
 
2009
        log = BytesIO()
 
2010
        trace.push_log_file(log)
 
2011
        self.assertTrue(self._try_loading())
 
2012
        osutils.report_extension_load_failures()
 
2013
        self.assertContainsRe(
 
2014
            log.getvalue(),
 
2015
            br"brz: warning: some compiled extensions could not be loaded; "
 
2016
            b"see ``brz help missing-extensions``\n"
 
2017
            )
 
2018
 
 
2019
 
 
2020
class TestTerminalWidth(tests.TestCase):
 
2021
 
 
2022
    def setUp(self):
 
2023
        super(TestTerminalWidth, self).setUp()
 
2024
        self._orig_terminal_size_state = osutils._terminal_size_state
 
2025
        self._orig_first_terminal_size = osutils._first_terminal_size
 
2026
        self.addCleanup(self.restore_osutils_globals)
 
2027
        osutils._terminal_size_state = 'no_data'
 
2028
        osutils._first_terminal_size = None
 
2029
 
 
2030
    def restore_osutils_globals(self):
 
2031
        osutils._terminal_size_state = self._orig_terminal_size_state
 
2032
        osutils._first_terminal_size = self._orig_first_terminal_size
 
2033
 
 
2034
    def replace_stdout(self, new):
 
2035
        self.overrideAttr(sys, 'stdout', new)
 
2036
 
 
2037
    def replace__terminal_size(self, new):
 
2038
        self.overrideAttr(osutils, '_terminal_size', new)
 
2039
 
 
2040
    def set_fake_tty(self):
 
2041
 
 
2042
        class I_am_a_tty(object):
 
2043
            def isatty(self):
 
2044
                return True
 
2045
 
 
2046
        self.replace_stdout(I_am_a_tty())
 
2047
 
 
2048
    def test_default_values(self):
 
2049
        self.assertEqual(80, osutils.default_terminal_width)
 
2050
 
 
2051
    def test_defaults_to_BRZ_COLUMNS(self):
 
2052
        # BRZ_COLUMNS is set by the test framework
 
2053
        self.assertNotEqual('12', os.environ['BRZ_COLUMNS'])
 
2054
        self.overrideEnv('BRZ_COLUMNS', '12')
 
2055
        self.assertEqual(12, osutils.terminal_width())
 
2056
 
 
2057
    def test_BRZ_COLUMNS_0_no_limit(self):
 
2058
        self.overrideEnv('BRZ_COLUMNS', '0')
 
2059
        self.assertEqual(None, osutils.terminal_width())
 
2060
 
 
2061
    def test_falls_back_to_COLUMNS(self):
 
2062
        self.overrideEnv('BRZ_COLUMNS', None)
 
2063
        self.assertNotEqual('42', os.environ['COLUMNS'])
 
2064
        self.set_fake_tty()
 
2065
        self.overrideEnv('COLUMNS', '42')
 
2066
        self.assertEqual(42, osutils.terminal_width())
 
2067
 
 
2068
    def test_tty_default_without_columns(self):
 
2069
        self.overrideEnv('BRZ_COLUMNS', None)
 
2070
        self.overrideEnv('COLUMNS', None)
 
2071
 
 
2072
        def terminal_size(w, h):
 
2073
            return 42, 42
 
2074
 
 
2075
        self.set_fake_tty()
 
2076
        # We need to override the osutils definition as it depends on the
 
2077
        # running environment that we can't control (PQM running without a
 
2078
        # controlling terminal is one example).
 
2079
        self.replace__terminal_size(terminal_size)
 
2080
        self.assertEqual(42, osutils.terminal_width())
 
2081
 
 
2082
    def test_non_tty_default_without_columns(self):
 
2083
        self.overrideEnv('BRZ_COLUMNS', None)
 
2084
        self.overrideEnv('COLUMNS', None)
 
2085
        self.replace_stdout(None)
 
2086
        self.assertEqual(None, osutils.terminal_width())
 
2087
 
 
2088
    def test_no_TIOCGWINSZ(self):
 
2089
        self.requireFeature(term_ios_feature)
 
2090
        termios = term_ios_feature.module
 
2091
        # bug 63539 is about a termios without TIOCGWINSZ attribute
 
2092
        try:
 
2093
            termios.TIOCGWINSZ
 
2094
        except AttributeError:
 
2095
            # We won't remove TIOCGWINSZ, because it doesn't exist anyway :)
 
2096
            pass
 
2097
        else:
 
2098
            self.overrideAttr(termios, 'TIOCGWINSZ')
 
2099
            del termios.TIOCGWINSZ
 
2100
        self.overrideEnv('BRZ_COLUMNS', None)
 
2101
        self.overrideEnv('COLUMNS', None)
 
2102
        # Whatever the result is, if we don't raise an exception, it's ok.
 
2103
        osutils.terminal_width()
 
2104
 
 
2105
 
 
2106
class TestCreationOps(tests.TestCaseInTempDir):
 
2107
    _test_needs_features = [features.chown_feature]
 
2108
 
 
2109
    def setUp(self):
 
2110
        super(TestCreationOps, self).setUp()
 
2111
        self.overrideAttr(os, 'chown', self._dummy_chown)
 
2112
 
 
2113
        # params set by call to _dummy_chown
 
2114
        self.path = self.uid = self.gid = None
 
2115
 
 
2116
    def _dummy_chown(self, path, uid, gid):
 
2117
        self.path, self.uid, self.gid = path, uid, gid
 
2118
 
 
2119
    def test_copy_ownership_from_path(self):
 
2120
        """copy_ownership_from_path test with specified src."""
 
2121
        ownsrc = '/'
 
2122
        open('test_file', 'wt').close()
 
2123
        osutils.copy_ownership_from_path('test_file', ownsrc)
 
2124
 
 
2125
        s = os.stat(ownsrc)
 
2126
        self.assertEqual(self.path, 'test_file')
 
2127
        self.assertEqual(self.uid, s.st_uid)
 
2128
        self.assertEqual(self.gid, s.st_gid)
 
2129
 
 
2130
    def test_copy_ownership_nonesrc(self):
 
2131
        """copy_ownership_from_path test with src=None."""
 
2132
        open('test_file', 'wt').close()
 
2133
        # should use parent dir for permissions
 
2134
        osutils.copy_ownership_from_path('test_file')
 
2135
 
 
2136
        s = os.stat('..')
 
2137
        self.assertEqual(self.path, 'test_file')
 
2138
        self.assertEqual(self.uid, s.st_uid)
 
2139
        self.assertEqual(self.gid, s.st_gid)
 
2140
 
 
2141
 
 
2142
class TestPathFromEnviron(tests.TestCase):
 
2143
 
 
2144
    def test_is_unicode(self):
 
2145
        self.overrideEnv('BRZ_TEST_PATH', './anywhere at all/')
 
2146
        path = osutils.path_from_environ('BRZ_TEST_PATH')
 
2147
        self.assertIsInstance(path, str)
 
2148
        self.assertEqual(u'./anywhere at all/', path)
 
2149
 
 
2150
    def test_posix_path_env_ascii(self):
 
2151
        self.overrideEnv('BRZ_TEST_PATH', '/tmp')
 
2152
        home = osutils._posix_path_from_environ('BRZ_TEST_PATH')
 
2153
        self.assertIsInstance(home, str)
 
2154
        self.assertEqual(u'/tmp', home)
 
2155
 
 
2156
    def test_posix_path_env_unicode(self):
 
2157
        self.requireFeature(features.ByteStringNamedFilesystem)
 
2158
        self.overrideEnv('BRZ_TEST_PATH', '/home/\xa7test')
 
2159
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
 
2160
        self.assertEqual(u'/home/\xa7test',
 
2161
                         osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2162
        osutils._fs_enc = "iso8859-5"
 
2163
        # In Python 3, os.environ returns unicode.
 
2164
        self.assertEqual(u'/home/\xa7test',
 
2165
                         osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2166
 
 
2167
 
 
2168
class TestGetHomeDir(tests.TestCase):
 
2169
 
 
2170
    def test_is_unicode(self):
 
2171
        home = osutils._get_home_dir()
 
2172
        self.assertIsInstance(home, str)
 
2173
 
 
2174
    def test_posix_homeless(self):
 
2175
        self.overrideEnv('HOME', None)
 
2176
        home = osutils._get_home_dir()
 
2177
        self.assertIsInstance(home, str)
 
2178
 
 
2179
    def test_posix_home_ascii(self):
 
2180
        self.overrideEnv('HOME', '/home/test')
 
2181
        home = osutils._posix_get_home_dir()
 
2182
        self.assertIsInstance(home, str)
 
2183
        self.assertEqual(u'/home/test', home)
 
2184
 
 
2185
    def test_posix_home_unicode(self):
 
2186
        self.requireFeature(features.ByteStringNamedFilesystem)
 
2187
        self.overrideEnv('HOME', '/home/\xa7test')
 
2188
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
 
2189
        self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
 
2190
        osutils._fs_enc = "iso8859-5"
 
2191
        # In python 3, os.environ returns unicode
 
2192
        self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
 
2193
 
 
2194
 
 
2195
class TestGetuserUnicode(tests.TestCase):
 
2196
 
 
2197
    def test_is_unicode(self):
 
2198
        user = osutils.getuser_unicode()
 
2199
        self.assertIsInstance(user, str)
 
2200
 
 
2201
    def envvar_to_override(self):
 
2202
        if sys.platform == "win32":
 
2203
            # Disable use of platform calls on windows so envvar is used
 
2204
            self.overrideAttr(win32utils, 'has_ctypes', False)
 
2205
            return 'USERNAME'  # only variable used on windows
 
2206
        return 'LOGNAME'  # first variable checked by getpass.getuser()
 
2207
 
 
2208
    def test_ascii_user(self):
 
2209
        self.overrideEnv(self.envvar_to_override(), 'jrandom')
 
2210
        self.assertEqual(u'jrandom', osutils.getuser_unicode())
 
2211
 
 
2212
    def test_unicode_user(self):
 
2213
        ue = osutils.get_user_encoding()
 
2214
        uni_val, env_val = tests.probe_unicode_in_user_encoding()
 
2215
        if uni_val is None:
 
2216
            raise tests.TestSkipped(
 
2217
                'Cannot find a unicode character that works in encoding %s'
 
2218
                % (osutils.get_user_encoding(),))
 
2219
        uni_username = u'jrandom' + uni_val
 
2220
        encoded_username = uni_username.encode(ue)
 
2221
        self.overrideEnv(self.envvar_to_override(), uni_username)
 
2222
        self.assertEqual(uni_username, osutils.getuser_unicode())
 
2223
 
 
2224
 
 
2225
class TestBackupNames(tests.TestCase):
 
2226
 
 
2227
    def setUp(self):
 
2228
        super(TestBackupNames, self).setUp()
 
2229
        self.backups = []
 
2230
 
 
2231
    def backup_exists(self, name):
 
2232
        return name in self.backups
 
2233
 
 
2234
    def available_backup_name(self, name):
 
2235
        backup_name = osutils.available_backup_name(name, self.backup_exists)
 
2236
        self.backups.append(backup_name)
 
2237
        return backup_name
 
2238
 
 
2239
    def assertBackupName(self, expected, name):
 
2240
        self.assertEqual(expected, self.available_backup_name(name))
 
2241
 
 
2242
    def test_empty(self):
 
2243
        self.assertBackupName('file.~1~', 'file')
 
2244
 
 
2245
    def test_existing(self):
 
2246
        self.available_backup_name('file')
 
2247
        self.available_backup_name('file')
 
2248
        self.assertBackupName('file.~3~', 'file')
 
2249
        # Empty slots are found, this is not a strict requirement and may be
 
2250
        # revisited if we test against all implementations.
 
2251
        self.backups.remove('file.~2~')
 
2252
        self.assertBackupName('file.~2~', 'file')
 
2253
 
 
2254
 
 
2255
class TestFindExecutableInPath(tests.TestCase):
 
2256
 
 
2257
    def test_windows(self):
 
2258
        if sys.platform != 'win32':
 
2259
            raise tests.TestSkipped('test requires win32')
 
2260
        self.assertTrue(osutils.find_executable_on_path(
 
2261
            'explorer') is not None)
 
2262
        self.assertTrue(
 
2263
            osutils.find_executable_on_path('explorer.exe') is not None)
 
2264
        self.assertTrue(
 
2265
            osutils.find_executable_on_path('EXPLORER.EXE') is not None)
 
2266
        self.assertTrue(
 
2267
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
 
2268
        self.assertTrue(osutils.find_executable_on_path('file.txt') is None)
 
2269
 
 
2270
    def test_windows_app_path(self):
 
2271
        if sys.platform != 'win32':
 
2272
            raise tests.TestSkipped('test requires win32')
 
2273
        # Override PATH env var so that exe can only be found on App Path
 
2274
        self.overrideEnv('PATH', '')
 
2275
        # Internt Explorer is always registered in the App Path
 
2276
        self.assertTrue(osutils.find_executable_on_path(
 
2277
            'iexplore') is not None)
 
2278
 
 
2279
    def test_other(self):
 
2280
        if sys.platform == 'win32':
 
2281
            raise tests.TestSkipped('test requires non-win32')
 
2282
        self.assertTrue(osutils.find_executable_on_path('sh') is not None)
 
2283
        self.assertTrue(
 
2284
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
 
2285
 
 
2286
 
 
2287
class TestEnvironmentErrors(tests.TestCase):
 
2288
    """Test handling of environmental errors"""
 
2289
 
 
2290
    def test_is_oserror(self):
 
2291
        self.assertTrue(osutils.is_environment_error(
 
2292
            OSError(errno.EINVAL, "Invalid parameter")))
 
2293
 
 
2294
    def test_is_ioerror(self):
 
2295
        self.assertTrue(osutils.is_environment_error(
 
2296
            IOError(errno.EINVAL, "Invalid parameter")))
 
2297
 
 
2298
    def test_is_socket_error(self):
 
2299
        self.assertTrue(osutils.is_environment_error(
 
2300
            socket.error(errno.EINVAL, "Invalid parameter")))
 
2301
 
 
2302
    def test_is_select_error(self):
 
2303
        self.assertTrue(osutils.is_environment_error(
 
2304
            select.error(errno.EINVAL, "Invalid parameter")))
 
2305
 
 
2306
    def test_is_pywintypes_error(self):
 
2307
        self.requireFeature(features.pywintypes)
 
2308
        import pywintypes
 
2309
        self.assertTrue(osutils.is_environment_error(
 
2310
            pywintypes.error(errno.EINVAL, "Invalid parameter", "Caller")))
 
2311
 
 
2312
 
 
2313
class SupportsExecutableTests(tests.TestCaseInTempDir):
 
2314
 
 
2315
    def test_returns_bool(self):
 
2316
        self.assertIsInstance(osutils.supports_executable(self.test_dir), bool)
 
2317
 
 
2318
 
 
2319
class SupportsSymlinksTests(tests.TestCaseInTempDir):
 
2320
 
 
2321
    def test_returns_bool(self):
 
2322
        self.assertIsInstance(osutils.supports_symlinks(self.test_dir), bool)
 
2323
 
 
2324
 
 
2325
class MtabReader(tests.TestCaseInTempDir):
 
2326
 
 
2327
    def test_read_mtab(self):
 
2328
        self.build_tree_contents([('mtab', """\
 
2329
/dev/mapper/blah--vg-root / ext4 rw,relatime,errors=remount-ro 0 0
 
2330
/dev/mapper/blah--vg-home /home vfat rw,relatime 0 0
 
2331
# comment
 
2332
 
 
2333
iminvalid
 
2334
""")])
 
2335
        self.assertEqual(
 
2336
            list(osutils.read_mtab('mtab')),
 
2337
            [(b'/', 'ext4'),
 
2338
             (b'/home', 'vfat')])
 
2339
 
 
2340
 
 
2341
class GetFsTypeTests(tests.TestCaseInTempDir):
 
2342
 
 
2343
    def test_returns_string_or_none(self):
 
2344
        ret = osutils.get_fs_type(self.test_dir)
 
2345
        self.assertTrue(isinstance(ret, str) or ret is None)
 
2346
 
 
2347
    def test_returns_most_specific(self):
 
2348
        self.overrideAttr(
 
2349
            osutils, '_FILESYSTEM_FINDER',
 
2350
            osutils.FilesystemFinder(
 
2351
                [(b'/', 'ext4'), (b'/home', 'vfat'),
 
2352
                 (b'/home/jelmer', 'ext2')]))
 
2353
        self.assertEqual(osutils.get_fs_type(b'/home/jelmer/blah'), 'ext2')
 
2354
        self.assertEqual(osutils.get_fs_type('/home/jelmer/blah'), 'ext2')
 
2355
        self.assertEqual(osutils.get_fs_type(b'/home/jelmer'), 'ext2')
 
2356
        self.assertEqual(osutils.get_fs_type(b'/home/martin'), 'vfat')
 
2357
        self.assertEqual(osutils.get_fs_type(b'/home'), 'vfat')
 
2358
        self.assertEqual(osutils.get_fs_type(b'/other'), 'ext4')
 
2359
 
 
2360
    def test_returns_none(self):
 
2361
        self.overrideAttr(
 
2362
            osutils, '_FILESYSTEM_FINDER',
 
2363
            osutils.FilesystemFinder([]))
 
2364
        self.assertIs(osutils.get_fs_type('/home/jelmer/blah'), None)
 
2365
        self.assertIs(osutils.get_fs_type(b'/home/jelmer/blah'), None)
 
2366
        self.assertIs(osutils.get_fs_type('/home/jelmer'), None)