1067
1054
def _commit_write_group(self):
1068
1055
"""Template method for per-repository write group cleanup.
1070
This is called before the write group is considered to be
1057
This is called before the write group is considered to be
1071
1058
finished and should ensure that all data handed to the repository
1072
for writing during the write group is safely committed (to the
1059
for writing during the write group is safely committed (to the
1073
1060
extent possible considering file system caching etc).
1076
def fetch(self, source, revision_id=None, pb=None, find_ghosts=False):
1063
def suspend_write_group(self):
    """Suspend the active write group.

    The default implementation cannot suspend write groups; repository
    implementations that support suspend/resume override this.

    :raises errors.UnsuspendableWriteGroup: always.
    """
    raise errors.UnsuspendableWriteGroup(self)
1066
def resume_write_group(self, tokens):
    """Reinstate a previously suspended write group.

    :param tokens: the resume tokens handed back by suspend_write_group.
    :raises errors.NotWriteLocked: if the repository is not write locked.
    :raises errors.BzrError: if a write group is already active.
    """
    if not self.is_write_locked():
        raise errors.NotWriteLocked(self)
    if self._write_group:
        raise errors.BzrError('already in a write group')
    self._resume_write_group(tokens)
    # Remember the transaction so unlock/relock can be detected — the
    # write group is now considered entered.
    self._write_group = self.get_transaction()
1075
def _resume_write_group(self, tokens):
    """Template method: reinstate per-repository suspended state.

    The default implementation has nothing to resume because nothing can
    be suspended; repositories supporting suspend/resume override this.

    :raises errors.UnsuspendableWriteGroup: always.
    """
    raise errors.UnsuspendableWriteGroup(self)
1078
def fetch(self, source, revision_id=None, pb=None, find_ghosts=False,
1077
1080
"""Fetch the content required to construct revision_id from source.
1079
If revision_id is None all content is copied.
1082
If revision_id is None and fetch_spec is None, then all content is
1080
1085
:param find_ghosts: Find and copy revisions in the source that are
1081
1086
ghosts in the target (and not reachable directly by walking out to
1082
1087
the first-present revision in target from revision_id).
1088
:param revision_id: If specified, all the content needed for this
1089
revision ID will be copied to the target. Fetch will determine for
1090
itself which content needs to be copied.
1091
:param fetch_spec: If specified, a SearchResult or
1092
PendingAncestryResult that describes which revisions to copy. This
1093
allows copying multiple heads at once. Mutually exclusive with
1096
if fetch_spec is not None and revision_id is not None:
1097
raise AssertionError(
1098
"fetch_spec and revision_id are mutually exclusive.")
1084
1099
# fast path same-url fetch operations
1085
if self.has_same_location(source):
1100
if self.has_same_location(source) and fetch_spec is None:
1086
1101
# check that last_revision is in 'from' and then return a
1087
1102
# no-operation.
1088
1103
if (revision_id is not None and
2617
2720
def is_compatible(source, target):
    """Return True if source and target share the same data model."""
    # Delegate to the generic same-model check on InterRepository.
    return InterRepository._same_model(source, target)
2621
def copy_content(self, revision_id=None):
2622
"""Make a complete copy of the content in self into destination.
2624
This copies both the repository's revision data, and configuration information
2625
such as the make_working_trees setting.
2627
This is a destructive operation! Do not use it on existing
2630
:param revision_id: Only copy the content needed to construct
2631
revision_id and its parents.
2634
self.target.set_make_working_trees(self.source.make_working_trees())
2635
except NotImplementedError:
2637
# but don't bother fetching if we have the needed data now.
2638
if (revision_id not in (None, _mod_revision.NULL_REVISION) and
2639
self.target.has_revision(revision_id)):
2641
self.target.fetch(self.source, revision_id=revision_id)
2644
def fetch(self, revision_id=None, pb=None, find_ghosts=False):
    """See InterRepository.fetch()."""
    from bzrlib.fetch import RepoFetcher
    mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
           self.source, self.source._format, self.target,
           self.target._format)
    # Delegate the actual copying to the generic fetcher.
    fetcher = RepoFetcher(
        to_repository=self.target,
        from_repository=self.source,
        last_revision=revision_id,
        pb=pb,
        find_ghosts=find_ghosts)
    return fetcher.count_copied, fetcher.failed_revisions
2657
2724
class InterWeaveRepo(InterSameDataRepository):
2658
2725
"""Optimised code paths between Weave based repositories.
2660
2727
This should be in bzrlib/repofmt/weaverepo.py but we have not yet
2661
2728
implemented lazy inter-object optimisation.
2986
3033
return self.source.revision_ids_to_search_result(result_set)
2989
class InterModel1and2(InterRepository):
2992
def _get_repo_format_to_test(self):
2996
def is_compatible(source, target):
2997
if not source.supports_rich_root() and target.supports_rich_root():
3003
def fetch(self, revision_id=None, pb=None, find_ghosts=False):
    """See InterRepository.fetch()."""
    from bzrlib.fetch import Model1toKnit2Fetcher
    # Use the dedicated model-1 to knit-2 converter for this pair.
    fetcher = Model1toKnit2Fetcher(
        to_repository=self.target,
        from_repository=self.source,
        last_revision=revision_id,
        pb=pb,
        find_ghosts=find_ghosts)
    return fetcher.count_copied, fetcher.failed_revisions
3013
def copy_content(self, revision_id=None):
3014
"""Make a complete copy of the content in self into destination.
3016
This is a destructive operation! Do not use it on existing
3019
:param revision_id: Only copy the content needed to construct
3020
revision_id and its parents.
3023
self.target.set_make_working_trees(self.source.make_working_trees())
3024
except NotImplementedError:
3026
# but don't bother fetching if we have the needed data now.
3027
if (revision_id not in (None, _mod_revision.NULL_REVISION) and
3028
self.target.has_revision(revision_id)):
3030
self.target.fetch(self.source, revision_id=revision_id)
3033
class InterKnit1and2(InterKnitRepo):
3036
def _get_repo_format_to_test(self):
3040
def is_compatible(source, target):
3041
"""Be compatible with Knit1 source and Knit3 target"""
3043
from bzrlib.repofmt.knitrepo import (
3044
RepositoryFormatKnit1,
3045
RepositoryFormatKnit3,
3047
from bzrlib.repofmt.pack_repo import (
3048
RepositoryFormatKnitPack1,
3049
RepositoryFormatKnitPack3,
3050
RepositoryFormatKnitPack4,
3051
RepositoryFormatKnitPack5,
3052
RepositoryFormatKnitPack5RichRoot,
3053
RepositoryFormatKnitPack6,
3054
RepositoryFormatKnitPack6RichRoot,
3055
RepositoryFormatPackDevelopment2,
3056
RepositoryFormatPackDevelopment2Subtree,
3059
RepositoryFormatKnit1, # no rr, no subtree
3060
RepositoryFormatKnitPack1, # no rr, no subtree
3061
RepositoryFormatPackDevelopment2, # no rr, no subtree
3062
RepositoryFormatKnitPack5, # no rr, no subtree
3063
RepositoryFormatKnitPack6, # no rr, no subtree
3066
RepositoryFormatKnit3, # rr, subtree
3067
RepositoryFormatKnitPack3, # rr, subtree
3068
RepositoryFormatKnitPack4, # rr, no subtree
3069
RepositoryFormatKnitPack5RichRoot,# rr, no subtree
3070
RepositoryFormatKnitPack6RichRoot,# rr, no subtree
3071
RepositoryFormatPackDevelopment2Subtree, # rr, subtree
3073
for format in norichroot:
3074
if format.rich_root_data:
3075
raise AssertionError('Format %s is a rich-root format'
3076
' but is included in the non-rich-root list'
3078
for format in richroot:
3079
if not format.rich_root_data:
3080
raise AssertionError('Format %s is not a rich-root format'
3081
' but is included in the rich-root list'
3083
# TODO: One alternative is to just check format.rich_root_data,
3084
# instead of keeping membership lists. However, the formats
3085
# *also* have to use the same 'Knit' style of storage
3086
# (line-deltas, fulltexts, etc.)
3087
return (isinstance(source._format, norichroot) and
3088
isinstance(target._format, richroot))
3089
except AttributeError:
3093
def fetch(self, revision_id=None, pb=None, find_ghosts=False):
    """See InterRepository.fetch()."""
    from bzrlib.fetch import Knit1to2Fetcher
    mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
           self.source, self.source._format, self.target,
           self.target._format)
    # Use the dedicated knit-1 to knit-2 converter for this pair.
    fetcher = Knit1to2Fetcher(
        to_repository=self.target,
        from_repository=self.source,
        last_revision=revision_id,
        pb=pb,
        find_ghosts=find_ghosts)
    return fetcher.count_copied, fetcher.failed_revisions
3106
3036
class InterDifferingSerializer(InterKnitRepo):
3137
3086
text_keys = set()
3138
3087
pending_deltas = []
3139
3088
pending_revisions = []
3089
parent_map = self.source.get_parent_map(revision_ids)
3140
3090
for tree in self.source.revision_trees(revision_ids):
3141
3091
current_revision_id = tree.get_revision_id()
3142
delta = tree.inventory._make_delta(basis_tree.inventory)
3092
parent_ids = parent_map.get(current_revision_id, ())
3093
basis_id, delta = self._get_delta_for_revision(tree, parent_ids,
3095
# Find text entries that need to be copied
3143
3096
for old_path, new_path, file_id, entry in delta:
3144
3097
if new_path is not None:
3145
3098
if not (new_path or self.target.supports_rich_root()):
3146
# We leave the inventory delta in, because that
3147
# will have the deserialised inventory root
3099
# We don't copy the text for the root node unless the
3100
# target supports_rich_root.
3151
# "if entry.revision == current_revision_id" ?
3152
if entry.revision == current_revision_id:
3153
text_keys.add((file_id, entry.revision))
3102
text_keys.add((file_id, entry.revision))
3154
3103
revision = self.source.get_revision(current_revision_id)
3155
3104
pending_deltas.append((basis_id, delta,
3156
3105
current_revision_id, revision.parent_ids))
3157
3106
pending_revisions.append(revision)
3107
cache[current_revision_id] = tree
3158
3108
basis_id = current_revision_id
3160
3109
# Copy file texts
3161
3110
from_texts = self.source.texts
3162
3111
to_texts = self.target.texts
3163
3112
to_texts.insert_record_stream(from_texts.get_record_stream(
3164
text_keys, self.target._fetch_order,
3165
not self.target._fetch_uses_deltas))
3113
text_keys, self.target._format._fetch_order,
3114
not self.target._format._fetch_uses_deltas))
3166
3115
# insert deltas
3167
3116
for delta in pending_deltas:
3168
3117
self.target.add_inventory_by_delta(*delta)
3526
3487
revision_graph[key] = tuple(parent for parent in parents if parent
3527
3488
in revision_graph)
3528
3489
return revision_graph
3492
class StreamSink(object):
3493
"""An object that can insert a stream into a repository.
3495
This interface handles the complexity of reserialising inventories and
3496
revisions from different formats, and allows unidirectional insertion into
3497
stacked repositories without looking for the missing basis parents
3501
def __init__(self, target_repo):
    """Create a StreamSink inserting into target_repo."""
    # Repository that receives the inserted stream content.
    self.target_repo = target_repo
3504
def insert_stream(self, stream, src_format, resume_tokens):
3505
"""Insert a stream's content into the target repository.
3507
:param src_format: a bzr repository format.
3509
:return: a list of resume tokens and an iterable of keys additional
3510
items required before the insertion can be completed.
3512
self.target_repo.lock_write()
3515
self.target_repo.resume_write_group(resume_tokens)
3517
self.target_repo.start_write_group()
3519
# locked_insert_stream performs a commit|suspend.
3520
return self._locked_insert_stream(stream, src_format)
3522
self.target_repo.abort_write_group(suppress_errors=True)
3525
self.target_repo.unlock()
3527
def _locked_insert_stream(self, stream, src_format):
3528
to_serializer = self.target_repo._format._serializer
3529
src_serializer = src_format._serializer
3530
for substream_type, substream in stream:
3531
if substream_type == 'texts':
3532
self.target_repo.texts.insert_record_stream(substream)
3533
elif substream_type == 'inventories':
3534
if src_serializer == to_serializer:
3535
self.target_repo.inventories.insert_record_stream(
3538
self._extract_and_insert_inventories(
3539
substream, src_serializer)
3540
elif substream_type == 'revisions':
3541
# This may fallback to extract-and-insert more often than
3542
# required if the serializers are different only in terms of
3544
if src_serializer == to_serializer:
3545
self.target_repo.revisions.insert_record_stream(
3548
self._extract_and_insert_revisions(substream,
3550
elif substream_type == 'signatures':
3551
self.target_repo.signatures.insert_record_stream(substream)
3553
raise AssertionError('kaboom! %s' % (substream_type,))
3555
missing_keys = set()
3556
for prefix, versioned_file in (
3557
('texts', self.target_repo.texts),
3558
('inventories', self.target_repo.inventories),
3559
('revisions', self.target_repo.revisions),
3560
('signatures', self.target_repo.signatures),
3562
missing_keys.update((prefix,) + key for key in
3563
versioned_file.get_missing_compression_parent_keys())
3564
except NotImplementedError:
3565
# cannot even attempt suspending, and missing would have failed
3566
# during stream insertion.
3567
missing_keys = set()
3570
# suspend the write group and tell the caller what we is
3571
# missing. We know we can suspend or else we would not have
3572
# entered this code path. (All repositories that can handle
3573
# missing keys can handle suspending a write group).
3574
write_group_tokens = self.target_repo.suspend_write_group()
3575
return write_group_tokens, missing_keys
3576
self.target_repo.commit_write_group()
3579
def _extract_and_insert_inventories(self, substream, serializer):
    """Generate a new inventory versionedfile in target, converting data.

    The inventory is retrieved from the source, (deserializing it), and
    stored in the target (reserializing it in a different format).

    :param substream: an iterable of inventory records whose fulltext
        content is available.
    :param serializer: the source serializer used to deserialize each
        record's bytes.
    """
    for record in substream:
        # Renamed from 'bytes' to avoid shadowing the builtin.
        inv_bytes = record.get_bytes_as('fulltext')
        revision_id = record.key[0]
        inv = serializer.read_inventory_from_string(inv_bytes, revision_id)
        # Parent keys are 1-tuples of revision ids; unwrap them.
        parents = [key[0] for key in record.parents]
        self.target_repo.add_inventory(revision_id, inv, parents)
3592
def _extract_and_insert_revisions(self, substream, serializer):
    """Deserialize revision records and add them to the target repository.

    :param substream: an iterable of revision records whose fulltext
        content is available.
    :param serializer: the source serializer used to deserialize each
        record's bytes.
    :raises AssertionError: if a record's key disagrees with the
        revision id found inside its body.
    """
    for record in substream:
        # Renamed from 'bytes' to avoid shadowing the builtin.
        rev_bytes = record.get_bytes_as('fulltext')
        revision_id = record.key[0]
        rev = serializer.read_revision_from_string(rev_bytes)
        if rev.revision_id != revision_id:
            raise AssertionError('wtf: %s != %s' % (rev, revision_id))
        self.target_repo.add_revision(revision_id, rev)
3602
if self.target_repo._format._fetch_reconcile:
3603
self.target_repo.reconcile()
3606
class StreamSource(object):
3607
"""A source of a stream for fetching between repositories."""
3609
def __init__(self, from_repository, to_format):
    """Create a StreamSource streaming from from_repository."""
    # Repository the stream content is read from.
    self.from_repository = from_repository
    # Target format, consulted for ordering/delta decisions.
    self.to_format = to_format
3614
def delta_on_metadata(self):
    """Return True if delta's are permitted on metadata streams.

    That is on revisions and signatures.
    """
    same_serializer = (self.from_repository._format._serializer ==
                       self.to_format._serializer)
    return self.to_format._fetch_uses_deltas and same_serializer
3624
def _fetch_revision_texts(self, revs):
3625
# fetch signatures first and then the revision texts
3626
# may need to be a InterRevisionStore call here.
3627
from_sf = self.from_repository.signatures
3628
# A missing signature is just skipped.
3629
keys = [(rev_id,) for rev_id in revs]
3630
signatures = versionedfile.filter_absent(from_sf.get_record_stream(
3632
self.to_format._fetch_order,
3633
not self.to_format._fetch_uses_deltas))
3634
# If a revision has a delta, this is actually expanded inside the
3635
# insert_record_stream code now, which is an alternate fix for
3637
from_rf = self.from_repository.revisions
3638
revisions = from_rf.get_record_stream(
3640
self.to_format._fetch_order,
3641
not self.delta_on_metadata())
3642
return [('signatures', signatures), ('revisions', revisions)]
3644
def _generate_root_texts(self, revs):
3645
"""This will be called by __fetch between fetching weave texts and
3646
fetching the inventory weave.
3648
Subclasses should override this if they need to generate root texts
3649
after fetching weave texts.
3651
if self._rich_root_upgrade():
3653
return bzrlib.fetch.Inter1and2Helper(
3654
self.from_repository).generate_root_texts(revs)
3658
def get_stream(self, search):
3660
revs = search.get_keys()
3661
graph = self.from_repository.get_graph()
3662
revs = list(graph.iter_topo_order(revs))
3663
data_to_fetch = self.from_repository.item_keys_introduced_by(revs)
3665
for knit_kind, file_id, revisions in data_to_fetch:
3666
if knit_kind != phase:
3668
# Make a new progress bar for this phase
3669
if knit_kind == "file":
3670
# Accumulate file texts
3671
text_keys.extend([(file_id, revision) for revision in
3673
elif knit_kind == "inventory":
3674
# Now copy the file texts.
3675
from_texts = self.from_repository.texts
3676
yield ('texts', from_texts.get_record_stream(
3677
text_keys, self.to_format._fetch_order,
3678
not self.to_format._fetch_uses_deltas))
3679
# Cause an error if a text occurs after we have done the
3682
# Before we process the inventory we generate the root
3683
# texts (if necessary) so that the inventories references
3685
for _ in self._generate_root_texts(revs):
3687
# NB: This currently reopens the inventory weave in source;
3688
# using a single stream interface instead would avoid this.
3689
from_weave = self.from_repository.inventories
3690
# we fetch only the referenced inventories because we do not
3691
# know for unselected inventories whether all their required
3692
# texts are present in the other repository - it could be
3694
yield ('inventories', from_weave.get_record_stream(
3695
[(rev_id,) for rev_id in revs],
3696
self.inventory_fetch_order(),
3697
not self.delta_on_metadata()))
3698
elif knit_kind == "signatures":
3699
# Nothing to do here; this will be taken care of when
3700
# _fetch_revision_texts happens.
3702
elif knit_kind == "revisions":
3703
for record in self._fetch_revision_texts(revs):
3706
raise AssertionError("Unknown knit kind %r" % knit_kind)
3708
def get_stream_for_missing_keys(self, missing_keys):
3709
# missing keys can only occur when we are byte copying and not
3710
# translating (because translation means we don't send
3711
# unreconstructable deltas ever).
3713
keys['texts'] = set()
3714
keys['revisions'] = set()
3715
keys['inventories'] = set()
3716
keys['signatures'] = set()
3717
for key in missing_keys:
3718
keys[key[0]].add(key[1:])
3719
if len(keys['revisions']):
3720
# If we allowed copying revisions at this point, we could end up
3721
# copying a revision without copying its required texts: a
3722
# violation of the requirements for repository integrity.
3723
raise AssertionError(
3724
'cannot copy revisions to fill in missing deltas %s' % (
3725
keys['revisions'],))
3726
for substream_kind, keys in keys.iteritems():
3727
vf = getattr(self.from_repository, substream_kind)
3728
# Ask for full texts always so that we don't need more round trips
3729
# after this stream.
3730
stream = vf.get_record_stream(keys,
3731
self.to_format._fetch_order, True)
3732
yield substream_kind, stream
3734
def inventory_fetch_order(self):
    """Return the record ordering to use when fetching inventories."""
    # A rich-root upgrade generates root texts, which requires parent
    # inventories to be seen before their children.
    upgrading = self._rich_root_upgrade()
    return 'topological' if upgrading else self.to_format._fetch_order
3740
def _rich_root_upgrade(self):
    """Return whether this fetch converts plain roots to rich roots."""
    source_rich = self.from_repository._format.rich_root_data
    target_rich = self.to_format.rich_root_data
    return not source_rich and target_rich