/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_hashcache.py

  • Committer: Jelmer Vernooij
  • Date: 2018-07-08 14:45:27 UTC
  • mto: This revision was merged to the branch mainline in revision 7036.
  • Revision ID: jelmer@jelmer.uk-20180708144527-codhlvdcdg9y0nji
Fix a bunch of merge tests.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 Canonical Ltd
 
1
# Copyright (C) 2005-2011, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
import os
18
18
import stat
19
 
import sys
20
19
import time
21
20
 
22
 
from bzrlib import osutils
23
 
from bzrlib.errors import BzrError
24
 
from bzrlib.hashcache import HashCache
25
 
from bzrlib.tests import OsFifoFeature, TestCaseInTempDir, TestCase
26
 
 
27
 
 
28
 
def sha1(t):
29
 
    return osutils.sha(t).hexdigest()
 
21
from .. import osutils
 
22
from ..errors import BzrError
 
23
from ..hashcache import HashCache
 
24
from . import (
 
25
    TestCaseInTempDir,
 
26
    )
 
27
from .features import (
 
28
    OsFifoFeature,
 
29
    )
 
30
 
 
31
 
 
32
sha1 = osutils.sha_string
30
33
 
31
34
 
32
35
def pause():
39
42
    def make_hashcache(self):
40
43
        # make a dummy bzr directory just to hold the cache
41
44
        os.mkdir('.bzr')
42
 
        hc = HashCache('.', '.bzr/stat-cache')
 
45
        hc = HashCache(u'.', '.bzr/stat-cache')
43
46
        return hc
44
47
 
45
48
    def reopen_hashcache(self):
46
 
        hc = HashCache('.', '.bzr/stat-cache')
 
49
        hc = HashCache(u'.', '.bzr/stat-cache')
47
50
        hc.read()
48
51
        return hc
49
52
 
50
53
    def test_hashcache_initial_miss(self):
51
54
        """Get correct hash from an empty hashcache"""
52
55
        hc = self.make_hashcache()
53
 
        self.build_tree_contents([('foo', 'hello')])
54
 
        self.assertEquals(hc.get_sha1('foo'),
55
 
                          'aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d')
56
 
        self.assertEquals(hc.miss_count, 1)
57
 
        self.assertEquals(hc.hit_count, 0)
 
56
        self.build_tree_contents([('foo', b'hello')])
 
57
        self.assertEqual(hc.get_sha1('foo'),
 
58
                          b'aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d')
 
59
        self.assertEqual(hc.miss_count, 1)
 
60
        self.assertEqual(hc.hit_count, 0)
58
61
 
59
62
    def test_hashcache_new_file(self):
60
63
        hc = self.make_hashcache()
61
 
        self.build_tree_contents([('foo', 'goodbye')])
 
64
        self.build_tree_contents([('foo', b'goodbye')])
62
65
        # now read without pausing; it may not be possible to cache it as it's
63
66
        # so new
64
 
        self.assertEquals(hc.get_sha1('foo'), sha1('goodbye'))
 
67
        self.assertEqual(hc.get_sha1('foo'), sha1(b'goodbye'))
65
68
 
66
69
    def test_hashcache_nonexistent_file(self):
67
70
        hc = self.make_hashcache()
68
 
        self.assertEquals(hc.get_sha1('no-name-yet'), None)
 
71
        self.assertEqual(hc.get_sha1('no-name-yet'), None)
69
72
 
70
73
    def test_hashcache_replaced_file(self):
71
74
        hc = self.make_hashcache()
72
 
        self.build_tree_contents([('foo', 'goodbye')])
73
 
        self.assertEquals(hc.get_sha1('foo'), sha1('goodbye'))
 
75
        self.build_tree_contents([('foo', b'goodbye')])
 
76
        self.assertEqual(hc.get_sha1('foo'), sha1(b'goodbye'))
74
77
        os.remove('foo')
75
 
        self.assertEquals(hc.get_sha1('foo'), None)
76
 
        self.build_tree_contents([('foo', 'new content')])
77
 
        self.assertEquals(hc.get_sha1('foo'), sha1('new content'))
 
78
        self.assertEqual(hc.get_sha1('foo'), None)
 
79
        self.build_tree_contents([('foo', b'new content')])
 
80
        self.assertEqual(hc.get_sha1('foo'), sha1(b'new content'))
78
81
 
79
82
    def test_hashcache_not_file(self):
80
83
        hc = self.make_hashcache()
81
84
        self.build_tree(['subdir/'])
82
 
        self.assertEquals(hc.get_sha1('subdir'), None)
 
85
        self.assertEqual(hc.get_sha1('subdir'), None)
83
86
 
84
87
    def test_hashcache_load(self):
85
88
        hc = self.make_hashcache()
86
 
        self.build_tree_contents([('foo', 'contents')])
 
89
        self.build_tree_contents([('foo', b'contents')])
87
90
        pause()
88
 
        self.assertEquals(hc.get_sha1('foo'), sha1('contents'))
 
91
        self.assertEqual(hc.get_sha1('foo'), sha1(b'contents'))
89
92
        hc.write()
90
93
        hc = self.reopen_hashcache()
91
 
        self.assertEquals(hc.get_sha1('foo'), sha1('contents'))
92
 
        self.assertEquals(hc.hit_count, 1)
 
94
        self.assertEqual(hc.get_sha1('foo'), sha1(b'contents'))
 
95
        self.assertEqual(hc.hit_count, 1)
93
96
 
94
97
    def test_hammer_hashcache(self):
95
98
        hc = self.make_hashcache()
96
 
        for i in xrange(10000):
97
 
            self.log('start writing at %s', time.time())
98
 
            f = file('foo', 'w')
99
 
            try:
100
 
                last_content = '%08x' % i
 
99
        for i in range(10000):
 
100
            with open('foo', 'wb') as f:
 
101
                last_content = b'%08x' % i
101
102
                f.write(last_content)
102
 
            finally:
103
 
                f.close()
104
103
            last_sha1 = sha1(last_content)
105
104
            self.log("iteration %d: %r -> %r",
106
105
                     i, last_content, last_sha1)
107
106
            got_sha1 = hc.get_sha1('foo')
108
 
            self.assertEquals(got_sha1, last_sha1)
 
107
            self.assertEqual(got_sha1, last_sha1)
109
108
            hc.write()
110
109
            hc = self.reopen_hashcache()
111
110
 
130
129
    def __init__(self):
131
130
        # set root and cache file name to none to make sure we won't touch the
132
131
        # real filesystem
133
 
        HashCache.__init__(self, '.', 'hashcache')
 
132
        HashCache.__init__(self, u'.', 'hashcache')
134
133
        self._files = {}
135
134
        # simulated clock running forward as operations happen
136
135
        self._clock = 0
144
143
        return (len(entry[0]),
145
144
                entry[1], entry[1],
146
145
                10, 20,
147
 
                stat.S_IFREG | 0600)
 
146
                stat.S_IFREG | 0o600)
148
147
 
149
148
    def _really_sha1_file(self, abspath, filters):
150
149
        if abspath in self._files:
169
168
    def test_hashcache_miss_new_file(self):
170
169
        """A new file gives the right sha1 but misses"""
171
170
        hc = self.make_hashcache()
172
 
        hc.put_file('foo', 'hello')
173
 
        self.assertEquals(hc.get_sha1('foo'), sha1('hello'))
174
 
        self.assertEquals(hc.miss_count, 1)
175
 
        self.assertEquals(hc.hit_count, 0)
 
171
        hc.put_file('foo', b'hello')
 
172
        self.assertEqual(hc.get_sha1('foo'), sha1(b'hello'))
 
173
        self.assertEqual(hc.miss_count, 1)
 
174
        self.assertEqual(hc.hit_count, 0)
176
175
        # if we try again it's still too new;
177
 
        self.assertEquals(hc.get_sha1('foo'), sha1('hello'))
178
 
        self.assertEquals(hc.miss_count, 2)
179
 
        self.assertEquals(hc.hit_count, 0)
 
176
        self.assertEqual(hc.get_sha1('foo'), sha1(b'hello'))
 
177
        self.assertEqual(hc.miss_count, 2)
 
178
        self.assertEqual(hc.hit_count, 0)
180
179
 
181
180
    def test_hashcache_old_file(self):
182
181
        """An old file gives the right sha1 and hits"""
183
182
        hc = self.make_hashcache()
184
 
        hc.put_file('foo', 'hello')
 
183
        hc.put_file('foo', b'hello')
185
184
        hc.pretend_to_sleep(20)
186
185
        # first lookup, nothing cached yet; should get the correct hash but miss
187
 
        self.assertEquals(hc.get_sha1('foo'), sha1('hello'))
188
 
        self.assertEquals(hc.miss_count, 1)
189
 
        self.assertEquals(hc.hit_count, 0)
 
186
        self.assertEqual(hc.get_sha1('foo'), sha1(b'hello'))
 
187
        self.assertEqual(hc.miss_count, 1)
 
188
        self.assertEqual(hc.hit_count, 0)
190
189
        # and can now be hit
191
 
        self.assertEquals(hc.get_sha1('foo'), sha1('hello'))
192
 
        self.assertEquals(hc.miss_count, 1)
193
 
        self.assertEquals(hc.hit_count, 1)
 
190
        self.assertEqual(hc.get_sha1('foo'), sha1(b'hello'))
 
191
        self.assertEqual(hc.miss_count, 1)
 
192
        self.assertEqual(hc.hit_count, 1)
194
193
        hc.pretend_to_sleep(3)
195
194
        # and again
196
 
        self.assertEquals(hc.get_sha1('foo'), sha1('hello'))
197
 
        self.assertEquals(hc.miss_count, 1)
198
 
        self.assertEquals(hc.hit_count, 2)
 
195
        self.assertEqual(hc.get_sha1('foo'), sha1(b'hello'))
 
196
        self.assertEqual(hc.miss_count, 1)
 
197
        self.assertEqual(hc.hit_count, 2)
199
198
 
200
199
    def test_hashcache_invalidates(self):
201
200
        hc = self.make_hashcache()
202
 
        hc.put_file('foo', 'hello')
 
201
        hc.put_file('foo', b'hello')
203
202
        hc.pretend_to_sleep(20)
204
203
        hc.get_sha1('foo')
205
 
        hc.put_file('foo', 'h1llo')
206
 
        self.assertEquals(hc.get_sha1('foo'), sha1('h1llo'))
207
 
        self.assertEquals(hc.miss_count, 2)
208
 
        self.assertEquals(hc.hit_count, 0)
 
204
        hc.put_file('foo', b'h1llo')
 
205
        self.assertEqual(hc.get_sha1('foo'), sha1(b'h1llo'))
 
206
        self.assertEqual(hc.miss_count, 2)
 
207
        self.assertEqual(hc.hit_count, 0)