/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_osutils.py

  • Committer: Jelmer Vernooij
  • Date: 2018-05-19 13:16:11 UTC
  • mto: (6968.4.3 git-archive)
  • mto: This revision was merged to the branch mainline in revision 6972.
  • Revision ID: jelmer@jelmer.uk-20180519131611-l9h9ud41j7qg1m03
Move tar/zip to breezy.archive.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2016 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the osutils wrapper."""
 
18
 
 
19
from __future__ import absolute_import, division
 
20
 
 
21
import errno
 
22
import os
 
23
import re
 
24
import select
 
25
import socket
 
26
import sys
 
27
import tempfile
 
28
import time
 
29
 
 
30
from .. import (
 
31
    errors,
 
32
    lazy_regex,
 
33
    osutils,
 
34
    symbol_versioning,
 
35
    tests,
 
36
    trace,
 
37
    win32utils,
 
38
    )
 
39
from ..sixish import (
 
40
    BytesIO,
 
41
    )
 
42
from . import (
 
43
    features,
 
44
    file_utils,
 
45
    test__walkdirs_win32,
 
46
    )
 
47
from .scenarios import load_tests_apply_scenarios
 
48
 
 
49
 
 
50
class _UTF8DirReaderFeature(features.ModuleAvailableFeature):
 
51
 
 
52
    def _probe(self):
 
53
        try:
 
54
            from .. import _readdir_pyx
 
55
            self._module = _readdir_pyx
 
56
            self.reader = _readdir_pyx.UTF8DirReader
 
57
            return True
 
58
        except ImportError:
 
59
            return False
 
60
 
 
61
UTF8DirReaderFeature = _UTF8DirReaderFeature('breezy._readdir_pyx')
 
62
 
 
63
term_ios_feature = features.ModuleAvailableFeature('termios')
 
64
 
 
65
 
 
66
def _already_unicode(s):
 
67
    return s
 
68
 
 
69
 
 
70
def _utf8_to_unicode(s):
 
71
    return s.decode('UTF-8')
 
72
 
 
73
 
 
74
def dir_reader_scenarios():
 
75
    # For each dir reader we define:
 
76
 
 
77
    # - native_to_unicode: a function converting the native_abspath as returned
 
78
    #   by DirReader.read_dir to its unicode representation
 
79
 
 
80
    # UnicodeDirReader is the fallback, it should be tested on all platforms.
 
81
    scenarios = [('unicode',
 
82
                  dict(_dir_reader_class=osutils.UnicodeDirReader,
 
83
                       _native_to_unicode=_already_unicode))]
 
84
    # Some DirReaders are platform specific and even there they may not be
 
85
    # available.
 
86
    if UTF8DirReaderFeature.available():
 
87
        from .. import _readdir_pyx
 
88
        scenarios.append(('utf8',
 
89
                          dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
 
90
                               _native_to_unicode=_utf8_to_unicode)))
 
91
 
 
92
    if test__walkdirs_win32.win32_readdir_feature.available():
 
93
        try:
 
94
            from .. import _walkdirs_win32
 
95
            scenarios.append(
 
96
                ('win32',
 
97
                 dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
 
98
                      _native_to_unicode=_already_unicode)))
 
99
        except ImportError:
 
100
            pass
 
101
    return scenarios
 
102
 
 
103
 
 
104
load_tests = load_tests_apply_scenarios
 
105
 
 
106
 
 
107
class TestContainsWhitespace(tests.TestCase):
 
108
 
 
109
    def test_contains_whitespace(self):
 
110
        self.assertTrue(osutils.contains_whitespace(u' '))
 
111
        self.assertTrue(osutils.contains_whitespace(u'hello there'))
 
112
        self.assertTrue(osutils.contains_whitespace(u'hellothere\n'))
 
113
        self.assertTrue(osutils.contains_whitespace(u'hello\nthere'))
 
114
        self.assertTrue(osutils.contains_whitespace(u'hello\rthere'))
 
115
        self.assertTrue(osutils.contains_whitespace(u'hello\tthere'))
 
116
 
 
117
        # \xa0 is "Non-breaking-space" which on some python locales thinks it
 
118
        # is whitespace, but we do not.
 
119
        self.assertFalse(osutils.contains_whitespace(u''))
 
120
        self.assertFalse(osutils.contains_whitespace(u'hellothere'))
 
121
        self.assertFalse(osutils.contains_whitespace(u'hello\xa0there'))
 
122
 
 
123
 
 
124
class TestRename(tests.TestCaseInTempDir):
 
125
 
 
126
    def create_file(self, filename, content):
 
127
        f = open(filename, 'wb')
 
128
        try:
 
129
            f.write(content)
 
130
        finally:
 
131
            f.close()
 
132
 
 
133
    def _fancy_rename(self, a, b):
 
134
        osutils.fancy_rename(a, b, rename_func=os.rename,
 
135
                             unlink_func=os.unlink)
 
136
 
 
137
    def test_fancy_rename(self):
 
138
        # This should work everywhere
 
139
        self.create_file('a', 'something in a\n')
 
140
        self._fancy_rename('a', 'b')
 
141
        self.assertPathDoesNotExist('a')
 
142
        self.assertPathExists('b')
 
143
        self.check_file_contents('b', 'something in a\n')
 
144
 
 
145
        self.create_file('a', 'new something in a\n')
 
146
        self._fancy_rename('b', 'a')
 
147
 
 
148
        self.check_file_contents('a', 'something in a\n')
 
149
 
 
150
    def test_fancy_rename_fails_source_missing(self):
 
151
        # An exception should be raised, and the target should be left in place
 
152
        self.create_file('target', 'data in target\n')
 
153
        self.assertRaises((IOError, OSError), self._fancy_rename,
 
154
                          'missingsource', 'target')
 
155
        self.assertPathExists('target')
 
156
        self.check_file_contents('target', 'data in target\n')
 
157
 
 
158
    def test_fancy_rename_fails_if_source_and_target_missing(self):
 
159
        self.assertRaises((IOError, OSError), self._fancy_rename,
 
160
                          'missingsource', 'missingtarget')
 
161
 
 
162
    def test_rename(self):
 
163
        # Rename should be semi-atomic on all platforms
 
164
        self.create_file('a', 'something in a\n')
 
165
        osutils.rename('a', 'b')
 
166
        self.assertPathDoesNotExist('a')
 
167
        self.assertPathExists('b')
 
168
        self.check_file_contents('b', 'something in a\n')
 
169
 
 
170
        self.create_file('a', 'new something in a\n')
 
171
        osutils.rename('b', 'a')
 
172
 
 
173
        self.check_file_contents('a', 'something in a\n')
 
174
 
 
175
    # TODO: test fancy_rename using a MemoryTransport
 
176
 
 
177
    def test_rename_change_case(self):
 
178
        # on Windows we should be able to change filename case by rename
 
179
        self.build_tree(['a', 'b/'])
 
180
        osutils.rename('a', 'A')
 
181
        osutils.rename('b', 'B')
 
182
        # we can't use failUnlessExists on case-insensitive filesystem
 
183
        # so try to check shape of the tree
 
184
        shape = sorted(os.listdir('.'))
 
185
        self.assertEqual(['A', 'B'], shape)
 
186
 
 
187
    def test_rename_exception(self):
 
188
        try:
 
189
            osutils.rename('nonexistent_path', 'different_nonexistent_path')
 
190
        except OSError as e:
 
191
            self.assertEqual(e.old_filename, 'nonexistent_path')
 
192
            self.assertEqual(e.new_filename, 'different_nonexistent_path')
 
193
            self.assertTrue('nonexistent_path' in e.strerror)
 
194
            self.assertTrue('different_nonexistent_path' in e.strerror)
 
195
 
 
196
 
 
197
class TestRandChars(tests.TestCase):
 
198
 
 
199
    def test_01_rand_chars_empty(self):
 
200
        result = osutils.rand_chars(0)
 
201
        self.assertEqual(result, '')
 
202
 
 
203
    def test_02_rand_chars_100(self):
 
204
        result = osutils.rand_chars(100)
 
205
        self.assertEqual(len(result), 100)
 
206
        self.assertEqual(type(result), str)
 
207
        self.assertContainsRe(result, r'^[a-z0-9]{100}$')
 
208
 
 
209
 
 
210
class TestIsInside(tests.TestCase):
 
211
 
 
212
    def test_is_inside(self):
 
213
        is_inside = osutils.is_inside
 
214
        self.assertTrue(is_inside('src', 'src/foo.c'))
 
215
        self.assertFalse(is_inside('src', 'srccontrol'))
 
216
        self.assertTrue(is_inside('src', 'src/a/a/a/foo.c'))
 
217
        self.assertTrue(is_inside('foo.c', 'foo.c'))
 
218
        self.assertFalse(is_inside('foo.c', ''))
 
219
        self.assertTrue(is_inside('', 'foo.c'))
 
220
 
 
221
    def test_is_inside_any(self):
 
222
        SRC_FOO_C = osutils.pathjoin('src', 'foo.c')
 
223
        for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
 
224
                         (['src'], SRC_FOO_C),
 
225
                         (['src'], 'src'),
 
226
                         ]:
 
227
            self.assertTrue(osutils.is_inside_any(dirs, fn))
 
228
        for dirs, fn in [(['src'], 'srccontrol'),
 
229
                         (['src'], 'srccontrol/foo')]:
 
230
            self.assertFalse(osutils.is_inside_any(dirs, fn))
 
231
 
 
232
    def test_is_inside_or_parent_of_any(self):
 
233
        for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
 
234
                         (['src'], 'src/foo.c'),
 
235
                         (['src/bar.c'], 'src'),
 
236
                         (['src/bar.c', 'bla/foo.c'], 'src'),
 
237
                         (['src'], 'src'),
 
238
                         ]:
 
239
            self.assertTrue(osutils.is_inside_or_parent_of_any(dirs, fn))
 
240
 
 
241
        for dirs, fn in [(['src'], 'srccontrol'),
 
242
                         (['srccontrol/foo.c'], 'src'),
 
243
                         (['src'], 'srccontrol/foo')]:
 
244
            self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
 
245
 
 
246
 
 
247
class TestLstat(tests.TestCaseInTempDir):
 
248
 
 
249
    def test_lstat_matches_fstat(self):
 
250
        # On Windows, lstat and fstat don't always agree, primarily in the
 
251
        # 'st_ino' and 'st_dev' fields. So we force them to be '0' in our
 
252
        # custom implementation.
 
253
        if sys.platform == 'win32':
 
254
            # We only have special lstat/fstat if we have the extension.
 
255
            # Without it, we may end up re-reading content when we don't have
 
256
            # to, but otherwise it doesn't effect correctness.
 
257
            self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
258
        f = open('test-file.txt', 'wb')
 
259
        self.addCleanup(f.close)
 
260
        f.write('some content\n')
 
261
        f.flush()
 
262
        self.assertEqualStat(osutils.fstat(f.fileno()),
 
263
                             osutils.lstat('test-file.txt'))
 
264
 
 
265
 
 
266
class TestRmTree(tests.TestCaseInTempDir):
 
267
 
 
268
    def test_rmtree(self):
 
269
        # Check to remove tree with read-only files/dirs
 
270
        os.mkdir('dir')
 
271
        f = file('dir/file', 'w')
 
272
        f.write('spam')
 
273
        f.close()
 
274
        # would like to also try making the directory readonly, but at the
 
275
        # moment python shutil.rmtree doesn't handle that properly - it would
 
276
        # need to chmod the directory before removing things inside it - deferred
 
277
        # for now -- mbp 20060505
 
278
        # osutils.make_readonly('dir')
 
279
        osutils.make_readonly('dir/file')
 
280
 
 
281
        osutils.rmtree('dir')
 
282
 
 
283
        self.assertPathDoesNotExist('dir/file')
 
284
        self.assertPathDoesNotExist('dir')
 
285
 
 
286
 
 
287
class TestDeleteAny(tests.TestCaseInTempDir):
 
288
 
 
289
    def test_delete_any_readonly(self):
 
290
        # from <https://bugs.launchpad.net/bzr/+bug/218206>
 
291
        self.build_tree(['d/', 'f'])
 
292
        osutils.make_readonly('d')
 
293
        osutils.make_readonly('f')
 
294
 
 
295
        osutils.delete_any('f')
 
296
        osutils.delete_any('d')
 
297
 
 
298
 
 
299
class TestKind(tests.TestCaseInTempDir):
 
300
 
 
301
    def test_file_kind(self):
 
302
        self.build_tree(['file', 'dir/'])
 
303
        self.assertEqual('file', osutils.file_kind('file'))
 
304
        self.assertEqual('directory', osutils.file_kind('dir/'))
 
305
        if osutils.has_symlinks():
 
306
            os.symlink('symlink', 'symlink')
 
307
            self.assertEqual('symlink', osutils.file_kind('symlink'))
 
308
 
 
309
        # TODO: jam 20060529 Test a block device
 
310
        try:
 
311
            os.lstat('/dev/null')
 
312
        except OSError as e:
 
313
            if e.errno not in (errno.ENOENT,):
 
314
                raise
 
315
        else:
 
316
            self.assertEqual('chardev', osutils.file_kind('/dev/null'))
 
317
 
 
318
        mkfifo = getattr(os, 'mkfifo', None)
 
319
        if mkfifo:
 
320
            mkfifo('fifo')
 
321
            try:
 
322
                self.assertEqual('fifo', osutils.file_kind('fifo'))
 
323
            finally:
 
324
                os.remove('fifo')
 
325
 
 
326
        AF_UNIX = getattr(socket, 'AF_UNIX', None)
 
327
        if AF_UNIX:
 
328
            s = socket.socket(AF_UNIX)
 
329
            s.bind('socket')
 
330
            try:
 
331
                self.assertEqual('socket', osutils.file_kind('socket'))
 
332
            finally:
 
333
                os.remove('socket')
 
334
 
 
335
    def test_kind_marker(self):
 
336
        self.assertEqual("", osutils.kind_marker("file"))
 
337
        self.assertEqual("/", osutils.kind_marker('directory'))
 
338
        self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
 
339
        self.assertEqual("@", osutils.kind_marker("symlink"))
 
340
        self.assertEqual("+", osutils.kind_marker("tree-reference"))
 
341
        self.assertEqual("", osutils.kind_marker("fifo"))
 
342
        self.assertEqual("", osutils.kind_marker("socket"))
 
343
        self.assertEqual("", osutils.kind_marker("unknown"))
 
344
 
 
345
 
 
346
class TestUmask(tests.TestCaseInTempDir):
 
347
 
 
348
    def test_get_umask(self):
 
349
        if sys.platform == 'win32':
 
350
            # umask always returns '0', no way to set it
 
351
            self.assertEqual(0, osutils.get_umask())
 
352
            return
 
353
 
 
354
        orig_umask = osutils.get_umask()
 
355
        self.addCleanup(os.umask, orig_umask)
 
356
        os.umask(0o222)
 
357
        self.assertEqual(0o222, osutils.get_umask())
 
358
        os.umask(0o022)
 
359
        self.assertEqual(0o022, osutils.get_umask())
 
360
        os.umask(0o002)
 
361
        self.assertEqual(0o002, osutils.get_umask())
 
362
        os.umask(0o027)
 
363
        self.assertEqual(0o027, osutils.get_umask())
 
364
 
 
365
 
 
366
class TestDateTime(tests.TestCase):
 
367
 
 
368
    def assertFormatedDelta(self, expected, seconds):
 
369
        """Assert osutils.format_delta formats as expected"""
 
370
        actual = osutils.format_delta(seconds)
 
371
        self.assertEqual(expected, actual)
 
372
 
 
373
    def test_format_delta(self):
 
374
        self.assertFormatedDelta('0 seconds ago', 0)
 
375
        self.assertFormatedDelta('1 second ago', 1)
 
376
        self.assertFormatedDelta('10 seconds ago', 10)
 
377
        self.assertFormatedDelta('59 seconds ago', 59)
 
378
        self.assertFormatedDelta('89 seconds ago', 89)
 
379
        self.assertFormatedDelta('1 minute, 30 seconds ago', 90)
 
380
        self.assertFormatedDelta('3 minutes, 0 seconds ago', 180)
 
381
        self.assertFormatedDelta('3 minutes, 1 second ago', 181)
 
382
        self.assertFormatedDelta('10 minutes, 15 seconds ago', 615)
 
383
        self.assertFormatedDelta('30 minutes, 59 seconds ago', 1859)
 
384
        self.assertFormatedDelta('31 minutes, 0 seconds ago', 1860)
 
385
        self.assertFormatedDelta('60 minutes, 0 seconds ago', 3600)
 
386
        self.assertFormatedDelta('89 minutes, 59 seconds ago', 5399)
 
387
        self.assertFormatedDelta('1 hour, 30 minutes ago', 5400)
 
388
        self.assertFormatedDelta('2 hours, 30 minutes ago', 9017)
 
389
        self.assertFormatedDelta('10 hours, 0 minutes ago', 36000)
 
390
        self.assertFormatedDelta('24 hours, 0 minutes ago', 86400)
 
391
        self.assertFormatedDelta('35 hours, 59 minutes ago', 129599)
 
392
        self.assertFormatedDelta('36 hours, 0 minutes ago', 129600)
 
393
        self.assertFormatedDelta('36 hours, 0 minutes ago', 129601)
 
394
        self.assertFormatedDelta('36 hours, 1 minute ago', 129660)
 
395
        self.assertFormatedDelta('36 hours, 1 minute ago', 129661)
 
396
        self.assertFormatedDelta('84 hours, 10 minutes ago', 303002)
 
397
 
 
398
        # We handle when time steps the wrong direction because computers
 
399
        # don't have synchronized clocks.
 
400
        self.assertFormatedDelta('84 hours, 10 minutes in the future', -303002)
 
401
        self.assertFormatedDelta('1 second in the future', -1)
 
402
        self.assertFormatedDelta('2 seconds in the future', -2)
 
403
 
 
404
    def test_format_date(self):
 
405
        self.assertRaises(osutils.UnsupportedTimezoneFormat,
 
406
            osutils.format_date, 0, timezone='foo')
 
407
        self.assertIsInstance(osutils.format_date(0), str)
 
408
        self.assertIsInstance(osutils.format_local_date(0), unicode)
 
409
        # Testing for the actual value of the local weekday without
 
410
        # duplicating the code from format_date is difficult.
 
411
        # Instead blackbox.test_locale should check for localized
 
412
        # dates once they do occur in output strings.
 
413
 
 
414
    def test_format_date_with_offset_in_original_timezone(self):
 
415
        self.assertEqual("Thu 1970-01-01 00:00:00 +0000",
 
416
            osutils.format_date_with_offset_in_original_timezone(0))
 
417
        self.assertEqual("Fri 1970-01-02 03:46:40 +0000",
 
418
            osutils.format_date_with_offset_in_original_timezone(100000))
 
419
        self.assertEqual("Fri 1970-01-02 05:46:40 +0200",
 
420
            osutils.format_date_with_offset_in_original_timezone(100000, 7200))
 
421
 
 
422
    def test_local_time_offset(self):
 
423
        """Test that local_time_offset() returns a sane value."""
 
424
        offset = osutils.local_time_offset()
 
425
        self.assertTrue(isinstance(offset, int))
 
426
        # Test that the offset is no more than a eighteen hours in
 
427
        # either direction.
 
428
        # Time zone handling is system specific, so it is difficult to
 
429
        # do more specific tests, but a value outside of this range is
 
430
        # probably wrong.
 
431
        eighteen_hours = 18 * 3600
 
432
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
 
433
 
 
434
    def test_local_time_offset_with_timestamp(self):
 
435
        """Test that local_time_offset() works with a timestamp."""
 
436
        offset = osutils.local_time_offset(1000000000.1234567)
 
437
        self.assertTrue(isinstance(offset, int))
 
438
        eighteen_hours = 18 * 3600
 
439
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
 
440
 
 
441
 
 
442
class TestFdatasync(tests.TestCaseInTempDir):
 
443
 
 
444
    def do_fdatasync(self):
 
445
        f = tempfile.NamedTemporaryFile()
 
446
        osutils.fdatasync(f.fileno())
 
447
        f.close()
 
448
 
 
449
    @staticmethod
 
450
    def raise_eopnotsupp(*args, **kwargs):
 
451
        raise IOError(errno.EOPNOTSUPP, os.strerror(errno.EOPNOTSUPP))
 
452
 
 
453
    @staticmethod
 
454
    def raise_enotsup(*args, **kwargs):
 
455
        raise IOError(errno.ENOTSUP, os.strerror(errno.ENOTSUP))
 
456
 
 
457
    def test_fdatasync_handles_system_function(self):
 
458
        self.overrideAttr(os, "fdatasync")
 
459
        self.do_fdatasync()
 
460
 
 
461
    def test_fdatasync_handles_no_fdatasync_no_fsync(self):
 
462
        self.overrideAttr(os, "fdatasync")
 
463
        self.overrideAttr(os, "fsync")
 
464
        self.do_fdatasync()
 
465
 
 
466
    def test_fdatasync_handles_no_EOPNOTSUPP(self):
 
467
        self.overrideAttr(errno, "EOPNOTSUPP")
 
468
        self.do_fdatasync()
 
469
 
 
470
    def test_fdatasync_catches_ENOTSUP(self):
 
471
        enotsup = getattr(errno, "ENOTSUP", None)
 
472
        if enotsup is None:
 
473
            raise tests.TestNotApplicable("No ENOTSUP on this platform")
 
474
        self.overrideAttr(os, "fdatasync", self.raise_enotsup)
 
475
        self.do_fdatasync()
 
476
 
 
477
    def test_fdatasync_catches_EOPNOTSUPP(self):
 
478
        enotsup = getattr(errno, "EOPNOTSUPP", None)
 
479
        if enotsup is None:
 
480
            raise tests.TestNotApplicable("No EOPNOTSUPP on this platform")
 
481
        self.overrideAttr(os, "fdatasync", self.raise_eopnotsupp)
 
482
        self.do_fdatasync()
 
483
 
 
484
 
 
485
class TestLinks(tests.TestCaseInTempDir):
 
486
 
 
487
    def test_dereference_path(self):
 
488
        self.requireFeature(features.SymlinkFeature)
 
489
        cwd = osutils.realpath('.')
 
490
        os.mkdir('bar')
 
491
        bar_path = osutils.pathjoin(cwd, 'bar')
 
492
        # Using './' to avoid bug #1213894 (first path component not
 
493
        # dereferenced) in Python 2.4.1 and earlier
 
494
        self.assertEqual(bar_path, osutils.realpath('./bar'))
 
495
        os.symlink('bar', 'foo')
 
496
        self.assertEqual(bar_path, osutils.realpath('./foo'))
 
497
 
 
498
        # Does not dereference terminal symlinks
 
499
        foo_path = osutils.pathjoin(cwd, 'foo')
 
500
        self.assertEqual(foo_path, osutils.dereference_path('./foo'))
 
501
 
 
502
        # Dereferences parent symlinks
 
503
        os.mkdir('bar/baz')
 
504
        baz_path = osutils.pathjoin(bar_path, 'baz')
 
505
        self.assertEqual(baz_path, osutils.dereference_path('./foo/baz'))
 
506
 
 
507
        # Dereferences parent symlinks that are the first path element
 
508
        self.assertEqual(baz_path, osutils.dereference_path('foo/baz'))
 
509
 
 
510
        # Dereferences parent symlinks in absolute paths
 
511
        foo_baz_path = osutils.pathjoin(foo_path, 'baz')
 
512
        self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
 
513
 
 
514
    def test_changing_access(self):
 
515
        f = file('file', 'w')
 
516
        f.write('monkey')
 
517
        f.close()
 
518
 
 
519
        # Make a file readonly
 
520
        osutils.make_readonly('file')
 
521
        mode = os.lstat('file').st_mode
 
522
        self.assertEqual(mode, mode & 0o777555)
 
523
 
 
524
        # Make a file writable
 
525
        osutils.make_writable('file')
 
526
        mode = os.lstat('file').st_mode
 
527
        self.assertEqual(mode, mode | 0o200)
 
528
 
 
529
        if osutils.has_symlinks():
 
530
            # should not error when handed a symlink
 
531
            os.symlink('nonexistent', 'dangling')
 
532
            osutils.make_readonly('dangling')
 
533
            osutils.make_writable('dangling')
 
534
 
 
535
    def test_host_os_dereferences_symlinks(self):
 
536
        osutils.host_os_dereferences_symlinks()
 
537
 
 
538
 
 
539
class TestCanonicalRelPath(tests.TestCaseInTempDir):
 
540
 
 
541
    _test_needs_features = [features.CaseInsCasePresFilenameFeature]
 
542
 
 
543
    def test_canonical_relpath_simple(self):
 
544
        f = file('MixedCaseName', 'w')
 
545
        f.close()
 
546
        actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
 
547
        self.assertEqual('work/MixedCaseName', actual)
 
548
 
 
549
    def test_canonical_relpath_missing_tail(self):
 
550
        os.mkdir('MixedCaseParent')
 
551
        actual = osutils.canonical_relpath(self.test_base_dir,
 
552
                                           'mixedcaseparent/nochild')
 
553
        self.assertEqual('work/MixedCaseParent/nochild', actual)
 
554
 
 
555
 
 
556
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
 
557
 
 
558
    def assertRelpath(self, expected, base, path):
 
559
        actual = osutils._cicp_canonical_relpath(base, path)
 
560
        self.assertEqual(expected, actual)
 
561
 
 
562
    def test_simple(self):
 
563
        self.build_tree(['MixedCaseName'])
 
564
        base = osutils.realpath(self.get_transport('.').local_abspath('.'))
 
565
        self.assertRelpath('MixedCaseName', base, 'mixedcAsename')
 
566
 
 
567
    def test_subdir_missing_tail(self):
 
568
        self.build_tree(['MixedCaseParent/', 'MixedCaseParent/a_child'])
 
569
        base = osutils.realpath(self.get_transport('.').local_abspath('.'))
 
570
        self.assertRelpath('MixedCaseParent/a_child', base,
 
571
                           'MixedCaseParent/a_child')
 
572
        self.assertRelpath('MixedCaseParent/a_child', base,
 
573
                           'MixedCaseParent/A_Child')
 
574
        self.assertRelpath('MixedCaseParent/not_child', base,
 
575
                           'MixedCaseParent/not_child')
 
576
 
 
577
    def test_at_root_slash(self):
 
578
        # We can't test this on Windows, because it has a 'MIN_ABS_PATHLENGTH'
 
579
        # check...
 
580
        if osutils.MIN_ABS_PATHLENGTH > 1:
 
581
            raise tests.TestSkipped('relpath requires %d chars'
 
582
                                    % osutils.MIN_ABS_PATHLENGTH)
 
583
        self.assertRelpath('foo', '/', '/foo')
 
584
 
 
585
    def test_at_root_drive(self):
 
586
        if sys.platform != 'win32':
 
587
            raise tests.TestNotApplicable('we can only test drive-letter relative'
 
588
                                          ' paths on Windows where we have drive'
 
589
                                          ' letters.')
 
590
        # see bug #322807
 
591
        # The specific issue is that when at the root of a drive, 'abspath'
 
592
        # returns "C:/" or just "/". However, the code assumes that abspath
 
593
        # always returns something like "C:/foo" or "/foo" (no trailing slash).
 
594
        self.assertRelpath('foo', 'C:/', 'C:/foo')
 
595
        self.assertRelpath('foo', 'X:/', 'X:/foo')
 
596
        self.assertRelpath('foo', 'X:/', 'X://foo')
 
597
 
 
598
 
 
599
class TestPumpFile(tests.TestCase):
 
600
    """Test pumpfile method."""
 
601
 
 
602
    def setUp(self):
 
603
        super(TestPumpFile, self).setUp()
 
604
        # create a test datablock
 
605
        self.block_size = 512
 
606
        pattern = b'0123456789ABCDEF'
 
607
        self.test_data = pattern * (3 * self.block_size // len(pattern))
 
608
        self.test_data_len = len(self.test_data)
 
609
 
 
610
    def test_bracket_block_size(self):
 
611
        """Read data in blocks with the requested read size bracketing the
 
612
        block size."""
 
613
        # make sure test data is larger than max read size
 
614
        self.assertTrue(self.test_data_len > self.block_size)
 
615
 
 
616
        from_file = file_utils.FakeReadFile(self.test_data)
 
617
        to_file = BytesIO()
 
618
 
 
619
        # read (max // 2) bytes and verify read size wasn't affected
 
620
        num_bytes_to_read = self.block_size // 2
 
621
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
622
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
 
623
        self.assertEqual(from_file.get_read_count(), 1)
 
624
 
 
625
        # read (max) bytes and verify read size wasn't affected
 
626
        num_bytes_to_read = self.block_size
 
627
        from_file.reset_read_count()
 
628
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
629
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
 
630
        self.assertEqual(from_file.get_read_count(), 1)
 
631
 
 
632
        # read (max + 1) bytes and verify read size was limited
 
633
        num_bytes_to_read = self.block_size + 1
 
634
        from_file.reset_read_count()
 
635
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
636
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
 
637
        self.assertEqual(from_file.get_read_count(), 2)
 
638
 
 
639
        # finish reading the rest of the data
 
640
        num_bytes_to_read = self.test_data_len - to_file.tell()
 
641
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
642
 
 
643
        # report error if the data wasn't equal (we only report the size due
 
644
        # to the length of the data)
 
645
        response_data = to_file.getvalue()
 
646
        if response_data != self.test_data:
 
647
            message = "Data not equal.  Expected %d bytes, received %d."
 
648
            self.fail(message % (len(response_data), self.test_data_len))
 
649
 
 
650
    def test_specified_size(self):
 
651
        """Request a transfer larger than the maximum block size and verify
 
652
        that the maximum read doesn't exceed the block_size."""
 
653
        # make sure test data is larger than max read size
 
654
        self.assertTrue(self.test_data_len > self.block_size)
 
655
 
 
656
        # retrieve data in blocks
 
657
        from_file = file_utils.FakeReadFile(self.test_data)
 
658
        to_file = BytesIO()
 
659
        osutils.pumpfile(from_file, to_file, self.test_data_len,
 
660
                         self.block_size)
 
661
 
 
662
        # verify read size was equal to the maximum read size
 
663
        self.assertTrue(from_file.get_max_read_size() > 0)
 
664
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
 
665
        self.assertEqual(from_file.get_read_count(), 3)
 
666
 
 
667
        # report error if the data wasn't equal (we only report the size due
 
668
        # to the length of the data)
 
669
        response_data = to_file.getvalue()
 
670
        if response_data != self.test_data:
 
671
            message = "Data not equal.  Expected %d bytes, received %d."
 
672
            self.fail(message % (len(response_data), self.test_data_len))
 
673
 
 
674
    def test_to_eof(self):
 
675
        """Read to end-of-file and verify that the reads are not larger than
 
676
        the maximum read size."""
 
677
        # make sure test data is larger than max read size
 
678
        self.assertTrue(self.test_data_len > self.block_size)
 
679
 
 
680
        # retrieve data to EOF
 
681
        from_file = file_utils.FakeReadFile(self.test_data)
 
682
        to_file = BytesIO()
 
683
        osutils.pumpfile(from_file, to_file, -1, self.block_size)
 
684
 
 
685
        # verify read size was equal to the maximum read size
 
686
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
 
687
        self.assertEqual(from_file.get_read_count(), 4)
 
688
 
 
689
        # report error if the data wasn't equal (we only report the size due
 
690
        # to the length of the data)
 
691
        response_data = to_file.getvalue()
 
692
        if response_data != self.test_data:
 
693
            message = "Data not equal.  Expected %d bytes, received %d."
 
694
            self.fail(message % (len(response_data), self.test_data_len))
 
695
 
 
696
    def test_defaults(self):
 
697
        """Verifies that the default arguments will read to EOF -- this
 
698
        test verifies that any existing usages of pumpfile will not be broken
 
699
        with this new version."""
 
700
        # retrieve data using default (old) pumpfile method
 
701
        from_file = file_utils.FakeReadFile(self.test_data)
 
702
        to_file = BytesIO()
 
703
        osutils.pumpfile(from_file, to_file)
 
704
 
 
705
        # report error if the data wasn't equal (we only report the size due
 
706
        # to the length of the data)
 
707
        response_data = to_file.getvalue()
 
708
        if response_data != self.test_data:
 
709
            message = "Data not equal.  Expected %d bytes, received %d."
 
710
            self.fail(message % (len(response_data), self.test_data_len))
 
711
 
 
712
    def test_report_activity(self):
 
713
        activity = []
 
714
        def log_activity(length, direction):
 
715
            activity.append((length, direction))
 
716
        from_file = BytesIO(self.test_data)
 
717
        to_file = BytesIO()
 
718
        osutils.pumpfile(from_file, to_file, buff_size=500,
 
719
                         report_activity=log_activity, direction='read')
 
720
        self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
 
721
                          (36, 'read')], activity)
 
722
 
 
723
        from_file = BytesIO(self.test_data)
 
724
        to_file = BytesIO()
 
725
        del activity[:]
 
726
        osutils.pumpfile(from_file, to_file, buff_size=500,
 
727
                         report_activity=log_activity, direction='write')
 
728
        self.assertEqual([(500, 'write'), (500, 'write'), (500, 'write'),
 
729
                          (36, 'write')], activity)
 
730
 
 
731
        # And with a limited amount of data
 
732
        from_file = BytesIO(self.test_data)
 
733
        to_file = BytesIO()
 
734
        del activity[:]
 
735
        osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
 
736
                         report_activity=log_activity, direction='read')
 
737
        self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
 
738
 
 
739
 
 
740
 
 
741
class TestPumpStringFile(tests.TestCase):
 
742
 
 
743
    def test_empty(self):
 
744
        output = BytesIO()
 
745
        osutils.pump_string_file(b"", output)
 
746
        self.assertEqual(b"", output.getvalue())
 
747
 
 
748
    def test_more_than_segment_size(self):
 
749
        output = BytesIO()
 
750
        osutils.pump_string_file(b"123456789", output, 2)
 
751
        self.assertEqual(b"123456789", output.getvalue())
 
752
 
 
753
    def test_segment_size(self):
 
754
        output = BytesIO()
 
755
        osutils.pump_string_file(b"12", output, 2)
 
756
        self.assertEqual(b"12", output.getvalue())
 
757
 
 
758
    def test_segment_size_multiple(self):
 
759
        output = BytesIO()
 
760
        osutils.pump_string_file(b"1234", output, 2)
 
761
        self.assertEqual(b"1234", output.getvalue())
 
762
 
 
763
 
 
764
class TestRelpath(tests.TestCase):
 
765
 
 
766
    def test_simple_relpath(self):
 
767
        cwd = osutils.getcwd()
 
768
        subdir = cwd + '/subdir'
 
769
        self.assertEqual('subdir', osutils.relpath(cwd, subdir))
 
770
 
 
771
    def test_deep_relpath(self):
 
772
        cwd = osutils.getcwd()
 
773
        subdir = cwd + '/sub/subsubdir'
 
774
        self.assertEqual('sub/subsubdir', osutils.relpath(cwd, subdir))
 
775
 
 
776
    def test_not_relative(self):
 
777
        self.assertRaises(errors.PathNotChild,
 
778
                          osutils.relpath, 'C:/path', 'H:/path')
 
779
        self.assertRaises(errors.PathNotChild,
 
780
                          osutils.relpath, 'C:/', 'H:/path')
 
781
 
 
782
 
 
783
class TestSafeUnicode(tests.TestCase):
 
784
 
 
785
    def test_from_ascii_string(self):
 
786
        self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
 
787
 
 
788
    def test_from_unicode_string_ascii_contents(self):
 
789
        self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
 
790
 
 
791
    def test_from_unicode_string_unicode_contents(self):
 
792
        self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
 
793
 
 
794
    def test_from_utf8_string(self):
 
795
        self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
 
796
 
 
797
    def test_bad_utf8_string(self):
 
798
        self.assertRaises(errors.BzrBadParameterNotUnicode,
 
799
                          osutils.safe_unicode,
 
800
                          '\xbb\xbb')
 
801
 
 
802
 
 
803
class TestSafeUtf8(tests.TestCase):
 
804
 
 
805
    def test_from_ascii_string(self):
 
806
        f = 'foobar'
 
807
        self.assertEqual('foobar', osutils.safe_utf8(f))
 
808
 
 
809
    def test_from_unicode_string_ascii_contents(self):
 
810
        self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
 
811
 
 
812
    def test_from_unicode_string_unicode_contents(self):
 
813
        self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
 
814
 
 
815
    def test_from_utf8_string(self):
 
816
        self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
 
817
 
 
818
    def test_bad_utf8_string(self):
 
819
        self.assertRaises(errors.BzrBadParameterNotUnicode,
 
820
                          osutils.safe_utf8, '\xbb\xbb')
 
821
 
 
822
 
 
823
class TestSafeRevisionId(tests.TestCase):
 
824
 
 
825
    def test_from_ascii_string(self):
 
826
        # this shouldn't give a warning because it's getting an ascii string
 
827
        self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
 
828
 
 
829
    def test_from_unicode_string_ascii_contents(self):
 
830
        self.assertRaises(TypeError,
 
831
                          osutils.safe_revision_id, u'bargam')
 
832
 
 
833
    def test_from_unicode_string_unicode_contents(self):
 
834
        self.assertRaises(TypeError,
 
835
                         osutils.safe_revision_id, u'bargam\xae')
 
836
 
 
837
    def test_from_utf8_string(self):
 
838
        self.assertEqual('foo\xc2\xae',
 
839
                         osutils.safe_revision_id('foo\xc2\xae'))
 
840
 
 
841
    def test_none(self):
 
842
        """Currently, None is a valid revision_id"""
 
843
        self.assertEqual(None, osutils.safe_revision_id(None))
 
844
 
 
845
 
 
846
class TestSafeFileId(tests.TestCase):
 
847
 
 
848
    def test_from_ascii_string(self):
 
849
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))
 
850
 
 
851
    def test_from_unicode_string_ascii_contents(self):
 
852
        self.assertRaises(TypeError, osutils.safe_file_id, u'bargam')
 
853
 
 
854
    def test_from_unicode_string_unicode_contents(self):
 
855
        self.assertRaises(TypeError,
 
856
                          osutils.safe_file_id, u'bargam\xae')
 
857
 
 
858
    def test_from_utf8_string(self):
 
859
        self.assertEqual('foo\xc2\xae',
 
860
                         osutils.safe_file_id('foo\xc2\xae'))
 
861
 
 
862
    def test_none(self):
 
863
        """Currently, None is a valid revision_id"""
 
864
        self.assertEqual(None, osutils.safe_file_id(None))
 
865
 
 
866
 
 
867
class TestSendAll(tests.TestCase):
 
868
 
 
869
    def test_send_with_disconnected_socket(self):
 
870
        class DisconnectedSocket(object):
 
871
            def __init__(self, err):
 
872
                self.err = err
 
873
            def send(self, content):
 
874
                raise self.err
 
875
            def close(self):
 
876
                pass
 
877
        # All of these should be treated as ConnectionReset
 
878
        errs = []
 
879
        for err_cls in (IOError, socket.error):
 
880
            for errnum in osutils._end_of_stream_errors:
 
881
                errs.append(err_cls(errnum))
 
882
        for err in errs:
 
883
            sock = DisconnectedSocket(err)
 
884
            self.assertRaises(errors.ConnectionReset,
 
885
                osutils.send_all, sock, b'some more content')
 
886
 
 
887
    def test_send_with_no_progress(self):
 
888
        # See https://bugs.launchpad.net/bzr/+bug/1047309
 
889
        # It seems that paramiko can get into a state where it doesn't error,
 
890
        # but it returns 0 bytes sent for requests over and over again.
 
891
        class NoSendingSocket(object):
 
892
            def __init__(self):
 
893
                self.call_count = 0
 
894
            def send(self, bytes):
 
895
                self.call_count += 1
 
896
                if self.call_count > 100:
 
897
                    # Prevent the test suite from hanging
 
898
                    raise RuntimeError('too many calls')
 
899
                return 0
 
900
        sock = NoSendingSocket()
 
901
        self.assertRaises(errors.ConnectionReset,
 
902
                          osutils.send_all, sock, b'content')
 
903
        self.assertEqual(1, sock.call_count)
 
904
 
 
905
 
 
906
class TestPosixFuncs(tests.TestCase):
 
907
    """Test that the posix version of normpath returns an appropriate path
 
908
       when used with 2 leading slashes."""
 
909
 
 
910
    def test_normpath(self):
 
911
        self.assertEqual('/etc/shadow', osutils._posix_normpath('/etc/shadow'))
 
912
        self.assertEqual('/etc/shadow', osutils._posix_normpath('//etc/shadow'))
 
913
        self.assertEqual('/etc/shadow', osutils._posix_normpath('///etc/shadow'))
 
914
 
 
915
 
 
916
class TestWin32Funcs(tests.TestCase):
 
917
    """Test that _win32 versions of os utilities return appropriate paths."""
 
918
 
 
919
    def test_abspath(self):
 
920
        self.requireFeature(features.win32_feature)
 
921
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
 
922
        self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
 
923
        self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
 
924
        self.assertEqual('//HOST/path', osutils._win32_abspath('//HOST/path'))
 
925
 
 
926
    def test_realpath(self):
 
927
        self.assertEqual('C:/foo', osutils._win32_realpath('C:\\foo'))
 
928
        self.assertEqual('C:/foo', osutils._win32_realpath('C:/foo'))
 
929
 
 
930
    def test_pathjoin(self):
 
931
        self.assertEqual('path/to/foo',
 
932
                         osutils._win32_pathjoin('path', 'to', 'foo'))
 
933
        self.assertEqual('C:/foo',
 
934
                         osutils._win32_pathjoin('path\\to', 'C:\\foo'))
 
935
        self.assertEqual('C:/foo',
 
936
                         osutils._win32_pathjoin('path/to', 'C:/foo'))
 
937
        self.assertEqual('path/to/foo',
 
938
                         osutils._win32_pathjoin('path/to/', 'foo'))
 
939
 
 
940
    def test_pathjoin_late_bugfix(self):
 
941
        if sys.version_info < (2, 7, 6):
 
942
            expected = '/foo'
 
943
        else:
 
944
            expected = 'C:/foo'
 
945
        self.assertEqual(expected,
 
946
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
 
947
        self.assertEqual(expected,
 
948
                         osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
 
949
 
 
950
    def test_normpath(self):
 
951
        self.assertEqual('path/to/foo',
 
952
                         osutils._win32_normpath(r'path\\from\..\to\.\foo'))
 
953
        self.assertEqual('path/to/foo',
 
954
                         osutils._win32_normpath('path//from/../to/./foo'))
 
955
 
 
956
    def test_getcwd(self):
 
957
        cwd = osutils._win32_getcwd()
 
958
        os_cwd = osutils._getcwd()
 
959
        self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
 
960
        # win32 is inconsistent whether it returns lower or upper case
 
961
        # and even if it was consistent the user might type the other
 
962
        # so we force it to uppercase
 
963
        # running python.exe under cmd.exe return capital C:\\
 
964
        # running win32 python inside a cygwin shell returns lowercase
 
965
        self.assertEqual(os_cwd[0].upper(), cwd[0])
 
966
 
 
967
    def test_fixdrive(self):
 
968
        self.assertEqual('H:/foo', osutils._win32_fixdrive('h:/foo'))
 
969
        self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
 
970
        self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
 
971
 
 
972
 
 
973
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
 
974
    """Test win32 functions that create files."""
 
975
 
 
976
    def test_getcwd(self):
 
977
        self.requireFeature(features.UnicodeFilenameFeature)
 
978
        os.mkdir(u'mu-\xb5')
 
979
        os.chdir(u'mu-\xb5')
 
980
        # TODO: jam 20060427 This will probably fail on Mac OSX because
 
981
        #       it will change the normalization of B\xe5gfors
 
982
        #       Consider using a different unicode character, or make
 
983
        #       osutils.getcwd() renormalize the path.
 
984
        self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
 
985
 
 
986
    def test_minimum_path_selection(self):
 
987
        self.assertEqual(set(),
 
988
            osutils.minimum_path_selection([]))
 
989
        self.assertEqual({'a'},
 
990
            osutils.minimum_path_selection(['a']))
 
991
        self.assertEqual({'a', 'b'},
 
992
            osutils.minimum_path_selection(['a', 'b']))
 
993
        self.assertEqual({'a/', 'b'},
 
994
            osutils.minimum_path_selection(['a/', 'b']))
 
995
        self.assertEqual({'a/', 'b'},
 
996
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
 
997
        self.assertEqual({'a-b', 'a', 'a0b'},
 
998
            osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
 
999
 
 
1000
    def test_mkdtemp(self):
 
1001
        tmpdir = osutils._win32_mkdtemp(dir='.')
 
1002
        self.assertFalse('\\' in tmpdir)
 
1003
 
 
1004
    def test_rename(self):
 
1005
        a = open('a', 'wb')
 
1006
        a.write('foo\n')
 
1007
        a.close()
 
1008
        b = open('b', 'wb')
 
1009
        b.write('baz\n')
 
1010
        b.close()
 
1011
 
 
1012
        osutils._win32_rename('b', 'a')
 
1013
        self.assertPathExists('a')
 
1014
        self.assertPathDoesNotExist('b')
 
1015
        self.assertFileEqual('baz\n', 'a')
 
1016
 
 
1017
    def test_rename_missing_file(self):
 
1018
        a = open('a', 'wb')
 
1019
        a.write('foo\n')
 
1020
        a.close()
 
1021
 
 
1022
        try:
 
1023
            osutils._win32_rename('b', 'a')
 
1024
        except (IOError, OSError) as e:
 
1025
            self.assertEqual(errno.ENOENT, e.errno)
 
1026
        self.assertFileEqual('foo\n', 'a')
 
1027
 
 
1028
    def test_rename_missing_dir(self):
 
1029
        os.mkdir('a')
 
1030
        try:
 
1031
            osutils._win32_rename('b', 'a')
 
1032
        except (IOError, OSError) as e:
 
1033
            self.assertEqual(errno.ENOENT, e.errno)
 
1034
 
 
1035
    def test_rename_current_dir(self):
 
1036
        os.mkdir('a')
 
1037
        os.chdir('a')
 
1038
        # You can't rename the working directory
 
1039
        # doing rename non-existant . usually
 
1040
        # just raises ENOENT, since non-existant
 
1041
        # doesn't exist.
 
1042
        try:
 
1043
            osutils._win32_rename('b', '.')
 
1044
        except (IOError, OSError) as e:
 
1045
            self.assertEqual(errno.ENOENT, e.errno)
 
1046
 
 
1047
    def test_splitpath(self):
 
1048
        def check(expected, path):
 
1049
            self.assertEqual(expected, osutils.splitpath(path))
 
1050
 
 
1051
        check(['a'], 'a')
 
1052
        check(['a', 'b'], 'a/b')
 
1053
        check(['a', 'b'], 'a/./b')
 
1054
        check(['a', '.b'], 'a/.b')
 
1055
        check(['a', '.b'], 'a\\.b')
 
1056
 
 
1057
        self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
 
1058
 
 
1059
 
 
1060
class TestParentDirectories(tests.TestCaseInTempDir):
 
1061
    """Test osutils.parent_directories()"""
 
1062
 
 
1063
    def test_parent_directories(self):
 
1064
        self.assertEqual([], osutils.parent_directories('a'))
 
1065
        self.assertEqual(['a'], osutils.parent_directories('a/b'))
 
1066
        self.assertEqual(['a/b', 'a'], osutils.parent_directories('a/b/c'))
 
1067
 
 
1068
 
 
1069
class TestMacFuncsDirs(tests.TestCaseInTempDir):
 
1070
    """Test mac special functions that require directories."""
 
1071
 
 
1072
    def test_getcwd(self):
 
1073
        self.requireFeature(features.UnicodeFilenameFeature)
 
1074
        os.mkdir(u'B\xe5gfors')
 
1075
        os.chdir(u'B\xe5gfors')
 
1076
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
 
1077
 
 
1078
    def test_getcwd_nonnorm(self):
 
1079
        self.requireFeature(features.UnicodeFilenameFeature)
 
1080
        # Test that _mac_getcwd() will normalize this path
 
1081
        os.mkdir(u'Ba\u030agfors')
 
1082
        os.chdir(u'Ba\u030agfors')
 
1083
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
 
1084
 
 
1085
 
 
1086
class TestChunksToLines(tests.TestCase):
 
1087
 
 
1088
    def test_smoketest(self):
 
1089
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
 
1090
                         osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
 
1091
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
 
1092
                         osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
 
1093
 
 
1094
    def test_osutils_binding(self):
 
1095
        from . import test__chunks_to_lines
 
1096
        if test__chunks_to_lines.compiled_chunkstolines_feature.available():
 
1097
            from .._chunks_to_lines_pyx import chunks_to_lines
 
1098
        else:
 
1099
            from .._chunks_to_lines_py import chunks_to_lines
 
1100
        self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
 
1101
 
 
1102
 
 
1103
class TestSplitLines(tests.TestCase):
 
1104
 
 
1105
    def test_split_unicode(self):
 
1106
        self.assertEqual([u'foo\n', u'bar\xae'],
 
1107
                         osutils.split_lines(u'foo\nbar\xae'))
 
1108
        self.assertEqual([u'foo\n', u'bar\xae\n'],
 
1109
                         osutils.split_lines(u'foo\nbar\xae\n'))
 
1110
 
 
1111
    def test_split_with_carriage_returns(self):
 
1112
        self.assertEqual(['foo\rbar\n'],
 
1113
                         osutils.split_lines('foo\rbar\n'))
 
1114
 
 
1115
 
 
1116
class TestWalkDirs(tests.TestCaseInTempDir):
 
1117
 
 
1118
    def assertExpectedBlocks(self, expected, result):
 
1119
        self.assertEqual(expected,
 
1120
                         [(dirinfo, [line[0:3] for line in block])
 
1121
                          for dirinfo, block in result])
 
1122
 
 
1123
    def test_walkdirs(self):
 
1124
        tree = [
 
1125
            '.bzr',
 
1126
            '0file',
 
1127
            '1dir/',
 
1128
            '1dir/0file',
 
1129
            '1dir/1dir/',
 
1130
            '2file'
 
1131
            ]
 
1132
        self.build_tree(tree)
 
1133
        expected_dirblocks = [
 
1134
                (('', '.'),
 
1135
                 [('0file', '0file', 'file'),
 
1136
                  ('1dir', '1dir', 'directory'),
 
1137
                  ('2file', '2file', 'file'),
 
1138
                 ]
 
1139
                ),
 
1140
                (('1dir', './1dir'),
 
1141
                 [('1dir/0file', '0file', 'file'),
 
1142
                  ('1dir/1dir', '1dir', 'directory'),
 
1143
                 ]
 
1144
                ),
 
1145
                (('1dir/1dir', './1dir/1dir'),
 
1146
                 [
 
1147
                 ]
 
1148
                ),
 
1149
            ]
 
1150
        result = []
 
1151
        found_bzrdir = False
 
1152
        for dirdetail, dirblock in osutils.walkdirs('.'):
 
1153
            if len(dirblock) and dirblock[0][1] == '.bzr':
 
1154
                # this tests the filtering of selected paths
 
1155
                found_bzrdir = True
 
1156
                del dirblock[0]
 
1157
            result.append((dirdetail, dirblock))
 
1158
 
 
1159
        self.assertTrue(found_bzrdir)
 
1160
        self.assertExpectedBlocks(expected_dirblocks, result)
 
1161
        # you can search a subdir only, with a supplied prefix.
 
1162
        result = []
 
1163
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
 
1164
            result.append(dirblock)
 
1165
        self.assertExpectedBlocks(expected_dirblocks[1:], result)
 
1166
 
 
1167
    def test_walkdirs_os_error(self):
 
1168
        # <https://bugs.launchpad.net/bzr/+bug/338653>
 
1169
        # Pyrex readdir didn't raise useful messages if it had an error
 
1170
        # reading the directory
 
1171
        if sys.platform == 'win32':
 
1172
            raise tests.TestNotApplicable(
 
1173
                "readdir IOError not tested on win32")
 
1174
        self.requireFeature(features.not_running_as_root)
 
1175
        os.mkdir("test-unreadable")
 
1176
        os.chmod("test-unreadable", 0000)
 
1177
        # must chmod it back so that it can be removed
 
1178
        self.addCleanup(os.chmod, "test-unreadable", 0o700)
 
1179
        # The error is not raised until the generator is actually evaluated.
 
1180
        # (It would be ok if it happened earlier but at the moment it
 
1181
        # doesn't.)
 
1182
        e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
 
1183
        self.assertEqual('./test-unreadable', e.filename)
 
1184
        self.assertEqual(errno.EACCES, e.errno)
 
1185
        # Ensure the message contains the file name
 
1186
        self.assertContainsRe(str(e), "\\./test-unreadable")
 
1187
 
 
1188
 
 
1189
    def test_walkdirs_encoding_error(self):
 
1190
        # <https://bugs.launchpad.net/bzr/+bug/488519>
 
1191
        # walkdirs didn't raise a useful message when the filenames
 
1192
        # are not using the filesystem's encoding
 
1193
 
 
1194
        # require a bytestring based filesystem
 
1195
        self.requireFeature(features.ByteStringNamedFilesystem)
 
1196
 
 
1197
        tree = [
 
1198
            '.bzr',
 
1199
            '0file',
 
1200
            '1dir/',
 
1201
            '1dir/0file',
 
1202
            '1dir/1dir/',
 
1203
            '1file'
 
1204
            ]
 
1205
 
 
1206
        self.build_tree(tree)
 
1207
 
 
1208
        # rename the 1file to a latin-1 filename
 
1209
        os.rename("./1file", "\xe8file")
 
1210
        if "\xe8file" not in os.listdir("."):
 
1211
            self.skipTest("Lack filesystem that preserves arbitrary bytes")
 
1212
 
 
1213
        self._save_platform_info()
 
1214
        osutils._fs_enc = 'UTF-8'
 
1215
 
 
1216
        # this should raise on error
 
1217
        def attempt():
 
1218
            for dirdetail, dirblock in osutils.walkdirs('.'):
 
1219
                pass
 
1220
 
 
1221
        self.assertRaises(errors.BadFilenameEncoding, attempt)
 
1222
 
 
1223
    def test__walkdirs_utf8(self):
 
1224
        tree = [
 
1225
            '.bzr',
 
1226
            '0file',
 
1227
            '1dir/',
 
1228
            '1dir/0file',
 
1229
            '1dir/1dir/',
 
1230
            '2file'
 
1231
            ]
 
1232
        self.build_tree(tree)
 
1233
        expected_dirblocks = [
 
1234
                (('', '.'),
 
1235
                 [('0file', '0file', 'file'),
 
1236
                  ('1dir', '1dir', 'directory'),
 
1237
                  ('2file', '2file', 'file'),
 
1238
                 ]
 
1239
                ),
 
1240
                (('1dir', './1dir'),
 
1241
                 [('1dir/0file', '0file', 'file'),
 
1242
                  ('1dir/1dir', '1dir', 'directory'),
 
1243
                 ]
 
1244
                ),
 
1245
                (('1dir/1dir', './1dir/1dir'),
 
1246
                 [
 
1247
                 ]
 
1248
                ),
 
1249
            ]
 
1250
        result = []
 
1251
        found_bzrdir = False
 
1252
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
 
1253
            if len(dirblock) and dirblock[0][1] == '.bzr':
 
1254
                # this tests the filtering of selected paths
 
1255
                found_bzrdir = True
 
1256
                del dirblock[0]
 
1257
            result.append((dirdetail, dirblock))
 
1258
 
 
1259
        self.assertTrue(found_bzrdir)
 
1260
        self.assertExpectedBlocks(expected_dirblocks, result)
 
1261
 
 
1262
        # you can search a subdir only, with a supplied prefix.
 
1263
        result = []
 
1264
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
 
1265
            result.append(dirblock)
 
1266
        self.assertExpectedBlocks(expected_dirblocks[1:], result)
 
1267
 
 
1268
    def _filter_out_stat(self, result):
 
1269
        """Filter out the stat value from the walkdirs result"""
 
1270
        for dirdetail, dirblock in result:
 
1271
            new_dirblock = []
 
1272
            for info in dirblock:
 
1273
                # Ignore info[3] which is the stat
 
1274
                new_dirblock.append((info[0], info[1], info[2], info[4]))
 
1275
            dirblock[:] = new_dirblock
 
1276
 
 
1277
    def _save_platform_info(self):
 
1278
        self.overrideAttr(osutils, '_fs_enc')
 
1279
        self.overrideAttr(osutils, '_selected_dir_reader')
 
1280
 
 
1281
    def assertDirReaderIs(self, expected):
 
1282
        """Assert the right implementation for _walkdirs_utf8 is chosen."""
 
1283
        # Force it to redetect
 
1284
        osutils._selected_dir_reader = None
 
1285
        # Nothing to list, but should still trigger the selection logic
 
1286
        self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
 
1287
        self.assertIsInstance(osutils._selected_dir_reader, expected)
 
1288
 
 
1289
    def test_force_walkdirs_utf8_fs_utf8(self):
 
1290
        self.requireFeature(UTF8DirReaderFeature)
 
1291
        self._save_platform_info()
 
1292
        osutils._fs_enc = 'utf-8'
 
1293
        self.assertDirReaderIs(
 
1294
            UTF8DirReaderFeature.module.UTF8DirReader)
 
1295
 
 
1296
    def test_force_walkdirs_utf8_fs_ascii(self):
 
1297
        self.requireFeature(UTF8DirReaderFeature)
 
1298
        self._save_platform_info()
 
1299
        osutils._fs_enc = 'ascii'
 
1300
        self.assertDirReaderIs(
 
1301
            UTF8DirReaderFeature.module.UTF8DirReader)
 
1302
 
 
1303
    def test_force_walkdirs_utf8_fs_latin1(self):
 
1304
        self._save_platform_info()
 
1305
        osutils._fs_enc = 'iso-8859-1'
 
1306
        self.assertDirReaderIs(osutils.UnicodeDirReader)
 
1307
 
 
1308
    def test_force_walkdirs_utf8_nt(self):
 
1309
        # Disabled because the thunk of the whole walkdirs api is disabled.
 
1310
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
1311
        self._save_platform_info()
 
1312
        from .._walkdirs_win32 import Win32ReadDir
 
1313
        self.assertDirReaderIs(Win32ReadDir)
 
1314
 
 
1315
    def test_unicode_walkdirs(self):
 
1316
        """Walkdirs should always return unicode paths."""
 
1317
        self.requireFeature(features.UnicodeFilenameFeature)
 
1318
        name0 = u'0file-\xb6'
 
1319
        name1 = u'1dir-\u062c\u0648'
 
1320
        name2 = u'2file-\u0633'
 
1321
        tree = [
 
1322
            name0,
 
1323
            name1 + '/',
 
1324
            name1 + '/' + name0,
 
1325
            name1 + '/' + name1 + '/',
 
1326
            name2,
 
1327
            ]
 
1328
        self.build_tree(tree)
 
1329
        expected_dirblocks = [
 
1330
                ((u'', u'.'),
 
1331
                 [(name0, name0, 'file', './' + name0),
 
1332
                  (name1, name1, 'directory', './' + name1),
 
1333
                  (name2, name2, 'file', './' + name2),
 
1334
                 ]
 
1335
                ),
 
1336
                ((name1, './' + name1),
 
1337
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
1338
                                                        + '/' + name0),
 
1339
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
1340
                                                            + '/' + name1),
 
1341
                 ]
 
1342
                ),
 
1343
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1344
                 [
 
1345
                 ]
 
1346
                ),
 
1347
            ]
 
1348
        result = list(osutils.walkdirs('.'))
 
1349
        self._filter_out_stat(result)
 
1350
        self.assertEqual(expected_dirblocks, result)
 
1351
        result = list(osutils.walkdirs(u'./'+name1, name1))
 
1352
        self._filter_out_stat(result)
 
1353
        self.assertEqual(expected_dirblocks[1:], result)
 
1354
 
 
1355
    def test_unicode__walkdirs_utf8(self):
 
1356
        """Walkdirs_utf8 should always return utf8 paths.
 
1357
 
 
1358
        The abspath portion might be in unicode or utf-8
 
1359
        """
 
1360
        self.requireFeature(features.UnicodeFilenameFeature)
 
1361
        name0 = u'0file-\xb6'
 
1362
        name1 = u'1dir-\u062c\u0648'
 
1363
        name2 = u'2file-\u0633'
 
1364
        tree = [
 
1365
            name0,
 
1366
            name1 + '/',
 
1367
            name1 + '/' + name0,
 
1368
            name1 + '/' + name1 + '/',
 
1369
            name2,
 
1370
            ]
 
1371
        self.build_tree(tree)
 
1372
        name0 = name0.encode('utf8')
 
1373
        name1 = name1.encode('utf8')
 
1374
        name2 = name2.encode('utf8')
 
1375
 
 
1376
        expected_dirblocks = [
 
1377
                (('', '.'),
 
1378
                 [(name0, name0, 'file', './' + name0),
 
1379
                  (name1, name1, 'directory', './' + name1),
 
1380
                  (name2, name2, 'file', './' + name2),
 
1381
                 ]
 
1382
                ),
 
1383
                ((name1, './' + name1),
 
1384
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
1385
                                                        + '/' + name0),
 
1386
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
1387
                                                            + '/' + name1),
 
1388
                 ]
 
1389
                ),
 
1390
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1391
                 [
 
1392
                 ]
 
1393
                ),
 
1394
            ]
 
1395
        result = []
 
1396
        # For ease in testing, if walkdirs_utf8 returns Unicode, assert that
 
1397
        # all abspaths are Unicode, and encode them back into utf8.
 
1398
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
 
1399
            self.assertIsInstance(dirdetail[0], str)
 
1400
            if isinstance(dirdetail[1], unicode):
 
1401
                dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
 
1402
                dirblock = [list(info) for info in dirblock]
 
1403
                for info in dirblock:
 
1404
                    self.assertIsInstance(info[4], unicode)
 
1405
                    info[4] = info[4].encode('utf8')
 
1406
            new_dirblock = []
 
1407
            for info in dirblock:
 
1408
                self.assertIsInstance(info[0], str)
 
1409
                self.assertIsInstance(info[1], str)
 
1410
                self.assertIsInstance(info[4], str)
 
1411
                # Remove the stat information
 
1412
                new_dirblock.append((info[0], info[1], info[2], info[4]))
 
1413
            result.append((dirdetail, new_dirblock))
 
1414
        self.assertEqual(expected_dirblocks, result)
 
1415
 
 
1416
    def test__walkdirs_utf8_with_unicode_fs(self):
 
1417
        """UnicodeDirReader should be a safe fallback everywhere
 
1418
 
 
1419
        The abspath portion should be in unicode
 
1420
        """
 
1421
        self.requireFeature(features.UnicodeFilenameFeature)
 
1422
        # Use the unicode reader. TODO: split into driver-and-driven unit
 
1423
        # tests.
 
1424
        self._save_platform_info()
 
1425
        osutils._selected_dir_reader = osutils.UnicodeDirReader()
 
1426
        name0u = u'0file-\xb6'
 
1427
        name1u = u'1dir-\u062c\u0648'
 
1428
        name2u = u'2file-\u0633'
 
1429
        tree = [
 
1430
            name0u,
 
1431
            name1u + '/',
 
1432
            name1u + '/' + name0u,
 
1433
            name1u + '/' + name1u + '/',
 
1434
            name2u,
 
1435
            ]
 
1436
        self.build_tree(tree)
 
1437
        name0 = name0u.encode('utf8')
 
1438
        name1 = name1u.encode('utf8')
 
1439
        name2 = name2u.encode('utf8')
 
1440
 
 
1441
        # All of the abspaths should be in unicode, all of the relative paths
 
1442
        # should be in utf8
 
1443
        expected_dirblocks = [
 
1444
                (('', '.'),
 
1445
                 [(name0, name0, 'file', './' + name0u),
 
1446
                  (name1, name1, 'directory', './' + name1u),
 
1447
                  (name2, name2, 'file', './' + name2u),
 
1448
                 ]
 
1449
                ),
 
1450
                ((name1, './' + name1u),
 
1451
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1452
                                                        + '/' + name0u),
 
1453
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1454
                                                            + '/' + name1u),
 
1455
                 ]
 
1456
                ),
 
1457
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1458
                 [
 
1459
                 ]
 
1460
                ),
 
1461
            ]
 
1462
        result = list(osutils._walkdirs_utf8('.'))
 
1463
        self._filter_out_stat(result)
 
1464
        self.assertEqual(expected_dirblocks, result)
 
1465
 
 
1466
    def test__walkdirs_utf8_win32readdir(self):
 
1467
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
1468
        self.requireFeature(features.UnicodeFilenameFeature)
 
1469
        from .._walkdirs_win32 import Win32ReadDir
 
1470
        self._save_platform_info()
 
1471
        osutils._selected_dir_reader = Win32ReadDir()
 
1472
        name0u = u'0file-\xb6'
 
1473
        name1u = u'1dir-\u062c\u0648'
 
1474
        name2u = u'2file-\u0633'
 
1475
        tree = [
 
1476
            name0u,
 
1477
            name1u + '/',
 
1478
            name1u + '/' + name0u,
 
1479
            name1u + '/' + name1u + '/',
 
1480
            name2u,
 
1481
            ]
 
1482
        self.build_tree(tree)
 
1483
        name0 = name0u.encode('utf8')
 
1484
        name1 = name1u.encode('utf8')
 
1485
        name2 = name2u.encode('utf8')
 
1486
 
 
1487
        # All of the abspaths should be in unicode, all of the relative paths
 
1488
        # should be in utf8
 
1489
        expected_dirblocks = [
 
1490
                (('', '.'),
 
1491
                 [(name0, name0, 'file', './' + name0u),
 
1492
                  (name1, name1, 'directory', './' + name1u),
 
1493
                  (name2, name2, 'file', './' + name2u),
 
1494
                 ]
 
1495
                ),
 
1496
                ((name1, './' + name1u),
 
1497
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1498
                                                        + '/' + name0u),
 
1499
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1500
                                                            + '/' + name1u),
 
1501
                 ]
 
1502
                ),
 
1503
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1504
                 [
 
1505
                 ]
 
1506
                ),
 
1507
            ]
 
1508
        result = list(osutils._walkdirs_utf8(u'.'))
 
1509
        self._filter_out_stat(result)
 
1510
        self.assertEqual(expected_dirblocks, result)
 
1511
 
 
1512
    def assertStatIsCorrect(self, path, win32stat):
 
1513
        os_stat = os.stat(path)
 
1514
        self.assertEqual(os_stat.st_size, win32stat.st_size)
 
1515
        self.assertAlmostEqual(os_stat.st_mtime, win32stat.st_mtime, places=4)
 
1516
        self.assertAlmostEqual(os_stat.st_ctime, win32stat.st_ctime, places=4)
 
1517
        self.assertAlmostEqual(os_stat.st_atime, win32stat.st_atime, places=4)
 
1518
        self.assertEqual(os_stat.st_dev, win32stat.st_dev)
 
1519
        self.assertEqual(os_stat.st_ino, win32stat.st_ino)
 
1520
        self.assertEqual(os_stat.st_mode, win32stat.st_mode)
 
1521
 
 
1522
    def test__walkdirs_utf_win32_find_file_stat_file(self):
 
1523
        """make sure our Stat values are valid"""
 
1524
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
1525
        self.requireFeature(features.UnicodeFilenameFeature)
 
1526
        from .._walkdirs_win32 import Win32ReadDir
 
1527
        name0u = u'0file-\xb6'
 
1528
        name0 = name0u.encode('utf8')
 
1529
        self.build_tree([name0u])
 
1530
        # I hate to sleep() here, but I'm trying to make the ctime different
 
1531
        # from the mtime
 
1532
        time.sleep(2)
 
1533
        f = open(name0u, 'ab')
 
1534
        try:
 
1535
            f.write('just a small update')
 
1536
        finally:
 
1537
            f.close()
 
1538
 
 
1539
        result = Win32ReadDir().read_dir('', u'.')
 
1540
        entry = result[0]
 
1541
        self.assertEqual((name0, name0, 'file'), entry[:3])
 
1542
        self.assertEqual(u'./' + name0u, entry[4])
 
1543
        self.assertStatIsCorrect(entry[4], entry[3])
 
1544
        self.assertNotEqual(entry[3].st_mtime, entry[3].st_ctime)
 
1545
 
 
1546
    def test__walkdirs_utf_win32_find_file_stat_directory(self):
 
1547
        """make sure our Stat values are valid"""
 
1548
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
1549
        self.requireFeature(features.UnicodeFilenameFeature)
 
1550
        from .._walkdirs_win32 import Win32ReadDir
 
1551
        name0u = u'0dir-\u062c\u0648'
 
1552
        name0 = name0u.encode('utf8')
 
1553
        self.build_tree([name0u + '/'])
 
1554
 
 
1555
        result = Win32ReadDir().read_dir('', u'.')
 
1556
        entry = result[0]
 
1557
        self.assertEqual((name0, name0, 'directory'), entry[:3])
 
1558
        self.assertEqual(u'./' + name0u, entry[4])
 
1559
        self.assertStatIsCorrect(entry[4], entry[3])
 
1560
 
 
1561
    def assertPathCompare(self, path_less, path_greater):
 
1562
        """check that path_less and path_greater compare correctly."""
 
1563
        self.assertEqual(0, osutils.compare_paths_prefix_order(
 
1564
            path_less, path_less))
 
1565
        self.assertEqual(0, osutils.compare_paths_prefix_order(
 
1566
            path_greater, path_greater))
 
1567
        self.assertEqual(-1, osutils.compare_paths_prefix_order(
 
1568
            path_less, path_greater))
 
1569
        self.assertEqual(1, osutils.compare_paths_prefix_order(
 
1570
            path_greater, path_less))
 
1571
 
 
1572
    def test_compare_paths_prefix_order(self):
 
1573
        # root before all else
 
1574
        self.assertPathCompare("/", "/a")
 
1575
        # alpha within a dir
 
1576
        self.assertPathCompare("/a", "/b")
 
1577
        self.assertPathCompare("/b", "/z")
 
1578
        # high dirs before lower.
 
1579
        self.assertPathCompare("/z", "/a/a")
 
1580
        # except if the deeper dir should be output first
 
1581
        self.assertPathCompare("/a/b/c", "/d/g")
 
1582
        # lexical betwen dirs of the same height
 
1583
        self.assertPathCompare("/a/z", "/z/z")
 
1584
        self.assertPathCompare("/a/c/z", "/a/d/e")
 
1585
 
 
1586
        # this should also be consistent for no leading / paths
 
1587
        # root before all else
 
1588
        self.assertPathCompare("", "a")
 
1589
        # alpha within a dir
 
1590
        self.assertPathCompare("a", "b")
 
1591
        self.assertPathCompare("b", "z")
 
1592
        # high dirs before lower.
 
1593
        self.assertPathCompare("z", "a/a")
 
1594
        # except if the deeper dir should be output first
 
1595
        self.assertPathCompare("a/b/c", "d/g")
 
1596
        # lexical betwen dirs of the same height
 
1597
        self.assertPathCompare("a/z", "z/z")
 
1598
        self.assertPathCompare("a/c/z", "a/d/e")
 
1599
 
 
1600
    def test_path_prefix_sorting(self):
 
1601
        """Doing a sort on path prefix should match our sample data."""
 
1602
        original_paths = [
 
1603
            'a',
 
1604
            'a/b',
 
1605
            'a/b/c',
 
1606
            'b',
 
1607
            'b/c',
 
1608
            'd',
 
1609
            'd/e',
 
1610
            'd/e/f',
 
1611
            'd/f',
 
1612
            'd/g',
 
1613
            'g',
 
1614
            ]
 
1615
 
 
1616
        dir_sorted_paths = [
 
1617
            'a',
 
1618
            'b',
 
1619
            'd',
 
1620
            'g',
 
1621
            'a/b',
 
1622
            'a/b/c',
 
1623
            'b/c',
 
1624
            'd/e',
 
1625
            'd/f',
 
1626
            'd/g',
 
1627
            'd/e/f',
 
1628
            ]
 
1629
 
 
1630
        self.assertEqual(
 
1631
            dir_sorted_paths,
 
1632
            sorted(original_paths, key=osutils.path_prefix_key))
 
1633
        # using the comparison routine shoudl work too:
 
1634
        self.assertEqual(
 
1635
            dir_sorted_paths,
 
1636
            sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
 
1637
 
 
1638
 
 
1639
class TestCopyTree(tests.TestCaseInTempDir):
 
1640
 
 
1641
    def test_copy_basic_tree(self):
 
1642
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
 
1643
        osutils.copy_tree('source', 'target')
 
1644
        self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
 
1645
        self.assertEqual(['c'], os.listdir('target/b'))
 
1646
 
 
1647
    def test_copy_tree_target_exists(self):
 
1648
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
 
1649
                         'target/'])
 
1650
        osutils.copy_tree('source', 'target')
 
1651
        self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
 
1652
        self.assertEqual(['c'], os.listdir('target/b'))
 
1653
 
 
1654
    def test_copy_tree_symlinks(self):
 
1655
        self.requireFeature(features.SymlinkFeature)
 
1656
        self.build_tree(['source/'])
 
1657
        os.symlink('a/generic/path', 'source/lnk')
 
1658
        osutils.copy_tree('source', 'target')
 
1659
        self.assertEqual(['lnk'], os.listdir('target'))
 
1660
        self.assertEqual('a/generic/path', os.readlink('target/lnk'))
 
1661
 
 
1662
    def test_copy_tree_handlers(self):
 
1663
        processed_files = []
 
1664
        processed_links = []
 
1665
        def file_handler(from_path, to_path):
 
1666
            processed_files.append(('f', from_path, to_path))
 
1667
        def dir_handler(from_path, to_path):
 
1668
            processed_files.append(('d', from_path, to_path))
 
1669
        def link_handler(from_path, to_path):
 
1670
            processed_links.append((from_path, to_path))
 
1671
        handlers = {'file': file_handler,
 
1672
                    'directory': dir_handler,
 
1673
                    'symlink': link_handler,
 
1674
                   }
 
1675
 
 
1676
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
 
1677
        if osutils.has_symlinks():
 
1678
            os.symlink('a/generic/path', 'source/lnk')
 
1679
        osutils.copy_tree('source', 'target', handlers=handlers)
 
1680
 
 
1681
        self.assertEqual([('d', 'source', 'target'),
 
1682
                          ('f', 'source/a', 'target/a'),
 
1683
                          ('d', 'source/b', 'target/b'),
 
1684
                          ('f', 'source/b/c', 'target/b/c'),
 
1685
                         ], processed_files)
 
1686
        self.assertPathDoesNotExist('target')
 
1687
        if osutils.has_symlinks():
 
1688
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
 
1689
 
 
1690
 
 
1691
class TestSetUnsetEnv(tests.TestCase):
 
1692
    """Test updating the environment"""
 
1693
 
 
1694
    def setUp(self):
 
1695
        super(TestSetUnsetEnv, self).setUp()
 
1696
 
 
1697
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'),
 
1698
                         'Environment was not cleaned up properly.'
 
1699
                         ' Variable BRZ_TEST_ENV_VAR should not exist.')
 
1700
        def cleanup():
 
1701
            if 'BRZ_TEST_ENV_VAR' in os.environ:
 
1702
                del os.environ['BRZ_TEST_ENV_VAR']
 
1703
        self.addCleanup(cleanup)
 
1704
 
 
1705
    def test_set(self):
 
1706
        """Test that we can set an env variable"""
 
1707
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1708
        self.assertEqual(None, old)
 
1709
        self.assertEqual('foo', os.environ.get('BRZ_TEST_ENV_VAR'))
 
1710
 
 
1711
    def test_double_set(self):
 
1712
        """Test that we get the old value out"""
 
1713
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1714
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'bar')
 
1715
        self.assertEqual('foo', old)
 
1716
        self.assertEqual('bar', os.environ.get('BRZ_TEST_ENV_VAR'))
 
1717
 
 
1718
    def test_unicode(self):
 
1719
        """Environment can only contain plain strings
 
1720
 
 
1721
        So Unicode strings must be encoded.
 
1722
        """
 
1723
        uni_val, env_val = tests.probe_unicode_in_user_encoding()
 
1724
        if uni_val is None:
 
1725
            raise tests.TestSkipped(
 
1726
                'Cannot find a unicode character that works in encoding %s'
 
1727
                % (osutils.get_user_encoding(),))
 
1728
 
 
1729
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', uni_val)
 
1730
        self.assertEqual(env_val, os.environ.get('BRZ_TEST_ENV_VAR'))
 
1731
 
 
1732
    def test_unset(self):
 
1733
        """Test that passing None will remove the env var"""
 
1734
        osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', 'foo')
 
1735
        old = osutils.set_or_unset_env('BRZ_TEST_ENV_VAR', None)
 
1736
        self.assertEqual('foo', old)
 
1737
        self.assertEqual(None, os.environ.get('BRZ_TEST_ENV_VAR'))
 
1738
        self.assertFalse('BRZ_TEST_ENV_VAR' in os.environ)
 
1739
 
 
1740
 
 
1741
class TestSizeShaFile(tests.TestCaseInTempDir):
 
1742
 
 
1743
    def test_sha_empty(self):
 
1744
        self.build_tree_contents([('foo', b'')])
 
1745
        expected_sha = osutils.sha_string('')
 
1746
        f = open('foo')
 
1747
        self.addCleanup(f.close)
 
1748
        size, sha = osutils.size_sha_file(f)
 
1749
        self.assertEqual(0, size)
 
1750
        self.assertEqual(expected_sha, sha)
 
1751
 
 
1752
    def test_sha_mixed_endings(self):
 
1753
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
 
1754
        self.build_tree_contents([('foo', text)])
 
1755
        expected_sha = osutils.sha_string(text)
 
1756
        f = open('foo', 'rb')
 
1757
        self.addCleanup(f.close)
 
1758
        size, sha = osutils.size_sha_file(f)
 
1759
        self.assertEqual(38, size)
 
1760
        self.assertEqual(expected_sha, sha)
 
1761
 
 
1762
 
 
1763
class TestShaFileByName(tests.TestCaseInTempDir):
 
1764
 
 
1765
    def test_sha_empty(self):
 
1766
        self.build_tree_contents([('foo', b'')])
 
1767
        expected_sha = osutils.sha_string('')
 
1768
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
 
1769
 
 
1770
    def test_sha_mixed_endings(self):
 
1771
        text = b'test\r\nwith\nall\rpossible line endings\r\n'
 
1772
        self.build_tree_contents([('foo', text)])
 
1773
        expected_sha = osutils.sha_string(text)
 
1774
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
 
1775
 
 
1776
 
 
1777
class TestResourceLoading(tests.TestCaseInTempDir):
 
1778
 
 
1779
    def test_resource_string(self):
 
1780
        # test resource in breezy
 
1781
        text = osutils.resource_string('breezy', 'debug.py')
 
1782
        self.assertContainsRe(text, "debug_flags = set()")
 
1783
        # test resource under breezy
 
1784
        text = osutils.resource_string('breezy.ui', 'text.py')
 
1785
        self.assertContainsRe(text, "class TextUIFactory")
 
1786
        # test unsupported package
 
1787
        self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
 
1788
            'yyy.xx')
 
1789
        # test unknown resource
 
1790
        self.assertRaises(IOError, osutils.resource_string, 'breezy', 'yyy.xx')
 
1791
 
 
1792
 
 
1793
class TestDirReader(tests.TestCaseInTempDir):
 
1794
 
 
1795
    scenarios = dir_reader_scenarios()
 
1796
 
 
1797
    # Set by load_tests
 
1798
    _dir_reader_class = None
 
1799
    _native_to_unicode = None
 
1800
 
 
1801
    def setUp(self):
 
1802
        super(TestDirReader, self).setUp()
 
1803
        self.overrideAttr(osutils,
 
1804
                          '_selected_dir_reader', self._dir_reader_class())
 
1805
 
 
1806
    def _get_ascii_tree(self):
 
1807
        tree = [
 
1808
            '0file',
 
1809
            '1dir/',
 
1810
            '1dir/0file',
 
1811
            '1dir/1dir/',
 
1812
            '2file'
 
1813
            ]
 
1814
        expected_dirblocks = [
 
1815
                (('', '.'),
 
1816
                 [('0file', '0file', 'file'),
 
1817
                  ('1dir', '1dir', 'directory'),
 
1818
                  ('2file', '2file', 'file'),
 
1819
                 ]
 
1820
                ),
 
1821
                (('1dir', './1dir'),
 
1822
                 [('1dir/0file', '0file', 'file'),
 
1823
                  ('1dir/1dir', '1dir', 'directory'),
 
1824
                 ]
 
1825
                ),
 
1826
                (('1dir/1dir', './1dir/1dir'),
 
1827
                 [
 
1828
                 ]
 
1829
                ),
 
1830
            ]
 
1831
        return tree, expected_dirblocks
 
1832
 
 
1833
    def test_walk_cur_dir(self):
 
1834
        tree, expected_dirblocks = self._get_ascii_tree()
 
1835
        self.build_tree(tree)
 
1836
        result = list(osutils._walkdirs_utf8('.'))
 
1837
        # Filter out stat and abspath
 
1838
        self.assertEqual(expected_dirblocks,
 
1839
                         [(dirinfo, [line[0:3] for line in block])
 
1840
                          for dirinfo, block in result])
 
1841
 
 
1842
    def test_walk_sub_dir(self):
 
1843
        tree, expected_dirblocks = self._get_ascii_tree()
 
1844
        self.build_tree(tree)
 
1845
        # you can search a subdir only, with a supplied prefix.
 
1846
        result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
 
1847
        # Filter out stat and abspath
 
1848
        self.assertEqual(expected_dirblocks[1:],
 
1849
                         [(dirinfo, [line[0:3] for line in block])
 
1850
                          for dirinfo, block in result])
 
1851
 
 
1852
    def _get_unicode_tree(self):
 
1853
        name0u = u'0file-\xb6'
 
1854
        name1u = u'1dir-\u062c\u0648'
 
1855
        name2u = u'2file-\u0633'
 
1856
        tree = [
 
1857
            name0u,
 
1858
            name1u + '/',
 
1859
            name1u + '/' + name0u,
 
1860
            name1u + '/' + name1u + '/',
 
1861
            name2u,
 
1862
            ]
 
1863
        name0 = name0u.encode('UTF-8')
 
1864
        name1 = name1u.encode('UTF-8')
 
1865
        name2 = name2u.encode('UTF-8')
 
1866
        expected_dirblocks = [
 
1867
                (('', '.'),
 
1868
                 [(name0, name0, 'file', './' + name0u),
 
1869
                  (name1, name1, 'directory', './' + name1u),
 
1870
                  (name2, name2, 'file', './' + name2u),
 
1871
                 ]
 
1872
                ),
 
1873
                ((name1, './' + name1u),
 
1874
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1875
                                                        + '/' + name0u),
 
1876
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1877
                                                            + '/' + name1u),
 
1878
                 ]
 
1879
                ),
 
1880
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1881
                 [
 
1882
                 ]
 
1883
                ),
 
1884
            ]
 
1885
        return tree, expected_dirblocks
 
1886
 
 
1887
    def _filter_out(self, raw_dirblocks):
 
1888
        """Filter out a walkdirs_utf8 result.
 
1889
 
 
1890
        stat field is removed, all native paths are converted to unicode
 
1891
        """
 
1892
        filtered_dirblocks = []
 
1893
        for dirinfo, block in raw_dirblocks:
 
1894
            dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
 
1895
            details = []
 
1896
            for line in block:
 
1897
                details.append(line[0:3] + (self._native_to_unicode(line[4]), ))
 
1898
            filtered_dirblocks.append((dirinfo, details))
 
1899
        return filtered_dirblocks
 
1900
 
 
1901
    def test_walk_unicode_tree(self):
 
1902
        self.requireFeature(features.UnicodeFilenameFeature)
 
1903
        tree, expected_dirblocks = self._get_unicode_tree()
 
1904
        self.build_tree(tree)
 
1905
        result = list(osutils._walkdirs_utf8('.'))
 
1906
        self.assertEqual(expected_dirblocks, self._filter_out(result))
 
1907
 
 
1908
    def test_symlink(self):
 
1909
        self.requireFeature(features.SymlinkFeature)
 
1910
        self.requireFeature(features.UnicodeFilenameFeature)
 
1911
        target = u'target\N{Euro Sign}'
 
1912
        link_name = u'l\N{Euro Sign}nk'
 
1913
        os.symlink(target, link_name)
 
1914
        target_utf8 = target.encode('UTF-8')
 
1915
        link_name_utf8 = link_name.encode('UTF-8')
 
1916
        expected_dirblocks = [
 
1917
                (('', '.'),
 
1918
                 [(link_name_utf8, link_name_utf8,
 
1919
                   'symlink', './' + link_name),],
 
1920
                 )]
 
1921
        result = list(osutils._walkdirs_utf8('.'))
 
1922
        self.assertEqual(expected_dirblocks, self._filter_out(result))
 
1923
 
 
1924
 
 
1925
class TestReadLink(tests.TestCaseInTempDir):
 
1926
    """Exposes os.readlink() problems and the osutils solution.
 
1927
 
 
1928
    The only guarantee offered by os.readlink(), starting with 2.6, is that a
 
1929
    unicode string will be returned if a unicode string is passed.
 
1930
 
 
1931
    But prior python versions failed to properly encode the passed unicode
 
1932
    string.
 
1933
    """
 
1934
    _test_needs_features = [features.SymlinkFeature, features.UnicodeFilenameFeature]
 
1935
 
 
1936
    def setUp(self):
 
1937
        super(tests.TestCaseInTempDir, self).setUp()
 
1938
        self.link = u'l\N{Euro Sign}ink'
 
1939
        self.target = u'targe\N{Euro Sign}t'
 
1940
        os.symlink(self.target, self.link)
 
1941
 
 
1942
    def test_os_readlink_link_encoding(self):
 
1943
        self.assertEqual(self.target,  os.readlink(self.link))
 
1944
 
 
1945
    def test_os_readlink_link_decoding(self):
 
1946
        self.assertEqual(self.target.encode(osutils._fs_enc),
 
1947
                          os.readlink(self.link.encode(osutils._fs_enc)))
 
1948
 
 
1949
 
 
1950
class TestConcurrency(tests.TestCase):
 
1951
 
 
1952
    def setUp(self):
 
1953
        super(TestConcurrency, self).setUp()
 
1954
        self.overrideAttr(osutils, '_cached_local_concurrency')
 
1955
 
 
1956
    def test_local_concurrency(self):
 
1957
        concurrency = osutils.local_concurrency()
 
1958
        self.assertIsInstance(concurrency, int)
 
1959
 
 
1960
    def test_local_concurrency_environment_variable(self):
 
1961
        self.overrideEnv('BRZ_CONCURRENCY', '2')
 
1962
        self.assertEqual(2, osutils.local_concurrency(use_cache=False))
 
1963
        self.overrideEnv('BRZ_CONCURRENCY', '3')
 
1964
        self.assertEqual(3, osutils.local_concurrency(use_cache=False))
 
1965
        self.overrideEnv('BRZ_CONCURRENCY', 'foo')
 
1966
        self.assertEqual(1, osutils.local_concurrency(use_cache=False))
 
1967
 
 
1968
    def test_option_concurrency(self):
 
1969
        self.overrideEnv('BRZ_CONCURRENCY', '1')
 
1970
        self.run_bzr('rocks --concurrency 42')
 
1971
        # Command line overrides environment variable
 
1972
        self.assertEqual('42', os.environ['BRZ_CONCURRENCY'])
 
1973
        self.assertEqual(42, osutils.local_concurrency(use_cache=False))
 
1974
 
 
1975
 
 
1976
class TestFailedToLoadExtension(tests.TestCase):
 
1977
 
 
1978
    def _try_loading(self):
 
1979
        try:
 
1980
            import breezy._fictional_extension_py
 
1981
        except ImportError as e:
 
1982
            osutils.failed_to_load_extension(e)
 
1983
            return True
 
1984
 
 
1985
    def setUp(self):
 
1986
        super(TestFailedToLoadExtension, self).setUp()
 
1987
        self.overrideAttr(osutils, '_extension_load_failures', [])
 
1988
 
 
1989
    def test_failure_to_load(self):
 
1990
        self._try_loading()
 
1991
        self.assertLength(1, osutils._extension_load_failures)
 
1992
        self.assertEqual(osutils._extension_load_failures[0],
 
1993
            "No module named _fictional_extension_py")
 
1994
 
 
1995
    def test_report_extension_load_failures_no_warning(self):
 
1996
        self.assertTrue(self._try_loading())
 
1997
        warnings, result = self.callCatchWarnings(osutils.report_extension_load_failures)
 
1998
        # it used to give a Python warning; it no longer does
 
1999
        self.assertLength(0, warnings)
 
2000
 
 
2001
    def test_report_extension_load_failures_message(self):
 
2002
        log = BytesIO()
 
2003
        trace.push_log_file(log)
 
2004
        self.assertTrue(self._try_loading())
 
2005
        osutils.report_extension_load_failures()
 
2006
        self.assertContainsRe(
 
2007
            log.getvalue(),
 
2008
            r"brz: warning: some compiled extensions could not be loaded; "
 
2009
            "see ``brz help missing-extensions``\n"
 
2010
            )
 
2011
 
 
2012
 
 
2013
class TestTerminalWidth(tests.TestCase):
 
2014
 
 
2015
    def setUp(self):
 
2016
        super(TestTerminalWidth, self).setUp()
 
2017
        self._orig_terminal_size_state = osutils._terminal_size_state
 
2018
        self._orig_first_terminal_size = osutils._first_terminal_size
 
2019
        self.addCleanup(self.restore_osutils_globals)
 
2020
        osutils._terminal_size_state = 'no_data'
 
2021
        osutils._first_terminal_size = None
 
2022
 
 
2023
    def restore_osutils_globals(self):
 
2024
        osutils._terminal_size_state = self._orig_terminal_size_state
 
2025
        osutils._first_terminal_size = self._orig_first_terminal_size
 
2026
 
 
2027
    def replace_stdout(self, new):
 
2028
        self.overrideAttr(sys, 'stdout', new)
 
2029
 
 
2030
    def replace__terminal_size(self, new):
 
2031
        self.overrideAttr(osutils, '_terminal_size', new)
 
2032
 
 
2033
    def set_fake_tty(self):
 
2034
 
 
2035
        class I_am_a_tty(object):
 
2036
            def isatty(self):
 
2037
                return True
 
2038
 
 
2039
        self.replace_stdout(I_am_a_tty())
 
2040
 
 
2041
    def test_default_values(self):
 
2042
        self.assertEqual(80, osutils.default_terminal_width)
 
2043
 
 
2044
    def test_defaults_to_BRZ_COLUMNS(self):
 
2045
        # BRZ_COLUMNS is set by the test framework
 
2046
        self.assertNotEqual('12', os.environ['BRZ_COLUMNS'])
 
2047
        self.overrideEnv('BRZ_COLUMNS', '12')
 
2048
        self.assertEqual(12, osutils.terminal_width())
 
2049
 
 
2050
    def test_BRZ_COLUMNS_0_no_limit(self):
 
2051
        self.overrideEnv('BRZ_COLUMNS', '0')
 
2052
        self.assertEqual(None, osutils.terminal_width())
 
2053
 
 
2054
    def test_falls_back_to_COLUMNS(self):
 
2055
        self.overrideEnv('BRZ_COLUMNS', None)
 
2056
        self.assertNotEqual('42', os.environ['COLUMNS'])
 
2057
        self.set_fake_tty()
 
2058
        self.overrideEnv('COLUMNS', '42')
 
2059
        self.assertEqual(42, osutils.terminal_width())
 
2060
 
 
2061
    def test_tty_default_without_columns(self):
 
2062
        self.overrideEnv('BRZ_COLUMNS', None)
 
2063
        self.overrideEnv('COLUMNS', None)
 
2064
 
 
2065
        def terminal_size(w, h):
 
2066
            return 42, 42
 
2067
 
 
2068
        self.set_fake_tty()
 
2069
        # We need to override the osutils definition as it depends on the
 
2070
        # running environment that we can't control (PQM running without a
 
2071
        # controlling terminal is one example).
 
2072
        self.replace__terminal_size(terminal_size)
 
2073
        self.assertEqual(42, osutils.terminal_width())
 
2074
 
 
2075
    def test_non_tty_default_without_columns(self):
 
2076
        self.overrideEnv('BRZ_COLUMNS', None)
 
2077
        self.overrideEnv('COLUMNS', None)
 
2078
        self.replace_stdout(None)
 
2079
        self.assertEqual(None, osutils.terminal_width())
 
2080
 
 
2081
    def test_no_TIOCGWINSZ(self):
 
2082
        self.requireFeature(term_ios_feature)
 
2083
        termios = term_ios_feature.module
 
2084
        # bug 63539 is about a termios without TIOCGWINSZ attribute
 
2085
        try:
 
2086
            orig = termios.TIOCGWINSZ
 
2087
        except AttributeError:
 
2088
            # We won't remove TIOCGWINSZ, because it doesn't exist anyway :)
 
2089
            pass
 
2090
        else:
 
2091
            self.overrideAttr(termios, 'TIOCGWINSZ')
 
2092
            del termios.TIOCGWINSZ
 
2093
        self.overrideEnv('BRZ_COLUMNS', None)
 
2094
        self.overrideEnv('COLUMNS', None)
 
2095
        # Whatever the result is, if we don't raise an exception, it's ok.
 
2096
        osutils.terminal_width()
 
2097
 
 
2098
 
 
2099
class TestCreationOps(tests.TestCaseInTempDir):
 
2100
    _test_needs_features = [features.chown_feature]
 
2101
 
 
2102
    def setUp(self):
 
2103
        super(TestCreationOps, self).setUp()
 
2104
        self.overrideAttr(os, 'chown', self._dummy_chown)
 
2105
 
 
2106
        # params set by call to _dummy_chown
 
2107
        self.path = self.uid = self.gid = None
 
2108
 
 
2109
    def _dummy_chown(self, path, uid, gid):
 
2110
        self.path, self.uid, self.gid = path, uid, gid
 
2111
 
 
2112
    def test_copy_ownership_from_path(self):
 
2113
        """copy_ownership_from_path test with specified src."""
 
2114
        ownsrc = '/'
 
2115
        f = open('test_file', 'wt')
 
2116
        osutils.copy_ownership_from_path('test_file', ownsrc)
 
2117
 
 
2118
        s = os.stat(ownsrc)
 
2119
        self.assertEqual(self.path, 'test_file')
 
2120
        self.assertEqual(self.uid, s.st_uid)
 
2121
        self.assertEqual(self.gid, s.st_gid)
 
2122
 
 
2123
    def test_copy_ownership_nonesrc(self):
 
2124
        """copy_ownership_from_path test with src=None."""
 
2125
        f = open('test_file', 'wt')
 
2126
        # should use parent dir for permissions
 
2127
        osutils.copy_ownership_from_path('test_file')
 
2128
 
 
2129
        s = os.stat('..')
 
2130
        self.assertEqual(self.path, 'test_file')
 
2131
        self.assertEqual(self.uid, s.st_uid)
 
2132
        self.assertEqual(self.gid, s.st_gid)
 
2133
 
 
2134
 
 
2135
class TestPathFromEnviron(tests.TestCase):
 
2136
 
 
2137
    def test_is_unicode(self):
 
2138
        self.overrideEnv('BRZ_TEST_PATH', './anywhere at all/')
 
2139
        path = osutils.path_from_environ('BRZ_TEST_PATH')
 
2140
        self.assertIsInstance(path, unicode)
 
2141
        self.assertEqual(u'./anywhere at all/', path)
 
2142
 
 
2143
    def test_posix_path_env_ascii(self):
 
2144
        self.overrideEnv('BRZ_TEST_PATH', '/tmp')
 
2145
        home = osutils._posix_path_from_environ('BRZ_TEST_PATH')
 
2146
        self.assertIsInstance(home, unicode)
 
2147
        self.assertEqual(u'/tmp', home)
 
2148
 
 
2149
    def test_posix_path_env_unicode(self):
 
2150
        self.requireFeature(features.ByteStringNamedFilesystem)
 
2151
        self.overrideEnv('BRZ_TEST_PATH', '/home/\xa7test')
 
2152
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
 
2153
        self.assertEqual(u'/home/\xa7test',
 
2154
            osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2155
        osutils._fs_enc = "iso8859-5"
 
2156
        self.assertEqual(u'/home/\u0407test',
 
2157
            osutils._posix_path_from_environ('BRZ_TEST_PATH'))
 
2158
        osutils._fs_enc = "utf-8"
 
2159
        self.assertRaises(errors.BadFilenameEncoding,
 
2160
            osutils._posix_path_from_environ, 'BRZ_TEST_PATH')
 
2161
 
 
2162
 
 
2163
class TestGetHomeDir(tests.TestCase):
 
2164
 
 
2165
    def test_is_unicode(self):
 
2166
        home = osutils._get_home_dir()
 
2167
        self.assertIsInstance(home, unicode)
 
2168
 
 
2169
    def test_posix_homeless(self):
 
2170
        self.overrideEnv('HOME', None)
 
2171
        home = osutils._get_home_dir()
 
2172
        self.assertIsInstance(home, unicode)
 
2173
 
 
2174
    def test_posix_home_ascii(self):
 
2175
        self.overrideEnv('HOME', '/home/test')
 
2176
        home = osutils._posix_get_home_dir()
 
2177
        self.assertIsInstance(home, unicode)
 
2178
        self.assertEqual(u'/home/test', home)
 
2179
 
 
2180
    def test_posix_home_unicode(self):
 
2181
        self.requireFeature(features.ByteStringNamedFilesystem)
 
2182
        self.overrideEnv('HOME', '/home/\xa7test')
 
2183
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
 
2184
        self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
 
2185
        osutils._fs_enc = "iso8859-5"
 
2186
        self.assertEqual(u'/home/\u0407test', osutils._posix_get_home_dir())
 
2187
        osutils._fs_enc = "utf-8"
 
2188
        self.assertRaises(errors.BadFilenameEncoding,
 
2189
            osutils._posix_get_home_dir)
 
2190
 
 
2191
 
 
2192
class TestGetuserUnicode(tests.TestCase):
 
2193
 
 
2194
    def test_is_unicode(self):
 
2195
        user = osutils.getuser_unicode()
 
2196
        self.assertIsInstance(user, unicode)
 
2197
 
 
2198
    def envvar_to_override(self):
 
2199
        if sys.platform == "win32":
 
2200
            # Disable use of platform calls on windows so envvar is used
 
2201
            self.overrideAttr(win32utils, 'has_ctypes', False)
 
2202
            return 'USERNAME' # only variable used on windows
 
2203
        return 'LOGNAME' # first variable checked by getpass.getuser()
 
2204
 
 
2205
    def test_ascii_user(self):
 
2206
        self.overrideEnv(self.envvar_to_override(), 'jrandom')
 
2207
        self.assertEqual(u'jrandom', osutils.getuser_unicode())
 
2208
 
 
2209
    def test_unicode_user(self):
 
2210
        ue = osutils.get_user_encoding()
 
2211
        uni_val, env_val = tests.probe_unicode_in_user_encoding()
 
2212
        if uni_val is None:
 
2213
            raise tests.TestSkipped(
 
2214
                'Cannot find a unicode character that works in encoding %s'
 
2215
                % (osutils.get_user_encoding(),))
 
2216
        uni_username = u'jrandom' + uni_val
 
2217
        encoded_username = uni_username.encode(ue)
 
2218
        self.overrideEnv(self.envvar_to_override(), encoded_username)
 
2219
        self.assertEqual(uni_username, osutils.getuser_unicode())
 
2220
 
 
2221
 
 
2222
class TestBackupNames(tests.TestCase):
 
2223
 
 
2224
    def setUp(self):
 
2225
        super(TestBackupNames, self).setUp()
 
2226
        self.backups = []
 
2227
 
 
2228
    def backup_exists(self, name):
 
2229
        return name in self.backups
 
2230
 
 
2231
    def available_backup_name(self, name):
 
2232
        backup_name = osutils.available_backup_name(name, self.backup_exists)
 
2233
        self.backups.append(backup_name)
 
2234
        return backup_name
 
2235
 
 
2236
    def assertBackupName(self, expected, name):
 
2237
        self.assertEqual(expected, self.available_backup_name(name))
 
2238
 
 
2239
    def test_empty(self):
 
2240
        self.assertBackupName('file.~1~', 'file')
 
2241
 
 
2242
    def test_existing(self):
 
2243
        self.available_backup_name('file')
 
2244
        self.available_backup_name('file')
 
2245
        self.assertBackupName('file.~3~', 'file')
 
2246
        # Empty slots are found, this is not a strict requirement and may be
 
2247
        # revisited if we test against all implementations.
 
2248
        self.backups.remove('file.~2~')
 
2249
        self.assertBackupName('file.~2~', 'file')
 
2250
 
 
2251
 
 
2252
class TestFindExecutableInPath(tests.TestCase):
 
2253
 
 
2254
    def test_windows(self):
 
2255
        if sys.platform != 'win32':
 
2256
            raise tests.TestSkipped('test requires win32')
 
2257
        self.assertTrue(osutils.find_executable_on_path('explorer') is not None)
 
2258
        self.assertTrue(
 
2259
            osutils.find_executable_on_path('explorer.exe') is not None)
 
2260
        self.assertTrue(
 
2261
            osutils.find_executable_on_path('EXPLORER.EXE') is not None)
 
2262
        self.assertTrue(
 
2263
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
 
2264
        self.assertTrue(osutils.find_executable_on_path('file.txt') is None)
 
2265
        
 
2266
    def test_windows_app_path(self):
 
2267
        if sys.platform != 'win32':
 
2268
            raise tests.TestSkipped('test requires win32')
 
2269
        # Override PATH env var so that exe can only be found on App Path
 
2270
        self.overrideEnv('PATH', '')
 
2271
        # Internt Explorer is always registered in the App Path
 
2272
        self.assertTrue(osutils.find_executable_on_path('iexplore') is not None)
 
2273
 
 
2274
    def test_other(self):
 
2275
        if sys.platform == 'win32':
 
2276
            raise tests.TestSkipped('test requires non-win32')
 
2277
        self.assertTrue(osutils.find_executable_on_path('sh') is not None)
 
2278
        self.assertTrue(
 
2279
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
 
2280
 
 
2281
 
 
2282
class TestEnvironmentErrors(tests.TestCase):
 
2283
    """Test handling of environmental errors"""
 
2284
 
 
2285
    def test_is_oserror(self):
 
2286
        self.assertTrue(osutils.is_environment_error(
 
2287
            OSError(errno.EINVAL, "Invalid parameter")))
 
2288
 
 
2289
    def test_is_ioerror(self):
 
2290
        self.assertTrue(osutils.is_environment_error(
 
2291
            IOError(errno.EINVAL, "Invalid parameter")))
 
2292
 
 
2293
    def test_is_socket_error(self):
 
2294
        self.assertTrue(osutils.is_environment_error(
 
2295
            socket.error(errno.EINVAL, "Invalid parameter")))
 
2296
 
 
2297
    def test_is_select_error(self):
 
2298
        self.assertTrue(osutils.is_environment_error(
 
2299
            select.error(errno.EINVAL, "Invalid parameter")))
 
2300
 
 
2301
    def test_is_pywintypes_error(self):
 
2302
        self.requireFeature(features.pywintypes)
 
2303
        import pywintypes
 
2304
        self.assertTrue(osutils.is_environment_error(
 
2305
            pywintypes.error(errno.EINVAL, "Invalid parameter", "Caller")))