1
 
# Copyright (C) 2005 by Canonical Ltd
 
3
 
# This program is free software; you can redistribute it and/or modify
 
4
 
# it under the terms of the GNU General Public License as published by
 
5
 
# the Free Software Foundation; either version 2 of the License, or
 
6
 
# (at your option) any later version.
 
8
 
# This program is distributed in the hope that it will be useful,
 
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
 
# GNU General Public License for more details.
 
13
 
# You should have received a copy of the GNU General Public License
 
14
 
# along with this program; if not, write to the Free Software
 
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
17
 
"""Tests for the osutils wrapper."""
 
26
 
from bzrlib.errors import BzrBadParameterNotUnicode, InvalidURL
 
27
 
import bzrlib.osutils as osutils
 
28
 
from bzrlib.tests import (
 
36
 
class TestOSUtils(TestCaseInTempDir):
 
38
 
    def test_fancy_rename(self):
 
39
 
        # This should work everywhere
 
41
 
            osutils.fancy_rename(a, b,
 
42
 
                    rename_func=os.rename,
 
43
 
                    unlink_func=os.unlink)
 
45
 
        open('a', 'wb').write('something in a\n')
 
47
 
        self.failIfExists('a')
 
48
 
        self.failUnlessExists('b')
 
49
 
        self.check_file_contents('b', 'something in a\n')
 
51
 
        open('a', 'wb').write('new something in a\n')
 
54
 
        self.check_file_contents('a', 'something in a\n')
 
56
 
    def test_rename(self):
 
57
 
        # Rename should be semi-atomic on all platforms
 
58
 
        open('a', 'wb').write('something in a\n')
 
59
 
        osutils.rename('a', 'b')
 
60
 
        self.failIfExists('a')
 
61
 
        self.failUnlessExists('b')
 
62
 
        self.check_file_contents('b', 'something in a\n')
 
64
 
        open('a', 'wb').write('new something in a\n')
 
65
 
        osutils.rename('b', 'a')
 
67
 
        self.check_file_contents('a', 'something in a\n')
 
69
 
    # TODO: test fancy_rename using a MemoryTransport
 
71
 
    def test_01_rand_chars_empty(self):
 
72
 
        result = osutils.rand_chars(0)
 
73
 
        self.assertEqual(result, '')
 
75
 
    def test_02_rand_chars_100(self):
 
76
 
        result = osutils.rand_chars(100)
 
77
 
        self.assertEqual(len(result), 100)
 
78
 
        self.assertEqual(type(result), str)
 
79
 
        self.assertContainsRe(result, r'^[a-z0-9]{100}$')
 
82
 
    def test_rmtree(self):
 
83
 
        # Check to remove tree with read-only files/dirs
 
85
 
        f = file('dir/file', 'w')
 
88
 
        # would like to also try making the directory readonly, but at the
 
89
 
        # moment python shutil.rmtree doesn't handle that properly - it would
 
90
 
        # need to chmod the directory before removing things inside it - deferred
 
91
 
        # for now -- mbp 20060505
 
92
 
        # osutils.make_readonly('dir')
 
93
 
        osutils.make_readonly('dir/file')
 
97
 
        self.failIfExists('dir/file')
 
98
 
        self.failIfExists('dir')
 
100
 
    def test_file_kind(self):
 
101
 
        self.build_tree(['file', 'dir/'])
 
102
 
        self.assertEquals('file', osutils.file_kind('file'))
 
103
 
        self.assertEquals('directory', osutils.file_kind('dir/'))
 
104
 
        if osutils.has_symlinks():
 
105
 
            os.symlink('symlink', 'symlink')
 
106
 
            self.assertEquals('symlink', osutils.file_kind('symlink'))
 
108
 
        # TODO: jam 20060529 Test a block device
 
110
 
            os.lstat('/dev/null')
 
112
 
            if e.errno not in (errno.ENOENT,):
 
115
 
            self.assertEquals('chardev', osutils.file_kind('/dev/null'))
 
117
 
        mkfifo = getattr(os, 'mkfifo', None)
 
121
 
                self.assertEquals('fifo', osutils.file_kind('fifo'))
 
125
 
        AF_UNIX = getattr(socket, 'AF_UNIX', None)
 
127
 
            s = socket.socket(AF_UNIX)
 
130
 
                self.assertEquals('socket', osutils.file_kind('socket'))
 
134
 
    def test_get_umask(self):
 
135
 
        if sys.platform == 'win32':
 
136
 
            # umask always returns '0', no way to set it
 
137
 
            self.assertEqual(0, osutils.get_umask())
 
140
 
        orig_umask = osutils.get_umask()
 
143
 
            self.assertEqual(0222, osutils.get_umask())
 
145
 
            self.assertEqual(0022, osutils.get_umask())
 
147
 
            self.assertEqual(0002, osutils.get_umask())
 
149
 
            self.assertEqual(0027, osutils.get_umask())
 
154
 
class TestSafeUnicode(TestCase):
 
156
 
    def test_from_ascii_string(self):
 
157
 
        self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
 
159
 
    def test_from_unicode_string_ascii_contents(self):
 
160
 
        self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
 
162
 
    def test_from_unicode_string_unicode_contents(self):
 
163
 
        self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
 
165
 
    def test_from_utf8_string(self):
 
166
 
        self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
 
168
 
    def test_bad_utf8_string(self):
 
169
 
        self.assertRaises(BzrBadParameterNotUnicode,
 
170
 
                          osutils.safe_unicode,
 
174
 
class TestWin32Funcs(TestCase):
 
175
 
    """Test that the _win32 versions of os utilities return appropriate paths."""
 
177
 
    def test_abspath(self):
 
178
 
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
 
179
 
        self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
 
181
 
    def test_realpath(self):
 
182
 
        self.assertEqual('C:/foo', osutils._win32_realpath('C:\\foo'))
 
183
 
        self.assertEqual('C:/foo', osutils._win32_realpath('C:/foo'))
 
185
 
    def test_pathjoin(self):
 
186
 
        self.assertEqual('path/to/foo', osutils._win32_pathjoin('path', 'to', 'foo'))
 
187
 
        self.assertEqual('C:/foo', osutils._win32_pathjoin('path\\to', 'C:\\foo'))
 
188
 
        self.assertEqual('C:/foo', osutils._win32_pathjoin('path/to', 'C:/foo'))
 
189
 
        self.assertEqual('path/to/foo', osutils._win32_pathjoin('path/to/', 'foo'))
 
190
 
        self.assertEqual('/foo', osutils._win32_pathjoin('C:/path/to/', '/foo'))
 
191
 
        self.assertEqual('/foo', osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
 
193
 
    def test_normpath(self):
 
194
 
        self.assertEqual('path/to/foo', osutils._win32_normpath(r'path\\from\..\to\.\foo'))
 
195
 
        self.assertEqual('path/to/foo', osutils._win32_normpath('path//from/../to/./foo'))
 
197
 
    def test_getcwd(self):
 
198
 
        cwd = osutils._win32_getcwd()
 
199
 
        os_cwd = os.getcwdu()
 
200
 
        self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
 
201
 
        # win32 is inconsistent whether it returns lower or upper case
 
202
 
        # and even if it was consistent the user might type the other
 
203
 
        # so we force it to uppercase
 
204
 
        # running python.exe under cmd.exe return capital C:\\
 
205
 
        # running win32 python inside a cygwin shell returns lowercase
 
206
 
        self.assertEqual(os_cwd[0].upper(), cwd[0])
 
208
 
    def test_fixdrive(self):
 
209
 
        self.assertEqual('H:/foo', osutils._win32_fixdrive('h:/foo'))
 
210
 
        self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
 
211
 
        self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
 
214
 
class TestWin32FuncsDirs(TestCaseInTempDir):
 
215
 
    """Test win32 functions that create files."""
 
217
 
    def test_getcwd(self):
 
218
 
        # Make sure getcwd can handle unicode filenames
 
222
 
            raise TestSkipped("Unable to create Unicode filename")
 
225
 
        # TODO: jam 20060427 This will probably fail on Mac OSX because
 
226
 
        #       it will change the normalization of B\xe5gfors
 
227
 
        #       Consider using a different unicode character, or make
 
228
 
        #       osutils.getcwd() renormalize the path.
 
229
 
        self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
 
231
 
    def test_mkdtemp(self):
 
232
 
        tmpdir = osutils._win32_mkdtemp(dir='.')
 
233
 
        self.assertFalse('\\' in tmpdir)
 
235
 
    def test_rename(self):
 
243
 
        osutils._win32_rename('b', 'a')
 
244
 
        self.failUnlessExists('a')
 
245
 
        self.failIfExists('b')
 
246
 
        self.assertFileEqual('baz\n', 'a')
 
248
 
    def test_rename_missing_file(self):
 
254
 
            osutils._win32_rename('b', 'a')
 
255
 
        except (IOError, OSError), e:
 
256
 
            self.assertEqual(errno.ENOENT, e.errno)
 
257
 
        self.assertFileEqual('foo\n', 'a')
 
259
 
    def test_rename_missing_dir(self):
 
262
 
            osutils._win32_rename('b', 'a')
 
263
 
        except (IOError, OSError), e:
 
264
 
            self.assertEqual(errno.ENOENT, e.errno)
 
266
 
    def test_rename_current_dir(self):
 
269
 
        # You can't rename the working directory
 
270
 
        # doing rename non-existant . usually
 
271
 
        # just raises ENOENT, since non-existant
 
274
 
            osutils._win32_rename('b', '.')
 
275
 
        except (IOError, OSError), e:
 
276
 
            self.assertEqual(errno.ENOENT, e.errno)
 
279
 
class TestMacFuncsDirs(TestCaseInTempDir):
 
280
 
    """Test mac special functions that require directories."""
 
282
 
    def test_getcwd(self):
 
283
 
        # On Mac, this will actually create Ba\u030agfors
 
284
 
        # but chdir will still work, because it accepts both paths
 
286
 
            os.mkdir(u'B\xe5gfors')
 
288
 
            raise TestSkipped("Unable to create Unicode filename")
 
290
 
        os.chdir(u'B\xe5gfors')
 
291
 
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
 
293
 
    def test_getcwd_nonnorm(self):
 
294
 
        # Test that _mac_getcwd() will normalize this path
 
296
 
            os.mkdir(u'Ba\u030agfors')
 
298
 
            raise TestSkipped("Unable to create Unicode filename")
 
300
 
        os.chdir(u'Ba\u030agfors')
 
301
 
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
 
303
 
class TestSplitLines(TestCase):
 
305
 
    def test_split_unicode(self):
 
306
 
        self.assertEqual([u'foo\n', u'bar\xae'],
 
307
 
                         osutils.split_lines(u'foo\nbar\xae'))
 
308
 
        self.assertEqual([u'foo\n', u'bar\xae\n'],
 
309
 
                         osutils.split_lines(u'foo\nbar\xae\n'))
 
311
 
    def test_split_with_carriage_returns(self):
 
312
 
        self.assertEqual(['foo\rbar\n'],
 
313
 
                         osutils.split_lines('foo\rbar\n'))
 
316
 
class TestWalkDirs(TestCaseInTempDir):
 
318
 
    def test_walkdirs(self):
 
327
 
        self.build_tree(tree)
 
328
 
        expected_dirblocks = [
 
330
 
                 [('0file', '0file', 'file'),
 
331
 
                  ('1dir', '1dir', 'directory'),
 
332
 
                  ('2file', '2file', 'file'),
 
336
 
                 [('1dir/0file', '0file', 'file'),
 
337
 
                  ('1dir/1dir', '1dir', 'directory'),
 
340
 
                (('1dir/1dir', './1dir/1dir'),
 
347
 
        for dirdetail, dirblock in osutils.walkdirs('.'):
 
348
 
            if len(dirblock) and dirblock[0][1] == '.bzr':
 
349
 
                # this tests the filtering of selected paths
 
352
 
            result.append((dirdetail, dirblock))
 
354
 
        self.assertTrue(found_bzrdir)
 
355
 
        self.assertEqual(expected_dirblocks,
 
356
 
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
 
357
 
        # you can search a subdir only, with a supplied prefix.
 
359
 
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
 
360
 
            result.append(dirblock)
 
361
 
        self.assertEqual(expected_dirblocks[1:],
 
362
 
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
 
364
 
    def assertPathCompare(self, path_less, path_greater):
 
365
 
        """check that path_less and path_greater compare correctly."""
 
366
 
        self.assertEqual(0, osutils.compare_paths_prefix_order(
 
367
 
            path_less, path_less))
 
368
 
        self.assertEqual(0, osutils.compare_paths_prefix_order(
 
369
 
            path_greater, path_greater))
 
370
 
        self.assertEqual(-1, osutils.compare_paths_prefix_order(
 
371
 
            path_less, path_greater))
 
372
 
        self.assertEqual(1, osutils.compare_paths_prefix_order(
 
373
 
            path_greater, path_less))
 
375
 
    def test_compare_paths_prefix_order(self):
 
376
 
        # root before all else
 
377
 
        self.assertPathCompare("/", "/a")
 
379
 
        self.assertPathCompare("/a", "/b")
 
380
 
        self.assertPathCompare("/b", "/z")
 
381
 
        # high dirs before lower.
 
382
 
        self.assertPathCompare("/z", "/a/a")
 
383
 
        # except if the deeper dir should be output first
 
384
 
        self.assertPathCompare("/a/b/c", "/d/g")
 
385
 
        # lexical betwen dirs of the same height
 
386
 
        self.assertPathCompare("/a/z", "/z/z")
 
387
 
        self.assertPathCompare("/a/c/z", "/a/d/e")
 
389
 
        # this should also be consistent for no leading / paths
 
390
 
        # root before all else
 
391
 
        self.assertPathCompare("", "a")
 
393
 
        self.assertPathCompare("a", "b")
 
394
 
        self.assertPathCompare("b", "z")
 
395
 
        # high dirs before lower.
 
396
 
        self.assertPathCompare("z", "a/a")
 
397
 
        # except if the deeper dir should be output first
 
398
 
        self.assertPathCompare("a/b/c", "d/g")
 
399
 
        # lexical betwen dirs of the same height
 
400
 
        self.assertPathCompare("a/z", "z/z")
 
401
 
        self.assertPathCompare("a/c/z", "a/d/e")
 
403
 
    def test_path_prefix_sorting(self):
 
404
 
        """Doing a sort on path prefix should match our sample data."""
 
435
 
            sorted(original_paths, key=osutils.path_prefix_key))
 
436
 
        # using the comparison routine shoudl work too:
 
439
 
            sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
 
442
 
class TestCopyTree(TestCaseInTempDir):
 
444
 
    def test_copy_basic_tree(self):
 
445
 
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
 
446
 
        osutils.copy_tree('source', 'target')
 
447
 
        self.assertEqual(['a', 'b'], os.listdir('target'))
 
448
 
        self.assertEqual(['c'], os.listdir('target/b'))
 
450
 
    def test_copy_tree_target_exists(self):
 
451
 
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
 
453
 
        osutils.copy_tree('source', 'target')
 
454
 
        self.assertEqual(['a', 'b'], os.listdir('target'))
 
455
 
        self.assertEqual(['c'], os.listdir('target/b'))
 
457
 
    def test_copy_tree_symlinks(self):
 
458
 
        if not osutils.has_symlinks():
 
460
 
        self.build_tree(['source/'])
 
461
 
        os.symlink('a/generic/path', 'source/lnk')
 
462
 
        osutils.copy_tree('source', 'target')
 
463
 
        self.assertEqual(['lnk'], os.listdir('target'))
 
464
 
        self.assertEqual('a/generic/path', os.readlink('target/lnk'))
 
466
 
    def test_copy_tree_handlers(self):
 
469
 
        def file_handler(from_path, to_path):
 
470
 
            processed_files.append(('f', from_path, to_path))
 
471
 
        def dir_handler(from_path, to_path):
 
472
 
            processed_files.append(('d', from_path, to_path))
 
473
 
        def link_handler(from_path, to_path):
 
474
 
            processed_links.append((from_path, to_path))
 
475
 
        handlers = {'file':file_handler,
 
476
 
                    'directory':dir_handler,
 
477
 
                    'symlink':link_handler,
 
480
 
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
 
481
 
        if osutils.has_symlinks():
 
482
 
            os.symlink('a/generic/path', 'source/lnk')
 
483
 
        osutils.copy_tree('source', 'target', handlers=handlers)
 
485
 
        self.assertEqual([('d', 'source', 'target'),
 
486
 
                          ('f', 'source/a', 'target/a'),
 
487
 
                          ('d', 'source/b', 'target/b'),
 
488
 
                          ('f', 'source/b/c', 'target/b/c'),
 
490
 
        self.failIfExists('target')
 
491
 
        if osutils.has_symlinks():
 
492
 
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
 
495
 
class TestTerminalEncoding(TestCase):
 
496
 
    """Test the auto-detection of proper terminal encoding."""
 
499
 
        self._stdout = sys.stdout
 
500
 
        self._stderr = sys.stderr
 
501
 
        self._stdin = sys.stdin
 
502
 
        self._user_encoding = bzrlib.user_encoding
 
504
 
        self.addCleanup(self._reset)
 
506
 
        sys.stdout = StringIOWrapper()
 
507
 
        sys.stdout.encoding = 'stdout_encoding'
 
508
 
        sys.stderr = StringIOWrapper()
 
509
 
        sys.stderr.encoding = 'stderr_encoding'
 
510
 
        sys.stdin = StringIOWrapper()
 
511
 
        sys.stdin.encoding = 'stdin_encoding'
 
512
 
        bzrlib.user_encoding = 'user_encoding'
 
515
 
        sys.stdout = self._stdout
 
516
 
        sys.stderr = self._stderr
 
517
 
        sys.stdin = self._stdin
 
518
 
        bzrlib.user_encoding = self._user_encoding
 
520
 
    def test_get_terminal_encoding(self):
 
521
 
        # first preference is stdout encoding
 
522
 
        self.assertEqual('stdout_encoding', osutils.get_terminal_encoding())
 
524
 
        sys.stdout.encoding = None
 
525
 
        # if sys.stdout is None, fall back to sys.stdin
 
526
 
        self.assertEqual('stdin_encoding', osutils.get_terminal_encoding())
 
528
 
        sys.stdin.encoding = None
 
529
 
        # and in the worst case, use bzrlib.user_encoding
 
530
 
        self.assertEqual('user_encoding', osutils.get_terminal_encoding())
 
533
 
class TestSetUnsetEnv(TestCase):
 
534
 
    """Test updating the environment"""
 
537
 
        super(TestSetUnsetEnv, self).setUp()
 
539
 
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
 
540
 
                         'Environment was not cleaned up properly.'
 
541
 
                         ' Variable BZR_TEST_ENV_VAR should not exist.')
 
543
 
            if 'BZR_TEST_ENV_VAR' in os.environ:
 
544
 
                del os.environ['BZR_TEST_ENV_VAR']
 
546
 
        self.addCleanup(cleanup)
 
549
 
        """Test that we can set an env variable"""
 
550
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
551
 
        self.assertEqual(None, old)
 
552
 
        self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
 
554
 
    def test_double_set(self):
 
555
 
        """Test that we get the old value out"""
 
556
 
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
557
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
 
558
 
        self.assertEqual('foo', old)
 
559
 
        self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
 
561
 
    def test_unicode(self):
 
562
 
        """Environment can only contain plain strings
 
564
 
        So Unicode strings must be encoded.
 
566
 
        # Try a few different characters, to see if we can get
 
567
 
        # one that will be valid in the user_encoding
 
568
 
        possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
569
 
        for uni_val in possible_vals:
 
571
 
                env_val = uni_val.encode(bzrlib.user_encoding)
 
572
 
            except UnicodeEncodeError:
 
573
 
                # Try a different character
 
578
 
            raise TestSkipped('Cannot find a unicode character that works in'
 
579
 
                              ' encoding %s' % (bzrlib.user_encoding,))
 
581
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
 
582
 
        self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
 
584
 
    def test_unset(self):
 
585
 
        """Test that passing None will remove the env var"""
 
586
 
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
587
 
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
 
588
 
        self.assertEqual('foo', old)
 
589
 
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
 
590
 
        self.failIf('BZR_TEST_ENV_VAR' in os.environ)