self.knit_access.set_writer(None, None, (None, None))


class Packer(object):
    """Create a pack from packs."""

    def __init__(self, pack_collection, packs, suffix, revision_ids=None):
        """Create a Packer.

        :param pack_collection: A RepositoryPackCollection object where the
            new pack is being written to.
        :param packs: The packs to combine.
        :param suffix: The suffix to use on the temporary files for the pack.
        :param revision_ids: Revision ids to limit the pack to.
        """
        self.packs = packs
        self.suffix = suffix
        self.revision_ids = revision_ids
        self._pack_collection = pack_collection

    def pack(self, pb=None):
        """Create a new pack by reading data from other packs.

        This does little more than a bulk copy of data. One key difference
        is that data with the same item key across multiple packs is elided
        from the output. The new pack is written into the current pack store
        along with its indices, and the name added to the pack names. The
        source packs are not altered and are not required to be in the current
        pack collection.

        :param pb: An optional progress bar to use. A nested bar is created if
            this is None.
        :return: A Pack object, or None if nothing was copied.
        """
        # open a pack - using the same name as the last temporary file
        # - which has already been flushed, so it's safe.
        # XXX: - duplicate code warning with start_write_group; fix before
        #      considering 'done'.
        if self._pack_collection._new_pack is not None:
            raise errors.BzrError('call to create_pack_from_packs while '
                'another pack is being written.')
        if self.revision_ids is not None:
            if len(self.revision_ids) == 0:
                # silly fetch request.
                return None
            else:
                self.revision_ids = frozenset(self.revision_ids)
        if pb is None:
            self.pb = ui.ui_factory.nested_progress_bar()
        else:
            self.pb = pb
        try:
            return self._create_pack_from_packs()
        finally:
            if pb is None:
                self.pb.finished()
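
    # A sketch of how this class is driven (illustrative, not code from this
    # module): combine every existing pack into one new pack, assuming
    # `collection` is the write-locked RepositoryPackCollection and
    # '.autopack' is the temporary-file suffix we want.
    #
    #   packer = Packer(collection, collection.all_packs(), '.autopack')
    #   new_pack = packer.pack()  # None if there was nothing to copy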

    def open_pack(self):
        """Open a pack for the pack we are creating."""
        return NewPack(self._pack_collection._upload_transport,
            self._pack_collection._index_transport,
            self._pack_collection._pack_transport, upload_suffix=self.suffix)

    def _create_pack_from_packs(self):
        self.pb.update("Opening pack", 0, 5)
        new_pack = self.open_pack()
        # buffer data - we won't be reading-back during the pack creation and
        # this makes a significant difference on sftp pushes.
        new_pack.set_write_cache_size(1024*1024)
        if 'pack' in debug.debug_flags:
            plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
                for a_pack in self.packs]
            if self.revision_ids is not None:
                rev_count = len(self.revision_ids)
            else:
                rev_count = 'all'
            mutter('%s: create_pack: creating pack from source packs: '
                '%s%s %s revisions wanted %s t=0',
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                plain_pack_list, rev_count)
        # select revisions
        if self.revision_ids:
            revision_keys = [(revision_id,) for revision_id in self.revision_ids]
        else:
            revision_keys = None
        # select revision keys
        revision_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
            self.packs, 'revision_index')[0]
        revision_nodes = self._pack_collection._index_contents(revision_index_map, revision_keys)
        # copy revision keys and adjust values
        self.pb.update("Copying revision texts", 1)
        list(self._copy_nodes_graph(revision_nodes, revision_index_map,
            new_pack._writer, new_pack.revision_index))
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                new_pack.revision_index.key_count(),
                time.time() - new_pack.start_time)
        # select inventory keys
            raise errors.RevisionNotPresent(a_missing_key[1],
                a_missing_key[0])
        # copy text keys and adjust values
        self.pb.update("Copying content texts", 3)
        list(self._copy_nodes_graph(text_nodes, text_index_map,
            new_pack._writer, new_pack.text_index))
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                new_pack.text_index.key_count(),
                time.time() - new_pack.start_time)
        # select signature keys
        signature_filter = revision_keys # same keyspace
        signature_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
            self.packs, 'signature_index')[0]
        signature_nodes = self._pack_collection._index_contents(signature_index_map,
            signature_filter)
        # copy signature keys and adjust values
        self.pb.update("Copying signature texts", 4)
        self._copy_nodes(signature_nodes, signature_index_map, new_pack._writer,
            new_pack.signature_index)
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: revision signatures copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                new_pack.signature_index.key_count(),
                time.time() - new_pack.start_time)
        if not new_pack.data_inserted():
            new_pack.abort()
            return None
        self.pb.update("Finishing pack", 5)
        new_pack.finish()
        self._pack_collection.allocate(new_pack)
        return new_pack
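
    # Note: _create_pack_from_packs copies the data streams in a fixed order -
    # revisions, inventories, file texts, then signatures - updating the
    # progress bar at each of the five steps and building the matching index
    # on the new pack as it goes.  The new pack is only finished and allocated
    # into the collection when data_inserted() reports that something was
    # actually written.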

    def _copy_nodes(self, nodes, index_map, writer, write_index):
        """Copy knit nodes between packs with no graph references."""
        pb = ui.ui_factory.nested_progress_bar()
        try:
            return self._do_copy_nodes(nodes, index_map, writer,
                write_index, pb)
        finally:
            pb.finished()

    def _do_copy_nodes(self, nodes, index_map, writer, write_index, pb):
        # for record verification
        knit_data = _KnitData(None)
        # plan a readv on each source pack:
        nodes = sorted(nodes)
        # how to map this into knit.py - or knit.py into this?
        # we don't want the typical knit logic, we want grouping by pack
        # at this point - perhaps a helper library for the following code
        # duplication points?
        request_groups = {}
        for index, key, value in nodes:
            if index not in request_groups:
                request_groups[index] = []
            request_groups[index].append((key, value))
        record_index = 0
        pb.update("Copied record", record_index, len(nodes))
        for index, items in request_groups.iteritems():
            pack_readv_requests = []
            for key, value in items:
                # ---- KnitGraphIndex.get_position
                bits = value[1:].split(' ')
                offset, length = int(bits[0]), int(bits[1])
                pack_readv_requests.append((offset, length, (key, value[0])))
            # linear scan up the pack
            pack_readv_requests.sort()
            transport, path = index_map[index]
            reader = pack.make_readv_reader(transport, path,
                [offset[0:2] for offset in pack_readv_requests])
            for (names, read_func), (_1, _2, (key, eol_flag)) in \
                izip(reader.iter_records(), pack_readv_requests):
                raw_data = read_func(None)
                # check the header only
                df, _ = knit_data._parse_record_header(key[-1], raw_data)
                df.close()
                pos, size = writer.add_bytes_record(raw_data, names)
                write_index.add_node(key, eol_flag + "%d %d" % (pos, size))
                pb.update("Copied record", record_index)
                record_index += 1

    def _copy_nodes_graph(self, nodes, index_map, writer, write_index,
        output_lines=False):
        """Copy knit nodes between packs.

        :param output_lines: Return lines present in the copied data as
            an iterator.
        """
        pb = ui.ui_factory.nested_progress_bar()
        try:
            return self._do_copy_nodes_graph(nodes, index_map, writer,
                write_index, output_lines, pb)
        finally:
            pb.finished()

    def _do_copy_nodes_graph(self, nodes, index_map, writer, write_index,
        output_lines, pb):
        # for record verification
        knit_data = _KnitData(None)
        # for line extraction when requested (inventories only)
        factory = knit.KnitPlainFactory()
        # plan a readv on each source pack:
        nodes = sorted(nodes)
        # how to map this into knit.py - or knit.py into this?
        # we don't want the typical knit logic, we want grouping by pack
        # at this point - perhaps a helper library for the following code
        # duplication points?
        request_groups = {}
        for index, key, value, references in nodes:
            if index not in request_groups:
                request_groups[index] = []
            request_groups[index].append((key, value, references))
        record_index = 0
        pb.update("Copied record", record_index, len(nodes))
        for index, items in request_groups.iteritems():
            pack_readv_requests = []
            for key, value, references in items:
                # ---- KnitGraphIndex.get_position
                bits = value[1:].split(' ')
                offset, length = int(bits[0]), int(bits[1])
                pack_readv_requests.append((offset, length, (key, value[0], references)))
            # linear scan up the pack
            pack_readv_requests.sort()
            transport, path = index_map[index]
            reader = pack.make_readv_reader(transport, path,
                [offset[0:2] for offset in pack_readv_requests])
            for (names, read_func), (_1, _2, (key, eol_flag, references)) in \
                izip(reader.iter_records(), pack_readv_requests):
                raw_data = read_func(None)
                if output_lines:
                    # read the entire thing
                    content, _ = knit_data._parse_record(key[-1], raw_data)
                    if len(references[-1]) == 0:
                        line_iterator = factory.get_fulltext_content(content)
                    else:
                        line_iterator = factory.get_linedelta_content(content)
                    for line in line_iterator:
                        yield line
                else:
                    # check the header only
                    df, _ = knit_data._parse_record_header(key[-1], raw_data)
                    df.close()
                pos, size = writer.add_bytes_record(raw_data, names)
                write_index.add_node(key, eol_flag + "%d %d" % (pos, size), references)
                pb.update("Copied record", record_index)
                record_index += 1


class ReconcilePacker(Packer):
    """A packer which regenerates indices etc as it copies.

    This is used by ``bzr reconcile`` to cause parent text pointers to be
    corrected.
    """


class RepositoryPackCollection(object):
    """Management of packs within a repository."""

    def __init__(self, repo, transport, index_transport, upload_transport,
                 pack_transport):
        """Create a new RepositoryPackCollection.

        :param transport: Addresses the repository base directory
            (typically .bzr/repository/).
        :param index_transport: Addresses the directory containing indices.
        :param upload_transport: Addresses the directory into which packs are written
            while they're being created.
        :param pack_transport: Addresses the directory of existing complete packs.
        """
        self.repo = repo
        self.transport = transport
        self._index_transport = index_transport
        self._upload_transport = upload_transport
        self._pack_transport = pack_transport
        self._suffix_offsets = {'.rix': 0, '.iix': 1, '.tix': 2, '.six': 3}
        self.packs = []
        # name:Pack mapping
        self._packs_by_name = {}
        # the previous pack-names content
        self._packs_at_load = None
        # when a pack is being created by this object, the state of that pack.
        self._new_pack = None
        # aggregated revision index data
        self.revision_index = AggregateIndex()
        self.inventory_index = AggregateIndex()
        self.text_index = AggregateIndex()
        self.signature_index = AggregateIndex()
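
    # The suffixes in _suffix_offsets correspond to the four aggregate
    # indices created above: .rix (revisions), .iix (inventories),
    # .tix (file texts) and .six (signatures).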

    def add_pack_to_memory(self, pack):
        """Make a Pack object available to the repository to satisfy queries.

        :param pack: A Pack object.
        """
        assert pack.name not in self._packs_by_name
        self.packs.append(pack)
        self._packs_by_name[pack.name] = pack
        self.revision_index.add_index(pack.revision_index, pack)
        self.inventory_index.add_index(pack.inventory_index, pack)
        self.text_index.add_index(pack.text_index, pack)
        self.signature_index.add_index(pack.signature_index, pack)
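
    # Each AggregateIndex presents the union of the per-pack indices through
    # a single combined index, so callers can query the repository as a whole
    # without knowing which physical pack file holds a particular record.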

    def _add_text_to_weave(self, file_id, revision_id, new_lines, parents,
        nostore_sha, random_revid):
        file_id_index = GraphIndexPrefixAdapter(
            self.text_index.combined_index,
            (file_id, ), 1,
            add_nodes_callback=self.text_index.add_callback)
        self.repo._text_knit._index._graph_index = file_id_index
        self.repo._text_knit._index._add_callback = file_id_index.add_nodes
        return self.repo._text_knit.add_lines_with_ghosts(
            revision_id, parents, new_lines, nostore_sha=nostore_sha,
            random_id=random_revid, check_content=False)[0:2]
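
    # The GraphIndexPrefixAdapter above narrows the combined text index to a
    # single file_id, so the text knit works with plain revision keys while
    # the underlying index stores keys prefixed by file_id.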

    def all_packs(self):
        """Return a list of all the Pack objects this repository has.

        Note that an in-progress pack being created is not returned.

        :return: A list of Pack objects for all the packs in the repository.
        """
        result = []
        for name in self.names():
            result.append(self.get_pack_by_name(name))
        return result

    def autopack(self):
        """Pack the pack collection incrementally.

        This will not attempt global reorganisation or recompression,
        rather it will just ensure that the total number of packs does
        not grow without bound. It uses the _max_pack_count method to
        determine if autopacking is needed, and the pack_distribution
        method to determine the number of revisions in each pack.

        If autopacking takes place then the pack names collection will have
        been flushed to disk - packing requires updating the name collection
        in synchronisation with certain steps. Otherwise the names collection
        is not touched.

        :return: True if packing took place.
        """
        # XXX: Should not be needed when the management of indices is sane.
        total_revisions = self.revision_index.combined_index.key_count()
        total_packs = len(self._names)
        if self._max_pack_count(total_revisions) >= total_packs:
            return False
        # XXX: the following may want to be a class, to pack with a given
        # policy.
        mutter('Auto-packing repository %s, which has %d pack files, '
            'containing %d revisions into %d packs.', self, total_packs,
            total_revisions, self._max_pack_count(total_revisions))
        # determine which packs need changing
        pack_distribution = self.pack_distribution(total_revisions)
        existing_packs = []
        for pack in self.all_packs():
            revision_count = pack.get_revision_count()
            if revision_count == 0:
                # revision-less packs are not generated by normal operation,
                # only by operations like sign-my-commits, and thus will not
                # tend to grow rapidly or without bound like commit-containing
                # packs do - leave them alone as packing them really should
                # group their data with the relevant commit, and that may
                # involve rewriting ancient history - which autopack tries to
                # avoid. Alternatively we could not group the data but treat
                # each of these as having a single revision, and thus add
                # one revision for each to the total revision count, to get
                # a matching distribution.
                continue
            existing_packs.append((revision_count, pack))
        pack_operations = self.plan_autopack_combinations(
            existing_packs, pack_distribution)
        self._execute_pack_operations(pack_operations)
        return True
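
    # Illustrative call site (not code from this module): after a write group
    # adds new packs, a caller holding the repository write lock can run
    #
    #   if packs.autopack():
    #       # pack-names was rewritten and some packs were combined on disk
    #       ...
    #
    # where `packs` is this RepositoryPackCollection.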

    def _execute_pack_operations(self, pack_operations):
        """Execute a series of pack operations."""

    def ensure_loaded(self):
        # NB: if you see an assertion error here, it's probably access against
        # an unlocked repo. Naughty.