/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_hashcache.py

  • Committer: Robert Collins
  • Date: 2010-05-05 00:05:29 UTC
  • mto: This revision was merged to the branch mainline in revision 5206.
  • Revision ID: robertc@robertcollins.net-20100505000529-ltmllyms5watqj5u
Make 'pydoc bzrlib.tests.build_tree_shape' useful.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011, 2016 Canonical Ltd
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
import os
18
18
import stat
 
19
import sys
19
20
import time
20
21
 
21
 
from .. import osutils
22
 
from ..errors import BzrError
23
 
from ..hashcache import HashCache
24
 
from . import (
25
 
    TestCaseInTempDir,
26
 
    )
27
 
from .features import (
28
 
    OsFifoFeature,
29
 
    )
30
 
 
31
 
 
32
 
sha1 = osutils.sha_string
 
22
from bzrlib import osutils
 
23
from bzrlib.errors import BzrError
 
24
from bzrlib.hashcache import HashCache
 
25
from bzrlib.tests import OsFifoFeature, TestCaseInTempDir, TestCase
 
26
 
 
27
 
 
28
def sha1(t):
 
29
    return osutils.sha(t).hexdigest()
33
30
 
34
31
 
35
32
def pause():
42
39
    def make_hashcache(self):
43
40
        # make a dummy bzr directory just to hold the cache
44
41
        os.mkdir('.bzr')
45
 
        hc = HashCache(u'.', '.bzr/stat-cache')
 
42
        hc = HashCache('.', '.bzr/stat-cache')
46
43
        return hc
47
44
 
48
45
    def reopen_hashcache(self):
49
 
        hc = HashCache(u'.', '.bzr/stat-cache')
 
46
        hc = HashCache('.', '.bzr/stat-cache')
50
47
        hc.read()
51
48
        return hc
52
49
 
53
50
    def test_hashcache_initial_miss(self):
54
51
        """Get correct hash from an empty hashcache"""
55
52
        hc = self.make_hashcache()
56
 
        self.build_tree_contents([('foo', b'hello')])
57
 
        self.assertEqual(hc.get_sha1('foo'),
58
 
                         b'aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d')
59
 
        self.assertEqual(hc.miss_count, 1)
60
 
        self.assertEqual(hc.hit_count, 0)
 
53
        self.build_tree_contents([('foo', 'hello')])
 
54
        self.assertEquals(hc.get_sha1('foo'),
 
55
                          'aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d')
 
56
        self.assertEquals(hc.miss_count, 1)
 
57
        self.assertEquals(hc.hit_count, 0)
61
58
 
62
59
    def test_hashcache_new_file(self):
63
60
        hc = self.make_hashcache()
64
 
        self.build_tree_contents([('foo', b'goodbye')])
 
61
        self.build_tree_contents([('foo', 'goodbye')])
65
62
        # now read without pausing; it may not be possible to cache it as its
66
63
        # so new
67
 
        self.assertEqual(hc.get_sha1('foo'), sha1(b'goodbye'))
 
64
        self.assertEquals(hc.get_sha1('foo'), sha1('goodbye'))
68
65
 
69
66
    def test_hashcache_nonexistent_file(self):
70
67
        hc = self.make_hashcache()
71
 
        self.assertEqual(hc.get_sha1('no-name-yet'), None)
 
68
        self.assertEquals(hc.get_sha1('no-name-yet'), None)
72
69
 
73
70
    def test_hashcache_replaced_file(self):
74
71
        hc = self.make_hashcache()
75
 
        self.build_tree_contents([('foo', b'goodbye')])
76
 
        self.assertEqual(hc.get_sha1('foo'), sha1(b'goodbye'))
 
72
        self.build_tree_contents([('foo', 'goodbye')])
 
73
        self.assertEquals(hc.get_sha1('foo'), sha1('goodbye'))
77
74
        os.remove('foo')
78
 
        self.assertEqual(hc.get_sha1('foo'), None)
79
 
        self.build_tree_contents([('foo', b'new content')])
80
 
        self.assertEqual(hc.get_sha1('foo'), sha1(b'new content'))
 
75
        self.assertEquals(hc.get_sha1('foo'), None)
 
76
        self.build_tree_contents([('foo', 'new content')])
 
77
        self.assertEquals(hc.get_sha1('foo'), sha1('new content'))
81
78
 
82
79
    def test_hashcache_not_file(self):
83
80
        hc = self.make_hashcache()
84
81
        self.build_tree(['subdir/'])
85
 
        self.assertEqual(hc.get_sha1('subdir'), None)
 
82
        self.assertEquals(hc.get_sha1('subdir'), None)
86
83
 
87
84
    def test_hashcache_load(self):
88
85
        hc = self.make_hashcache()
89
 
        self.build_tree_contents([('foo', b'contents')])
 
86
        self.build_tree_contents([('foo', 'contents')])
90
87
        pause()
91
 
        self.assertEqual(hc.get_sha1('foo'), sha1(b'contents'))
 
88
        self.assertEquals(hc.get_sha1('foo'), sha1('contents'))
92
89
        hc.write()
93
90
        hc = self.reopen_hashcache()
94
 
        self.assertEqual(hc.get_sha1('foo'), sha1(b'contents'))
95
 
        self.assertEqual(hc.hit_count, 1)
 
91
        self.assertEquals(hc.get_sha1('foo'), sha1('contents'))
 
92
        self.assertEquals(hc.hit_count, 1)
96
93
 
97
94
    def test_hammer_hashcache(self):
98
95
        hc = self.make_hashcache()
99
 
        for i in range(10000):
100
 
            with open('foo', 'wb') as f:
101
 
                last_content = b'%08x' % i
 
96
        for i in xrange(10000):
 
97
            self.log('start writing at %s', time.time())
 
98
            f = file('foo', 'w')
 
99
            try:
 
100
                last_content = '%08x' % i
102
101
                f.write(last_content)
 
102
            finally:
 
103
                f.close()
103
104
            last_sha1 = sha1(last_content)
104
105
            self.log("iteration %d: %r -> %r",
105
106
                     i, last_content, last_sha1)
106
107
            got_sha1 = hc.get_sha1('foo')
107
 
            self.assertEqual(got_sha1, last_sha1)
 
108
            self.assertEquals(got_sha1, last_sha1)
108
109
            hc.write()
109
110
            hc = self.reopen_hashcache()
110
111
 
126
127
    This lets us examine how old or new files would be handled, without
127
128
    actually having to wait for time to pass.
128
129
    """
129
 
 
130
130
    def __init__(self):
131
131
        # set root and cache file name to none to make sure we won't touch the
132
132
        # real filesystem
133
 
        HashCache.__init__(self, u'.', 'hashcache')
 
133
        HashCache.__init__(self, '.', 'hashcache')
134
134
        self._files = {}
135
135
        # simulated clock running forward as operations happen
136
136
        self._clock = 0
144
144
        return (len(entry[0]),
145
145
                entry[1], entry[1],
146
146
                10, 20,
147
 
                stat.S_IFREG | 0o600)
 
147
                stat.S_IFREG | 0600)
148
148
 
149
149
    def _really_sha1_file(self, abspath, filters):
150
150
        if abspath in self._files:
169
169
    def test_hashcache_miss_new_file(self):
170
170
        """A new file gives the right sha1 but misses"""
171
171
        hc = self.make_hashcache()
172
 
        hc.put_file('foo', b'hello')
173
 
        self.assertEqual(hc.get_sha1('foo'), sha1(b'hello'))
174
 
        self.assertEqual(hc.miss_count, 1)
175
 
        self.assertEqual(hc.hit_count, 0)
 
172
        hc.put_file('foo', 'hello')
 
173
        self.assertEquals(hc.get_sha1('foo'), sha1('hello'))
 
174
        self.assertEquals(hc.miss_count, 1)
 
175
        self.assertEquals(hc.hit_count, 0)
176
176
        # if we try again it's still too new;
177
 
        self.assertEqual(hc.get_sha1('foo'), sha1(b'hello'))
178
 
        self.assertEqual(hc.miss_count, 2)
179
 
        self.assertEqual(hc.hit_count, 0)
 
177
        self.assertEquals(hc.get_sha1('foo'), sha1('hello'))
 
178
        self.assertEquals(hc.miss_count, 2)
 
179
        self.assertEquals(hc.hit_count, 0)
180
180
 
181
181
    def test_hashcache_old_file(self):
182
182
        """An old file gives the right sha1 and hits"""
183
183
        hc = self.make_hashcache()
184
 
        hc.put_file('foo', b'hello')
 
184
        hc.put_file('foo', 'hello')
185
185
        hc.pretend_to_sleep(20)
186
186
        # file is new; should get the correct hash but miss
187
 
        self.assertEqual(hc.get_sha1('foo'), sha1(b'hello'))
188
 
        self.assertEqual(hc.miss_count, 1)
189
 
        self.assertEqual(hc.hit_count, 0)
 
187
        self.assertEquals(hc.get_sha1('foo'), sha1('hello'))
 
188
        self.assertEquals(hc.miss_count, 1)
 
189
        self.assertEquals(hc.hit_count, 0)
190
190
        # and can now be hit
191
 
        self.assertEqual(hc.get_sha1('foo'), sha1(b'hello'))
192
 
        self.assertEqual(hc.miss_count, 1)
193
 
        self.assertEqual(hc.hit_count, 1)
 
191
        self.assertEquals(hc.get_sha1('foo'), sha1('hello'))
 
192
        self.assertEquals(hc.miss_count, 1)
 
193
        self.assertEquals(hc.hit_count, 1)
194
194
        hc.pretend_to_sleep(3)
195
195
        # and again
196
 
        self.assertEqual(hc.get_sha1('foo'), sha1(b'hello'))
197
 
        self.assertEqual(hc.miss_count, 1)
198
 
        self.assertEqual(hc.hit_count, 2)
 
196
        self.assertEquals(hc.get_sha1('foo'), sha1('hello'))
 
197
        self.assertEquals(hc.miss_count, 1)
 
198
        self.assertEquals(hc.hit_count, 2)
199
199
 
200
200
    def test_hashcache_invalidates(self):
201
201
        hc = self.make_hashcache()
202
 
        hc.put_file('foo', b'hello')
 
202
        hc.put_file('foo', 'hello')
203
203
        hc.pretend_to_sleep(20)
204
204
        hc.get_sha1('foo')
205
 
        hc.put_file('foo', b'h1llo')
206
 
        self.assertEqual(hc.get_sha1('foo'), sha1(b'h1llo'))
207
 
        self.assertEqual(hc.miss_count, 2)
208
 
        self.assertEqual(hc.hit_count, 0)
 
205
        hc.put_file('foo', 'h1llo')
 
206
        self.assertEquals(hc.get_sha1('foo'), sha1('h1llo'))
 
207
        self.assertEquals(hc.miss_count, 2)
 
208
        self.assertEquals(hc.hit_count, 0)