# Copyright (C) 2007-2011 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Tests for the compiled dirstate helpers."""

# NOTE(review): everything in this import section except the
# ``.test_osutils`` line and the ``load_tests_apply_scenarios`` name was
# missing from the reviewed copy.  It was rebuilt from the names this module
# uses (bisect, os, time, tests, dirstate, _dirstate_helpers_py, features,
# test_dirstate, multiply_scenarios).  Confirm against the package layout, and
# check whether later parts of the file need further imports.
import bisect
import os
import time

from .. import tests
from ..bzr import (
    _dirstate_helpers_py,
    dirstate,
    )
from . import (
    features,
    test_dirstate,
    )
from .test_osutils import dir_reader_scenarios
from .scenarios import (
    load_tests_apply_scenarios,
    multiply_scenarios,
    )


load_tests = load_tests_apply_scenarios


compiled_dirstate_helpers_feature = features.ModuleAvailableFeature(
    'breezy.bzr._dirstate_helpers_pyx')


# FIXME: we should also parametrize against SHA1Provider !

# Each scenario list starts with the pure-Python implementation and gains a
# second entry for the compiled module when that feature is available.
ue_scenarios = [('dirstate_Python',
                 {'update_entry': dirstate.py_update_entry})]
if compiled_dirstate_helpers_feature.available():
    update_entry = compiled_dirstate_helpers_feature.module.update_entry
    ue_scenarios.append(('dirstate_Pyrex', {'update_entry': update_entry}))

pe_scenarios = [('dirstate_Python',
                 {'_process_entry': dirstate.ProcessEntryPython})]
if compiled_dirstate_helpers_feature.available():
    process_entry = compiled_dirstate_helpers_feature.module.ProcessEntryC
    pe_scenarios.append(('dirstate_Pyrex', {'_process_entry': process_entry}))

helper_scenarios = [('dirstate_Python', {'helpers': _dirstate_helpers_py})]
if compiled_dirstate_helpers_feature.available():
    helper_scenarios.append(
        ('dirstate_Pyrex',
         {'helpers': compiled_dirstate_helpers_feature.module}))


class TestBisectPathMixin(object):
72
"""Test that _bisect_path_*() returns the expected values.
74
_bisect_path_* is intended to work like bisect.bisect_*() except it
75
knows it is working on paths that are sorted by ('path', 'to', 'foo')
76
chunks rather than by raw 'path/to/foo'.
78
Test Cases should inherit from this and override ``get_bisect_path`` return
79
their implementation, and ``get_bisect`` to return the matching
80
bisect.bisect_* function.
83
def get_bisect_path(self):
84
"""Return an implementation of _bisect_path_*"""
85
raise NotImplementedError
88
"""Return a version of bisect.bisect_*.
90
Also, for the 'exists' check, return the offset to the real values.
91
For example bisect_left returns the index of an entry, while
92
bisect_right returns the index *after* an entry
94
:return: (bisect_func, offset)
96
raise NotImplementedError
98
def assertBisect(self, paths, split_paths, path, exists=True):
99
"""Assert that bisect_split works like bisect_left on the split paths.
101
:param paths: A list of path names
102
:param split_paths: A list of path names that are already split up by directory
103
('path/to/foo' => ('path', 'to', 'foo'))
104
:param path: The path we are indexing.
105
:param exists: The path should be present, so make sure the
106
final location actually points to the right value.
108
All other arguments will be passed along.
110
bisect_path = self.get_bisect_path()
111
self.assertIsInstance(paths, list)
112
bisect_path_idx = bisect_path(paths, path)
113
split_path = self.split_for_dirblocks([path])[0]
114
bisect_func, offset = self.get_bisect()
115
bisect_split_idx = bisect_func(split_paths, split_path)
116
self.assertEqual(bisect_split_idx, bisect_path_idx,
117
'%s disagreed. %s != %s'
119
% (bisect_path.__name__,
120
bisect_split_idx, bisect_path_idx, path)
123
self.assertEqual(path, paths[bisect_path_idx + offset])
125
def split_for_dirblocks(self, paths):
128
dirname, basename = os.path.split(path)
129
dir_split_paths.append((dirname.split(b'/'), basename))
130
dir_split_paths.sort()
131
return dir_split_paths
133
def test_simple(self):
134
"""In the simple case it works just like bisect_left"""
135
paths = [b'', b'a', b'b', b'c', b'd']
136
split_paths = self.split_for_dirblocks(paths)
138
self.assertBisect(paths, split_paths, path, exists=True)
139
self.assertBisect(paths, split_paths, b'_', exists=False)
140
self.assertBisect(paths, split_paths, b'aa', exists=False)
141
self.assertBisect(paths, split_paths, b'bb', exists=False)
142
self.assertBisect(paths, split_paths, b'cc', exists=False)
143
self.assertBisect(paths, split_paths, b'dd', exists=False)
144
self.assertBisect(paths, split_paths, b'a/a', exists=False)
145
self.assertBisect(paths, split_paths, b'b/b', exists=False)
146
self.assertBisect(paths, split_paths, b'c/c', exists=False)
147
self.assertBisect(paths, split_paths, b'd/d', exists=False)
149
def test_involved(self):
150
"""This is where bisect_path_* diverges slightly."""
151
# This is the list of paths and their contents
179
# This is the exact order that is stored by dirstate
180
# All children in a directory are mentioned before an children of
181
# children are mentioned.
182
# So all the root-directory paths, then all the
183
# first sub directory, etc.
184
paths = [ # content of '/'
185
b'', b'a', b'a-a', b'a-z', b'a=a', b'a=z',
187
b'a/a', b'a/a-a', b'a/a-z',
189
b'a/z', b'a/z-a', b'a/z-z',
212
split_paths = self.split_for_dirblocks(paths)
214
for dir_parts, basename in split_paths:
215
if dir_parts == [b'']:
216
sorted_paths.append(basename)
218
sorted_paths.append(b'/'.join(dir_parts + [basename]))
220
self.assertEqual(sorted_paths, paths)
223
self.assertBisect(paths, split_paths, path, exists=True)
226
class TestBisectPathLeft(tests.TestCase, TestBisectPathMixin):
    """Run all Bisect Path tests against _bisect_path_left."""

    def get_bisect_path(self):
        """Return the pure-Python ``_bisect_path_left``."""
        from breezy.bzr._dirstate_helpers_py import _bisect_path_left
        return _bisect_path_left

    def get_bisect(self):
        """Return ``(bisect.bisect_left, 0)``: the index is the entry itself."""
        return bisect.bisect_left, 0


class TestCompiledBisectPathLeft(TestBisectPathLeft):
    """Run all Bisect Path tests against the compiled _bisect_path_left."""

    _test_needs_features = [compiled_dirstate_helpers_feature]

    def get_bisect_path(self):
        """Return ``_bisect_path_left`` from the compiled helper module."""
        from breezy.bzr._dirstate_helpers_pyx import _bisect_path_left
        return _bisect_path_left


class TestBisectPathRight(tests.TestCase, TestBisectPathMixin):
    """Run all Bisect Path tests against _bisect_path_right"""

    def get_bisect_path(self):
        """Return the pure-Python ``_bisect_path_right``."""
        from breezy.bzr._dirstate_helpers_py import _bisect_path_right
        return _bisect_path_right

    def get_bisect(self):
        """Return ``(bisect.bisect_right, -1)``.

        bisect_right yields the index *after* a matching entry, hence the
        -1 offset back to the entry itself.
        """
        return bisect.bisect_right, -1


class TestCompiledBisectPathRight(TestBisectPathRight):
    """Run all Bisect Path tests against the compiled _bisect_path_right"""

    _test_needs_features = [compiled_dirstate_helpers_feature]

    def get_bisect_path(self):
        """Return ``_bisect_path_right`` from the compiled helper module."""
        from breezy.bzr._dirstate_helpers_pyx import _bisect_path_right
        return _bisect_path_right


class TestBisectDirblock(tests.TestCase):
    """Test that bisect_dirblock() returns the expected values.

    bisect_dirblock is intended to work like bisect.bisect_left() except it
    knows it is working on dirblocks and that dirblocks are sorted by ('path',
    'to', 'foo') chunks rather than by raw 'path/to/foo'.

    This test is parameterized by calling get_bisect_dirblock(). Child test
    cases can override this function to test against a different
    implementation.
    """

    def get_bisect_dirblock(self):
        """Return an implementation of bisect_dirblock"""
        from breezy.bzr._dirstate_helpers_py import bisect_dirblock
        return bisect_dirblock

    def assertBisect(self, dirblocks, split_dirblocks, path, *args, **kwargs):
        """Assert that bisect_split works like bisect_left on the split paths.

        :param dirblocks: A list of (path, [info]) pairs.
        :param split_dirblocks: A list of ((split, path), [info]) pairs.
        :param path: The path we are indexing.

        All other arguments will be passed along.
        """
        bisect_dirblock = self.get_bisect_dirblock()
        self.assertIsInstance(dirblocks, list)
        bisect_split_idx = bisect_dirblock(dirblocks, path, *args, **kwargs)
        split_dirblock = (path.split(b'/'), [])
        # Only positional extras are forwarded to bisect_left; keyword
        # arguments (such as ``cache``) go to bisect_dirblock alone.
        bisect_left_idx = bisect.bisect_left(split_dirblocks, split_dirblock,
                                             *args)
        self.assertEqual(bisect_left_idx, bisect_split_idx,
                         'bisect_split disagreed. %s != %s'
                         ' for key %r'
                         % (bisect_left_idx, bisect_split_idx, path)
                         )

    def paths_to_dirblocks(self, paths):
        """Convert a list of paths into dirblock form.

        Also, ensure that the paths are in proper sorted order.

        :return: ``(dirblocks, split_dirblocks)``
        """
        dirblocks = [(path, []) for path in paths]
        split_dirblocks = [(path.split(b'/'), []) for path in paths]
        self.assertEqual(sorted(split_dirblocks), split_dirblocks)
        return dirblocks, split_dirblocks

    def test_simple(self):
        """In the simple case it works just like bisect_left"""
        paths = [b'', b'a', b'b', b'c', b'd']
        dirblocks, split_dirblocks = self.paths_to_dirblocks(paths)
        for path in paths:
            self.assertBisect(dirblocks, split_dirblocks, path)
        self.assertBisect(dirblocks, split_dirblocks, b'_')
        self.assertBisect(dirblocks, split_dirblocks, b'aa')
        self.assertBisect(dirblocks, split_dirblocks, b'bb')
        self.assertBisect(dirblocks, split_dirblocks, b'cc')
        self.assertBisect(dirblocks, split_dirblocks, b'dd')
        self.assertBisect(dirblocks, split_dirblocks, b'a/a')
        self.assertBisect(dirblocks, split_dirblocks, b'b/b')
        self.assertBisect(dirblocks, split_dirblocks, b'c/c')
        self.assertBisect(dirblocks, split_dirblocks, b'd/d')

    def test_involved(self):
        """This is where bisect_left diverges slightly."""
        # NOTE(review): the first row and the two 'x-a', 'x-z' rows were
        # missing from the reviewed copy and were reconstructed;
        # paths_to_dirblocks() asserts that the list is correctly sorted.
        paths = [b'', b'a',
                 b'a/a', b'a/a/a', b'a/a/z', b'a/a-a', b'a/a-z',
                 b'a/z', b'a/z/a', b'a/z/z', b'a/z-a', b'a/z-z',
                 b'a-a', b'a-z',
                 b'z', b'z/a/a', b'z/a/z', b'z/a-a', b'z/a-z',
                 b'z/z', b'z/z/a', b'z/z/z', b'z/z-a', b'z/z-z',
                 b'z-a', b'z-z',
                 ]
        dirblocks, split_dirblocks = self.paths_to_dirblocks(paths)
        for path in paths:
            self.assertBisect(dirblocks, split_dirblocks, path)

    def test_involved_cached(self):
        """Same as test_involved, but passing a ``cache`` dict through."""
        paths = [b'', b'a',
                 b'a/a', b'a/a/a', b'a/a/z', b'a/a-a', b'a/a-z',
                 b'a/z', b'a/z/a', b'a/z/z', b'a/z-a', b'a/z-z',
                 b'a-a', b'a-z',
                 b'z', b'z/a/a', b'z/a/z', b'z/a-a', b'z/a-z',
                 b'z/z', b'z/z/a', b'z/z/z', b'z/z-a', b'z/z-z',
                 b'z-a', b'z-z',
                 ]
        cache = {}
        dirblocks, split_dirblocks = self.paths_to_dirblocks(paths)
        for path in paths:
            self.assertBisect(dirblocks, split_dirblocks, path, cache=cache)


class TestCompiledBisectDirblock(TestBisectDirblock):
    """Test that bisect_dirblock() returns the expected values.

    bisect_dirblock is intended to work like bisect.bisect_left() except it
    knows it is working on dirblocks and that dirblocks are sorted by ('path',
    'to', 'foo') chunks rather than by raw 'path/to/foo'.

    This runs all the normal tests that TestBisectDirblock did, but uses the
    compiled version.
    """

    _test_needs_features = [compiled_dirstate_helpers_feature]

    def get_bisect_dirblock(self):
        """Return ``bisect_dirblock`` from the compiled helper module."""
        from breezy.bzr._dirstate_helpers_pyx import bisect_dirblock
        return bisect_dirblock


class TestLtByDirs(tests.TestCase):
    """Test an implementation of lt_by_dirs()

    lt_by_dirs() compares 2 paths by their directory sections, rather than as
    plain strings.

    Child test cases can override ``get_lt_by_dirs`` to test a specific
    implementation.
    """

    def get_lt_by_dirs(self):
        """Get a specific implementation of lt_by_dirs."""
        from ..bzr._dirstate_helpers_py import lt_by_dirs
        return lt_by_dirs

    def assertCmpByDirs(self, expected, str1, str2):
        """Compare the two strings, in both directions.

        :param expected: The expected comparison value. -1 means str1 comes
            first, 0 means they are equal, 1 means str2 comes first
        :param str1: string to compare
        :param str2: string to compare
        """
        lt_by_dirs = self.get_lt_by_dirs()
        if expected == 0:
            self.assertEqual(str1, str2)
            self.assertFalse(lt_by_dirs(str1, str2))
            self.assertFalse(lt_by_dirs(str2, str1))
        elif expected > 0:
            self.assertFalse(lt_by_dirs(str1, str2))
            self.assertTrue(lt_by_dirs(str2, str1))
        else:
            self.assertTrue(lt_by_dirs(str1, str2))
            self.assertFalse(lt_by_dirs(str2, str1))

    def test_cmp_empty(self):
        """Compare against the empty string."""
        self.assertCmpByDirs(0, b'', b'')
        self.assertCmpByDirs(1, b'a', b'')
        self.assertCmpByDirs(1, b'ab', b'')
        self.assertCmpByDirs(1, b'abc', b'')
        self.assertCmpByDirs(1, b'abcd', b'')
        self.assertCmpByDirs(1, b'abcde', b'')
        self.assertCmpByDirs(1, b'abcdef', b'')
        self.assertCmpByDirs(1, b'abcdefg', b'')
        self.assertCmpByDirs(1, b'abcdefgh', b'')
        self.assertCmpByDirs(1, b'abcdefghi', b'')
        self.assertCmpByDirs(1, b'test/ing/a/path/', b'')

    def test_cmp_same_str(self):
        """Compare the same string"""
        self.assertCmpByDirs(0, b'a', b'a')
        self.assertCmpByDirs(0, b'ab', b'ab')
        self.assertCmpByDirs(0, b'abc', b'abc')
        self.assertCmpByDirs(0, b'abcd', b'abcd')
        self.assertCmpByDirs(0, b'abcde', b'abcde')
        self.assertCmpByDirs(0, b'abcdef', b'abcdef')
        self.assertCmpByDirs(0, b'abcdefg', b'abcdefg')
        self.assertCmpByDirs(0, b'abcdefgh', b'abcdefgh')
        self.assertCmpByDirs(0, b'abcdefghi', b'abcdefghi')
        self.assertCmpByDirs(0, b'testing a long string',
                             b'testing a long string')
        self.assertCmpByDirs(0, b'x' * 10000, b'x' * 10000)
        self.assertCmpByDirs(0, b'a/b', b'a/b')
        self.assertCmpByDirs(0, b'a/b/c', b'a/b/c')
        self.assertCmpByDirs(0, b'a/b/c/d', b'a/b/c/d')
        self.assertCmpByDirs(0, b'a/b/c/d/e', b'a/b/c/d/e')

    def test_simple_paths(self):
        """Compare strings that act like normal string comparison"""
        self.assertCmpByDirs(-1, b'a', b'b')
        self.assertCmpByDirs(-1, b'aa', b'ab')
        self.assertCmpByDirs(-1, b'ab', b'bb')
        self.assertCmpByDirs(-1, b'aaa', b'aab')
        self.assertCmpByDirs(-1, b'aab', b'abb')
        self.assertCmpByDirs(-1, b'abb', b'bbb')
        self.assertCmpByDirs(-1, b'aaaa', b'aaab')
        self.assertCmpByDirs(-1, b'aaab', b'aabb')
        self.assertCmpByDirs(-1, b'aabb', b'abbb')
        self.assertCmpByDirs(-1, b'abbb', b'bbbb')
        self.assertCmpByDirs(-1, b'aaaaa', b'aaaab')
        self.assertCmpByDirs(-1, b'a/a', b'a/b')
        self.assertCmpByDirs(-1, b'a/b', b'b/b')
        self.assertCmpByDirs(-1, b'a/a/a', b'a/a/b')
        self.assertCmpByDirs(-1, b'a/a/b', b'a/b/b')
        self.assertCmpByDirs(-1, b'a/b/b', b'b/b/b')
        self.assertCmpByDirs(-1, b'a/a/a/a', b'a/a/a/b')
        self.assertCmpByDirs(-1, b'a/a/a/b', b'a/a/b/b')
        self.assertCmpByDirs(-1, b'a/a/b/b', b'a/b/b/b')
        self.assertCmpByDirs(-1, b'a/b/b/b', b'b/b/b/b')
        self.assertCmpByDirs(-1, b'a/a/a/a/a', b'a/a/a/a/b')

    def test_tricky_paths(self):
        """Compare paths whose by-directory order differs from byte order."""
        self.assertCmpByDirs(1, b'ab/cd/ef', b'ab/cc/ef')
        self.assertCmpByDirs(1, b'ab/cd/ef', b'ab/c/ef')
        self.assertCmpByDirs(-1, b'ab/cd/ef', b'ab/cd-ef')
        self.assertCmpByDirs(-1, b'ab/cd', b'ab/cd-')
        self.assertCmpByDirs(-1, b'ab/cd', b'ab-cd')

    def test_cmp_unicode_not_allowed(self):
        """Passing a unicode string for either argument raises TypeError."""
        lt_by_dirs = self.get_lt_by_dirs()
        self.assertRaises(TypeError, lt_by_dirs, u'Unicode', b'str')
        self.assertRaises(TypeError, lt_by_dirs, b'str', u'Unicode')
        self.assertRaises(TypeError, lt_by_dirs, u'Unicode', u'Unicode')

    def test_cmp_non_ascii(self):
        """Non-ASCII (UTF-8 encoded) bytes are ordered by byte value."""
        self.assertCmpByDirs(-1, b'\xc2\xb5', b'\xc3\xa5')  # u'\xb5', u'\xe5'
        self.assertCmpByDirs(-1, b'a', b'\xc3\xa5')  # u'a', u'\xe5'
        self.assertCmpByDirs(-1, b'b', b'\xc2\xb5')  # u'b', u'\xb5'
        self.assertCmpByDirs(-1, b'a/b', b'a/\xc3\xa5')  # u'a/b', u'a/\xe5'
        self.assertCmpByDirs(-1, b'b/a', b'b/\xc2\xb5')  # u'b/a', u'b/\xb5'


class TestCompiledLtByDirs(TestLtByDirs):
    """Test the pyrex implementation of lt_by_dirs"""

    _test_needs_features = [compiled_dirstate_helpers_feature]

    def get_lt_by_dirs(self):
        """Return ``lt_by_dirs`` from the compiled helper module."""
        from ..bzr._dirstate_helpers_pyx import lt_by_dirs
        return lt_by_dirs


class TestLtPathByDirblock(tests.TestCase):
    """Test an implementation of _lt_path_by_dirblock()

    _lt_path_by_dirblock() compares two paths using the sort order used by
    DirState. All paths in the same directory are sorted together.

    Child test cases can override ``get_lt_path_by_dirblock`` to test a specific
    implementation.
    """

    def get_lt_path_by_dirblock(self):
        """Get a specific implementation of _lt_path_by_dirblock."""
        from ..bzr._dirstate_helpers_py import _lt_path_by_dirblock
        return _lt_path_by_dirblock

    def assertLtPathByDirblock(self, paths):
        """Compare all paths and make sure they evaluate to the correct order.

        This does N^2 comparisons. It is assumed that ``paths`` is properly
        sorted list.

        :param paths: a sorted list of paths to compare
        """
        # First, make sure the paths being passed in are correct
        def _key(p):
            dirname, basename = os.path.split(p)
            return dirname.split(b'/'), basename
        self.assertEqual(sorted(paths, key=_key), paths)

        lt_path_by_dirblock = self.get_lt_path_by_dirblock()
        for idx1, path1 in enumerate(paths):
            for idx2, path2 in enumerate(paths):
                lt_result = lt_path_by_dirblock(path1, path2)
                self.assertEqual(idx1 < idx2, lt_result,
                                 '%s did not state that %r < %r, lt=%s'
                                 % (lt_path_by_dirblock.__name__,
                                    path1, path2, lt_result))

    def test_cmp_simple_paths(self):
        """Compare simple paths, including the empty string."""
        self.assertLtPathByDirblock(
            [b'', b'a', b'ab', b'abc', b'a/b/c', b'b/d/e'])
        self.assertLtPathByDirblock([b'kl', b'ab/cd', b'ab/ef', b'gh/ij'])

    def test_tricky_paths(self):
        """Compare paths containing '-' and '=' next to directory names."""
        # NOTE(review): several rows of both lists below were missing from
        # the reviewed copy and were reconstructed to follow the surviving
        # comments.  assertLtPathByDirblock() checks that each list is sorted.
        self.assertLtPathByDirblock([
            # Contents of ''
            b'', b'a', b'a-a', b'a=a', b'b',
            # Contents of 'a'
            b'a/a', b'a/a-a', b'a/a=a', b'a/b',
            # Contents of 'a/a'
            b'a/a/a', b'a/a/a-a', b'a/a/a=a',
            # Contents of 'a/a/a'
            b'a/a/a/a', b'a/a/a/b',
            # Contents of 'a/a/a-a',
            b'a/a/a-a/a', b'a/a/a-a/b',
            # Contents of 'a/a/a=a',
            b'a/a/a=a/a', b'a/a/a=a/b',
            # Contents of 'a/a-a'
            b'a/a-a/a',
            # Contents of 'a/a-a/a'
            b'a/a-a/a/a', b'a/a-a/a/b',
            # Contents of 'a/a=a'
            b'a/a=a/a',
            # Contents of 'a/b'
            b'a/b/a', b'a/b/b',
            # Contents of 'a-a',
            b'a-a/a', b'a-a/b',
            # Contents of 'a=a',
            b'a=a/a', b'a=a/b',
            # Contents of 'b',
            b'b/a', b'b/b',
            ])
        self.assertLtPathByDirblock([
            # content of '/'
            b'', b'a', b'a-a', b'a-z', b'a=a', b'a=z',
            # content of 'a/'
            b'a/a', b'a/a-a', b'a/a-z',
            b'a/a=a', b'a/a=z',
            b'a/z', b'a/z-a', b'a/z-z',
            b'a/z=a', b'a/z=z',
            # content of 'a/a/'
            b'a/a/a', b'a/a/z',
            # content of 'a/a-a'
            b'a/a-a/a',
            # content of 'a/a-z'
            b'a/a-z/z',
            # content of 'a/a=a'
            b'a/a=a/a',
            # content of 'a/a=z'
            b'a/a=z/z',
            # content of 'a/z/'
            b'a/z/a', b'a/z/z',
            # content of 'a-a'
            b'a-a/a',
            # content of 'a-z'
            b'a-z/z',
            # content of 'a=a'
            b'a=a/a',
            # content of 'a=z'
            b'a=z/z',
            ])

    def test_unicode_not_allowed(self):
        """Passing a unicode string for either argument raises TypeError."""
        lt_path_by_dirblock = self.get_lt_path_by_dirblock()
        # The non-unicode operand is a bytes literal so that, on Python 3,
        # each mixed call really pairs one valid argument with one unicode
        # argument (matching TestLtByDirs.test_cmp_unicode_not_allowed).
        self.assertRaises(TypeError, lt_path_by_dirblock, u'Uni', b'str')
        self.assertRaises(TypeError, lt_path_by_dirblock, b'str', u'Uni')
        self.assertRaises(TypeError, lt_path_by_dirblock, u'Uni', u'Uni')
        self.assertRaises(TypeError, lt_path_by_dirblock, u'x/Uni', b'x/str')
        self.assertRaises(TypeError, lt_path_by_dirblock, b'x/str', u'x/Uni')
        self.assertRaises(TypeError, lt_path_by_dirblock, u'x/Uni', u'x/Uni')

    def test_nonascii(self):
        """Non-ASCII (UTF-8 encoded) names sort by byte value per directory."""
        self.assertLtPathByDirblock([
            # content of '/'
            b'', b'a', b'\xc2\xb5', b'\xc3\xa5',
            # content of 'a'
            b'a/a', b'a/\xc2\xb5', b'a/\xc3\xa5',
            # content of 'a/a'
            b'a/a/a', b'a/a/\xc2\xb5', b'a/a/\xc3\xa5',
            # content of 'a/\xc2\xb5'
            b'a/\xc2\xb5/a', b'a/\xc2\xb5/\xc2\xb5', b'a/\xc2\xb5/\xc3\xa5',
            # content of 'a/\xc3\xa5'
            b'a/\xc3\xa5/a', b'a/\xc3\xa5/\xc2\xb5', b'a/\xc3\xa5/\xc3\xa5',
            # content of '\xc2\xb5'
            b'\xc2\xb5/a', b'\xc2\xb5/\xc2\xb5', b'\xc2\xb5/\xc3\xa5',
            # content of '\xc3\xa5'
            b'\xc3\xa5/a', b'\xc3\xa5/\xc2\xb5', b'\xc3\xa5/\xc3\xa5',
            ])


class TestCompiledLtPathByDirblock(TestLtPathByDirblock):
    """Test the pyrex implementation of _lt_path_by_dirblock"""

    _test_needs_features = [compiled_dirstate_helpers_feature]

    def get_lt_path_by_dirblock(self):
        """Return ``_lt_path_by_dirblock`` from the compiled helper module."""
        from ..bzr._dirstate_helpers_pyx import _lt_path_by_dirblock
        return _lt_path_by_dirblock


class TestMemRChr(tests.TestCase):
    """Test memrchr functionality"""

    _test_needs_features = [compiled_dirstate_helpers_feature]

    def assertMemRChr(self, expected, s, c):
        """Assert that ``_py_memrchr(s, c)`` returns ``expected``.

        :param expected: The expected return value (an index, or None).
        :param s: The byte string to search.
        :param c: The single byte to look for.
        """
        from breezy.bzr._dirstate_helpers_pyx import _py_memrchr
        self.assertEqual(expected, _py_memrchr(s, c))

    def test_missing(self):
        """A byte that does not occur gives None."""
        self.assertMemRChr(None, b'', b'a')
        self.assertMemRChr(None, b'', b'c')
        self.assertMemRChr(None, b'abcdefghijklm', b'q')
        self.assertMemRChr(None, b'aaaaaaaaaaaaaaaaaaaaaaa', b'b')

    def test_single_entry(self):
        """A byte occurring once is found at its index."""
        self.assertMemRChr(0, b'abcdefghijklm', b'a')
        self.assertMemRChr(1, b'abcdefghijklm', b'b')
        self.assertMemRChr(2, b'abcdefghijklm', b'c')
        self.assertMemRChr(10, b'abcdefghijklm', b'k')
        self.assertMemRChr(11, b'abcdefghijklm', b'l')
        self.assertMemRChr(12, b'abcdefghijklm', b'm')

    def test_multiple(self):
        """With several occurrences the last index is returned."""
        self.assertMemRChr(10, b'abcdefjklmabcdefghijklm', b'a')
        self.assertMemRChr(11, b'abcdefjklmabcdefghijklm', b'b')
        self.assertMemRChr(12, b'abcdefjklmabcdefghijklm', b'c')
        self.assertMemRChr(20, b'abcdefjklmabcdefghijklm', b'k')
        self.assertMemRChr(21, b'abcdefjklmabcdefghijklm', b'l')
        self.assertMemRChr(22, b'abcdefjklmabcdefghijklm', b'm')
        self.assertMemRChr(22, b'aaaaaaaaaaaaaaaaaaaaaaa', b'a')

    def test_with_nulls(self):
        """Embedded NUL bytes neither stop the search nor shift indexes."""
        self.assertMemRChr(10, b'abc\0\0\0jklmabc\0\0\0ghijklm', b'a')
        self.assertMemRChr(11, b'abc\0\0\0jklmabc\0\0\0ghijklm', b'b')
        self.assertMemRChr(12, b'abc\0\0\0jklmabc\0\0\0ghijklm', b'c')
        self.assertMemRChr(20, b'abc\0\0\0jklmabc\0\0\0ghijklm', b'k')
        self.assertMemRChr(21, b'abc\0\0\0jklmabc\0\0\0ghijklm', b'l')
        self.assertMemRChr(22, b'abc\0\0\0jklmabc\0\0\0ghijklm', b'm')
        self.assertMemRChr(22, b'aaa\0\0\0aaaaaaa\0\0\0aaaaaaa', b'a')
        self.assertMemRChr(9, b'\0\0\0\0\0\0\0\0\0\0', b'\0')


class TestReadDirblocks(test_dirstate.TestCaseWithDirState):
    """Test an implementation of _read_dirblocks()

    _read_dirblocks() reads in all of the dirblock information from the disk
    file.

    Child test cases can override ``get_read_dirblocks`` to test a specific
    implementation.
    """

    # inherits scenarios from test_dirstate

    def get_read_dirblocks(self):
        """Return the pure-Python ``_read_dirblocks``."""
        from breezy.bzr._dirstate_helpers_py import _read_dirblocks
        return _read_dirblocks

    def test_smoketest(self):
        """Make sure that we can create and read back a simple file."""
        tree, state, expected = self.create_basic_dirstate()
        # NOTE(review): one statement appears to be missing at this point in
        # the reviewed copy (possibly ``del tree``); confirm against history.
        state._read_header_if_needed()
        self.assertEqual(dirstate.DirState.NOT_IN_MEMORY,
                         state._dirblock_state)
        read_dirblocks = self.get_read_dirblocks()
        read_dirblocks(state)
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)

    def test_trailing_garbage(self):
        """Trailing junk in the dirstate file raises DirstateCorrupt."""
        tree, state, expected = self.create_basic_dirstate()
        # On Unix, we can write extra data as long as we haven't read yet, but
        # on Win32, if you've opened the file with FILE_SHARE_READ, trying to
        # open it in append mode will fail.
        # NOTE(review): the unlock / write / re-lock statements were missing
        # from the reviewed copy and are reconstructed from the comments.
        state.unlock()
        try:
            with open('dirstate', 'ab') as f:
                # Add bogus trailing garbage
                f.write(b'bogus\n')
        finally:
            # Re-acquire the lock even if the write failed.
            state.lock_read()
        e = self.assertRaises(dirstate.DirstateCorrupt,
                              state._read_dirblocks_if_needed)
        # Make sure we mention the bogus characters in the error
        self.assertContainsRe(str(e), 'bogus')


class TestCompiledReadDirblocks(TestReadDirblocks):
    """Test the pyrex implementation of _read_dirblocks"""

    _test_needs_features = [compiled_dirstate_helpers_feature]

    def get_read_dirblocks(self):
        """Return ``_read_dirblocks`` from the compiled helper module."""
        from breezy.bzr._dirstate_helpers_pyx import _read_dirblocks
        return _read_dirblocks


class TestUsingCompiledIfAvailable(tests.TestCase):
    """Check that any compiled functions that are available are the default.

    It is possible to have typos, etc in the import line, such that
    _dirstate_helpers_pyx is actually available, but the compiled functions are
    not being used.
    """

    def test_bisect_dirblock(self):
        """dirstate.bisect_dirblock is the compiled one when available."""
        if compiled_dirstate_helpers_feature.available():
            from breezy.bzr._dirstate_helpers_pyx import bisect_dirblock
        else:
            from breezy.bzr._dirstate_helpers_py import bisect_dirblock
        self.assertIs(bisect_dirblock, dirstate.bisect_dirblock)

    def test__bisect_path_left(self):
        """dirstate._bisect_path_left is the compiled one when available."""
        if compiled_dirstate_helpers_feature.available():
            from breezy.bzr._dirstate_helpers_pyx import _bisect_path_left
        else:
            from breezy.bzr._dirstate_helpers_py import _bisect_path_left
        self.assertIs(_bisect_path_left, dirstate._bisect_path_left)

    def test__bisect_path_right(self):
        """dirstate._bisect_path_right is the compiled one when available."""
        if compiled_dirstate_helpers_feature.available():
            from breezy.bzr._dirstate_helpers_pyx import _bisect_path_right
        else:
            from breezy.bzr._dirstate_helpers_py import _bisect_path_right
        self.assertIs(_bisect_path_right, dirstate._bisect_path_right)

    def test_lt_by_dirs(self):
        """dirstate.lt_by_dirs is the compiled one when available."""
        if compiled_dirstate_helpers_feature.available():
            from ..bzr._dirstate_helpers_pyx import lt_by_dirs
        else:
            from ..bzr._dirstate_helpers_py import lt_by_dirs
        self.assertIs(lt_by_dirs, dirstate.lt_by_dirs)

    def test__read_dirblocks(self):
        """dirstate._read_dirblocks is the compiled one when available."""
        if compiled_dirstate_helpers_feature.available():
            from breezy.bzr._dirstate_helpers_pyx import _read_dirblocks
        else:
            from breezy.bzr._dirstate_helpers_py import _read_dirblocks
        self.assertIs(_read_dirblocks, dirstate._read_dirblocks)

    def test_update_entry(self):
        """dirstate.update_entry is the compiled one when available."""
        if compiled_dirstate_helpers_feature.available():
            from breezy.bzr._dirstate_helpers_pyx import update_entry
        else:
            from breezy.bzr.dirstate import update_entry
        self.assertIs(update_entry, dirstate.update_entry)

    def test_process_entry(self):
        """dirstate._process_entry is ProcessEntryC when available."""
        if compiled_dirstate_helpers_feature.available():
            from breezy.bzr._dirstate_helpers_pyx import ProcessEntryC
            self.assertIs(ProcessEntryC, dirstate._process_entry)
        else:
            from breezy.bzr.dirstate import ProcessEntryPython
            self.assertIs(ProcessEntryPython, dirstate._process_entry)


class TestUpdateEntry(test_dirstate.TestCaseWithDirState):
804
"""Test the DirState.update_entry functions"""
806
scenarios = multiply_scenarios(
807
dir_reader_scenarios(), ue_scenarios)
813
super(TestUpdateEntry, self).setUp()
814
self.overrideAttr(dirstate, 'update_entry', self.update_entry)
816
def get_state_with_a(self):
817
"""Create a DirState tracking a single object named 'a'"""
818
state = test_dirstate.InstrumentedDirState.initialize('dirstate')
819
self.addCleanup(state.unlock)
820
state.add('a', b'a-id', 'file', None, b'')
821
entry = state._get_entry(0, path_utf8=b'a')
824
def test_observed_sha1_cachable(self):
825
state, entry = self.get_state_with_a()
827
atime = time.time() - 10
828
self.build_tree(['a'])
829
statvalue = test_dirstate._FakeStat.from_stat(os.lstat('a'))
830
statvalue.st_mtime = statvalue.st_ctime = atime
831
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
832
state._dirblock_state)
833
state._observed_sha1(entry, b"foo", statvalue)
834
self.assertEqual(b'foo', entry[1][0][1])
835
packed_stat = dirstate.pack_stat(statvalue)
836
self.assertEqual(packed_stat, entry[1][0][4])
837
self.assertEqual(dirstate.DirState.IN_MEMORY_HASH_MODIFIED,
838
state._dirblock_state)
840
def test_observed_sha1_not_cachable(self):
841
state, entry = self.get_state_with_a()
843
oldval = entry[1][0][1]
844
oldstat = entry[1][0][4]
845
self.build_tree(['a'])
846
statvalue = os.lstat('a')
847
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
848
state._dirblock_state)
849
state._observed_sha1(entry, "foo", statvalue)
850
self.assertEqual(oldval, entry[1][0][1])
851
self.assertEqual(oldstat, entry[1][0][4])
852
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
853
state._dirblock_state)
855
def test_update_entry(self):
856
state, _ = self.get_state_with_a()
857
tree = self.make_branch_and_tree('tree')
859
empty_revid = tree.commit('empty')
860
self.build_tree(['tree/a'])
861
tree.add(['a'], [b'a-id'])
862
with_a_id = tree.commit('with_a')
863
self.addCleanup(tree.unlock)
864
state.set_parent_trees(
865
[(empty_revid, tree.branch.repository.revision_tree(empty_revid))],
867
entry = state._get_entry(0, path_utf8=b'a')
868
self.build_tree(['a'])
869
# Add one where we don't provide the stat or sha already
870
self.assertEqual((b'', b'a', b'a-id'), entry[0])
871
self.assertEqual((b'f', b'', 0, False, dirstate.DirState.NULLSTAT),
873
# Flush the buffers to disk
875
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
876
state._dirblock_state)
878
stat_value = os.lstat('a')
879
packed_stat = dirstate.pack_stat(stat_value)
880
link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
881
stat_value=stat_value)
882
self.assertEqual(None, link_or_sha1)
884
# The dirblock entry should not have computed or cached the file's
885
# sha1, but it did update the files' st_size. However, this is not
886
# worth writing a dirstate file for, so we leave the state UNMODIFIED
887
self.assertEqual((b'f', b'', 14, False, dirstate.DirState.NULLSTAT),
889
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
890
state._dirblock_state)
891
mode = stat_value.st_mode
892
self.assertEqual([('is_exec', mode, False)], state._log)
895
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
896
state._dirblock_state)
898
# Roll the clock back so the file is guaranteed to look too new. We
899
# should still not compute the sha1.
900
state.adjust_time(-10)
903
link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
904
stat_value=stat_value)
905
self.assertEqual([('is_exec', mode, False)], state._log)
906
self.assertEqual(None, link_or_sha1)
907
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
908
state._dirblock_state)
909
self.assertEqual((b'f', b'', 14, False, dirstate.DirState.NULLSTAT),
913
# If it is cachable (the clock has moved forward) but new it still
914
# won't calculate the sha or cache it.
915
state.adjust_time(+20)
917
link_or_sha1 = dirstate.update_entry(state, entry, abspath=b'a',
918
stat_value=stat_value)
919
self.assertEqual(None, link_or_sha1)
920
self.assertEqual([('is_exec', mode, False)], state._log)
921
self.assertEqual((b'f', b'', 14, False, dirstate.DirState.NULLSTAT),
923
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
924
state._dirblock_state)
926
# If the file is no longer new, and the clock has been moved forward
927
# sufficiently, it will cache the sha.
929
state.set_parent_trees(
930
[(with_a_id, tree.branch.repository.revision_tree(with_a_id))],
932
entry = state._get_entry(0, path_utf8=b'a')
934
link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
935
stat_value=stat_value)
936
self.assertEqual(b'b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6',
938
self.assertEqual([('is_exec', mode, False), ('sha1', b'a')],
940
self.assertEqual((b'f', link_or_sha1, 14, False, packed_stat),
943
# Subsequent calls will just return the cached value
945
link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
946
stat_value=stat_value)
947
self.assertEqual(b'b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6',
949
self.assertEqual([], state._log)
950
self.assertEqual((b'f', link_or_sha1, 14, False, packed_stat),
953
def test_update_entry_symlink(self):
954
"""Update entry should read symlinks."""
955
self.requireFeature(features.SymlinkFeature)
956
state, entry = self.get_state_with_a()
958
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
959
state._dirblock_state)
960
os.symlink('target', 'a')
962
state.adjust_time(-10) # Make the symlink look new
963
stat_value = os.lstat('a')
964
packed_stat = dirstate.pack_stat(stat_value)
965
link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
966
stat_value=stat_value)
967
self.assertEqual(b'target', link_or_sha1)
968
self.assertEqual([('read_link', b'a', b'')], state._log)
969
# Dirblock is not updated (the link is too new)
970
self.assertEqual([(b'l', b'', 6, False, dirstate.DirState.NULLSTAT)],
972
# The file entry turned into a symlink, that is considered
973
# HASH modified worthy.
974
self.assertEqual(dirstate.DirState.IN_MEMORY_HASH_MODIFIED,
975
state._dirblock_state)
977
# Because the stat_value looks new, we should re-read the target
979
link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
980
stat_value=stat_value)
981
self.assertEqual(b'target', link_or_sha1)
982
self.assertEqual([('read_link', b'a', b'')], state._log)
983
self.assertEqual([(b'l', b'', 6, False, dirstate.DirState.NULLSTAT)],
986
state.adjust_time(+20) # Skip into the future, all files look old
988
link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
989
stat_value=stat_value)
990
# The symlink stayed a symlink. So while it is new enough to cache, we
991
# don't bother setting the flag, because it is not really worth saving
992
# (when we stat the symlink, we'll have paged in the target.)
993
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
994
state._dirblock_state)
995
self.assertEqual(b'target', link_or_sha1)
996
# We need to re-read the link because only now can we cache it
997
self.assertEqual([('read_link', b'a', b'')], state._log)
998
self.assertEqual([(b'l', b'target', 6, False, packed_stat)],
1002
# Another call won't re-read the link
1003
self.assertEqual([], state._log)
1004
link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
1005
stat_value=stat_value)
1006
self.assertEqual(b'target', link_or_sha1)
1007
self.assertEqual([(b'l', b'target', 6, False, packed_stat)],
1010
def do_update_entry(self, state, entry, abspath):
    """Run ``self.update_entry`` on ``abspath`` with a freshly taken lstat.

    :param state: passed through unchanged to ``self.update_entry``.
    :param entry: passed through unchanged to ``self.update_entry``.
    :param abspath: path that is lstat'ed and then handed to update_entry.
    :return: whatever ``self.update_entry`` returns.
    """
    stat_value = os.lstat(abspath)
    return self.update_entry(state, entry, abspath, stat_value)
1014
def test_update_entry_dir(self):
    """update_entry yields None for 'a' after 'a/' is built on disk."""
    state, entry = self.get_state_with_a()
    self.build_tree(['a/'])
    self.assertIs(None, self.do_update_entry(state, entry, b'a'))
1019
# Test: repeated update_entry calls on 'a' after it has been built as a
# directory.  Every call must return None.  The dirblock state is expected to
# be IN_MEMORY_MODIFIED after the first call (the path used to be a file) and
# IN_MEMORY_UNMODIFIED after each later one, including the call made once the
# directory's timestamps were moved 100 seconds into the past -- even though
# that call must change the packed stat stored in the entry.
# NOTE(review): the embedded numbering skips lines here (1027, 1036, 1038,
# 1045), so the step between the MODIFIED and the first UNMODIFIED assertion,
# and whatever guards the TestSkipped raise around os.utime, are not visible
# in this copy -- restore them from the upstream file before relying on it.
def test_update_entry_dir_unchanged(self):
1020
state, entry = self.get_state_with_a()
1021
self.build_tree(['a/'])
1022
state.adjust_time(+20)
1023
self.assertIs(None, self.do_update_entry(state, entry, b'a'))
1024
# a/ used to be a file, but is now a directory, worth saving
1025
self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
1026
state._dirblock_state)
1028
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
1029
state._dirblock_state)
1030
# No changes to a/ means not worth saving.
1031
self.assertIs(None, self.do_update_entry(state, entry, b'a'))
1032
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
1033
state._dirblock_state)
1034
# Change the last-modified time for the directory
1035
t = time.time() - 100.0
1037
os.utime('a', (t, t))
1039
# It looks like Win32 + FAT doesn't allow to change times on a dir.
1040
raise tests.TestSkipped("can't update mtime of a dir on FAT")
1041
saved_packed_stat = entry[1][0][-1]
1042
self.assertIs(None, self.do_update_entry(state, entry, b'a'))
1043
# We *do* go ahead and update the information in the dirblocks, but we
1044
# don't bother setting IN_MEMORY_MODIFIED because it is trivial to
1046
self.assertNotEqual(saved_packed_stat, entry[1][0][-1])
1047
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
1048
state._dirblock_state)
1050
# Test: update_entry on the file 'a' when the dirstate has a parent tree set
# from a commit made in a separate branch under 'tree'.  Both update calls must
# return the expected sha1; the dirblock state is asserted IN_MEMORY_MODIFIED
# after the first call and IN_MEMORY_UNMODIFIED afterwards.
# NOTE(review): numbering gaps (1053, 1060, 1068) hide statements in this
# copy: whatever takes the lock that addCleanup(tree.unlock) releases, the
# closing argument of set_parent_trees, and the step between the MODIFIED and
# UNMODIFIED assertions.  Confirm against the upstream file.
def test_update_entry_file_unchanged(self):
1051
state, _ = self.get_state_with_a()
1052
tree = self.make_branch_and_tree('tree')
1054
self.build_tree(['tree/a'])
1055
tree.add(['a'], [b'a-id'])
1056
with_a_id = tree.commit('witha')
1057
self.addCleanup(tree.unlock)
1058
state.set_parent_trees(
1059
[(with_a_id, tree.branch.repository.revision_tree(with_a_id))],
1061
entry = state._get_entry(0, path_utf8=b'a')
1062
self.build_tree(['a'])
1063
sha1sum = b'b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6'
1064
state.adjust_time(+20)
1065
self.assertEqual(sha1sum, self.do_update_entry(state, entry, b'a'))
1066
self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
1067
state._dirblock_state)
1069
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
1070
state._dirblock_state)
1071
self.assertEqual(sha1sum, self.do_update_entry(state, entry, b'a'))
1072
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
1073
state._dirblock_state)
1075
def test_update_entry_tree_reference(self):
    """An entry added as 'tree-reference' still reads b't' after update.

    The entry for 'r' is fetched, passed through do_update_entry, and then
    fetched again so the assertion inspects the stored record.
    """
    state = test_dirstate.InstrumentedDirState.initialize('dirstate')
    self.addCleanup(state.unlock)
    state.add('r', b'r-id', 'tree-reference', None, b'')
    self.build_tree(['r/'])
    entry = state._get_entry(0, path_utf8=b'r')
    self.do_update_entry(state, entry, 'r')
    # Re-read the entry rather than trusting the object fetched before the
    # update.
    entry = state._get_entry(0, path_utf8=b'r')
    self.assertEqual(b't', entry[1][0][0])
1085
# Helper: builds a file at 'a', runs do_update_entry on it and checks that no
# sha1 comes back (None); the details are then compared against a single b'f'
# record of size 14 carrying NULLSTAT.
# NOTE(review): the docstring terminator and the tail of the final assertEqual
# (its second argument) are missing from this copy, as is anything that
# followed it; packed_stat is computed but unused in the visible lines.
def create_and_test_file(self, state, entry):
1086
"""Create a file at 'a' and verify the state finds it during update.
1088
The state should already be versioning *something* at 'a'. This makes
1089
sure that state.update_entry recognizes it as a file.
1091
self.build_tree(['a'])
1092
stat_value = os.lstat('a')
1093
packed_stat = dirstate.pack_stat(stat_value)
1095
link_or_sha1 = self.do_update_entry(state, entry, abspath='a')
1096
self.assertEqual(None, link_or_sha1)
1097
self.assertEqual([(b'f', b'', 14, False, dirstate.DirState.NULLSTAT)],
1101
# Helper: builds a directory at 'a', runs do_update_entry on it and checks
# that None is returned and that entry[1] equals a single b'd' record of size
# 0 carrying the packed lstat of 'a'.
# NOTE(review): the docstring terminator is missing from this copy, and the
# numbering skips lines after the last assertion -- confirm nothing else (for
# example a return value) was dropped.
def create_and_test_dir(self, state, entry):
1102
"""Create a directory at 'a' and verify the state finds it.
1104
The state should already be versioning *something* at 'a'. This makes
1105
sure that state.update_entry recognizes it as a directory.
1107
self.build_tree(['a/'])
1108
stat_value = os.lstat('a')
1109
packed_stat = dirstate.pack_stat(stat_value)
1111
link_or_sha1 = self.do_update_entry(state, entry, abspath=b'a')
1112
self.assertIs(None, link_or_sha1)
1113
self.assertEqual([(b'd', b'', 0, False, packed_stat)], entry[1])
1117
# FIXME: Add unicode version
1118
# Helper: creates a symlink 'a' pointing at 'path/to/foo', runs
# do_update_entry on it and checks that the link target is returned; the
# details are then compared against a single b'l' record (target
# b'path/to/foo', size 11) carrying the packed lstat of 'a'.
# NOTE(review): the end of the docstring and the tail of the final assertEqual
# (its second argument) are missing from this copy, as is anything after it.
def create_and_test_symlink(self, state, entry):
1119
"""Create a symlink at 'a' and verify the state finds it.
1121
The state should already be versioning *something* at 'a'. This makes
1122
sure that state.update_entry recognizes it as a symlink.
1124
This should not be called if this platform does not have symlink
1127
# caller should care about skipping test on platforms without symlinks
1128
os.symlink('path/to/foo', 'a')
1130
stat_value = os.lstat('a')
1131
packed_stat = dirstate.pack_stat(stat_value)
1133
link_or_sha1 = self.do_update_entry(state, entry, abspath=b'a')
1134
self.assertEqual(b'path/to/foo', link_or_sha1)
1135
self.assertEqual([(b'l', b'path/to/foo', 11, False, packed_stat)],
1139
# Test: after adjust_time(+10), check 'a' first as a file and then as a
# directory through the create_and_test_file / create_and_test_dir helpers.
# NOTE(review): the docstring terminator and the line numbered 1147 (between
# the two helper calls) are missing from this copy; presumably 'a' is removed
# there before being recreated as a directory -- confirm against upstream.
def test_update_file_to_dir(self):
1140
"""If a file changes to a directory we return None for the sha.
1141
We also update the inventory record.
1143
state, entry = self.get_state_with_a()
1144
# The file sha1 won't be cached unless the file is old
1145
state.adjust_time(+10)
1146
self.create_and_test_file(state, entry)
1148
self.create_and_test_dir(state, entry)
1150
# Test (requires features.SymlinkFeature): after adjust_time(+10), check 'a'
# first as a file and then as a symlink through the create_and_test_* helpers.
# NOTE(review): the line numbered 1157 (between the two helper calls) is
# missing from this copy; presumably 'a' is removed there before the symlink
# is created -- confirm against upstream.
def test_update_file_to_symlink(self):
1151
"""File becomes a symlink"""
1152
self.requireFeature(features.SymlinkFeature)
1153
state, entry = self.get_state_with_a()
1154
# The file sha1 won't be cached unless the file is old
1155
state.adjust_time(+10)
1156
self.create_and_test_file(state, entry)
1158
self.create_and_test_symlink(state, entry)
1160
# Test: after adjust_time(+10), check 'a' first as a directory and then as a
# file through the create_and_test_dir / create_and_test_file helpers.
# NOTE(review): the line numbered 1166 (between the two helper calls) is
# missing from this copy; presumably the directory is removed there before the
# file is created -- confirm against upstream.
def test_update_dir_to_file(self):
1161
"""Directory becoming a file updates the entry."""
1162
state, entry = self.get_state_with_a()
1163
# The file sha1 won't be cached unless the file is old
1164
state.adjust_time(+10)
1165
self.create_and_test_dir(state, entry)
1167
self.create_and_test_file(state, entry)
1169
# Test (requires features.SymlinkFeature): after adjust_time(+10), check 'a'
# first as a directory and then as a symlink through the create_and_test_*
# helpers.
# NOTE(review): the line numbered 1176 (between the two helper calls) is
# missing from this copy; presumably the directory is removed there before the
# symlink is created -- confirm against upstream.
def test_update_dir_to_symlink(self):
1170
"""Directory becomes a symlink"""
1171
self.requireFeature(features.SymlinkFeature)
1172
state, entry = self.get_state_with_a()
1173
# The symlink target won't be cached if it isn't old
1174
state.adjust_time(+10)
1175
self.create_and_test_dir(state, entry)
1177
self.create_and_test_symlink(state, entry)
1179
# Test (requires features.SymlinkFeature): after adjust_time(+10), check 'a'
# first as a symlink and then as a file through the create_and_test_* helpers.
# NOTE(review): the line numbered 1186 (between the two helper calls) is
# missing from this copy; presumably the symlink is removed there before the
# file is created -- confirm against upstream.
def test_update_symlink_to_file(self):
1180
"""Symlink becomes a file"""
1181
self.requireFeature(features.SymlinkFeature)
1182
state, entry = self.get_state_with_a()
1183
# The symlink and file info won't be cached unless old
1184
state.adjust_time(+10)
1185
self.create_and_test_symlink(state, entry)
1187
self.create_and_test_file(state, entry)
1189
# Test (requires features.SymlinkFeature): after adjust_time(+10), check 'a'
# first as a symlink and then as a directory through the create_and_test_*
# helpers.
# NOTE(review): the line numbered 1196 (between the two helper calls) is
# missing from this copy; presumably the symlink is removed there before the
# directory is created -- confirm against upstream.
def test_update_symlink_to_dir(self):
1190
"""Symlink becomes a directory"""
1191
self.requireFeature(features.SymlinkFeature)
1192
state, entry = self.get_state_with_a()
1193
# The symlink target won't be cached if it isn't old
1194
state.adjust_time(+10)
1195
self.create_and_test_symlink(state, entry)
1197
self.create_and_test_dir(state, entry)
1199
# Test: with state._use_filesystem_for_exec set to False, a record that the
# test itself marks executable (fourth field True) is expected to keep that
# flag across two update_entry calls -- one made while everything looks new
# (adjust_time(-10)) and one after adjust_time(+20) -- while the size field
# changes from 0 to 14 and the stat field stays NULLSTAT.
# NOTE(review): the second argument of both assertEqual calls is missing from
# this copy (numbering skips 1220-1221 and the lines after 1227), and
# packed_stat / digest are assigned but unused in the visible lines.
def test__is_executable_win32(self):
1200
state, entry = self.get_state_with_a()
1201
self.build_tree(['a'])
1203
# Make sure we are using the version of _is_executable that doesn't
1204
# check the filesystem mode.
1205
state._use_filesystem_for_exec = False
1207
# The file on disk is not executable, but we are marking it as though
1208
# it is. With _use_filesystem_for_exec disabled we ignore what is on
1210
entry[1][0] = (b'f', b'', 0, True, dirstate.DirState.NULLSTAT)
1212
stat_value = os.lstat('a')
1213
packed_stat = dirstate.pack_stat(stat_value)
1215
state.adjust_time(-10) # Make sure everything is new
1216
self.update_entry(state, entry, abspath=b'a', stat_value=stat_value)
1218
# The row is updated, but the executable bit stays set.
1219
self.assertEqual([(b'f', b'', 14, True, dirstate.DirState.NULLSTAT)],
1222
# Make the disk object look old enough to cache (but it won't cache the
1223
# sha as it is a new file).
1224
state.adjust_time(+20)
1225
digest = b'b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6'
1226
self.update_entry(state, entry, abspath=b'a', stat_value=stat_value)
1227
self.assertEqual([(b'f', b'', 14, True, dirstate.DirState.NULLSTAT)],
1230
def _prepare_tree(self):
    """Build and commit a working tree holding the single file 'a file'.

    :return: ``(tree, text)`` -- the tree created at 'tree' and the bytes
        written to 'tree/a file'.
    """
    text = b'Hello World\n'
    tree = self.make_branch_and_tree('tree')
    self.build_tree_contents([('tree/a file', text)])
    tree.add('a file', b'a-file-id')
    # Note: dirstate does not sha prior to the first commit
    # so commit now in order for the test to work
    tree.commit('first')
    # Callers unpack ``tree, text`` from this helper, so hand both back.
    return tree, text
1241
def test_sha1provider_sha1_used(self):
    """update_entry returns the sha1 produced by the custom provider.

    The dirstate is built from the prepared tree with an
    UppercaseSHA1Provider, so the expected value is the sha of the
    upper-cased file text with b"foo" appended.
    """
    tree, text = self._prepare_tree()
    state = dirstate.DirState.from_tree(tree, 'dirstate',
                                        UppercaseSHA1Provider())
    self.addCleanup(state.unlock)
    expected_sha = osutils.sha_string(text.upper() + b"foo")
    entry = state._get_entry(0, path_utf8=b'a file')
    self.assertNotEqual((None, None), entry)
    # Call _sha_cutoff_time() first, then push the stored cutoff forward by
    # 10 (presumably so the freshly written file counts as old enough to be
    # hashed -- confirm against DirState).
    state._sha_cutoff_time()
    state._cutoff_time += 10
    sha1 = self.update_entry(state, entry, 'tree/a file',
                             os.lstat('tree/a file'))
    self.assertEqual(expected_sha, sha1)
1255
# Test: with an UppercaseSHA1Provider installed on the tree's current
# dirstate, iter_changes against the basis tree must report exactly
# b'a-file-id'.
# NOTE(review): the line numbered 1257 (presumably the call taking the lock
# that addCleanup(tree.unlock) releases) and the rest of the explanatory
# comment (1262) are missing from this copy -- confirm against upstream.
def test_sha1provider_stat_and_sha1_used(self):
1256
tree, text = self._prepare_tree()
1258
self.addCleanup(tree.unlock)
1259
state = tree._current_dirstate()
1260
state._sha1_provider = UppercaseSHA1Provider()
1261
# If we used the standard provider, it would look like nothing has
1263
file_ids_changed = [change.file_id for change
1264
in tree.iter_changes(tree.basis_tree())]
1265
self.assertEqual([b'a-file-id'], file_ids_changed)
1268
class UppercaseSHA1Provider(dirstate.SHA1Provider):
    """A custom SHA1Provider.

    Hashes the upper-cased file content with b"foo" appended, so its
    results differ from a plain sha1 of the file.
    """

    def sha1(self, abspath):
        """Return only the sha1 part of ``stat_and_sha1(abspath)``."""
        return self.stat_and_sha1(abspath)[1]

    def stat_and_sha1(self, abspath):
        """Return ``(fstat result, sha1)`` for the file at ``abspath``.

        The stat is taken from the open file descriptor before the content
        is read.
        """
        with open(abspath, 'rb') as file_obj:
            statvalue = os.fstat(file_obj.fileno())
            text = file_obj.read()
        sha1 = osutils.sha_string(text.upper() + b"foo")
        return statvalue, sha1
1282
# Change-detection tests run once per combination of dir_reader_scenarios()
# and pe_scenarios (see ``scenarios`` below).  The scenario-provided
# ``_process_entry`` is installed as dirstate._process_entry via overrideAttr.
# NOTE(review): this copy has lost short lines (the embedded numbering skips
# 1288-1289, 1308, 1315, 1317-1319, 1330, 1337-1338 among others).  The
# super().setUp() / overrideAttr pair presumably lives in a ``setUp`` method
# whose ``def`` line is missing, and the calls that take the locks released by
# addCleanup(tree.unlock) are not visible -- restore from upstream.
class TestProcessEntry(test_dirstate.TestCaseWithDirState):
1284
scenarios = multiply_scenarios(dir_reader_scenarios(), pe_scenarios)
1287
_process_entry = None
1290
super(TestProcessEntry, self).setUp()
1291
self.overrideAttr(dirstate, '_process_entry', self._process_entry)
1293
# Assert that the file ids reported by tree.iter_changes(basis tree), taken
# under a read lock, equal ``expected`` when both sides are sorted.
def assertChangedFileIds(self, expected, tree):
1294
with tree.lock_read():
1295
file_ids = [info.file_id for info
1296
in tree.iter_changes(tree.basis_tree())]
1297
self.assertEqual(sorted(expected), sorted(file_ids))
1299
# Replaces is_inside in dirstate, osutils and the compiled helper module with
# a function that raises RuntimeError, then checks that consuming
# tree.iter_changes(basis_tree) raises that RuntimeError.
# NOTE(review): the lines around the _dirstate_helpers_pyx import (1315,
# 1317-1319) are missing; presumably the import is guarded so the test still
# runs without the compiled module -- confirm against upstream.
def test_exceptions_raised(self):
1300
# This is a direct test of bug #495023, it relies on osutils.is_inside
1301
# getting called in an inner function. Which makes it a bit brittle,
1302
# but at least it does reproduce the bug.
1303
tree = self.make_branch_and_tree('tree')
1304
self.build_tree(['tree/file', 'tree/dir/', 'tree/dir/sub',
1305
'tree/dir2/', 'tree/dir2/sub2'])
1306
tree.add(['file', 'dir', 'dir/sub', 'dir2', 'dir2/sub2'])
1307
tree.commit('first commit')
1309
self.addCleanup(tree.unlock)
1310
basis_tree = tree.basis_tree()
1312
def is_inside_raises(*args, **kwargs):
1313
raise RuntimeError('stop this')
1314
self.overrideAttr(dirstate, 'is_inside', is_inside_raises)
1316
from breezy.bzr import _dirstate_helpers_pyx
1320
self.overrideAttr(_dirstate_helpers_pyx,
1321
'is_inside', is_inside_raises)
1322
self.overrideAttr(osutils, 'is_inside', is_inside_raises)
1323
self.assertListRaises(RuntimeError, tree.iter_changes, basis_tree)
1325
# After adding 'file', the root id and b'file-id' must be reported as
# changed; the second check expects no changes at all.
# NOTE(review): line 1330, between the two checks, is missing from this copy
# (presumably a commit) -- confirm against upstream.
def test_simple_changes(self):
1326
tree = self.make_branch_and_tree('tree')
1327
self.build_tree(['tree/file'])
1328
tree.add(['file'], [b'file-id'])
1329
self.assertChangedFileIds([tree.path2id(''), b'file-id'], tree)
1331
self.assertChangedFileIds([], tree)
1333
# With an UppercaseSHA1Provider installed on the current dirstate, b'file-id'
# must be reported as changed.
# NOTE(review): lines 1337-1338 are missing from this copy (presumably a
# commit and the call taking the lock released by addCleanup) -- confirm.
def test_sha1provider_stat_and_sha1_used(self):
1334
tree = self.make_branch_and_tree('tree')
1335
self.build_tree(['tree/file'])
1336
tree.add(['file'], [b'file-id'])
1339
self.addCleanup(tree.unlock)
1340
state = tree._current_dirstate()
1341
state._sha1_provider = UppercaseSHA1Provider()
1342
self.assertChangedFileIds([b'file-id'], tree)
1345
class TestPackStat(tests.TestCase):
    """Check packed representation of stat values is robust on all inputs"""

    scenarios = helper_scenarios

    def pack(self, statlike_tuple):
        """Pack a 10-field stat-like tuple with the scenario's helpers."""
        return self.helpers.pack_stat(os.stat_result(statlike_tuple))

    @staticmethod
    def unpack_field(packed_string, stat_field):
        """Return one named field decoded from ``packed_string``.

        Decoding always goes through _dirstate_helpers_py._unpack_stat,
        whichever helper implementation produced the packed value.
        """
        return _dirstate_helpers_py._unpack_stat(packed_string)[stat_field]

    def test_result(self):
        """A representative stat tuple packs to the expected byte string."""
        self.assertEqual(b"AAAQAAAAABAAAAARAAAAAgAAAAEAAIHk",
                         self.pack((33252, 1, 2, 0, 0, 0, 4096, 15.5, 16.5, 17.5)))

    def test_giant_inode(self):
        """An inode wider than 32 bits keeps only its low 32 bits."""
        packed = self.pack((33252, 0xF80000ABC, 0, 0, 0, 0, 0, 0, 0, 0))
        self.assertEqual(0x80000ABC, self.unpack_field(packed, "st_ino"))

    def test_giant_size(self):
        """A size wider than 32 bits keeps only its low 32 bits."""
        packed = self.pack((33252, 0, 0, 0, 0, 0, (1 << 33) + 4096, 0, 0, 0))
        self.assertEqual(4096, self.unpack_field(packed, "st_size"))

    def test_fractional_mtime(self):
        """The fractional part of the mtime is dropped."""
        packed = self.pack((33252, 0, 0, 0, 0, 0, 0, 0, 16.9375, 0))
        self.assertEqual(16, self.unpack_field(packed, "st_mtime"))

    def test_ancient_mtime(self):
        """A large negative mtime wraps modulo 2**32."""
        packed = self.pack((33252, 0, 0, 0, 0, 0, 0, 0, -11644473600.0, 0))
        self.assertEqual(1240428288, self.unpack_field(packed, "st_mtime"))

    def test_distant_mtime(self):
        """An mtime beyond 32 bits wraps modulo 2**32."""
        packed = self.pack((33252, 0, 0, 0, 0, 0, 0, 0, 64060588800.0, 0))
        self.assertEqual(3931046656, self.unpack_field(packed, "st_mtime"))

    def test_fractional_ctime(self):
        """The fractional part of the ctime is dropped."""
        packed = self.pack((33252, 0, 0, 0, 0, 0, 0, 0, 0, 17.5625))
        self.assertEqual(17, self.unpack_field(packed, "st_ctime"))

    def test_ancient_ctime(self):
        """A large negative ctime wraps modulo 2**32."""
        packed = self.pack((33252, 0, 0, 0, 0, 0, 0, 0, 0, -11644473600.0))
        self.assertEqual(1240428288, self.unpack_field(packed, "st_ctime"))

    def test_distant_ctime(self):
        """A ctime beyond 32 bits wraps modulo 2**32."""
        packed = self.pack((33252, 0, 0, 0, 0, 0, 0, 0, 0, 64060588800.0))
        self.assertEqual(3931046656, self.unpack_field(packed, "st_ctime"))

    def test_negative_dev(self):
        """A negative device number wraps modulo 2**32."""
        packed = self.pack((33252, 0, -0xFFFFFCDE, 0, 0, 0, 0, 0, 0, 0))
        self.assertEqual(0x322, self.unpack_field(packed, "st_dev"))