@@ -269 +265 @@
     def __init__(self, name, revision_index, inventory_index, text_index,
         signature_index, upload_transport, pack_transport, index_transport,
-        pack_collection):
+        pack_collection, chk_index=None):
         """Create a ResumedPack object."""
         ExistingPack.__init__(self, pack_transport, name, revision_index,
-            inventory_index, text_index, signature_index)
+            inventory_index, text_index, signature_index,
+            chk_index=chk_index)
         self.upload_transport = upload_transport
         self.index_transport = index_transport
         self.index_sizes = [None, None, None, None]
@@ -301 +301 @@
         self.upload_transport.delete(self.file_name())
         indices = [self.revision_index, self.inventory_index, self.text_index,
             self.signature_index]
+        if self.chk_index is not None:
+            indices.append(self.chk_index)
         for index in indices:
             index._transport.delete(index._name)
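
For context: the chk index is the content-hash-key index that newer CHK-backed formats carry alongside the four classic indices, which is why abort (and finish, below) must now treat it as optional. A quick reference for the suffixes involved; the '.rix' entry is an assumption, the other four appear in the resume code later in this diff:

    # Index file suffix per index type; '.rix' is assumed, the rest are
    # visible in the resume code below ('.iix', '.tix', '.six', '.cix').
    INDEX_SUFFIXES = {
        'revision': '.rix',
        'inventory': '.iix',
        'text': '.tix',
        'signature': '.six',
        'chk': '.cix',  # only exists for formats with a chk index
    }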
@@ -307 +309 @@
     def finish(self):
         self._check_references()
-        new_name = '../packs/' + self.file_name()
-        self.upload_transport.rename(self.file_name(), new_name)
-        for index_type in ['revision', 'inventory', 'text', 'signature']:
+        index_types = ['revision', 'inventory', 'text', 'signature']
+        if self.chk_index is not None:
+            index_types.append('chk')
+        for index_type in index_types:
             old_name = self.index_name(index_type, self.name)
             new_name = '../indices/' + old_name
             self.upload_transport.rename(old_name, new_name)
             self._replace_index_with_readonly(index_type)
+        new_name = '../packs/' + self.file_name()
+        self.upload_transport.rename(self.file_name(), new_name)
         self._state = 'finished'
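
Note the reordering here: the pack file used to be renamed into '../packs/' before its indices were published, and it now moves last. A minimal sketch of the crash-safety pattern this suggests, with the directory layout and names invented for illustration:

    import os

    # Publish supporting files first; make the primary artifact visible as
    # the final rename, so a crash mid-way never leaves a visible pack
    # whose indices are missing.
    def publish(tmp_dir, final_dir, index_names, pack_name):
        for name in index_names:
            os.rename(os.path.join(tmp_dir, name),
                      os.path.join(final_dir, 'indices', name))
        # commit point: only now does the pack appear to readers
        os.rename(os.path.join(tmp_dir, pack_name),
                  os.path.join(final_dir, 'packs', pack_name))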
@@ -318 +323 @@
     def _get_external_refs(self, index):
+        """Return compression parents for this index that are not present.
+
+        This returns any compression parents that are referenced by this index,
+        which are not contained *in* this index. They may be present elsewhere.
+        """
         return index.external_references(1)
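
Compression parents are the basis records that delta-compressed entries are stored against; an external reference is such a parent that this index mentions but does not itself hold. A toy model of the computation, assuming index nodes of the form (key, value, (compression_parents,)):

    # Toy equivalent of external_references(1): every compression parent
    # referenced by the nodes, minus the keys the index itself contains.
    def external_compression_parents(nodes):
        present = set(key for key, value, refs in nodes)
        referenced = set()
        for key, value, (compression_parents,) in nodes:
            referenced.update(compression_parents)
        return referenced - present

    # ('b',) is stored as a delta against ('a',), which lives elsewhere:
    nodes = [(('b',), '0 100', ((('a',),),))]
    print(external_compression_parents(nodes))  # set([('a',)])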
@@ -1486 +1497 @@
             'containing %d revisions. Packing %d files into %d affecting %d'
             ' revisions', self, total_packs, total_revisions, num_old_packs,
             num_new_packs, num_revs_affected)
-        self._execute_pack_operations(pack_operations,
+        result = self._execute_pack_operations(pack_operations,
                                       reload_func=self._restart_autopack)
         mutter('Auto-packing repository %s completed', self)
-        return True
+        return result

     def _execute_pack_operations(self, pack_operations, _packer_class=Packer,
                                  reload_func=None):
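
The only change to _autopack is that the result of _execute_pack_operations is now propagated instead of discarded. Plausibly this pairs with the hint parameter that pack() gains below; a toy of that wiring, with all names invented:

    class Collection(object):
        def __init__(self, names):
            self.names = list(names)

        def autopack(self):
            # pretend autopacking combined every pack into one new pack
            self.names = ['combined-0']
            return self.names      # the result is no longer thrown away

        def pack(self, hint=None):
            # repack only the packs named in hint (None means all of them)
            return [n for n in self.names if not hint or n in hint]

    c = Collection(['aaaa', 'bbbb'])
    hint = c.autopack()
    print(c.pack(hint=hint))  # ['combined-0']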
@@ -1539 +1551 @@
     def _already_packed(self):
         """Is the collection already packed?"""
-        return len(self._names) < 2
+        return not (self.repo._format.pack_compresses or (len(self._names) > 1))
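
The old test only counted packs; the new one also consults the format's pack_compresses flag, presumably true for formats whose packer recompresses content, so that repacking even a single pack can still shrink it. The implied decision table, as a runnable sketch:

    # Same boolean as the new _already_packed, with the inputs made explicit.
    def already_packed(pack_compresses, num_packs):
        return not (pack_compresses or num_packs > 1)

    assert already_packed(False, 1)          # old behaviour: 0 or 1 packs
    assert not already_packed(True, 1)       # compressing formats still repack
    assert not already_packed(False, 2)      # two packs always qualify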
@@ -1543 +1555 @@
-    def pack(self):
+    def pack(self, hint=None):
         """Pack the pack collection totally."""
         self.ensure_loaded()
         total_packs = len(self._names)
         if self._already_packed():
-            # This is arguably wrong because we might not be optimal, but for
-            # now lets leave it in. (e.g. reconcile -> one pack. But not
             return
         total_revisions = self.revision_index.combined_index.key_count()
         # XXX: the following may want to be a class, to pack with a given
         # policy.
         mutter('Packing repository %s, which has %d pack files, '
-            'containing %d revisions into 1 packs.', self, total_packs,
-            total_revisions)
+            'containing %d revisions with hint %r.', self, total_packs,
+            total_revisions, hint)
         # determine which packs need changing
-        pack_distribution = [1]
         pack_operations = [[0, []]]
         for pack in self.all_packs():
-            pack_operations[-1][0] += pack.get_revision_count()
-            pack_operations[-1][1].append(pack)
+            if not hint or pack.name in hint:
+                pack_operations[-1][0] += pack.get_revision_count()
+                pack_operations[-1][1].append(pack)
         self._execute_pack_operations(pack_operations, OptimisingPacker)

     def plan_autopack_combinations(self, existing_packs, pack_distribution):
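
pack() now takes an optional hint, a collection of pack names; only hinted packs are swept into the single pack operation, and the dead pack_distribution = [1] line is dropped along the way. A toy of the filter, where packs are (name, revision_count) pairs:

    def plan_total_pack(packs, hint=None):
        pack_operations = [[0, []]]
        for name, revision_count in packs:
            if not hint or name in hint:
                pack_operations[-1][0] += revision_count
                pack_operations[-1][1].append(name)
        return pack_operations

    packs = [('aaaa', 10), ('bbbb', 3), ('cccc', 7)]
    print(plan_total_pack(packs))                # [[20, ['aaaa', 'bbbb', 'cccc']]]
    print(plan_total_pack(packs, hint=['bbbb'])) # [[3, ['bbbb']]]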
@@ -1680 +1689 @@
             inv_index = self._make_index(name, '.iix', resume=True)
             txt_index = self._make_index(name, '.tix', resume=True)
             sig_index = self._make_index(name, '.six', resume=True)
-            result = ResumedPack(name, rev_index, inv_index, txt_index,
-                sig_index, self._upload_transport, self._pack_transport,
-                self._index_transport, self)
+            if self.chk_index is not None:
+                chk_index = self._make_index(name, '.cix', resume=True)
+            else:
+                chk_index = None
+            result = self.resumed_pack_factory(name, rev_index, inv_index,
+                txt_index, sig_index, self._upload_transport,
+                self._pack_transport, self._index_transport, self,
+                chk_index=chk_index)
         except errors.NoSuchFile, e:
             raise errors.UnresumableWriteGroup(self.repo, [name], str(e))
         self.add_pack_to_memory(result)
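
Constructing ResumedPack directly is replaced by a resumed_pack_factory attribute, so a subclassed collection can resume into a chk-aware pack class without overriding this method. The pattern in isolation, with all class names invented:

    class PlainResumedPack(object):
        kind = 'plain'
        def __init__(self, name):
            self.name = name

    class CHKResumedPack(PlainResumedPack):
        kind = 'chk'

    class BaseCollection(object):
        resumed_pack_factory = PlainResumedPack  # class attribute, easily swapped
        def resume(self, name):
            return self.resumed_pack_factory(name)

    class CHKCollection(BaseCollection):
        resumed_pack_factory = CHKResumedPack

    print(BaseCollection().resume('p').kind)  # plain
    print(CHKCollection().resume('p').kind)   # chk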
@@ -2262 +2278 @@
         self._pack_collection._start_write_group()

     def _commit_write_group(self):
+        self.revisions._index._key_dependencies.refs.clear()
         return self._pack_collection._commit_write_group()
@@ -2267 +2284 @@
     def suspend_write_group(self):
         # XXX check self._write_group is self.get_transaction()?
         tokens = self._pack_collection._suspend_write_group()
+        self.revisions._index._key_dependencies.refs.clear()
         self._write_group = None
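
Both commit and suspend now clear the revision index's _key_dependencies refs; this looks like per-write-group bookkeeping (references gathered while scanning unvalidated indices) that must not leak into the next write group. A toy of that reset, with the semantics hedged:

    # Minimal stand-in for the _key_dependencies tracker: it accumulates
    # refs while a write group is open and is wiped when the group ends.
    class KeyRefs(object):
        def __init__(self):
            self.refs = {}
        def add_references(self, key, parents):
            self.refs.setdefault(key, set()).update(parents)

    tracker = KeyRefs()
    tracker.add_references(('rev-2',), [('rev-1',)])
    tracker.refs.clear()   # what _commit/suspend_write_group now do
    print(tracker.refs)    # {}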
@@ -2273 +2291 @@
     def _resume_write_group(self, tokens):
         self._start_write_group()
-        self._pack_collection._resume_write_group(tokens)
+        try:
+            self._pack_collection._resume_write_group(tokens)
+        except errors.UnresumableWriteGroup:
+            self._abort_write_group()
+            raise
         for pack in self._pack_collection._resumed_packs:
             self.revisions._index.scan_unvalidated_index(pack.revision_index)
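
If resuming fails, the write group is aborted before the exception propagates, so no half-resumed state is left behind; the bare raise re-raises the original UnresumableWriteGroup. The shape of that handler in isolation (ValueError stands in for the bzr exception):

    def resume_write_group(start, resume, abort):
        start()
        try:
            resume()
        except ValueError:   # stand-in for errors.UnresumableWriteGroup
            abort()          # tear down before letting the error escape
            raise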
@@ -2356 +2377 @@
                 transaction = self._transaction
                 self._transaction = None
                 transaction.finish()
-                for repo in self._fallback_repositories:
-                    repo.unlock()
         else:
             self.control_files.unlock()
+
+        if not self.is_locked():
             for repo in self._fallback_repositories:
                 repo.unlock()
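
Fallback repositories used to be unlocked on every pass through unlock(); the is_locked() guard makes that happen only when the outermost lock is released, which matters for re-entrant locking. A toy of the rule, with a simple counter standing in for the real lock bookkeeping:

    class Repo(object):
        def __init__(self, fallbacks):
            self.lock_count = 0
            self.fallbacks = fallbacks
        def is_locked(self):
            return self.lock_count > 0
        def lock(self):
            self.lock_count += 1
        def unlock(self):
            self.lock_count -= 1
            if not self.is_locked():      # only the outermost unlock cascades
                for repo in self.fallbacks:
                    repo.unlock()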
@@ +2388 @@
+class KnitPackStreamSource(StreamSource):
+    """A StreamSource used to transfer data between same-format KnitPack repos.
+
+    This source assumes:
+        1) Same serialization format for all objects
+        2) Same root information
+        3) XML format inventories
+        4) Atomic inserts (so we can stream inventory texts before text
+           content)
+        5) No chk_bytes
+    """
+
+    def __init__(self, from_repository, to_format):
+        super(KnitPackStreamSource, self).__init__(from_repository, to_format)
+        self._text_keys = None
+        self._text_fetch_order = 'unordered'
+
+    def _get_filtered_inv_stream(self, revision_ids):
+        from_repo = self.from_repository
+        parent_ids = from_repo._find_parent_ids_of_revisions(revision_ids)
+        parent_keys = [(p,) for p in parent_ids]
+        find_text_keys = from_repo._find_text_key_references_from_xml_inventory_lines
+        parent_text_keys = set(find_text_keys(
+            from_repo._inventory_xml_lines_for_keys(parent_keys)))
+        content_text_keys = set()
+        knit = KnitVersionedFiles(None, None)
+        factory = KnitPlainFactory()
+        def find_text_keys_from_content(record):
+            if record.storage_kind not in ('knit-delta-gz', 'knit-ft-gz'):
+                raise ValueError("Unknown content storage kind for"
+                    " inventory text: %s" % (record.storage_kind,))
+            # It's a knit record, it has a _raw_record field (even if it was
+            # reconstituted from a network stream).
+            raw_data = record._raw_record
+            # read the entire thing
+            revision_id = record.key[-1]
+            content, _ = knit._parse_record(revision_id, raw_data)
+            if record.storage_kind == 'knit-delta-gz':
+                line_iterator = factory.get_linedelta_content(content)
+            elif record.storage_kind == 'knit-ft-gz':
+                line_iterator = factory.get_fulltext_content(content)
+            content_text_keys.update(find_text_keys(
+                [(line, revision_id) for line in line_iterator]))
+        revision_keys = [(r,) for r in revision_ids]
+        def _filtered_inv_stream():
+            source_vf = from_repo.inventories
+            stream = source_vf.get_record_stream(revision_keys,
+                                                 'unordered', False)
+            for record in stream:
+                if record.storage_kind == 'absent':
+                    raise errors.NoSuchRevision(from_repo, record.key)
+                find_text_keys_from_content(record)
+                yield record
+            self._text_keys = content_text_keys - parent_text_keys
+        return ('inventories', _filtered_inv_stream())
+
+    def _get_text_stream(self):
+        # Note: We know we don't have to handle adding root keys, because both
+        # the source and target have the identical network name.
+        text_stream = self.from_repository.texts.get_record_stream(
+            self._text_keys, self._text_fetch_order, False)
+        return ('texts', text_stream)
+
+    def get_stream(self, search):
+        revision_ids = search.get_keys()
+        for stream_info in self._fetch_revision_texts(revision_ids):
+            yield stream_info
+        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
+        yield self._get_filtered_inv_stream(revision_ids)
+        yield self._get_text_stream()
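
The three substreams are ordered deliberately: revision (and signature) texts first, then inventories, then exactly the file texts those inventories introduced. The coupling is subtle: _filtered_inv_stream is a generator whose side effect computes self._text_keys, so the consumer must exhaust the inventories substream before get_stream builds the texts substream. A toy of that generator coupling:

    class Source(object):
        def _inventories(self):
            # side effect: records which text keys the inventories reference
            for inv, text_keys in [('inv-1', set([('file-a', 'rev-1')]))]:
                self._text_keys = text_keys
                yield inv

        def get_stream(self):
            yield ('inventories', self._inventories())
            # only valid once the caller has consumed the inventories stream
            yield ('texts', sorted(self._text_keys))

    for name, stream in Source().get_stream():
        print((name, list(stream)))
    # ('inventories', ['inv-1'])
    # ('texts', [('file-a', 'rev-1')])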
@@ -2367 +2461 @@
 class RepositoryFormatPack(MetaDirRepositoryFormat):
     """Format logic for pack structured repositories.