/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_hashcache.py

  • Committer: Jelmer Vernooij
  • Date: 2020-04-05 19:11:34 UTC
  • mto: (7490.7.16 work)
  • mto: This revision was merged to the branch mainline in revision 7501.
  • Revision ID: jelmer@jelmer.uk-20200405191134-0aebh8ikiwygxma5
Populate the .gitignore file.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 Canonical Ltd
 
1
# Copyright (C) 2005-2011, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
import os
18
18
import stat
19
 
import sys
20
19
import time
21
20
 
22
 
from bzrlib import osutils
23
 
from bzrlib.errors import BzrError
24
 
from bzrlib.hashcache import HashCache
25
 
from bzrlib.tests import OsFifoFeature, TestCaseInTempDir, TestCase
26
 
 
27
 
 
28
 
def sha1(t):
29
 
    return osutils.sha(t).hexdigest()
 
21
from .. import osutils
 
22
from ..errors import BzrError
 
23
from ..hashcache import HashCache
 
24
from . import (
 
25
    TestCaseInTempDir,
 
26
    )
 
27
from .features import (
 
28
    OsFifoFeature,
 
29
    )
 
30
 
 
31
 
 
32
sha1 = osutils.sha_string
30
33
 
31
34
 
32
35
def pause():
39
42
    def make_hashcache(self):
40
43
        # make a dummy bzr directory just to hold the cache
41
44
        os.mkdir('.bzr')
42
 
        hc = HashCache('.', '.bzr/stat-cache')
 
45
        hc = HashCache(u'.', '.bzr/stat-cache')
43
46
        return hc
44
47
 
45
48
    def reopen_hashcache(self):
46
 
        hc = HashCache('.', '.bzr/stat-cache')
 
49
        hc = HashCache(u'.', '.bzr/stat-cache')
47
50
        hc.read()
48
51
        return hc
49
52
 
50
53
    def test_hashcache_initial_miss(self):
51
54
        """Get correct hash from an empty hashcache"""
52
55
        hc = self.make_hashcache()
53
 
        self.build_tree_contents([('foo', 'hello')])
54
 
        self.assertEquals(hc.get_sha1('foo'),
55
 
                          'aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d')
56
 
        self.assertEquals(hc.miss_count, 1)
57
 
        self.assertEquals(hc.hit_count, 0)
 
56
        self.build_tree_contents([('foo', b'hello')])
 
57
        self.assertEqual(hc.get_sha1('foo'),
 
58
                         b'aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d')
 
59
        self.assertEqual(hc.miss_count, 1)
 
60
        self.assertEqual(hc.hit_count, 0)
58
61
 
59
62
    def test_hashcache_new_file(self):
60
63
        hc = self.make_hashcache()
61
 
        self.build_tree_contents([('foo', 'goodbye')])
 
64
        self.build_tree_contents([('foo', b'goodbye')])
62
65
        # now read without pausing; it may not be possible to cache it as its
63
66
        # so new
64
 
        self.assertEquals(hc.get_sha1('foo'), sha1('goodbye'))
 
67
        self.assertEqual(hc.get_sha1('foo'), sha1(b'goodbye'))
65
68
 
66
69
    def test_hashcache_nonexistent_file(self):
67
70
        hc = self.make_hashcache()
68
 
        self.assertEquals(hc.get_sha1('no-name-yet'), None)
 
71
        self.assertEqual(hc.get_sha1('no-name-yet'), None)
69
72
 
70
73
    def test_hashcache_replaced_file(self):
71
74
        hc = self.make_hashcache()
72
 
        self.build_tree_contents([('foo', 'goodbye')])
73
 
        self.assertEquals(hc.get_sha1('foo'), sha1('goodbye'))
 
75
        self.build_tree_contents([('foo', b'goodbye')])
 
76
        self.assertEqual(hc.get_sha1('foo'), sha1(b'goodbye'))
74
77
        os.remove('foo')
75
 
        self.assertEquals(hc.get_sha1('foo'), None)
76
 
        self.build_tree_contents([('foo', 'new content')])
77
 
        self.assertEquals(hc.get_sha1('foo'), sha1('new content'))
 
78
        self.assertEqual(hc.get_sha1('foo'), None)
 
79
        self.build_tree_contents([('foo', b'new content')])
 
80
        self.assertEqual(hc.get_sha1('foo'), sha1(b'new content'))
78
81
 
79
82
    def test_hashcache_not_file(self):
80
83
        hc = self.make_hashcache()
81
84
        self.build_tree(['subdir/'])
82
 
        self.assertEquals(hc.get_sha1('subdir'), None)
 
85
        self.assertEqual(hc.get_sha1('subdir'), None)
83
86
 
84
87
    def test_hashcache_load(self):
85
88
        hc = self.make_hashcache()
86
 
        self.build_tree_contents([('foo', 'contents')])
 
89
        self.build_tree_contents([('foo', b'contents')])
87
90
        pause()
88
 
        self.assertEquals(hc.get_sha1('foo'), sha1('contents'))
 
91
        self.assertEqual(hc.get_sha1('foo'), sha1(b'contents'))
89
92
        hc.write()
90
93
        hc = self.reopen_hashcache()
91
 
        self.assertEquals(hc.get_sha1('foo'), sha1('contents'))
92
 
        self.assertEquals(hc.hit_count, 1)
 
94
        self.assertEqual(hc.get_sha1('foo'), sha1(b'contents'))
 
95
        self.assertEqual(hc.hit_count, 1)
93
96
 
94
97
    def test_hammer_hashcache(self):
95
98
        hc = self.make_hashcache()
96
 
        for i in xrange(10000):
97
 
            self.log('start writing at %s', time.time())
98
 
            f = file('foo', 'w')
99
 
            try:
100
 
                last_content = '%08x' % i
 
99
        for i in range(10000):
 
100
            with open('foo', 'wb') as f:
 
101
                last_content = b'%08x' % i
101
102
                f.write(last_content)
102
 
            finally:
103
 
                f.close()
104
103
            last_sha1 = sha1(last_content)
105
104
            self.log("iteration %d: %r -> %r",
106
105
                     i, last_content, last_sha1)
107
106
            got_sha1 = hc.get_sha1('foo')
108
 
            self.assertEquals(got_sha1, last_sha1)
 
107
            self.assertEqual(got_sha1, last_sha1)
109
108
            hc.write()
110
109
            hc = self.reopen_hashcache()
111
110
 
127
126
    This lets us examine how old or new files would be handled, without
128
127
    actually having to wait for time to pass.
129
128
    """
 
129
 
130
130
    def __init__(self):
131
131
        # set root and cache file name to none to make sure we won't touch the
132
132
        # real filesystem
133
 
        HashCache.__init__(self, '.', 'hashcache')
 
133
        HashCache.__init__(self, u'.', 'hashcache')
134
134
        self._files = {}
135
135
        # simulated clock running forward as operations happen
136
136
        self._clock = 0
144
144
        return (len(entry[0]),
145
145
                entry[1], entry[1],
146
146
                10, 20,
147
 
                stat.S_IFREG | 0600)
 
147
                stat.S_IFREG | 0o600)
148
148
 
149
149
    def _really_sha1_file(self, abspath, filters):
150
150
        if abspath in self._files:
169
169
    def test_hashcache_miss_new_file(self):
170
170
        """A new file gives the right sha1 but misses"""
171
171
        hc = self.make_hashcache()
172
 
        hc.put_file('foo', 'hello')
173
 
        self.assertEquals(hc.get_sha1('foo'), sha1('hello'))
174
 
        self.assertEquals(hc.miss_count, 1)
175
 
        self.assertEquals(hc.hit_count, 0)
 
172
        hc.put_file('foo', b'hello')
 
173
        self.assertEqual(hc.get_sha1('foo'), sha1(b'hello'))
 
174
        self.assertEqual(hc.miss_count, 1)
 
175
        self.assertEqual(hc.hit_count, 0)
176
176
        # if we try again it's still too new;
177
 
        self.assertEquals(hc.get_sha1('foo'), sha1('hello'))
178
 
        self.assertEquals(hc.miss_count, 2)
179
 
        self.assertEquals(hc.hit_count, 0)
 
177
        self.assertEqual(hc.get_sha1('foo'), sha1(b'hello'))
 
178
        self.assertEqual(hc.miss_count, 2)
 
179
        self.assertEqual(hc.hit_count, 0)
180
180
 
181
181
    def test_hashcache_old_file(self):
182
182
        """An old file gives the right sha1 and hits"""
183
183
        hc = self.make_hashcache()
184
 
        hc.put_file('foo', 'hello')
 
184
        hc.put_file('foo', b'hello')
185
185
        hc.pretend_to_sleep(20)
186
186
        # file is new; should get the correct hash but miss
187
 
        self.assertEquals(hc.get_sha1('foo'), sha1('hello'))
188
 
        self.assertEquals(hc.miss_count, 1)
189
 
        self.assertEquals(hc.hit_count, 0)
 
187
        self.assertEqual(hc.get_sha1('foo'), sha1(b'hello'))
 
188
        self.assertEqual(hc.miss_count, 1)
 
189
        self.assertEqual(hc.hit_count, 0)
190
190
        # and can now be hit
191
 
        self.assertEquals(hc.get_sha1('foo'), sha1('hello'))
192
 
        self.assertEquals(hc.miss_count, 1)
193
 
        self.assertEquals(hc.hit_count, 1)
 
191
        self.assertEqual(hc.get_sha1('foo'), sha1(b'hello'))
 
192
        self.assertEqual(hc.miss_count, 1)
 
193
        self.assertEqual(hc.hit_count, 1)
194
194
        hc.pretend_to_sleep(3)
195
195
        # and again
196
 
        self.assertEquals(hc.get_sha1('foo'), sha1('hello'))
197
 
        self.assertEquals(hc.miss_count, 1)
198
 
        self.assertEquals(hc.hit_count, 2)
 
196
        self.assertEqual(hc.get_sha1('foo'), sha1(b'hello'))
 
197
        self.assertEqual(hc.miss_count, 1)
 
198
        self.assertEqual(hc.hit_count, 2)
199
199
 
200
200
    def test_hashcache_invalidates(self):
201
201
        hc = self.make_hashcache()
202
 
        hc.put_file('foo', 'hello')
 
202
        hc.put_file('foo', b'hello')
203
203
        hc.pretend_to_sleep(20)
204
204
        hc.get_sha1('foo')
205
 
        hc.put_file('foo', 'h1llo')
206
 
        self.assertEquals(hc.get_sha1('foo'), sha1('h1llo'))
207
 
        self.assertEquals(hc.miss_count, 2)
208
 
        self.assertEquals(hc.hit_count, 0)
 
205
        hc.put_file('foo', b'h1llo')
 
206
        self.assertEqual(hc.get_sha1('foo'), sha1(b'h1llo'))
 
207
        self.assertEqual(hc.miss_count, 2)
 
208
        self.assertEqual(hc.hit_count, 0)