1
# Copyright (C) 2007-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the compiled dirstate helpers."""
34
from .test_osutils import dir_reader_scenarios
35
from .scenarios import (
36
load_tests_apply_scenarios,
44
load_tests = load_tests_apply_scenarios
47
compiled_dirstate_helpers_feature = features.ModuleAvailableFeature(
    'breezy.bzr._dirstate_helpers_pyx')


# FIXME: we should also parametrize against SHA1Provider !

# Each scenario list starts with the pure-Python implementation. The
# 'dirstate_Pyrex' scenario is added only when
# compiled_dirstate_helpers_feature reports itself as available.

# Scenarios parametrizing the update_entry implementation.
ue_scenarios = [
    ('dirstate_Python', dict(update_entry=dirstate.py_update_entry)),
]
if compiled_dirstate_helpers_feature.available():
    update_entry = compiled_dirstate_helpers_feature.module.update_entry
    ue_scenarios += [('dirstate_Pyrex', dict(update_entry=update_entry))]

# Scenarios parametrizing the _process_entry implementation.
pe_scenarios = [
    ('dirstate_Python', dict(_process_entry=dirstate.ProcessEntryPython)),
]
if compiled_dirstate_helpers_feature.available():
    process_entry = compiled_dirstate_helpers_feature.module.ProcessEntryC
    pe_scenarios += [('dirstate_Pyrex', dict(_process_entry=process_entry))]

# Scenarios parametrizing the whole helpers module.
helper_scenarios = [
    ('dirstate_Python', dict(helpers=_dirstate_helpers_py)),
]
if compiled_dirstate_helpers_feature.available():
    helper_scenarios += [
        ('dirstate_Pyrex',
         dict(helpers=compiled_dirstate_helpers_feature.module)),
    ]
71
class TestBisectPathMixin(object):
72
"""Test that _bisect_path_*() returns the expected values.
74
_bisect_path_* is intended to work like bisect.bisect_*() except it
75
knows it is working on paths that are sorted by ('path', 'to', 'foo')
76
chunks rather than by raw 'path/to/foo'.
78
Test Cases should inherit from this and override ``get_bisect_path`` to return
79
their implementation, and ``get_bisect`` to return the matching
80
bisect.bisect_* function.
83
def get_bisect_path(self):
    """Return the _bisect_path_* implementation under test.

    This base version is only a placeholder and must be overridden.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError()
88
"""Return a version of bisect.bisect_*.
90
Also, for the 'exists' check, return the offset to the real values.
91
For example bisect_left returns the index of an entry, while
92
bisect_right returns the index *after* an entry
94
:return: (bisect_func, offset)
96
raise NotImplementedError
98
def assertBisect(self, paths, split_paths, path, exists=True):
99
"""Assert that bisect_split works like bisect_left on the split paths.
101
:param paths: A list of path names
102
:param split_paths: A list of path names that are already split up by directory
103
('path/to/foo' => ('path', 'to', 'foo'))
104
:param path: The path we are indexing.
105
:param exists: The path should be present, so make sure the
106
final location actually points to the right value.
108
All other arguments will be passed along.
110
bisect_path = self.get_bisect_path()
111
self.assertIsInstance(paths, list)
112
bisect_path_idx = bisect_path(paths, path)
113
split_path = self.split_for_dirblocks([path])[0]
114
bisect_func, offset = self.get_bisect()
115
bisect_split_idx = bisect_func(split_paths, split_path)
116
self.assertEqual(bisect_split_idx, bisect_path_idx,
117
'%s disagreed. %s != %s'
119
% (bisect_path.__name__,
120
bisect_split_idx, bisect_path_idx, path)
123
self.assertEqual(path, paths[bisect_path_idx+offset])
125
def split_for_dirblocks(self, paths):
128
dirname, basename = os.path.split(path)
129
dir_split_paths.append((dirname.split(b'/'), basename))
130
dir_split_paths.sort()
131
return dir_split_paths
133
def test_simple(self):
134
"""In the simple case it works just like bisect_left"""
135
paths = [b'', b'a', b'b', b'c', b'd']
136
split_paths = self.split_for_dirblocks(paths)
138
self.assertBisect(paths, split_paths, path, exists=True)
139
self.assertBisect(paths, split_paths, b'_', exists=False)
140
self.assertBisect(paths, split_paths, b'aa', exists=False)
141
self.assertBisect(paths, split_paths, b'bb', exists=False)
142
self.assertBisect(paths, split_paths, b'cc', exists=False)
143
self.assertBisect(paths, split_paths, b'dd', exists=False)
144
self.assertBisect(paths, split_paths, b'a/a', exists=False)
145
self.assertBisect(paths, split_paths, b'b/b', exists=False)
146
self.assertBisect(paths, split_paths, b'c/c', exists=False)
147
self.assertBisect(paths, split_paths, b'd/d', exists=False)
149
def test_involved(self):
150
"""This is where bisect_path_* diverges slightly."""
151
# This is the list of paths and their contents
179
# This is the exact order that is stored by dirstate
180
# All children in a directory are mentioned before any children of
181
# children are mentioned.
182
# So all the root-directory paths, then all the
183
# first sub directory, etc.
184
paths = [# content of '/'
185
b'', b'a', b'a-a', b'a-z', b'a=a', b'a=z',
187
b'a/a', b'a/a-a', b'a/a-z',
189
b'a/z', b'a/z-a', b'a/z-z',
212
split_paths = self.split_for_dirblocks(paths)
214
for dir_parts, basename in split_paths:
215
if dir_parts == [b'']:
216
sorted_paths.append(basename)
218
sorted_paths.append(b'/'.join(dir_parts + [basename]))
220
self.assertEqual(sorted_paths, paths)
223
self.assertBisect(paths, split_paths, path, exists=True)
226
class TestBisectPathLeft(tests.TestCase, TestBisectPathMixin):
    """Run the mixin's bisect-path tests against the Python _bisect_path_left."""

    def get_bisect(self):
        """Return ``(bisect.bisect_left, 0)``: the reference function and offset."""
        return (bisect.bisect_left, 0)

    def get_bisect_path(self):
        """Return the pure-Python ``_bisect_path_left``."""
        from breezy.bzr._dirstate_helpers_py import (
            _bisect_path_left as python_impl,
            )
        return python_impl
237
class TestCompiledBisectPathLeft(TestBisectPathLeft):
    """Run all Bisect Path tests against the compiled _bisect_path_left."""

    # Declares the compiled helpers feature as a requirement of these tests.
    _test_needs_features = [compiled_dirstate_helpers_feature]

    def get_bisect_path(self):
        """Return ``_bisect_path_left`` from the compiled helpers module."""
        from breezy.bzr._dirstate_helpers_pyx import (
            _bisect_path_left as compiled_impl,
            )
        return compiled_impl
247
class TestBisectPathRight(tests.TestCase, TestBisectPathMixin):
    """Run the mixin's bisect-path tests against the Python _bisect_path_right."""

    def get_bisect(self):
        """Return ``(bisect.bisect_right, -1)``: the reference function and offset."""
        return (bisect.bisect_right, -1)

    def get_bisect_path(self):
        """Return the pure-Python ``_bisect_path_right``."""
        from breezy.bzr._dirstate_helpers_py import (
            _bisect_path_right as python_impl,
            )
        return python_impl
258
class TestCompiledBisectPathRight(TestBisectPathRight):
    """Run all Bisect Path tests against the compiled _bisect_path_right."""

    # Declares the compiled helpers feature as a requirement of these tests.
    _test_needs_features = [compiled_dirstate_helpers_feature]

    def get_bisect_path(self):
        """Return ``_bisect_path_right`` from the compiled helpers module."""
        from breezy.bzr._dirstate_helpers_pyx import (
            _bisect_path_right as compiled_impl,
            )
        return compiled_impl
268
class TestBisectDirblock(tests.TestCase):
269
"""Test that bisect_dirblock() returns the expected values.
271
bisect_dirblock is intended to work like bisect.bisect_left() except it
272
knows it is working on dirblocks and that dirblocks are sorted by ('path',
273
'to', 'foo') chunks rather than by raw 'path/to/foo'.
275
This test is parameterized by calling get_bisect_dirblock(). Child test
276
cases can override this function to test against a different
280
def get_bisect_dirblock(self):
    """Return the pure-Python ``bisect_dirblock`` implementation."""
    from breezy.bzr._dirstate_helpers_py import bisect_dirblock as python_impl
    return python_impl
285
def assertBisect(self, dirblocks, split_dirblocks, path, *args, **kwargs):
286
"""Assert that bisect_split works like bisect_left on the split paths.
288
:param dirblocks: A list of (path, [info]) pairs.
289
:param split_dirblocks: A list of ((split, path), [info]) pairs.
290
:param path: The path we are indexing.
292
All other arguments will be passed along.
294
bisect_dirblock = self.get_bisect_dirblock()
295
self.assertIsInstance(dirblocks, list)
296
bisect_split_idx = bisect_dirblock(dirblocks, path, *args, **kwargs)
297
split_dirblock = (path.split(b'/'), [])
298
bisect_left_idx = bisect.bisect_left(split_dirblocks, split_dirblock,
300
self.assertEqual(bisect_left_idx, bisect_split_idx,
301
'bisect_split disagreed. %s != %s'
303
% (bisect_left_idx, bisect_split_idx, path)
306
def paths_to_dirblocks(self, paths):
307
"""Convert a list of paths into dirblock form.
309
Also, ensure that the paths are in proper sorted order.
311
dirblocks = [(path, []) for path in paths]
312
split_dirblocks = [(path.split(b'/'), []) for path in paths]
313
self.assertEqual(sorted(split_dirblocks), split_dirblocks)
314
return dirblocks, split_dirblocks
316
def test_simple(self):
317
"""In the simple case it works just like bisect_left"""
318
paths = [b'', b'a', b'b', b'c', b'd']
319
dirblocks, split_dirblocks = self.paths_to_dirblocks(paths)
321
self.assertBisect(dirblocks, split_dirblocks, path)
322
self.assertBisect(dirblocks, split_dirblocks, b'_')
323
self.assertBisect(dirblocks, split_dirblocks, b'aa')
324
self.assertBisect(dirblocks, split_dirblocks, b'bb')
325
self.assertBisect(dirblocks, split_dirblocks, b'cc')
326
self.assertBisect(dirblocks, split_dirblocks, b'dd')
327
self.assertBisect(dirblocks, split_dirblocks, b'a/a')
328
self.assertBisect(dirblocks, split_dirblocks, b'b/b')
329
self.assertBisect(dirblocks, split_dirblocks, b'c/c')
330
self.assertBisect(dirblocks, split_dirblocks, b'd/d')
332
def test_involved(self):
333
"""This is where bisect_left diverges slightly."""
335
b'a/a', b'a/a/a', b'a/a/z', b'a/a-a', b'a/a-z',
336
b'a/z', b'a/z/a', b'a/z/z', b'a/z-a', b'a/z-z',
338
b'z', b'z/a/a', b'z/a/z', b'z/a-a', b'z/a-z',
339
b'z/z', b'z/z/a', b'z/z/z', b'z/z-a', b'z/z-z',
342
dirblocks, split_dirblocks = self.paths_to_dirblocks(paths)
344
self.assertBisect(dirblocks, split_dirblocks, path)
346
def test_involved_cached(self):
347
"""This is where bisect_left diverges slightly."""
349
b'a/a', b'a/a/a', b'a/a/z', b'a/a-a', b'a/a-z',
350
b'a/z', b'a/z/a', b'a/z/z', b'a/z-a', b'a/z-z',
352
b'z', b'z/a/a', b'z/a/z', b'z/a-a', b'z/a-z',
353
b'z/z', b'z/z/a', b'z/z/z', b'z/z-a', b'z/z-z',
357
dirblocks, split_dirblocks = self.paths_to_dirblocks(paths)
359
self.assertBisect(dirblocks, split_dirblocks, path, cache=cache)
362
class TestCompiledBisectDirblock(TestBisectDirblock):
363
"""Test that bisect_dirblock() returns the expected values.
365
bisect_dirblock is intended to work like bisect.bisect_left() except it
366
knows it is working on dirblocks and that dirblocks are sorted by ('path',
367
'to', 'foo') chunks rather than by raw 'path/to/foo'.
369
This runs all the normal tests that TestBisectDirblock did, but uses the
373
_test_needs_features = [compiled_dirstate_helpers_feature]
375
def get_bisect_dirblock(self):
    """Return ``bisect_dirblock`` from the compiled helpers module."""
    from breezy.bzr._dirstate_helpers_pyx import bisect_dirblock as compiled_impl
    return compiled_impl
380
class TestLtByDirs(tests.TestCase):
381
"""Test an implementation of lt_by_dirs()
383
lt_by_dirs() compares 2 paths by their directory sections, rather than as
386
Child test cases can override ``get_lt_by_dirs`` to test a specific
390
def get_lt_by_dirs(self):
    """Get a specific implementation of lt_by_dirs.

    :return: The pure-Python ``lt_by_dirs`` callable. Callers invoke the
        returned object directly, so it has to be handed back here.
    """
    from ..bzr._dirstate_helpers_py import lt_by_dirs
    return lt_by_dirs
395
def assertCmpByDirs(self, expected, str1, str2):
396
"""Compare the two strings, in both directions.
398
:param expected: The expected comparison value. -1 means str1 comes
399
first, 0 means they are equal, 1 means str2 comes first
400
:param str1: string to compare
401
:param str2: string to compare
403
lt_by_dirs = self.get_lt_by_dirs()
405
self.assertEqual(str1, str2)
406
self.assertFalse(lt_by_dirs(str1, str2))
407
self.assertFalse(lt_by_dirs(str2, str1))
409
self.assertFalse(lt_by_dirs(str1, str2))
410
self.assertTrue(lt_by_dirs(str2, str1))
412
self.assertTrue(lt_by_dirs(str1, str2))
413
self.assertFalse(lt_by_dirs(str2, str1))
415
def test_cmp_empty(self):
416
"""Compare against the empty string."""
417
self.assertCmpByDirs(0, b'', b'')
418
self.assertCmpByDirs(1, b'a', b'')
419
self.assertCmpByDirs(1, b'ab', b'')
420
self.assertCmpByDirs(1, b'abc', b'')
421
self.assertCmpByDirs(1, b'abcd', b'')
422
self.assertCmpByDirs(1, b'abcde', b'')
423
self.assertCmpByDirs(1, b'abcdef', b'')
424
self.assertCmpByDirs(1, b'abcdefg', b'')
425
self.assertCmpByDirs(1, b'abcdefgh', b'')
426
self.assertCmpByDirs(1, b'abcdefghi', b'')
427
self.assertCmpByDirs(1, b'test/ing/a/path/', b'')
429
def test_cmp_same_str(self):
430
"""Compare the same string"""
431
self.assertCmpByDirs(0, b'a', b'a')
432
self.assertCmpByDirs(0, b'ab', b'ab')
433
self.assertCmpByDirs(0, b'abc', b'abc')
434
self.assertCmpByDirs(0, b'abcd', b'abcd')
435
self.assertCmpByDirs(0, b'abcde', b'abcde')
436
self.assertCmpByDirs(0, b'abcdef', b'abcdef')
437
self.assertCmpByDirs(0, b'abcdefg', b'abcdefg')
438
self.assertCmpByDirs(0, b'abcdefgh', b'abcdefgh')
439
self.assertCmpByDirs(0, b'abcdefghi', b'abcdefghi')
440
self.assertCmpByDirs(0, b'testing a long string', b'testing a long string')
441
self.assertCmpByDirs(0, b'x'*10000, b'x'*10000)
442
self.assertCmpByDirs(0, b'a/b', b'a/b')
443
self.assertCmpByDirs(0, b'a/b/c', b'a/b/c')
444
self.assertCmpByDirs(0, b'a/b/c/d', b'a/b/c/d')
445
self.assertCmpByDirs(0, b'a/b/c/d/e', b'a/b/c/d/e')
447
def test_simple_paths(self):
448
"""Compare strings that act like normal string comparison"""
449
self.assertCmpByDirs(-1, b'a', b'b')
450
self.assertCmpByDirs(-1, b'aa', b'ab')
451
self.assertCmpByDirs(-1, b'ab', b'bb')
452
self.assertCmpByDirs(-1, b'aaa', b'aab')
453
self.assertCmpByDirs(-1, b'aab', b'abb')
454
self.assertCmpByDirs(-1, b'abb', b'bbb')
455
self.assertCmpByDirs(-1, b'aaaa', b'aaab')
456
self.assertCmpByDirs(-1, b'aaab', b'aabb')
457
self.assertCmpByDirs(-1, b'aabb', b'abbb')
458
self.assertCmpByDirs(-1, b'abbb', b'bbbb')
459
self.assertCmpByDirs(-1, b'aaaaa', b'aaaab')
460
self.assertCmpByDirs(-1, b'a/a', b'a/b')
461
self.assertCmpByDirs(-1, b'a/b', b'b/b')
462
self.assertCmpByDirs(-1, b'a/a/a', b'a/a/b')
463
self.assertCmpByDirs(-1, b'a/a/b', b'a/b/b')
464
self.assertCmpByDirs(-1, b'a/b/b', b'b/b/b')
465
self.assertCmpByDirs(-1, b'a/a/a/a', b'a/a/a/b')
466
self.assertCmpByDirs(-1, b'a/a/a/b', b'a/a/b/b')
467
self.assertCmpByDirs(-1, b'a/a/b/b', b'a/b/b/b')
468
self.assertCmpByDirs(-1, b'a/b/b/b', b'b/b/b/b')
469
self.assertCmpByDirs(-1, b'a/a/a/a/a', b'a/a/a/a/b')
471
def test_tricky_paths(self):
472
self.assertCmpByDirs(1, b'ab/cd/ef', b'ab/cc/ef')
473
self.assertCmpByDirs(1, b'ab/cd/ef', b'ab/c/ef')
474
self.assertCmpByDirs(-1, b'ab/cd/ef', b'ab/cd-ef')
475
self.assertCmpByDirs(-1, b'ab/cd', b'ab/cd-')
476
self.assertCmpByDirs(-1, b'ab/cd', b'ab-cd')
478
def test_cmp_unicode_not_allowed(self):
479
lt_by_dirs = self.get_lt_by_dirs()
480
self.assertRaises(TypeError, lt_by_dirs, u'Unicode', b'str')
481
self.assertRaises(TypeError, lt_by_dirs, b'str', u'Unicode')
482
self.assertRaises(TypeError, lt_by_dirs, u'Unicode', u'Unicode')
484
def test_cmp_non_ascii(self):
485
self.assertCmpByDirs(-1, b'\xc2\xb5', b'\xc3\xa5') # u'\xb5', u'\xe5'
486
self.assertCmpByDirs(-1, b'a', b'\xc3\xa5') # u'a', u'\xe5'
487
self.assertCmpByDirs(-1, b'b', b'\xc2\xb5') # u'b', u'\xb5'
488
self.assertCmpByDirs(-1, b'a/b', b'a/\xc3\xa5') # u'a/b', u'a/\xe5'
489
self.assertCmpByDirs(-1, b'b/a', b'b/\xc2\xb5') # u'b/a', u'b/\xb5'
492
class TestCompiledLtByDirs(TestLtByDirs):
    """Test the pyrex implementation of lt_by_dirs"""

    # Declares the compiled helpers feature as a requirement of these tests.
    _test_needs_features = [compiled_dirstate_helpers_feature]

    def get_lt_by_dirs(self):
        """Get the compiled implementation of lt_by_dirs.

        :return: ``lt_by_dirs`` from the compiled helpers module. The
            inherited tests call the returned object, so it has to be
            handed back here.
        """
        from ..bzr._dirstate_helpers_pyx import lt_by_dirs
        return lt_by_dirs
502
class TestLtPathByDirblock(tests.TestCase):
503
"""Test an implementation of _lt_path_by_dirblock()
505
_lt_path_by_dirblock() compares two paths using the sort order used by
506
DirState. All paths in the same directory are sorted together.
508
Child test cases can override ``get_lt_path_by_dirblock`` to test a specific
512
def get_lt_path_by_dirblock(self):
    """Return the pure-Python ``_lt_path_by_dirblock`` implementation."""
    from ..bzr._dirstate_helpers_py import _lt_path_by_dirblock as python_impl
    return python_impl
517
def assertLtPathByDirblock(self, paths):
518
"""Compare all paths and make sure they evaluate to the correct order.
520
This does N^2 comparisons. It is assumed that ``paths`` is properly
523
:param paths: a sorted list of paths to compare
525
# First, make sure the paths being passed in are correct
527
dirname, basename = os.path.split(p)
528
return dirname.split(b'/'), basename
529
self.assertEqual(sorted(paths, key=_key), paths)
531
lt_path_by_dirblock = self.get_lt_path_by_dirblock()
532
for idx1, path1 in enumerate(paths):
533
for idx2, path2 in enumerate(paths):
534
lt_result = lt_path_by_dirblock(path1, path2)
535
self.assertEqual(idx1 < idx2, lt_result,
536
'%s did not state that %r < %r, lt=%s'
537
% (lt_path_by_dirblock.__name__,
538
path1, path2, lt_result))
540
def test_cmp_simple_paths(self):
541
"""Compare against the empty string."""
542
self.assertLtPathByDirblock([b'', b'a', b'ab', b'abc', b'a/b/c', b'b/d/e'])
543
self.assertLtPathByDirblock([b'kl', b'ab/cd', b'ab/ef', b'gh/ij'])
545
def test_tricky_paths(self):
546
self.assertLtPathByDirblock([
548
b'', b'a', b'a-a', b'a=a', b'b',
550
b'a/a', b'a/a-a', b'a/a=a', b'a/b',
552
b'a/a/a', b'a/a/a-a', b'a/a/a=a',
553
# Contents of 'a/a/a'
554
b'a/a/a/a', b'a/a/a/b',
555
# Contents of 'a/a/a-a',
556
b'a/a/a-a/a', b'a/a/a-a/b',
557
# Contents of 'a/a/a=a',
558
b'a/a/a=a/a', b'a/a/a=a/b',
559
# Contents of 'a/a-a'
561
# Contents of 'a/a-a/a'
562
b'a/a-a/a/a', b'a/a-a/a/b',
563
# Contents of 'a/a=a'
574
self.assertLtPathByDirblock([
576
b'', b'a', b'a-a', b'a-z', b'a=a', b'a=z',
578
b'a/a', b'a/a-a', b'a/a-z',
580
b'a/z', b'a/z-a', b'a/z-z',
604
def test_unicode_not_allowed(self):
605
lt_path_by_dirblock = self.get_lt_path_by_dirblock()
606
self.assertRaises(TypeError, lt_path_by_dirblock, u'Uni', 'str')
607
self.assertRaises(TypeError, lt_path_by_dirblock, 'str', u'Uni')
608
self.assertRaises(TypeError, lt_path_by_dirblock, u'Uni', u'Uni')
609
self.assertRaises(TypeError, lt_path_by_dirblock, u'x/Uni', 'x/str')
610
self.assertRaises(TypeError, lt_path_by_dirblock, 'x/str', u'x/Uni')
611
self.assertRaises(TypeError, lt_path_by_dirblock, u'x/Uni', u'x/Uni')
613
def test_nonascii(self):
614
self.assertLtPathByDirblock([
616
b'', b'a', b'\xc2\xb5', b'\xc3\xa5',
618
b'a/a', b'a/\xc2\xb5', b'a/\xc3\xa5',
620
b'a/a/a', b'a/a/\xc2\xb5', b'a/a/\xc3\xa5',
621
# content of 'a/\xc2\xb5'
622
b'a/\xc2\xb5/a', b'a/\xc2\xb5/\xc2\xb5', b'a/\xc2\xb5/\xc3\xa5',
623
# content of 'a/\xc3\xa5'
624
b'a/\xc3\xa5/a', b'a/\xc3\xa5/\xc2\xb5', b'a/\xc3\xa5/\xc3\xa5',
625
# content of '\xc2\xb5'
626
b'\xc2\xb5/a', b'\xc2\xb5/\xc2\xb5', b'\xc2\xb5/\xc3\xa5',
627
# content of '\xc3\xa5'
628
b'\xc3\xa5/a', b'\xc3\xa5/\xc2\xb5', b'\xc3\xa5/\xc3\xa5',
632
class TestCompiledLtPathByDirblock(TestLtPathByDirblock):
    """Run the _lt_path_by_dirblock tests against the pyrex implementation."""

    # Declares the compiled helpers feature as a requirement of these tests.
    _test_needs_features = [compiled_dirstate_helpers_feature]

    def get_lt_path_by_dirblock(self):
        """Return ``_lt_path_by_dirblock`` from the compiled helpers module."""
        from ..bzr._dirstate_helpers_pyx import (
            _lt_path_by_dirblock as compiled_impl,
            )
        return compiled_impl
642
class TestMemRChr(tests.TestCase):
    """Test memrchr functionality.

    The expectations below treat ``_py_memrchr(s, c)`` as returning the index
    of the last occurrence of ``c`` in ``s``, or None when ``c`` is absent.
    """

    # Declares the compiled helpers feature as a requirement of these tests.
    _test_needs_features = [compiled_dirstate_helpers_feature]

    def assertMemRChr(self, expected, s, c):
        """Assert that ``_py_memrchr(s, c)`` equals ``expected``."""
        from breezy.bzr._dirstate_helpers_pyx import _py_memrchr as memrchr
        self.assertEqual(expected, memrchr(s, c))

    def test_missing(self):
        """A character that is absent gives None."""
        cases = [
            (b'', b'a'),
            (b'', b'c'),
            (b'abcdefghijklm', b'q'),
            (b'aaaaaaaaaaaaaaaaaaaaaaa', b'b'),
        ]
        for haystack, needle in cases:
            self.assertMemRChr(None, haystack, needle)

    def test_single_entry(self):
        """A character occurring once is found at its index."""
        haystack = b'abcdefghijklm'
        cases = [(0, b'a'), (1, b'b'), (2, b'c'),
                 (10, b'k'), (11, b'l'), (12, b'm')]
        for index, needle in cases:
            self.assertMemRChr(index, haystack, needle)

    def test_multiple(self):
        """With several occurrences the last index is expected."""
        haystack = b'abcdefjklmabcdefghijklm'
        cases = [(10, b'a'), (11, b'b'), (12, b'c'),
                 (20, b'k'), (21, b'l'), (22, b'm')]
        for index, needle in cases:
            self.assertMemRChr(index, haystack, needle)
        self.assertMemRChr(22, b'aaaaaaaaaaaaaaaaaaaaaaa', b'a')

    def test_with_nulls(self):
        """Embedded NUL bytes are expected not to cut the search short."""
        haystack = b'abc\0\0\0jklmabc\0\0\0ghijklm'
        cases = [(10, b'a'), (11, b'b'), (12, b'c'),
                 (20, b'k'), (21, b'l'), (22, b'm')]
        for index, needle in cases:
            self.assertMemRChr(index, haystack, needle)
        self.assertMemRChr(22, b'aaa\0\0\0aaaaaaa\0\0\0aaaaaaa', b'a')
        self.assertMemRChr(9, b'\0\0\0\0\0\0\0\0\0\0', b'\0')
685
class TestReadDirblocks(test_dirstate.TestCaseWithDirState):
686
"""Test an implementation of _read_dirblocks()
688
_read_dirblocks() reads in all of the dirblock information from the disk
691
Child test cases can override ``get_read_dirblocks`` to test a specific
695
# inherits scenarios from test_dirstate
697
def get_read_dirblocks(self):
    """Return the pure-Python ``_read_dirblocks`` implementation."""
    from breezy.bzr._dirstate_helpers_py import _read_dirblocks as python_impl
    return python_impl
701
def test_smoketest(self):
    """Make sure that we can create and read back a simple file."""
    tree, state, expected = self.create_basic_dirstate()
    state._read_header_if_needed()
    # Only the header has been read, so the dirblocks are not loaded yet.
    not_loaded = dirstate.DirState.NOT_IN_MEMORY
    self.assertEqual(not_loaded, state._dirblock_state)
    reader = self.get_read_dirblocks()
    reader(state)
    # Reading the dirblocks loads them without marking the state modified.
    loaded_clean = dirstate.DirState.IN_MEMORY_UNMODIFIED
    self.assertEqual(loaded_clean, state._dirblock_state)
713
def test_trailing_garbage(self):
714
tree, state, expected = self.create_basic_dirstate()
715
# On Unix, we can write extra data as long as we haven't read yet, but
716
# on Win32, if you've opened the file with FILE_SHARE_READ, trying to
717
# open it in append mode will fail.
719
f = open('dirstate', 'ab')
721
# Add bogus trailing garbage
726
e = self.assertRaises(dirstate.DirstateCorrupt,
727
state._read_dirblocks_if_needed)
728
# Make sure we mention the bogus characters in the error
729
self.assertContainsRe(str(e), 'bogus')
732
class TestCompiledReadDirblocks(TestReadDirblocks):
    """Run the _read_dirblocks tests against the pyrex implementation."""

    # Declares the compiled helpers feature as a requirement of these tests.
    _test_needs_features = [compiled_dirstate_helpers_feature]

    def get_read_dirblocks(self):
        """Return ``_read_dirblocks`` from the compiled helpers module."""
        from breezy.bzr._dirstate_helpers_pyx import (
            _read_dirblocks as compiled_impl,
            )
        return compiled_impl
742
class TestUsingCompiledIfAvailable(tests.TestCase):
743
"""Check that any compiled functions that are available are the default.
745
It is possible to have typos, etc in the import line, such that
746
_dirstate_helpers_pyx is actually available, but the compiled functions are
750
def test_bisect_dirblock(self):
751
if compiled_dirstate_helpers_feature.available():
752
from breezy.bzr._dirstate_helpers_pyx import bisect_dirblock
754
from breezy.bzr._dirstate_helpers_py import bisect_dirblock
755
self.assertIs(bisect_dirblock, dirstate.bisect_dirblock)
757
def test__bisect_path_left(self):
758
if compiled_dirstate_helpers_feature.available():
759
from breezy.bzr._dirstate_helpers_pyx import _bisect_path_left
761
from breezy.bzr._dirstate_helpers_py import _bisect_path_left
762
self.assertIs(_bisect_path_left, dirstate._bisect_path_left)
764
def test__bisect_path_right(self):
765
if compiled_dirstate_helpers_feature.available():
766
from breezy.bzr._dirstate_helpers_pyx import _bisect_path_right
768
from breezy.bzr._dirstate_helpers_py import _bisect_path_right
769
self.assertIs(_bisect_path_right, dirstate._bisect_path_right)
771
def test_lt_by_dirs(self):
772
if compiled_dirstate_helpers_feature.available():
773
from ..bzr._dirstate_helpers_pyx import lt_by_dirs
775
from ..bzr._dirstate_helpers_py import lt_by_dirs
776
self.assertIs(lt_by_dirs, dirstate.lt_by_dirs)
778
def test__read_dirblocks(self):
779
if compiled_dirstate_helpers_feature.available():
780
from breezy.bzr._dirstate_helpers_pyx import _read_dirblocks
782
from breezy.bzr._dirstate_helpers_py import _read_dirblocks
783
self.assertIs(_read_dirblocks, dirstate._read_dirblocks)
785
def test_update_entry(self):
786
if compiled_dirstate_helpers_feature.available():
787
from breezy.bzr._dirstate_helpers_pyx import update_entry
789
from breezy.bzr.dirstate import update_entry
790
self.assertIs(update_entry, dirstate.update_entry)
792
def test_process_entry(self):
793
if compiled_dirstate_helpers_feature.available():
794
from breezy.bzr._dirstate_helpers_pyx import ProcessEntryC
795
self.assertIs(ProcessEntryC, dirstate._process_entry)
797
from breezy.bzr.dirstate import ProcessEntryPython
798
self.assertIs(ProcessEntryPython, dirstate._process_entry)
801
class TestUpdateEntry(test_dirstate.TestCaseWithDirState):
802
"""Test the DirState.update_entry functions"""
804
scenarios = multiply_scenarios(
805
dir_reader_scenarios(), ue_scenarios)
811
super(TestUpdateEntry, self).setUp()
812
self.overrideAttr(dirstate, 'update_entry', self.update_entry)
814
def get_state_with_a(self):
    """Create a DirState tracking a single object named 'a'

    The state is initialized as 'dirstate' and its ``unlock`` is registered
    as a cleanup.

    :return: ``(state, entry)`` where ``entry`` is what
        ``state._get_entry(0, path_utf8=b'a')`` gives for the added file.
    """
    state = test_dirstate.InstrumentedDirState.initialize('dirstate')
    self.addCleanup(state.unlock)
    state.add('a', b'a-id', 'file', None, b'')
    entry = state._get_entry(0, path_utf8=b'a')
    # Callers unpack ``state, entry = self.get_state_with_a()``.
    return state, entry
822
def test_observed_sha1_cachable(self):
    """An observed sha1 with a 10-second-old stat is recorded in the entry."""
    state, entry = self.get_state_with_a()
    ten_seconds_ago = time.time() - 10
    self.build_tree(['a'])
    fake_stat = test_dirstate._FakeStat.from_stat(os.lstat('a'))
    # Backdate both timestamps on the fake stat.
    fake_stat.st_mtime = ten_seconds_ago
    fake_stat.st_ctime = ten_seconds_ago
    self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                     state._dirblock_state)
    state._observed_sha1(entry, b"foo", fake_stat)
    current_details = entry[1][0]
    self.assertEqual(b'foo', current_details[1])
    packed_stat = dirstate.pack_stat(fake_stat)
    self.assertEqual(packed_stat, entry[1][0][4])
    self.assertEqual(dirstate.DirState.IN_MEMORY_HASH_MODIFIED,
                     state._dirblock_state)
838
def test_observed_sha1_not_cachable(self):
    """An observed sha1 with a fresh stat must not change the entry.

    The stat comes straight from ``os.lstat`` on a file that was just
    built. The stored sha1, the stored packed stat and the dirblock state
    are all expected to stay as they were.
    """
    state, entry = self.get_state_with_a()
    oldval = entry[1][0][1]
    oldstat = entry[1][0][4]
    self.build_tree(['a'])
    statvalue = os.lstat('a')
    self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                     state._dirblock_state)
    # Pass the sha1 as bytes, the same type the cachable test uses.
    state._observed_sha1(entry, b"foo", statvalue)
    self.assertEqual(oldval, entry[1][0][1])
    self.assertEqual(oldstat, entry[1][0][4])
    self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                     state._dirblock_state)
853
def test_update_entry(self):
854
state, _ = self.get_state_with_a()
855
tree = self.make_branch_and_tree('tree')
857
empty_revid = tree.commit('empty')
858
self.build_tree(['tree/a'])
859
tree.add(['a'], [b'a-id'])
860
with_a_id = tree.commit('with_a')
861
self.addCleanup(tree.unlock)
862
state.set_parent_trees(
863
[(empty_revid, tree.branch.repository.revision_tree(empty_revid))],
865
entry = state._get_entry(0, path_utf8=b'a')
866
self.build_tree(['a'])
867
# Add one where we don't provide the stat or sha already
868
self.assertEqual((b'', b'a', b'a-id'), entry[0])
869
self.assertEqual((b'f', b'', 0, False, dirstate.DirState.NULLSTAT),
871
# Flush the buffers to disk
873
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
874
state._dirblock_state)
876
stat_value = os.lstat('a')
877
packed_stat = dirstate.pack_stat(stat_value)
878
link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
879
stat_value=stat_value)
880
self.assertEqual(None, link_or_sha1)
882
# The dirblock entry should not have computed or cached the file's
883
# sha1, but it did update the files' st_size. However, this is not
884
# worth writing a dirstate file for, so we leave the state UNMODIFIED
885
self.assertEqual((b'f', b'', 14, False, dirstate.DirState.NULLSTAT),
887
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
888
state._dirblock_state)
889
mode = stat_value.st_mode
890
self.assertEqual([('is_exec', mode, False)], state._log)
893
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
894
state._dirblock_state)
896
# Roll the clock back so the file is guaranteed to look too new. We
897
# should still not compute the sha1.
898
state.adjust_time(-10)
901
link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
902
stat_value=stat_value)
903
self.assertEqual([('is_exec', mode, False)], state._log)
904
self.assertEqual(None, link_or_sha1)
905
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
906
state._dirblock_state)
907
self.assertEqual((b'f', b'', 14, False, dirstate.DirState.NULLSTAT),
911
# If it is cachable (the clock has moved forward) but new it still
912
# won't calculate the sha or cache it.
913
state.adjust_time(+20)
915
link_or_sha1 = dirstate.update_entry(state, entry, abspath=b'a',
916
stat_value=stat_value)
917
self.assertEqual(None, link_or_sha1)
918
self.assertEqual([('is_exec', mode, False)], state._log)
919
self.assertEqual((b'f', b'', 14, False, dirstate.DirState.NULLSTAT),
921
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
922
state._dirblock_state)
924
# If the file is no longer new, and the clock has been moved forward
925
# sufficiently, it will cache the sha.
927
state.set_parent_trees(
928
[(with_a_id, tree.branch.repository.revision_tree(with_a_id))],
930
entry = state._get_entry(0, path_utf8=b'a')
932
link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
933
stat_value=stat_value)
934
self.assertEqual(b'b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6',
936
self.assertEqual([('is_exec', mode, False), ('sha1', b'a')],
938
self.assertEqual((b'f', link_or_sha1, 14, False, packed_stat),
941
# Subsequent calls will just return the cached value
943
link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
944
stat_value=stat_value)
945
self.assertEqual(b'b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6',
947
self.assertEqual([], state._log)
948
self.assertEqual((b'f', link_or_sha1, 14, False, packed_stat),
951
def test_update_entry_symlink(self):
952
"""Update entry should read symlinks."""
953
self.requireFeature(features.SymlinkFeature)
954
state, entry = self.get_state_with_a()
956
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
957
state._dirblock_state)
958
os.symlink('target', 'a')
960
state.adjust_time(-10) # Make the symlink look new
961
stat_value = os.lstat('a')
962
packed_stat = dirstate.pack_stat(stat_value)
963
link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
964
stat_value=stat_value)
965
self.assertEqual(b'target', link_or_sha1)
966
self.assertEqual([('read_link', b'a', b'')], state._log)
967
# Dirblock is not updated (the link is too new)
968
self.assertEqual([(b'l', b'', 6, False, dirstate.DirState.NULLSTAT)],
970
# The file entry turned into a symlink, that is considered
971
# HASH modified worthy.
972
self.assertEqual(dirstate.DirState.IN_MEMORY_HASH_MODIFIED,
973
state._dirblock_state)
975
# Because the stat_value looks new, we should re-read the target
977
link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
978
stat_value=stat_value)
979
self.assertEqual(b'target', link_or_sha1)
980
self.assertEqual([('read_link', b'a', b'')], state._log)
981
self.assertEqual([(b'l', b'', 6, False, dirstate.DirState.NULLSTAT)],
984
state.adjust_time(+20) # Skip into the future, all files look old
986
link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
987
stat_value=stat_value)
988
# The symlink stayed a symlink. So while it is new enough to cache, we
989
# don't bother setting the flag, because it is not really worth saving
990
# (when we stat the symlink, we'll have paged in the target.)
991
self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
992
state._dirblock_state)
993
self.assertEqual(b'target', link_or_sha1)
994
# We need to re-read the link because only now can we cache it
995
self.assertEqual([('read_link', b'a', b'')], state._log)
996
self.assertEqual([(b'l', b'target', 6, False, packed_stat)],
1000
# Another call won't re-read the link
1001
self.assertEqual([], state._log)
1002
link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
1003
stat_value=stat_value)
1004
self.assertEqual(b'target', link_or_sha1)
1005
self.assertEqual([(b'l', b'target', 6, False, packed_stat)],
1008
def do_update_entry(self, state, entry, abspath):
    """Call ``self.update_entry`` with a freshly taken ``os.lstat`` result.

    :param state: Passed through unchanged to ``self.update_entry``.
    :param entry: Passed through unchanged to ``self.update_entry``.
    :param abspath: Path that is lstat'ed and then handed to
        ``self.update_entry``.
    :return: Whatever ``self.update_entry`` returns.
    """
    stat_value = os.lstat(abspath)
    return self.update_entry(state, entry, abspath, stat_value)
1012
def test_update_entry_dir(self):
    """Updating the entry for 'a' returns None once 'a' is a directory."""
    state, entry = self.get_state_with_a()
    self.build_tree(['a/'])
    self.assertIs(None, self.do_update_entry(state, entry, b'a'))
1017
def test_update_entry_dir_unchanged(self):
    """Stat-only changes to a directory do not mark the state modified.

    The first update (file became a directory) is worth saving. After a
    save, neither a repeated update nor a changed mtime flips the dirblock
    state away from IN_MEMORY_UNMODIFIED, although the packed stat stored
    in the entry is refreshed.
    """
    state, entry = self.get_state_with_a()
    self.build_tree(['a/'])
    state.adjust_time(+20)
    self.assertIs(None, self.do_update_entry(state, entry, b'a'))
    # a/ used to be a file, but is now a directory, worth saving
    self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
                     state._dirblock_state)
    state.save()
    self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                     state._dirblock_state)
    # No changes to a/ means not worth saving.
    self.assertIs(None, self.do_update_entry(state, entry, b'a'))
    self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                     state._dirblock_state)
    # Change the last-modified time for the directory
    t = time.time() - 100.0
    try:
        os.utime('a', (t, t))
    except OSError:
        # It looks like Win32 + FAT doesn't allow to change times on a dir.
        raise tests.TestSkipped("can't update mtime of a dir on FAT")
    saved_packed_stat = entry[1][0][-1]
    self.assertIs(None, self.do_update_entry(state, entry, b'a'))
    # We *do* go ahead and update the information in the dirblocks, but we
    # don't bother setting IN_MEMORY_MODIFIED because it is trivial to
    # recompute.
    self.assertNotEqual(saved_packed_stat, entry[1][0][-1])
    self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                     state._dirblock_state)
1048
def test_update_entry_file_unchanged(self):
    """Re-updating an unchanged file after a save leaves the state clean.

    A parent tree containing 'a' is installed first, then the entry is
    updated twice: the first update is worth saving, the second (after
    ``state.save()``) returns the same sha1 without dirtying the state.
    """
    state, _ = self.get_state_with_a()
    tree = self.make_branch_and_tree('tree')
    tree.lock_write()
    self.build_tree(['tree/a'])
    tree.add(['a'], [b'a-id'])
    with_a_id = tree.commit('witha')
    self.addCleanup(tree.unlock)
    state.set_parent_trees(
        [(with_a_id, tree.branch.repository.revision_tree(with_a_id))],
        [])
    entry = state._get_entry(0, path_utf8=b'a')
    self.build_tree(['a'])
    sha1sum = b'b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6'
    state.adjust_time(+20)
    self.assertEqual(sha1sum, self.do_update_entry(state, entry, b'a'))
    self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
                     state._dirblock_state)
    state.save()
    self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                     state._dirblock_state)
    # Second pass over the same, untouched file: same sha1, nothing to save.
    self.assertEqual(sha1sum, self.do_update_entry(state, entry, b'a'))
    self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                     state._dirblock_state)
1073
def test_update_entry_tree_reference(self):
    """A tree-reference entry keeps kind b't' after update_entry runs."""
    state = test_dirstate.InstrumentedDirState.initialize('dirstate')
    self.addCleanup(state.unlock)
    state.add('r', b'r-id', 'tree-reference', None, b'')
    self.build_tree(['r/'])
    entry = state._get_entry(0, path_utf8=b'r')
    self.do_update_entry(state, entry, 'r')
    # Look the entry up again rather than trusting the object fetched above.
    entry = state._get_entry(0, path_utf8=b'r')
    self.assertEqual(b't', entry[1][0][0])
1083
def create_and_test_file(self, state, entry):
    """Create a file at 'a' and verify the state finds it during update.

    The state should already be versioning *something* at 'a'. This makes
    sure that state.update_entry recognizes it as a file.

    :return: The packed stat of the newly created file.
    """
    self.build_tree(['a'])
    stat_value = os.lstat('a')
    packed_stat = dirstate.pack_stat(stat_value)

    link_or_sha1 = self.do_update_entry(state, entry, abspath='a')
    self.assertIs(None, link_or_sha1)
    # The entry is rewritten as a file, but with NULLSTAT rather than the
    # file's packed stat.
    self.assertEqual([(b'f', b'', 14, False, dirstate.DirState.NULLSTAT)],
                     entry[1])
    return packed_stat
1099
def create_and_test_dir(self, state, entry):
    """Create a directory at 'a' and verify the state finds it.

    The state should already be versioning *something* at 'a'. This makes
    sure that state.update_entry recognizes it as a directory.

    :return: The packed stat of the newly created directory.
    """
    self.build_tree(['a/'])
    stat_value = os.lstat('a')
    packed_stat = dirstate.pack_stat(stat_value)

    link_or_sha1 = self.do_update_entry(state, entry, abspath=b'a')
    self.assertIs(None, link_or_sha1)
    self.assertEqual([(b'd', b'', 0, False, packed_stat)], entry[1])

    return packed_stat
1115
# FIXME: Add unicode version
1116
def create_and_test_symlink(self, state, entry):
    """Create a symlink at 'a' and verify the state finds it.

    The state should already be versioning *something* at 'a'. This makes
    sure that state.update_entry recognizes it as a symlink.

    This should not be called if this platform does not have symlink
    support.

    :return: The packed stat of the newly created symlink.
    """
    # caller should care about skipping test on platforms without symlinks
    os.symlink('path/to/foo', 'a')

    stat_value = os.lstat('a')
    packed_stat = dirstate.pack_stat(stat_value)

    link_or_sha1 = self.do_update_entry(state, entry, abspath=b'a')
    self.assertEqual(b'path/to/foo', link_or_sha1)
    self.assertEqual([(b'l', b'path/to/foo', 11, False, packed_stat)],
                     entry[1])
    return packed_stat
1137
def test_update_file_to_dir(self):
    """If a file changes to a directory we return None for the sha.

    We also update the inventory record.
    """
    state, entry = self.get_state_with_a()
    # The file sha1 won't be cached unless the file is old
    state.adjust_time(+10)
    self.create_and_test_file(state, entry)
    # Clear the path so the directory can be created in the file's place.
    os.remove('a')
    self.create_and_test_dir(state, entry)
1148
def test_update_file_to_symlink(self):
    """File becomes a symlink"""
    self.requireFeature(features.SymlinkFeature)
    state, entry = self.get_state_with_a()
    # The file sha1 won't be cached unless the file is old
    state.adjust_time(+10)
    self.create_and_test_file(state, entry)
    # Clear the path so the symlink can be created in the file's place.
    os.remove('a')
    self.create_and_test_symlink(state, entry)
1158
def test_update_dir_to_file(self):
    """Directory becoming a file updates the entry."""
    state, entry = self.get_state_with_a()
    # The file sha1 won't be cached unless the file is old
    state.adjust_time(+10)
    self.create_and_test_dir(state, entry)
    # Clear the path so the file can be created in the directory's place.
    os.rmdir('a')
    self.create_and_test_file(state, entry)
1167
def test_update_dir_to_symlink(self):
    """Directory becomes a symlink"""
    self.requireFeature(features.SymlinkFeature)
    state, entry = self.get_state_with_a()
    # The symlink target won't be cached if it isn't old
    state.adjust_time(+10)
    self.create_and_test_dir(state, entry)
    # Clear the path so the symlink can be created in the directory's place.
    os.rmdir('a')
    self.create_and_test_symlink(state, entry)
1177
def test_update_symlink_to_file(self):
    """Symlink becomes a file"""
    self.requireFeature(features.SymlinkFeature)
    state, entry = self.get_state_with_a()
    # The symlink and file info won't be cached unless old
    state.adjust_time(+10)
    self.create_and_test_symlink(state, entry)
    # Clear the path so the file can be created in the symlink's place.
    os.remove('a')
    self.create_and_test_file(state, entry)
1187
def test_update_symlink_to_dir(self):
    """Symlink becomes a directory"""
    self.requireFeature(features.SymlinkFeature)
    state, entry = self.get_state_with_a()
    # The symlink target won't be cached if it isn't old
    state.adjust_time(+10)
    self.create_and_test_symlink(state, entry)
    # Clear the path so the directory can be created in the symlink's place.
    os.remove('a')
    self.create_and_test_dir(state, entry)
1197
def test__is_executable_win32(self):
    """With the win32 ``_is_executable``, the recorded exec bit is kept.

    The entry is marked executable while the file on disk is not; after
    each update the entry must still carry ``True`` for the executable
    flag.
    """
    state, entry = self.get_state_with_a()
    self.build_tree(['a'])

    # Make sure we are using the win32 implementation of _is_executable
    state._is_executable = state._is_executable_win32

    # The file on disk is not executable, but we are marking it as though
    # it is. With _is_executable_win32 we ignore what is on disk.
    entry[1][0] = (b'f', b'', 0, True, dirstate.DirState.NULLSTAT)

    stat_value = os.lstat('a')

    state.adjust_time(-10)  # Make sure everything is new
    self.update_entry(state, entry, abspath=b'a', stat_value=stat_value)

    # The row is updated, but the executable bit stays set.
    self.assertEqual([(b'f', b'', 14, True, dirstate.DirState.NULLSTAT)],
                     entry[1])

    # Make the disk object look old enough to cache (but it won't cache the
    # sha as it is a new file).
    state.adjust_time(+20)
    self.update_entry(state, entry, abspath=b'a', stat_value=stat_value)
    self.assertEqual([(b'f', b'', 14, True, dirstate.DirState.NULLSTAT)],
                     entry[1])
1226
def _prepare_tree(self):
    """Build and commit a working tree holding one file, 'a file'.

    :return: A ``(tree, text)`` pair where ``text`` is the bytes written
        to 'tree/a file'.
    """
    text = b'Hello World\n'
    tree = self.make_branch_and_tree('tree')
    self.build_tree_contents([('tree/a file', text)])
    tree.add('a file', b'a-file-id')
    # Note: dirstate does not sha prior to the first commit
    # so commit now in order for the test to work
    tree.commit('first')
    return tree, text
1237
def test_sha1provider_sha1_used(self):
    """update_entry returns the sha1 computed by the custom provider."""
    tree, text = self._prepare_tree()
    state = dirstate.DirState.from_tree(tree, 'dirstate',
                                        UppercaseSHA1Provider())
    self.addCleanup(state.unlock)
    # Same transformation UppercaseSHA1Provider applies before hashing.
    expected_sha = osutils.sha_string(text.upper() + b"foo")
    entry = state._get_entry(0, path_utf8=b'a file')
    self.assertNotEqual((None, None), entry)
    # NOTE(review): presumably _sha_cutoff_time() initialises
    # state._cutoff_time so it can be bumped below -- confirm.
    state._sha_cutoff_time()
    state._cutoff_time += 10
    sha1 = self.update_entry(state, entry, 'tree/a file',
                             os.lstat('tree/a file'))
    self.assertEqual(expected_sha, sha1)
1251
def test_sha1provider_stat_and_sha1_used(self):
    """iter_changes reports the file once the custom provider is set."""
    tree, _ = self._prepare_tree()
    tree.lock_write()
    self.addCleanup(tree.unlock)
    state = tree._current_dirstate()
    state._sha1_provider = UppercaseSHA1Provider()
    # If we used the standard provider, it would look like nothing has
    # changed
    file_ids_changed = [change[0] for change
                        in tree.iter_changes(tree.basis_tree())]
    self.assertEqual([b'a-file-id'], file_ids_changed)
1264
class UppercaseSHA1Provider(dirstate.SHA1Provider):
    """A custom SHA1Provider.

    It hashes the upper-cased file content with b"foo" appended, so its
    results differ from a plain sha1 of the file.
    """

    def sha1(self, abspath):
        """Return only the sha1 part of ``stat_and_sha1(abspath)``."""
        return self.stat_and_sha1(abspath)[1]

    def stat_and_sha1(self, abspath):
        """Return ``(fstat result, sha1)`` for the file at ``abspath``.

        The stat is taken from the open file descriptor, and the sha1 is
        computed over the upper-cased content plus b"foo".
        """
        with open(abspath, 'rb') as file_obj:
            statvalue = os.fstat(file_obj.fileno())
            text = file_obj.read()
            sha1 = osutils.sha_string(text.upper() + b"foo")
        return statvalue, sha1
1278
class TestProcessEntry(test_dirstate.TestCaseWithDirState):
    """Run iter_changes-based tests against each _process_entry variant."""

    scenarios = multiply_scenarios(dir_reader_scenarios(), pe_scenarios)

    # Set by load_tests
    _process_entry = None

    def setUp(self):
        """Install the scenario's ``_process_entry`` into ``dirstate``."""
        super(TestProcessEntry, self).setUp()
        self.overrideAttr(dirstate, '_process_entry', self._process_entry)

    def assertChangedFileIds(self, expected, tree):
        """Assert ``tree.iter_changes`` against its basis yields ``expected``.

        :param expected: File ids expected as the first item of each
            change; compared after sorting, so order is irrelevant.
        :param tree: Tree to compare with ``tree.basis_tree()``. It is
            read-locked only while the changes are collected.
        """
        tree.lock_read()
        try:
            file_ids = [info[0] for info
                        in tree.iter_changes(tree.basis_tree())]
        finally:
            tree.unlock()
        self.assertEqual(sorted(expected), sorted(file_ids))

    def test_exceptions_raised(self):
        """An exception raised inside ``is_inside`` escapes iter_changes."""
        # This is a direct test of bug #495023, it relies on osutils.is_inside
        # getting called in an inner function. Which makes it a bit brittle,
        # but at least it does reproduce the bug.
        tree = self.make_branch_and_tree('tree')
        self.build_tree(['tree/file', 'tree/dir/', 'tree/dir/sub',
                         'tree/dir2/', 'tree/dir2/sub2'])
        tree.add(['file', 'dir', 'dir/sub', 'dir2', 'dir2/sub2'])
        tree.commit('first commit')
        tree.lock_read()
        self.addCleanup(tree.unlock)
        basis_tree = tree.basis_tree()

        def is_inside_raises(*args, **kwargs):
            raise RuntimeError('stop this')

        self.overrideAttr(dirstate, 'is_inside', is_inside_raises)
        # The compiled helpers are optional; patch them only when present.
        try:
            from breezy.bzr import _dirstate_helpers_pyx
        except ImportError:
            pass
        else:
            self.overrideAttr(_dirstate_helpers_pyx, 'is_inside',
                              is_inside_raises)
        self.overrideAttr(osutils, 'is_inside', is_inside_raises)
        self.assertListRaises(RuntimeError, tree.iter_changes, basis_tree)

    def test_simple_changes(self):
        """An added file shows as changed until it is committed."""
        tree = self.make_branch_and_tree('tree')
        self.build_tree(['tree/file'])
        tree.add(['file'], [b'file-id'])
        self.assertChangedFileIds([tree.get_root_id(), b'file-id'], tree)
        tree.commit('one')
        self.assertChangedFileIds([], tree)

    def test_sha1provider_stat_and_sha1_used(self):
        """A committed file shows as changed under the custom provider."""
        tree = self.make_branch_and_tree('tree')
        self.build_tree(['tree/file'])
        tree.add(['file'], [b'file-id'])
        tree.commit('one')
        tree.lock_write()
        self.addCleanup(tree.unlock)
        state = tree._current_dirstate()
        state._sha1_provider = UppercaseSHA1Provider()
        self.assertChangedFileIds([b'file-id'], tree)
1342
class TestPackStat(tests.TestCase):
    """Check packed representation of stat values is robust on all inputs"""

    scenarios = helper_scenarios

    def pack(self, statlike_tuple):
        """Pack a 10-item stat-like tuple with the scenario's ``pack_stat``."""
        return self.helpers.pack_stat(os.stat_result(statlike_tuple))

    @staticmethod
    def unpack_field(packed_string, stat_field):
        """Return one named field decoded from a packed stat string.

        Decoding always goes through the Python helper's ``_unpack_stat``,
        whichever helper module produced ``packed_string``.
        """
        return _dirstate_helpers_py._unpack_stat(packed_string)[stat_field]

    def test_result(self):
        self.assertEqual(b"AAAQAAAAABAAAAARAAAAAgAAAAEAAIHk",
                         self.pack((33252, 1, 2, 0, 0, 0, 4096, 15.5, 16.5, 17.5)))

    def test_giant_inode(self):
        packed = self.pack((33252, 0xF80000ABC, 0, 0, 0, 0, 0, 0, 0, 0))
        self.assertEqual(0x80000ABC, self.unpack_field(packed, "st_ino"))

    def test_giant_size(self):
        packed = self.pack((33252, 0, 0, 0, 0, 0, (1 << 33) + 4096, 0, 0, 0))
        self.assertEqual(4096, self.unpack_field(packed, "st_size"))

    def test_fractional_mtime(self):
        packed = self.pack((33252, 0, 0, 0, 0, 0, 0, 0, 16.9375, 0))
        self.assertEqual(16, self.unpack_field(packed, "st_mtime"))

    def test_ancient_mtime(self):
        packed = self.pack((33252, 0, 0, 0, 0, 0, 0, 0, -11644473600.0, 0))
        self.assertEqual(1240428288, self.unpack_field(packed, "st_mtime"))

    def test_distant_mtime(self):
        packed = self.pack((33252, 0, 0, 0, 0, 0, 0, 0, 64060588800.0, 0))
        self.assertEqual(3931046656, self.unpack_field(packed, "st_mtime"))

    def test_fractional_ctime(self):
        packed = self.pack((33252, 0, 0, 0, 0, 0, 0, 0, 0, 17.5625))
        self.assertEqual(17, self.unpack_field(packed, "st_ctime"))

    def test_ancient_ctime(self):
        packed = self.pack((33252, 0, 0, 0, 0, 0, 0, 0, 0, -11644473600.0))
        self.assertEqual(1240428288, self.unpack_field(packed, "st_ctime"))

    def test_distant_ctime(self):
        packed = self.pack((33252, 0, 0, 0, 0, 0, 0, 0, 0, 64060588800.0))
        self.assertEqual(3931046656, self.unpack_field(packed, "st_ctime"))

    def test_negative_dev(self):
        packed = self.pack((33252, 0, -0xFFFFFCDE, 0, 0, 0, 0, 0, 0, 0))
        self.assertEqual(0x322, self.unpack_field(packed, "st_dev"))