/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test__dirstate_helpers.py

  • Committer: Breezy landing bot
  • Author(s): Jelmer Vernooij
  • Date: 2018-11-16 18:26:22 UTC
  • mfrom: (7167.1.4 run-flake8)
  • Revision ID: breezy.the.bot@gmail.com-20181116182622-qw3gan3hz78a2imw
Add a flake8 test.

Merged from https://code.launchpad.net/~jelmer/brz/run-flake8/+merge/358902

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2007-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the compiled dirstate helpers."""
 
18
 
 
19
import bisect
 
20
import os
 
21
import time
 
22
 
 
23
from .. import (
 
24
    osutils,
 
25
    tests,
 
26
    )
 
27
from ..bzr import (
 
28
    dirstate,
 
29
    _dirstate_helpers_py,
 
30
    )
 
31
from . import (
 
32
    test_dirstate,
 
33
    )
 
34
from .test_osutils import dir_reader_scenarios
 
35
from .scenarios import (
 
36
    load_tests_apply_scenarios,
 
37
    multiply_scenarios,
 
38
    )
 
39
from . import (
 
40
    features,
 
41
    )
 
42
 
 
43
 
 
44
load_tests = load_tests_apply_scenarios
 
45
 
 
46
 
 
47
compiled_dirstate_helpers_feature = features.ModuleAvailableFeature(
 
48
    'breezy.bzr._dirstate_helpers_pyx')
 
49
 
 
50
 
 
51
# FIXME: we should also parametrize against SHA1Provider !
 
52
 
 
53
ue_scenarios = [('dirstate_Python',
 
54
    {'update_entry': dirstate.py_update_entry})]
 
55
if compiled_dirstate_helpers_feature.available():
 
56
    update_entry = compiled_dirstate_helpers_feature.module.update_entry
 
57
    ue_scenarios.append(('dirstate_Pyrex', {'update_entry': update_entry}))
 
58
 
 
59
pe_scenarios = [('dirstate_Python',
 
60
    {'_process_entry': dirstate.ProcessEntryPython})]
 
61
if compiled_dirstate_helpers_feature.available():
 
62
    process_entry = compiled_dirstate_helpers_feature.module.ProcessEntryC
 
63
    pe_scenarios.append(('dirstate_Pyrex', {'_process_entry': process_entry}))
 
64
 
 
65
helper_scenarios = [('dirstate_Python', {'helpers': _dirstate_helpers_py})]
 
66
if compiled_dirstate_helpers_feature.available():
 
67
    helper_scenarios.append(('dirstate_Pyrex',
 
68
        {'helpers': compiled_dirstate_helpers_feature.module}))
 
69
 
 
70
 
 
71
class TestBisectPathMixin(object):
 
72
    """Test that _bisect_path_*() returns the expected values.
 
73
 
 
74
    _bisect_path_* is intended to work like bisect.bisect_*() except it
 
75
    knows it is working on paths that are sorted by ('path', 'to', 'foo')
 
76
    chunks rather than by raw 'path/to/foo'.
 
77
 
 
78
    Test Cases should inherit from this and override ``get_bisect_path`` return
 
79
    their implementation, and ``get_bisect`` to return the matching
 
80
    bisect.bisect_* function.
 
81
    """
 
82
 
 
83
    def get_bisect_path(self):
 
84
        """Return an implementation of _bisect_path_*"""
 
85
        raise NotImplementedError
 
86
 
 
87
    def get_bisect(self):
 
88
        """Return a version of bisect.bisect_*.
 
89
 
 
90
        Also, for the 'exists' check, return the offset to the real values.
 
91
        For example bisect_left returns the index of an entry, while
 
92
        bisect_right returns the index *after* an entry
 
93
 
 
94
        :return: (bisect_func, offset)
 
95
        """
 
96
        raise NotImplementedError
 
97
 
 
98
    def assertBisect(self, paths, split_paths, path, exists=True):
 
99
        """Assert that bisect_split works like bisect_left on the split paths.
 
100
 
 
101
        :param paths: A list of path names
 
102
        :param split_paths: A list of path names that are already split up by directory
 
103
            ('path/to/foo' => ('path', 'to', 'foo'))
 
104
        :param path: The path we are indexing.
 
105
        :param exists: The path should be present, so make sure the
 
106
            final location actually points to the right value.
 
107
 
 
108
        All other arguments will be passed along.
 
109
        """
 
110
        bisect_path = self.get_bisect_path()
 
111
        self.assertIsInstance(paths, list)
 
112
        bisect_path_idx = bisect_path(paths, path)
 
113
        split_path = self.split_for_dirblocks([path])[0]
 
114
        bisect_func, offset = self.get_bisect()
 
115
        bisect_split_idx = bisect_func(split_paths, split_path)
 
116
        self.assertEqual(bisect_split_idx, bisect_path_idx,
 
117
                         '%s disagreed. %s != %s'
 
118
                         ' for key %r'
 
119
                         % (bisect_path.__name__,
 
120
                            bisect_split_idx, bisect_path_idx, path)
 
121
                         )
 
122
        if exists:
 
123
            self.assertEqual(path, paths[bisect_path_idx+offset])
 
124
 
 
125
    def split_for_dirblocks(self, paths):
 
126
        dir_split_paths = []
 
127
        for path in paths:
 
128
            dirname, basename = os.path.split(path)
 
129
            dir_split_paths.append((dirname.split(b'/'), basename))
 
130
        dir_split_paths.sort()
 
131
        return dir_split_paths
 
132
 
 
133
    def test_simple(self):
 
134
        """In the simple case it works just like bisect_left"""
 
135
        paths = [b'', b'a', b'b', b'c', b'd']
 
136
        split_paths = self.split_for_dirblocks(paths)
 
137
        for path in paths:
 
138
            self.assertBisect(paths, split_paths, path, exists=True)
 
139
        self.assertBisect(paths, split_paths, b'_', exists=False)
 
140
        self.assertBisect(paths, split_paths, b'aa', exists=False)
 
141
        self.assertBisect(paths, split_paths, b'bb', exists=False)
 
142
        self.assertBisect(paths, split_paths, b'cc', exists=False)
 
143
        self.assertBisect(paths, split_paths, b'dd', exists=False)
 
144
        self.assertBisect(paths, split_paths, b'a/a', exists=False)
 
145
        self.assertBisect(paths, split_paths, b'b/b', exists=False)
 
146
        self.assertBisect(paths, split_paths, b'c/c', exists=False)
 
147
        self.assertBisect(paths, split_paths, b'd/d', exists=False)
 
148
 
 
149
    def test_involved(self):
 
150
        """This is where bisect_path_* diverges slightly."""
 
151
        # This is the list of paths and their contents
 
152
        # a/
 
153
        #   a/
 
154
        #     a
 
155
        #     z
 
156
        #   a-a/
 
157
        #     a
 
158
        #   a-z/
 
159
        #     z
 
160
        #   a=a/
 
161
        #     a
 
162
        #   a=z/
 
163
        #     z
 
164
        #   z/
 
165
        #     a
 
166
        #     z
 
167
        #   z-a
 
168
        #   z-z
 
169
        #   z=a
 
170
        #   z=z
 
171
        # a-a/
 
172
        #   a
 
173
        # a-z/
 
174
        #   z
 
175
        # a=a/
 
176
        #   a
 
177
        # a=z/
 
178
        #   z
 
179
        # This is the exact order that is stored by dirstate
 
180
        # All children in a directory are mentioned before an children of
 
181
        # children are mentioned.
 
182
        # So all the root-directory paths, then all the
 
183
        # first sub directory, etc.
 
184
        paths = [# content of '/'
 
185
                 b'', b'a', b'a-a', b'a-z', b'a=a', b'a=z',
 
186
                 # content of 'a/'
 
187
                 b'a/a', b'a/a-a', b'a/a-z',
 
188
                 b'a/a=a', b'a/a=z',
 
189
                 b'a/z', b'a/z-a', b'a/z-z',
 
190
                 b'a/z=a', b'a/z=z',
 
191
                 # content of 'a/a/'
 
192
                 b'a/a/a', b'a/a/z',
 
193
                 # content of 'a/a-a'
 
194
                 b'a/a-a/a',
 
195
                 # content of 'a/a-z'
 
196
                 b'a/a-z/z',
 
197
                 # content of 'a/a=a'
 
198
                 b'a/a=a/a',
 
199
                 # content of 'a/a=z'
 
200
                 b'a/a=z/z',
 
201
                 # content of 'a/z/'
 
202
                 b'a/z/a', b'a/z/z',
 
203
                 # content of 'a-a'
 
204
                 b'a-a/a',
 
205
                 # content of 'a-z'
 
206
                 b'a-z/z',
 
207
                 # content of 'a=a'
 
208
                 b'a=a/a',
 
209
                 # content of 'a=z'
 
210
                 b'a=z/z',
 
211
                ]
 
212
        split_paths = self.split_for_dirblocks(paths)
 
213
        sorted_paths = []
 
214
        for dir_parts, basename in split_paths:
 
215
            if dir_parts == [b'']:
 
216
                sorted_paths.append(basename)
 
217
            else:
 
218
                sorted_paths.append(b'/'.join(dir_parts + [basename]))
 
219
 
 
220
        self.assertEqual(sorted_paths, paths)
 
221
 
 
222
        for path in paths:
 
223
            self.assertBisect(paths, split_paths, path, exists=True)
 
224
 
 
225
 
 
226
class TestBisectPathLeft(tests.TestCase, TestBisectPathMixin):
 
227
    """Run all Bisect Path tests against _bisect_path_left."""
 
228
 
 
229
    def get_bisect_path(self):
 
230
        from breezy.bzr._dirstate_helpers_py import _bisect_path_left
 
231
        return _bisect_path_left
 
232
 
 
233
    def get_bisect(self):
 
234
        return bisect.bisect_left, 0
 
235
 
 
236
 
 
237
class TestCompiledBisectPathLeft(TestBisectPathLeft):
 
238
    """Run all Bisect Path tests against _bisect_path_lect"""
 
239
 
 
240
    _test_needs_features = [compiled_dirstate_helpers_feature]
 
241
 
 
242
    def get_bisect_path(self):
 
243
        from breezy.bzr._dirstate_helpers_pyx import _bisect_path_left
 
244
        return _bisect_path_left
 
245
 
 
246
 
 
247
class TestBisectPathRight(tests.TestCase, TestBisectPathMixin):
 
248
    """Run all Bisect Path tests against _bisect_path_right"""
 
249
 
 
250
    def get_bisect_path(self):
 
251
        from breezy.bzr._dirstate_helpers_py import _bisect_path_right
 
252
        return _bisect_path_right
 
253
 
 
254
    def get_bisect(self):
 
255
        return bisect.bisect_right, -1
 
256
 
 
257
 
 
258
class TestCompiledBisectPathRight(TestBisectPathRight):
 
259
    """Run all Bisect Path tests against _bisect_path_right"""
 
260
 
 
261
    _test_needs_features = [compiled_dirstate_helpers_feature]
 
262
 
 
263
    def get_bisect_path(self):
 
264
        from breezy.bzr._dirstate_helpers_pyx import _bisect_path_right
 
265
        return _bisect_path_right
 
266
 
 
267
 
 
268
class TestBisectDirblock(tests.TestCase):
 
269
    """Test that bisect_dirblock() returns the expected values.
 
270
 
 
271
    bisect_dirblock is intended to work like bisect.bisect_left() except it
 
272
    knows it is working on dirblocks and that dirblocks are sorted by ('path',
 
273
    'to', 'foo') chunks rather than by raw 'path/to/foo'.
 
274
 
 
275
    This test is parameterized by calling get_bisect_dirblock(). Child test
 
276
    cases can override this function to test against a different
 
277
    implementation.
 
278
    """
 
279
 
 
280
    def get_bisect_dirblock(self):
 
281
        """Return an implementation of bisect_dirblock"""
 
282
        from breezy.bzr._dirstate_helpers_py import bisect_dirblock
 
283
        return bisect_dirblock
 
284
 
 
285
    def assertBisect(self, dirblocks, split_dirblocks, path, *args, **kwargs):
 
286
        """Assert that bisect_split works like bisect_left on the split paths.
 
287
 
 
288
        :param dirblocks: A list of (path, [info]) pairs.
 
289
        :param split_dirblocks: A list of ((split, path), [info]) pairs.
 
290
        :param path: The path we are indexing.
 
291
 
 
292
        All other arguments will be passed along.
 
293
        """
 
294
        bisect_dirblock = self.get_bisect_dirblock()
 
295
        self.assertIsInstance(dirblocks, list)
 
296
        bisect_split_idx = bisect_dirblock(dirblocks, path, *args, **kwargs)
 
297
        split_dirblock = (path.split(b'/'), [])
 
298
        bisect_left_idx = bisect.bisect_left(split_dirblocks, split_dirblock,
 
299
                                             *args)
 
300
        self.assertEqual(bisect_left_idx, bisect_split_idx,
 
301
                         'bisect_split disagreed. %s != %s'
 
302
                         ' for key %r'
 
303
                         % (bisect_left_idx, bisect_split_idx, path)
 
304
                         )
 
305
 
 
306
    def paths_to_dirblocks(self, paths):
 
307
        """Convert a list of paths into dirblock form.
 
308
 
 
309
        Also, ensure that the paths are in proper sorted order.
 
310
        """
 
311
        dirblocks = [(path, []) for path in paths]
 
312
        split_dirblocks = [(path.split(b'/'), []) for path in paths]
 
313
        self.assertEqual(sorted(split_dirblocks), split_dirblocks)
 
314
        return dirblocks, split_dirblocks
 
315
 
 
316
    def test_simple(self):
 
317
        """In the simple case it works just like bisect_left"""
 
318
        paths = [b'', b'a', b'b', b'c', b'd']
 
319
        dirblocks, split_dirblocks = self.paths_to_dirblocks(paths)
 
320
        for path in paths:
 
321
            self.assertBisect(dirblocks, split_dirblocks, path)
 
322
        self.assertBisect(dirblocks, split_dirblocks, b'_')
 
323
        self.assertBisect(dirblocks, split_dirblocks, b'aa')
 
324
        self.assertBisect(dirblocks, split_dirblocks, b'bb')
 
325
        self.assertBisect(dirblocks, split_dirblocks, b'cc')
 
326
        self.assertBisect(dirblocks, split_dirblocks, b'dd')
 
327
        self.assertBisect(dirblocks, split_dirblocks, b'a/a')
 
328
        self.assertBisect(dirblocks, split_dirblocks, b'b/b')
 
329
        self.assertBisect(dirblocks, split_dirblocks, b'c/c')
 
330
        self.assertBisect(dirblocks, split_dirblocks, b'd/d')
 
331
 
 
332
    def test_involved(self):
 
333
        """This is where bisect_left diverges slightly."""
 
334
        paths = [b'', b'a',
 
335
                 b'a/a', b'a/a/a', b'a/a/z', b'a/a-a', b'a/a-z',
 
336
                 b'a/z', b'a/z/a', b'a/z/z', b'a/z-a', b'a/z-z',
 
337
                 b'a-a', b'a-z',
 
338
                 b'z', b'z/a/a', b'z/a/z', b'z/a-a', b'z/a-z',
 
339
                 b'z/z', b'z/z/a', b'z/z/z', b'z/z-a', b'z/z-z',
 
340
                 b'z-a', b'z-z',
 
341
                ]
 
342
        dirblocks, split_dirblocks = self.paths_to_dirblocks(paths)
 
343
        for path in paths:
 
344
            self.assertBisect(dirblocks, split_dirblocks, path)
 
345
 
 
346
    def test_involved_cached(self):
 
347
        """This is where bisect_left diverges slightly."""
 
348
        paths = [b'', b'a',
 
349
                 b'a/a', b'a/a/a', b'a/a/z', b'a/a-a', b'a/a-z',
 
350
                 b'a/z', b'a/z/a', b'a/z/z', b'a/z-a', b'a/z-z',
 
351
                 b'a-a', b'a-z',
 
352
                 b'z', b'z/a/a', b'z/a/z', b'z/a-a', b'z/a-z',
 
353
                 b'z/z', b'z/z/a', b'z/z/z', b'z/z-a', b'z/z-z',
 
354
                 b'z-a', b'z-z',
 
355
                ]
 
356
        cache = {}
 
357
        dirblocks, split_dirblocks = self.paths_to_dirblocks(paths)
 
358
        for path in paths:
 
359
            self.assertBisect(dirblocks, split_dirblocks, path, cache=cache)
 
360
 
 
361
 
 
362
class TestCompiledBisectDirblock(TestBisectDirblock):
 
363
    """Test that bisect_dirblock() returns the expected values.
 
364
 
 
365
    bisect_dirblock is intended to work like bisect.bisect_left() except it
 
366
    knows it is working on dirblocks and that dirblocks are sorted by ('path',
 
367
    'to', 'foo') chunks rather than by raw 'path/to/foo'.
 
368
 
 
369
    This runs all the normal tests that TestBisectDirblock did, but uses the
 
370
    compiled version.
 
371
    """
 
372
 
 
373
    _test_needs_features = [compiled_dirstate_helpers_feature]
 
374
 
 
375
    def get_bisect_dirblock(self):
 
376
        from breezy.bzr._dirstate_helpers_pyx import bisect_dirblock
 
377
        return bisect_dirblock
 
378
 
 
379
 
 
380
class TestLtByDirs(tests.TestCase):
 
381
    """Test an implementation of lt_by_dirs()
 
382
 
 
383
    lt_by_dirs() compares 2 paths by their directory sections, rather than as
 
384
    plain strings.
 
385
 
 
386
    Child test cases can override ``get_lt_by_dirs`` to test a specific
 
387
    implementation.
 
388
    """
 
389
 
 
390
    def get_lt_by_dirs(self):
 
391
        """Get a specific implementation of lt_by_dirs."""
 
392
        from ..bzr._dirstate_helpers_py import lt_by_dirs
 
393
        return lt_by_dirs
 
394
 
 
395
    def assertCmpByDirs(self, expected, str1, str2):
 
396
        """Compare the two strings, in both directions.
 
397
 
 
398
        :param expected: The expected comparison value. -1 means str1 comes
 
399
            first, 0 means they are equal, 1 means str2 comes first
 
400
        :param str1: string to compare
 
401
        :param str2: string to compare
 
402
        """
 
403
        lt_by_dirs = self.get_lt_by_dirs()
 
404
        if expected == 0:
 
405
            self.assertEqual(str1, str2)
 
406
            self.assertFalse(lt_by_dirs(str1, str2))
 
407
            self.assertFalse(lt_by_dirs(str2, str1))
 
408
        elif expected > 0:
 
409
            self.assertFalse(lt_by_dirs(str1, str2))
 
410
            self.assertTrue(lt_by_dirs(str2, str1))
 
411
        else:
 
412
            self.assertTrue(lt_by_dirs(str1, str2))
 
413
            self.assertFalse(lt_by_dirs(str2, str1))
 
414
 
 
415
    def test_cmp_empty(self):
 
416
        """Compare against the empty string."""
 
417
        self.assertCmpByDirs(0, b'', b'')
 
418
        self.assertCmpByDirs(1, b'a', b'')
 
419
        self.assertCmpByDirs(1, b'ab', b'')
 
420
        self.assertCmpByDirs(1, b'abc', b'')
 
421
        self.assertCmpByDirs(1, b'abcd', b'')
 
422
        self.assertCmpByDirs(1, b'abcde', b'')
 
423
        self.assertCmpByDirs(1, b'abcdef', b'')
 
424
        self.assertCmpByDirs(1, b'abcdefg', b'')
 
425
        self.assertCmpByDirs(1, b'abcdefgh', b'')
 
426
        self.assertCmpByDirs(1, b'abcdefghi', b'')
 
427
        self.assertCmpByDirs(1, b'test/ing/a/path/', b'')
 
428
 
 
429
    def test_cmp_same_str(self):
 
430
        """Compare the same string"""
 
431
        self.assertCmpByDirs(0, b'a', b'a')
 
432
        self.assertCmpByDirs(0, b'ab', b'ab')
 
433
        self.assertCmpByDirs(0, b'abc', b'abc')
 
434
        self.assertCmpByDirs(0, b'abcd', b'abcd')
 
435
        self.assertCmpByDirs(0, b'abcde', b'abcde')
 
436
        self.assertCmpByDirs(0, b'abcdef', b'abcdef')
 
437
        self.assertCmpByDirs(0, b'abcdefg', b'abcdefg')
 
438
        self.assertCmpByDirs(0, b'abcdefgh', b'abcdefgh')
 
439
        self.assertCmpByDirs(0, b'abcdefghi', b'abcdefghi')
 
440
        self.assertCmpByDirs(0, b'testing a long string', b'testing a long string')
 
441
        self.assertCmpByDirs(0, b'x'*10000, b'x'*10000)
 
442
        self.assertCmpByDirs(0, b'a/b', b'a/b')
 
443
        self.assertCmpByDirs(0, b'a/b/c', b'a/b/c')
 
444
        self.assertCmpByDirs(0, b'a/b/c/d', b'a/b/c/d')
 
445
        self.assertCmpByDirs(0, b'a/b/c/d/e', b'a/b/c/d/e')
 
446
 
 
447
    def test_simple_paths(self):
 
448
        """Compare strings that act like normal string comparison"""
 
449
        self.assertCmpByDirs(-1, b'a', b'b')
 
450
        self.assertCmpByDirs(-1, b'aa', b'ab')
 
451
        self.assertCmpByDirs(-1, b'ab', b'bb')
 
452
        self.assertCmpByDirs(-1, b'aaa', b'aab')
 
453
        self.assertCmpByDirs(-1, b'aab', b'abb')
 
454
        self.assertCmpByDirs(-1, b'abb', b'bbb')
 
455
        self.assertCmpByDirs(-1, b'aaaa', b'aaab')
 
456
        self.assertCmpByDirs(-1, b'aaab', b'aabb')
 
457
        self.assertCmpByDirs(-1, b'aabb', b'abbb')
 
458
        self.assertCmpByDirs(-1, b'abbb', b'bbbb')
 
459
        self.assertCmpByDirs(-1, b'aaaaa', b'aaaab')
 
460
        self.assertCmpByDirs(-1, b'a/a', b'a/b')
 
461
        self.assertCmpByDirs(-1, b'a/b', b'b/b')
 
462
        self.assertCmpByDirs(-1, b'a/a/a', b'a/a/b')
 
463
        self.assertCmpByDirs(-1, b'a/a/b', b'a/b/b')
 
464
        self.assertCmpByDirs(-1, b'a/b/b', b'b/b/b')
 
465
        self.assertCmpByDirs(-1, b'a/a/a/a', b'a/a/a/b')
 
466
        self.assertCmpByDirs(-1, b'a/a/a/b', b'a/a/b/b')
 
467
        self.assertCmpByDirs(-1, b'a/a/b/b', b'a/b/b/b')
 
468
        self.assertCmpByDirs(-1, b'a/b/b/b', b'b/b/b/b')
 
469
        self.assertCmpByDirs(-1, b'a/a/a/a/a', b'a/a/a/a/b')
 
470
 
 
471
    def test_tricky_paths(self):
 
472
        self.assertCmpByDirs(1, b'ab/cd/ef', b'ab/cc/ef')
 
473
        self.assertCmpByDirs(1, b'ab/cd/ef', b'ab/c/ef')
 
474
        self.assertCmpByDirs(-1, b'ab/cd/ef', b'ab/cd-ef')
 
475
        self.assertCmpByDirs(-1, b'ab/cd', b'ab/cd-')
 
476
        self.assertCmpByDirs(-1, b'ab/cd', b'ab-cd')
 
477
 
 
478
    def test_cmp_unicode_not_allowed(self):
 
479
        lt_by_dirs = self.get_lt_by_dirs()
 
480
        self.assertRaises(TypeError, lt_by_dirs, u'Unicode', b'str')
 
481
        self.assertRaises(TypeError, lt_by_dirs, b'str', u'Unicode')
 
482
        self.assertRaises(TypeError, lt_by_dirs, u'Unicode', u'Unicode')
 
483
 
 
484
    def test_cmp_non_ascii(self):
 
485
        self.assertCmpByDirs(-1, b'\xc2\xb5', b'\xc3\xa5') # u'\xb5', u'\xe5'
 
486
        self.assertCmpByDirs(-1, b'a', b'\xc3\xa5') # u'a', u'\xe5'
 
487
        self.assertCmpByDirs(-1, b'b', b'\xc2\xb5') # u'b', u'\xb5'
 
488
        self.assertCmpByDirs(-1, b'a/b', b'a/\xc3\xa5') # u'a/b', u'a/\xe5'
 
489
        self.assertCmpByDirs(-1, b'b/a', b'b/\xc2\xb5') # u'b/a', u'b/\xb5'
 
490
 
 
491
 
 
492
class TestCompiledLtByDirs(TestLtByDirs):
 
493
    """Test the pyrex implementation of lt_by_dirs"""
 
494
 
 
495
    _test_needs_features = [compiled_dirstate_helpers_feature]
 
496
 
 
497
    def get_lt_by_dirs(self):
 
498
        from ..bzr._dirstate_helpers_pyx import lt_by_dirs
 
499
        return lt_by_dirs
 
500
 
 
501
 
 
502
class TestLtPathByDirblock(tests.TestCase):
 
503
    """Test an implementation of _lt_path_by_dirblock()
 
504
 
 
505
    _lt_path_by_dirblock() compares two paths using the sort order used by
 
506
    DirState. All paths in the same directory are sorted together.
 
507
 
 
508
    Child test cases can override ``get_lt_path_by_dirblock`` to test a specific
 
509
    implementation.
 
510
    """
 
511
 
 
512
    def get_lt_path_by_dirblock(self):
 
513
        """Get a specific implementation of _lt_path_by_dirblock."""
 
514
        from ..bzr._dirstate_helpers_py import _lt_path_by_dirblock
 
515
        return _lt_path_by_dirblock
 
516
 
 
517
    def assertLtPathByDirblock(self, paths):
 
518
        """Compare all paths and make sure they evaluate to the correct order.
 
519
 
 
520
        This does N^2 comparisons. It is assumed that ``paths`` is properly
 
521
        sorted list.
 
522
 
 
523
        :param paths: a sorted list of paths to compare
 
524
        """
 
525
        # First, make sure the paths being passed in are correct
 
526
        def _key(p):
 
527
            dirname, basename = os.path.split(p)
 
528
            return dirname.split(b'/'), basename
 
529
        self.assertEqual(sorted(paths, key=_key), paths)
 
530
 
 
531
        lt_path_by_dirblock = self.get_lt_path_by_dirblock()
 
532
        for idx1, path1 in enumerate(paths):
 
533
            for idx2, path2 in enumerate(paths):
 
534
                lt_result = lt_path_by_dirblock(path1, path2)
 
535
                self.assertEqual(idx1 < idx2, lt_result,
 
536
                        '%s did not state that %r < %r, lt=%s'
 
537
                        % (lt_path_by_dirblock.__name__,
 
538
                           path1, path2, lt_result))
 
539
 
 
540
    def test_cmp_simple_paths(self):
 
541
        """Compare against the empty string."""
 
542
        self.assertLtPathByDirblock([b'', b'a', b'ab', b'abc', b'a/b/c', b'b/d/e'])
 
543
        self.assertLtPathByDirblock([b'kl', b'ab/cd', b'ab/ef', b'gh/ij'])
 
544
 
 
545
    def test_tricky_paths(self):
 
546
        self.assertLtPathByDirblock([
 
547
            # Contents of ''
 
548
            b'', b'a', b'a-a', b'a=a', b'b',
 
549
            # Contents of 'a'
 
550
            b'a/a', b'a/a-a', b'a/a=a', b'a/b',
 
551
            # Contents of 'a/a'
 
552
            b'a/a/a', b'a/a/a-a', b'a/a/a=a',
 
553
            # Contents of 'a/a/a'
 
554
            b'a/a/a/a', b'a/a/a/b',
 
555
            # Contents of 'a/a/a-a',
 
556
            b'a/a/a-a/a', b'a/a/a-a/b',
 
557
            # Contents of 'a/a/a=a',
 
558
            b'a/a/a=a/a', b'a/a/a=a/b',
 
559
            # Contents of 'a/a-a'
 
560
            b'a/a-a/a',
 
561
            # Contents of 'a/a-a/a'
 
562
            b'a/a-a/a/a', b'a/a-a/a/b',
 
563
            # Contents of 'a/a=a'
 
564
            b'a/a=a/a',
 
565
            # Contents of 'a/b'
 
566
            b'a/b/a', b'a/b/b',
 
567
            # Contents of 'a-a',
 
568
            b'a-a/a', b'a-a/b',
 
569
            # Contents of 'a=a',
 
570
            b'a=a/a', b'a=a/b',
 
571
            # Contents of 'b',
 
572
            b'b/a', b'b/b',
 
573
            ])
 
574
        self.assertLtPathByDirblock([
 
575
                 # content of '/'
 
576
                 b'', b'a', b'a-a', b'a-z', b'a=a', b'a=z',
 
577
                 # content of 'a/'
 
578
                 b'a/a', b'a/a-a', b'a/a-z',
 
579
                 b'a/a=a', b'a/a=z',
 
580
                 b'a/z', b'a/z-a', b'a/z-z',
 
581
                 b'a/z=a', b'a/z=z',
 
582
                 # content of 'a/a/'
 
583
                 b'a/a/a', b'a/a/z',
 
584
                 # content of 'a/a-a'
 
585
                 b'a/a-a/a',
 
586
                 # content of 'a/a-z'
 
587
                 b'a/a-z/z',
 
588
                 # content of 'a/a=a'
 
589
                 b'a/a=a/a',
 
590
                 # content of 'a/a=z'
 
591
                 b'a/a=z/z',
 
592
                 # content of 'a/z/'
 
593
                 b'a/z/a', b'a/z/z',
 
594
                 # content of 'a-a'
 
595
                 b'a-a/a',
 
596
                 # content of 'a-z'
 
597
                 b'a-z/z',
 
598
                 # content of 'a=a'
 
599
                 b'a=a/a',
 
600
                 # content of 'a=z'
 
601
                 b'a=z/z',
 
602
                ])
 
603
 
 
604
    def test_unicode_not_allowed(self):
 
605
        lt_path_by_dirblock = self.get_lt_path_by_dirblock()
 
606
        self.assertRaises(TypeError, lt_path_by_dirblock, u'Uni', 'str')
 
607
        self.assertRaises(TypeError, lt_path_by_dirblock, 'str', u'Uni')
 
608
        self.assertRaises(TypeError, lt_path_by_dirblock, u'Uni', u'Uni')
 
609
        self.assertRaises(TypeError, lt_path_by_dirblock, u'x/Uni', 'x/str')
 
610
        self.assertRaises(TypeError, lt_path_by_dirblock, 'x/str', u'x/Uni')
 
611
        self.assertRaises(TypeError, lt_path_by_dirblock, u'x/Uni', u'x/Uni')
 
612
 
 
613
    def test_nonascii(self):
 
614
        self.assertLtPathByDirblock([
 
615
            # content of '/'
 
616
            b'', b'a', b'\xc2\xb5', b'\xc3\xa5',
 
617
            # content of 'a'
 
618
            b'a/a', b'a/\xc2\xb5', b'a/\xc3\xa5',
 
619
            # content of 'a/a'
 
620
            b'a/a/a', b'a/a/\xc2\xb5', b'a/a/\xc3\xa5',
 
621
            # content of 'a/\xc2\xb5'
 
622
            b'a/\xc2\xb5/a', b'a/\xc2\xb5/\xc2\xb5', b'a/\xc2\xb5/\xc3\xa5',
 
623
            # content of 'a/\xc3\xa5'
 
624
            b'a/\xc3\xa5/a', b'a/\xc3\xa5/\xc2\xb5', b'a/\xc3\xa5/\xc3\xa5',
 
625
            # content of '\xc2\xb5'
 
626
            b'\xc2\xb5/a', b'\xc2\xb5/\xc2\xb5', b'\xc2\xb5/\xc3\xa5',
 
627
            # content of '\xc2\xe5'
 
628
            b'\xc3\xa5/a', b'\xc3\xa5/\xc2\xb5', b'\xc3\xa5/\xc3\xa5',
 
629
            ])
 
630
 
 
631
 
 
632
class TestCompiledLtPathByDirblock(TestLtPathByDirblock):
 
633
    """Test the pyrex implementation of _lt_path_by_dirblock"""
 
634
 
 
635
    _test_needs_features = [compiled_dirstate_helpers_feature]
 
636
 
 
637
    def get_lt_path_by_dirblock(self):
 
638
        from ..bzr._dirstate_helpers_pyx import _lt_path_by_dirblock
 
639
        return _lt_path_by_dirblock
 
640
 
 
641
 
 
642
class TestMemRChr(tests.TestCase):
 
643
    """Test memrchr functionality"""
 
644
 
 
645
    _test_needs_features = [compiled_dirstate_helpers_feature]
 
646
 
 
647
    def assertMemRChr(self, expected, s, c):
 
648
        from breezy.bzr._dirstate_helpers_pyx import _py_memrchr
 
649
        self.assertEqual(expected, _py_memrchr(s, c))
 
650
 
 
651
    def test_missing(self):
 
652
        self.assertMemRChr(None, b'', b'a')
 
653
        self.assertMemRChr(None, b'', b'c')
 
654
        self.assertMemRChr(None, b'abcdefghijklm', b'q')
 
655
        self.assertMemRChr(None, b'aaaaaaaaaaaaaaaaaaaaaaa', b'b')
 
656
 
 
657
    def test_single_entry(self):
 
658
        self.assertMemRChr(0, b'abcdefghijklm', b'a')
 
659
        self.assertMemRChr(1, b'abcdefghijklm', b'b')
 
660
        self.assertMemRChr(2, b'abcdefghijklm', b'c')
 
661
        self.assertMemRChr(10, b'abcdefghijklm', b'k')
 
662
        self.assertMemRChr(11, b'abcdefghijklm', b'l')
 
663
        self.assertMemRChr(12, b'abcdefghijklm', b'm')
 
664
 
 
665
    def test_multiple(self):
 
666
        self.assertMemRChr(10, b'abcdefjklmabcdefghijklm', b'a')
 
667
        self.assertMemRChr(11, b'abcdefjklmabcdefghijklm', b'b')
 
668
        self.assertMemRChr(12, b'abcdefjklmabcdefghijklm', b'c')
 
669
        self.assertMemRChr(20, b'abcdefjklmabcdefghijklm', b'k')
 
670
        self.assertMemRChr(21, b'abcdefjklmabcdefghijklm', b'l')
 
671
        self.assertMemRChr(22, b'abcdefjklmabcdefghijklm', b'm')
 
672
        self.assertMemRChr(22, b'aaaaaaaaaaaaaaaaaaaaaaa', b'a')
 
673
 
 
674
    def test_with_nulls(self):
 
675
        self.assertMemRChr(10, b'abc\0\0\0jklmabc\0\0\0ghijklm', b'a')
 
676
        self.assertMemRChr(11, b'abc\0\0\0jklmabc\0\0\0ghijklm', b'b')
 
677
        self.assertMemRChr(12, b'abc\0\0\0jklmabc\0\0\0ghijklm', b'c')
 
678
        self.assertMemRChr(20, b'abc\0\0\0jklmabc\0\0\0ghijklm', b'k')
 
679
        self.assertMemRChr(21, b'abc\0\0\0jklmabc\0\0\0ghijklm', b'l')
 
680
        self.assertMemRChr(22, b'abc\0\0\0jklmabc\0\0\0ghijklm', b'm')
 
681
        self.assertMemRChr(22, b'aaa\0\0\0aaaaaaa\0\0\0aaaaaaa', b'a')
 
682
        self.assertMemRChr(9, b'\0\0\0\0\0\0\0\0\0\0', b'\0')
 
683
 
 
684
 
 
685
class TestReadDirblocks(test_dirstate.TestCaseWithDirState):
 
686
    """Test an implementation of _read_dirblocks()
 
687
 
 
688
    _read_dirblocks() reads in all of the dirblock information from the disk
 
689
    file.
 
690
 
 
691
    Child test cases can override ``get_read_dirblocks`` to test a specific
 
692
    implementation.
 
693
    """
 
694
 
 
695
    # inherits scenarios from test_dirstate
 
696
 
 
697
    def get_read_dirblocks(self):
 
698
        from breezy.bzr._dirstate_helpers_py import _read_dirblocks
 
699
        return _read_dirblocks
 
700
 
 
701
    def test_smoketest(self):
 
702
        """Make sure that we can create and read back a simple file."""
 
703
        tree, state, expected = self.create_basic_dirstate()
 
704
        del tree
 
705
        state._read_header_if_needed()
 
706
        self.assertEqual(dirstate.DirState.NOT_IN_MEMORY,
 
707
                         state._dirblock_state)
 
708
        read_dirblocks = self.get_read_dirblocks()
 
709
        read_dirblocks(state)
 
710
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
711
                         state._dirblock_state)
 
712
 
 
713
    def test_trailing_garbage(self):
 
714
        tree, state, expected = self.create_basic_dirstate()
 
715
        # On Unix, we can write extra data as long as we haven't read yet, but
 
716
        # on Win32, if you've opened the file with FILE_SHARE_READ, trying to
 
717
        # open it in append mode will fail.
 
718
        state.unlock()
 
719
        f = open('dirstate', 'ab')
 
720
        try:
 
721
            # Add bogus trailing garbage
 
722
            f.write(b'bogus\n')
 
723
        finally:
 
724
            f.close()
 
725
            state.lock_read()
 
726
        e = self.assertRaises(dirstate.DirstateCorrupt,
 
727
                              state._read_dirblocks_if_needed)
 
728
        # Make sure we mention the bogus characters in the error
 
729
        self.assertContainsRe(str(e), 'bogus')
 
730
 
 
731
 
 
732
class TestCompiledReadDirblocks(TestReadDirblocks):
 
733
    """Test the pyrex implementation of _read_dirblocks"""
 
734
 
 
735
    _test_needs_features = [compiled_dirstate_helpers_feature]
 
736
 
 
737
    def get_read_dirblocks(self):
 
738
        from breezy.bzr._dirstate_helpers_pyx import _read_dirblocks
 
739
        return _read_dirblocks
 
740
 
 
741
 
 
742
class TestUsingCompiledIfAvailable(tests.TestCase):
 
743
    """Check that any compiled functions that are available are the default.
 
744
 
 
745
    It is possible to have typos, etc in the import line, such that
 
746
    _dirstate_helpers_pyx is actually available, but the compiled functions are
 
747
    not being used.
 
748
    """
 
749
 
 
750
    def test_bisect_dirblock(self):
 
751
        if compiled_dirstate_helpers_feature.available():
 
752
            from breezy.bzr._dirstate_helpers_pyx import bisect_dirblock
 
753
        else:
 
754
            from breezy.bzr._dirstate_helpers_py import bisect_dirblock
 
755
        self.assertIs(bisect_dirblock, dirstate.bisect_dirblock)
 
756
 
 
757
    def test__bisect_path_left(self):
 
758
        if compiled_dirstate_helpers_feature.available():
 
759
            from breezy.bzr._dirstate_helpers_pyx import _bisect_path_left
 
760
        else:
 
761
            from breezy.bzr._dirstate_helpers_py import _bisect_path_left
 
762
        self.assertIs(_bisect_path_left, dirstate._bisect_path_left)
 
763
 
 
764
    def test__bisect_path_right(self):
 
765
        if compiled_dirstate_helpers_feature.available():
 
766
            from breezy.bzr._dirstate_helpers_pyx import _bisect_path_right
 
767
        else:
 
768
            from breezy.bzr._dirstate_helpers_py import _bisect_path_right
 
769
        self.assertIs(_bisect_path_right, dirstate._bisect_path_right)
 
770
 
 
771
    def test_lt_by_dirs(self):
 
772
        if compiled_dirstate_helpers_feature.available():
 
773
            from ..bzr._dirstate_helpers_pyx import lt_by_dirs
 
774
        else:
 
775
            from ..bzr._dirstate_helpers_py import lt_by_dirs
 
776
        self.assertIs(lt_by_dirs, dirstate.lt_by_dirs)
 
777
 
 
778
    def test__read_dirblocks(self):
 
779
        if compiled_dirstate_helpers_feature.available():
 
780
            from breezy.bzr._dirstate_helpers_pyx import _read_dirblocks
 
781
        else:
 
782
            from breezy.bzr._dirstate_helpers_py import _read_dirblocks
 
783
        self.assertIs(_read_dirblocks, dirstate._read_dirblocks)
 
784
 
 
785
    def test_update_entry(self):
 
786
        if compiled_dirstate_helpers_feature.available():
 
787
            from breezy.bzr._dirstate_helpers_pyx import update_entry
 
788
        else:
 
789
            from breezy.bzr.dirstate import update_entry
 
790
        self.assertIs(update_entry, dirstate.update_entry)
 
791
 
 
792
    def test_process_entry(self):
 
793
        if compiled_dirstate_helpers_feature.available():
 
794
            from breezy.bzr._dirstate_helpers_pyx import ProcessEntryC
 
795
            self.assertIs(ProcessEntryC, dirstate._process_entry)
 
796
        else:
 
797
            from breezy.bzr.dirstate import ProcessEntryPython
 
798
            self.assertIs(ProcessEntryPython, dirstate._process_entry)
 
799
 
 
800
 
 
801
class TestUpdateEntry(test_dirstate.TestCaseWithDirState):
 
802
    """Test the DirState.update_entry functions"""
 
803
 
 
804
    scenarios = multiply_scenarios(
 
805
        dir_reader_scenarios(), ue_scenarios)
 
806
 
 
807
    # Set by load_tests
 
808
    update_entry = None
 
809
 
 
810
    def setUp(self):
 
811
        super(TestUpdateEntry, self).setUp()
 
812
        self.overrideAttr(dirstate, 'update_entry', self.update_entry)
 
813
 
 
814
    def get_state_with_a(self):
 
815
        """Create a DirState tracking a single object named 'a'"""
 
816
        state = test_dirstate.InstrumentedDirState.initialize('dirstate')
 
817
        self.addCleanup(state.unlock)
 
818
        state.add('a', b'a-id', 'file', None, b'')
 
819
        entry = state._get_entry(0, path_utf8=b'a')
 
820
        return state, entry
 
821
 
 
822
    def test_observed_sha1_cachable(self):
 
823
        state, entry = self.get_state_with_a()
 
824
        state.save()
 
825
        atime = time.time() - 10
 
826
        self.build_tree(['a'])
 
827
        statvalue = test_dirstate._FakeStat.from_stat(os.lstat('a'))
 
828
        statvalue.st_mtime = statvalue.st_ctime = atime
 
829
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
830
                         state._dirblock_state)
 
831
        state._observed_sha1(entry, b"foo", statvalue)
 
832
        self.assertEqual(b'foo', entry[1][0][1])
 
833
        packed_stat = dirstate.pack_stat(statvalue)
 
834
        self.assertEqual(packed_stat, entry[1][0][4])
 
835
        self.assertEqual(dirstate.DirState.IN_MEMORY_HASH_MODIFIED,
 
836
                         state._dirblock_state)
 
837
 
 
838
    def test_observed_sha1_not_cachable(self):
 
839
        state, entry = self.get_state_with_a()
 
840
        state.save()
 
841
        oldval = entry[1][0][1]
 
842
        oldstat = entry[1][0][4]
 
843
        self.build_tree(['a'])
 
844
        statvalue = os.lstat('a')
 
845
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
846
                         state._dirblock_state)
 
847
        state._observed_sha1(entry, "foo", statvalue)
 
848
        self.assertEqual(oldval, entry[1][0][1])
 
849
        self.assertEqual(oldstat, entry[1][0][4])
 
850
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
851
                         state._dirblock_state)
 
852
 
 
853
    def test_update_entry(self):
 
854
        state, _ = self.get_state_with_a()
 
855
        tree = self.make_branch_and_tree('tree')
 
856
        tree.lock_write()
 
857
        empty_revid = tree.commit('empty')
 
858
        self.build_tree(['tree/a'])
 
859
        tree.add(['a'], [b'a-id'])
 
860
        with_a_id = tree.commit('with_a')
 
861
        self.addCleanup(tree.unlock)
 
862
        state.set_parent_trees(
 
863
            [(empty_revid, tree.branch.repository.revision_tree(empty_revid))],
 
864
            [])
 
865
        entry = state._get_entry(0, path_utf8=b'a')
 
866
        self.build_tree(['a'])
 
867
        # Add one where we don't provide the stat or sha already
 
868
        self.assertEqual((b'', b'a', b'a-id'), entry[0])
 
869
        self.assertEqual((b'f', b'', 0, False, dirstate.DirState.NULLSTAT),
 
870
                         entry[1][0])
 
871
        # Flush the buffers to disk
 
872
        state.save()
 
873
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
874
                         state._dirblock_state)
 
875
 
 
876
        stat_value = os.lstat('a')
 
877
        packed_stat = dirstate.pack_stat(stat_value)
 
878
        link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
 
879
                                          stat_value=stat_value)
 
880
        self.assertEqual(None, link_or_sha1)
 
881
 
 
882
        # The dirblock entry should not have computed or cached the file's
 
883
        # sha1, but it did update the files' st_size. However, this is not
 
884
        # worth writing a dirstate file for, so we leave the state UNMODIFIED
 
885
        self.assertEqual((b'f', b'', 14, False, dirstate.DirState.NULLSTAT),
 
886
                         entry[1][0])
 
887
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
888
                         state._dirblock_state)
 
889
        mode = stat_value.st_mode
 
890
        self.assertEqual([('is_exec', mode, False)], state._log)
 
891
 
 
892
        state.save()
 
893
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
894
                         state._dirblock_state)
 
895
 
 
896
        # Roll the clock back so the file is guaranteed to look too new. We
 
897
        # should still not compute the sha1.
 
898
        state.adjust_time(-10)
 
899
        del state._log[:]
 
900
 
 
901
        link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
 
902
                                          stat_value=stat_value)
 
903
        self.assertEqual([('is_exec', mode, False)], state._log)
 
904
        self.assertEqual(None, link_or_sha1)
 
905
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
906
                         state._dirblock_state)
 
907
        self.assertEqual((b'f', b'', 14, False, dirstate.DirState.NULLSTAT),
 
908
                         entry[1][0])
 
909
        state.save()
 
910
 
 
911
        # If it is cachable (the clock has moved forward) but new it still
 
912
        # won't calculate the sha or cache it.
 
913
        state.adjust_time(+20)
 
914
        del state._log[:]
 
915
        link_or_sha1 = dirstate.update_entry(state, entry, abspath=b'a',
 
916
                                          stat_value=stat_value)
 
917
        self.assertEqual(None, link_or_sha1)
 
918
        self.assertEqual([('is_exec', mode, False)], state._log)
 
919
        self.assertEqual((b'f', b'', 14, False, dirstate.DirState.NULLSTAT),
 
920
                         entry[1][0])
 
921
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
922
                         state._dirblock_state)
 
923
 
 
924
        # If the file is no longer new, and the clock has been moved forward
 
925
        # sufficiently, it will cache the sha.
 
926
        del state._log[:]
 
927
        state.set_parent_trees(
 
928
            [(with_a_id, tree.branch.repository.revision_tree(with_a_id))],
 
929
            [])
 
930
        entry = state._get_entry(0, path_utf8=b'a')
 
931
 
 
932
        link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
 
933
                                          stat_value=stat_value)
 
934
        self.assertEqual(b'b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6',
 
935
                         link_or_sha1)
 
936
        self.assertEqual([('is_exec', mode, False), ('sha1', b'a')],
 
937
                          state._log)
 
938
        self.assertEqual((b'f', link_or_sha1, 14, False, packed_stat),
 
939
                         entry[1][0])
 
940
 
 
941
        # Subsequent calls will just return the cached value
 
942
        del state._log[:]
 
943
        link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
 
944
                                          stat_value=stat_value)
 
945
        self.assertEqual(b'b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6',
 
946
                         link_or_sha1)
 
947
        self.assertEqual([], state._log)
 
948
        self.assertEqual((b'f', link_or_sha1, 14, False, packed_stat),
 
949
                         entry[1][0])
 
950
 
 
951
    def test_update_entry_symlink(self):
 
952
        """Update entry should read symlinks."""
 
953
        self.requireFeature(features.SymlinkFeature)
 
954
        state, entry = self.get_state_with_a()
 
955
        state.save()
 
956
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
957
                         state._dirblock_state)
 
958
        os.symlink('target', 'a')
 
959
 
 
960
        state.adjust_time(-10) # Make the symlink look new
 
961
        stat_value = os.lstat('a')
 
962
        packed_stat = dirstate.pack_stat(stat_value)
 
963
        link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
 
964
                                          stat_value=stat_value)
 
965
        self.assertEqual(b'target', link_or_sha1)
 
966
        self.assertEqual([('read_link', b'a', b'')], state._log)
 
967
        # Dirblock is not updated (the link is too new)
 
968
        self.assertEqual([(b'l', b'', 6, False, dirstate.DirState.NULLSTAT)],
 
969
                         entry[1])
 
970
        # The file entry turned into a symlink, that is considered
 
971
        # HASH modified worthy.
 
972
        self.assertEqual(dirstate.DirState.IN_MEMORY_HASH_MODIFIED,
 
973
                         state._dirblock_state)
 
974
 
 
975
        # Because the stat_value looks new, we should re-read the target
 
976
        del state._log[:]
 
977
        link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
 
978
                                          stat_value=stat_value)
 
979
        self.assertEqual(b'target', link_or_sha1)
 
980
        self.assertEqual([('read_link', b'a', b'')], state._log)
 
981
        self.assertEqual([(b'l', b'', 6, False, dirstate.DirState.NULLSTAT)],
 
982
                         entry[1])
 
983
        state.save()
 
984
        state.adjust_time(+20) # Skip into the future, all files look old
 
985
        del state._log[:]
 
986
        link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
 
987
                                          stat_value=stat_value)
 
988
        # The symlink stayed a symlink. So while it is new enough to cache, we
 
989
        # don't bother setting the flag, because it is not really worth saving
 
990
        # (when we stat the symlink, we'll have paged in the target.)
 
991
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
992
                         state._dirblock_state)
 
993
        self.assertEqual(b'target', link_or_sha1)
 
994
        # We need to re-read the link because only now can we cache it
 
995
        self.assertEqual([('read_link', b'a', b'')], state._log)
 
996
        self.assertEqual([(b'l', b'target', 6, False, packed_stat)],
 
997
                         entry[1])
 
998
 
 
999
        del state._log[:]
 
1000
        # Another call won't re-read the link
 
1001
        self.assertEqual([], state._log)
 
1002
        link_or_sha1 = self.update_entry(state, entry, abspath=b'a',
 
1003
                                          stat_value=stat_value)
 
1004
        self.assertEqual(b'target', link_or_sha1)
 
1005
        self.assertEqual([(b'l', b'target', 6, False, packed_stat)],
 
1006
                         entry[1])
 
1007
 
 
1008
    def do_update_entry(self, state, entry, abspath):
 
1009
        stat_value = os.lstat(abspath)
 
1010
        return self.update_entry(state, entry, abspath, stat_value)
 
1011
 
 
1012
    def test_update_entry_dir(self):
 
1013
        state, entry = self.get_state_with_a()
 
1014
        self.build_tree(['a/'])
 
1015
        self.assertIs(None, self.do_update_entry(state, entry, b'a'))
 
1016
 
 
1017
    def test_update_entry_dir_unchanged(self):
 
1018
        state, entry = self.get_state_with_a()
 
1019
        self.build_tree(['a/'])
 
1020
        state.adjust_time(+20)
 
1021
        self.assertIs(None, self.do_update_entry(state, entry, b'a'))
 
1022
        # a/ used to be a file, but is now a directory, worth saving
 
1023
        self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
 
1024
                         state._dirblock_state)
 
1025
        state.save()
 
1026
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
1027
                         state._dirblock_state)
 
1028
        # No changes to a/ means not worth saving.
 
1029
        self.assertIs(None, self.do_update_entry(state, entry, b'a'))
 
1030
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
1031
                         state._dirblock_state)
 
1032
        # Change the last-modified time for the directory
 
1033
        t = time.time() - 100.0
 
1034
        try:
 
1035
            os.utime('a', (t, t))
 
1036
        except OSError:
 
1037
            # It looks like Win32 + FAT doesn't allow to change times on a dir.
 
1038
            raise tests.TestSkipped("can't update mtime of a dir on FAT")
 
1039
        saved_packed_stat = entry[1][0][-1]
 
1040
        self.assertIs(None, self.do_update_entry(state, entry, b'a'))
 
1041
        # We *do* go ahead and update the information in the dirblocks, but we
 
1042
        # don't bother setting IN_MEMORY_MODIFIED because it is trivial to
 
1043
        # recompute.
 
1044
        self.assertNotEqual(saved_packed_stat, entry[1][0][-1])
 
1045
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
1046
                         state._dirblock_state)
 
1047
 
 
1048
    def test_update_entry_file_unchanged(self):
 
1049
        state, _ = self.get_state_with_a()
 
1050
        tree = self.make_branch_and_tree('tree')
 
1051
        tree.lock_write()
 
1052
        self.build_tree(['tree/a'])
 
1053
        tree.add(['a'], [b'a-id'])
 
1054
        with_a_id = tree.commit('witha')
 
1055
        self.addCleanup(tree.unlock)
 
1056
        state.set_parent_trees(
 
1057
            [(with_a_id, tree.branch.repository.revision_tree(with_a_id))],
 
1058
            [])
 
1059
        entry = state._get_entry(0, path_utf8=b'a')
 
1060
        self.build_tree(['a'])
 
1061
        sha1sum = b'b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6'
 
1062
        state.adjust_time(+20)
 
1063
        self.assertEqual(sha1sum, self.do_update_entry(state, entry, b'a'))
 
1064
        self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
 
1065
                         state._dirblock_state)
 
1066
        state.save()
 
1067
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
1068
                         state._dirblock_state)
 
1069
        self.assertEqual(sha1sum, self.do_update_entry(state, entry, b'a'))
 
1070
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
1071
                         state._dirblock_state)
 
1072
 
 
1073
    def test_update_entry_tree_reference(self):
 
1074
        state = test_dirstate.InstrumentedDirState.initialize('dirstate')
 
1075
        self.addCleanup(state.unlock)
 
1076
        state.add('r', b'r-id', 'tree-reference', None, b'')
 
1077
        self.build_tree(['r/'])
 
1078
        entry = state._get_entry(0, path_utf8=b'r')
 
1079
        self.do_update_entry(state, entry, 'r')
 
1080
        entry = state._get_entry(0, path_utf8=b'r')
 
1081
        self.assertEqual(b't', entry[1][0][0])
 
1082
 
 
1083
    def create_and_test_file(self, state, entry):
 
1084
        """Create a file at 'a' and verify the state finds it during update.
 
1085
 
 
1086
        The state should already be versioning *something* at 'a'. This makes
 
1087
        sure that state.update_entry recognizes it as a file.
 
1088
        """
 
1089
        self.build_tree(['a'])
 
1090
        stat_value = os.lstat('a')
 
1091
        packed_stat = dirstate.pack_stat(stat_value)
 
1092
 
 
1093
        link_or_sha1 = self.do_update_entry(state, entry, abspath='a')
 
1094
        self.assertEqual(None, link_or_sha1)
 
1095
        self.assertEqual([(b'f', b'', 14, False, dirstate.DirState.NULLSTAT)],
 
1096
                         entry[1])
 
1097
        return packed_stat
 
1098
 
 
1099
    def create_and_test_dir(self, state, entry):
 
1100
        """Create a directory at 'a' and verify the state finds it.
 
1101
 
 
1102
        The state should already be versioning *something* at 'a'. This makes
 
1103
        sure that state.update_entry recognizes it as a directory.
 
1104
        """
 
1105
        self.build_tree(['a/'])
 
1106
        stat_value = os.lstat('a')
 
1107
        packed_stat = dirstate.pack_stat(stat_value)
 
1108
 
 
1109
        link_or_sha1 = self.do_update_entry(state, entry, abspath=b'a')
 
1110
        self.assertIs(None, link_or_sha1)
 
1111
        self.assertEqual([(b'd', b'', 0, False, packed_stat)], entry[1])
 
1112
 
 
1113
        return packed_stat
 
1114
 
 
1115
    # FIXME: Add unicode version
 
1116
    def create_and_test_symlink(self, state, entry):
 
1117
        """Create a symlink at 'a' and verify the state finds it.
 
1118
 
 
1119
        The state should already be versioning *something* at 'a'. This makes
 
1120
        sure that state.update_entry recognizes it as a symlink.
 
1121
 
 
1122
        This should not be called if this platform does not have symlink
 
1123
        support.
 
1124
        """
 
1125
        # caller should care about skipping test on platforms without symlinks
 
1126
        os.symlink('path/to/foo', 'a')
 
1127
 
 
1128
        stat_value = os.lstat('a')
 
1129
        packed_stat = dirstate.pack_stat(stat_value)
 
1130
 
 
1131
        link_or_sha1 = self.do_update_entry(state, entry, abspath=b'a')
 
1132
        self.assertEqual(b'path/to/foo', link_or_sha1)
 
1133
        self.assertEqual([(b'l', b'path/to/foo', 11, False, packed_stat)],
 
1134
                         entry[1])
 
1135
        return packed_stat
 
1136
 
 
1137
    def test_update_file_to_dir(self):
 
1138
        """If a file changes to a directory we return None for the sha.
 
1139
        We also update the inventory record.
 
1140
        """
 
1141
        state, entry = self.get_state_with_a()
 
1142
        # The file sha1 won't be cached unless the file is old
 
1143
        state.adjust_time(+10)
 
1144
        self.create_and_test_file(state, entry)
 
1145
        os.remove('a')
 
1146
        self.create_and_test_dir(state, entry)
 
1147
 
 
1148
    def test_update_file_to_symlink(self):
 
1149
        """File becomes a symlink"""
 
1150
        self.requireFeature(features.SymlinkFeature)
 
1151
        state, entry = self.get_state_with_a()
 
1152
        # The file sha1 won't be cached unless the file is old
 
1153
        state.adjust_time(+10)
 
1154
        self.create_and_test_file(state, entry)
 
1155
        os.remove('a')
 
1156
        self.create_and_test_symlink(state, entry)
 
1157
 
 
1158
    def test_update_dir_to_file(self):
 
1159
        """Directory becoming a file updates the entry."""
 
1160
        state, entry = self.get_state_with_a()
 
1161
        # The file sha1 won't be cached unless the file is old
 
1162
        state.adjust_time(+10)
 
1163
        self.create_and_test_dir(state, entry)
 
1164
        os.rmdir('a')
 
1165
        self.create_and_test_file(state, entry)
 
1166
 
 
1167
    def test_update_dir_to_symlink(self):
 
1168
        """Directory becomes a symlink"""
 
1169
        self.requireFeature(features.SymlinkFeature)
 
1170
        state, entry = self.get_state_with_a()
 
1171
        # The symlink target won't be cached if it isn't old
 
1172
        state.adjust_time(+10)
 
1173
        self.create_and_test_dir(state, entry)
 
1174
        os.rmdir('a')
 
1175
        self.create_and_test_symlink(state, entry)
 
1176
 
 
1177
    def test_update_symlink_to_file(self):
 
1178
        """Symlink becomes a file"""
 
1179
        self.requireFeature(features.SymlinkFeature)
 
1180
        state, entry = self.get_state_with_a()
 
1181
        # The symlink and file info won't be cached unless old
 
1182
        state.adjust_time(+10)
 
1183
        self.create_and_test_symlink(state, entry)
 
1184
        os.remove('a')
 
1185
        self.create_and_test_file(state, entry)
 
1186
 
 
1187
    def test_update_symlink_to_dir(self):
 
1188
        """Symlink becomes a directory"""
 
1189
        self.requireFeature(features.SymlinkFeature)
 
1190
        state, entry = self.get_state_with_a()
 
1191
        # The symlink target won't be cached if it isn't old
 
1192
        state.adjust_time(+10)
 
1193
        self.create_and_test_symlink(state, entry)
 
1194
        os.remove('a')
 
1195
        self.create_and_test_dir(state, entry)
 
1196
 
 
1197
    def test__is_executable_win32(self):
 
1198
        state, entry = self.get_state_with_a()
 
1199
        self.build_tree(['a'])
 
1200
 
 
1201
        # Make sure we are using the win32 implementation of _is_executable
 
1202
        state._is_executable = state._is_executable_win32
 
1203
 
 
1204
        # The file on disk is not executable, but we are marking it as though
 
1205
        # it is. With _is_executable_win32 we ignore what is on disk.
 
1206
        entry[1][0] = (b'f', b'', 0, True, dirstate.DirState.NULLSTAT)
 
1207
 
 
1208
        stat_value = os.lstat('a')
 
1209
        packed_stat = dirstate.pack_stat(stat_value)
 
1210
 
 
1211
        state.adjust_time(-10) # Make sure everything is new
 
1212
        self.update_entry(state, entry, abspath=b'a', stat_value=stat_value)
 
1213
 
 
1214
        # The row is updated, but the executable bit stays set.
 
1215
        self.assertEqual([(b'f', b'', 14, True, dirstate.DirState.NULLSTAT)],
 
1216
                         entry[1])
 
1217
 
 
1218
        # Make the disk object look old enough to cache (but it won't cache the
 
1219
        # sha as it is a new file).
 
1220
        state.adjust_time(+20)
 
1221
        digest = b'b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6'
 
1222
        self.update_entry(state, entry, abspath=b'a', stat_value=stat_value)
 
1223
        self.assertEqual([(b'f', b'', 14, True, dirstate.DirState.NULLSTAT)],
 
1224
            entry[1])
 
1225
 
 
1226
    def _prepare_tree(self):
 
1227
        # Create a tree
 
1228
        text = b'Hello World\n'
 
1229
        tree = self.make_branch_and_tree('tree')
 
1230
        self.build_tree_contents([('tree/a file', text)])
 
1231
        tree.add('a file', b'a-file-id')
 
1232
        # Note: dirstate does not sha prior to the first commit
 
1233
        # so commit now in order for the test to work
 
1234
        tree.commit('first')
 
1235
        return tree, text
 
1236
 
 
1237
    def test_sha1provider_sha1_used(self):
 
1238
        tree, text = self._prepare_tree()
 
1239
        state = dirstate.DirState.from_tree(tree, 'dirstate',
 
1240
            UppercaseSHA1Provider())
 
1241
        self.addCleanup(state.unlock)
 
1242
        expected_sha = osutils.sha_string(text.upper() + b"foo")
 
1243
        entry = state._get_entry(0, path_utf8=b'a file')
 
1244
        self.assertNotEqual((None, None), entry)
 
1245
        state._sha_cutoff_time()
 
1246
        state._cutoff_time += 10
 
1247
        sha1 = self.update_entry(state, entry, 'tree/a file',
 
1248
                                 os.lstat('tree/a file'))
 
1249
        self.assertEqual(expected_sha, sha1)
 
1250
 
 
1251
    def test_sha1provider_stat_and_sha1_used(self):
 
1252
        tree, text = self._prepare_tree()
 
1253
        tree.lock_write()
 
1254
        self.addCleanup(tree.unlock)
 
1255
        state = tree._current_dirstate()
 
1256
        state._sha1_provider = UppercaseSHA1Provider()
 
1257
        # If we used the standard provider, it would look like nothing has
 
1258
        # changed
 
1259
        file_ids_changed = [change[0] for change
 
1260
                            in tree.iter_changes(tree.basis_tree())]
 
1261
        self.assertEqual([b'a-file-id'], file_ids_changed)
 
1262
 
 
1263
 
 
1264
class UppercaseSHA1Provider(dirstate.SHA1Provider):
 
1265
    """A custom SHA1Provider."""
 
1266
 
 
1267
    def sha1(self, abspath):
 
1268
        return self.stat_and_sha1(abspath)[1]
 
1269
 
 
1270
    def stat_and_sha1(self, abspath):
 
1271
        with open(abspath, 'rb') as file_obj:
 
1272
            statvalue = os.fstat(file_obj.fileno())
 
1273
            text = b''.join(file_obj.readlines())
 
1274
            sha1 = osutils.sha_string(text.upper() + b"foo")
 
1275
        return statvalue, sha1
 
1276
 
 
1277
 
 
1278
class TestProcessEntry(test_dirstate.TestCaseWithDirState):
 
1279
 
 
1280
    scenarios = multiply_scenarios(dir_reader_scenarios(), pe_scenarios)
 
1281
 
 
1282
    # Set by load_tests
 
1283
    _process_entry = None
 
1284
 
 
1285
    def setUp(self):
 
1286
        super(TestProcessEntry, self).setUp()
 
1287
        self.overrideAttr(dirstate, '_process_entry', self._process_entry)
 
1288
 
 
1289
    def assertChangedFileIds(self, expected, tree):
 
1290
        tree.lock_read()
 
1291
        try:
 
1292
            file_ids = [info[0] for info
 
1293
                        in tree.iter_changes(tree.basis_tree())]
 
1294
        finally:
 
1295
            tree.unlock()
 
1296
        self.assertEqual(sorted(expected), sorted(file_ids))
 
1297
 
 
1298
    def test_exceptions_raised(self):
 
1299
        # This is a direct test of bug #495023, it relies on osutils.is_inside
 
1300
        # getting called in an inner function. Which makes it a bit brittle,
 
1301
        # but at least it does reproduce the bug.
 
1302
        tree = self.make_branch_and_tree('tree')
 
1303
        self.build_tree(['tree/file', 'tree/dir/', 'tree/dir/sub',
 
1304
                         'tree/dir2/', 'tree/dir2/sub2'])
 
1305
        tree.add(['file', 'dir', 'dir/sub', 'dir2', 'dir2/sub2'])
 
1306
        tree.commit('first commit')
 
1307
        tree.lock_read()
 
1308
        self.addCleanup(tree.unlock)
 
1309
        basis_tree = tree.basis_tree()
 
1310
        def is_inside_raises(*args, **kwargs):
 
1311
            raise RuntimeError('stop this')
 
1312
        self.overrideAttr(dirstate, 'is_inside', is_inside_raises)
 
1313
        try:
 
1314
            from breezy.bzr import _dirstate_helpers_pyx
 
1315
        except ImportError:
 
1316
            pass
 
1317
        else:
 
1318
            self.overrideAttr(_dirstate_helpers_pyx, 'is_inside', is_inside_raises)
 
1319
        self.overrideAttr(osutils, 'is_inside', is_inside_raises)
 
1320
        self.assertListRaises(RuntimeError, tree.iter_changes, basis_tree)
 
1321
 
 
1322
    def test_simple_changes(self):
 
1323
        tree = self.make_branch_and_tree('tree')
 
1324
        self.build_tree(['tree/file'])
 
1325
        tree.add(['file'], [b'file-id'])
 
1326
        self.assertChangedFileIds([tree.get_root_id(), b'file-id'], tree)
 
1327
        tree.commit('one')
 
1328
        self.assertChangedFileIds([], tree)
 
1329
 
 
1330
    def test_sha1provider_stat_and_sha1_used(self):
 
1331
        tree = self.make_branch_and_tree('tree')
 
1332
        self.build_tree(['tree/file'])
 
1333
        tree.add(['file'], [b'file-id'])
 
1334
        tree.commit('one')
 
1335
        tree.lock_write()
 
1336
        self.addCleanup(tree.unlock)
 
1337
        state = tree._current_dirstate()
 
1338
        state._sha1_provider = UppercaseSHA1Provider()
 
1339
        self.assertChangedFileIds([b'file-id'], tree)
 
1340
 
 
1341
 
 
1342
class TestPackStat(tests.TestCase):
 
1343
    """Check packed representaton of stat values is robust on all inputs"""
 
1344
 
 
1345
    scenarios = helper_scenarios
 
1346
 
 
1347
    def pack(self, statlike_tuple):
 
1348
        return self.helpers.pack_stat(os.stat_result(statlike_tuple))
 
1349
 
 
1350
    @staticmethod
 
1351
    def unpack_field(packed_string, stat_field):
 
1352
        return _dirstate_helpers_py._unpack_stat(packed_string)[stat_field]
 
1353
 
 
1354
    def test_result(self):
 
1355
        self.assertEqual(b"AAAQAAAAABAAAAARAAAAAgAAAAEAAIHk",
 
1356
            self.pack((33252, 1, 2, 0, 0, 0, 4096, 15.5, 16.5, 17.5)))
 
1357
 
 
1358
    def test_giant_inode(self):
 
1359
        packed = self.pack((33252, 0xF80000ABC, 0, 0, 0, 0, 0, 0, 0, 0))
 
1360
        self.assertEqual(0x80000ABC, self.unpack_field(packed, "st_ino"))
 
1361
 
 
1362
    def test_giant_size(self):
 
1363
        packed = self.pack((33252, 0, 0, 0, 0, 0, (1 << 33) + 4096, 0, 0, 0))
 
1364
        self.assertEqual(4096, self.unpack_field(packed, "st_size"))
 
1365
 
 
1366
    def test_fractional_mtime(self):
 
1367
        packed = self.pack((33252, 0, 0, 0, 0, 0, 0, 0, 16.9375, 0))
 
1368
        self.assertEqual(16, self.unpack_field(packed, "st_mtime"))
 
1369
 
 
1370
    def test_ancient_mtime(self):
 
1371
        packed = self.pack((33252, 0, 0, 0, 0, 0, 0, 0, -11644473600.0, 0))
 
1372
        self.assertEqual(1240428288, self.unpack_field(packed, "st_mtime"))
 
1373
 
 
1374
    def test_distant_mtime(self):
 
1375
        packed = self.pack((33252, 0, 0, 0, 0, 0, 0, 0, 64060588800.0, 0))
 
1376
        self.assertEqual(3931046656, self.unpack_field(packed, "st_mtime"))
 
1377
 
 
1378
    def test_fractional_ctime(self):
 
1379
        packed = self.pack((33252, 0, 0, 0, 0, 0, 0, 0, 0, 17.5625))
 
1380
        self.assertEqual(17, self.unpack_field(packed, "st_ctime"))
 
1381
 
 
1382
    def test_ancient_ctime(self):
 
1383
        packed = self.pack((33252, 0, 0, 0, 0, 0, 0, 0, 0, -11644473600.0))
 
1384
        self.assertEqual(1240428288, self.unpack_field(packed, "st_ctime"))
 
1385
 
 
1386
    def test_distant_ctime(self):
 
1387
        packed = self.pack((33252, 0, 0, 0, 0, 0, 0, 0, 0, 64060588800.0))
 
1388
        self.assertEqual(3931046656, self.unpack_field(packed, "st_ctime"))
 
1389
 
 
1390
    def test_negative_dev(self):
 
1391
        packed = self.pack((33252, 0, -0xFFFFFCDE, 0, 0, 0, 0, 0, 0, 0))
 
1392
        self.assertEqual(0x322, self.unpack_field(packed, "st_dev"))