from ..sixish import (
    viewitems,
    )
from ..bzr.vf_repository import (
    MetaDirVersionedFileRepository,
    MetaDirVersionedFileRepositoryFormat,
    VersionedFileCommitBuilder,
    VersionedFileRootCommitBuilder,
    )
from ..trace import (
    mutter,
    )
    def __init__(self, repository, parents, config, timestamp=None,
                 timezone=None, committer=None, revprops=None,
                 revision_id=None, lossy=False):
        VersionedFileCommitBuilder.__init__(self, repository, parents, config,
            timestamp=timestamp, timezone=timezone, committer=committer,
            revprops=revprops, revision_id=revision_id, lossy=lossy)
        self._file_graph = graph.Graph(
            repository._pack_collection.text_index.combined_index)

    def _heads(self, file_id, revision_ids):
        keys = [(file_id, revision_id) for revision_id in revision_ids]
        return {key[1] for key in self._file_graph.heads(keys)}
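    # Illustrative note (editor's sketch, not part of the original source):
    # _heads() answers "which of these revisions are heads for this file?" by
    # querying the per-file text graph.  Assuming a file id b'f-id' whose text
    # was modified in b'rev-1' and b'rev-2', a call would look like:
    #
    #     builder._heads(b'f-id', [b'rev-1', b'rev-2'])
    #     # -> the subset of {b'rev-1', b'rev-2'} that self._file_graph
    #     #    reports as heads, with the file id stripped from each key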
class PackRootCommitBuilder(VersionedFileRootCommitBuilder):
    """A subclass of RootCommitBuilder to add texts with pack semantics.

    Specifically this uses one knit object rather than one knit object per
    added text, reducing memory and object pressure.
    """

    def __init__(self, repository, parents, config, timestamp=None,
                 timezone=None, committer=None, revprops=None,
                 revision_id=None, lossy=False):
        super(PackRootCommitBuilder, self).__init__(repository, parents,
            config, timestamp=timestamp, timezone=timezone,
            committer=committer, revprops=revprops, revision_id=revision_id,
            lossy=lossy)
        self._file_graph = graph.Graph(
            repository._pack_collection.text_index.combined_index)
    def __init__(self, revision_index, inventory_index, text_index,
                 signature_index, chk_index=None):
        """Create a pack instance.

        :param revision_index: A GraphIndex for determining what revisions are
        missing_items = {}
        for (index_name, external_refs, index) in [
                ('texts',
                    self._get_external_refs(self.text_index),
                    self._pack_collection.text_index.combined_index),
                ('inventories',
                    self._get_external_refs(self.inventory_index),
                    self._pack_collection.inventory_index.combined_index),
                ]:
            missing = external_refs.difference(
                k for (idx, k, v, r) in
                index.iter_entries(external_refs))
        if index_type == 'chk':
            unlimited_cache = True
        index = self.index_class(self.index_transport,
            self.index_name(index_type, self.name),
            self.index_sizes[self.index_offset(index_type)],
            unlimited_cache=unlimited_cache)
        if index_type == 'chk':
            index._leaf_factory = btree_index._gcchk_factory
        setattr(self, index_type + '_index', index)
    def __lt__(self, other):
        if not isinstance(other, Pack):
            raise TypeError(other)
        return (id(self) < id(other))

    def __hash__(self):
        return hash((type(self), self.revision_index, self.inventory_index,
            self.text_index, self.signature_index, self.chk_index))
class ExistingPack(Pack):
    """An in-memory proxy for an existing .pack and its disk indices."""

    def __init__(self, pack_transport, name, revision_index, inventory_index,
                 text_index, signature_index, chk_index=None):
        """Create an ExistingPack object.

        :param pack_transport: The transport where the pack file resides.
        :param name: The name of the pack on disk in the pack_transport.
        """
        Pack.__init__(self, revision_index, inventory_index, text_index,
            signature_index, chk_index)
        self.name = name
        self.pack_transport = pack_transport
        if None in (revision_index, inventory_index, text_index,
                signature_index, name, pack_transport):
            raise AssertionError()
    def __eq__(self, other):
        return self.__dict__ == other.__dict__

    def __repr__(self):
        return "<%s.%s object at 0x%x, %s, %s" % (
            self.__class__.__module__, self.__class__.__name__, id(self),
            self.pack_transport, self.name)

    def __hash__(self):
        return hash((type(self), self.name))
class ResumedPack(ExistingPack):

    def __init__(self, name, revision_index, inventory_index, text_index,
                 signature_index, upload_transport, pack_transport,
                 index_transport, pack_collection, chk_index=None):
        """Create a ResumedPack object."""
        ExistingPack.__init__(self, pack_transport, name, revision_index,
            inventory_index, text_index, signature_index,
            chk_index=chk_index)
        self.upload_transport = upload_transport
        self.index_transport = index_transport
        self.index_sizes = [None, None, None, None]
        Pack.__init__(self,
            # Revisions: parents list, no text compression.
            index_builder_class(reference_lists=1),
            # Inventory: We want to map compression only, but currently the
            # knit code hasn't been updated enough to understand that, so we
            # have a regular 2-list index giving parents and compression
            # source.
            index_builder_class(reference_lists=2),
            # Texts: compression and per file graph, for all fileids - so two
            # reference lists and two elements in the key tuple.
            index_builder_class(reference_lists=2, key_elements=2),
            # Signatures: Just blobs to store, no compression, no parents
            # listed.
            index_builder_class(reference_lists=0),
            # CHK based storage - just blobs, no compression or parents.
            chk_index=chk_index,
            )
        self._pack_collection = pack_collection
        # When we make readonly indices, we need this.
        self.index_class = pack_collection._index_class
        self.write_stream = self.upload_transport.open_write_stream(
            self.random_name, mode=self._file_mode)
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
                time.ctime(), self.upload_transport.base, self.random_name,
                time.time() - self.start_time)
        # A list of byte sequences to be written to the new pack, and the
        # aggregate size of them.  Stored as a list rather than separate
        # variables so that the _write_data closure below can update them.
        self._buffer = [[], 0]
        # robertc says- this is a closure rather than a method on the object
        # so that the variables are locals, and faster than accessing object
        # members.
        def _write_data(bytes, flush=False, _buffer=self._buffer,
            _write=self.write_stream.write, _update=self._hash.update):
            _buffer[0].append(bytes)
            _buffer[1] += len(bytes)
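            # Editor's note (illustrative, not in the original source): the
            # closure accumulates output in self._buffer, which by the
            # assignment above is a two-element list: _buffer[0] holds the
            # pending byte chunks and _buffer[1] their combined length, so
            # flushing can write and hash everything in one pass.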
    def data_inserted(self):
        """True if data has been added to this pack."""
        return bool(self.get_revision_count() or
            self.inventory_index.key_count() or
            self.text_index.key_count() or
            self.signature_index.key_count() or
            (self.chk_index is not None and self.chk_index.key_count()))
    def finish_content(self):
        if self.name is not None:
            return
        # they're in the names list.
        self.index_sizes = [None, None, None, None]
        self._write_index('revision', self.revision_index, 'revision',
            suspend)
        self._write_index('inventory', self.inventory_index, 'inventory',
            suspend)
        self._write_index('text', self.text_index, 'file texts', suspend)
        self._write_index('signature', self.signature_index,
            'revision signatures', suspend)
        if self.chk_index is not None:
            self.index_sizes.append(None)
            self._write_index('chk', self.chk_index,
                'content hash bytes', suspend)
        self.write_stream.close(
            want_fdatasync=self._pack_collection.config_stack.get('repository.fdatasync'))
        # Note that this will clobber an existing pack with the same name,
        if 'pack' in debug.debug_flags:
            # XXX: size might be interesting?
            mutter('%s: create_pack: pack finished: %s%s->%s t+%6.3fs',
                time.ctime(), self.upload_transport.base, self.random_name,
                new_name, time.time() - self.start_time)

    def flush(self):
        """Flush any current data."""
        index_tempfile = index.finish()
        index_bytes = index_tempfile.read()
        write_stream = transport.open_write_stream(index_name,
            mode=self._file_mode)
        write_stream.write(index_bytes)
        write_stream.close(
            want_fdatasync=self._pack_collection.config_stack.get('repository.fdatasync'))
        if 'pack' in debug.debug_flags:
            # XXX: size might be interesting?
            mutter('%s: create_pack: wrote %s index: %s%s t+%6.3fs',
                time.ctime(), label, self.upload_transport.base,
                self.random_name, time.time() - self.start_time)
        # Replace the writable index on this object with a readonly,
        # presently unloaded index. We should alter
        # the index layer to make its finish() error if add_node is
        if self.add_callback is not None:
            raise AssertionError(
                "%s already has a writable index through %s" %
                (self, self.add_callback))
        # allow writing: queue writes to a new index
        self.add_index(index, pack)
        del self.combined_index._indices[pos]
        del self.combined_index._index_names[pos]
        if (self.add_callback is not None and
                getattr(index, 'add_nodes', None) == self.add_callback):
            self.add_callback = None
            self.data_access.set_writer(None, None, (None, None))
    def open_pack(self):
        """Open a pack for the pack we are creating."""
        new_pack = self._pack_collection.pack_factory(self._pack_collection,
            upload_suffix=self.suffix,
            file_mode=self._pack_collection.repo.controldir._get_file_mode())
        # We know that we will process all nodes in order, and don't need to
        # query, so don't combine any indices spilled to disk until we are done
        new_pack.revision_index.set_optimize(combine_backing_indices=False)
    def _log_copied_texts(self):
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base,
                self.new_pack.random_name,
                self.new_pack.text_index.key_count(),
                time.time() - self.new_pack.start_time)
    def _use_pack(self, new_pack):
        """Return True if new_pack should be used.
        self.text_index = AggregateIndex(self.reload_pack_names, flush)
        self.signature_index = AggregateIndex(self.reload_pack_names, flush)
        all_indices = [self.revision_index, self.inventory_index,
            self.text_index, self.signature_index]
        if use_chk_index:
            self.chk_index = AggregateIndex(self.reload_pack_names, flush)
            all_indices.append(self.chk_index)
        num_old_packs = sum([len(po[1]) for po in pack_operations])
        num_revs_affected = sum([po[0] for po in pack_operations])
        mutter('Auto-packing repository %s, which has %d pack files, '
            'containing %d revisions. Packing %d files into %d affecting %d'
            ' revisions', self, total_packs, total_revisions, num_old_packs,
            num_new_packs, num_revs_affected)
        result = self._execute_pack_operations(pack_operations,
            packer_class=self.normal_packer_class,
            reload_func=self._restart_autopack)
        mutter('Auto-packing repository %s completed', self)
    def _execute_pack_operations(self, pack_operations, packer_class,
            reload_func=None):
        """Execute a series of pack operations.

        :param pack_operations: A list of [revision_count, packs_to_combine].
        # XXX: the following may want to be a class, to pack with a given
        # policy.
        mutter('Packing repository %s, which has %d pack files, '
            'containing %d revisions with hint %r.', self, total_packs,
            total_revisions, hint)
        self._try_pack_operations(hint)
            pack_operations[-1][0] += pack.get_revision_count()
            pack_operations[-1][1].append(pack)
        self._execute_pack_operations(pack_operations,
            packer_class=self.optimising_packer_class,
            reload_func=self._restart_pack_operations)
    def plan_autopack_combinations(self, existing_packs, pack_distribution):
        """Plan a pack operation.

        final_pack_list.extend(pack_files)
        if len(final_pack_list) == 1:
            raise AssertionError('We somehow generated an autopack with a'
                ' single pack file being moved.')
        return [[final_rev_count, final_pack_list]]
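        # Illustrative note (editor's sketch, not part of the original
        # source): the planner returns a list of
        # [revision_count, packs_to_combine] pairs; the path shown above
        # collapses everything into a single operation, e.g.
        # [[final_rev_count, [pack_a, pack_b, pack_c]]] (pack names here are
        # hypothetical).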
        self._names = {}
        self._packs_at_load = set()
        for index, key, value in self._iter_disk_pack_index():
            name = key[0].decode('ascii')
            self._names[name] = self._parse_index_sizes(value)
            self._packs_at_load.add((name, value))
    def _parse_index_sizes(self, value):
        """Parse a string of index sizes."""
        return tuple(int(digits) for digits in value.split(b' '))
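    # Illustrative example (editor's sketch, not part of the original
    # source): each value in the pack-names index is a space-separated list
    # of index sizes, so, assuming a pack with four indices,
    #
    #     self._parse_index_sizes(b'19489 32498 3907 829')
    #     # -> (19489, 32498, 3907, 829)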
    def get_pack_by_name(self, name):
        """Get a Pack object by name.

                chk_index = None
            result = ExistingPack(self._pack_transport, name, rev_index,
                inv_index, txt_index, sig_index, chk_index)
            self.add_pack_to_memory(result)
                chk_index = None
            result = self.resumed_pack_factory(name, rev_index, inv_index,
                txt_index, sig_index, self._upload_transport,
                self._pack_transport, self._index_transport, self,
                chk_index=chk_index)
        except errors.NoSuchFile as e:
            raise errors.UnresumableWriteGroup(self.repo, [name], str(e))
        self.add_pack_to_memory(result)
        :return: An iterator of the index contents.
        """
        return self._index_class(self.transport, 'pack-names', None
            ).iter_all_entries()
    def _make_index(self, name, suffix, resume=False, is_chk=False):
        size_offset = self._suffix_offsets[suffix]

        index_size = self._names[name][size_offset]
        index = self._index_class(transport, index_name, index_size,
            unlimited_cache=is_chk)
        if is_chk and self._index_class is btree_index.BTreeGraphIndex:
            index._leaf_factory = btree_index._gcchk_factory
                try:
                    pack.pack_transport.move(pack.file_name(),
                        '../obsolete_packs/' + pack.file_name())
                except errors.NoSuchFile:
                    # perhaps obsolete_packs was removed? Let's create it and
                    # try again
                    try:
                        pack.pack_transport.mkdir('../obsolete_packs')
                    except errors.FileExists:
                        pass
                    pack.pack_transport.move(pack.file_name(),
                        '../obsolete_packs/' + pack.file_name())
            except (errors.PathError, errors.TransportError) as e:
                # TODO: Should these be warnings or mutters?
                mutter("couldn't rename obsolete pack, skipping it:\n%s"
        for suffix in suffixes:
            try:
                self._index_transport.move(pack.name + suffix,
                    '../obsolete_packs/' + pack.name + suffix)
            except (errors.PathError, errors.TransportError) as e:
                mutter("couldn't rename obsolete index, skipping it:\n%s"
        # load the disk nodes across
        disk_nodes = set()
        for index, key, value in self._iter_disk_pack_index():
            disk_nodes.add((key[0].decode('ascii'), value))
        orig_disk_nodes = set(disk_nodes)

        # do a two-way diff against our original content
        current_nodes = set()
        for name, sizes in viewitems(self._names):
            current_nodes.add(
                (name, b' '.join(b'%d' % size for size in sizes)))

        # Packs no longer present in the repository, which were present when we
        # locked the repository
        new_names = dict(disk_nodes)
        # drop no longer present nodes
        for pack in self.all_packs():
            if pack.name not in new_names:
                removed.append(pack.name)
                self._remove_pack_from_memory(pack)
        # add new nodes/refresh existing ones
        for name, value in disk_nodes:
            sizes = self._parse_index_sizes(value)
            if name in self._names:
        # TODO: handle same-name, index-size-changes here -
        # e.g. use the value from disk, not ours, *unless* we're the one
        # changing it.
        for name, value in disk_nodes:
            builder.add_node((name.encode('ascii'), ), value)
        self.transport.put_file('pack-names', builder.finish(),
            mode=self.repo.controldir._get_file_mode())
        self._packs_at_load = disk_nodes
        if clear_obsolete_packs:
            to_preserve = None

        obsolete_packs = [o for o in obsolete_packs
            if o.name not in already_obsolete]
        self._obsolete_packs(obsolete_packs)
        return [new_node[0] for new_node in new_nodes]
    def reload_pack_names(self):
        """Sync our pack listing with what is present in the repository.

        :return: True if the in-memory list of packs has been altered at all.
        """
        # The ensure_loaded call is to handle the case where the first call
        # made involving the collection was to reload_pack_names, where we
        # don't have a view of disk contents. It's a bit of a bandaid, and
        # causes two reads of pack-names, but it's a rare corner case not
        # struck with regular push/pull etc.
        if not self.repo.is_write_locked():
            raise errors.NotWriteLocked(self)
        self._new_pack = self.pack_factory(self, upload_suffix='.pack',
            file_mode=self.repo.controldir._get_file_mode())
        # allow writing: queue writes to a new index
        self.revision_index.add_writable_index(self._new_pack.revision_index,
            self._new_pack)
        self.inventory_index.add_writable_index(self._new_pack.inventory_index,
            self._new_pack)
        self.text_index.add_writable_index(self._new_pack.text_index,
            self._new_pack)
        self._new_pack.text_index.set_optimize(combine_backing_indices=False)
        self.signature_index.add_writable_index(self._new_pack.signature_index,
            self._new_pack)
        if self.chk_index is not None:
            self.chk_index.add_writable_index(self._new_pack.chk_index,
                self._new_pack)
            self.repo.chk_bytes._index._add_callback = self.chk_index.add_callback
            self._new_pack.chk_index.set_optimize(
                combine_backing_indices=False)

        self.repo.inventories._index._add_callback = self.inventory_index.add_callback
        self.repo.revisions._index._add_callback = self.revision_index.add_callback
        # FIXME: just drop the transient index.
        # forget what names there are
        if self._new_pack is not None:
            with cleanup.ExitStack() as stack:
                stack.callback(setattr, self, '_new_pack', None)
                # If we aborted while in the middle of finishing the write
                # group, _remove_pack_indices could fail because the indexes
                # are already gone.  But if they're not there we shouldn't
                # fail in this case, so we pass ignore_missing=True.
                stack.callback(self._remove_pack_indices, self._new_pack,
                    ignore_missing=True)
                self._new_pack.abort()
        for resumed_pack in self._resumed_packs:
            with cleanup.ExitStack() as stack:
                # See comment in previous finally block.
                stack.callback(self._remove_pack_indices, resumed_pack,
                    ignore_missing=True)
                resumed_pack.abort()
        del self._resumed_packs[:]
1595
1606
if all_missing:
1596
1607
raise errors.BzrCheckError(
1597
1608
"Repository %s has missing compression parent(s) %r "
1598
% (self.repo, sorted(all_missing)))
1609
% (self.repo, sorted(all_missing)))
1599
1610
problems = self._check_new_inventories()
1601
1612
problems_summary = '\n'.join(problems)
    _serializer = None

    def __init__(self, _format, a_controldir, control_files,
                 _commit_builder_class, _serializer):
        MetaDirRepository.__init__(self, _format, a_controldir, control_files)
        self._commit_builder_class = _commit_builder_class
        self._serializer = _serializer
        recompress deltas or do other such expensive operations.
        """
        with self.lock_write():
            self._pack_collection.pack(
                hint=hint, clean_obsolete_packs=clean_obsolete_packs)
    def reconcile(self, other=None, thorough=False):
        """Reconcile this repository."""
        from .reconcile import PackReconciler
        with self.lock_write():
            reconciler = PackReconciler(self, thorough=thorough)
            return reconciler.reconcile()

    def _reconcile_pack(self, collection, packs, extension, revs, pb):
        raise NotImplementedError(self._reconcile_pack)
        files = [('pack-names', builder.finish())]
        utf8_files = [('format', self.get_format_string())]

        self._upload_blank_content(
            a_controldir, dirs, files, utf8_files, shared)
        repository = self.open(a_controldir=a_controldir, _found=True)
        self._run_post_repo_init_hooks(repository, a_controldir, shared)
        return repository
        repo_transport = a_controldir.get_repository_transport(None)
        control_files = lockable_files.LockableFiles(repo_transport,
            'lock', lockdir.LockDir)
        return self.repository_class(_format=self,
            a_controldir=a_controldir,
            control_files=control_files,
            _commit_builder_class=self._commit_builder_class,
            _serializer=self._serializer)
class RetryPackOperations(errors.RetryWithNewPacks):


        self._reload_func = reload_func
        self._flush_func = flush_func
    def add_raw_record(self, key, size, raw_data):
        """Add raw knit bytes to a storage area.

        The data is spooled to the container writer in one bytes-record per
        raw data item.

        :param key: key of the data segment
        :param size: length of the data segment
        :param raw_data: A bytestring containing the data.
        :return: An opaque index memo. For _DirectPackAccess the memo is
            (index, pos, length), where the index field is the write_index
            object supplied to the PackAccess object.
        """
        p_offset, p_length = self._container_writer.add_bytes_record(
            raw_data, size, [])
        return (self._write_index, p_offset, p_length)
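    # Illustrative usage (editor's sketch, not part of the original source;
    # the names `access` and `data` below are hypothetical):
    #
    #     memo = access.add_raw_record((b'file-id', b'rev-id'), len(data),
    #                                  [data])
    #     # memo == (write_index, offset_in_pack, length_in_pack), an opaque
    #     # handle used to retrieve the record again later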
    def add_raw_records(self, key_sizes, raw_data):
        """Add raw knit bytes to a storage area.

            length), where the index field is the write_index object supplied
            to the PackAccess object.
        """
        raw_data = b''.join(raw_data)
        if not isinstance(raw_data, bytes):
            raise AssertionError(
                'data must be plain bytes was %s' % type(raw_data))
        result = []
        offset = 0
        for key, size in key_sizes:
            result.append(
                self.add_raw_record(key, size, [raw_data[offset:offset + size]]))
            offset += size
        return result

    def flush(self):