1
# Copyright (C) 2005 by Canonical Ltd
 
 
3
# This program is free software; you can redistribute it and/or modify
 
 
4
# it under the terms of the GNU General Public License as published by
 
 
5
# the Free Software Foundation; either version 2 of the License, or
 
 
6
# (at your option) any later version.
 
 
8
# This program is distributed in the hope that it will be useful,
 
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
 
11
# GNU General Public License for more details.
 
 
13
# You should have received a copy of the GNU General Public License
 
 
14
# along with this program; if not, write to the Free Software
 
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
 
17
"""Tests for the osutils wrapper."""
 
 
26
from bzrlib.errors import BzrBadParameterNotUnicode, InvalidURL
 
 
27
import bzrlib.osutils as osutils
 
 
28
from bzrlib.tests import (
 
 
36
class TestOSUtils(TestCaseInTempDir):
 
 
38
    def test_fancy_rename(self):
 
 
39
        # This should work everywhere
 
 
41
            osutils.fancy_rename(a, b,
 
 
42
                    rename_func=os.rename,
 
 
43
                    unlink_func=os.unlink)
 
 
45
        open('a', 'wb').write('something in a\n')
 
 
47
        self.failIfExists('a')
 
 
48
        self.failUnlessExists('b')
 
 
49
        self.check_file_contents('b', 'something in a\n')
 
 
51
        open('a', 'wb').write('new something in a\n')
 
 
54
        self.check_file_contents('a', 'something in a\n')
 
 
56
    def test_rename(self):
 
 
57
        # Rename should be semi-atomic on all platforms
 
 
58
        open('a', 'wb').write('something in a\n')
 
 
59
        osutils.rename('a', 'b')
 
 
60
        self.failIfExists('a')
 
 
61
        self.failUnlessExists('b')
 
 
62
        self.check_file_contents('b', 'something in a\n')
 
 
64
        open('a', 'wb').write('new something in a\n')
 
 
65
        osutils.rename('b', 'a')
 
 
67
        self.check_file_contents('a', 'something in a\n')
 
 
69
    # TODO: test fancy_rename using a MemoryTransport
 
 
71
    def test_01_rand_chars_empty(self):
 
 
72
        result = osutils.rand_chars(0)
 
 
73
        self.assertEqual(result, '')
 
 
75
    def test_02_rand_chars_100(self):
 
 
76
        result = osutils.rand_chars(100)
 
 
77
        self.assertEqual(len(result), 100)
 
 
78
        self.assertEqual(type(result), str)
 
 
79
        self.assertContainsRe(result, r'^[a-z0-9]{100}$')
 
 
82
    def test_rmtree(self):
 
 
83
        # Check to remove tree with read-only files/dirs
 
 
85
        f = file('dir/file', 'w')
 
 
88
        # would like to also try making the directory readonly, but at the
 
 
89
        # moment python shutil.rmtree doesn't handle that properly - it would
 
 
90
        # need to chmod the directory before removing things inside it - deferred
 
 
91
        # for now -- mbp 20060505
 
 
92
        # osutils.make_readonly('dir')
 
 
93
        osutils.make_readonly('dir/file')
 
 
97
        self.failIfExists('dir/file')
 
 
98
        self.failIfExists('dir')
 
 
100
    def test_file_kind(self):
 
 
101
        self.build_tree(['file', 'dir/'])
 
 
102
        self.assertEquals('file', osutils.file_kind('file'))
 
 
103
        self.assertEquals('directory', osutils.file_kind('dir/'))
 
 
104
        if osutils.has_symlinks():
 
 
105
            os.symlink('symlink', 'symlink')
 
 
106
            self.assertEquals('symlink', osutils.file_kind('symlink'))
 
 
108
        # TODO: jam 20060529 Test a block device
 
 
110
            os.lstat('/dev/null')
 
 
112
            if e.errno not in (errno.ENOENT,):
 
 
115
            self.assertEquals('chardev', osutils.file_kind('/dev/null'))
 
 
117
        mkfifo = getattr(os, 'mkfifo', None)
 
 
121
                self.assertEquals('fifo', osutils.file_kind('fifo'))
 
 
125
        AF_UNIX = getattr(socket, 'AF_UNIX', None)
 
 
127
            s = socket.socket(AF_UNIX)
 
 
130
                self.assertEquals('socket', osutils.file_kind('socket'))
 
 
134
    def test_get_umask(self):
 
 
135
        if sys.platform == 'win32':
 
 
136
            # umask always returns '0', no way to set it
 
 
137
            self.assertEqual(0, osutils.get_umask())
 
 
140
        orig_umask = osutils.get_umask()
 
 
143
            self.assertEqual(0222, osutils.get_umask())
 
 
145
            self.assertEqual(0022, osutils.get_umask())
 
 
147
            self.assertEqual(0002, osutils.get_umask())
 
 
149
            self.assertEqual(0027, osutils.get_umask())
 
 
154
class TestSafeUnicode(TestCase):
 
 
156
    def test_from_ascii_string(self):
 
 
157
        self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
 
 
159
    def test_from_unicode_string_ascii_contents(self):
 
 
160
        self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
 
 
162
    def test_from_unicode_string_unicode_contents(self):
 
 
163
        self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
 
 
165
    def test_from_utf8_string(self):
 
 
166
        self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
 
 
168
    def test_bad_utf8_string(self):
 
 
169
        self.assertRaises(BzrBadParameterNotUnicode,
 
 
170
                          osutils.safe_unicode,
 
 
174
class TestWin32Funcs(TestCase):
 
 
175
    """Test that the _win32 versions of os utilities return appropriate paths."""
 
 
177
    def test_abspath(self):
 
 
178
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
 
 
179
        self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
 
 
181
    def test_realpath(self):
 
 
182
        self.assertEqual('C:/foo', osutils._win32_realpath('C:\\foo'))
 
 
183
        self.assertEqual('C:/foo', osutils._win32_realpath('C:/foo'))
 
 
185
    def test_pathjoin(self):
 
 
186
        self.assertEqual('path/to/foo', osutils._win32_pathjoin('path', 'to', 'foo'))
 
 
187
        self.assertEqual('C:/foo', osutils._win32_pathjoin('path\\to', 'C:\\foo'))
 
 
188
        self.assertEqual('C:/foo', osutils._win32_pathjoin('path/to', 'C:/foo'))
 
 
189
        self.assertEqual('path/to/foo', osutils._win32_pathjoin('path/to/', 'foo'))
 
 
190
        self.assertEqual('/foo', osutils._win32_pathjoin('C:/path/to/', '/foo'))
 
 
191
        self.assertEqual('/foo', osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
 
 
193
    def test_normpath(self):
 
 
194
        self.assertEqual('path/to/foo', osutils._win32_normpath(r'path\\from\..\to\.\foo'))
 
 
195
        self.assertEqual('path/to/foo', osutils._win32_normpath('path//from/../to/./foo'))
 
 
197
    def test_getcwd(self):
 
 
198
        cwd = osutils._win32_getcwd()
 
 
199
        os_cwd = os.getcwdu()
 
 
200
        self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
 
 
201
        # win32 is inconsistent whether it returns lower or upper case
 
 
202
        # and even if it was consistent the user might type the other
 
 
203
        # so we force it to uppercase
 
 
204
        # running python.exe under cmd.exe return capital C:\\
 
 
205
        # running win32 python inside a cygwin shell returns lowercase
 
 
206
        self.assertEqual(os_cwd[0].upper(), cwd[0])
 
 
208
    def test_fixdrive(self):
 
 
209
        self.assertEqual('H:/foo', osutils._win32_fixdrive('h:/foo'))
 
 
210
        self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
 
 
211
        self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
 
 
214
class TestWin32FuncsDirs(TestCaseInTempDir):
 
 
215
    """Test win32 functions that create files."""
 
 
217
    def test_getcwd(self):
 
 
218
        # Make sure getcwd can handle unicode filenames
 
 
222
            raise TestSkipped("Unable to create Unicode filename")
 
 
225
        # TODO: jam 20060427 This will probably fail on Mac OSX because
 
 
226
        #       it will change the normalization of B\xe5gfors
 
 
227
        #       Consider using a different unicode character, or make
 
 
228
        #       osutils.getcwd() renormalize the path.
 
 
229
        self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
 
 
231
    def test_mkdtemp(self):
 
 
232
        tmpdir = osutils._win32_mkdtemp(dir='.')
 
 
233
        self.assertFalse('\\' in tmpdir)
 
 
235
    def test_rename(self):
 
 
243
        osutils._win32_rename('b', 'a')
 
 
244
        self.failUnlessExists('a')
 
 
245
        self.failIfExists('b')
 
 
246
        self.assertFileEqual('baz\n', 'a')
 
 
248
    def test_rename_missing_file(self):
 
 
254
            osutils._win32_rename('b', 'a')
 
 
255
        except (IOError, OSError), e:
 
 
256
            self.assertEqual(errno.ENOENT, e.errno)
 
 
257
        self.assertFileEqual('foo\n', 'a')
 
 
259
    def test_rename_missing_dir(self):
 
 
262
            osutils._win32_rename('b', 'a')
 
 
263
        except (IOError, OSError), e:
 
 
264
            self.assertEqual(errno.ENOENT, e.errno)
 
 
266
    def test_rename_current_dir(self):
 
 
269
        # You can't rename the working directory
 
 
270
        # doing rename non-existant . usually
 
 
271
        # just raises ENOENT, since non-existant
 
 
274
            osutils._win32_rename('b', '.')
 
 
275
        except (IOError, OSError), e:
 
 
276
            self.assertEqual(errno.ENOENT, e.errno)
 
 
279
class TestMacFuncsDirs(TestCaseInTempDir):
 
 
280
    """Test mac special functions that require directories."""
 
 
282
    def test_getcwd(self):
 
 
283
        # On Mac, this will actually create Ba\u030agfors
 
 
284
        # but chdir will still work, because it accepts both paths
 
 
286
            os.mkdir(u'B\xe5gfors')
 
 
288
            raise TestSkipped("Unable to create Unicode filename")
 
 
290
        os.chdir(u'B\xe5gfors')
 
 
291
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
 
 
293
    def test_getcwd_nonnorm(self):
 
 
294
        # Test that _mac_getcwd() will normalize this path
 
 
296
            os.mkdir(u'Ba\u030agfors')
 
 
298
            raise TestSkipped("Unable to create Unicode filename")
 
 
300
        os.chdir(u'Ba\u030agfors')
 
 
301
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
 
 
303
class TestSplitLines(TestCase):
 
 
305
    def test_split_unicode(self):
 
 
306
        self.assertEqual([u'foo\n', u'bar\xae'],
 
 
307
                         osutils.split_lines(u'foo\nbar\xae'))
 
 
308
        self.assertEqual([u'foo\n', u'bar\xae\n'],
 
 
309
                         osutils.split_lines(u'foo\nbar\xae\n'))
 
 
311
    def test_split_with_carriage_returns(self):
 
 
312
        self.assertEqual(['foo\rbar\n'],
 
 
313
                         osutils.split_lines('foo\rbar\n'))
 
 
316
class TestWalkDirs(TestCaseInTempDir):
 
 
318
    def test_walkdirs(self):
 
 
327
        self.build_tree(tree)
 
 
328
        expected_dirblocks = [
 
 
330
                 [('0file', '0file', 'file'),
 
 
331
                  ('1dir', '1dir', 'directory'),
 
 
332
                  ('2file', '2file', 'file'),
 
 
336
                 [('1dir/0file', '0file', 'file'),
 
 
337
                  ('1dir/1dir', '1dir', 'directory'),
 
 
340
                (('1dir/1dir', './1dir/1dir'),
 
 
347
        for dirdetail, dirblock in osutils.walkdirs('.'):
 
 
348
            if len(dirblock) and dirblock[0][1] == '.bzr':
 
 
349
                # this tests the filtering of selected paths
 
 
352
            result.append((dirdetail, dirblock))
 
 
354
        self.assertTrue(found_bzrdir)
 
 
355
        self.assertEqual(expected_dirblocks,
 
 
356
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
 
 
357
        # you can search a subdir only, with a supplied prefix.
 
 
359
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
 
 
360
            result.append(dirblock)
 
 
361
        self.assertEqual(expected_dirblocks[1:],
 
 
362
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
 
 
364
    def assertPathCompare(self, path_less, path_greater):
 
 
365
        """check that path_less and path_greater compare correctly."""
 
 
366
        self.assertEqual(0, osutils.compare_paths_prefix_order(
 
 
367
            path_less, path_less))
 
 
368
        self.assertEqual(0, osutils.compare_paths_prefix_order(
 
 
369
            path_greater, path_greater))
 
 
370
        self.assertEqual(-1, osutils.compare_paths_prefix_order(
 
 
371
            path_less, path_greater))
 
 
372
        self.assertEqual(1, osutils.compare_paths_prefix_order(
 
 
373
            path_greater, path_less))
 
 
375
    def test_compare_paths_prefix_order(self):
 
 
376
        # root before all else
 
 
377
        self.assertPathCompare("/", "/a")
 
 
379
        self.assertPathCompare("/a", "/b")
 
 
380
        self.assertPathCompare("/b", "/z")
 
 
381
        # high dirs before lower.
 
 
382
        self.assertPathCompare("/z", "/a/a")
 
 
383
        # except if the deeper dir should be output first
 
 
384
        self.assertPathCompare("/a/b/c", "/d/g")
 
 
385
        # lexical betwen dirs of the same height
 
 
386
        self.assertPathCompare("/a/z", "/z/z")
 
 
387
        self.assertPathCompare("/a/c/z", "/a/d/e")
 
 
389
        # this should also be consistent for no leading / paths
 
 
390
        # root before all else
 
 
391
        self.assertPathCompare("", "a")
 
 
393
        self.assertPathCompare("a", "b")
 
 
394
        self.assertPathCompare("b", "z")
 
 
395
        # high dirs before lower.
 
 
396
        self.assertPathCompare("z", "a/a")
 
 
397
        # except if the deeper dir should be output first
 
 
398
        self.assertPathCompare("a/b/c", "d/g")
 
 
399
        # lexical betwen dirs of the same height
 
 
400
        self.assertPathCompare("a/z", "z/z")
 
 
401
        self.assertPathCompare("a/c/z", "a/d/e")
 
 
403
    def test_path_prefix_sorting(self):
 
 
404
        """Doing a sort on path prefix should match our sample data."""
 
 
435
            sorted(original_paths, key=osutils.path_prefix_key))
 
 
436
        # using the comparison routine shoudl work too:
 
 
439
            sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
 
 
442
class TestCopyTree(TestCaseInTempDir):
 
 
444
    def test_copy_basic_tree(self):
 
 
445
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
 
 
446
        osutils.copy_tree('source', 'target')
 
 
447
        self.assertEqual(['a', 'b'], os.listdir('target'))
 
 
448
        self.assertEqual(['c'], os.listdir('target/b'))
 
 
450
    def test_copy_tree_target_exists(self):
 
 
451
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
 
 
453
        osutils.copy_tree('source', 'target')
 
 
454
        self.assertEqual(['a', 'b'], os.listdir('target'))
 
 
455
        self.assertEqual(['c'], os.listdir('target/b'))
 
 
457
    def test_copy_tree_symlinks(self):
 
 
458
        if not osutils.has_symlinks():
 
 
460
        self.build_tree(['source/'])
 
 
461
        os.symlink('a/generic/path', 'source/lnk')
 
 
462
        osutils.copy_tree('source', 'target')
 
 
463
        self.assertEqual(['lnk'], os.listdir('target'))
 
 
464
        self.assertEqual('a/generic/path', os.readlink('target/lnk'))
 
 
466
    def test_copy_tree_handlers(self):
 
 
469
        def file_handler(from_path, to_path):
 
 
470
            processed_files.append(('f', from_path, to_path))
 
 
471
        def dir_handler(from_path, to_path):
 
 
472
            processed_files.append(('d', from_path, to_path))
 
 
473
        def link_handler(from_path, to_path):
 
 
474
            processed_links.append((from_path, to_path))
 
 
475
        handlers = {'file':file_handler,
 
 
476
                    'directory':dir_handler,
 
 
477
                    'symlink':link_handler,
 
 
480
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
 
 
481
        if osutils.has_symlinks():
 
 
482
            os.symlink('a/generic/path', 'source/lnk')
 
 
483
        osutils.copy_tree('source', 'target', handlers=handlers)
 
 
485
        self.assertEqual([('d', 'source', 'target'),
 
 
486
                          ('f', 'source/a', 'target/a'),
 
 
487
                          ('d', 'source/b', 'target/b'),
 
 
488
                          ('f', 'source/b/c', 'target/b/c'),
 
 
490
        self.failIfExists('target')
 
 
491
        if osutils.has_symlinks():
 
 
492
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
 
 
495
class TestTerminalEncoding(TestCase):
 
 
496
    """Test the auto-detection of proper terminal encoding."""
 
 
499
        self._stdout = sys.stdout
 
 
500
        self._stderr = sys.stderr
 
 
501
        self._stdin = sys.stdin
 
 
502
        self._user_encoding = bzrlib.user_encoding
 
 
504
        self.addCleanup(self._reset)
 
 
506
        sys.stdout = StringIOWrapper()
 
 
507
        sys.stdout.encoding = 'stdout_encoding'
 
 
508
        sys.stderr = StringIOWrapper()
 
 
509
        sys.stderr.encoding = 'stderr_encoding'
 
 
510
        sys.stdin = StringIOWrapper()
 
 
511
        sys.stdin.encoding = 'stdin_encoding'
 
 
512
        bzrlib.user_encoding = 'user_encoding'
 
 
515
        sys.stdout = self._stdout
 
 
516
        sys.stderr = self._stderr
 
 
517
        sys.stdin = self._stdin
 
 
518
        bzrlib.user_encoding = self._user_encoding
 
 
520
    def test_get_terminal_encoding(self):
 
 
521
        # first preference is stdout encoding
 
 
522
        self.assertEqual('stdout_encoding', osutils.get_terminal_encoding())
 
 
524
        sys.stdout.encoding = None
 
 
525
        # if sys.stdout is None, fall back to sys.stdin
 
 
526
        self.assertEqual('stdin_encoding', osutils.get_terminal_encoding())
 
 
528
        sys.stdin.encoding = None
 
 
529
        # and in the worst case, use bzrlib.user_encoding
 
 
530
        self.assertEqual('user_encoding', osutils.get_terminal_encoding())