                 timezone=None, committer=None, revprops=None,
                 revision_id=None, lossy=False):
        VersionedFileCommitBuilder.__init__(self, repository, parents, config,
            timestamp=timestamp, timezone=timezone, committer=committer,
            revprops=revprops, revision_id=revision_id, lossy=lossy)
        self._file_graph = graph.Graph(
            repository._pack_collection.text_index.combined_index)
    def __init__(self, revision_index, inventory_index, text_index,
                 signature_index, chk_index=None):
        """Create a pack instance.

        :param revision_index: A GraphIndex for determining what revisions are
        missing_items = {}
        for (index_name, external_refs, index) in [
                ('texts',
                    self._get_external_refs(self.text_index),
                    self._pack_collection.text_index.combined_index),
                ('inventories',
                    self._get_external_refs(self.inventory_index),
                    self._pack_collection.inventory_index.combined_index),
                ]:
            missing = external_refs.difference(
                k for (idx, k, v, r) in
                index.iter_entries(external_refs))
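# Illustrative sketch only: how the set difference above finds unsatisfied
# external references.  FakeIndex and its entries are made up for the example;
# the real combined_index yields (index, key, value, references) tuples from
# iter_entries(), which is why the generator unpacks four fields.
class FakeIndex:
    def __init__(self, present_keys):
        self._present = set(present_keys)

    def iter_entries(self, keys):
        for key in keys:
            if key in self._present:
                yield (self, key, b'value', ())

external_refs = {(b'file-1', b'rev-a'), (b'file-2', b'rev-b')}
index = FakeIndex([(b'file-1', b'rev-a')])
missing = external_refs.difference(
    k for (idx, k, v, r) in index.iter_entries(external_refs))
# missing == {(b'file-2', b'rev-b')} -> that compression parent is absent.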
        if index_type == 'chk':
            unlimited_cache = True
        index = self.index_class(self.index_transport,
                                 self.index_name(index_type, self.name),
                                 self.index_sizes[self.index_offset(index_type)],
                                 unlimited_cache=unlimited_cache)
        if index_type == 'chk':
            index._leaf_factory = btree_index._gcchk_factory
        setattr(self, index_type + '_index', index)
    def __hash__(self):
        return hash((type(self), self.revision_index, self.inventory_index,
                     self.text_index, self.signature_index, self.chk_index))
class ExistingPack(Pack):
    """An in memory proxy for an existing .pack and its disk indices."""

    def __init__(self, pack_transport, name, revision_index, inventory_index,
                 text_index, signature_index, chk_index=None):
        """Create an ExistingPack object.

        :param pack_transport: The transport where the pack file resides.
        :param name: The name of the pack on disk in the pack_transport.
        """
        Pack.__init__(self, revision_index, inventory_index, text_index,
                      signature_index, chk_index)
        self.name = name
        self.pack_transport = pack_transport
        if None in (revision_index, inventory_index, text_index,
                    signature_index, name, pack_transport):
            raise AssertionError()

    def __eq__(self, other):
class ResumedPack(ExistingPack):

    def __init__(self, name, revision_index, inventory_index, text_index,
                 signature_index, upload_transport, pack_transport, index_transport,
                 pack_collection, chk_index=None):
        """Create a ResumedPack object."""
        ExistingPack.__init__(self, pack_transport, name, revision_index,
                              inventory_index, text_index, signature_index,
                              chk_index=chk_index)
        self.upload_transport = upload_transport
        self.index_transport = index_transport
        self.index_sizes = [None, None, None, None]
        self.upload_transport.delete(self.file_name())
        indices = [self.revision_index, self.inventory_index, self.text_index,
                   self.signature_index]
        if self.chk_index is not None:
            indices.append(self.chk_index)
        for index in indices:
        Pack.__init__(self,
            # Revisions: parents list, no text compression.
            index_builder_class(reference_lists=1),
            # Inventory: We want to map compression only, but currently the
            # knit code hasn't been updated enough to understand that, so we
            # have a regular 2-list index giving parents and compression
            # source.
            index_builder_class(reference_lists=2),
            # Texts: compression and per file graph, for all fileids - so two
            # reference lists and two elements in the key tuple.
            index_builder_class(reference_lists=2, key_elements=2),
            # Signatures: Just blobs to store, no compression, no parents
            # listed.
            index_builder_class(reference_lists=0),
            # CHK based storage - just blobs, no compression or parents.
            chk_index,
            )
        self._pack_collection = pack_collection
        # When we make readonly indices, we need this.
        self.index_class = pack_collection._index_class
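# Illustrative sketch only (plain tuples, not the index_builder_class API):
# roughly what one node in each of the four index layouts above carries.
# Key shapes and reference-list counts follow the comments; the ids are
# made up for the example.
revision_node = ((b'rev-1',), ((b'rev-parent',),))        # 1 ref list: revision parents
inventory_node = ((b'rev-1',), ((b'rev-parent',), ()))    # 2 ref lists: parents, compression source
text_node = ((b'file-id', b'rev-1'),                      # 2-element key: (file_id, revision_id)
             (((b'file-id', b'rev-parent'),), ()))        # 2 ref lists: per-file graph, compression
signature_node = ((b'rev-1',), ())                        # no ref lists: just a signature blob per revision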
            self.random_name, mode=self._file_mode)
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
                   time.ctime(), self.upload_transport.base, self.random_name,
                   time.time() - self.start_time)
        # A list of byte sequences to be written to the new pack, and the
        # aggregate size of them.  Stored as a list rather than separate
        # variables so that the _write_data closure below can update them.
        self._buffer = [[], 0]
        # robertc says- this is a closure rather than a method on the object
        # so that the variables are locals, and faster than accessing object
        # members.
        def _write_data(bytes, flush=False, _buffer=self._buffer,
                        _write=self.write_stream.write, _update=self._hash.update):
            _buffer[0].append(bytes)
            _buffer[1] += len(bytes)
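# Illustrative sketch only: the buffering trick used above, reduced to plain
# Python.  A two-slot list is shared with a closure so the closure can mutate
# the buffered chunks and the running size without attribute lookups on self;
# the names and the 4MB threshold below are made up for the example.
def make_writer(stream_write):
    buffer = [[], 0]   # [chunks, total_size], mutated in place by the closure

    def write_data(data, _buffer=buffer, _write=stream_write):
        _buffer[0].append(data)
        _buffer[1] += len(data)
        if _buffer[1] > 4 * 1024 * 1024:      # flush once 4MB is buffered
            _write(b''.join(_buffer[0]))
            del _buffer[0][:]
            _buffer[1] = 0

    return write_data, buffer

chunks = []
write_data, buffer = make_writer(chunks.append)
write_data(b'record-1')
write_data(b'record-2')
# buffer[1] is now 16; nothing has been flushed to `chunks` yet.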
    def data_inserted(self):
        """True if data has been added to this pack."""
        return bool(self.get_revision_count() or
                    self.inventory_index.key_count() or
                    self.text_index.key_count() or
                    self.signature_index.key_count() or
                    (self.chk_index is not None and self.chk_index.key_count()))
    def finish_content(self):
        if self.name is not None:
        # they're in the names list.
        self.index_sizes = [None, None, None, None]
        self._write_index('revision', self.revision_index, 'revision',
                          suspend)
        self._write_index('inventory', self.inventory_index, 'inventory',
                          suspend)
        self._write_index('text', self.text_index, 'file texts', suspend)
        self._write_index('signature', self.signature_index,
                          'revision signatures', suspend)
        if self.chk_index is not None:
            self.index_sizes.append(None)
            self._write_index('chk', self.chk_index,
                              'content hash bytes', suspend)
        self.write_stream.close(
            want_fdatasync=self._pack_collection.config_stack.get('repository.fdatasync'))
        # Note that this will clobber an existing pack with the same name,
        if 'pack' in debug.debug_flags:
            # XXX: size might be interesting?
            mutter('%s: create_pack: pack finished: %s%s->%s t+%6.3fs',
                   time.ctime(), self.upload_transport.base, self.random_name,
                   new_name, time.time() - self.start_time)

    def flush(self):
        """Flush any current data."""
        index_tempfile = index.finish()
        index_bytes = index_tempfile.read()
        write_stream = transport.open_write_stream(index_name,
                                                   mode=self._file_mode)
        write_stream.write(index_bytes)
        write_stream.close(
            want_fdatasync=self._pack_collection.config_stack.get('repository.fdatasync'))
        if 'pack' in debug.debug_flags:
            # XXX: size might be interesting?
            mutter('%s: create_pack: wrote %s index: %s%s t+%6.3fs',
                   time.ctime(), label, self.upload_transport.base,
                   self.random_name, time.time() - self.start_time)
        # Replace the writable index on this object with a readonly,
        # presently unloaded index. We should alter
        # the index layer to make its finish() error if add_node is
        if self.add_callback is not None:
            raise AssertionError(
                "%s already has a writable index through %s" %
                (self, self.add_callback))
        # allow writing: queue writes to a new index
        self.add_index(index, pack)
        del self.combined_index._indices[pos]
        del self.combined_index._index_names[pos]
        if (self.add_callback is not None and
                getattr(index, 'add_nodes', None) == self.add_callback):
            self.add_callback = None
            self.data_access.set_writer(None, None, (None, None))
    def open_pack(self):
        """Open a pack for the pack we are creating."""
        new_pack = self._pack_collection.pack_factory(self._pack_collection,
            upload_suffix=self.suffix,
            file_mode=self._pack_collection.repo.controldir._get_file_mode())
        # We know that we will process all nodes in order, and don't need to
        # query, so don't combine any indices spilled to disk until we are done
        new_pack.revision_index.set_optimize(combine_backing_indices=False)
    def _log_copied_texts(self):
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
                   time.ctime(), self._pack_collection._upload_transport.base,
                   self.new_pack.random_name,
                   self.new_pack.text_index.key_count(),
                   time.time() - self.new_pack.start_time)
    def _use_pack(self, new_pack):
        """Return True if new_pack should be used.
        self._index_builder_class = index_builder_class
        self._index_class = index_class
        self._suffix_offsets = {'.rix': 0, '.iix': 1, '.tix': 2, '.six': 3,
                                '.cix': 4}
        # name:Pack mapping
        self._names = None
        self.text_index = AggregateIndex(self.reload_pack_names, flush)
        self.signature_index = AggregateIndex(self.reload_pack_names, flush)
        all_indices = [self.revision_index, self.inventory_index,
                       self.text_index, self.signature_index]
        if use_chk_index:
            self.chk_index = AggregateIndex(self.reload_pack_names, flush)
            all_indices.append(self.chk_index)
        num_old_packs = sum([len(po[1]) for po in pack_operations])
        num_revs_affected = sum([po[0] for po in pack_operations])
        mutter('Auto-packing repository %s, which has %d pack files, '
               'containing %d revisions. Packing %d files into %d affecting %d'
               ' revisions', str(self), total_packs, total_revisions, num_old_packs,
               num_new_packs, num_revs_affected)
        result = self._execute_pack_operations(pack_operations,
            packer_class=self.normal_packer_class,
            reload_func=self._restart_autopack)
        mutter('Auto-packing repository %s completed', str(self))
    def _execute_pack_operations(self, pack_operations, packer_class,
                                 reload_func=None):
        """Execute a series of pack operations.

        :param pack_operations: A list of [revision_count, packs_to_combine].
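            For example (structure only; the pack names here are made up for
            illustration)::

                pack_operations = [
                    [1750, [pack_a, pack_b, pack_c]],
                    [12, [pack_d, pack_e]],
                ]

            Later code grows the last entry in place, e.g.
            ``pack_operations[-1][0] += pack.get_revision_count()`` and
            ``pack_operations[-1][1].append(pack)``.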
        # XXX: the following may want to be a class, to pack with a given
        mutter('Packing repository %s, which has %d pack files, '
               'containing %d revisions with hint %r.', str(self), total_packs,
               total_revisions, hint)
        self._try_pack_operations(hint)
            pack_operations[-1][0] += pack.get_revision_count()
            pack_operations[-1][1].append(pack)
        self._execute_pack_operations(pack_operations,
            packer_class=self.optimising_packer_class,
            reload_func=self._restart_pack_operations)
    def plan_autopack_combinations(self, existing_packs, pack_distribution):
        """Plan a pack operation.
        final_pack_list.extend(pack_files)
        if len(final_pack_list) == 1:
            raise AssertionError('We somehow generated an autopack with a'
                                 ' single pack file being moved.')
        return [[final_rev_count, final_pack_list]]
            chk_index = None
        result = ExistingPack(self._pack_transport, name, rev_index,
                              inv_index, txt_index, sig_index, chk_index)
        self.add_pack_to_memory(result)
                chk_index = None
            result = self.resumed_pack_factory(name, rev_index, inv_index,
                txt_index, sig_index, self._upload_transport,
                self._pack_transport, self._index_transport, self,
                chk_index=chk_index)
        except errors.NoSuchFile as e:
            raise errors.UnresumableWriteGroup(self.repo, [name], str(e))
        self.add_pack_to_memory(result)
        :return: An iterator of the index contents.
        """
        return self._index_class(self.transport, 'pack-names', None
                                 ).iter_all_entries()
    def _make_index(self, name, suffix, resume=False, is_chk=False):
        size_offset = self._suffix_offsets[suffix]
        index_size = self._names[name][size_offset]
        index = self._index_class(transport, index_name, index_size,
                                  unlimited_cache=is_chk)
        if is_chk and self._index_class is btree_index.BTreeGraphIndex:
            index._leaf_factory = btree_index._gcchk_factory
                pack.pack_transport.move(pack.file_name(),
                    '../obsolete_packs/' + pack.file_name())
            except errors.NoSuchFile:
                # perhaps obsolete_packs was removed? Let's create it and
                # try again
            except errors.FileExists:
                pack.pack_transport.move(pack.file_name(),
                    '../obsolete_packs/' + pack.file_name())
            except (errors.PathError, errors.TransportError) as e:
                # TODO: Should these be warnings or mutters?
                mutter("couldn't rename obsolete pack, skipping it:\n%s"
        for suffix in suffixes:
            try:
                self._index_transport.move(pack.name + suffix,
                    '../obsolete_packs/' + pack.name + suffix)
            except (errors.PathError, errors.TransportError) as e:
                mutter("couldn't rename obsolete index, skipping it:\n%s"
        for name, value in disk_nodes:
            builder.add_node((name.encode('ascii'), ), value)
        self.transport.put_file('pack-names', builder.finish(),
            mode=self.repo.controldir._get_file_mode())
        self._packs_at_load = disk_nodes
        if clear_obsolete_packs:
            to_preserve = None
        :return: True if the in-memory list of packs has been altered at all.
        """
        # The ensure_loaded call is to handle the case where the first call
        # made involving the collection was to reload_pack_names, where we
        # don't have a view of disk contents. It's a bit of a bandaid, and
        # causes two reads of pack-names, but it's a rare corner case not
        # struck with regular push/pull etc.
        if not self.repo.is_write_locked():
            raise errors.NotWriteLocked(self)
        self._new_pack = self.pack_factory(self, upload_suffix='.pack',
            file_mode=self.repo.controldir._get_file_mode())
        # allow writing: queue writes to a new index
        self.revision_index.add_writable_index(self._new_pack.revision_index,
            self._new_pack)
        self.inventory_index.add_writable_index(self._new_pack.inventory_index,
            self._new_pack)
        self.text_index.add_writable_index(self._new_pack.text_index,
            self._new_pack)
        self._new_pack.text_index.set_optimize(combine_backing_indices=False)
        self.signature_index.add_writable_index(self._new_pack.signature_index,
            self._new_pack)
        if self.chk_index is not None:
            self.chk_index.add_writable_index(self._new_pack.chk_index,
                self._new_pack)
            self.repo.chk_bytes._index._add_callback = self.chk_index.add_callback
            self._new_pack.chk_index.set_optimize(combine_backing_indices=False)

        self.repo.inventories._index._add_callback = self.inventory_index.add_callback
        self.repo.revisions._index._add_callback = self.revision_index.add_callback
            # already gone. But if they're not there we shouldn't fail in this
            # case, so we pass ignore_missing=True.
            operation.add_cleanup(self._remove_pack_indices, self._new_pack,
                ignore_missing=True)
            operation.run_simple()
        for resumed_pack in self._resumed_packs:
            operation = cleanup.OperationWithCleanups(resumed_pack.abort)
            # See comment in previous finally block.
            operation.add_cleanup(self._remove_pack_indices, resumed_pack,
                ignore_missing=True)
            operation.run_simple()
        del self._resumed_packs[:]
        if all_missing:
            raise errors.BzrCheckError(
                "Repository %s has missing compression parent(s) %r "
                % (self.repo, sorted(all_missing)))
        problems = self._check_new_inventories()
        problems_summary = '\n'.join(problems)
    _serializer = None

    def __init__(self, _format, a_controldir, control_files, _commit_builder_class,
                 _serializer):
        MetaDirRepository.__init__(self, _format, a_controldir, control_files)
        self._commit_builder_class = _commit_builder_class
        self._serializer = _serializer
        recompress deltas or do other such expensive operations.
        """
        with self.lock_write():
            self._pack_collection.pack(
                hint=hint, clean_obsolete_packs=clean_obsolete_packs)
    def reconcile(self, other=None, thorough=False):
        """Reconcile this repository."""
        files = [('pack-names', builder.finish())]
        utf8_files = [('format', self.get_format_string())]
        self._upload_blank_content(
            a_controldir, dirs, files, utf8_files, shared)
        repository = self.open(a_controldir=a_controldir, _found=True)
        self._run_post_repo_init_hooks(repository, a_controldir, shared)
        return repository
        repo_transport = a_controldir.get_repository_transport(None)
        control_files = lockable_files.LockableFiles(repo_transport,
                                                     'lock', lockdir.LockDir)
        return self.repository_class(_format=self,
                                     a_controldir=a_controldir,
                                     control_files=control_files,
                                     _commit_builder_class=self._commit_builder_class,
                                     _serializer=self._serializer)
class RetryPackOperations(errors.RetryWithNewPacks):
        for key, size in key_sizes:
            p_offset, p_length = self._container_writer.add_bytes_record(
                raw_data[offset:offset + size], [])
            result.append((self._write_index, p_offset, p_length))
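# Illustrative sketch only (plain Python, not the pack container API): how a
# run of records concatenated into one byte string is split back out using
# (key, size) pairs, mirroring the offset arithmetic above.  The sample data
# is made up for the example.
raw_data = b'AAAABBCCCCCC'
key_sizes = [((b'key-a',), 4), ((b'key-b',), 2), ((b'key-c',), 6)]

records = []
offset = 0
for key, size in key_sizes:
    records.append((key, raw_data[offset:offset + size]))
    offset += size

assert records[1] == ((b'key-b',), b'BB')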