2678
2743
def is_compatible(source, target):
    """Return the result of InterRepository._same_model for the pair.

    :param source: the repository that would be read from.
    :param target: the repository that would be written to.
    :return: whatever InterRepository._same_model(source, target) returns;
        no additional check is made here.
    """
    # NOTE(review): there is no self/cls parameter, so this is presumably
    # decorated as a staticmethod just above this view -- confirm.
    return InterRepository._same_model(source, target)
2682
def copy_content(self, revision_id=None):
2683
"""Make a complete copy of the content in self into destination.
2685
This copies both the repository's revision data, and configuration information
2686
such as the make_working_trees setting.
2688
This is a destructive operation! Do not use it on existing
2691
:param revision_id: Only copy the content needed to construct
2692
revision_id and its parents.
2695
self.target.set_make_working_trees(self.source.make_working_trees())
2696
except NotImplementedError:
2698
# but don't bother fetching if we have the needed data now.
2699
if (revision_id not in (None, _mod_revision.NULL_REVISION) and
2700
self.target.has_revision(revision_id)):
2702
self.target.fetch(self.source, revision_id=revision_id)
2705
def fetch(self, revision_id=None, pb=None, find_ghosts=False):
    """See InterRepository.fetch().

    Logs the source/target pair with mutter, constructs a RepoFetcher
    from self.source into self.target and reports the counters the
    fetcher exposes.

    :param revision_id: handed to the fetcher as last_revision.
    :param pb: passed through to the fetcher unchanged.
    :param find_ghosts: passed through to the fetcher unchanged.
    :return: a (count_copied, failed_revisions) tuple read from the
        fetcher.
    """
    # Function-scope import, kept local exactly as in the original.
    from bzrlib.fetch import RepoFetcher
    mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
           self.source, self.source._format, self.target,
           self.target._format)
    fetcher = RepoFetcher(to_repository=self.target,
                          from_repository=self.source,
                          last_revision=revision_id,
                          pb=pb, find_ghosts=find_ghosts)
    return fetcher.count_copied, fetcher.failed_revisions
2718
2747
class InterWeaveRepo(InterSameDataRepository):
2719
2748
"""Optimised code paths between Weave based repositories.
2721
2750
This should be in bzrlib/repofmt/weaverepo.py but we have not yet
2722
2751
implemented lazy inter-object optimisation.
3047
3072
return self.source.revision_ids_to_search_result(result_set)
3050
class InterModel1and2(InterRepository):
3053
def _get_repo_format_to_test(self):
3057
def is_compatible(source, target):
3058
if not source.supports_rich_root() and target.supports_rich_root():
3064
def fetch(self, revision_id=None, pb=None, find_ghosts=False):
    """See InterRepository.fetch().

    Constructs a Model1toKnit2Fetcher from self.source into self.target
    and reports the counters the fetcher exposes.

    :param revision_id: handed to the fetcher as last_revision.
    :param pb: passed through to the fetcher unchanged.
    :param find_ghosts: passed through to the fetcher unchanged.
    :return: a (count_copied, failed_revisions) tuple read from the
        fetcher.
    """
    # Function-scope import, kept local exactly as in the original.
    from bzrlib.fetch import Model1toKnit2Fetcher
    fetcher = Model1toKnit2Fetcher(to_repository=self.target,
                                   from_repository=self.source,
                                   last_revision=revision_id,
                                   pb=pb, find_ghosts=find_ghosts)
    return fetcher.count_copied, fetcher.failed_revisions
3074
def copy_content(self, revision_id=None):
3075
"""Make a complete copy of the content in self into destination.
3077
This is a destructive operation! Do not use it on existing
3080
:param revision_id: Only copy the content needed to construct
3081
revision_id and its parents.
3084
self.target.set_make_working_trees(self.source.make_working_trees())
3085
except NotImplementedError:
3087
# but don't bother fetching if we have the needed data now.
3088
if (revision_id not in (None, _mod_revision.NULL_REVISION) and
3089
self.target.has_revision(revision_id)):
3091
self.target.fetch(self.source, revision_id=revision_id)
3094
class InterKnit1and2(InterKnitRepo):
3097
def _get_repo_format_to_test(self):
3101
def is_compatible(source, target):
3102
"""Be compatible with Knit1 source and Knit3 target"""
3104
from bzrlib.repofmt.knitrepo import (
3105
RepositoryFormatKnit1,
3106
RepositoryFormatKnit3,
3108
from bzrlib.repofmt.pack_repo import (
3109
RepositoryFormatKnitPack1,
3110
RepositoryFormatKnitPack3,
3111
RepositoryFormatKnitPack4,
3112
RepositoryFormatKnitPack5,
3113
RepositoryFormatKnitPack5RichRoot,
3114
RepositoryFormatKnitPack6,
3115
RepositoryFormatKnitPack6RichRoot,
3116
RepositoryFormatPackDevelopment2,
3117
RepositoryFormatPackDevelopment2Subtree,
3120
RepositoryFormatKnit1, # no rr, no subtree
3121
RepositoryFormatKnitPack1, # no rr, no subtree
3122
RepositoryFormatPackDevelopment2, # no rr, no subtree
3123
RepositoryFormatKnitPack5, # no rr, no subtree
3124
RepositoryFormatKnitPack6, # no rr, no subtree
3127
RepositoryFormatKnit3, # rr, subtree
3128
RepositoryFormatKnitPack3, # rr, subtree
3129
RepositoryFormatKnitPack4, # rr, no subtree
3130
RepositoryFormatKnitPack5RichRoot,# rr, no subtree
3131
RepositoryFormatKnitPack6RichRoot,# rr, no subtree
3132
RepositoryFormatPackDevelopment2Subtree, # rr, subtree
3134
for format in norichroot:
3135
if format.rich_root_data:
3136
raise AssertionError('Format %s is a rich-root format'
3137
' but is included in the non-rich-root list'
3139
for format in richroot:
3140
if not format.rich_root_data:
3141
raise AssertionError('Format %s is not a rich-root format'
3142
' but is included in the rich-root list'
3144
# TODO: One alternative is to just check format.rich_root_data,
3145
# instead of keeping membership lists. However, the formats
3146
# *also* have to use the same 'Knit' style of storage
3147
# (line-deltas, fulltexts, etc.)
3148
return (isinstance(source._format, norichroot) and
3149
isinstance(target._format, richroot))
3150
except AttributeError:
3154
def fetch(self, revision_id=None, pb=None, find_ghosts=False):
    """See InterRepository.fetch().

    Logs the source/target pair with mutter, constructs a Knit1to2Fetcher
    from self.source into self.target and reports the counters the
    fetcher exposes.

    :param revision_id: handed to the fetcher as last_revision.
    :param pb: passed through to the fetcher unchanged.
    :param find_ghosts: passed through to the fetcher unchanged.
    :return: a (count_copied, failed_revisions) tuple read from the
        fetcher.
    """
    # Function-scope import, kept local exactly as in the original.
    from bzrlib.fetch import Knit1to2Fetcher
    mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
           self.source, self.source._format, self.target,
           self.target._format)
    fetcher = Knit1to2Fetcher(to_repository=self.target,
                              from_repository=self.source,
                              last_revision=revision_id,
                              pb=pb, find_ghosts=find_ghosts)
    return fetcher.count_copied, fetcher.failed_revisions
3167
3075
class InterDifferingSerializer(InterKnitRepo):
3587
3524
revision_graph[key] = tuple(parent for parent in parents if parent
3588
3525
in revision_graph)
3589
3526
return revision_graph
3529
class StreamSink(object):
3530
"""An object that can insert a stream into a repository.
3532
This interface handles the complexity of reserialising inventories and
3533
revisions from different formats, and allows unidirectional insertion into
3534
stacked repositories without looking for the missing basis parents
3538
def __init__(self, target_repo):
3539
self.target_repo = target_repo
3541
def insert_stream(self, stream, src_format, resume_tokens):
3542
"""Insert a stream's content into the target repository.
3544
:param src_format: a bzr repository format.
3546
:return: a list of resume tokens and an iterable of keys additional
3547
items required before the insertion can be completed.
3549
self.target_repo.lock_write()
3552
self.target_repo.resume_write_group(resume_tokens)
3554
self.target_repo.start_write_group()
3556
# locked_insert_stream performs a commit|suspend.
3557
return self._locked_insert_stream(stream, src_format)
3559
self.target_repo.abort_write_group(suppress_errors=True)
3562
self.target_repo.unlock()
3564
def _locked_insert_stream(self, stream, src_format):
3565
to_serializer = self.target_repo._format._serializer
3566
src_serializer = src_format._serializer
3567
for substream_type, substream in stream:
3568
if substream_type == 'texts':
3569
self.target_repo.texts.insert_record_stream(substream)
3570
elif substream_type == 'inventories':
3571
if src_serializer == to_serializer:
3572
self.target_repo.inventories.insert_record_stream(
3575
self._extract_and_insert_inventories(
3576
substream, src_serializer)
3577
elif substream_type == 'revisions':
3578
# This may fallback to extract-and-insert more often than
3579
# required if the serializers are different only in terms of
3581
if src_serializer == to_serializer:
3582
self.target_repo.revisions.insert_record_stream(
3585
self._extract_and_insert_revisions(substream,
3587
elif substream_type == 'signatures':
3588
self.target_repo.signatures.insert_record_stream(substream)
3590
raise AssertionError('kaboom! %s' % (substream_type,))
3592
missing_keys = set()
3593
for prefix, versioned_file in (
3594
('texts', self.target_repo.texts),
3595
('inventories', self.target_repo.inventories),
3596
('revisions', self.target_repo.revisions),
3597
('signatures', self.target_repo.signatures),
3599
missing_keys.update((prefix,) + key for key in
3600
versioned_file.get_missing_compression_parent_keys())
3601
except NotImplementedError:
3602
# cannot even attempt suspending, and missing would have failed
3603
# during stream insertion.
3604
missing_keys = set()
3607
# suspend the write group and tell the caller what is
3608
# missing. We know we can suspend or else we would not have
3609
# entered this code path. (All repositories that can handle
3610
# missing keys can handle suspending a write group).
3611
write_group_tokens = self.target_repo.suspend_write_group()
3612
return write_group_tokens, missing_keys
3613
self.target_repo.commit_write_group()
3616
def _extract_and_insert_inventories(self, substream, serializer):
3617
"""Generate a new inventory versionedfile in target, converting data.
3619
The inventory is retrieved from the source, (deserializing it), and
3620
stored in the target (reserializing it in a different format).
3622
for record in substream:
3623
bytes = record.get_bytes_as('fulltext')
3624
revision_id = record.key[0]
3625
inv = serializer.read_inventory_from_string(bytes, revision_id)
3626
parents = [key[0] for key in record.parents]
3627
self.target_repo.add_inventory(revision_id, inv, parents)
3629
def _extract_and_insert_revisions(self, substream, serializer):
3630
for record in substream:
3631
bytes = record.get_bytes_as('fulltext')
3632
revision_id = record.key[0]
3633
rev = serializer.read_revision_from_string(bytes)
3634
if rev.revision_id != revision_id:
3635
raise AssertionError('wtf: %s != %s' % (rev, revision_id))
3636
self.target_repo.add_revision(revision_id, rev)
3639
if self.target_repo._format._fetch_reconcile:
3640
self.target_repo.reconcile()
3643
class StreamSource(object):
3644
"""A source of a stream for fetching between repositories."""
3646
def __init__(self, from_repository, to_format):
3647
"""Create a StreamSource streaming from from_repository."""
3648
self.from_repository = from_repository
3649
self.to_format = to_format
3651
def delta_on_metadata(self):
    """Return True if deltas are permitted on metadata streams.

    That is on revisions and signatures.  Deltas are permitted only when
    the target format fetches using deltas and the source and target
    serializers compare equal.
    """
    src_serializer = self.from_repository._format._serializer
    target_serializer = self.to_format._serializer
    uses_deltas = self.to_format._fetch_uses_deltas
    if not uses_deltas:
        # Same value the original ``and`` expression short-circuits to.
        return uses_deltas
    return src_serializer == target_serializer
3661
def _fetch_revision_texts(self, revs):
3662
# fetch signatures first and then the revision texts
3663
# may need to be a InterRevisionStore call here.
3664
from_sf = self.from_repository.signatures
3665
# A missing signature is just skipped.
3666
keys = [(rev_id,) for rev_id in revs]
3667
signatures = versionedfile.filter_absent(from_sf.get_record_stream(
3669
self.to_format._fetch_order,
3670
not self.to_format._fetch_uses_deltas))
3671
# If a revision has a delta, this is actually expanded inside the
3672
# insert_record_stream code now, which is an alternate fix for
3674
from_rf = self.from_repository.revisions
3675
revisions = from_rf.get_record_stream(
3677
self.to_format._fetch_order,
3678
not self.delta_on_metadata())
3679
return [('signatures', signatures), ('revisions', revisions)]
3681
def _generate_root_texts(self, revs):
3682
"""This will be called by __fetch between fetching weave texts and
3683
fetching the inventory weave.
3685
Subclasses should override this if they need to generate root texts
3686
after fetching weave texts.
3688
if self._rich_root_upgrade():
3690
return bzrlib.fetch.Inter1and2Helper(
3691
self.from_repository).generate_root_texts(revs)
3695
def get_stream(self, search):
3697
revs = search.get_keys()
3698
graph = self.from_repository.get_graph()
3699
revs = list(graph.iter_topo_order(revs))
3700
data_to_fetch = self.from_repository.item_keys_introduced_by(revs)
3702
for knit_kind, file_id, revisions in data_to_fetch:
3703
if knit_kind != phase:
3705
# Make a new progress bar for this phase
3706
if knit_kind == "file":
3707
# Accumulate file texts
3708
text_keys.extend([(file_id, revision) for revision in
3710
elif knit_kind == "inventory":
3711
# Now copy the file texts.
3712
from_texts = self.from_repository.texts
3713
yield ('texts', from_texts.get_record_stream(
3714
text_keys, self.to_format._fetch_order,
3715
not self.to_format._fetch_uses_deltas))
3716
# Cause an error if a text occurs after we have done the
3719
# Before we process the inventory we generate the root
3720
# texts (if necessary) so that the inventories references
3722
for _ in self._generate_root_texts(revs):
3724
# NB: This currently reopens the inventory weave in source;
3725
# using a single stream interface instead would avoid this.
3726
from_weave = self.from_repository.inventories
3727
# we fetch only the referenced inventories because we do not
3728
# know for unselected inventories whether all their required
3729
# texts are present in the other repository - it could be
3731
yield ('inventories', from_weave.get_record_stream(
3732
[(rev_id,) for rev_id in revs],
3733
self.inventory_fetch_order(),
3734
not self.delta_on_metadata()))
3735
elif knit_kind == "signatures":
3736
# Nothing to do here; this will be taken care of when
3737
# _fetch_revision_texts happens.
3739
elif knit_kind == "revisions":
3740
for record in self._fetch_revision_texts(revs):
3743
raise AssertionError("Unknown knit kind %r" % knit_kind)
3745
def get_stream_for_missing_keys(self, missing_keys):
3746
# missing keys can only occur when we are byte copying and not
3747
# translating (because translation means we don't send
3748
# unreconstructable deltas ever).
3750
keys['texts'] = set()
3751
keys['revisions'] = set()
3752
keys['inventories'] = set()
3753
keys['signatures'] = set()
3754
for key in missing_keys:
3755
keys[key[0]].add(key[1:])
3756
if len(keys['revisions']):
3757
# If we allowed copying revisions at this point, we could end up
3758
# copying a revision without copying its required texts: a
3759
# violation of the requirements for repository integrity.
3760
raise AssertionError(
3761
'cannot copy revisions to fill in missing deltas %s' % (
3762
keys['revisions'],))
3763
for substream_kind, keys in keys.iteritems():
3764
vf = getattr(self.from_repository, substream_kind)
3765
# Ask for full texts always so that we don't need more round trips
3766
# after this stream.
3767
stream = vf.get_record_stream(keys,
3768
self.to_format._fetch_order, True)
3769
yield substream_kind, stream
3771
def inventory_fetch_order(self):
3772
if self._rich_root_upgrade():
3773
return 'topological'
3775
return self.to_format._fetch_order
3777
def _rich_root_upgrade(self):
3778
return (not self.from_repository._format.rich_root_data and
3779
self.to_format.rich_root_data)