/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test__dirstate_helpers.py

  • Committer: Jelmer Vernooij
  • Date: 2020-05-24 00:39:50 UTC
  • mto: This revision was merged to the branch mainline in revision 7504.
  • Revision ID: jelmer@jelmer.uk-20200524003950-bbc545r76vc5yajg
Add github action.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2007-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the compiled dirstate helpers."""
 
18
 
 
19
import bisect
 
20
import os
 
21
import time
 
22
 
 
23
from .. import (
 
24
    osutils,
 
25
    tests,
 
26
    )
 
27
from ..bzr import (
 
28
    dirstate,
 
29
    _dirstate_helpers_py,
 
30
    )
 
31
from . import (
 
32
    test_dirstate,
 
33
    )
 
34
from .test_osutils import dir_reader_scenarios
 
35
from .scenarios import (
 
36
    load_tests_apply_scenarios,
 
37
    multiply_scenarios,
 
38
    )
 
39
from . import (
 
40
    features,
 
41
    )
 
42
 
 
43
 
 
44
load_tests = load_tests_apply_scenarios


# Feature used by the Compiled* test classes below to require the compiled
# helper module.
compiled_dirstate_helpers_feature = features.ModuleAvailableFeature(
    'breezy.bzr._dirstate_helpers_pyx')


# FIXME: we should also parametrize against SHA1Provider !

# Each scenario list always contains the Python implementation; the compiled
# implementation is appended only when its module can be loaded.
ue_scenarios = [
    ('dirstate_Python', {'update_entry': dirstate.py_update_entry}),
]
if compiled_dirstate_helpers_feature.available():
    update_entry = compiled_dirstate_helpers_feature.module.update_entry
    ue_scenarios.append(('dirstate_Pyrex', {'update_entry': update_entry}))

pe_scenarios = [
    ('dirstate_Python', {'_process_entry': dirstate.ProcessEntryPython}),
]
if compiled_dirstate_helpers_feature.available():
    process_entry = compiled_dirstate_helpers_feature.module.ProcessEntryC
    pe_scenarios.append(('dirstate_Pyrex', {'_process_entry': process_entry}))

helper_scenarios = [
    ('dirstate_Python', {'helpers': _dirstate_helpers_py}),
]
if compiled_dirstate_helpers_feature.available():
    helper_scenarios.append(
        ('dirstate_Pyrex',
         {'helpers': compiled_dirstate_helpers_feature.module}))
 
69
 
 
70
 
 
71
class TestBisectPathMixin(object):
 
72
    """Test that _bisect_path_*() returns the expected values.
 
73
 
 
74
    _bisect_path_* is intended to work like bisect.bisect_*() except it
 
75
    knows it is working on paths that are sorted by ('path', 'to', 'foo')
 
76
    chunks rather than by raw 'path/to/foo'.
 
77
 
 
78
    Test Cases should inherit from this and override ``get_bisect_path`` return
 
79
    their implementation, and ``get_bisect`` to return the matching
 
80
    bisect.bisect_* function.
 
81
    """
 
82
 
 
83
    def get_bisect_path(self):
 
84
        """Return an implementation of _bisect_path_*"""
 
85
        raise NotImplementedError
 
86
 
 
87
    def get_bisect(self):
 
88
        """Return a version of bisect.bisect_*.
 
89
 
 
90
        Also, for the 'exists' check, return the offset to the real values.
 
91
        For example bisect_left returns the index of an entry, while
 
92
        bisect_right returns the index *after* an entry
 
93
 
 
94
        :return: (bisect_func, offset)
 
95
        """
 
96
        raise NotImplementedError
 
97
 
 
98
    def assertBisect(self, paths, split_paths, path, exists=True):
 
99
        """Assert that bisect_split works like bisect_left on the split paths.
 
100
 
 
101
        :param paths: A list of path names
 
102
        :param split_paths: A list of path names that are already split up by directory
 
103
            ('path/to/foo' => ('path', 'to', 'foo'))
 
104
        :param path: The path we are indexing.
 
105
        :param exists: The path should be present, so make sure the
 
106
            final location actually points to the right value.
 
107
 
 
108
        All other arguments will be passed along.
 
109
        """
 
110
        bisect_path = self.get_bisect_path()
 
111
        self.assertIsInstance(paths, list)
 
112
        bisect_path_idx = bisect_path(paths, path)
 
113
        split_path = self.split_for_dirblocks([path])[0]
 
114
        bisect_func, offset = self.get_bisect()
 
115
        bisect_split_idx = bisect_func(split_paths, split_path)
 
116
        self.assertEqual(bisect_split_idx, bisect_path_idx,
 
117
                         '%s disagreed. %s != %s'
 
118
                         ' for key %r'
 
119
                         % (bisect_path.__name__,
 
120
                            bisect_split_idx, bisect_path_idx, path)
 
121
                         )
 
122
        if exists:
 
123
            self.assertEqual(path, paths[bisect_path_idx + offset])
 
124
 
 
125
    def split_for_dirblocks(self, paths):
 
126
        dir_split_paths = []
 
127
        for path in paths:
 
128
            dirname, basename = os.path.split(path)
 
129
            dir_split_paths.append((dirname.split(b'/'), basename))
 
130
        dir_split_paths.sort()
 
131
        return dir_split_paths
 
132
 
 
133
    def test_simple(self):
 
134
        """In the simple case it works just like bisect_left"""
 
135
        paths = [b'', b'a', b'b', b'c', b'd']
 
136
        split_paths = self.split_for_dirblocks(paths)
 
137
        for path in paths:
 
138
            self.assertBisect(paths, split_paths, path, exists=True)
 
139
        self.assertBisect(paths, split_paths, b'_', exists=False)
 
140
        self.assertBisect(paths, split_paths, b'aa', exists=False)
 
141
        self.assertBisect(paths, split_paths, b'bb', exists=False)
 
142
        self.assertBisect(paths, split_paths, b'cc', exists=False)
 
143
        self.assertBisect(paths, split_paths, b'dd', exists=False)
 
144
        self.assertBisect(paths, split_paths, b'a/a', exists=False)
 
145
        self.assertBisect(paths, split_paths, b'b/b', exists=False)
 
146
        self.assertBisect(paths, split_paths, b'c/c', exists=False)
 
147
        self.assertBisect(paths, split_paths, b'd/d', exists=False)
 
148
 
 
149
    def test_involved(self):
 
150
        """This is where bisect_path_* diverges slightly."""
 
151
        # This is the list of paths and their contents
 
152
        # a/
 
153
        #   a/
 
154
        #     a
 
155
        #     z
 
156
        #   a-a/
 
157
        #     a
 
158
        #   a-z/
 
159
        #     z
 
160
        #   a=a/
 
161
        #     a
 
162
        #   a=z/
 
163
        #     z
 
164
        #   z/
 
165
        #     a
 
166
        #     z
 
167
        #   z-a
 
168
        #   z-z
 
169
        #   z=a
 
170
        #   z=z
 
171
        # a-a/
 
172
        #   a
 
173
        # a-z/
 
174
        #   z
 
175
        # a=a/
 
176
        #   a
 
177
        # a=z/
 
178
        #   z
 
179
        # This is the exact order that is stored by dirstate
 
180
        # All children in a directory are mentioned before an children of
 
181
        # children are mentioned.
 
182
        # So all the root-directory paths, then all the
 
183
        # first sub directory, etc.
 
184
        paths = [  # content of '/'
 
185
            b'', b'a', b'a-a', b'a-z', b'a=a', b'a=z',
 
186
                 # content of 'a/'
 
187
                 b'a/a', b'a/a-a', b'a/a-z',
 
188
                 b'a/a=a', b'a/a=z',
 
189
                 b'a/z', b'a/z-a', b'a/z-z',
 
190
                 b'a/z=a', b'a/z=z',
 
191
                 # content of 'a/a/'
 
192
                 b'a/a/a', b'a/a/z',
 
193
                 # content of 'a/a-a'
 
194
                 b'a/a-a/a',
 
195
                 # content of 'a/a-z'
 
196
                 b'a/a-z/z',
 
197
                 # content of 'a/a=a'
 
198
                 b'a/a=a/a',
 
199
                 # content of 'a/a=z'
 
200
                 b'a/a=z/z',
 
201
                 # content of 'a/z/'
 
202
                 b'a/z/a', b'a/z/z',
 
203
                 # content of 'a-a'
 
204
                 b'a-a/a',
 
205
                 # content of 'a-z'
 
206
                 b'a-z/z',
 
207
                 # content of 'a=a'
 
208
                 b'a=a/a',
 
209
                 # content of 'a=z'
 
210
                 b'a=z/z',
 
211
            ]
 
212
        split_paths = self.split_for_dirblocks(paths)
 
213
        sorted_paths = []
 
214
        for dir_parts, basename in split_paths:
 
215
            if dir_parts == [b'']:
 
216
                sorted_paths.append(basename)
 
217
            else:
 
218
                sorted_paths.append(b'/'.join(dir_parts + [basename]))
 
219
 
 
220
        self.assertEqual(sorted_paths, paths)
 
221
 
 
222
        for path in paths:
 
223
            self.assertBisect(paths, split_paths, path, exists=True)
 
224
 
 
225
 
 
226
class TestBisectPathLeft(tests.TestCase, TestBisectPathMixin):
    """Run the shared bisect-path tests against ``_bisect_path_left``."""

    def get_bisect_path(self):
        """Return ``_bisect_path_left`` from the Python helpers module."""
        from breezy.bzr import _dirstate_helpers_py as helpers
        return helpers._bisect_path_left

    def get_bisect(self):
        """Return ``bisect.bisect_left`` with an offset of 0."""
        reference, offset = bisect.bisect_left, 0
        return reference, offset
 
235
 
 
236
 
 
237
class TestCompiledBisectPathLeft(TestBisectPathLeft):
    """Run all Bisect Path tests against the compiled _bisect_path_left."""

    # These tests need the compiled helper module to be available.
    _test_needs_features = [compiled_dirstate_helpers_feature]

    def get_bisect_path(self):
        """Return ``_bisect_path_left`` from the compiled helpers module."""
        from breezy.bzr._dirstate_helpers_pyx import _bisect_path_left
        return _bisect_path_left
 
245
 
 
246
 
 
247
class TestBisectPathRight(tests.TestCase, TestBisectPathMixin):
    """Run the shared bisect-path tests against ``_bisect_path_right``."""

    def get_bisect_path(self):
        """Return ``_bisect_path_right`` from the Python helpers module."""
        from breezy.bzr import _dirstate_helpers_py as helpers
        return helpers._bisect_path_right

    def get_bisect(self):
        """Return ``bisect.bisect_right`` with an offset of -1."""
        reference, offset = bisect.bisect_right, -1
        return reference, offset
 
256
 
 
257
 
 
258
class TestCompiledBisectPathRight(TestBisectPathRight):
    """Run the bisect-path tests against the compiled _bisect_path_right."""

    # These tests need the compiled helper module to be available.
    _test_needs_features = [compiled_dirstate_helpers_feature]

    def get_bisect_path(self):
        """Return ``_bisect_path_right`` from the compiled helpers module."""
        from breezy.bzr import _dirstate_helpers_pyx as helpers
        return helpers._bisect_path_right
 
266
 
 
267
 
 
268
class TestBisectDirblock(tests.TestCase):
    """Test that bisect_dirblock() returns the expected values.

    bisect_dirblock is intended to work like bisect.bisect_left() except it
    knows it is working on dirblocks and that dirblocks are sorted by ('path',
    'to', 'foo') chunks rather than by raw 'path/to/foo'.

    This test is parameterized by calling get_bisect_dirblock(). Child test
    cases can override this function to test against a different
    implementation.
    """

    def get_bisect_dirblock(self):
        """Return an implementation of bisect_dirblock"""
        from breezy.bzr._dirstate_helpers_py import bisect_dirblock
        return bisect_dirblock

    def assertBisect(self, dirblocks, split_dirblocks, path, *args, **kwargs):
        """Assert that bisect_split works like bisect_left on the split paths.

        :param dirblocks: A list of (path, [info]) pairs.
        :param split_dirblocks: A list of ((split, path), [info]) pairs.
        :param path: The path we are indexing.

        Extra positional arguments are passed to both bisect_dirblock and
        bisect.bisect_left. Keyword arguments (e.g. ``cache``) are passed to
        bisect_dirblock only.
        """
        bisect_dirblock = self.get_bisect_dirblock()
        self.assertIsInstance(dirblocks, list)
        bisect_split_idx = bisect_dirblock(dirblocks, path, *args, **kwargs)
        split_dirblock = (path.split(b'/'), [])
        bisect_left_idx = bisect.bisect_left(split_dirblocks, split_dirblock,
                                             *args)
        self.assertEqual(bisect_left_idx, bisect_split_idx,
                         'bisect_split disagreed. %s != %s'
                         ' for key %r'
                         % (bisect_left_idx, bisect_split_idx, path)
                         )

    def paths_to_dirblocks(self, paths):
        """Convert a list of paths into dirblock form.

        Also, ensure that the paths are in proper sorted order.

        :return: (dirblocks, split_dirblocks)
        """
        dirblocks = [(path, []) for path in paths]
        split_dirblocks = [(path.split(b'/'), []) for path in paths]
        self.assertEqual(sorted(split_dirblocks), split_dirblocks)
        return dirblocks, split_dirblocks

    def _involved_paths(self):
        """Return a fresh list of paths whose chunk order is not byte order.

        For example b'a/a' sorts before b'a-a' by directory chunks, while
        plain byte comparison would put b'a-a' first.
        """
        return [b'', b'a',
                b'a/a', b'a/a/a', b'a/a/z', b'a/a-a', b'a/a-z',
                b'a/z', b'a/z/a', b'a/z/z', b'a/z-a', b'a/z-z',
                b'a-a', b'a-z',
                b'z', b'z/a/a', b'z/a/z', b'z/a-a', b'z/a-z',
                b'z/z', b'z/z/a', b'z/z/z', b'z/z-a', b'z/z-z',
                b'z-a', b'z-z',
                ]

    def test_simple(self):
        """In the simple case it works just like bisect_left"""
        paths = [b'', b'a', b'b', b'c', b'd']
        dirblocks, split_dirblocks = self.paths_to_dirblocks(paths)
        for path in paths:
            self.assertBisect(dirblocks, split_dirblocks, path)
        self.assertBisect(dirblocks, split_dirblocks, b'_')
        self.assertBisect(dirblocks, split_dirblocks, b'aa')
        self.assertBisect(dirblocks, split_dirblocks, b'bb')
        self.assertBisect(dirblocks, split_dirblocks, b'cc')
        self.assertBisect(dirblocks, split_dirblocks, b'dd')
        self.assertBisect(dirblocks, split_dirblocks, b'a/a')
        self.assertBisect(dirblocks, split_dirblocks, b'b/b')
        self.assertBisect(dirblocks, split_dirblocks, b'c/c')
        self.assertBisect(dirblocks, split_dirblocks, b'd/d')

    def test_involved(self):
        """This is where bisect_left diverges slightly."""
        paths = self._involved_paths()
        dirblocks, split_dirblocks = self.paths_to_dirblocks(paths)
        for path in paths:
            self.assertBisect(dirblocks, split_dirblocks, path)

    def test_involved_cached(self):
        """Like test_involved, but sharing one ``cache`` dict across calls."""
        paths = self._involved_paths()
        cache = {}
        dirblocks, split_dirblocks = self.paths_to_dirblocks(paths)
        for path in paths:
            self.assertBisect(dirblocks, split_dirblocks, path, cache=cache)
 
360
 
 
361
 
 
362
class TestCompiledBisectDirblock(TestBisectDirblock):
    """Run the TestBisectDirblock tests against the compiled bisect_dirblock.

    bisect_dirblock is intended to work like bisect.bisect_left() except it
    knows it is working on dirblocks and that dirblocks are sorted by ('path',
    'to', 'foo') chunks rather than by raw 'path/to/foo'.

    Every test is inherited from TestBisectDirblock; only the implementation
    under test is swapped.
    """

    # These tests need the compiled helper module to be available.
    _test_needs_features = [compiled_dirstate_helpers_feature]

    def get_bisect_dirblock(self):
        """Return ``bisect_dirblock`` from the compiled helpers module."""
        from breezy.bzr import _dirstate_helpers_pyx as helpers
        return helpers.bisect_dirblock
 
378
 
 
379
 
 
380
class TestLtByDirs(tests.TestCase):
    """Test an implementation of lt_by_dirs()

    lt_by_dirs() compares 2 paths by their directory sections, rather than as
    plain strings.

    Child test cases can override ``get_lt_by_dirs`` to test a specific
    implementation.
    """

    def get_lt_by_dirs(self):
        """Get a specific implementation of lt_by_dirs."""
        from ..bzr import _dirstate_helpers_py as helpers
        return helpers.lt_by_dirs

    def assertCmpByDirs(self, expected, str1, str2):
        """Check ``lt_by_dirs`` on the two strings, in both directions.

        :param expected: The expected comparison value. -1 means str1 comes
            first, 0 means they are equal, 1 means str2 comes first
        :param str1: string to compare
        :param str2: string to compare
        """
        lt_by_dirs = self.get_lt_by_dirs()
        # str1 < str2 only holds for a negative expectation, str2 < str1 only
        # for a positive one; for 0 neither may hold.
        check_forward = self.assertTrue if expected < 0 else self.assertFalse
        check_backward = self.assertTrue if expected > 0 else self.assertFalse
        if expected == 0:
            self.assertEqual(str1, str2)
        check_forward(lt_by_dirs(str1, str2))
        check_backward(lt_by_dirs(str2, str1))

    def test_cmp_empty(self):
        """Compare against the empty string."""
        self.assertCmpByDirs(0, b'', b'')
        non_empty = (
            b'a',
            b'ab',
            b'abc',
            b'abcd',
            b'abcde',
            b'abcdef',
            b'abcdefg',
            b'abcdefgh',
            b'abcdefghi',
            b'test/ing/a/path/',
        )
        for value in non_empty:
            self.assertCmpByDirs(1, value, b'')

    def test_cmp_same_str(self):
        """Compare the same string"""
        values = (
            b'a',
            b'ab',
            b'abc',
            b'abcd',
            b'abcde',
            b'abcdef',
            b'abcdefg',
            b'abcdefgh',
            b'abcdefghi',
            b'testing a long string',
            b'x' * 10000,
            b'a/b',
            b'a/b/c',
            b'a/b/c/d',
            b'a/b/c/d/e',
        )
        for value in values:
            # bytes(value) keeps the two arguments equal without relying on
            # anything beyond equality.
            self.assertCmpByDirs(0, value, bytes(value))

    def test_simple_paths(self):
        """Compare strings that act like normal string comparison"""
        ordered_pairs = (
            (b'a', b'b'),
            (b'aa', b'ab'),
            (b'ab', b'bb'),
            (b'aaa', b'aab'),
            (b'aab', b'abb'),
            (b'abb', b'bbb'),
            (b'aaaa', b'aaab'),
            (b'aaab', b'aabb'),
            (b'aabb', b'abbb'),
            (b'abbb', b'bbbb'),
            (b'aaaaa', b'aaaab'),
            (b'a/a', b'a/b'),
            (b'a/b', b'b/b'),
            (b'a/a/a', b'a/a/b'),
            (b'a/a/b', b'a/b/b'),
            (b'a/b/b', b'b/b/b'),
            (b'a/a/a/a', b'a/a/a/b'),
            (b'a/a/a/b', b'a/a/b/b'),
            (b'a/a/b/b', b'a/b/b/b'),
            (b'a/b/b/b', b'b/b/b/b'),
            (b'a/a/a/a/a', b'a/a/a/a/b'),
        )
        for smaller, larger in ordered_pairs:
            self.assertCmpByDirs(-1, smaller, larger)

    def test_tricky_paths(self):
        """Compare paths where directory order differs from string order."""
        cases = (
            (1, b'ab/cd/ef', b'ab/cc/ef'),
            (1, b'ab/cd/ef', b'ab/c/ef'),
            (-1, b'ab/cd/ef', b'ab/cd-ef'),
            (-1, b'ab/cd', b'ab/cd-'),
            (-1, b'ab/cd', b'ab-cd'),
        )
        for expected, str1, str2 in cases:
            self.assertCmpByDirs(expected, str1, str2)

    def test_cmp_unicode_not_allowed(self):
        """Any unicode argument must be rejected with TypeError."""
        lt_by_dirs = self.get_lt_by_dirs()
        bad_argument_pairs = (
            (u'Unicode', b'str'),
            (b'str', u'Unicode'),
            (u'Unicode', u'Unicode'),
        )
        for first, second in bad_argument_pairs:
            self.assertRaises(TypeError, lt_by_dirs, first, second)

    def test_cmp_non_ascii(self):
        """Compare UTF-8 encoded non-ASCII paths."""
        cases = (
            (b'\xc2\xb5', b'\xc3\xa5'),  # u'\xb5', u'\xe5'
            (b'a', b'\xc3\xa5'),  # u'a', u'\xe5'
            (b'b', b'\xc2\xb5'),  # u'b', u'\xb5'
            (b'a/b', b'a/\xc3\xa5'),  # u'a/b', u'a/\xe5'
            (b'b/a', b'b/\xc2\xb5'),  # u'b/a', u'b/\xb5'
        )
        for smaller, larger in cases:
            self.assertCmpByDirs(-1, smaller, larger)
 
491
 
 
492
 
 
493
class TestCompiledLtByDirs(TestLtByDirs):
    """Run the TestLtByDirs tests against the pyrex lt_by_dirs."""

    # These tests need the compiled helper module to be available.
    _test_needs_features = [compiled_dirstate_helpers_feature]

    def get_lt_by_dirs(self):
        """Return ``lt_by_dirs`` from the compiled helpers module."""
        from ..bzr import _dirstate_helpers_pyx as helpers
        return helpers.lt_by_dirs
 
501
 
 
502
 
 
503
class TestLtPathByDirblock(tests.TestCase):
    """Test an implementation of _lt_path_by_dirblock()

    _lt_path_by_dirblock() compares two paths using the sort order used by
    DirState. All paths in the same directory are sorted together.

    Child test cases can override ``get_lt_path_by_dirblock`` to test a specific
    implementation.
    """

    def get_lt_path_by_dirblock(self):
        """Get a specific implementation of _lt_path_by_dirblock."""
        from ..bzr._dirstate_helpers_py import _lt_path_by_dirblock
        return _lt_path_by_dirblock

    def assertLtPathByDirblock(self, paths):
        """Compare all paths and make sure they evaluate to the correct order.

        This does N^2 comparisons. It is assumed that ``paths`` is properly
        sorted list.

        :param paths: a sorted list of paths to compare
        """
        # First, make sure the paths being passed in are correct
        def _key(p):
            dirname, basename = os.path.split(p)
            return dirname.split(b'/'), basename
        self.assertEqual(sorted(paths, key=_key), paths)

        lt_path_by_dirblock = self.get_lt_path_by_dirblock()
        for idx1, path1 in enumerate(paths):
            for idx2, path2 in enumerate(paths):
                lt_result = lt_path_by_dirblock(path1, path2)
                # path1 < path2 must hold exactly when path1 appears earlier
                # in the (already sorted) list.
                self.assertEqual(idx1 < idx2, lt_result,
                                 '%s did not state that %r < %r, lt=%s'
                                 % (lt_path_by_dirblock.__name__,
                                    path1, path2, lt_result))

    def test_cmp_simple_paths(self):
        """Compare simple paths, including the empty string."""
        self.assertLtPathByDirblock(
            [b'', b'a', b'ab', b'abc', b'a/b/c', b'b/d/e'])
        self.assertLtPathByDirblock([b'kl', b'ab/cd', b'ab/ef', b'gh/ij'])

    def test_tricky_paths(self):
        """Compare paths whose names contain '-' and '='."""
        self.assertLtPathByDirblock([
            # Contents of ''
            b'', b'a', b'a-a', b'a=a', b'b',
            # Contents of 'a'
            b'a/a', b'a/a-a', b'a/a=a', b'a/b',
            # Contents of 'a/a'
            b'a/a/a', b'a/a/a-a', b'a/a/a=a',
            # Contents of 'a/a/a'
            b'a/a/a/a', b'a/a/a/b',
            # Contents of 'a/a/a-a',
            b'a/a/a-a/a', b'a/a/a-a/b',
            # Contents of 'a/a/a=a',
            b'a/a/a=a/a', b'a/a/a=a/b',
            # Contents of 'a/a-a'
            b'a/a-a/a',
            # Contents of 'a/a-a/a'
            b'a/a-a/a/a', b'a/a-a/a/b',
            # Contents of 'a/a=a'
            b'a/a=a/a',
            # Contents of 'a/b'
            b'a/b/a', b'a/b/b',
            # Contents of 'a-a',
            b'a-a/a', b'a-a/b',
            # Contents of 'a=a',
            b'a=a/a', b'a=a/b',
            # Contents of 'b',
            b'b/a', b'b/b',
            ])
        self.assertLtPathByDirblock([
            # content of '/'
            b'', b'a', b'a-a', b'a-z', b'a=a', b'a=z',
            # content of 'a/'
            b'a/a', b'a/a-a', b'a/a-z',
            b'a/a=a', b'a/a=z',
            b'a/z', b'a/z-a', b'a/z-z',
            b'a/z=a', b'a/z=z',
            # content of 'a/a/'
            b'a/a/a', b'a/a/z',
            # content of 'a/a-a'
            b'a/a-a/a',
            # content of 'a/a-z'
            b'a/a-z/z',
            # content of 'a/a=a'
            b'a/a=a/a',
            # content of 'a/a=z'
            b'a/a=z/z',
            # content of 'a/z/'
            b'a/z/a', b'a/z/z',
            # content of 'a-a'
            b'a-a/a',
            # content of 'a-z'
            b'a-z/z',
            # content of 'a=a'
            b'a=a/a',
            # content of 'a=z'
            b'a=z/z',
            ])

    def test_unicode_not_allowed(self):
        """A unicode argument in either position must raise TypeError."""
        lt_path_by_dirblock = self.get_lt_path_by_dirblock()
        # The non-unicode side must be a bytes literal: a bare 'str' literal
        # is itself unicode on Python 3, which would leave the mixed
        # bytes/unicode cases untested.
        self.assertRaises(TypeError, lt_path_by_dirblock, u'Uni', b'str')
        self.assertRaises(TypeError, lt_path_by_dirblock, b'str', u'Uni')
        self.assertRaises(TypeError, lt_path_by_dirblock, u'Uni', u'Uni')
        self.assertRaises(TypeError, lt_path_by_dirblock, u'x/Uni', b'x/str')
        self.assertRaises(TypeError, lt_path_by_dirblock, b'x/str', u'x/Uni')
        self.assertRaises(TypeError, lt_path_by_dirblock, u'x/Uni', u'x/Uni')

    def test_nonascii(self):
        """Compare UTF-8 encoded non-ASCII paths in dirblock order."""
        self.assertLtPathByDirblock([
            # content of '/'
            b'', b'a', b'\xc2\xb5', b'\xc3\xa5',
            # content of 'a'
            b'a/a', b'a/\xc2\xb5', b'a/\xc3\xa5',
            # content of 'a/a'
            b'a/a/a', b'a/a/\xc2\xb5', b'a/a/\xc3\xa5',
            # content of 'a/\xc2\xb5'
            b'a/\xc2\xb5/a', b'a/\xc2\xb5/\xc2\xb5', b'a/\xc2\xb5/\xc3\xa5',
            # content of 'a/\xc3\xa5'
            b'a/\xc3\xa5/a', b'a/\xc3\xa5/\xc2\xb5', b'a/\xc3\xa5/\xc3\xa5',
            # content of '\xc2\xb5'
            b'\xc2\xb5/a', b'\xc2\xb5/\xc2\xb5', b'\xc2\xb5/\xc3\xa5',
            # content of '\xc3\xa5'
            b'\xc3\xa5/a', b'\xc3\xa5/\xc2\xb5', b'\xc3\xa5/\xc3\xa5',
            ])
 
631
            ])
 
632
 
 
633
 
 
634
class TestCompiledLtPathByDirblock(TestLtPathByDirblock):
    """Run the TestLtPathByDirblock tests against the pyrex implementation."""

    # These tests need the compiled helper module to be available.
    _test_needs_features = [compiled_dirstate_helpers_feature]

    def get_lt_path_by_dirblock(self):
        """Return ``_lt_path_by_dirblock`` from the compiled helpers module."""
        from ..bzr import _dirstate_helpers_pyx as helpers
        return helpers._lt_path_by_dirblock
 
642
 
 
643
 
 
644
class TestMemRChr(tests.TestCase):
    """Test memrchr functionality"""

    # _py_memrchr lives in the compiled helper module only.
    _test_needs_features = [compiled_dirstate_helpers_feature]

    def assertMemRChr(self, expected, s, c):
        """Assert that ``_py_memrchr(s, c)`` returns ``expected``.

        :param expected: The expected return value (an index, or None in the
            test_missing cases).
        :param s: The byte string to search.
        :param c: The single byte to look for.
        """
        from breezy.bzr import _dirstate_helpers_pyx as helpers
        actual = helpers._py_memrchr(s, c)
        self.assertEqual(expected, actual)

    def test_missing(self):
        """A byte that is absent from the string gives None."""
        cases = (
            (b'', b'a'),
            (b'', b'c'),
            (b'abcdefghijklm', b'q'),
            (b'aaaaaaaaaaaaaaaaaaaaaaa', b'b'),
        )
        for haystack, needle in cases:
            self.assertMemRChr(None, haystack, needle)

    def test_single_entry(self):
        """A byte occurring once is found at its index."""
        haystack = b'abcdefghijklm'
        cases = (
            (0, b'a'),
            (1, b'b'),
            (2, b'c'),
            (10, b'k'),
            (11, b'l'),
            (12, b'm'),
        )
        for expected, needle in cases:
            self.assertMemRChr(expected, haystack, needle)

    def test_multiple(self):
        """With several occurrences the last index is returned."""
        haystack = b'abcdefjklmabcdefghijklm'
        cases = (
            (10, b'a'),
            (11, b'b'),
            (12, b'c'),
            (20, b'k'),
            (21, b'l'),
            (22, b'm'),
        )
        for expected, needle in cases:
            self.assertMemRChr(expected, haystack, needle)
        self.assertMemRChr(22, b'aaaaaaaaaaaaaaaaaaaaaaa', b'a')

    def test_with_nulls(self):
        """Embedded NUL bytes do not end the search early."""
        haystack = b'abc\0\0\0jklmabc\0\0\0ghijklm'
        cases = (
            (10, b'a'),
            (11, b'b'),
            (12, b'c'),
            (20, b'k'),
            (21, b'l'),
            (22, b'm'),
        )
        for expected, needle in cases:
            self.assertMemRChr(expected, haystack, needle)
        self.assertMemRChr(22, b'aaa\0\0\0aaaaaaa\0\0\0aaaaaaa', b'a')
        self.assertMemRChr(9, b'\0\0\0\0\0\0\0\0\0\0', b'\0')
 
685
 
 
686
 
 
687
class TestReadDirblocks(test_dirstate.TestCaseWithDirState):
    """Test an implementation of _read_dirblocks()

    _read_dirblocks() reads in all of the dirblock information from the disk
    file.

    Child test cases can override ``get_read_dirblocks`` to test a specific
    implementation.
    """

    # inherits scenarios from test_dirstate

    def get_read_dirblocks(self):
        """Return the ``_read_dirblocks`` implementation under test."""
        from breezy.bzr._dirstate_helpers_py import _read_dirblocks
        return _read_dirblocks

    def test_smoketest(self):
        """Make sure that we can create and read back a simple file."""
        tree, state, expected = self.create_basic_dirstate()
        del tree
        state._read_header_if_needed()
        self.assertEqual(dirstate.DirState.NOT_IN_MEMORY,
                         state._dirblock_state)
        read_dirblocks = self.get_read_dirblocks()
        read_dirblocks(state)
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)

    def test_trailing_garbage(self):
        """Extra data after the dirblocks is reported as DirstateCorrupt."""
        tree, state, expected = self.create_basic_dirstate()
        # On Unix, we can write extra data as long as we haven't read yet, but
        # on Win32, if you've opened the file with FILE_SHARE_READ, trying to
        # open it in append mode will fail.
        state.unlock()
        try:
            with open('dirstate', 'ab') as f:
                # Add bogus trailing garbage
                f.write(b'bogus\n')
        finally:
            # Re-acquire the read lock released above, even if opening or
            # writing the file failed.
            # NOTE(review): presumably test cleanup expects the state to be
            # locked again -- confirm against create_basic_dirstate.
            state.lock_read()
        e = self.assertRaises(dirstate.DirstateCorrupt,
                              state._read_dirblocks_if_needed)
        # Make sure we mention the bogus characters in the error
        self.assertContainsRe(str(e), 'bogus')
 
732
 
 
733
 
 
734
class TestCompiledReadDirblocks(TestReadDirblocks):
    """Test the pyrex implementation of _read_dirblocks"""

    # These tests need the compiled helpers feature.
    _test_needs_features = [compiled_dirstate_helpers_feature]

    def get_read_dirblocks(self):
        """Return ``_read_dirblocks`` from ``_dirstate_helpers_pyx``."""
        from breezy.bzr._dirstate_helpers_pyx import (
            _read_dirblocks as compiled_read_dirblocks,
            )
        return compiled_read_dirblocks
 
742
 
 
743
 
 
744
class TestUsingCompiledIfAvailable(tests.TestCase):
    """Check that any compiled functions that are available are the default.

    It is possible to have typos, etc in the import line, such that
    _dirstate_helpers_pyx is actually available, but the compiled functions are
    not being used.
    """

    # Each test imports the implementation expected to be the default (the
    # compiled one when the feature is available, otherwise the fallback) and
    # checks that the name exposed by ``dirstate`` is that very object.

    def test_bisect_dirblock(self):
        if compiled_dirstate_helpers_feature.available():
            from breezy.bzr._dirstate_helpers_pyx import (
                bisect_dirblock as expected)
        else:
            from breezy.bzr._dirstate_helpers_py import (
                bisect_dirblock as expected)
        self.assertIs(expected, dirstate.bisect_dirblock)

    def test__bisect_path_left(self):
        if compiled_dirstate_helpers_feature.available():
            from breezy.bzr._dirstate_helpers_pyx import (
                _bisect_path_left as expected)
        else:
            from breezy.bzr._dirstate_helpers_py import (
                _bisect_path_left as expected)
        self.assertIs(expected, dirstate._bisect_path_left)

    def test__bisect_path_right(self):
        if compiled_dirstate_helpers_feature.available():
            from breezy.bzr._dirstate_helpers_pyx import (
                _bisect_path_right as expected)
        else:
            from breezy.bzr._dirstate_helpers_py import (
                _bisect_path_right as expected)
        self.assertIs(expected, dirstate._bisect_path_right)

    def test_lt_by_dirs(self):
        if compiled_dirstate_helpers_feature.available():
            from ..bzr._dirstate_helpers_pyx import lt_by_dirs as expected
        else:
            from ..bzr._dirstate_helpers_py import lt_by_dirs as expected
        self.assertIs(expected, dirstate.lt_by_dirs)

    def test__read_dirblocks(self):
        if compiled_dirstate_helpers_feature.available():
            from breezy.bzr._dirstate_helpers_pyx import (
                _read_dirblocks as expected)
        else:
            from breezy.bzr._dirstate_helpers_py import (
                _read_dirblocks as expected)
        self.assertIs(expected, dirstate._read_dirblocks)

    def test_update_entry(self):
        if compiled_dirstate_helpers_feature.available():
            from breezy.bzr._dirstate_helpers_pyx import (
                update_entry as expected)
        else:
            # The fallback is taken from the dirstate module itself.
            from breezy.bzr.dirstate import update_entry as expected
        self.assertIs(expected, dirstate.update_entry)

    def test_process_entry(self):
        if compiled_dirstate_helpers_feature.available():
            from breezy.bzr._dirstate_helpers_pyx import (
                ProcessEntryC as expected)
        else:
            from breezy.bzr.dirstate import ProcessEntryPython as expected
        self.assertIs(expected, dirstate._process_entry)
 
801
 
 
802
 
 
803
class TestUpdateEntry(test_dirstate.TestCaseWithDirState):
    """Test the DirState.update_entry functions"""

    scenarios = multiply_scenarios(
        dir_reader_scenarios(), ue_scenarios)

    # Set by load_tests
    update_entry = None

    def setUp(self):
        """Point ``dirstate.update_entry`` at the scenario's implementation."""
        super(TestUpdateEntry, self).setUp()
        self.overrideAttr(dirstate, 'update_entry', self.update_entry)

    def get_state_with_a(self):
        """Create a DirState tracking a single object named 'a'"""
        state = test_dirstate.InstrumentedDirState.initialize('dirstate')
        self.addCleanup(state.unlock)
        state.add('a', b'a-id', 'file', None, b'')
        entry = state._get_entry(0, path_utf8=b'a')
        return state, entry

    def test_observed_sha1_cachable(self):
        """A sha1 observed with a 10-second-old stat is stored in the entry."""
        state, entry = self.get_state_with_a()
        state.save()
        atime = time.time() - 10
        self.build_tree(['a'])
        statvalue = test_dirstate._FakeStat.from_stat(os.lstat('a'))
        statvalue.st_mtime = statvalue.st_ctime = atime
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)
        state._observed_sha1(entry, b"foo", statvalue)
        self.assertEqual(b'foo', entry[1][0][1])
        packed_stat = dirstate.pack_stat(statvalue)
        self.assertEqual(packed_stat, entry[1][0][4])
        self.assertEqual(dirstate.DirState.IN_MEMORY_HASH_MODIFIED,
                         state._dirblock_state)

    def test_observed_sha1_not_cachable(self):
        """A sha1 observed with the file's fresh stat leaves the entry as-is."""
        state, entry = self.get_state_with_a()
        state.save()
        oldval = entry[1][0][1]
        oldstat = entry[1][0][4]
        self.build_tree(['a'])
        statvalue = os.lstat('a')
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)
        # Pass bytes, matching test_observed_sha1_cachable.
        state._observed_sha1(entry, b"foo", statvalue)
        self.assertEqual(oldval, entry[1][0][1])
        self.assertEqual(oldstat, entry[1][0][4])
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)

    def test_update_entry(self):
        """Walk update_entry through new, uncachable and cachable states."""
        state, _ = self.get_state_with_a()
        tree = self.make_branch_and_tree('tree')
        tree.lock_write()
        empty_revid = tree.commit('empty')
        self.build_tree(['tree/a'])
        tree.add(['a'], [b'a-id'])
        with_a_id = tree.commit('with_a')
        self.addCleanup(tree.unlock)
        state.set_parent_trees(
            [(empty_revid, tree.branch.repository.revision_tree(empty_revid))],
            [])
        entry = state._get_entry(0, path_utf8=b'a')
        self.build_tree(['a'])
        # Add one where we don't provide the stat or sha already
        self.assertEqual((b'', b'a', b'a-id'), entry[0])
        self.assertEqual((b'f', b'', 0, False, dirstate.DirState.NULLSTAT),
                         entry[1][0])
        # Flush the buffers to disk
        state.save()
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)

        stat_value = os.lstat('a')
        packed_stat = dirstate.pack_stat(stat_value)
        link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
                                         stat_value=stat_value)
        self.assertEqual(None, link_or_sha1)

        # The dirblock entry should not have computed or cached the file's
        # sha1, but it did update the files' st_size. However, this is not
        # worth writing a dirstate file for, so we leave the state UNMODIFIED
        self.assertEqual((b'f', b'', 14, False, dirstate.DirState.NULLSTAT),
                         entry[1][0])
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)
        mode = stat_value.st_mode
        self.assertEqual([('is_exec', mode, False)], state._log)

        state.save()
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)

        # Roll the clock back so the file is guaranteed to look too new. We
        # should still not compute the sha1.
        state.adjust_time(-10)
        del state._log[:]

        link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
                                         stat_value=stat_value)
        self.assertEqual([('is_exec', mode, False)], state._log)
        self.assertEqual(None, link_or_sha1)
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)
        self.assertEqual((b'f', b'', 14, False, dirstate.DirState.NULLSTAT),
                         entry[1][0])
        state.save()

        # If it is cachable (the clock has moved forward) but new it still
        # won't calculate the sha or cache it.
        state.adjust_time(+20)
        del state._log[:]
        # NOTE(review): this call goes through dirstate.update_entry, which
        # setUp has pointed at self.update_entry.
        link_or_sha1 = dirstate.update_entry(state, entry, abspath=b'a',
                                             stat_value=stat_value)
        self.assertEqual(None, link_or_sha1)
        self.assertEqual([('is_exec', mode, False)], state._log)
        self.assertEqual((b'f', b'', 14, False, dirstate.DirState.NULLSTAT),
                         entry[1][0])
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)

        # If the file is no longer new, and the clock has been moved forward
        # sufficiently, it will cache the sha.
        del state._log[:]
        state.set_parent_trees(
            [(with_a_id, tree.branch.repository.revision_tree(with_a_id))],
            [])
        entry = state._get_entry(0, path_utf8=b'a')

        link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
                                         stat_value=stat_value)
        self.assertEqual(b'b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6',
                         link_or_sha1)
        self.assertEqual([('is_exec', mode, False), ('sha1', b'a')],
                         state._log)
        self.assertEqual((b'f', link_or_sha1, 14, False, packed_stat),
                         entry[1][0])

        # Subsequent calls will just return the cached value
        del state._log[:]
        link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
                                         stat_value=stat_value)
        self.assertEqual(b'b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6',
                         link_or_sha1)
        self.assertEqual([], state._log)
        self.assertEqual((b'f', link_or_sha1, 14, False, packed_stat),
                         entry[1][0])

    def test_update_entry_symlink(self):
        """Update entry should read symlinks."""
        self.requireFeature(features.SymlinkFeature)
        state, entry = self.get_state_with_a()
        state.save()
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)
        os.symlink('target', 'a')

        state.adjust_time(-10)  # Make the symlink look new
        stat_value = os.lstat('a')
        packed_stat = dirstate.pack_stat(stat_value)
        link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
                                         stat_value=stat_value)
        self.assertEqual(b'target', link_or_sha1)
        self.assertEqual([('read_link', b'a', b'')], state._log)
        # Dirblock is not updated (the link is too new)
        self.assertEqual([(b'l', b'', 6, False, dirstate.DirState.NULLSTAT)],
                         entry[1])
        # The file entry turned into a symlink, that is considered
        # HASH modified worthy.
        self.assertEqual(dirstate.DirState.IN_MEMORY_HASH_MODIFIED,
                         state._dirblock_state)

        # Because the stat_value looks new, we should re-read the target
        del state._log[:]
        link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
                                         stat_value=stat_value)
        self.assertEqual(b'target', link_or_sha1)
        self.assertEqual([('read_link', b'a', b'')], state._log)
        self.assertEqual([(b'l', b'', 6, False, dirstate.DirState.NULLSTAT)],
                         entry[1])
        state.save()
        state.adjust_time(+20)  # Skip into the future, all files look old
        del state._log[:]
        link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
                                         stat_value=stat_value)
        # The symlink stayed a symlink. So while it is new enough to cache, we
        # don't bother setting the flag, because it is not really worth saving
        # (when we stat the symlink, we'll have paged in the target.)
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)
        self.assertEqual(b'target', link_or_sha1)
        # We need to re-read the link because only now can we cache it
        self.assertEqual([('read_link', b'a', b'')], state._log)
        self.assertEqual([(b'l', b'target', 6, False, packed_stat)],
                         entry[1])

        del state._log[:]
        # Another call won't re-read the link. The log is checked *after* the
        # call; checking it right after clearing it would always pass.
        link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
                                         stat_value=stat_value)
        self.assertEqual([], state._log)
        self.assertEqual(b'target', link_or_sha1)
        self.assertEqual([(b'l', b'target', 6, False, packed_stat)],
                         entry[1])

    def do_update_entry(self, state, entry, abspath):
        """lstat *abspath* and run update_entry with the resulting stat."""
        stat_value = os.lstat(abspath)
        return self.update_entry(state, entry, abspath, stat_value)

    def test_update_entry_dir(self):
        """update_entry returns None when 'a' is a directory on disk."""
        state, entry = self.get_state_with_a()
        self.build_tree(['a/'])
        self.assertIs(None, self.do_update_entry(state, entry, b'a'))

    def test_update_entry_dir_unchanged(self):
        """Only the file-to-directory change marks the state as modified."""
        state, entry = self.get_state_with_a()
        self.build_tree(['a/'])
        state.adjust_time(+20)
        self.assertIs(None, self.do_update_entry(state, entry, b'a'))
        # a/ used to be a file, but is now a directory, worth saving
        self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
                         state._dirblock_state)
        state.save()
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)
        # No changes to a/ means not worth saving.
        self.assertIs(None, self.do_update_entry(state, entry, b'a'))
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)
        # Change the last-modified time for the directory
        t = time.time() - 100.0
        try:
            os.utime('a', (t, t))
        except OSError:
            # It looks like Win32 + FAT doesn't allow to change times on a dir.
            raise tests.TestSkipped("can't update mtime of a dir on FAT")
        saved_packed_stat = entry[1][0][-1]
        self.assertIs(None, self.do_update_entry(state, entry, b'a'))
        # We *do* go ahead and update the information in the dirblocks, but we
        # don't bother setting IN_MEMORY_MODIFIED because it is trivial to
        # recompute.
        self.assertNotEqual(saved_packed_stat, entry[1][0][-1])
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)

    def test_update_entry_file_unchanged(self):
        """A second update of an unchanged file leaves the state unmodified."""
        state, _ = self.get_state_with_a()
        tree = self.make_branch_and_tree('tree')
        tree.lock_write()
        self.build_tree(['tree/a'])
        tree.add(['a'], [b'a-id'])
        with_a_id = tree.commit('witha')
        self.addCleanup(tree.unlock)
        state.set_parent_trees(
            [(with_a_id, tree.branch.repository.revision_tree(with_a_id))],
            [])
        entry = state._get_entry(0, path_utf8=b'a')
        self.build_tree(['a'])
        sha1sum = b'b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6'
        state.adjust_time(+20)
        self.assertEqual(sha1sum, self.do_update_entry(state, entry, b'a'))
        self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
                         state._dirblock_state)
        state.save()
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)
        self.assertEqual(sha1sum, self.do_update_entry(state, entry, b'a'))
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)

    def test_update_entry_tree_reference(self):
        """A tree-reference entry keeps minikind b't' after update_entry."""
        state = test_dirstate.InstrumentedDirState.initialize('dirstate')
        self.addCleanup(state.unlock)
        state.add('r', b'r-id', 'tree-reference', None, b'')
        self.build_tree(['r/'])
        entry = state._get_entry(0, path_utf8=b'r')
        self.do_update_entry(state, entry, 'r')
        entry = state._get_entry(0, path_utf8=b'r')
        self.assertEqual(b't', entry[1][0][0])

    def create_and_test_file(self, state, entry):
        """Create a file at 'a' and verify the state finds it during update.

        The state should already be versioning *something* at 'a'. This makes
        sure that state.update_entry recognizes it as a file.
        """
        self.build_tree(['a'])
        stat_value = os.lstat('a')
        packed_stat = dirstate.pack_stat(stat_value)

        link_or_sha1 = self.do_update_entry(state, entry, abspath='a')
        self.assertEqual(None, link_or_sha1)
        self.assertEqual([(b'f', b'', 14, False, dirstate.DirState.NULLSTAT)],
                         entry[1])
        return packed_stat

    def create_and_test_dir(self, state, entry):
        """Create a directory at 'a' and verify the state finds it.

        The state should already be versioning *something* at 'a'. This makes
        sure that state.update_entry recognizes it as a directory.
        """
        self.build_tree(['a/'])
        stat_value = os.lstat('a')
        packed_stat = dirstate.pack_stat(stat_value)

        link_or_sha1 = self.do_update_entry(state, entry, abspath=b'a')
        self.assertIs(None, link_or_sha1)
        self.assertEqual([(b'd', b'', 0, False, packed_stat)], entry[1])

        return packed_stat

    # FIXME: Add unicode version
    def create_and_test_symlink(self, state, entry):
        """Create a symlink at 'a' and verify the state finds it.

        The state should already be versioning *something* at 'a'. This makes
        sure that state.update_entry recognizes it as a symlink.

        This should not be called if this platform does not have symlink
        support.
        """
        # caller should care about skipping test on platforms without symlinks
        os.symlink('path/to/foo', 'a')

        stat_value = os.lstat('a')
        packed_stat = dirstate.pack_stat(stat_value)

        link_or_sha1 = self.do_update_entry(state, entry, abspath=b'a')
        self.assertEqual(b'path/to/foo', link_or_sha1)
        self.assertEqual([(b'l', b'path/to/foo', 11, False, packed_stat)],
                         entry[1])
        return packed_stat

    def test_update_file_to_dir(self):
        """If a file changes to a directory we return None for the sha.
        We also update the inventory record.
        """
        state, entry = self.get_state_with_a()
        # The file sha1 won't be cached unless the file is old
        state.adjust_time(+10)
        self.create_and_test_file(state, entry)
        os.remove('a')
        self.create_and_test_dir(state, entry)

    def test_update_file_to_symlink(self):
        """File becomes a symlink"""
        self.requireFeature(features.SymlinkFeature)
        state, entry = self.get_state_with_a()
        # The file sha1 won't be cached unless the file is old
        state.adjust_time(+10)
        self.create_and_test_file(state, entry)
        os.remove('a')
        self.create_and_test_symlink(state, entry)

    def test_update_dir_to_file(self):
        """Directory becoming a file updates the entry."""
        state, entry = self.get_state_with_a()
        # The file sha1 won't be cached unless the file is old
        state.adjust_time(+10)
        self.create_and_test_dir(state, entry)
        os.rmdir('a')
        self.create_and_test_file(state, entry)

    def test_update_dir_to_symlink(self):
        """Directory becomes a symlink"""
        self.requireFeature(features.SymlinkFeature)
        state, entry = self.get_state_with_a()
        # The symlink target won't be cached if it isn't old
        state.adjust_time(+10)
        self.create_and_test_dir(state, entry)
        os.rmdir('a')
        self.create_and_test_symlink(state, entry)

    def test_update_symlink_to_file(self):
        """Symlink becomes a file"""
        self.requireFeature(features.SymlinkFeature)
        state, entry = self.get_state_with_a()
        # The symlink and file info won't be cached unless old
        state.adjust_time(+10)
        self.create_and_test_symlink(state, entry)
        os.remove('a')
        self.create_and_test_file(state, entry)

    def test_update_symlink_to_dir(self):
        """Symlink becomes a directory"""
        self.requireFeature(features.SymlinkFeature)
        state, entry = self.get_state_with_a()
        # The symlink target won't be cached if it isn't old
        state.adjust_time(+10)
        self.create_and_test_symlink(state, entry)
        os.remove('a')
        self.create_and_test_dir(state, entry)

    def test__is_executable_win32(self):
        """The recorded executable bit survives updates when the filesystem
        mode is not consulted.
        """
        state, entry = self.get_state_with_a()
        self.build_tree(['a'])

        # Make sure we are using the version of _is_executable that doesn't
        # check the filesystem mode.
        state._use_filesystem_for_exec = False

        # The file on disk is not executable, but we are marking it as though
        # it is. With _use_filesystem_for_exec disabled we ignore what is on
        # disk.
        entry[1][0] = (b'f', b'', 0, True, dirstate.DirState.NULLSTAT)

        stat_value = os.lstat('a')

        state.adjust_time(-10)  # Make sure everything is new
        self.update_entry(state, entry, abspath=b'a', stat_value=stat_value)

        # The row is updated, but the executable bit stays set.
        self.assertEqual([(b'f', b'', 14, True, dirstate.DirState.NULLSTAT)],
                         entry[1])

        # Make the disk object look old enough to cache (but it won't cache the
        # sha as it is a new file).
        state.adjust_time(+20)
        self.update_entry(state, entry, abspath=b'a', stat_value=stat_value)
        self.assertEqual([(b'f', b'', 14, True, dirstate.DirState.NULLSTAT)],
                         entry[1])

    def _prepare_tree(self):
        """Return ``(tree, text)``: a tree with 'a file' holding *text*,
        committed once.
        """
        # Create a tree
        text = b'Hello World\n'
        tree = self.make_branch_and_tree('tree')
        self.build_tree_contents([('tree/a file', text)])
        tree.add('a file', b'a-file-id')
        # Note: dirstate does not sha prior to the first commit
        # so commit now in order for the test to work
        tree.commit('first')
        return tree, text

    def test_sha1provider_sha1_used(self):
        """update_entry returns the sha computed by the custom provider."""
        tree, text = self._prepare_tree()
        state = dirstate.DirState.from_tree(tree, 'dirstate',
                                            UppercaseSHA1Provider())
        self.addCleanup(state.unlock)
        expected_sha = osutils.sha_string(text.upper() + b"foo")
        entry = state._get_entry(0, path_utf8=b'a file')
        self.assertNotEqual((None, None), entry)
        state._sha_cutoff_time()
        state._cutoff_time += 10
        sha1 = self.update_entry(state, entry, 'tree/a file',
                                 os.lstat('tree/a file'))
        self.assertEqual(expected_sha, sha1)

    def test_sha1provider_stat_and_sha1_used(self):
        """With the custom provider installed, iter_changes reports the file."""
        tree, text = self._prepare_tree()
        tree.lock_write()
        self.addCleanup(tree.unlock)
        state = tree._current_dirstate()
        state._sha1_provider = UppercaseSHA1Provider()
        # If we used the standard provider, it would look like nothing has
        # changed
        file_ids_changed = [change.file_id for change
                            in tree.iter_changes(tree.basis_tree())]
        self.assertEqual([b'a-file-id'], file_ids_changed)
 
1266
 
 
1267
 
 
1268
class UppercaseSHA1Provider(dirstate.SHA1Provider):
    """A custom SHA1Provider.

    The digest is taken over the upper-cased file content with ``b"foo"``
    appended.
    """

    def sha1(self, abspath):
        """Return only the digest part of ``stat_and_sha1(abspath)``."""
        stat_and_digest = self.stat_and_sha1(abspath)
        return stat_and_digest[1]

    def stat_and_sha1(self, abspath):
        """Return ``(fstat result, digest)`` for the file at *abspath*."""
        with open(abspath, 'rb') as stream:
            file_stat = os.fstat(stream.fileno())
            content = stream.read()
        digest = osutils.sha_string(content.upper() + b"foo")
        return file_stat, digest
 
1280
 
 
1281
 
 
1282
class TestProcessEntry(test_dirstate.TestCaseWithDirState):
    """Run iter_changes with ``dirstate._process_entry`` replaced by the
    scenario-supplied implementation.
    """

    scenarios = multiply_scenarios(dir_reader_scenarios(), pe_scenarios)

    # Set by load_tests
    _process_entry = None

    def setUp(self):
        """Install the scenario's ``_process_entry`` into ``dirstate``."""
        super(TestProcessEntry, self).setUp()
        self.overrideAttr(dirstate, '_process_entry', self._process_entry)

    def assertChangedFileIds(self, expected, tree):
        """Assert iter_changes against the basis tree yields *expected* ids.

        Both sides are sorted before comparing, so order does not matter.
        """
        with tree.lock_read():
            basis = tree.basis_tree()
            changed_ids = [change.file_id
                           for change in tree.iter_changes(basis)]
        self.assertEqual(sorted(expected), sorted(changed_ids))

    def test_exceptions_raised(self):
        # This is a direct test of bug #495023, it relies on osutils.is_inside
        # getting called in an inner function. Which makes it a bit brittle,
        # but at least it does reproduce the bug.
        tree = self.make_branch_and_tree('tree')
        self.build_tree(['tree/file', 'tree/dir/', 'tree/dir/sub',
                         'tree/dir2/', 'tree/dir2/sub2'])
        tree.add(['file', 'dir', 'dir/sub', 'dir2', 'dir2/sub2'])
        tree.commit('first commit')
        tree.lock_read()
        self.addCleanup(tree.unlock)
        basis_tree = tree.basis_tree()

        def is_inside_raises(*args, **kwargs):
            raise RuntimeError('stop this')

        self.overrideAttr(dirstate, 'is_inside', is_inside_raises)
        # The compiled helpers are optional; patch them only when importable.
        try:
            from breezy.bzr import _dirstate_helpers_pyx as compiled_helpers
        except ImportError:
            compiled_helpers = None
        if compiled_helpers is not None:
            self.overrideAttr(compiled_helpers,
                              'is_inside', is_inside_raises)
        self.overrideAttr(osutils, 'is_inside', is_inside_raises)
        self.assertListRaises(RuntimeError, tree.iter_changes, basis_tree)

    def test_simple_changes(self):
        """An added file shows as changed until it is committed."""
        tree = self.make_branch_and_tree('tree')
        self.build_tree(['tree/file'])
        tree.add(['file'], [b'file-id'])
        root_id = tree.path2id('')
        self.assertChangedFileIds([root_id, b'file-id'], tree)
        tree.commit('one')
        self.assertChangedFileIds([], tree)

    def test_sha1provider_stat_and_sha1_used(self):
        """With the custom provider installed, the committed file is reported
        as changed.
        """
        tree = self.make_branch_and_tree('tree')
        self.build_tree(['tree/file'])
        tree.add(['file'], [b'file-id'])
        tree.commit('one')
        tree.lock_write()
        self.addCleanup(tree.unlock)
        current_state = tree._current_dirstate()
        current_state._sha1_provider = UppercaseSHA1Provider()
        self.assertChangedFileIds([b'file-id'], tree)
 
1343
 
 
1344
 
 
1345
class TestPackStat(tests.TestCase):
    """Check that the packed representation of stat values is robust.

    Each test builds an ``os.stat_result`` by hand, packs it with the
    ``pack_stat`` of the helper module under test (``self.helpers``) and,
    in most cases, reads a single field back with the pure-Python
    ``_unpack_stat``.
    """

    scenarios = helper_scenarios

    def pack(self, statlike_tuple):
        """Pack ``statlike_tuple`` with the scenario's ``pack_stat``.

        :param statlike_tuple: Values in ``os.stat_result`` order (mode,
            ino, dev, nlink, uid, gid, size, atime, mtime, ctime).
        :return: The result of ``self.helpers.pack_stat``.
        """
        packer = self.helpers.pack_stat
        return packer(os.stat_result(statlike_tuple))

    def _pack_one_field(self, position, value):
        """Pack a stat tuple that is all zeros apart from mode and one slot.

        :param position: Index of the slot to set, in ``os.stat_result``
            order.
        :param value: Value stored in that slot.
        :return: The packed form, as produced by ``pack``.
        """
        slots = [33252] + [0] * 9
        slots[position] = value
        return self.pack(tuple(slots))

    @staticmethod
    def unpack_field(packed_string, stat_field):
        """Return the ``stat_field`` entry of the unpacked ``packed_string``.

        Unpacking always uses the pure-Python ``_unpack_stat``.
        """
        unpacked = _dirstate_helpers_py._unpack_stat(packed_string)
        return unpacked[stat_field]

    def test_result(self):
        """Pin the exact packed text for one representative stat value."""
        # The expected text base64-decodes to six big-endian 32-bit words:
        # size 4096, mtime 16, ctime 17, dev 2, ino 1, mode 33252.
        stat_values = (33252, 1, 2, 0, 0, 0, 4096, 15.5, 16.5, 17.5)
        self.assertEqual(b"AAAQAAAAABAAAAARAAAAAgAAAAEAAIHk",
                         self.pack(stat_values))

    def test_giant_inode(self):
        """An inode wider than 32 bits comes back as its low 32 bits."""
        encoded = self._pack_one_field(1, 0xF80000ABC)
        self.assertEqual(0x80000ABC, self.unpack_field(encoded, "st_ino"))

    def test_giant_size(self):
        """A size above 2**32 comes back reduced modulo 2**32."""
        encoded = self._pack_one_field(6, (1 << 33) + 4096)
        self.assertEqual(4096, self.unpack_field(encoded, "st_size"))

    def test_fractional_mtime(self):
        """The fractional part of the mtime is dropped."""
        encoded = self._pack_one_field(8, 16.9375)
        self.assertEqual(16, self.unpack_field(encoded, "st_mtime"))

    def test_ancient_mtime(self):
        """A large negative mtime comes back modulo 2**32."""
        encoded = self._pack_one_field(8, -11644473600.0)
        self.assertEqual(1240428288, self.unpack_field(encoded, "st_mtime"))

    def test_distant_mtime(self):
        """An mtime beyond 32 bits comes back modulo 2**32."""
        encoded = self._pack_one_field(8, 64060588800.0)
        self.assertEqual(3931046656, self.unpack_field(encoded, "st_mtime"))

    def test_fractional_ctime(self):
        """The fractional part of the ctime is dropped."""
        encoded = self._pack_one_field(9, 17.5625)
        self.assertEqual(17, self.unpack_field(encoded, "st_ctime"))

    def test_ancient_ctime(self):
        """A large negative ctime comes back modulo 2**32."""
        encoded = self._pack_one_field(9, -11644473600.0)
        self.assertEqual(1240428288, self.unpack_field(encoded, "st_ctime"))

    def test_distant_ctime(self):
        """A ctime beyond 32 bits comes back modulo 2**32."""
        encoded = self._pack_one_field(9, 64060588800.0)
        self.assertEqual(3931046656, self.unpack_field(encoded, "st_ctime"))

    def test_negative_dev(self):
        """A negative device number comes back modulo 2**32."""
        encoded = self._pack_one_field(2, -0xFFFFFCDE)
        self.assertEqual(0x322, self.unpack_field(encoded, "st_dev"))