/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to cache_manager.py

  • Committer: John Arbash Meinel
  • Date: 2009-11-13 08:00:16 UTC
  • mto: (0.64.267 trunk)
  • mto: This revision was merged to the branch mainline in revision 6631.
  • Revision ID: john@arbash-meinel.com-20091113080016-5c4de35jjqn8lsar
Dump sticky blobs to disk when memory pressure gets high.

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""A manager of caches."""
18
18
 
 
19
import os
 
20
import shutil
 
21
import tempfile
 
22
import time
19
23
 
20
24
from bzrlib import lru_cache, trace
21
25
from bzrlib.plugins.fastimport import helpers
22
26
 
 
27
 
 
28
class _Cleanup(object):
 
29
    """This class makes sure we clean up when CacheManager goes away.
 
30
 
 
31
    We use a helper class to ensure that we are never in a refcycle.
 
32
    """
 
33
 
 
34
    def __init__(self, disk_blobs):
 
35
        self.disk_blobs = disk_blobs
 
36
        self.tempdir = None
 
37
        self.small_blobs = None
 
38
 
 
39
    def __del__(self):
 
40
        self.finalize()
 
41
 
 
42
    def finalize(self):
 
43
        if self.disk_blobs is not None:
 
44
            for info in self.disk_blobs.itervalues():
 
45
                info[-1].close()
 
46
            self.disk_blobs = None
 
47
        if self.small_blobs is not None:
 
48
            self.small_blobs.close()
 
49
            self.small_blobs = None
 
50
        if self.tempdir is not None:
 
51
            shutils.rmtree(self.tempdir)
 
52
        
 
53
 
23
54
class CacheManager(object):
 
55
    
 
56
    _small_blob_threshold = 100*1024
 
57
    _sticky_cache_size = 200*1024*1024
 
58
    _sticky_flushed_size = 100*1024*1024
24
59
 
25
60
    def __init__(self, info=None, verbose=False, inventory_cache_size=10):
26
61
        """Create a manager of caches.
31
66
        self.verbose = verbose
32
67
 
33
68
        # dataref -> data. datref is either :mark or the sha-1.
34
 
        # Sticky blobs aren't removed after being referenced.
 
69
        # Sticky blobs are referenced more than once, and are saved until their
 
70
        # refcount goes to 0
35
71
        self._blobs = {}
36
72
        self._sticky_blobs = {}
 
73
        self._sticky_memory_bytes = 0
 
74
        # if we overflow our memory cache, then we will dump large blobs to
 
75
        # disk in this directory
 
76
        self._tempdir = None
 
77
        # id => TemporaryFile
 
78
        self._disk_blobs = {}
 
79
        self._cleanup = _Cleanup(self._disk_blobs)
37
80
 
38
81
        # revision-id -> Inventory cache
39
82
        # these are large and we probably don't need too many as
108
151
        self.heads.clear()
109
152
        self.inventories.clear()
110
153
 
 
154
    def _flush_blobs_to_disk(self):
 
155
        blobs = self._sticky_blobs.keys()
 
156
        sticky_blobs = self._sticky_blobs
 
157
        blobs.sort(key=lambda k:len(sticky_blobs[k]))
 
158
        if self._tempdir is None:
 
159
            self._tempdir = tempfile.mkdtemp(prefix='bzr_fastimport_blobs-')
 
160
            self._cleanup.tempdir = self._tempdir
 
161
            self._cleanup.small_blobs = tempfile.TemporaryFile(
 
162
                prefix='small-blobs-')
 
163
        count = 0
 
164
        bytes = 0
 
165
        n_small_bytes = 0
 
166
        while self._sticky_memory_bytes > self._sticky_flushed_size:
 
167
            id = blobs.pop()
 
168
            blob = self._sticky_blobs.pop(id)
 
169
            n_bytes = len(blob)
 
170
            self._sticky_memory_bytes -= n_bytes
 
171
            if n_bytes < self._small_blob_threshold:
 
172
                f = self._cleanup.small_blobs
 
173
                f.seek(0, os.SEEK_END)
 
174
                self._disk_blobs[id] = (True, f.tell(), n_bytes, f)
 
175
                n_small_bytes += n_bytes
 
176
            else:
 
177
                f = tempfile.TemporaryFile(prefix='blob-', dir=self._tempdir)
 
178
                self._disk_blobs[id] = (False, 0, n_bytes, f)
 
179
            f.write(blob)
 
180
            bytes += n_bytes
 
181
            del blob
 
182
            count += 1
 
183
        trace.note('flushed %d blobs w/ %.1fMB (%.1fMB small) to disk'
 
184
                   % (count, bytes / 1024. / 1024,
 
185
                      n_small_bytes / 1024. / 1024))
 
186
        
 
187
 
111
188
    def store_blob(self, id, data):
112
189
        """Store a blob of data."""
113
190
        # Note: If we're not reference counting, everything has to be sticky
114
191
        if not self._blob_ref_counts or id in self._blob_ref_counts:
115
192
            self._sticky_blobs[id] = data
 
193
            self._sticky_memory_bytes += len(data)
 
194
            if self._sticky_memory_bytes > self._sticky_cache_size:
 
195
                self._flush_blobs_to_disk()
116
196
        elif data == '':
117
197
            # Empty data is always sticky
118
198
            self._sticky_blobs[id] = data
119
199
        else:
120
200
            self._blobs[id] = data
121
201
 
 
202
    def _decref(self, id, cache, f):
 
203
        if not self._blob_ref_counts:
 
204
            return
 
205
        count = self._blob_ref_counts.get(id, None)
 
206
        if count is not None:
 
207
            count -= 1
 
208
            if count <= 0:
 
209
                del cache[id]
 
210
                if f is not None:
 
211
                    f.close()
 
212
                del self._blob_ref_counts[id]
 
213
            else:
 
214
                self._blob_ref_counts[id] = count
 
215
 
122
216
    def fetch_blob(self, id):
123
217
        """Fetch a blob of data."""
124
 
        try:
125
 
            b = self._sticky_blobs[id]
126
 
            if self._blob_ref_counts and b != '':
127
 
                self._blob_ref_counts[id] -= 1
128
 
                if self._blob_ref_counts[id] == 0:
129
 
                    del self._sticky_blobs[id]
130
 
            return b
131
 
        except KeyError:
 
218
        if id in self._blobs:
132
219
            return self._blobs.pop(id)
 
220
        if id in self._disk_blobs:
 
221
            (is_small, offset, n_bytes, f) = self._disk_blobs[id]
 
222
            f.seek(offset)
 
223
            content = f.read(n_bytes)
 
224
            self._decref(id, self._disk_blobs, f)
 
225
            return content
 
226
        content = self._sticky_blobs[id]
 
227
        self._sticky_memory_bytes -= len(content)
 
228
        self._decref(id, self._sticky_blobs, None)
 
229
        return content
133
230
 
134
231
    def track_heads(self, cmd):
135
232
        """Track the repository heads given a CommitCommand.