# Copyright (C) 2005, 2006, 2007 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
from bzrlib.lazy_import import lazy_import
lazy_import(globals(), """
from itertools import izip
import md5
import time

from bzrlib import (
    debug,
    pack,
    ui,
    )
from bzrlib.graph import Graph
from bzrlib.index import (
    CombinedGraphIndex,
    GraphIndex,
    GraphIndexBuilder,
    GraphIndexPrefixAdapter,
    InMemoryGraphIndex,
    )
from bzrlib.knit import KnitGraphIndex, _PackAccess, _KnitData
from bzrlib.osutils import rand_chars
from bzrlib.pack import ContainerWriter
from bzrlib.store import revision
""")
from bzrlib import (
    errors,
    knit,
    )

from bzrlib.decorators import needs_read_lock, needs_write_lock
from bzrlib.repofmt.knitrepo import KnitRepository
from bzrlib.repository import (
    CommitBuilder,
    MetaDirRepositoryFormat,
    RootCommitBuilder,
    )
import bzrlib.revision as _mod_revision
from bzrlib.store.revision.knit import KnitRevisionStore
from bzrlib.store.versioned import VersionedFileStore
from bzrlib.trace import mutter, note, warning


class PackCommitBuilder(CommitBuilder):
    """A subclass of CommitBuilder to add texts with pack semantics.

    Specifically this uses one knit object rather than one knit object per
    added text, reducing memory and object pressure.
    """

    def __init__(self, repository, parents, config, timestamp=None,
                 timezone=None, committer=None, revprops=None,
                 revision_id=None):
        CommitBuilder.__init__(self, repository, parents, config,
            timestamp=timestamp, timezone=timezone, committer=committer,
            revprops=revprops, revision_id=revision_id)
        self._file_graph = Graph(
            repository._pack_collection.text_index.combined_index)

    def _add_text_to_weave(self, file_id, new_lines, parents, nostore_sha):
        return self.repository._pack_collection._add_text_to_weave(file_id,
            self._new_revision_id, new_lines, parents, nostore_sha,
            self.random_revid)

    def _heads(self, file_id, revision_ids):
        keys = [(file_id, revision_id) for revision_id in revision_ids]
        return set([key[1] for key in self._file_graph.heads(keys)])
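    # A sketch of the _heads contract above (illustrative only): the
    # per-file graph is keyed by (file_id, revision_id) tuples, so asking
    # for the heads of revisions ['a', 'b'] of file 'f' queries
    # [('f', 'a'), ('f', 'b')] and returns only the revision-id halves of
    # the winning keys, e.g. set(['b']) when 'b' descends from 'a'.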


class PackRootCommitBuilder(RootCommitBuilder):
    """A subclass of RootCommitBuilder to add texts with pack semantics.

    Specifically this uses one knit object rather than one knit object per
    added text, reducing memory and object pressure.
    """

    def __init__(self, repository, parents, config, timestamp=None,
                 timezone=None, committer=None, revprops=None,
                 revision_id=None):
        CommitBuilder.__init__(self, repository, parents, config,
            timestamp=timestamp, timezone=timezone, committer=committer,
            revprops=revprops, revision_id=revision_id)
        self._file_graph = Graph(
            repository._pack_collection.text_index.combined_index)

    def _add_text_to_weave(self, file_id, new_lines, parents, nostore_sha):
        return self.repository._pack_collection._add_text_to_weave(file_id,
            self._new_revision_id, new_lines, parents, nostore_sha,
            self.random_revid)

    def _heads(self, file_id, revision_ids):
        keys = [(file_id, revision_id) for revision_id in revision_ids]
        return set([key[1] for key in self._file_graph.heads(keys)])


class Pack(object):
    """An in memory proxy for a pack and its indices.

    This is a base class that is not used directly; the subclasses
    ExistingPack and NewPack are used instead.
    """

    def __init__(self, revision_index, inventory_index, text_index,
        signature_index):
        """Create a pack instance.

        :param revision_index: A GraphIndex for determining what revisions are
            present in the Pack and accessing the locations of their texts.
        :param inventory_index: A GraphIndex for determining what inventories are
            present in the Pack and accessing the locations of their
            texts.
        :param text_index: A GraphIndex for determining what file texts
            are present in the pack and accessing the locations of their
            texts/deltas (via (fileid, revisionid) tuples).
        :param signature_index: A GraphIndex for determining what signatures are
            present in the Pack and accessing the locations of their texts.
        """
        self.revision_index = revision_index
        self.inventory_index = inventory_index
        self.text_index = text_index
        self.signature_index = signature_index

    def access_tuple(self):
        """Return a tuple (transport, name) for the pack content."""
        return self.pack_transport, self.file_name()

    def file_name(self):
        """Get the file name for the pack on disk."""
        return self.name + '.pack'

    def get_revision_count(self):
        return self.revision_index.key_count()

    def inventory_index_name(self, name):
        """The inv index is the name + .iix."""
        return self.index_name('inventory', name)

    def revision_index_name(self, name):
        """The revision index is the name + .rix."""
        return self.index_name('revision', name)

    def signature_index_name(self, name):
        """The signature index is the name + .six."""
        return self.index_name('signature', name)

    def text_index_name(self, name):
        """The text index is the name + .tix."""
        return self.index_name('text', name)

    def _external_compression_parents_of_texts(self):
        keys = set()
        refs = set()
        for node in self.text_index.iter_all_entries():
            keys.add(node[1])
            refs.update(node[3][1])
        return refs - keys
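# Illustrative note, not part of the API: a pack named after its content
# hash keeps its data and its four indices in sibling files; for a
# hypothetical pack name 'a1b2c3' the layout is (directory names
# illustrative):
#
#   packs/a1b2c3.pack   -- the pack container itself
#   indices/a1b2c3.rix  -- revision index
#   indices/a1b2c3.iix  -- inventory index
#   indices/a1b2c3.tix  -- file-text index
#   indices/a1b2c3.six  -- signature index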


class ExistingPack(Pack):
    """An in memory proxy for an existing .pack and its disk indices."""

    def __init__(self, pack_transport, name, revision_index, inventory_index,
        text_index, signature_index):
        """Create an ExistingPack object.

        :param pack_transport: The transport where the pack file resides.
        :param name: The name of the pack on disk in the pack_transport.
        """
        Pack.__init__(self, revision_index, inventory_index, text_index,
            signature_index)
        self.name = name
        self.pack_transport = pack_transport
        assert None not in (revision_index, inventory_index, text_index,
            signature_index, name, pack_transport)

    def __eq__(self, other):
        return self.__dict__ == other.__dict__

    def __ne__(self, other):
        return not self.__eq__(other)

    def __repr__(self):
        return "<bzrlib.repofmt.pack_repo.Pack object at 0x%x, %s, %s>" % (
            id(self), self.pack_transport, self.name)


class NewPack(Pack):
    """An in memory proxy for a pack which is being created."""

    # A map of index 'type' to the file extension and position in the
    # index_sizes array.
    index_definitions = {
        'revision': ('.rix', 0),
        'inventory': ('.iix', 1),
        'text': ('.tix', 2),
        'signature': ('.six', 3),
        }

    def __init__(self, upload_transport, index_transport, pack_transport,
        upload_suffix='', file_mode=None):
        """Create a NewPack instance.

        :param upload_transport: A writable transport for the pack to be
            incrementally uploaded to.
        :param index_transport: A writable transport for the pack's indices to
            be written to when the pack is finished.
        :param pack_transport: A writable transport for the pack to be renamed
            to when the upload is complete. This *must* be the same as
            upload_transport.clone('../packs').
        :param upload_suffix: An optional suffix to be given to any temporary
            files created during the pack creation. e.g '.autopack'
        :param file_mode: An optional file mode to create the new files with.
        """
        # The relative locations of the packs are constrained, but all are
        # passed in because the caller has them, so as to avoid object churn.
        Pack.__init__(self,
            # Revisions: parents list, no text compression.
            InMemoryGraphIndex(reference_lists=1),
            # Inventory: We want to map compression only, but currently the
            # knit code hasn't been updated enough to understand that, so we
            # have a regular 2-list index giving parents and compression
            # source.
            InMemoryGraphIndex(reference_lists=2),
            # Texts: compression and per file graph, for all fileids - so two
            # reference lists and two elements in the key tuple.
            InMemoryGraphIndex(reference_lists=2, key_elements=2),
            # Signatures: Just blobs to store, no compression, no parents
            # listed.
            InMemoryGraphIndex(reference_lists=0),
            )
        # where should the new pack be opened
        self.upload_transport = upload_transport
        # where are indices written out to
        self.index_transport = index_transport
        # where is the pack renamed to when it is finished?
        self.pack_transport = pack_transport
        # What file mode to upload the pack and indices with.
        self._file_mode = file_mode
        # tracks the content written to the .pack file.
        self._hash = md5.new()
        # a four-tuple with the length in bytes of the indices, once the pack
        # is finalised. (rev, inv, text, sigs)
        self.index_sizes = None
        # How much data to cache when writing packs. Note that this is not
        # synchronised with reads, because it's not in the transport layer, so
        # is not safe unless the client knows it won't be reading from the pack
        # under creation.
        self._cache_limit = 0
        # the temporary pack file name.
        self.random_name = rand_chars(20) + upload_suffix
        # when was this pack started ?
        self.start_time = time.time()
        # open an output stream for the data added to the pack.
        self.write_stream = self.upload_transport.open_write_stream(
            self.random_name, mode=self._file_mode)
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
                time.ctime(), self.upload_transport.base, self.random_name,
                time.time() - self.start_time)
        # A list of byte sequences to be written to the new pack, and the
        # aggregate size of them.  Stored as a list rather than separate
        # variables so that the _write_data closure below can update them.
        self._buffer = [[], 0]
        # create a callable for adding data
        #
        # robertc says- this is a closure rather than a method on the object
        # so that the variables are locals, and faster than accessing object
        # members.
        def _write_data(bytes, flush=False, _buffer=self._buffer,
            _write=self.write_stream.write, _update=self._hash.update):
            _buffer[0].append(bytes)
            _buffer[1] += len(bytes)
            # cache limit exceeded, or flush requested: write it all out.
            if _buffer[1] > self._cache_limit or flush:
                bytes = ''.join(_buffer[0])
                _write(bytes)
                _update(bytes)
                _buffer[:] = [[], 0]
        # expose this on self, for the occasion when clients want to add data.
        self._write_data = _write_data
        # a pack writer object to serialise pack records.
        self._writer = pack.ContainerWriter(self._write_data)
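        # A sketch of the buffering contract above (illustrative only),
        # assuming a nonzero limit was set via set_write_cache_size so
        # writes actually buffer:
        #
        #   self._write_data('abc')              # buffered in self._buffer
        #   self._write_data('def', flush=True)  # 'abcdef' hits the stream
        #
        # The running md5 in self._hash sees exactly the bytes that reach
        # the write stream.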
        self._writer.begin()
        # what state is the pack in? (open, finished, aborted)
        self._state = 'open'

    def abort(self):
        """Cancel creating this pack."""
        self._state = 'aborted'
        self.write_stream.close()
        # Remove the temporary pack file.
        self.upload_transport.delete(self.random_name)
        # The indices have no state on disk.

    def access_tuple(self):
        """Return a tuple (transport, name) for the pack content."""
        assert self._state in ('open', 'finished')
        if self._state == 'finished':
            return Pack.access_tuple(self)
        else:
            return self.upload_transport, self.random_name

    def data_inserted(self):
        """True if data has been added to this pack."""
        return bool(self.get_revision_count() or
            self.inventory_index.key_count() or
            self.text_index.key_count() or
            self.signature_index.key_count())

    def finish(self):
        """Finish the new pack.

        This:

        - finalises the content
        - assigns a name (the md5 of the content, currently)
        - writes out the associated indices
        - renames the pack into place.
        - stores the index size tuple for the pack in the index_sizes
          attribute.
        """
        self._writer.end()
        if self._buffer[1]:
            self._write_data('', flush=True)
        self.name = self._hash.hexdigest()
        # write indices
        # XXX: It'd be better to write them all to temporary names, then
        # rename them all into place, so that the window when only some are
        # visible is smaller.  On the other hand none will be seen until
        # they're in the names list.
        self.index_sizes = [None, None, None, None]
        self._write_index('revision', self.revision_index, 'revision')
        self._write_index('inventory', self.inventory_index, 'inventory')
        self._write_index('text', self.text_index, 'file texts')
        self._write_index('signature', self.signature_index,
            'revision signatures')
        self.write_stream.close()
        # Note that this will clobber an existing pack with the same name,
        # without checking for hash collisions. While this is undesirable this
        # is something that can be rectified in a subsequent release. One way
        # to rectify it may be to leave the pack at the original name, writing
        # its pack-names entry as something like 'HASH: index-sizes
        # temporary-name'. Allocate that and check for collisions, if it is
        # collision free then rename it into place. If clients know this scheme
        # they can handle missing-file errors by:
        #  - try for HASH.pack
        #  - try for temporary-name
        #  - refresh the pack-list to see if the pack is now absent
        self.upload_transport.rename(self.random_name,
                '../packs/' + self.name + '.pack')
        self._state = 'finished'
        if 'pack' in debug.debug_flags:
            # XXX: size might be interesting?
            mutter('%s: create_pack: pack renamed into place: %s%s->%s%s t+%6.3fs',
                time.ctime(), self.upload_transport.base, self.random_name,
                self.pack_transport, self.name,
                time.time() - self.start_time)

    def flush(self):
        """Flush any current data."""
        if self._buffer[1]:
            bytes = ''.join(self._buffer[0])
            self.write_stream.write(bytes)
            self._hash.update(bytes)
            self._buffer[:] = [[], 0]

    def index_name(self, index_type, name):
        """Get the disk name of an index type for pack name 'name'."""
        return name + NewPack.index_definitions[index_type][0]

    def index_offset(self, index_type):
        """Get the position in an index_sizes array for a given index type."""
        return NewPack.index_definitions[index_type][1]
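    # Illustrative sketch of the two helpers above (hypothetical pack name):
    #
    #   NewPack.index_definitions['text']      -> ('.tix', 2)
    #   new_pack.index_name('text', 'a1b2c3')  -> 'a1b2c3.tix'
    #   new_pack.index_offset('text')          -> 2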

    def _replace_index_with_readonly(self, index_type):
        setattr(self, index_type + '_index',
            GraphIndex(self.index_transport,
                self.index_name(index_type, self.name),
                self.index_sizes[self.index_offset(index_type)]))

    def set_write_cache_size(self, size):
        self._cache_limit = size

    def _write_index(self, index_type, index, label):
        """Write out an index.

        :param index_type: The type of index to write - e.g. 'revision'.
        :param index: The index object to serialise.
        :param label: What label to give the index e.g. 'revision'.
        """
        index_name = self.index_name(index_type, self.name)
        self.index_sizes[self.index_offset(index_type)] = \
            self.index_transport.put_file(index_name, index.finish(),
            mode=self._file_mode)
        if 'pack' in debug.debug_flags:
            # XXX: size might be interesting?
            mutter('%s: create_pack: wrote %s index: %s%s t+%6.3fs',
                time.ctime(), label, self.upload_transport.base,
                self.random_name, time.time() - self.start_time)
        # Replace the writable index on this object with a readonly,
        # presently unloaded index. We should alter
        # the index layer to make its finish() error if add_node is
        # subsequently used. RBC
        self._replace_index_with_readonly(index_type)


class AggregateIndex(object):
    """An aggregated index for the RepositoryPackCollection.

    AggregateIndex is responsible for managing the PackAccess object,
    Index-To-Pack mapping, and all indices list for a specific type of index
    such as 'revision index'.

    A CombinedIndex provides an index on a single key space built up
    from several on-disk indices.  The AggregateIndex builds on this
    to provide a knit access layer, and allows having up to one writable
    index within the collection.
    """
    # XXX: Probably 'can be written to' could/should be separated from 'acts
    # like a knit index' -- mbp 20071024

    def __init__(self):
        """Create an AggregateIndex."""
        self.index_to_pack = {}
        self.combined_index = CombinedGraphIndex([])
        self.knit_access = _PackAccess(self.index_to_pack)

    def replace_indices(self, index_to_pack, indices):
        """Replace the current mappings with fresh ones.

        This should probably not be used eventually; incremental addition and
        removal of indices is preferable. It was added during refactoring of
        existing code.

        :param index_to_pack: A mapping from index objects to
            (transport, name) tuples for the pack file data.
        :param indices: A list of indices.
        """
        # refresh the revision pack map dict without replacing the instance.
        self.index_to_pack.clear()
        self.index_to_pack.update(index_to_pack)
        # XXX: API break - clearly a 'replace' method would be good?
        self.combined_index._indices[:] = indices
        # the current add nodes callback for the current writable index if
        # any.
        self.add_callback = None

    def add_index(self, index, pack):
        """Add index to the aggregate, which is an index for Pack pack.

        Future searches on the aggregate index will search this new index
        before all previously inserted indices.

        :param index: An Index for the pack.
        :param pack: A Pack instance.
        """
        # expose it to the index map
        self.index_to_pack[index] = pack.access_tuple()
        # put it at the front of the linear index list
        self.combined_index.insert_index(0, index)

    def add_writable_index(self, index, pack):
        """Add an index which is able to have data added to it.

        There can be at most one writable index at any time.  Any
        modifications made to the knit are put into this index.

        :param index: An index from the pack parameter.
        :param pack: A Pack instance.
        """
        assert self.add_callback is None, \
            "%s already has a writable index through %s" % \
            (self, self.add_callback)
        # allow writing: queue writes to a new index
        self.add_index(index, pack)
        # Updates the index to packs mapping as a side effect,
        self.knit_access.set_writer(pack._writer, index, pack.access_tuple())
        self.add_callback = index.add_nodes

    def clear(self):
        """Reset all the aggregate data to nothing."""
        self.knit_access.set_writer(None, None, (None, None))
        self.index_to_pack.clear()
        del self.combined_index._indices[:]
        self.add_callback = None

    def remove_index(self, index, pack):
        """Remove index from the indices used to answer queries.

        :param index: An index from the pack parameter.
        :param pack: A Pack instance.
        """
        del self.index_to_pack[index]
        self.combined_index._indices.remove(index)
        if (self.add_callback is not None and
            getattr(index, 'add_nodes', None) == self.add_callback):
            self.add_callback = None
            self.knit_access.set_writer(None, None, (None, None))
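    # A minimal usage sketch (hypothetical pack objects): the newest index
    # is searched first, and only one index may accept writes at a time:
    #
    #   agg = AggregateIndex()
    #   agg.add_index(old_pack.revision_index, old_pack)
    #   agg.add_writable_index(new_pack.revision_index, new_pack)
    #   agg.add_callback(nodes)  # new nodes land in new_pack's index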


class Packer(object):
    """Create a pack from packs."""

    def __init__(self, pack_collection, packs, suffix, revision_ids=None):
        """Create a Packer.

        :param pack_collection: A RepositoryPackCollection object where the
            new pack is being written to.
        :param packs: The packs to combine.
        :param suffix: The suffix to use on the temporary files for the pack.
        :param revision_ids: Revision ids to limit the pack to.
        """
        self.packs = packs
        self.suffix = suffix
        self.revision_ids = revision_ids
        # The pack object we are creating.
        self.new_pack = None
        self._pack_collection = pack_collection
        # The index layer keys for the revisions being copied. None for 'all
        # revisions'.
        self._revision_keys = None
        # What text keys to copy. None for 'all texts'. This is set by
        # _copy_inventory_texts
        self._text_filter = None
        self._extra_init()

    def _extra_init(self):
        """A template hook to allow extending the constructor trivially."""

    def pack(self, pb=None):
        """Create a new pack by reading data from other packs.

        This does little more than a bulk copy of data. One key difference
        is that data with the same item key across multiple packs is elided
        from the output. The new pack is written into the current pack store
        along with its indices, and the name added to the pack names. The
        source packs are not altered and are not required to be in the
        current pack collection.

        :param pb: An optional progress bar to use. A nested bar is created if
            this is None.
        :return: A Pack object, or None if nothing was copied.
        """
        # open a pack - using the same name as the last temporary file
        # - which has already been flushed, so its safe.
        # XXX: - duplicate code warning with start_write_group; fix before
        #      considering 'done'.
        if self._pack_collection._new_pack is not None:
            raise errors.BzrError('call to create_pack_from_packs while '
                'another pack is being written.')
        if self.revision_ids is not None:
            if len(self.revision_ids) == 0:
                # silly fetch request.
                return None
            else:
                self.revision_ids = frozenset(self.revision_ids)
        if pb is None:
            self.pb = ui.ui_factory.nested_progress_bar()
        else:
            self.pb = pb
        try:
            return self._create_pack_from_packs()
        finally:
            if pb is None:
                self.pb.finished()

    def open_pack(self):
        """Open a pack for the pack we are creating."""
        return NewPack(self._pack_collection._upload_transport,
            self._pack_collection._index_transport,
            self._pack_collection._pack_transport, upload_suffix=self.suffix,
            file_mode=self._pack_collection.repo.control_files._file_mode)

    def _copy_revision_texts(self):
        """Copy revision data to the new pack."""
        # select revisions
        if self.revision_ids:
            revision_keys = [(revision_id,) for revision_id in self.revision_ids]
        else:
            revision_keys = None
        # select revision keys
        revision_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
            self.packs, 'revision_index')[0]
        revision_nodes = self._pack_collection._index_contents(revision_index_map, revision_keys)
        # copy revision keys and adjust values
        self.pb.update("Copying revision texts", 1)
        list(self._copy_nodes_graph(revision_nodes, revision_index_map,
            self.new_pack._writer, self.new_pack.revision_index))
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base,
                self.new_pack.random_name,
                self.new_pack.revision_index.key_count(),
                time.time() - self.new_pack.start_time)
        self._revision_keys = revision_keys

    def _copy_inventory_texts(self):
        """Copy the inventory texts to the new pack.

        self._revision_keys is used to determine what inventories to copy.

        Sets self._text_filter appropriately.
        """
        # select inventory keys
        inv_keys = self._revision_keys # currently the same keyspace, and note that
        # querying for keys here could introduce a bug where an inventory item
        # is missed, so do not change it to query separately without cross
        # checking like the text key check below.
        inventory_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
            self.packs, 'inventory_index')[0]
        inv_nodes = self._pack_collection._index_contents(inventory_index_map, inv_keys)
        # copy inventory keys and adjust values
        # XXX: Should be a helper function to allow different inv representation
        # at this point.
        self.pb.update("Copying inventory texts", 2)
        inv_lines = self._copy_nodes_graph(inv_nodes, inventory_index_map,
            self.new_pack._writer, self.new_pack.inventory_index, output_lines=True)
        if self.revision_ids:
            self._process_inventory_lines(inv_lines)
        else:
            # eat the iterator to cause it to execute.
            list(inv_lines)
            self._text_filter = None
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: inventories copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base,
                self.new_pack.random_name,
                self.new_pack.inventory_index.key_count(),
                time.time() - self.new_pack.start_time)

    def _copy_text_texts(self):
        # select text keys
        text_index_map, text_nodes = self._get_text_nodes()
        if self._text_filter is not None:
            # We could return the keys copied as part of the return value from
            # _copy_nodes_graph but this doesn't work all that well with the
            # need to get line output too, so we check separately, and as we're
            # going to buffer everything anyway, we check beforehand, which
            # saves reading knit data over the wire when we know there are
            # missing records.
            text_nodes = set(text_nodes)
            present_text_keys = set(_node[1] for _node in text_nodes)
            missing_text_keys = set(self._text_filter) - present_text_keys
            if missing_text_keys:
                # TODO: raise a specific error that can handle many missing
                # keys.
                a_missing_key = missing_text_keys.pop()
                raise errors.RevisionNotPresent(a_missing_key[1],
                    a_missing_key[0])
        # copy text keys and adjust values
        self.pb.update("Copying content texts", 3)
        list(self._copy_nodes_graph(text_nodes, text_index_map,
            self.new_pack._writer, self.new_pack.text_index))
        self._log_copied_texts()

    def _check_references(self):
        """Make sure our external references are present."""
        external_refs = self.new_pack._external_compression_parents_of_texts()
        if external_refs:
            index = self._pack_collection.text_index.combined_index
            found_items = list(index.iter_entries(external_refs))
            if len(found_items) != len(external_refs):
                found_keys = set(k for idx, k, refs, value in found_items)
                missing_items = external_refs - found_keys
                missing_file_id, missing_revision_id = missing_items.pop()
                raise errors.RevisionNotPresent(missing_revision_id,
                    missing_file_id)

    def _create_pack_from_packs(self):
        self.pb.update("Opening pack", 0, 5)
        self.new_pack = self.open_pack()
        new_pack = self.new_pack
        # buffer data - we won't be reading-back during the pack creation and
        # this makes a significant difference on sftp pushes.
        new_pack.set_write_cache_size(1024*1024)
        if 'pack' in debug.debug_flags:
            plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
                for a_pack in self.packs]
            if self.revision_ids is not None:
                rev_count = len(self.revision_ids)
            else:
                rev_count = 'all'
            mutter('%s: create_pack: creating pack from source packs: '
                '%s%s %s revisions wanted %s t=0',
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                plain_pack_list, rev_count)
        self._copy_revision_texts()
        self._copy_inventory_texts()
        self._copy_text_texts()
        # select signature keys
        signature_filter = self._revision_keys # same keyspace
        signature_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
            self.packs, 'signature_index')[0]
        signature_nodes = self._pack_collection._index_contents(signature_index_map,
            signature_filter)
        # copy signature keys and adjust values
        self.pb.update("Copying signature texts", 4)
        self._copy_nodes(signature_nodes, signature_index_map, new_pack._writer,
            new_pack.signature_index)
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: revision signatures copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                new_pack.signature_index.key_count(),
                time.time() - new_pack.start_time)
        self._check_references()
        if not self._use_pack(new_pack):
            new_pack.abort()
            return None
        self.pb.update("Finishing pack", 5)
        new_pack.finish()
        self._pack_collection.allocate(new_pack)
        return new_pack

    def _copy_nodes(self, nodes, index_map, writer, write_index):
        """Copy knit nodes between packs with no graph references."""
        pb = ui.ui_factory.nested_progress_bar()
        try:
            return self._do_copy_nodes(nodes, index_map, writer,
                write_index, pb)
        finally:
            pb.finished()

    def _do_copy_nodes(self, nodes, index_map, writer, write_index, pb):
        # for record verification
        knit_data = _KnitData(None)
        # plan a readv on each source pack:
        # group by pack
        nodes = sorted(nodes)
        # how to map this into knit.py - or knit.py into this?
        # we don't want the typical knit logic, we want grouping by pack
        # at this point - perhaps a helper library for the following code
        # duplication points?
        request_groups = {}
        for index, key, value in nodes:
            if index not in request_groups:
                request_groups[index] = []
            request_groups[index].append((key, value))
        record_index = 0
        pb.update("Copied record", record_index, len(nodes))
        for index, items in request_groups.iteritems():
            pack_readv_requests = []
            for key, value in items:
                # ---- KnitGraphIndex.get_position
                bits = value[1:].split(' ')
                offset, length = int(bits[0]), int(bits[1])
                pack_readv_requests.append((offset, length, (key, value[0])))
            # linear scan up the pack
            pack_readv_requests.sort()
            # copy the data
            transport, path = index_map[index]
            reader = pack.make_readv_reader(transport, path,
                [offset[0:2] for offset in pack_readv_requests])
            for (names, read_func), (_1, _2, (key, eol_flag)) in \
                izip(reader.iter_records(), pack_readv_requests):
                raw_data = read_func(None)
                # check the header only
                df, _ = knit_data._parse_record_header(key[-1], raw_data)
                df.close()
                pos, size = writer.add_bytes_record(raw_data, names)
                write_index.add_node(key, eol_flag + "%d %d" % (pos, size))
                pb.update("Copied record", record_index)
                record_index += 1
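    # Illustrative note on the index value format parsed above: a value is
    # '<eol flag><offset> <length>', so a hypothetical value 'N1234 567'
    # splits into offset=1234 and length=567, while value[0] ('N' here) is
    # the eol flag copied unchanged into the new index entry.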

    def _copy_nodes_graph(self, nodes, index_map, writer, write_index,
        output_lines=False):
        """Copy knit nodes between packs.

        :param output_lines: Return lines present in the copied data as
            an iterator of line,version_id.
        """
        pb = ui.ui_factory.nested_progress_bar()
        try:
            return self._do_copy_nodes_graph(nodes, index_map, writer,
                write_index, output_lines, pb)
        finally:
            pb.finished()

    def _do_copy_nodes_graph(self, nodes, index_map, writer, write_index,
        output_lines, pb):
        # for record verification
        knit_data = _KnitData(None)
        # for line extraction when requested (inventories only)
        if output_lines:
            factory = knit.KnitPlainFactory()
        # plan a readv on each source pack:
        # group by pack
        nodes = sorted(nodes)
        # how to map this into knit.py - or knit.py into this?
        # we don't want the typical knit logic, we want grouping by pack
        # at this point - perhaps a helper library for the following code
        # duplication points?
        request_groups = {}
        record_index = 0
        pb.update("Copied record", record_index, len(nodes))
        for index, key, value, references in nodes:
            if index not in request_groups:
                request_groups[index] = []
            request_groups[index].append((key, value, references))
        for index, items in request_groups.iteritems():
            pack_readv_requests = []
            for key, value, references in items:
                # ---- KnitGraphIndex.get_position
                bits = value[1:].split(' ')
                offset, length = int(bits[0]), int(bits[1])
                pack_readv_requests.append((offset, length, (key, value[0], references)))
            # linear scan up the pack
            pack_readv_requests.sort()
            # copy the data
            transport, path = index_map[index]
            reader = pack.make_readv_reader(transport, path,
                [offset[0:2] for offset in pack_readv_requests])
            for (names, read_func), (_1, _2, (key, eol_flag, references)) in \
                izip(reader.iter_records(), pack_readv_requests):
                raw_data = read_func(None)
                version_id = key[-1]
                if output_lines:
                    # read the entire thing
                    content, _ = knit_data._parse_record(version_id, raw_data)
                    if len(references[-1]) == 0:
                        line_iterator = factory.get_fulltext_content(content)
                    else:
                        line_iterator = factory.get_linedelta_content(content)
                    for line in line_iterator:
                        yield line, version_id
                else:
                    # check the header only
                    df, _ = knit_data._parse_record_header(version_id, raw_data)
                    df.close()
                pos, size = writer.add_bytes_record(raw_data, names)
                write_index.add_node(key, eol_flag + "%d %d" % (pos, size), references)
                pb.update("Copied record", record_index)
                record_index += 1

    def _get_text_nodes(self):
        text_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
            self.packs, 'text_index')[0]
        return text_index_map, self._pack_collection._index_contents(text_index_map,
            self._text_filter)

    def _log_copied_texts(self):
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base,
                self.new_pack.random_name,
                self.new_pack.text_index.key_count(),
                time.time() - self.new_pack.start_time)

    def _process_inventory_lines(self, inv_lines):
        """Use up the inv_lines generator and setup a text key filter."""
        repo = self._pack_collection.repo
        fileid_revisions = repo._find_file_ids_from_xml_inventory_lines(
            inv_lines, self.revision_ids)
        text_filter = []
        for fileid, file_revids in fileid_revisions.iteritems():
            text_filter.extend([(fileid, file_revid) for file_revid in file_revids])
        self._text_filter = text_filter

    def _use_pack(self, new_pack):
        """Return True if new_pack should be used.

        :param new_pack: The pack that has just been created.
        :return: True if the pack should be used.
        """
        return new_pack.data_inserted()


class ReconcilePacker(Packer):
    """A packer which regenerates indices etc as it copies.

    This is used by ``bzr reconcile`` to cause parent text pointers to be
    corrected.
    """

    def _extra_init(self):
        self._data_changed = False

    def _process_inventory_lines(self, inv_lines):
        """Generate a text key reference map rather than a text key filter."""
        repo = self._pack_collection.repo
        refs = repo._find_text_key_references_from_xml_inventory_lines(
            inv_lines)
        self._text_refs = refs
        # during reconcile we:
        #  - convert unreferenced texts to full texts
        #  - correct texts which reference a text not copied to be full texts
        #  - copy all others as-is but with corrected parents.
        #  - so at this point we don't know enough to decide what becomes a full
        #    text.
        self._text_filter = None

    def _copy_text_texts(self):
        """Generate what texts we should have and then copy."""
        self.pb.update("Copying content texts", 3)
        # we have three major tasks here:
        # 1) generate the ideal index
        repo = self._pack_collection.repo
        ideal_index = repo._generate_text_key_index(self._text_refs)
        # 2) generate a text_nodes list that contains all the deltas that can
        #    be used as-is, with corrected parents.
        ok_nodes = []
        bad_texts = []
        discarded_nodes = []
        NULL_REVISION = _mod_revision.NULL_REVISION
        text_index_map, text_nodes = self._get_text_nodes()
        for node in text_nodes:
            # node is (index, key, value, refs)
            try:
                ideal_parents = tuple(ideal_index[node[1]])
            except KeyError:
                discarded_nodes.append(node)
                self._data_changed = True
            else:
                if ideal_parents == (NULL_REVISION,):
                    ideal_parents = ()
                if ideal_parents == node[3][0]:
                    # no change needed.
                    ok_nodes.append(node)
                elif ideal_parents[0:1] == node[3][0][0:1]:
                    # the left most parent is the same, or there are no parents
                    # today. Either way, we can preserve the representation as
                    # long as we change the refs to be inserted.
                    ok_nodes.append((node[0], node[1], node[2],
                        (ideal_parents, node[3][1])))
                    self._data_changed = True
                else:
                    # Reinsert this text completely
                    bad_texts.append((node[1], ideal_parents))
                    self._data_changed = True
        # we're finished with some data.
        del ideal_index
        del text_nodes
        # 3) bulk copy the ok data
        list(self._copy_nodes_graph(ok_nodes, text_index_map,
            self.new_pack._writer, self.new_pack.text_index))
        # 3) adhoc copy all the other texts.
        transaction = repo.get_transaction()
        file_id_index = GraphIndexPrefixAdapter(
            self.new_pack.text_index,
            ('blank', ), 1,
            add_nodes_callback=self.new_pack.text_index.add_nodes)
        knit_index = KnitGraphIndex(file_id_index,
            add_callback=file_id_index.add_nodes,
            deltas=True, parents=True)
        output_knit = knit.KnitVersionedFile('reconcile-texts',
            self._pack_collection.transport,
            None,
            index=knit_index,
            access_method=_PackAccess(
                {self.new_pack.text_index:self.new_pack.access_tuple()},
                (self.new_pack._writer, self.new_pack.text_index)),
            factory=knit.KnitPlainFactory())
        for key, parent_keys in bad_texts:
            # We refer to the new pack to delta data being output.
            # A possible improvement would be to catch errors on short reads
            # and only flush then.
            self.new_pack.flush()
            parents = []
            for parent_key in parent_keys:
                if parent_key[0] != key[0]:
                    # Graph parents must match the fileid
                    raise errors.BzrError('Mismatched key parent %r:%r' %
                        (key, parent_keys))
                parents.append(parent_key[1])
            source_weave = repo.weave_store.get_weave(key[0], transaction)
            text_lines = source_weave.get_lines(key[1])
            # adapt the 'knit' to the current file_id.
            file_id_index = GraphIndexPrefixAdapter(
                self.new_pack.text_index,
                (key[0], ), 1,
                add_nodes_callback=self.new_pack.text_index.add_nodes)
            knit_index._graph_index = file_id_index
            knit_index._add_callback = file_id_index.add_nodes
            output_knit.add_lines_with_ghosts(
                key[1], parents, text_lines, random_id=True, check_content=False)
        # 4) check that nothing inserted has a reference outside the keyspace.
        missing_text_keys = self.new_pack._external_compression_parents_of_texts()
        if missing_text_keys:
            raise errors.BzrError('Reference to missing compression parents %r'
                % (missing_text_keys,))
        self._log_copied_texts()

    def _use_pack(self, new_pack):
        """Override _use_pack to check for reconcile having changed content."""
        # XXX: we might be better checking this at the copy time.
        original_inventory_keys = set()
        inv_index = self._pack_collection.inventory_index.combined_index
        for entry in inv_index.iter_all_entries():
            original_inventory_keys.add(entry[1])
        new_inventory_keys = set()
        for entry in new_pack.inventory_index.iter_all_entries():
            new_inventory_keys.add(entry[1])
        if new_inventory_keys != original_inventory_keys:
            self._data_changed = True
        return new_pack.data_inserted() and self._data_changed


class RepositoryPackCollection(object):
    """Management of packs within a repository."""

    def __init__(self, repo, transport, index_transport, upload_transport,
        pack_transport):
        """Create a new RepositoryPackCollection.

        :param transport: Addresses the repository base directory
            (typically .bzr/repository/).
        :param index_transport: Addresses the directory containing indices.
        :param upload_transport: Addresses the directory into which packs are written
            while they're being created.
        :param pack_transport: Addresses the directory of existing complete packs.
        """
        self.repo = repo
        self.transport = transport
        self._index_transport = index_transport
        self._upload_transport = upload_transport
        self._pack_transport = pack_transport
        self._suffix_offsets = {'.rix': 0, '.iix': 1, '.tix': 2, '.six': 3}
        self.packs = []
        # pack name -> index sizes tuple; None until loaded from disk.
        self._names = None
        # name:Pack mapping
        self._packs_by_name = {}
        # the previous pack-names content
        self._packs_at_load = None
        # when a pack is being created by this object, the state of that pack.
        self._new_pack = None
        # aggregated revision index data
        self.revision_index = AggregateIndex()
        self.inventory_index = AggregateIndex()
        self.text_index = AggregateIndex()
        self.signature_index = AggregateIndex()

    def add_pack_to_memory(self, pack):
        """Make a Pack object available to the repository to satisfy queries.

        :param pack: A Pack object.
        """
        assert pack.name not in self._packs_by_name
        self.packs.append(pack)
        self._packs_by_name[pack.name] = pack
        self.revision_index.add_index(pack.revision_index, pack)
        self.inventory_index.add_index(pack.inventory_index, pack)
        self.text_index.add_index(pack.text_index, pack)
        self.signature_index.add_index(pack.signature_index, pack)

    def _add_text_to_weave(self, file_id, revision_id, new_lines, parents,
        nostore_sha, random_revid):
        file_id_index = GraphIndexPrefixAdapter(
            self.text_index.combined_index,
            (file_id, ), 1,
            add_nodes_callback=self.text_index.add_callback)
        self.repo._text_knit._index._graph_index = file_id_index
        self.repo._text_knit._index._add_callback = file_id_index.add_nodes
        return self.repo._text_knit.add_lines_with_ghosts(
            revision_id, parents, new_lines, nostore_sha=nostore_sha,
            random_id=random_revid, check_content=False)[0:2]

    def all_packs(self):
        """Return a list of all the Pack objects this repository has.

        Note that an in-progress pack being created is not returned.

        :return: A list of Pack objects for all the packs in the repository.
        """
        result = []
        for name in self.names():
            result.append(self.get_pack_by_name(name))
        return result

    def autopack(self):
        """Pack the pack collection incrementally.

        This will not attempt global reorganisation or recompression,
        rather it will just ensure that the total number of packs does
        not grow without bound. It uses the _max_pack_count method to
        determine if autopacking is needed, and the pack_distribution
        method to determine the number of revisions in each pack.

        If autopacking takes place then the packs name collection will have
        been flushed to disk - packing requires updating the name collection
        in synchronisation with certain steps. Otherwise the names collection
        is not flushed.

        :return: True if packing took place.
        """
        # XXX: Should not be needed when the management of indices is sane.
        total_revisions = self.revision_index.combined_index.key_count()
        total_packs = len(self._names)
        if self._max_pack_count(total_revisions) >= total_packs:
            return False
        # XXX: the following may want to be a class, to pack with a given
        # policy.
        mutter('Auto-packing repository %s, which has %d pack files, '
            'containing %d revisions into %d packs.', self, total_packs,
            total_revisions, self._max_pack_count(total_revisions))
        # determine which packs need changing
        pack_distribution = self.pack_distribution(total_revisions)
        existing_packs = []
        for pack in self.all_packs():
            revision_count = pack.get_revision_count()
            if revision_count == 0:
                # revision less packs are not generated by normal operation,
                # only by operations like sign-my-commits, and thus will not
                # tend to grow rapidly or without bound like commit containing
                # packs do - leave them alone as packing them really should
                # group their data with the relevant commit, and that may
                # involve rewriting ancient history - which autopack tries to
                # avoid. Alternatively we could not group the data but treat
                # each of these as having a single revision, and thus add
                # one revision for each to the total revision count, to get
                # a matching distribution.
                continue
            existing_packs.append((revision_count, pack))
        pack_operations = self.plan_autopack_combinations(
            existing_packs, pack_distribution)
        self._execute_pack_operations(pack_operations)
        return True

    def _execute_pack_operations(self, pack_operations):
        """Execute a series of pack operations.

        :param pack_operations: A list of [revision_count, packs_to_combine].
        :return: None.
        """
        for revision_count, packs in pack_operations:
            # we may have no-ops from the setup logic
            if len(packs) == 0:
                continue
            Packer(self, packs, '.autopack').pack()
            for pack in packs:
                self._remove_pack_from_memory(pack)
        # record the newly available packs and stop advertising the old
        # packs.
        self._save_pack_names(clear_obsolete_packs=True)
        # Move the old packs out of the way now they are no longer referenced.
        for revision_count, packs in pack_operations:
            self._obsolete_packs(packs)

    def lock_names(self):
        """Acquire the mutex around the pack-names index.

        This cannot be used in the middle of a read-only transaction on the
        repository.
        """
        self.repo.control_files.lock_write()

    def pack(self):
        """Pack the pack collection totally."""
        self.ensure_loaded()
        total_packs = len(self._names)
        if total_packs < 2:
            return
        total_revisions = self.revision_index.combined_index.key_count()
        # XXX: the following may want to be a class, to pack with a given
        # policy.
        mutter('Packing repository %s, which has %d pack files, '
            'containing %d revisions into 1 pack.', self, total_packs,
            total_revisions)
        # determine which packs need changing
        pack_distribution = [1]
        pack_operations = [[0, []]]
        for pack in self.all_packs():
            revision_count = pack.get_revision_count()
            pack_operations[-1][0] += revision_count
            pack_operations[-1][1].append(pack)
        self._execute_pack_operations(pack_operations)

    def plan_autopack_combinations(self, existing_packs, pack_distribution):
        """Plan a pack operation.

        :param existing_packs: The packs to pack. (A list of (revcount, Pack)
            tuples).
        :param pack_distribution: A list with the number of revisions desired
            in each pack.
        """
        if len(existing_packs) <= len(pack_distribution):
            return []
        existing_packs.sort(reverse=True)
        pack_operations = [[0, []]]
        # plan out what packs to keep, and what to reorganise
        while len(existing_packs):
            # take the largest pack, and if it's less than the head of the
            # distribution chart we will include its contents in the new pack for
            # that position. If it's larger, we remove its size from the
            # distribution chart
            next_pack_rev_count, next_pack = existing_packs.pop(0)
            if next_pack_rev_count >= pack_distribution[0]:
                # this is already packed 'better' than this, so we can
                # not waste time packing it.
                while next_pack_rev_count > 0:
                    next_pack_rev_count -= pack_distribution[0]
                    if next_pack_rev_count >= 0:
                        # this bucket is fully consumed
                        del pack_distribution[0]
                    else:
                        # didn't use that entire bucket up
                        pack_distribution[0] = -next_pack_rev_count
            else:
                # add the revisions we're going to add to the next output pack
                pack_operations[-1][0] += next_pack_rev_count
                # allocate this pack to the next pack sub operation
                pack_operations[-1][1].append(next_pack)
                if pack_operations[-1][0] >= pack_distribution[0]:
                    # this pack is used up, shift left.
                    del pack_distribution[0]
                    pack_operations.append([0, []])
        return pack_operations
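    # A worked micro-example of the planning loop above (illustrative):
    # with pack_distribution = [10, 1, 1, 1, 1, 1] (15 revisions) and
    # existing_packs [(6, A), (5, B), (4, C)], packs A and B accumulate
    # until they fill the 10-revision bucket, while C (4 >= 1) is already
    # packed well enough and just consumes buckets, so the result is
    # [[11, [A, B]], [0, []]]: combine A and B, leave C alone.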

    def ensure_loaded(self):
        # NB: if you see an assertion error here, it's probably access against
        # an unlocked repo. Naughty.
        assert self.repo.is_locked()
        if self._names is None:
            self._names = {}
            self._packs_at_load = set()
            for index, key, value in self._iter_disk_pack_index():
                name = key[0]
                self._names[name] = self._parse_index_sizes(value)
                self._packs_at_load.add((key, value))
        # populate all the metadata.
        self.all_packs()

    def _parse_index_sizes(self, value):
        """Parse a string of index sizes."""
        return tuple([int(digits) for digits in value.split(' ')])

    def get_pack_by_name(self, name):
        """Get a Pack object by name.

        :param name: The name of the pack - e.g. '123456'
        :return: A Pack object.
        """
        try:
            return self._packs_by_name[name]
        except KeyError:
            rev_index = self._make_index(name, '.rix')
            inv_index = self._make_index(name, '.iix')
            txt_index = self._make_index(name, '.tix')
            sig_index = self._make_index(name, '.six')
            result = ExistingPack(self._pack_transport, name, rev_index,
                inv_index, txt_index, sig_index)
            self.add_pack_to_memory(result)
            return result

    def allocate(self, a_new_pack):
        """Allocate name in the list of packs.

        :param a_new_pack: A NewPack instance to be added to the collection of
            packs for this repository.
        """
        self.ensure_loaded()
        if a_new_pack.name in self._names:
            raise errors.BzrError(
                'Pack %r already exists in %s' % (a_new_pack.name, self))
        self._names[a_new_pack.name] = tuple(a_new_pack.index_sizes)
        self.add_pack_to_memory(a_new_pack)

    def _iter_disk_pack_index(self):
        """Iterate over the contents of the pack-names index.

        This is used when loading the list from disk, and before writing to
        detect updates from others during our write operation.

        :return: An iterator of the index contents.
        """
        return GraphIndex(self.transport, 'pack-names', None
                ).iter_all_entries()

    def _make_index(self, name, suffix):
        size_offset = self._suffix_offsets[suffix]
        index_name = name + suffix
        index_size = self._names[name][size_offset]
        return GraphIndex(
            self._index_transport, index_name, index_size)

    def _max_pack_count(self, total_revisions):
        """Return the maximum number of packs to use for total revisions.

        :param total_revisions: The total number of revisions in the
            repository.
        """
        if not total_revisions:
            return 1
        digits = str(total_revisions)
        result = 0
        for digit in digits:
            result += int(digit)
        return result

    def names(self):
        """Provide an order to the underlying names."""
        return sorted(self._names.keys())
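    # Worked example of the digit-sum rule above: 2466 revisions allow at
    # most 2 + 4 + 6 + 6 = 18 packs, matching pack_distribution(2466)
    # below (two 1000s, four 100s, six 10s, six 1s - 18 buckets in total).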

    def _obsolete_packs(self, packs):
        """Move a number of packs which have been obsoleted out of the way.

        Each pack and its associated indices are moved out of the way.

        Note: for correctness this function should only be called after a new
        pack names index has been written without these pack names, and with
        the names of packs that contain the data previously available via these
        packs.

        :param packs: The packs to obsolete.
        :return: None.
        """
        for pack in packs:
            pack.pack_transport.rename(pack.file_name(),
                '../obsolete_packs/' + pack.file_name())
            # TODO: Probably needs to know all possible indices for this pack
            # - or maybe list the directory and move all indices matching this
            # name whether we recognize it or not?
            for suffix in ('.iix', '.six', '.tix', '.rix'):
                self._index_transport.rename(pack.name + suffix,
                    '../obsolete_packs/' + pack.name + suffix)

    def pack_distribution(self, total_revisions):
        """Generate a list of the number of revisions to put in each pack.

        :param total_revisions: The total number of revisions in the
            repository.
        """
        if total_revisions == 0:
            return [0]
        digits = reversed(str(total_revisions))
        result = []
        for exponent, count in enumerate(digits):
            size = 10 ** exponent
            for pos in range(int(count)):
                result.append(size)
        return list(reversed(result))
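    # Worked example: pack_distribution(2466) reads the digits back to
    # front ('6', '6', '4', '2' with exponents 0..3) and returns
    # [1000, 1000, 100, 100, 100, 100, 10, 10, 10, 10, 10, 10,
    #  1, 1, 1, 1, 1, 1].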

    def _pack_tuple(self, name):
        """Return a tuple with the transport and file name for a pack name."""
        return self._pack_transport, name + '.pack'

    def _remove_pack_from_memory(self, pack):
        """Remove pack from the packs accessed by this repository.

        Only affects memory state, until self._save_pack_names() is invoked.
        """
        self._names.pop(pack.name)
        self._packs_by_name.pop(pack.name)
        self._remove_pack_indices(pack)

    def _remove_pack_indices(self, pack):
        """Remove the indices for pack from the aggregated indices."""
        self.revision_index.remove_index(pack.revision_index, pack)
        self.inventory_index.remove_index(pack.inventory_index, pack)
        self.text_index.remove_index(pack.text_index, pack)
        self.signature_index.remove_index(pack.signature_index, pack)

    def reset(self):
        """Clear all cached data."""
        # cached revision data
        self.repo._revision_knit = None
        self.revision_index.clear()
        # cached signature data
        self.repo._signature_knit = None
        self.signature_index.clear()
        # cached file text data
        self.text_index.clear()
        self.repo._text_knit = None
        # cached inventory data
        self.inventory_index.clear()
        # remove the open pack
        self._new_pack = None
        # information about packs.
        self._names = None
        self.packs = []
        self._packs_by_name = {}
        self._packs_at_load = None

    def _make_index_map(self, index_suffix):
        """Return information on existing indices.

        :param index_suffix: Index suffix added to pack name.

        :returns: (pack_map, indices) where indices is a list of GraphIndex
            objects, and pack_map is a mapping from those objects to the
            pack tuple they describe.
        """
        # TODO: stop using this; it creates new indices unnecessarily.
        self.ensure_loaded()
        suffix_map = {'.rix': 'revision_index',
            '.six': 'signature_index',
            '.iix': 'inventory_index',
            '.tix': 'text_index',
            }
        return self._packs_list_to_pack_map_and_index_list(self.all_packs(),
            suffix_map[index_suffix])

    def _packs_list_to_pack_map_and_index_list(self, packs, index_attribute):
        """Convert a list of packs to an index pack map and index list.

        :param packs: The packs list to process.
        :param index_attribute: The attribute that the desired index is found
            on.
        :return: A tuple (map, list) where map contains the dict from
            index:pack_tuple, and list contains the indices in the same order
            as the packs list.
        """
        indices = []
        pack_map = {}
        for pack in packs:
            index = getattr(pack, index_attribute)
            indices.append(index)
            pack_map[index] = (pack.pack_transport, pack.file_name())
        return pack_map, indices

    def _index_contents(self, pack_map, key_filter=None):
        """Get an iterable of the index contents from a pack_map.

        :param pack_map: A map from indices to pack details.
        :param key_filter: An optional filter to limit the
            keys returned.
        """
        indices = [index for index in pack_map.iterkeys()]
        all_index = CombinedGraphIndex(indices)
        if key_filter is None:
            return all_index.iter_all_entries()
        else:
            return all_index.iter_entries(key_filter)

    def _unlock_names(self):
        """Release the mutex around the pack-names index."""
        self.repo.control_files.unlock()

    def _save_pack_names(self, clear_obsolete_packs=False):
        """Save the list of packs.

        This will take out the mutex around the pack names list for the
        duration of the method call. If concurrent updates have been made, a
        three-way merge between the current list and the current in memory list
        is performed.

        :param clear_obsolete_packs: If True, clear out the contents of the
            obsolete_packs directory.
        """
        self.lock_names()
        try:
            builder = GraphIndexBuilder()
            # load the disk nodes across
            disk_nodes = set()
            for index, key, value in self._iter_disk_pack_index():
                disk_nodes.add((key, value))
            # do a two-way diff against our original content
            current_nodes = set()
            for name, sizes in self._names.iteritems():
                current_nodes.add(
                    ((name, ), ' '.join(str(size) for size in sizes)))
            deleted_nodes = self._packs_at_load - current_nodes
            new_nodes = current_nodes - self._packs_at_load
            disk_nodes.difference_update(deleted_nodes)
            disk_nodes.update(new_nodes)
            # TODO: handle same-name, index-size-changes here -
            # e.g. use the value from disk, not ours, *unless* we're the one
            # changing it.
            for key, value in disk_nodes:
                builder.add_node(key, value)
            self.transport.put_file('pack-names', builder.finish(),
                mode=self.repo.control_files._file_mode)
            # move the baseline forward
            self._packs_at_load = disk_nodes
            # now clear out the obsolete packs directory
            if clear_obsolete_packs:
                self.transport.clone('obsolete_packs').delete_multi(
                    self.transport.list_dir('obsolete_packs'))
        finally:
            self._unlock_names()
        # synchronise the memory packs list with what we just wrote:
        new_names = dict(disk_nodes)
        # drop no longer present nodes
        for pack in self.all_packs():
            if (pack.name,) not in new_names:
                self._remove_pack_from_memory(pack)
        # add new nodes/refresh existing ones
        for key, value in disk_nodes:
            name = key[0]
            sizes = self._parse_index_sizes(value)
            if name in self._names:
                # existing
                if sizes != self._names[name]:
                    # the pack for name has had its indices replaced - rare but
                    # important to handle. XXX: probably can never happen today
                    # because the three-way merge code above does not handle it
                    # - you may end up adding the same key twice to the new
                    # disk index because the set values are the same, unless
                    # the only index shows up as deleted by the set difference
                    # - which it may. Until there is a specific test for this,
                    # assume its broken. RBC 20071017.
                    self._remove_pack_from_memory(self.get_pack_by_name(name))
                    self._names[name] = sizes
                    self.get_pack_by_name(name)
            else:
                # new
                self._names[name] = sizes
                self.get_pack_by_name(name)
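    # Illustrative note on the pack-names records handled above: each node
    # key is a 1-tuple of the pack name and each value is the
    # space-separated index sizes, e.g. (('a1b2c3',), '120 40 800 30') for
    # .rix/.iix/.tix/.six sizes of 120, 40, 800 and 30 bytes respectively.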

    def _start_write_group(self):
        # Do not permit preparation for writing if we're not in a 'write lock'.
        if not self.repo.is_write_locked():
            raise errors.NotWriteLocked(self)
        self._new_pack = NewPack(self._upload_transport, self._index_transport,
            self._pack_transport, upload_suffix='.pack',
            file_mode=self.repo.control_files._file_mode)
        # allow writing: queue writes to a new index
        self.revision_index.add_writable_index(self._new_pack.revision_index,
            self._new_pack)
        self.inventory_index.add_writable_index(self._new_pack.inventory_index,
            self._new_pack)
        self.text_index.add_writable_index(self._new_pack.text_index,
            self._new_pack)
        self.signature_index.add_writable_index(self._new_pack.signature_index,
            self._new_pack)

        # reused revision and signature knits may need updating
        #
        # "Hysterical raisins. client code in bzrlib grabs those knits outside
        # of write groups and then mutates it inside the write group."
        if self.repo._revision_knit is not None:
            self.repo._revision_knit._index._add_callback = \
                self.revision_index.add_callback
        if self.repo._signature_knit is not None:
            self.repo._signature_knit._index._add_callback = \
                self.signature_index.add_callback
        # create a reused knit object for text addition in commit.
        self.repo._text_knit = self.repo.weave_store.get_weave_or_empty(
            # placeholder file id (assumed); _add_text_to_weave re-points
            # the index adapter to the real file id on each call.
            'all-texts', self.repo.get_transaction())

    def _abort_write_group(self):
        # FIXME: just drop the transient index.
        # forget what names there are
        self._new_pack.abort()
        self._remove_pack_indices(self._new_pack)
        self._new_pack = None
        self.repo._text_knit = None

    def _commit_write_group(self):
        self._remove_pack_indices(self._new_pack)
        if self._new_pack.data_inserted():
            # get all the data to disk and ready to use
            self._new_pack.finish()
            self.allocate(self._new_pack)
            self._new_pack = None
            if not self.autopack():
                # when autopack takes no steps, the names list is still
                # unsaved.
                self._save_pack_names()
        else:
            self._new_pack.abort()
            self._new_pack = None
        self.repo._text_knit = None


class KnitPackRevisionStore(KnitRevisionStore):
    """An object to adapt access from RevisionStores to use KnitPacks.

    This class works by replacing the original RevisionStore.
    We need to do this because the KnitPackRevisionStore is less
    isolated in its layering - it uses services from the repo.
    """

    def __init__(self, repo, transport, revisionstore):
        """Create a KnitPackRevisionStore on repo with revisionstore.

        This will store its state in the Repository, use the
        indices to provide a KnitGraphIndex,
        and at the end of transactions write new indices.
        """
        KnitRevisionStore.__init__(self, revisionstore.versioned_file_store)
        self.repo = repo
        self._serializer = revisionstore._serializer
        self.transport = transport

    def get_revision_file(self, transaction):
        """Get the revision versioned file object."""
        if getattr(self.repo, '_revision_knit', None) is not None:
            return self.repo._revision_knit
        self.repo._pack_collection.ensure_loaded()
        add_callback = self.repo._pack_collection.revision_index.add_callback
        # setup knit specific objects
        knit_index = KnitGraphIndex(
            self.repo._pack_collection.revision_index.combined_index,
            add_callback=add_callback)
        self.repo._revision_knit = knit.KnitVersionedFile(
            'revisions', self.transport.clone('..'),
            self.repo.control_files._file_mode,
            create=False, access_mode=self.repo._access_mode(),
            index=knit_index, delta=False, factory=knit.KnitPlainFactory(),
            access_method=self.repo._pack_collection.revision_index.knit_access)
        return self.repo._revision_knit
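    # Editorial note: the knit built above is cached on the repository so that
    # RepositoryPackCollection._start_write_group() can retarget its
    # _add_callback at the next write group's new pack index.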

    def get_signature_file(self, transaction):
        """Get the signature versioned file object."""
        if getattr(self.repo, '_signature_knit', None) is not None:
            return self.repo._signature_knit
        self.repo._pack_collection.ensure_loaded()
        add_callback = self.repo._pack_collection.signature_index.add_callback
        # setup knit specific objects
        knit_index = KnitGraphIndex(
            self.repo._pack_collection.signature_index.combined_index,
            add_callback=add_callback, parents=False)
        self.repo._signature_knit = knit.KnitVersionedFile(
            'signatures', self.transport.clone('..'),
            self.repo.control_files._file_mode,
            create=False, access_mode=self.repo._access_mode(),
            index=knit_index, delta=False, factory=knit.KnitPlainFactory(),
            access_method=self.repo._pack_collection.signature_index.knit_access)
        return self.repo._signature_knit


class KnitPackTextStore(VersionedFileStore):
    """Presents a TextStore abstraction on top of packs.

    This class works by replacing the original VersionedFileStore.
    We need to do this because the KnitPackTextStore is less
    isolated in its layering - it uses services from the repo and shares them
    with all the data written in a single write group.
    """

    def __init__(self, repo, transport, weavestore):
        """Create a KnitPackTextStore on repo with weavestore.

        This will store its state in the Repository, use the
        indices FileNames to provide a KnitGraphIndex,
        and at the end of transactions write new indices.
        """
        # don't call base class constructor - it's not suitable.
        # no transient data stored in the transaction
        # cache.
        self._precious = False
        self.repo = repo
        self.transport = transport
        self.weavestore = weavestore
        # XXX for check() which isn't updated yet
        self._transport = weavestore._transport

    def get_weave_or_empty(self, file_id, transaction):
        """Get a 'Knit' backed by the .tix indices.

        The transaction parameter is ignored.
        """
        self.repo._pack_collection.ensure_loaded()
        add_callback = self.repo._pack_collection.text_index.add_callback
        # setup knit specific objects
        file_id_index = GraphIndexPrefixAdapter(
            self.repo._pack_collection.text_index.combined_index,
            (file_id, ), 1, add_nodes_callback=add_callback)
        knit_index = KnitGraphIndex(file_id_index,
            add_callback=file_id_index.add_nodes,
            deltas=True, parents=True)
        return knit.KnitVersionedFile('text:' + file_id,
            self.transport.clone('..'),
            None,
            index=knit_index,
            access_method=self.repo._pack_collection.text_index.knit_access,
            factory=knit.KnitPlainFactory())

    get_weave = get_weave_or_empty
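    # Editorial sketch: keys in the combined text index are
    # (file_id, revision_id) tuples. GraphIndexPrefixAdapter fixes the
    # (file_id,) prefix, so the per-file knit above sees one-tuple revision
    # keys, e.g. (hypothetical ids):
    #   combined index: ('file-1', 'rev-5') -> ...
    #   adapted view:   ('rev-5',) -> ...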

    def __iter__(self):
        """Generate a list of the fileids inserted, for use by check."""
        self.repo._pack_collection.ensure_loaded()
        ids = set()
        for index, key, value, refs in \
            self.repo._pack_collection.text_index.combined_index.iter_all_entries():
            ids.add(key[0])
        return iter(ids)


class InventoryKnitThunk(object):
    """An object to manage thunking get_inventory_weave to pack based knits."""

    def __init__(self, repo, transport):
        """Create an InventoryKnitThunk for repo at transport.

        This will store its state in the Repository, use the
        indices FileNames to provide a KnitGraphIndex,
        and at the end of transactions write a new index.
        """
        self.repo = repo
        self.transport = transport

    def get_weave(self):
        """Get a 'Knit' that contains inventory data."""
        self.repo._pack_collection.ensure_loaded()
        add_callback = self.repo._pack_collection.inventory_index.add_callback
        # setup knit specific objects
        knit_index = KnitGraphIndex(
            self.repo._pack_collection.inventory_index.combined_index,
            add_callback=add_callback, deltas=True, parents=True)
        return knit.KnitVersionedFile(
            'inventory', self.transport.clone('..'),
            self.repo.control_files._file_mode,
            create=False, access_mode=self.repo._access_mode(),
            index=knit_index, delta=True, factory=knit.KnitPlainFactory(),
            access_method=self.repo._pack_collection.inventory_index.knit_access)


class KnitPackRepository(KnitRepository):
    """Experimental repository that uses graph-knits."""

    def __init__(self, _format, a_bzrdir, control_files, _revision_store,
        control_store, text_store, _commit_builder_class, _serializer):
        KnitRepository.__init__(self, _format, a_bzrdir, control_files,
            _revision_store, control_store, text_store, _commit_builder_class,
            _serializer)
        index_transport = control_files._transport.clone('indices')
        self._pack_collection = RepositoryPackCollection(self, control_files._transport,
            index_transport,
            control_files._transport.clone('upload'),
            control_files._transport.clone('packs'))
        self._revision_store = KnitPackRevisionStore(self, index_transport, self._revision_store)
        self.weave_store = KnitPackTextStore(self, index_transport, self.weave_store)
        self._inv_thunk = InventoryKnitThunk(self, index_transport)
        # True when the repository object is 'write locked' (as opposed to the
        # physical lock only taken out around changes to the pack-names list.)
        # Another way to represent this would be a decorator around the control
        # files object that presents logical locks as physical ones - if this
        # gets ugly consider that alternative design. RBC 20071011
        self._write_lock_count = 0
        self._transaction = None
        # for tests
        self._reconcile_does_inventory_gc = True
        self._reconcile_fixes_text_parents = True
        self._reconcile_backsup_inventory = False

    def _abort_write_group(self):
        self._pack_collection._abort_write_group()

    def _access_mode(self):
        """Return 'w' or 'r' depending on whether a write lock is active.

        This method is a helper for the Knit-thunking support objects.
        """
        if self.is_write_locked():
            return 'w'
        return 'r'

    def _find_inconsistent_revision_parents(self):
        """Find revisions with incorrectly cached parents.

        :returns: a list of tuples of (revision-id, parents-in-index,
            parents-in-revision).
        """
        assert self.is_locked()
        pb = ui.ui_factory.nested_progress_bar()
        result = []
        try:
            revision_nodes = self._pack_collection.revision_index \
                .combined_index.iter_all_entries()
            index_positions = []
            # Get the cached index values for all revisions, and also the location
            # in each index of the revision text so we can perform linear IO.
            for index, key, value, refs in revision_nodes:
                pos, length = value[1:].split(' ')
                index_positions.append((index, int(pos), key[0],
                    tuple(parent[0] for parent in refs[0])))
                pb.update("Reading revision index.", 0, 0)
            index_positions.sort()
            batch_count = len(index_positions) / 1000 + 1
            pb.update("Checking cached revision graph.", 0, batch_count)
            for offset in xrange(batch_count):
                pb.update("Checking cached revision graph.", offset)
                to_query = index_positions[offset * 1000:(offset + 1) * 1000]
                if not to_query:
                    break
                rev_ids = [item[2] for item in to_query]
                revs = self.get_revisions(rev_ids)
                for revision, item in zip(revs, to_query):
                    index_parents = item[3]
                    rev_parents = tuple(revision.parent_ids)
                    if index_parents != rev_parents:
                        result.append((revision.revision_id,
                            index_parents, rev_parents))
        finally:
            pb.finished()
        return result
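    # Worked example (editorial): with 2500 revisions indexed,
    # batch_count = 2500 / 1000 + 1 = 3 under integer division, giving
    # batches of 1000, 1000 and 500. When the total is an exact multiple of
    # 1000 the final batch is empty, which the `if not to_query: break`
    # above short-circuits.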

    def get_parents(self, revision_ids):
        """See StackedParentsProvider.get_parents.

        This implementation accesses the combined revision index to provide
        answers.
        """
        self._pack_collection.ensure_loaded()
        index = self._pack_collection.revision_index.combined_index
        search_keys = set()
        for revision_id in revision_ids:
            if revision_id != _mod_revision.NULL_REVISION:
                search_keys.add((revision_id,))
        found_parents = {_mod_revision.NULL_REVISION:[]}
        for index, key, value, refs in index.iter_entries(search_keys):
            parents = refs[0]
            if not parents:
                parents = (_mod_revision.NULL_REVISION,)
            else:
                parents = tuple(parent[0] for parent in parents)
            found_parents[key[0]] = parents
        result = []
        for revision_id in revision_ids:
            try:
                result.append(found_parents[revision_id])
            except KeyError:
                result.append(None)
        return result
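    # Editorial example of the result shape, using hypothetical ids:
    #   get_parents(['rev-known', 'rev-ghost'])
    #   -> [('rev-parent',), None]
    # Ghosts map to None, and NULL_REVISION maps to [] via the seed entry
    # in found_parents above.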

    def _make_parents_provider(self):
        return self

    def _refresh_data(self):
        if self._write_lock_count == 1 or (
            self.control_files._lock_count == 1 and
            self.control_files._lock_mode == 'r'):
            # forget what names there are
            self._pack_collection.reset()
            # XXX: Better to do an in-memory merge when acquiring a new lock -
            # factor out code from _save_pack_names.
            self._pack_collection.ensure_loaded()

    def _start_write_group(self):
        self._pack_collection._start_write_group()

    def _commit_write_group(self):
        return self._pack_collection._commit_write_group()

    def get_inventory_weave(self):
        return self._inv_thunk.get_weave()

    def get_transaction(self):
        if self._write_lock_count:
            return self._transaction
        else:
            return self.control_files.get_transaction()

    def is_locked(self):
        return self._write_lock_count or self.control_files.is_locked()

    def is_write_locked(self):
        return self._write_lock_count

    def lock_write(self, token=None):
        if not self._write_lock_count and self.is_locked():
            raise errors.ReadOnlyError(self)
        self._write_lock_count += 1
        if self._write_lock_count == 1:
            from bzrlib import transactions
            self._transaction = transactions.WriteTransaction()
        self._refresh_data()

    def lock_read(self):
        if self._write_lock_count:
            self._write_lock_count += 1
        else:
            self.control_files.lock_read()
        self._refresh_data()
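    # Editorial sketch of the logical lock counting implemented above:
    #   repo.lock_write()    # count 0 -> 1, WriteTransaction created
    #   repo.lock_write()    # count 1 -> 2 (reentrant)
    #   repo.lock_read()     # count 2 -> 3; reads nest inside a write lock
    # Three unlock() calls unwind the count and finish the transaction.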

    def leave_lock_in_place(self):
        # not supported - raise an error
        raise NotImplementedError(self.leave_lock_in_place)

    def dont_leave_lock_in_place(self):
        # not supported - raise an error
        raise NotImplementedError(self.dont_leave_lock_in_place)

    @needs_write_lock
    def pack(self):
        """Compress the data within the repository.

        This will pack all the data to a single pack. In future it may
        recompress deltas or do other such expensive operations.
        """
        self._pack_collection.pack()

    @needs_write_lock
    def reconcile(self, other=None, thorough=False):
        """Reconcile this repository."""
        from bzrlib.reconcile import PackReconciler
        reconciler = PackReconciler(self, thorough=thorough)
        reconciler.reconcile()
        return reconciler

    def unlock(self):
        if self._write_lock_count == 1 and self._write_group is not None:
            self.abort_write_group()
            self._transaction = None
            self._write_lock_count = 0
            raise errors.BzrError(
                'Must end write group before releasing write lock on %s'
                % self)
        if self._write_lock_count:
            self._write_lock_count -= 1
            if not self._write_lock_count:
                transaction = self._transaction
                self._transaction = None
                transaction.finish()
        else:
            self.control_files.unlock()
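    # Editorial note: the BzrError branch above resets the lock state before
    # raising, so a caller that forgets to end its write group cannot leave
    # the repository wedged under a stale logical write lock.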


class RepositoryFormatPack(MetaDirRepositoryFormat):
    """Format logic for pack structured repositories.

    This repository format has:
     - a list of packs in pack-names
     - packs in packs/NAME.pack
     - indices in indices/NAME.{iix,six,tix,rix}
     - knit deltas in the packs, knit indices mapped to the indices.
     - thunk objects to support the knits programming API.
     - a format marker of its own
     - an optional 'shared-storage' flag
     - an optional 'no-working-trees' flag
     - a LockDir lock
    """

    # Set this attribute in derived classes to control the repository class
    # created by open and initialize.
    repository_class = None
    # Set this attribute in derived classes to control the
    # _commit_builder_class that the repository objects will have passed to
    # their constructor.
    _commit_builder_class = None
    # Set this attribute in derived classes to control the _serializer that the
    # repository objects will have passed to their constructor.
    _serializer = None

    def _get_control_store(self, repo_transport, control_files):
        """Return the control store for this repository."""
        return VersionedFileStore(
            repo_transport,
            prefixed=False,
            file_mode=control_files._file_mode,
            versionedfile_class=knit.KnitVersionedFile,
            versionedfile_kwargs={'factory': knit.KnitPlainFactory()},
            )

    def _get_revision_store(self, repo_transport, control_files):
        """See RepositoryFormat._get_revision_store()."""
        versioned_file_store = VersionedFileStore(
            repo_transport,
            file_mode=control_files._file_mode,
            prefixed=False,
            precious=True,
            versionedfile_class=knit.KnitVersionedFile,
            versionedfile_kwargs={'delta': False,
                                  'factory': knit.KnitPlainFactory(),
                                 },
            escaped=True,
            )
        return KnitRevisionStore(versioned_file_store)

    def _get_text_store(self, transport, control_files):
        """See RepositoryFormat._get_text_store()."""
        return self._get_versioned_file_store('knits',
            transport,
            control_files,
            versionedfile_class=knit.KnitVersionedFile,
            versionedfile_kwargs={
                'create_parent_dir': True,
                'delay_create': True,
                'dir_mode': control_files._dir_mode,
            },
            escaped=True)

    def initialize(self, a_bzrdir, shared=False):
        """Create a pack based repository.

        :param a_bzrdir: bzrdir to contain the new repository; must already
            be initialized.
        :param shared: If true the repository will be initialized as a shared
            repository.
        """
        mutter('creating repository in %s.', a_bzrdir.transport.base)
        dirs = ['indices', 'obsolete_packs', 'packs', 'upload']
        builder = GraphIndexBuilder()
        files = [('pack-names', builder.finish())]
        utf8_files = [('format', self.get_format_string())]

        self._upload_blank_content(a_bzrdir, dirs, files, utf8_files, shared)
        return self.open(a_bzrdir=a_bzrdir, _found=True)
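    # Editorial sketch of the layout this creates (typically under
    # .bzr/repository/):
    #   format          - this format's marker string
    #   pack-names      - an empty GraphIndex of pack names
    #   indices/, obsolete_packs/, packs/, upload/ - empty directories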

    def open(self, a_bzrdir, _found=False, _override_transport=None):
        """See RepositoryFormat.open().

        :param _override_transport: INTERNAL USE ONLY. Allows opening the
            repository at a slightly different url
            than normal. I.e. during 'upgrade'.
        """
        if not _found:
            format = RepositoryFormat.find_format(a_bzrdir)
            assert format.__class__ == self.__class__
        if _override_transport is not None:
            repo_transport = _override_transport
        else:
            repo_transport = a_bzrdir.get_repository_transport(None)
        control_files = lockable_files.LockableFiles(repo_transport,
            'lock', lockdir.LockDir)
        text_store = self._get_text_store(repo_transport, control_files)
        control_store = self._get_control_store(repo_transport, control_files)
        _revision_store = self._get_revision_store(repo_transport, control_files)
        return self.repository_class(_format=self,
            a_bzrdir=a_bzrdir,
            control_files=control_files,
            _revision_store=_revision_store,
            control_store=control_store,
            text_store=text_store,
            _commit_builder_class=self._commit_builder_class,
            _serializer=self._serializer)
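    # Editorial usage sketch, assuming an existing a_bzrdir:
    #   fmt = RepositoryFormatKnitPack1()
    #   repo = fmt.open(a_bzrdir)   # the format is probed unless _found=True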


class RepositoryFormatKnitPack1(RepositoryFormatPack):
    """A no-subtrees parameterised Pack repository.

    This format was introduced in 0.92.
    """

    repository_class = KnitPackRepository
    _commit_builder_class = PackCommitBuilder
    _serializer = xml5.serializer_v5

    def _get_matching_bzrdir(self):
        return bzrdir.format_registry.make_bzrdir('pack-0.92')

    def _ignore_setting_bzrdir(self, format):
        pass

    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)

    def get_format_string(self):
        """See RepositoryFormat.get_format_string()."""
        return "Bazaar pack repository format 1 (needs bzr 0.92)\n"

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return "Packs containing knits without subtree support"

    def check_conversion_target(self, target_format):
        pass


class RepositoryFormatKnitPack3(RepositoryFormatPack):
    """A subtrees parameterised Pack repository.

    This repository format uses the xml7 serializer to get:
     - support for recording full info about the tree root
     - support for recording tree-references

    This format was introduced in 0.92.
    """

    repository_class = KnitPackRepository
    _commit_builder_class = PackRootCommitBuilder
    rich_root_data = True
    supports_tree_reference = True
    _serializer = xml7.serializer_v7

    def _get_matching_bzrdir(self):
        return bzrdir.format_registry.make_bzrdir(
            'pack-0.92-subtree')

    def _ignore_setting_bzrdir(self, format):
        pass

    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)

    def check_conversion_target(self, target_format):
        if not target_format.rich_root_data:
            raise errors.BadConversionTarget(
                'Does not support rich root data.', target_format)
        if not getattr(target_format, 'supports_tree_reference', False):
            raise errors.BadConversionTarget(
                'Does not support nested trees', target_format)

    def get_format_string(self):
        """See RepositoryFormat.get_format_string()."""
        return ("Bazaar pack repository format 1 with subtree support"
                " (needs bzr 0.92)\n")

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return "Packs containing knits with subtree support"


class RepositoryFormatKnitPack4(RepositoryFormatPack):
    """A rich-root, no subtrees parameterised Pack repository.

    This repository format uses the xml6 serializer to get:
     - support for recording full info about the tree root

    This format was introduced in 1.0.
    """

    repository_class = KnitPackRepository
    _commit_builder_class = PackRootCommitBuilder
    rich_root_data = True
    supports_tree_reference = False
    _serializer = xml6.serializer_v6

    def _get_matching_bzrdir(self):
        return bzrdir.format_registry.make_bzrdir(
            'rich-root-pack')

    def _ignore_setting_bzrdir(self, format):
        pass

    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)

    def check_conversion_target(self, target_format):
        if not target_format.rich_root_data:
            raise errors.BadConversionTarget(
                'Does not support rich root data.', target_format)

    def get_format_string(self):
        """See RepositoryFormat.get_format_string()."""
        return ("Bazaar pack repository format 1 with rich root"
                " (needs bzr 1.0)\n")

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return "Packs containing knits with rich root support"