/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test__dirstate_helpers.py

  • Committer: Jelmer Vernooij
  • Date: 2017-06-08 23:30:31 UTC
  • mto: This revision was merged to the branch mainline in revision 6690.
  • Revision ID: jelmer@jelmer.uk-20170608233031-3qavls2o7a1pqllj
Update imports.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2007-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the compiled dirstate helpers."""
 
18
 
 
19
import bisect
 
20
import os
 
21
import time
 
22
 
 
23
from .. import (
 
24
    errors,
 
25
    osutils,
 
26
    tests,
 
27
    )
 
28
from ..bzr import (
 
29
    dirstate,
 
30
    _dirstate_helpers_py,
 
31
    )
 
32
from . import (
 
33
    test_dirstate,
 
34
    )
 
35
from .test_osutils import dir_reader_scenarios
 
36
from .scenarios import (
 
37
    load_tests_apply_scenarios,
 
38
    multiply_scenarios,
 
39
    )
 
40
from . import (
 
41
    features,
 
42
    )
 
43
 
 
44
 
 
45
load_tests = load_tests_apply_scenarios
 
46
 
 
47
 
 
48
compiled_dirstate_helpers_feature = features.ModuleAvailableFeature(
 
49
    'breezy._dirstate_helpers_pyx')
 
50
 
 
51
 
 
52
# FIXME: we should also parametrize against SHA1Provider !
 
53
 
 
54
ue_scenarios = [('dirstate_Python',
 
55
    {'update_entry': dirstate.py_update_entry})]
 
56
if compiled_dirstate_helpers_feature.available():
 
57
    update_entry = compiled_dirstate_helpers_feature.module.update_entry
 
58
    ue_scenarios.append(('dirstate_Pyrex', {'update_entry': update_entry}))
 
59
 
 
60
pe_scenarios = [('dirstate_Python',
 
61
    {'_process_entry': dirstate.ProcessEntryPython})]
 
62
if compiled_dirstate_helpers_feature.available():
 
63
    process_entry = compiled_dirstate_helpers_feature.module.ProcessEntryC
 
64
    pe_scenarios.append(('dirstate_Pyrex', {'_process_entry': process_entry}))
 
65
 
 
66
helper_scenarios = [('dirstate_Python', {'helpers': _dirstate_helpers_py})]
 
67
if compiled_dirstate_helpers_feature.available():
 
68
    helper_scenarios.append(('dirstate_Pyrex',
 
69
        {'helpers': compiled_dirstate_helpers_feature.module}))
 
70
 
 
71
 
 
72
class TestBisectPathMixin(object):
 
73
    """Test that _bisect_path_*() returns the expected values.
 
74
 
 
75
    _bisect_path_* is intended to work like bisect.bisect_*() except it
 
76
    knows it is working on paths that are sorted by ('path', 'to', 'foo')
 
77
    chunks rather than by raw 'path/to/foo'.
 
78
 
 
79
    Test Cases should inherit from this and override ``get_bisect_path`` return
 
80
    their implementation, and ``get_bisect`` to return the matching
 
81
    bisect.bisect_* function.
 
82
    """
 
83
 
 
84
    def get_bisect_path(self):
 
85
        """Return an implementation of _bisect_path_*"""
 
86
        raise NotImplementedError
 
87
 
 
88
    def get_bisect(self):
 
89
        """Return a version of bisect.bisect_*.
 
90
 
 
91
        Also, for the 'exists' check, return the offset to the real values.
 
92
        For example bisect_left returns the index of an entry, while
 
93
        bisect_right returns the index *after* an entry
 
94
 
 
95
        :return: (bisect_func, offset)
 
96
        """
 
97
        raise NotImplementedError
 
98
 
 
99
    def assertBisect(self, paths, split_paths, path, exists=True):
 
100
        """Assert that bisect_split works like bisect_left on the split paths.
 
101
 
 
102
        :param paths: A list of path names
 
103
        :param split_paths: A list of path names that are already split up by directory
 
104
            ('path/to/foo' => ('path', 'to', 'foo'))
 
105
        :param path: The path we are indexing.
 
106
        :param exists: The path should be present, so make sure the
 
107
            final location actually points to the right value.
 
108
 
 
109
        All other arguments will be passed along.
 
110
        """
 
111
        bisect_path = self.get_bisect_path()
 
112
        self.assertIsInstance(paths, list)
 
113
        bisect_path_idx = bisect_path(paths, path)
 
114
        split_path = self.split_for_dirblocks([path])[0]
 
115
        bisect_func, offset = self.get_bisect()
 
116
        bisect_split_idx = bisect_func(split_paths, split_path)
 
117
        self.assertEqual(bisect_split_idx, bisect_path_idx,
 
118
                         '%s disagreed. %s != %s'
 
119
                         ' for key %r'
 
120
                         % (bisect_path.__name__,
 
121
                            bisect_split_idx, bisect_path_idx, path)
 
122
                         )
 
123
        if exists:
 
124
            self.assertEqual(path, paths[bisect_path_idx+offset])
 
125
 
 
126
    def split_for_dirblocks(self, paths):
 
127
        dir_split_paths = []
 
128
        for path in paths:
 
129
            dirname, basename = os.path.split(path)
 
130
            dir_split_paths.append((dirname.split('/'), basename))
 
131
        dir_split_paths.sort()
 
132
        return dir_split_paths
 
133
 
 
134
    def test_simple(self):
 
135
        """In the simple case it works just like bisect_left"""
 
136
        paths = ['', 'a', 'b', 'c', 'd']
 
137
        split_paths = self.split_for_dirblocks(paths)
 
138
        for path in paths:
 
139
            self.assertBisect(paths, split_paths, path, exists=True)
 
140
        self.assertBisect(paths, split_paths, '_', exists=False)
 
141
        self.assertBisect(paths, split_paths, 'aa', exists=False)
 
142
        self.assertBisect(paths, split_paths, 'bb', exists=False)
 
143
        self.assertBisect(paths, split_paths, 'cc', exists=False)
 
144
        self.assertBisect(paths, split_paths, 'dd', exists=False)
 
145
        self.assertBisect(paths, split_paths, 'a/a', exists=False)
 
146
        self.assertBisect(paths, split_paths, 'b/b', exists=False)
 
147
        self.assertBisect(paths, split_paths, 'c/c', exists=False)
 
148
        self.assertBisect(paths, split_paths, 'd/d', exists=False)
 
149
 
 
150
    def test_involved(self):
 
151
        """This is where bisect_path_* diverges slightly."""
 
152
        # This is the list of paths and their contents
 
153
        # a/
 
154
        #   a/
 
155
        #     a
 
156
        #     z
 
157
        #   a-a/
 
158
        #     a
 
159
        #   a-z/
 
160
        #     z
 
161
        #   a=a/
 
162
        #     a
 
163
        #   a=z/
 
164
        #     z
 
165
        #   z/
 
166
        #     a
 
167
        #     z
 
168
        #   z-a
 
169
        #   z-z
 
170
        #   z=a
 
171
        #   z=z
 
172
        # a-a/
 
173
        #   a
 
174
        # a-z/
 
175
        #   z
 
176
        # a=a/
 
177
        #   a
 
178
        # a=z/
 
179
        #   z
 
180
        # This is the exact order that is stored by dirstate
 
181
        # All children in a directory are mentioned before an children of
 
182
        # children are mentioned.
 
183
        # So all the root-directory paths, then all the
 
184
        # first sub directory, etc.
 
185
        paths = [# content of '/'
 
186
                 '', 'a', 'a-a', 'a-z', 'a=a', 'a=z',
 
187
                 # content of 'a/'
 
188
                 'a/a', 'a/a-a', 'a/a-z',
 
189
                 'a/a=a', 'a/a=z',
 
190
                 'a/z', 'a/z-a', 'a/z-z',
 
191
                 'a/z=a', 'a/z=z',
 
192
                 # content of 'a/a/'
 
193
                 'a/a/a', 'a/a/z',
 
194
                 # content of 'a/a-a'
 
195
                 'a/a-a/a',
 
196
                 # content of 'a/a-z'
 
197
                 'a/a-z/z',
 
198
                 # content of 'a/a=a'
 
199
                 'a/a=a/a',
 
200
                 # content of 'a/a=z'
 
201
                 'a/a=z/z',
 
202
                 # content of 'a/z/'
 
203
                 'a/z/a', 'a/z/z',
 
204
                 # content of 'a-a'
 
205
                 'a-a/a',
 
206
                 # content of 'a-z'
 
207
                 'a-z/z',
 
208
                 # content of 'a=a'
 
209
                 'a=a/a',
 
210
                 # content of 'a=z'
 
211
                 'a=z/z',
 
212
                ]
 
213
        split_paths = self.split_for_dirblocks(paths)
 
214
        sorted_paths = []
 
215
        for dir_parts, basename in split_paths:
 
216
            if dir_parts == ['']:
 
217
                sorted_paths.append(basename)
 
218
            else:
 
219
                sorted_paths.append('/'.join(dir_parts + [basename]))
 
220
 
 
221
        self.assertEqual(sorted_paths, paths)
 
222
 
 
223
        for path in paths:
 
224
            self.assertBisect(paths, split_paths, path, exists=True)
 
225
 
 
226
 
 
227
class TestBisectPathLeft(tests.TestCase, TestBisectPathMixin):
 
228
    """Run all Bisect Path tests against _bisect_path_left."""
 
229
 
 
230
    def get_bisect_path(self):
 
231
        from breezy._dirstate_helpers_py import _bisect_path_left
 
232
        return _bisect_path_left
 
233
 
 
234
    def get_bisect(self):
 
235
        return bisect.bisect_left, 0
 
236
 
 
237
 
 
238
class TestCompiledBisectPathLeft(TestBisectPathLeft):
 
239
    """Run all Bisect Path tests against _bisect_path_lect"""
 
240
 
 
241
    _test_needs_features = [compiled_dirstate_helpers_feature]
 
242
 
 
243
    def get_bisect_path(self):
 
244
        from breezy._dirstate_helpers_pyx import _bisect_path_left
 
245
        return _bisect_path_left
 
246
 
 
247
 
 
248
class TestBisectPathRight(tests.TestCase, TestBisectPathMixin):
 
249
    """Run all Bisect Path tests against _bisect_path_right"""
 
250
 
 
251
    def get_bisect_path(self):
 
252
        from breezy._dirstate_helpers_py import _bisect_path_right
 
253
        return _bisect_path_right
 
254
 
 
255
    def get_bisect(self):
 
256
        return bisect.bisect_right, -1
 
257
 
 
258
 
 
259
class TestCompiledBisectPathRight(TestBisectPathRight):
 
260
    """Run all Bisect Path tests against _bisect_path_right"""
 
261
 
 
262
    _test_needs_features = [compiled_dirstate_helpers_feature]
 
263
 
 
264
    def get_bisect_path(self):
 
265
        from breezy._dirstate_helpers_pyx import _bisect_path_right
 
266
        return _bisect_path_right
 
267
 
 
268
 
 
269
class TestBisectDirblock(tests.TestCase):
 
270
    """Test that bisect_dirblock() returns the expected values.
 
271
 
 
272
    bisect_dirblock is intended to work like bisect.bisect_left() except it
 
273
    knows it is working on dirblocks and that dirblocks are sorted by ('path',
 
274
    'to', 'foo') chunks rather than by raw 'path/to/foo'.
 
275
 
 
276
    This test is parameterized by calling get_bisect_dirblock(). Child test
 
277
    cases can override this function to test against a different
 
278
    implementation.
 
279
    """
 
280
 
 
281
    def get_bisect_dirblock(self):
 
282
        """Return an implementation of bisect_dirblock"""
 
283
        from breezy._dirstate_helpers_py import bisect_dirblock
 
284
        return bisect_dirblock
 
285
 
 
286
    def assertBisect(self, dirblocks, split_dirblocks, path, *args, **kwargs):
 
287
        """Assert that bisect_split works like bisect_left on the split paths.
 
288
 
 
289
        :param dirblocks: A list of (path, [info]) pairs.
 
290
        :param split_dirblocks: A list of ((split, path), [info]) pairs.
 
291
        :param path: The path we are indexing.
 
292
 
 
293
        All other arguments will be passed along.
 
294
        """
 
295
        bisect_dirblock = self.get_bisect_dirblock()
 
296
        self.assertIsInstance(dirblocks, list)
 
297
        bisect_split_idx = bisect_dirblock(dirblocks, path, *args, **kwargs)
 
298
        split_dirblock = (path.split('/'), [])
 
299
        bisect_left_idx = bisect.bisect_left(split_dirblocks, split_dirblock,
 
300
                                             *args)
 
301
        self.assertEqual(bisect_left_idx, bisect_split_idx,
 
302
                         'bisect_split disagreed. %s != %s'
 
303
                         ' for key %r'
 
304
                         % (bisect_left_idx, bisect_split_idx, path)
 
305
                         )
 
306
 
 
307
    def paths_to_dirblocks(self, paths):
 
308
        """Convert a list of paths into dirblock form.
 
309
 
 
310
        Also, ensure that the paths are in proper sorted order.
 
311
        """
 
312
        dirblocks = [(path, []) for path in paths]
 
313
        split_dirblocks = [(path.split('/'), []) for path in paths]
 
314
        self.assertEqual(sorted(split_dirblocks), split_dirblocks)
 
315
        return dirblocks, split_dirblocks
 
316
 
 
317
    def test_simple(self):
 
318
        """In the simple case it works just like bisect_left"""
 
319
        paths = ['', 'a', 'b', 'c', 'd']
 
320
        dirblocks, split_dirblocks = self.paths_to_dirblocks(paths)
 
321
        for path in paths:
 
322
            self.assertBisect(dirblocks, split_dirblocks, path)
 
323
        self.assertBisect(dirblocks, split_dirblocks, '_')
 
324
        self.assertBisect(dirblocks, split_dirblocks, 'aa')
 
325
        self.assertBisect(dirblocks, split_dirblocks, 'bb')
 
326
        self.assertBisect(dirblocks, split_dirblocks, 'cc')
 
327
        self.assertBisect(dirblocks, split_dirblocks, 'dd')
 
328
        self.assertBisect(dirblocks, split_dirblocks, 'a/a')
 
329
        self.assertBisect(dirblocks, split_dirblocks, 'b/b')
 
330
        self.assertBisect(dirblocks, split_dirblocks, 'c/c')
 
331
        self.assertBisect(dirblocks, split_dirblocks, 'd/d')
 
332
 
 
333
    def test_involved(self):
 
334
        """This is where bisect_left diverges slightly."""
 
335
        paths = ['', 'a',
 
336
                 'a/a', 'a/a/a', 'a/a/z', 'a/a-a', 'a/a-z',
 
337
                 'a/z', 'a/z/a', 'a/z/z', 'a/z-a', 'a/z-z',
 
338
                 'a-a', 'a-z',
 
339
                 'z', 'z/a/a', 'z/a/z', 'z/a-a', 'z/a-z',
 
340
                 'z/z', 'z/z/a', 'z/z/z', 'z/z-a', 'z/z-z',
 
341
                 'z-a', 'z-z',
 
342
                ]
 
343
        dirblocks, split_dirblocks = self.paths_to_dirblocks(paths)
 
344
        for path in paths:
 
345
            self.assertBisect(dirblocks, split_dirblocks, path)
 
346
 
 
347
    def test_involved_cached(self):
 
348
        """This is where bisect_left diverges slightly."""
 
349
        paths = ['', 'a',
 
350
                 'a/a', 'a/a/a', 'a/a/z', 'a/a-a', 'a/a-z',
 
351
                 'a/z', 'a/z/a', 'a/z/z', 'a/z-a', 'a/z-z',
 
352
                 'a-a', 'a-z',
 
353
                 'z', 'z/a/a', 'z/a/z', 'z/a-a', 'z/a-z',
 
354
                 'z/z', 'z/z/a', 'z/z/z', 'z/z-a', 'z/z-z',
 
355
                 'z-a', 'z-z',
 
356
                ]
 
357
        cache = {}
 
358
        dirblocks, split_dirblocks = self.paths_to_dirblocks(paths)
 
359
        for path in paths:
 
360
            self.assertBisect(dirblocks, split_dirblocks, path, cache=cache)
 
361
 
 
362
 
 
363
class TestCompiledBisectDirblock(TestBisectDirblock):
 
364
    """Test that bisect_dirblock() returns the expected values.
 
365
 
 
366
    bisect_dirblock is intended to work like bisect.bisect_left() except it
 
367
    knows it is working on dirblocks and that dirblocks are sorted by ('path',
 
368
    'to', 'foo') chunks rather than by raw 'path/to/foo'.
 
369
 
 
370
    This runs all the normal tests that TestBisectDirblock did, but uses the
 
371
    compiled version.
 
372
    """
 
373
 
 
374
    _test_needs_features = [compiled_dirstate_helpers_feature]
 
375
 
 
376
    def get_bisect_dirblock(self):
 
377
        from breezy._dirstate_helpers_pyx import bisect_dirblock
 
378
        return bisect_dirblock
 
379
 
 
380
 
 
381
class TestCmpByDirs(tests.TestCase):
 
382
    """Test an implementation of cmp_by_dirs()
 
383
 
 
384
    cmp_by_dirs() compares 2 paths by their directory sections, rather than as
 
385
    plain strings.
 
386
 
 
387
    Child test cases can override ``get_cmp_by_dirs`` to test a specific
 
388
    implementation.
 
389
    """
 
390
 
 
391
    def get_cmp_by_dirs(self):
 
392
        """Get a specific implementation of cmp_by_dirs."""
 
393
        from breezy._dirstate_helpers_py import cmp_by_dirs
 
394
        return cmp_by_dirs
 
395
 
 
396
    def assertCmpByDirs(self, expected, str1, str2):
 
397
        """Compare the two strings, in both directions.
 
398
 
 
399
        :param expected: The expected comparison value. -1 means str1 comes
 
400
            first, 0 means they are equal, 1 means str2 comes first
 
401
        :param str1: string to compare
 
402
        :param str2: string to compare
 
403
        """
 
404
        cmp_by_dirs = self.get_cmp_by_dirs()
 
405
        if expected == 0:
 
406
            self.assertEqual(str1, str2)
 
407
            self.assertEqual(0, cmp_by_dirs(str1, str2))
 
408
            self.assertEqual(0, cmp_by_dirs(str2, str1))
 
409
        elif expected > 0:
 
410
            self.assertPositive(cmp_by_dirs(str1, str2))
 
411
            self.assertNegative(cmp_by_dirs(str2, str1))
 
412
        else:
 
413
            self.assertNegative(cmp_by_dirs(str1, str2))
 
414
            self.assertPositive(cmp_by_dirs(str2, str1))
 
415
 
 
416
    def test_cmp_empty(self):
 
417
        """Compare against the empty string."""
 
418
        self.assertCmpByDirs(0, '', '')
 
419
        self.assertCmpByDirs(1, 'a', '')
 
420
        self.assertCmpByDirs(1, 'ab', '')
 
421
        self.assertCmpByDirs(1, 'abc', '')
 
422
        self.assertCmpByDirs(1, 'abcd', '')
 
423
        self.assertCmpByDirs(1, 'abcde', '')
 
424
        self.assertCmpByDirs(1, 'abcdef', '')
 
425
        self.assertCmpByDirs(1, 'abcdefg', '')
 
426
        self.assertCmpByDirs(1, 'abcdefgh', '')
 
427
        self.assertCmpByDirs(1, 'abcdefghi', '')
 
428
        self.assertCmpByDirs(1, 'test/ing/a/path/', '')
 
429
 
 
430
    def test_cmp_same_str(self):
 
431
        """Compare the same string"""
 
432
        self.assertCmpByDirs(0, 'a', 'a')
 
433
        self.assertCmpByDirs(0, 'ab', 'ab')
 
434
        self.assertCmpByDirs(0, 'abc', 'abc')
 
435
        self.assertCmpByDirs(0, 'abcd', 'abcd')
 
436
        self.assertCmpByDirs(0, 'abcde', 'abcde')
 
437
        self.assertCmpByDirs(0, 'abcdef', 'abcdef')
 
438
        self.assertCmpByDirs(0, 'abcdefg', 'abcdefg')
 
439
        self.assertCmpByDirs(0, 'abcdefgh', 'abcdefgh')
 
440
        self.assertCmpByDirs(0, 'abcdefghi', 'abcdefghi')
 
441
        self.assertCmpByDirs(0, 'testing a long string', 'testing a long string')
 
442
        self.assertCmpByDirs(0, 'x'*10000, 'x'*10000)
 
443
        self.assertCmpByDirs(0, 'a/b', 'a/b')
 
444
        self.assertCmpByDirs(0, 'a/b/c', 'a/b/c')
 
445
        self.assertCmpByDirs(0, 'a/b/c/d', 'a/b/c/d')
 
446
        self.assertCmpByDirs(0, 'a/b/c/d/e', 'a/b/c/d/e')
 
447
 
 
448
    def test_simple_paths(self):
 
449
        """Compare strings that act like normal string comparison"""
 
450
        self.assertCmpByDirs(-1, 'a', 'b')
 
451
        self.assertCmpByDirs(-1, 'aa', 'ab')
 
452
        self.assertCmpByDirs(-1, 'ab', 'bb')
 
453
        self.assertCmpByDirs(-1, 'aaa', 'aab')
 
454
        self.assertCmpByDirs(-1, 'aab', 'abb')
 
455
        self.assertCmpByDirs(-1, 'abb', 'bbb')
 
456
        self.assertCmpByDirs(-1, 'aaaa', 'aaab')
 
457
        self.assertCmpByDirs(-1, 'aaab', 'aabb')
 
458
        self.assertCmpByDirs(-1, 'aabb', 'abbb')
 
459
        self.assertCmpByDirs(-1, 'abbb', 'bbbb')
 
460
        self.assertCmpByDirs(-1, 'aaaaa', 'aaaab')
 
461
        self.assertCmpByDirs(-1, 'a/a', 'a/b')
 
462
        self.assertCmpByDirs(-1, 'a/b', 'b/b')
 
463
        self.assertCmpByDirs(-1, 'a/a/a', 'a/a/b')
 
464
        self.assertCmpByDirs(-1, 'a/a/b', 'a/b/b')
 
465
        self.assertCmpByDirs(-1, 'a/b/b', 'b/b/b')
 
466
        self.assertCmpByDirs(-1, 'a/a/a/a', 'a/a/a/b')
 
467
        self.assertCmpByDirs(-1, 'a/a/a/b', 'a/a/b/b')
 
468
        self.assertCmpByDirs(-1, 'a/a/b/b', 'a/b/b/b')
 
469
        self.assertCmpByDirs(-1, 'a/b/b/b', 'b/b/b/b')
 
470
        self.assertCmpByDirs(-1, 'a/a/a/a/a', 'a/a/a/a/b')
 
471
 
 
472
    def test_tricky_paths(self):
 
473
        self.assertCmpByDirs(1, 'ab/cd/ef', 'ab/cc/ef')
 
474
        self.assertCmpByDirs(1, 'ab/cd/ef', 'ab/c/ef')
 
475
        self.assertCmpByDirs(-1, 'ab/cd/ef', 'ab/cd-ef')
 
476
        self.assertCmpByDirs(-1, 'ab/cd', 'ab/cd-')
 
477
        self.assertCmpByDirs(-1, 'ab/cd', 'ab-cd')
 
478
 
 
479
    def test_cmp_unicode_not_allowed(self):
 
480
        cmp_by_dirs = self.get_cmp_by_dirs()
 
481
        self.assertRaises(TypeError, cmp_by_dirs, u'Unicode', 'str')
 
482
        self.assertRaises(TypeError, cmp_by_dirs, 'str', u'Unicode')
 
483
        self.assertRaises(TypeError, cmp_by_dirs, u'Unicode', u'Unicode')
 
484
 
 
485
    def test_cmp_non_ascii(self):
 
486
        self.assertCmpByDirs(-1, '\xc2\xb5', '\xc3\xa5') # u'\xb5', u'\xe5'
 
487
        self.assertCmpByDirs(-1, 'a', '\xc3\xa5') # u'a', u'\xe5'
 
488
        self.assertCmpByDirs(-1, 'b', '\xc2\xb5') # u'b', u'\xb5'
 
489
        self.assertCmpByDirs(-1, 'a/b', 'a/\xc3\xa5') # u'a/b', u'a/\xe5'
 
490
        self.assertCmpByDirs(-1, 'b/a', 'b/\xc2\xb5') # u'b/a', u'b/\xb5'
 
491
 
 
492
 
 
493
class TestCompiledCmpByDirs(TestCmpByDirs):
 
494
    """Test the pyrex implementation of cmp_by_dirs"""
 
495
 
 
496
    _test_needs_features = [compiled_dirstate_helpers_feature]
 
497
 
 
498
    def get_cmp_by_dirs(self):
 
499
        from breezy._dirstate_helpers_pyx import cmp_by_dirs
 
500
        return cmp_by_dirs
 
501
 
 
502
 
 
503
class TestCmpPathByDirblock(tests.TestCase):
 
504
    """Test an implementation of _cmp_path_by_dirblock()
 
505
 
 
506
    _cmp_path_by_dirblock() compares two paths using the sort order used by
 
507
    DirState. All paths in the same directory are sorted together.
 
508
 
 
509
    Child test cases can override ``get_cmp_path_by_dirblock`` to test a specific
 
510
    implementation.
 
511
    """
 
512
 
 
513
    def get_cmp_path_by_dirblock(self):
 
514
        """Get a specific implementation of _cmp_path_by_dirblock."""
 
515
        from breezy._dirstate_helpers_py import _cmp_path_by_dirblock
 
516
        return _cmp_path_by_dirblock
 
517
 
 
518
    def assertCmpPathByDirblock(self, paths):
 
519
        """Compare all paths and make sure they evaluate to the correct order.
 
520
 
 
521
        This does N^2 comparisons. It is assumed that ``paths`` is properly
 
522
        sorted list.
 
523
 
 
524
        :param paths: a sorted list of paths to compare
 
525
        """
 
526
        # First, make sure the paths being passed in are correct
 
527
        def _key(p):
 
528
            dirname, basename = os.path.split(p)
 
529
            return dirname.split('/'), basename
 
530
        self.assertEqual(sorted(paths, key=_key), paths)
 
531
 
 
532
        cmp_path_by_dirblock = self.get_cmp_path_by_dirblock()
 
533
        for idx1, path1 in enumerate(paths):
 
534
            for idx2, path2 in enumerate(paths):
 
535
                cmp_val = cmp_path_by_dirblock(path1, path2)
 
536
                if idx1 < idx2:
 
537
                    self.assertTrue(cmp_val < 0,
 
538
                        '%s did not state that %r came before %r, cmp=%s'
 
539
                        % (cmp_path_by_dirblock.__name__,
 
540
                           path1, path2, cmp_val))
 
541
                elif idx1 > idx2:
 
542
                    self.assertTrue(cmp_val > 0,
 
543
                        '%s did not state that %r came after %r, cmp=%s'
 
544
                        % (cmp_path_by_dirblock.__name__,
 
545
                           path1, path2, cmp_val))
 
546
                else: # idx1 == idx2
 
547
                    self.assertTrue(cmp_val == 0,
 
548
                        '%s did not state that %r == %r, cmp=%s'
 
549
                        % (cmp_path_by_dirblock.__name__,
 
550
                           path1, path2, cmp_val))
 
551
 
 
552
    def test_cmp_simple_paths(self):
 
553
        """Compare against the empty string."""
 
554
        self.assertCmpPathByDirblock(['', 'a', 'ab', 'abc', 'a/b/c', 'b/d/e'])
 
555
        self.assertCmpPathByDirblock(['kl', 'ab/cd', 'ab/ef', 'gh/ij'])
 
556
 
 
557
    def test_tricky_paths(self):
 
558
        self.assertCmpPathByDirblock([
 
559
            # Contents of ''
 
560
            '', 'a', 'a-a', 'a=a', 'b',
 
561
            # Contents of 'a'
 
562
            'a/a', 'a/a-a', 'a/a=a', 'a/b',
 
563
            # Contents of 'a/a'
 
564
            'a/a/a', 'a/a/a-a', 'a/a/a=a',
 
565
            # Contents of 'a/a/a'
 
566
            'a/a/a/a', 'a/a/a/b',
 
567
            # Contents of 'a/a/a-a',
 
568
            'a/a/a-a/a', 'a/a/a-a/b',
 
569
            # Contents of 'a/a/a=a',
 
570
            'a/a/a=a/a', 'a/a/a=a/b',
 
571
            # Contents of 'a/a-a'
 
572
            'a/a-a/a',
 
573
            # Contents of 'a/a-a/a'
 
574
            'a/a-a/a/a', 'a/a-a/a/b',
 
575
            # Contents of 'a/a=a'
 
576
            'a/a=a/a',
 
577
            # Contents of 'a/b'
 
578
            'a/b/a', 'a/b/b',
 
579
            # Contents of 'a-a',
 
580
            'a-a/a', 'a-a/b',
 
581
            # Contents of 'a=a',
 
582
            'a=a/a', 'a=a/b',
 
583
            # Contents of 'b',
 
584
            'b/a', 'b/b',
 
585
            ])
 
586
        self.assertCmpPathByDirblock([
 
587
                 # content of '/'
 
588
                 '', 'a', 'a-a', 'a-z', 'a=a', 'a=z',
 
589
                 # content of 'a/'
 
590
                 'a/a', 'a/a-a', 'a/a-z',
 
591
                 'a/a=a', 'a/a=z',
 
592
                 'a/z', 'a/z-a', 'a/z-z',
 
593
                 'a/z=a', 'a/z=z',
 
594
                 # content of 'a/a/'
 
595
                 'a/a/a', 'a/a/z',
 
596
                 # content of 'a/a-a'
 
597
                 'a/a-a/a',
 
598
                 # content of 'a/a-z'
 
599
                 'a/a-z/z',
 
600
                 # content of 'a/a=a'
 
601
                 'a/a=a/a',
 
602
                 # content of 'a/a=z'
 
603
                 'a/a=z/z',
 
604
                 # content of 'a/z/'
 
605
                 'a/z/a', 'a/z/z',
 
606
                 # content of 'a-a'
 
607
                 'a-a/a',
 
608
                 # content of 'a-z'
 
609
                 'a-z/z',
 
610
                 # content of 'a=a'
 
611
                 'a=a/a',
 
612
                 # content of 'a=z'
 
613
                 'a=z/z',
 
614
                ])
 
615
 
 
616
    def test_unicode_not_allowed(self):
 
617
        cmp_path_by_dirblock = self.get_cmp_path_by_dirblock()
 
618
        self.assertRaises(TypeError, cmp_path_by_dirblock, u'Uni', 'str')
 
619
        self.assertRaises(TypeError, cmp_path_by_dirblock, 'str', u'Uni')
 
620
        self.assertRaises(TypeError, cmp_path_by_dirblock, u'Uni', u'Uni')
 
621
        self.assertRaises(TypeError, cmp_path_by_dirblock, u'x/Uni', 'x/str')
 
622
        self.assertRaises(TypeError, cmp_path_by_dirblock, 'x/str', u'x/Uni')
 
623
        self.assertRaises(TypeError, cmp_path_by_dirblock, u'x/Uni', u'x/Uni')
 
624
 
 
625
    def test_nonascii(self):
 
626
        self.assertCmpPathByDirblock([
 
627
            # content of '/'
 
628
            '', 'a', '\xc2\xb5', '\xc3\xa5',
 
629
            # content of 'a'
 
630
            'a/a', 'a/\xc2\xb5', 'a/\xc3\xa5',
 
631
            # content of 'a/a'
 
632
            'a/a/a', 'a/a/\xc2\xb5', 'a/a/\xc3\xa5',
 
633
            # content of 'a/\xc2\xb5'
 
634
            'a/\xc2\xb5/a', 'a/\xc2\xb5/\xc2\xb5', 'a/\xc2\xb5/\xc3\xa5',
 
635
            # content of 'a/\xc3\xa5'
 
636
            'a/\xc3\xa5/a', 'a/\xc3\xa5/\xc2\xb5', 'a/\xc3\xa5/\xc3\xa5',
 
637
            # content of '\xc2\xb5'
 
638
            '\xc2\xb5/a', '\xc2\xb5/\xc2\xb5', '\xc2\xb5/\xc3\xa5',
 
639
            # content of '\xc2\xe5'
 
640
            '\xc3\xa5/a', '\xc3\xa5/\xc2\xb5', '\xc3\xa5/\xc3\xa5',
 
641
            ])
 
642
 
 
643
 
 
644
class TestCompiledCmpPathByDirblock(TestCmpPathByDirblock):
 
645
    """Test the pyrex implementation of _cmp_path_by_dirblock"""
 
646
 
 
647
    _test_needs_features = [compiled_dirstate_helpers_feature]
 
648
 
 
649
    def get_cmp_by_dirs(self):
 
650
        from breezy._dirstate_helpers_pyx import _cmp_path_by_dirblock
 
651
        return _cmp_path_by_dirblock
 
652
 
 
653
 
 
654
class TestMemRChr(tests.TestCase):
 
655
    """Test memrchr functionality"""
 
656
 
 
657
    _test_needs_features = [compiled_dirstate_helpers_feature]
 
658
 
 
659
    def assertMemRChr(self, expected, s, c):
 
660
        from breezy._dirstate_helpers_pyx import _py_memrchr
 
661
        self.assertEqual(expected, _py_memrchr(s, c))
 
662
 
 
663
    def test_missing(self):
 
664
        self.assertMemRChr(None, '', 'a')
 
665
        self.assertMemRChr(None, '', 'c')
 
666
        self.assertMemRChr(None, 'abcdefghijklm', 'q')
 
667
        self.assertMemRChr(None, 'aaaaaaaaaaaaaaaaaaaaaaa', 'b')
 
668
 
 
669
    def test_single_entry(self):
 
670
        self.assertMemRChr(0, 'abcdefghijklm', 'a')
 
671
        self.assertMemRChr(1, 'abcdefghijklm', 'b')
 
672
        self.assertMemRChr(2, 'abcdefghijklm', 'c')
 
673
        self.assertMemRChr(10, 'abcdefghijklm', 'k')
 
674
        self.assertMemRChr(11, 'abcdefghijklm', 'l')
 
675
        self.assertMemRChr(12, 'abcdefghijklm', 'm')
 
676
 
 
677
    def test_multiple(self):
 
678
        self.assertMemRChr(10, 'abcdefjklmabcdefghijklm', 'a')
 
679
        self.assertMemRChr(11, 'abcdefjklmabcdefghijklm', 'b')
 
680
        self.assertMemRChr(12, 'abcdefjklmabcdefghijklm', 'c')
 
681
        self.assertMemRChr(20, 'abcdefjklmabcdefghijklm', 'k')
 
682
        self.assertMemRChr(21, 'abcdefjklmabcdefghijklm', 'l')
 
683
        self.assertMemRChr(22, 'abcdefjklmabcdefghijklm', 'm')
 
684
        self.assertMemRChr(22, 'aaaaaaaaaaaaaaaaaaaaaaa', 'a')
 
685
 
 
686
    def test_with_nulls(self):
 
687
        self.assertMemRChr(10, 'abc\0\0\0jklmabc\0\0\0ghijklm', 'a')
 
688
        self.assertMemRChr(11, 'abc\0\0\0jklmabc\0\0\0ghijklm', 'b')
 
689
        self.assertMemRChr(12, 'abc\0\0\0jklmabc\0\0\0ghijklm', 'c')
 
690
        self.assertMemRChr(20, 'abc\0\0\0jklmabc\0\0\0ghijklm', 'k')
 
691
        self.assertMemRChr(21, 'abc\0\0\0jklmabc\0\0\0ghijklm', 'l')
 
692
        self.assertMemRChr(22, 'abc\0\0\0jklmabc\0\0\0ghijklm', 'm')
 
693
        self.assertMemRChr(22, 'aaa\0\0\0aaaaaaa\0\0\0aaaaaaa', 'a')
 
694
        self.assertMemRChr(9, '\0\0\0\0\0\0\0\0\0\0', '\0')
 
695
 
 
696
 
 
697
class TestReadDirblocks(test_dirstate.TestCaseWithDirState):
 
698
    """Test an implementation of _read_dirblocks()
 
699
 
 
700
    _read_dirblocks() reads in all of the dirblock information from the disk
 
701
    file.
 
702
 
 
703
    Child test cases can override ``get_read_dirblocks`` to test a specific
 
704
    implementation.
 
705
    """
 
706
 
 
707
    # inherits scenarios from test_dirstate
 
708
 
 
709
    def get_read_dirblocks(self):
 
710
        from breezy._dirstate_helpers_py import _read_dirblocks
 
711
        return _read_dirblocks
 
712
 
 
713
    def test_smoketest(self):
 
714
        """Make sure that we can create and read back a simple file."""
 
715
        tree, state, expected = self.create_basic_dirstate()
 
716
        del tree
 
717
        state._read_header_if_needed()
 
718
        self.assertEqual(dirstate.DirState.NOT_IN_MEMORY,
 
719
                         state._dirblock_state)
 
720
        read_dirblocks = self.get_read_dirblocks()
 
721
        read_dirblocks(state)
 
722
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
723
                         state._dirblock_state)
 
724
 
 
725
    def test_trailing_garbage(self):
 
726
        tree, state, expected = self.create_basic_dirstate()
 
727
        # On Unix, we can write extra data as long as we haven't read yet, but
 
728
        # on Win32, if you've opened the file with FILE_SHARE_READ, trying to
 
729
        # open it in append mode will fail.
 
730
        state.unlock()
 
731
        f = open('dirstate', 'ab')
 
732
        try:
 
733
            # Add bogus trailing garbage
 
734
            f.write('bogus\n')
 
735
        finally:
 
736
            f.close()
 
737
            state.lock_read()
 
738
        e = self.assertRaises(errors.DirstateCorrupt,
 
739
                              state._read_dirblocks_if_needed)
 
740
        # Make sure we mention the bogus characters in the error
 
741
        self.assertContainsRe(str(e), 'bogus')
 
742
 
 
743
 
 
744
class TestCompiledReadDirblocks(TestReadDirblocks):
 
745
    """Test the pyrex implementation of _read_dirblocks"""
 
746
 
 
747
    _test_needs_features = [compiled_dirstate_helpers_feature]
 
748
 
 
749
    def get_read_dirblocks(self):
 
750
        from breezy._dirstate_helpers_pyx import _read_dirblocks
 
751
        return _read_dirblocks
 
752
 
 
753
 
 
754
class TestUsingCompiledIfAvailable(tests.TestCase):
 
755
    """Check that any compiled functions that are available are the default.
 
756
 
 
757
    It is possible to have typos, etc in the import line, such that
 
758
    _dirstate_helpers_pyx is actually available, but the compiled functions are
 
759
    not being used.
 
760
    """
 
761
 
 
762
    def test_bisect_dirblock(self):
 
763
        if compiled_dirstate_helpers_feature.available():
 
764
            from breezy._dirstate_helpers_pyx import bisect_dirblock
 
765
        else:
 
766
            from breezy._dirstate_helpers_py import bisect_dirblock
 
767
        self.assertIs(bisect_dirblock, dirstate.bisect_dirblock)
 
768
 
 
769
    def test__bisect_path_left(self):
 
770
        if compiled_dirstate_helpers_feature.available():
 
771
            from breezy._dirstate_helpers_pyx import _bisect_path_left
 
772
        else:
 
773
            from breezy._dirstate_helpers_py import _bisect_path_left
 
774
        self.assertIs(_bisect_path_left, dirstate._bisect_path_left)
 
775
 
 
776
    def test__bisect_path_right(self):
 
777
        if compiled_dirstate_helpers_feature.available():
 
778
            from breezy._dirstate_helpers_pyx import _bisect_path_right
 
779
        else:
 
780
            from breezy._dirstate_helpers_py import _bisect_path_right
 
781
        self.assertIs(_bisect_path_right, dirstate._bisect_path_right)
 
782
 
 
783
    def test_cmp_by_dirs(self):
 
784
        if compiled_dirstate_helpers_feature.available():
 
785
            from breezy._dirstate_helpers_pyx import cmp_by_dirs
 
786
        else:
 
787
            from breezy._dirstate_helpers_py import cmp_by_dirs
 
788
        self.assertIs(cmp_by_dirs, dirstate.cmp_by_dirs)
 
789
 
 
790
    def test__read_dirblocks(self):
 
791
        if compiled_dirstate_helpers_feature.available():
 
792
            from breezy._dirstate_helpers_pyx import _read_dirblocks
 
793
        else:
 
794
            from breezy._dirstate_helpers_py import _read_dirblocks
 
795
        self.assertIs(_read_dirblocks, dirstate._read_dirblocks)
 
796
 
 
797
    def test_update_entry(self):
 
798
        if compiled_dirstate_helpers_feature.available():
 
799
            from breezy._dirstate_helpers_pyx import update_entry
 
800
        else:
 
801
            from breezy.dirstate import update_entry
 
802
        self.assertIs(update_entry, dirstate.update_entry)
 
803
 
 
804
    def test_process_entry(self):
 
805
        if compiled_dirstate_helpers_feature.available():
 
806
            from breezy._dirstate_helpers_pyx import ProcessEntryC
 
807
            self.assertIs(ProcessEntryC, dirstate._process_entry)
 
808
        else:
 
809
            from breezy.dirstate import ProcessEntryPython
 
810
            self.assertIs(ProcessEntryPython, dirstate._process_entry)
 
811
 
 
812
 
 
813
class TestUpdateEntry(test_dirstate.TestCaseWithDirState):
 
814
    """Test the DirState.update_entry functions"""
 
815
 
 
816
    scenarios = multiply_scenarios(
 
817
        dir_reader_scenarios(), ue_scenarios)
 
818
 
 
819
    # Set by load_tests
 
820
    update_entry = None
 
821
 
 
822
    def setUp(self):
 
823
        super(TestUpdateEntry, self).setUp()
 
824
        self.overrideAttr(dirstate, 'update_entry', self.update_entry)
 
825
 
 
826
    def get_state_with_a(self):
 
827
        """Create a DirState tracking a single object named 'a'"""
 
828
        state = test_dirstate.InstrumentedDirState.initialize('dirstate')
 
829
        self.addCleanup(state.unlock)
 
830
        state.add('a', 'a-id', 'file', None, '')
 
831
        entry = state._get_entry(0, path_utf8='a')
 
832
        return state, entry
 
833
 
 
834
    def test_observed_sha1_cachable(self):
 
835
        state, entry = self.get_state_with_a()
 
836
        state.save()
 
837
        atime = time.time() - 10
 
838
        self.build_tree(['a'])
 
839
        statvalue = test_dirstate._FakeStat.from_stat(os.lstat('a'))
 
840
        statvalue.st_mtime = statvalue.st_ctime = atime
 
841
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
842
                         state._dirblock_state)
 
843
        state._observed_sha1(entry, "foo", statvalue)
 
844
        self.assertEqual('foo', entry[1][0][1])
 
845
        packed_stat = dirstate.pack_stat(statvalue)
 
846
        self.assertEqual(packed_stat, entry[1][0][4])
 
847
        self.assertEqual(dirstate.DirState.IN_MEMORY_HASH_MODIFIED,
 
848
                         state._dirblock_state)
 
849
 
 
850
    def test_observed_sha1_not_cachable(self):
 
851
        state, entry = self.get_state_with_a()
 
852
        state.save()
 
853
        oldval = entry[1][0][1]
 
854
        oldstat = entry[1][0][4]
 
855
        self.build_tree(['a'])
 
856
        statvalue = os.lstat('a')
 
857
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
858
                         state._dirblock_state)
 
859
        state._observed_sha1(entry, "foo", statvalue)
 
860
        self.assertEqual(oldval, entry[1][0][1])
 
861
        self.assertEqual(oldstat, entry[1][0][4])
 
862
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
863
                         state._dirblock_state)
 
864
 
 
865
    def test_update_entry(self):
 
866
        state, _ = self.get_state_with_a()
 
867
        tree = self.make_branch_and_tree('tree')
 
868
        tree.lock_write()
 
869
        empty_revid = tree.commit('empty')
 
870
        self.build_tree(['tree/a'])
 
871
        tree.add(['a'], ['a-id'])
 
872
        with_a_id = tree.commit('with_a')
 
873
        self.addCleanup(tree.unlock)
 
874
        state.set_parent_trees(
 
875
            [(empty_revid, tree.branch.repository.revision_tree(empty_revid))],
 
876
            [])
 
877
        entry = state._get_entry(0, path_utf8='a')
 
878
        self.build_tree(['a'])
 
879
        # Add one where we don't provide the stat or sha already
 
880
        self.assertEqual(('', 'a', 'a-id'), entry[0])
 
881
        self.assertEqual(('f', '', 0, False, dirstate.DirState.NULLSTAT),
 
882
                         entry[1][0])
 
883
        # Flush the buffers to disk
 
884
        state.save()
 
885
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
886
                         state._dirblock_state)
 
887
 
 
888
        stat_value = os.lstat('a')
 
889
        packed_stat = dirstate.pack_stat(stat_value)
 
890
        link_or_sha1 = self.update_entry(state, entry, abspath='a',
 
891
                                          stat_value=stat_value)
 
892
        self.assertEqual(None, link_or_sha1)
 
893
 
 
894
        # The dirblock entry should not have computed or cached the file's
 
895
        # sha1, but it did update the files' st_size. However, this is not
 
896
        # worth writing a dirstate file for, so we leave the state UNMODIFIED
 
897
        self.assertEqual(('f', '', 14, False, dirstate.DirState.NULLSTAT),
 
898
                         entry[1][0])
 
899
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
900
                         state._dirblock_state)
 
901
        mode = stat_value.st_mode
 
902
        self.assertEqual([('is_exec', mode, False)], state._log)
 
903
 
 
904
        state.save()
 
905
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
906
                         state._dirblock_state)
 
907
 
 
908
        # Roll the clock back so the file is guaranteed to look too new. We
 
909
        # should still not compute the sha1.
 
910
        state.adjust_time(-10)
 
911
        del state._log[:]
 
912
 
 
913
        link_or_sha1 = self.update_entry(state, entry, abspath='a',
 
914
                                          stat_value=stat_value)
 
915
        self.assertEqual([('is_exec', mode, False)], state._log)
 
916
        self.assertEqual(None, link_or_sha1)
 
917
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
918
                         state._dirblock_state)
 
919
        self.assertEqual(('f', '', 14, False, dirstate.DirState.NULLSTAT),
 
920
                         entry[1][0])
 
921
        state.save()
 
922
 
 
923
        # If it is cachable (the clock has moved forward) but new it still
 
924
        # won't calculate the sha or cache it.
 
925
        state.adjust_time(+20)
 
926
        del state._log[:]
 
927
        link_or_sha1 = dirstate.update_entry(state, entry, abspath='a',
 
928
                                          stat_value=stat_value)
 
929
        self.assertEqual(None, link_or_sha1)
 
930
        self.assertEqual([('is_exec', mode, False)], state._log)
 
931
        self.assertEqual(('f', '', 14, False, dirstate.DirState.NULLSTAT),
 
932
                         entry[1][0])
 
933
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
934
                         state._dirblock_state)
 
935
 
 
936
        # If the file is no longer new, and the clock has been moved forward
 
937
        # sufficiently, it will cache the sha.
 
938
        del state._log[:]
 
939
        state.set_parent_trees(
 
940
            [(with_a_id, tree.branch.repository.revision_tree(with_a_id))],
 
941
            [])
 
942
        entry = state._get_entry(0, path_utf8='a')
 
943
 
 
944
        link_or_sha1 = self.update_entry(state, entry, abspath='a',
 
945
                                          stat_value=stat_value)
 
946
        self.assertEqual('b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6',
 
947
                         link_or_sha1)
 
948
        self.assertEqual([('is_exec', mode, False), ('sha1', 'a')],
 
949
                          state._log)
 
950
        self.assertEqual(('f', link_or_sha1, 14, False, packed_stat),
 
951
                         entry[1][0])
 
952
 
 
953
        # Subsequent calls will just return the cached value
 
954
        del state._log[:]
 
955
        link_or_sha1 = self.update_entry(state, entry, abspath='a',
 
956
                                          stat_value=stat_value)
 
957
        self.assertEqual('b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6',
 
958
                         link_or_sha1)
 
959
        self.assertEqual([], state._log)
 
960
        self.assertEqual(('f', link_or_sha1, 14, False, packed_stat),
 
961
                         entry[1][0])
 
962
 
 
963
    def test_update_entry_symlink(self):
 
964
        """Update entry should read symlinks."""
 
965
        self.requireFeature(features.SymlinkFeature)
 
966
        state, entry = self.get_state_with_a()
 
967
        state.save()
 
968
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
969
                         state._dirblock_state)
 
970
        os.symlink('target', 'a')
 
971
 
 
972
        state.adjust_time(-10) # Make the symlink look new
 
973
        stat_value = os.lstat('a')
 
974
        packed_stat = dirstate.pack_stat(stat_value)
 
975
        link_or_sha1 = self.update_entry(state, entry, abspath='a',
 
976
                                          stat_value=stat_value)
 
977
        self.assertEqual('target', link_or_sha1)
 
978
        self.assertEqual([('read_link', 'a', '')], state._log)
 
979
        # Dirblock is not updated (the link is too new)
 
980
        self.assertEqual([('l', '', 6, False, dirstate.DirState.NULLSTAT)],
 
981
                         entry[1])
 
982
        # The file entry turned into a symlink, that is considered
 
983
        # HASH modified worthy.
 
984
        self.assertEqual(dirstate.DirState.IN_MEMORY_HASH_MODIFIED,
 
985
                         state._dirblock_state)
 
986
 
 
987
        # Because the stat_value looks new, we should re-read the target
 
988
        del state._log[:]
 
989
        link_or_sha1 = self.update_entry(state, entry, abspath='a',
 
990
                                          stat_value=stat_value)
 
991
        self.assertEqual('target', link_or_sha1)
 
992
        self.assertEqual([('read_link', 'a', '')], state._log)
 
993
        self.assertEqual([('l', '', 6, False, dirstate.DirState.NULLSTAT)],
 
994
                         entry[1])
 
995
        state.save()
 
996
        state.adjust_time(+20) # Skip into the future, all files look old
 
997
        del state._log[:]
 
998
        link_or_sha1 = self.update_entry(state, entry, abspath='a',
 
999
                                          stat_value=stat_value)
 
1000
        # The symlink stayed a symlink. So while it is new enough to cache, we
 
1001
        # don't bother setting the flag, because it is not really worth saving
 
1002
        # (when we stat the symlink, we'll have paged in the target.)
 
1003
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
1004
                         state._dirblock_state)
 
1005
        self.assertEqual('target', link_or_sha1)
 
1006
        # We need to re-read the link because only now can we cache it
 
1007
        self.assertEqual([('read_link', 'a', '')], state._log)
 
1008
        self.assertEqual([('l', 'target', 6, False, packed_stat)],
 
1009
                         entry[1])
 
1010
 
 
1011
        del state._log[:]
 
1012
        # Another call won't re-read the link
 
1013
        self.assertEqual([], state._log)
 
1014
        link_or_sha1 = self.update_entry(state, entry, abspath='a',
 
1015
                                          stat_value=stat_value)
 
1016
        self.assertEqual('target', link_or_sha1)
 
1017
        self.assertEqual([('l', 'target', 6, False, packed_stat)],
 
1018
                         entry[1])
 
1019
 
 
1020
    def do_update_entry(self, state, entry, abspath):
 
1021
        stat_value = os.lstat(abspath)
 
1022
        return self.update_entry(state, entry, abspath, stat_value)
 
1023
 
 
1024
    def test_update_entry_dir(self):
 
1025
        state, entry = self.get_state_with_a()
 
1026
        self.build_tree(['a/'])
 
1027
        self.assertIs(None, self.do_update_entry(state, entry, 'a'))
 
1028
 
 
1029
    def test_update_entry_dir_unchanged(self):
 
1030
        state, entry = self.get_state_with_a()
 
1031
        self.build_tree(['a/'])
 
1032
        state.adjust_time(+20)
 
1033
        self.assertIs(None, self.do_update_entry(state, entry, 'a'))
 
1034
        # a/ used to be a file, but is now a directory, worth saving
 
1035
        self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
 
1036
                         state._dirblock_state)
 
1037
        state.save()
 
1038
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
1039
                         state._dirblock_state)
 
1040
        # No changes to a/ means not worth saving.
 
1041
        self.assertIs(None, self.do_update_entry(state, entry, 'a'))
 
1042
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
1043
                         state._dirblock_state)
 
1044
        # Change the last-modified time for the directory
 
1045
        t = time.time() - 100.0
 
1046
        try:
 
1047
            os.utime('a', (t, t))
 
1048
        except OSError:
 
1049
            # It looks like Win32 + FAT doesn't allow to change times on a dir.
 
1050
            raise tests.TestSkipped("can't update mtime of a dir on FAT")
 
1051
        saved_packed_stat = entry[1][0][-1]
 
1052
        self.assertIs(None, self.do_update_entry(state, entry, 'a'))
 
1053
        # We *do* go ahead and update the information in the dirblocks, but we
 
1054
        # don't bother setting IN_MEMORY_MODIFIED because it is trivial to
 
1055
        # recompute.
 
1056
        self.assertNotEqual(saved_packed_stat, entry[1][0][-1])
 
1057
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
1058
                         state._dirblock_state)
 
1059
 
 
1060
    def test_update_entry_file_unchanged(self):
 
1061
        state, _ = self.get_state_with_a()
 
1062
        tree = self.make_branch_and_tree('tree')
 
1063
        tree.lock_write()
 
1064
        self.build_tree(['tree/a'])
 
1065
        tree.add(['a'], ['a-id'])
 
1066
        with_a_id = tree.commit('witha')
 
1067
        self.addCleanup(tree.unlock)
 
1068
        state.set_parent_trees(
 
1069
            [(with_a_id, tree.branch.repository.revision_tree(with_a_id))],
 
1070
            [])
 
1071
        entry = state._get_entry(0, path_utf8='a')
 
1072
        self.build_tree(['a'])
 
1073
        sha1sum = 'b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6'
 
1074
        state.adjust_time(+20)
 
1075
        self.assertEqual(sha1sum, self.do_update_entry(state, entry, 'a'))
 
1076
        self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
 
1077
                         state._dirblock_state)
 
1078
        state.save()
 
1079
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
1080
                         state._dirblock_state)
 
1081
        self.assertEqual(sha1sum, self.do_update_entry(state, entry, 'a'))
 
1082
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
1083
                         state._dirblock_state)
 
1084
 
 
1085
    def test_update_entry_tree_reference(self):
 
1086
        state = test_dirstate.InstrumentedDirState.initialize('dirstate')
 
1087
        self.addCleanup(state.unlock)
 
1088
        state.add('r', 'r-id', 'tree-reference', None, '')
 
1089
        self.build_tree(['r/'])
 
1090
        entry = state._get_entry(0, path_utf8='r')
 
1091
        self.do_update_entry(state, entry, 'r')
 
1092
        entry = state._get_entry(0, path_utf8='r')
 
1093
        self.assertEqual('t', entry[1][0][0])
 
1094
 
 
1095
    def create_and_test_file(self, state, entry):
 
1096
        """Create a file at 'a' and verify the state finds it during update.
 
1097
 
 
1098
        The state should already be versioning *something* at 'a'. This makes
 
1099
        sure that state.update_entry recognizes it as a file.
 
1100
        """
 
1101
        self.build_tree(['a'])
 
1102
        stat_value = os.lstat('a')
 
1103
        packed_stat = dirstate.pack_stat(stat_value)
 
1104
 
 
1105
        link_or_sha1 = self.do_update_entry(state, entry, abspath='a')
 
1106
        self.assertEqual(None, link_or_sha1)
 
1107
        self.assertEqual([('f', '', 14, False, dirstate.DirState.NULLSTAT)],
 
1108
                         entry[1])
 
1109
        return packed_stat
 
1110
 
 
1111
    def create_and_test_dir(self, state, entry):
 
1112
        """Create a directory at 'a' and verify the state finds it.
 
1113
 
 
1114
        The state should already be versioning *something* at 'a'. This makes
 
1115
        sure that state.update_entry recognizes it as a directory.
 
1116
        """
 
1117
        self.build_tree(['a/'])
 
1118
        stat_value = os.lstat('a')
 
1119
        packed_stat = dirstate.pack_stat(stat_value)
 
1120
 
 
1121
        link_or_sha1 = self.do_update_entry(state, entry, abspath='a')
 
1122
        self.assertIs(None, link_or_sha1)
 
1123
        self.assertEqual([('d', '', 0, False, packed_stat)], entry[1])
 
1124
 
 
1125
        return packed_stat
 
1126
 
 
1127
    # FIXME: Add unicode version
 
1128
    def create_and_test_symlink(self, state, entry):
 
1129
        """Create a symlink at 'a' and verify the state finds it.
 
1130
 
 
1131
        The state should already be versioning *something* at 'a'. This makes
 
1132
        sure that state.update_entry recognizes it as a symlink.
 
1133
 
 
1134
        This should not be called if this platform does not have symlink
 
1135
        support.
 
1136
        """
 
1137
        # caller should care about skipping test on platforms without symlinks
 
1138
        os.symlink('path/to/foo', 'a')
 
1139
 
 
1140
        stat_value = os.lstat('a')
 
1141
        packed_stat = dirstate.pack_stat(stat_value)
 
1142
 
 
1143
        link_or_sha1 = self.do_update_entry(state, entry, abspath='a')
 
1144
        self.assertEqual('path/to/foo', link_or_sha1)
 
1145
        self.assertEqual([('l', 'path/to/foo', 11, False, packed_stat)],
 
1146
                         entry[1])
 
1147
        return packed_stat
 
1148
 
 
1149
    def test_update_file_to_dir(self):
 
1150
        """If a file changes to a directory we return None for the sha.
 
1151
        We also update the inventory record.
 
1152
        """
 
1153
        state, entry = self.get_state_with_a()
 
1154
        # The file sha1 won't be cached unless the file is old
 
1155
        state.adjust_time(+10)
 
1156
        self.create_and_test_file(state, entry)
 
1157
        os.remove('a')
 
1158
        self.create_and_test_dir(state, entry)
 
1159
 
 
1160
    def test_update_file_to_symlink(self):
 
1161
        """File becomes a symlink"""
 
1162
        self.requireFeature(features.SymlinkFeature)
 
1163
        state, entry = self.get_state_with_a()
 
1164
        # The file sha1 won't be cached unless the file is old
 
1165
        state.adjust_time(+10)
 
1166
        self.create_and_test_file(state, entry)
 
1167
        os.remove('a')
 
1168
        self.create_and_test_symlink(state, entry)
 
1169
 
 
1170
    def test_update_dir_to_file(self):
 
1171
        """Directory becoming a file updates the entry."""
 
1172
        state, entry = self.get_state_with_a()
 
1173
        # The file sha1 won't be cached unless the file is old
 
1174
        state.adjust_time(+10)
 
1175
        self.create_and_test_dir(state, entry)
 
1176
        os.rmdir('a')
 
1177
        self.create_and_test_file(state, entry)
 
1178
 
 
1179
    def test_update_dir_to_symlink(self):
 
1180
        """Directory becomes a symlink"""
 
1181
        self.requireFeature(features.SymlinkFeature)
 
1182
        state, entry = self.get_state_with_a()
 
1183
        # The symlink target won't be cached if it isn't old
 
1184
        state.adjust_time(+10)
 
1185
        self.create_and_test_dir(state, entry)
 
1186
        os.rmdir('a')
 
1187
        self.create_and_test_symlink(state, entry)
 
1188
 
 
1189
    def test_update_symlink_to_file(self):
 
1190
        """Symlink becomes a file"""
 
1191
        self.requireFeature(features.SymlinkFeature)
 
1192
        state, entry = self.get_state_with_a()
 
1193
        # The symlink and file info won't be cached unless old
 
1194
        state.adjust_time(+10)
 
1195
        self.create_and_test_symlink(state, entry)
 
1196
        os.remove('a')
 
1197
        self.create_and_test_file(state, entry)
 
1198
 
 
1199
    def test_update_symlink_to_dir(self):
 
1200
        """Symlink becomes a directory"""
 
1201
        self.requireFeature(features.SymlinkFeature)
 
1202
        state, entry = self.get_state_with_a()
 
1203
        # The symlink target won't be cached if it isn't old
 
1204
        state.adjust_time(+10)
 
1205
        self.create_and_test_symlink(state, entry)
 
1206
        os.remove('a')
 
1207
        self.create_and_test_dir(state, entry)
 
1208
 
 
1209
    def test__is_executable_win32(self):
 
1210
        state, entry = self.get_state_with_a()
 
1211
        self.build_tree(['a'])
 
1212
 
 
1213
        # Make sure we are using the win32 implementation of _is_executable
 
1214
        state._is_executable = state._is_executable_win32
 
1215
 
 
1216
        # The file on disk is not executable, but we are marking it as though
 
1217
        # it is. With _is_executable_win32 we ignore what is on disk.
 
1218
        entry[1][0] = ('f', '', 0, True, dirstate.DirState.NULLSTAT)
 
1219
 
 
1220
        stat_value = os.lstat('a')
 
1221
        packed_stat = dirstate.pack_stat(stat_value)
 
1222
 
 
1223
        state.adjust_time(-10) # Make sure everything is new
 
1224
        self.update_entry(state, entry, abspath='a', stat_value=stat_value)
 
1225
 
 
1226
        # The row is updated, but the executable bit stays set.
 
1227
        self.assertEqual([('f', '', 14, True, dirstate.DirState.NULLSTAT)],
 
1228
                         entry[1])
 
1229
 
 
1230
        # Make the disk object look old enough to cache (but it won't cache the
 
1231
        # sha as it is a new file).
 
1232
        state.adjust_time(+20)
 
1233
        digest = 'b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6'
 
1234
        self.update_entry(state, entry, abspath='a', stat_value=stat_value)
 
1235
        self.assertEqual([('f', '', 14, True, dirstate.DirState.NULLSTAT)],
 
1236
            entry[1])
 
1237
 
 
1238
    def _prepare_tree(self):
 
1239
        # Create a tree
 
1240
        text = 'Hello World\n'
 
1241
        tree = self.make_branch_and_tree('tree')
 
1242
        self.build_tree_contents([('tree/a file', text)])
 
1243
        tree.add('a file', 'a-file-id')
 
1244
        # Note: dirstate does not sha prior to the first commit
 
1245
        # so commit now in order for the test to work
 
1246
        tree.commit('first')
 
1247
        return tree, text
 
1248
 
 
1249
    def test_sha1provider_sha1_used(self):
 
1250
        tree, text = self._prepare_tree()
 
1251
        state = dirstate.DirState.from_tree(tree, 'dirstate',
 
1252
            UppercaseSHA1Provider())
 
1253
        self.addCleanup(state.unlock)
 
1254
        expected_sha = osutils.sha_string(text.upper() + "foo")
 
1255
        entry = state._get_entry(0, path_utf8='a file')
 
1256
        state._sha_cutoff_time()
 
1257
        state._cutoff_time += 10
 
1258
        sha1 = self.update_entry(state, entry, 'tree/a file',
 
1259
                                 os.lstat('tree/a file'))
 
1260
        self.assertEqual(expected_sha, sha1)
 
1261
 
 
1262
    def test_sha1provider_stat_and_sha1_used(self):
 
1263
        tree, text = self._prepare_tree()
 
1264
        tree.lock_write()
 
1265
        self.addCleanup(tree.unlock)
 
1266
        state = tree._current_dirstate()
 
1267
        state._sha1_provider = UppercaseSHA1Provider()
 
1268
        # If we used the standard provider, it would look like nothing has
 
1269
        # changed
 
1270
        file_ids_changed = [change[0] for change
 
1271
                            in tree.iter_changes(tree.basis_tree())]
 
1272
        self.assertEqual(['a-file-id'], file_ids_changed)
 
1273
 
 
1274
 
 
1275
class UppercaseSHA1Provider(dirstate.SHA1Provider):
 
1276
    """A custom SHA1Provider."""
 
1277
 
 
1278
    def sha1(self, abspath):
 
1279
        return self.stat_and_sha1(abspath)[1]
 
1280
 
 
1281
    def stat_and_sha1(self, abspath):
 
1282
        file_obj = file(abspath, 'rb')
 
1283
        try:
 
1284
            statvalue = os.fstat(file_obj.fileno())
 
1285
            text = ''.join(file_obj.readlines())
 
1286
            sha1 = osutils.sha_string(text.upper() + "foo")
 
1287
        finally:
 
1288
            file_obj.close()
 
1289
        return statvalue, sha1
 
1290
 
 
1291
 
 
1292
class TestProcessEntry(test_dirstate.TestCaseWithDirState):
 
1293
 
 
1294
    scenarios = multiply_scenarios(dir_reader_scenarios(), pe_scenarios)
 
1295
 
 
1296
    # Set by load_tests
 
1297
    _process_entry = None
 
1298
 
 
1299
    def setUp(self):
 
1300
        super(TestProcessEntry, self).setUp()
 
1301
        self.overrideAttr(dirstate, '_process_entry', self._process_entry)
 
1302
 
 
1303
    def assertChangedFileIds(self, expected, tree):
 
1304
        tree.lock_read()
 
1305
        try:
 
1306
            file_ids = [info[0] for info
 
1307
                        in tree.iter_changes(tree.basis_tree())]
 
1308
        finally:
 
1309
            tree.unlock()
 
1310
        self.assertEqual(sorted(expected), sorted(file_ids))
 
1311
 
 
1312
    def test_exceptions_raised(self):
 
1313
        # This is a direct test of bug #495023, it relies on osutils.is_inside
 
1314
        # getting called in an inner function. Which makes it a bit brittle,
 
1315
        # but at least it does reproduce the bug.
 
1316
        tree = self.make_branch_and_tree('tree')
 
1317
        self.build_tree(['tree/file', 'tree/dir/', 'tree/dir/sub',
 
1318
                         'tree/dir2/', 'tree/dir2/sub2'])
 
1319
        tree.add(['file', 'dir', 'dir/sub', 'dir2', 'dir2/sub2'])
 
1320
        tree.commit('first commit')
 
1321
        tree.lock_read()
 
1322
        self.addCleanup(tree.unlock)
 
1323
        basis_tree = tree.basis_tree()
 
1324
        def is_inside_raises(*args, **kwargs):
 
1325
            raise RuntimeError('stop this')
 
1326
        self.overrideAttr(osutils, 'is_inside', is_inside_raises)
 
1327
        self.assertListRaises(RuntimeError, tree.iter_changes, basis_tree)
 
1328
 
 
1329
    def test_simple_changes(self):
 
1330
        tree = self.make_branch_and_tree('tree')
 
1331
        self.build_tree(['tree/file'])
 
1332
        tree.add(['file'], ['file-id'])
 
1333
        self.assertChangedFileIds([tree.get_root_id(), 'file-id'], tree)
 
1334
        tree.commit('one')
 
1335
        self.assertChangedFileIds([], tree)
 
1336
 
 
1337
    def test_sha1provider_stat_and_sha1_used(self):
 
1338
        tree = self.make_branch_and_tree('tree')
 
1339
        self.build_tree(['tree/file'])
 
1340
        tree.add(['file'], ['file-id'])
 
1341
        tree.commit('one')
 
1342
        tree.lock_write()
 
1343
        self.addCleanup(tree.unlock)
 
1344
        state = tree._current_dirstate()
 
1345
        state._sha1_provider = UppercaseSHA1Provider()
 
1346
        self.assertChangedFileIds(['file-id'], tree)
 
1347
 
 
1348
 
 
1349
class TestPackStat(tests.TestCase):
 
1350
    """Check packed representaton of stat values is robust on all inputs"""
 
1351
 
 
1352
    scenarios = helper_scenarios
 
1353
 
 
1354
    def pack(self, statlike_tuple):
 
1355
        return self.helpers.pack_stat(os.stat_result(statlike_tuple))
 
1356
 
 
1357
    @staticmethod
 
1358
    def unpack_field(packed_string, stat_field):
 
1359
        return _dirstate_helpers_py._unpack_stat(packed_string)[stat_field]
 
1360
 
 
1361
    def test_result(self):
 
1362
        self.assertEqual("AAAQAAAAABAAAAARAAAAAgAAAAEAAIHk",
 
1363
            self.pack((33252, 1, 2, 0, 0, 0, 4096, 15.5, 16.5, 17.5)))
 
1364
 
 
1365
    def test_giant_inode(self):
 
1366
        packed = self.pack((33252, 0xF80000ABC, 0, 0, 0, 0, 0, 0, 0, 0))
 
1367
        self.assertEqual(0x80000ABC, self.unpack_field(packed, "st_ino"))
 
1368
 
 
1369
    def test_giant_size(self):
 
1370
        packed = self.pack((33252, 0, 0, 0, 0, 0, (1 << 33) + 4096, 0, 0, 0))
 
1371
        self.assertEqual(4096, self.unpack_field(packed, "st_size"))
 
1372
 
 
1373
    def test_fractional_mtime(self):
 
1374
        packed = self.pack((33252, 0, 0, 0, 0, 0, 0, 0, 16.9375, 0))
 
1375
        self.assertEqual(16, self.unpack_field(packed, "st_mtime"))
 
1376
 
 
1377
    def test_ancient_mtime(self):
 
1378
        packed = self.pack((33252, 0, 0, 0, 0, 0, 0, 0, -11644473600.0, 0))
 
1379
        self.assertEqual(1240428288, self.unpack_field(packed, "st_mtime"))
 
1380
 
 
1381
    def test_distant_mtime(self):
 
1382
        packed = self.pack((33252, 0, 0, 0, 0, 0, 0, 0, 64060588800.0, 0))
 
1383
        self.assertEqual(3931046656, self.unpack_field(packed, "st_mtime"))
 
1384
 
 
1385
    def test_fractional_ctime(self):
 
1386
        packed = self.pack((33252, 0, 0, 0, 0, 0, 0, 0, 0, 17.5625))
 
1387
        self.assertEqual(17, self.unpack_field(packed, "st_ctime"))
 
1388
 
 
1389
    def test_ancient_ctime(self):
 
1390
        packed = self.pack((33252, 0, 0, 0, 0, 0, 0, 0, 0, -11644473600.0))
 
1391
        self.assertEqual(1240428288, self.unpack_field(packed, "st_ctime"))
 
1392
 
 
1393
    def test_distant_ctime(self):
 
1394
        packed = self.pack((33252, 0, 0, 0, 0, 0, 0, 0, 0, 64060588800.0))
 
1395
        self.assertEqual(3931046656, self.unpack_field(packed, "st_ctime"))
 
1396
 
 
1397
    def test_negative_dev(self):
 
1398
        packed = self.pack((33252, 0, -0xFFFFFCDE, 0, 0, 0, 0, 0, 0, 0))
 
1399
        self.assertEqual(0x322, self.unpack_field(packed, "st_dev"))