 def encode_name(content_kind, revision_id, file_id=None):
     """Encode semantic ids as a container name"""
-    assert content_kind in ('revision', 'file', 'inventory', 'signature',
-                            'info')
+    if content_kind not in ('revision', 'file', 'inventory', 'signature',
+                            'info'):
+        raise ValueError(content_kind)
     if content_kind == 'file':
-        assert file_id is not None
+        if file_id is None:
+            raise AssertionError()
     else:
-        assert file_id is None
+        if file_id is not None:
+            raise AssertionError()
     if content_kind == 'info':
-        assert revision_id is None
-    else:
-        assert revision_id is not None
+        if revision_id is not None:
+            raise AssertionError()
+    elif revision_id is None:
+        raise AssertionError()
     names = [n.replace('/', '//') for n in
              (content_kind, revision_id, file_id) if n is not None]
     return '/'.join(names)
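
The container names are simply the non-None fields joined with '/', with any literal '/' inside an id doubled so the separator stays unambiguous. A minimal standalone sketch of the round trip (the decode_name helper below illustrates the idea and is not necessarily the exact bzrlib implementation):

import re

def encode_name(content_kind, revision_id, file_id=None):
    names = [n.replace('/', '//') for n in
             (content_kind, revision_id, file_id) if n is not None]
    return '/'.join(names)

def decode_name(name):
    # Split on single '/' separators, then undo the '//' escaping.
    parts = ['']
    for segment in re.split('(//?)', name):
        if segment == '//':
            parts[-1] += '/'
        elif segment == '/':
            parts.append('')
        else:
            parts[-1] += segment
    parts += [None] * (3 - len(parts))
    return tuple(parts)

name = encode_name('file', 'rev-1', 'path/to/file-id')
assert name == 'file/rev-1/path//to//file-id'
assert decode_name(name) == ('file', 'rev-1', 'path/to/file-id')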

-    def __init__(self, fileobj):
+    def __init__(self, fileobj, stream_input=True):
+        """
+        :param fileobj: a file containing a bzip-encoded container
+        :param stream_input: If True, the BundleReader streams input rather
+            than reading it all into memory at once.  Reading it into memory
+            all at once is (currently) faster.
+        """
         line = fileobj.readline()
         if line != '\n':
             fileobj.readline()
         self.patch_lines = []
-        self._container = pack.ContainerReader(
-            iterablefile.IterableFile(self.iter_decode(fileobj)))
+        if stream_input:
+            source_file = iterablefile.IterableFile(self.iter_decode(fileobj))
+        else:
+            source_file = StringIO(bz2.decompress(fileobj.read()))
+        self._container_file = source_file

     def iter_decode(fileobj):
         """Iterate through decoded fragments of the file"""
         decompressor = bz2.BZ2Decompressor()
         for line in fileobj:
             yield decompressor.decompress(line)
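
The streaming path pushes the bz2 payload through an incremental decompressor one chunk at a time instead of calling bz2.decompress() on the whole file. A self-contained sketch of the same idea (the function here is illustrative, not bzrlib API):

import bz2
from io import BytesIO

def iter_decode(fileobj):
    # Decompress a bz2 stream chunk by chunk so the whole payload never
    # has to be held in memory at once.
    decompressor = bz2.BZ2Decompressor()
    for chunk in fileobj:
        yield decompressor.decompress(chunk)

payload = b'some bundle records\n' * 1000
compressed = BytesIO(bz2.compress(payload))
assert b''.join(iter_decode(compressed)) == payload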

     def decode_name(name):

     def iter_records(self):
         """
         :return: a generator of (bytes, metadata, content_kind, revision_id,
             file_id)
         """
-        iterator = self._container.iter_records()
-        for names, meta_bytes in iterator:
+        iterator = pack.iter_records_from_file(self._container_file)
+        for names, bytes in iterator:
             if len(names) != 1:
                 raise errors.BadBundle('Record has %d names instead of 1'
                                        % len(names))
-            metadata = bencode.bdecode(meta_bytes(None))
+            metadata = bencode.bdecode(bytes)
             if metadata['storage_kind'] == 'header':
                 _unused, bytes = iterator.next()
             yield (bytes, metadata) + self.decode_name(names[0][0])

-class BundleSerializerV4(serializer.BundleSerializer):
+class BundleSerializerV4(bundle_serializer.BundleSerializer):
     """Implement the high-level bundle interface"""

     def write(self, repository, revision_ids, forced_bases, fileobj):

         self.repository = repository
         bundle = BundleWriter(fileobj)
         self.bundle = bundle
-        self.base_ancestry = set(repository.get_ancestry(base,
         if revision_ids is not None:
             self.revision_ids = revision_ids
         else:
-            revision_ids = set(repository.get_ancestry(target,
-            self.revision_ids = revision_ids.difference(self.base_ancestry)
+            graph = repository.get_graph()
+            revision_ids = graph.find_unique_ancestors(target, [base])
+            parents = graph.get_parent_map(revision_ids)
+            self.revision_ids = [r for r in revision_ids if r in parents]
+        self.revision_keys = set([(revid,) for revid in self.revision_ids])
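
graph.find_unique_ancestors(target, [base]) yields the revisions reachable from the target but not from the base. A rough pure-Python sketch of that set computation over a plain {revision: parents} dict (the helper below is illustrative, not the bzrlib Graph API):

def unique_ancestors(parent_map, target, base):
    # Revisions reachable from target that are not reachable from base.
    def ancestry(tip):
        seen, pending = set(), [tip]
        while pending:
            rev = pending.pop()
            if rev in seen or rev not in parent_map:
                continue  # unknown revisions are treated as ghosts
            seen.add(rev)
            pending.extend(parent_map[rev])
        return seen
    return ancestry(target) - ancestry(base)

parent_map = {'A': [], 'B': ['A'], 'C': ['B'], 'D': ['B'], 'E': ['C', 'D']}
assert unique_ancestors(parent_map, 'E', 'C') == {'D', 'E'}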

     def do_write(self):
         """Write all data to the bundle"""
-        self.write_revisions()
+        trace.note('Bundling %d revision(s).', len(self.revision_ids))
+        self.repository.lock_read()
+        try:
+            self.write_revisions()
+        finally:
+            self.repository.unlock()
         return self.revision_ids

     def write_info(self):

         self.bundle.add_info_record(serializer=serializer_format,
                                     supports_rich_root=supports_rich_root)

-    def iter_file_revisions(self):
-        """Iterate through all relevant revisions of all files.
-
-        This is the correct implementation, but is not compatible with bzr.dev,
-        because certain old revisions were not converted correctly, and have
-        the wrong "revision" marker in inventories.
-        """
-        transaction = self.repository.get_transaction()
-        altered = self.repository.fileids_altered_by_revision_ids(
-            self.revision_ids)
-        for file_id, file_revision_ids in altered.iteritems():
-            vf = self.repository.weave_store.get_weave(file_id, transaction)
-            yield vf, file_id, file_revision_ids
-
-    def iter_file_revisions_aggressive(self):
-        """Iterate through all relevant revisions of all files.
-
-        This uses the standard iter_file_revisions to determine what revisions
-        are referred to by inventories, but then uses the versionedfile to
-        determine the build-dependencies of each required revision.
-
-        All build dependencies which are not ancestors of the base revision
-        are also emitted.
-        """
-        for vf, file_id, file_revision_ids in self.iter_file_revisions():
-            new_revision_ids = set()
-            pending = list(file_revision_ids)
-            while len(pending) > 0:
-                revision_id = pending.pop()
-                if revision_id in new_revision_ids:
-                    continue
-                if revision_id in self.base_ancestry:
-                    continue
-                new_revision_ids.add(revision_id)
-                pending.extend(vf.get_parents(revision_id))
-            yield vf, file_id, new_revision_ids
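
The removed 'aggressive' variant walks each file's graph from the wanted revisions back towards the base, collecting every build dependency that is not already in the base ancestry. A compact sketch of that worklist traversal over a plain parent map (illustrative only):

def build_dependencies(parent_map, wanted, base_ancestry):
    # Collect `wanted` plus every ancestor needed to build them,
    # stopping at revisions the base already has.
    needed, pending = set(), list(wanted)
    while pending:
        revision_id = pending.pop()
        if revision_id in needed or revision_id in base_ancestry:
            continue
        needed.add(revision_id)
        pending.extend(parent_map.get(revision_id, []))
    return needed

parent_map = {'r3': ['r2'], 'r2': ['r1'], 'r1': []}
assert build_dependencies(parent_map, ['r3'], {'r1'}) == {'r2', 'r3'}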

     def write_files(self):
         """Write bundle records for all revisions of all files"""
-        for vf, file_id, revision_ids in self.iter_file_revisions_aggressive():
-            self.add_mp_records('file', file_id, vf, revision_ids)
+        text_keys = []
+        altered_fileids = self.repository.fileids_altered_by_revision_ids(
+            self.revision_ids)
+        for file_id, revision_ids in altered_fileids.iteritems():
+            for revision_id in revision_ids:
+                text_keys.append((file_id, revision_id))
+        self._add_mp_records_keys('file', self.repository.texts, text_keys)
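
The keyed interface addresses texts as (file_id, revision_id) tuples rather than going through per-file VersionedFile objects. A tiny sketch of the key flattening on plain data (no bzrlib involved):

altered_fileids = {
    'file-a': {'rev-1', 'rev-2'},
    'file-b': {'rev-2'},
}
text_keys = [(file_id, revision_id)
             for file_id, revision_ids in sorted(altered_fileids.items())
             for revision_id in sorted(revision_ids)]
assert text_keys == [('file-a', 'rev-1'), ('file-a', 'rev-2'),
                     ('file-b', 'rev-2')]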

     def write_revisions(self):
         """Write bundle records for all revisions and signatures"""
-        inv_vf = self.repository.get_inventory_weave()
-        revision_order = list(multiparent.topo_iter(inv_vf, self.revision_ids))
+        inv_vf = self.repository.inventories
+        revision_order = [key[-1] for key in multiparent.topo_iter_keys(inv_vf,
+            self.revision_keys)]
         if self.target is not None and self.target in self.revision_ids:
             revision_order.remove(self.target)
             revision_order.append(self.target)
-        self.add_mp_records('inventory', None, inv_vf, revision_order)
-        parents_list = self.repository.get_parents(revision_order)
-        for parents, revision_id in zip(parents_list, revision_order):
-            revision_text = self.repository.get_revision_xml(revision_id)
+        self._add_mp_records_keys('inventory', inv_vf, [(revid,) for revid in revision_order])
+        parent_map = self.repository.get_parent_map(revision_order)
+        revision_to_str = self.repository._serializer.write_revision_to_string
+        revisions = self.repository.get_revisions(revision_order)
+        for revision in revisions:
+            revision_id = revision.revision_id
+            parents = parent_map.get(revision_id, None)
+            revision_text = revision_to_str(revision)
             self.bundle.add_fulltext_record(revision_text, parents,
                                             'revision', revision_id)
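
Records are written in topological order, with the bundle's target revision forced to the end so a reader can treat the last revision record as the target. A hedged sketch using a plain depth-first topological sort in place of multiparent.topo_iter_keys:

def topo_order(parent_map, target=None):
    # Parents before children; the target revision, if present, goes last.
    order, emitted = [], set()
    def visit(rev):
        if rev in emitted or rev not in parent_map:
            return
        emitted.add(rev)
        for parent in parent_map[rev]:
            visit(parent)
        order.append(rev)
    for rev in sorted(parent_map):
        visit(rev)
    if target is not None and target in order:
        order.remove(target)
        order.append(target)
    return order

parent_map = {'r1': [], 'r2': ['r1'], 'r3': ['r1']}
assert topo_order(parent_map, target='r2') == ['r1', 'r3', 'r2']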

         base = parents[0]
         return base, target

-    def add_mp_records(self, repo_kind, file_id, vf, revision_ids):
+    def _add_mp_records_keys(self, repo_kind, vf, keys):
         """Add multi-parent diff records to a bundle"""
-        revision_ids = list(multiparent.topo_iter(vf, revision_ids))
-        mpdiffs = vf.make_mpdiffs(revision_ids)
-        sha1s = vf.get_sha1s(revision_ids)
-        for mpdiff, revision_id, sha1, in zip(mpdiffs, revision_ids, sha1s):
-            parents = vf.get_parents(revision_id)
+        ordered_keys = list(multiparent.topo_iter_keys(vf, keys))
+        mpdiffs = vf.make_mpdiffs(ordered_keys)
+        sha1s = vf.get_sha1s(ordered_keys)
+        parent_map = vf.get_parent_map(ordered_keys)
+        for mpdiff, item_key, in zip(mpdiffs, ordered_keys):
+            sha1 = sha1s[item_key]
+            parents = [key[-1] for key in parent_map[item_key]]
             text = ''.join(mpdiff.to_patch())
+            # Infer file id records as appropriate.
+            if len(item_key) == 2:
+                file_id = item_key[0]
+            else:
+                file_id = None
             self.bundle.add_multiparent_record(text, sha1, parents, repo_kind,
-                                               revision_id, file_id)
+                                               item_key[-1], file_id)
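
Under the keyed interface a length-2 key is a (file_id, revision_id) text key, while inventory, revision and signature records use 1-tuples. A small sketch of the file-id inference performed in the loop above (plain tuples, nothing bzrlib-specific):

def split_key(item_key):
    # (file_id, revision_id) for texts; (revision_id,) for everything else.
    if len(item_key) == 2:
        file_id, revision_id = item_key
    else:
        file_id, revision_id = None, item_key[-1]
    return file_id, revision_id

assert split_key(('file-a', 'rev-1')) == ('file-a', 'rev-1')
assert split_key(('rev-1',)) == (None, 'rev-1')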

 class BundleInfoV4(object):

     def install(self, repository):
         return self.install_revisions(repository)

-    def install_revisions(self, repository):
-        """Install this bundle's revisions into the specified repository"""
+    def install_revisions(self, repository, stream_input=True):
+        """Install this bundle's revisions into the specified repository
+
+        :param repository: The repository to install into
+        :param stream_input: If True, will stream input rather than reading it
+            all into memory at once.  Reading it into memory all at once is
+            (currently) faster.
+        """
         repository.lock_write()
-        ri = RevisionInstaller(self.get_bundle_reader(),
+        ri = RevisionInstaller(self.get_bundle_reader(stream_input),
                                self._serializer, repository)
         return ri.install()

         self._info = None

     def install(self):
-        """Perform the installation"""
+        """Perform the installation.
+
+        Must be called with the Repository locked.
+        """
+        self._repository.start_write_group()
+        try:
+            result = self._install_in_write_group()
+        except:
+            self._repository.abort_write_group()
+            raise
+        self._repository.commit_write_group()
+        return result
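
Installation now runs inside a repository write group: start it, install the records, then either commit or abort and re-raise. A generic sketch of that pattern as a context manager (a hypothetical helper, not bzrlib API):

from contextlib import contextmanager

@contextmanager
def write_group(repository):
    # Commit the group on success; abort and re-raise on any error.
    repository.start_write_group()
    try:
        yield
    except BaseException:
        repository.abort_write_group()
        raise
    repository.commit_write_group()

# Usage sketch:
#   with write_group(repo):
#       install_records(repo)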

+    def _install_in_write_group(self):
         current_file = None
         current_versionedfile = None
         pending_file_records = []
+        pending_inventory_records = []
         added_inv = set()
         target_revision = None
         for bytes, metadata, repo_kind, revision_id, file_id in\
             self._container.iter_records():
             if repo_kind == 'info':
-                assert self._info is None
+                if self._info is not None:
+                    raise AssertionError()
                 self._handle_info(metadata)
-            if repo_kind != 'file':
-                self._install_mp_records(current_versionedfile,
+            if (pending_file_records and
+                (repo_kind, file_id) != ('file', current_file)):
+                # Flush the data for a single file - prevents memory
+                # spiking due to buffering all files in memory.
+                self._install_mp_records_keys(self._repository.texts,
                     pending_file_records)
                 current_file = None
-                current_versionedfile = None
-                pending_file_records = []
-                if repo_kind == 'inventory':
-                    self._install_inventory(revision_id, metadata, bytes)
-                if repo_kind == 'revision':
-                    target_revision = revision_id
-                    self._install_revision(revision_id, metadata, bytes)
-                if repo_kind == 'signature':
-                    self._install_signature(revision_id, metadata, bytes)
+                del pending_file_records[:]
+            if len(pending_inventory_records) > 0 and repo_kind != 'inventory':
+                self._install_inventory_records(pending_inventory_records)
+                pending_inventory_records = []
+            if repo_kind == 'inventory':
+                pending_inventory_records.append(((revision_id,), metadata, bytes))
+            if repo_kind == 'revision':
+                target_revision = revision_id
+                self._install_revision(revision_id, metadata, bytes)
+            if repo_kind == 'signature':
+                self._install_signature(revision_id, metadata, bytes)
             if repo_kind == 'file':
-                if file_id != current_file:
-                    self._install_mp_records(current_versionedfile,
-                                             pending_file_records)
-                    current_file = file_id
-                    current_versionedfile = \
-                        self._repository.weave_store.get_weave_or_empty(
-                            file_id, self._repository.get_transaction())
-                    pending_file_records = []
-                if revision_id in current_versionedfile:
-                    continue
-                pending_file_records.append((revision_id, metadata, bytes))
-        self._install_mp_records(current_versionedfile, pending_file_records)
+                current_file = file_id
+                pending_file_records.append(((file_id, revision_id), metadata, bytes))
+        self._install_mp_records_keys(self._repository.texts, pending_file_records)
         return target_revision
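
The installer buffers consecutive records for a single file and flushes them as soon as the stream moves on to a different file or a non-file record, which keeps memory bounded to roughly one file's worth of records. A minimal sketch of that grouping logic over plain tuples (illustrative, not the real record format):

def group_file_records(records):
    # records: iterable of (repo_kind, file_id, payload) tuples.
    # Yield (file_id, [payloads]) batches, flushing when the file changes.
    current_file, pending = None, []
    for repo_kind, file_id, payload in records:
        if pending and (repo_kind, file_id) != ('file', current_file):
            yield current_file, pending
            pending = []
        if repo_kind == 'file':
            current_file = file_id
            pending.append(payload)
    if pending:
        yield current_file, pending

records = [('file', 'a', 1), ('file', 'a', 2), ('file', 'b', 3),
           ('revision', None, 4)]
assert list(group_file_records(records)) == [('a', [1, 2]), ('b', [3])]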

     def _handle_info(self, info):

                       records if r not in versionedfile]
         versionedfile.add_mpdiffs(vf_records)

-    def _install_inventory(self, revision_id, metadata, text):
-        vf = self._repository.get_inventory_weave()
-        if revision_id in vf:
-            return
-        parent_ids = metadata['parents']
+    def _install_mp_records_keys(self, versionedfile, records):
+        d_func = multiparent.MultiParent.from_patch
+        vf_records = []
+        for key, meta, text in records:
+            # Adapt to tuple interface: A length two key is a file_id,
+            # revision_id pair, a length 1 key is a
+            # revision/signature/inventory. We need to do this because
+            # the metadata extraction from the bundle has not yet been updated
+            # to use the consistent tuple interface itself.
+            if len(key) == 2:
+                prefix = key[:1]
+            else:
+                prefix = ()
+            parents = [prefix + (parent,) for parent in meta['parents']]
+            vf_records.append((key, parents, meta['sha1'], d_func(text)))
+        versionedfile.add_mpdiffs(vf_records)
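
Parent ids stored in the record metadata are bare revision ids, so for file texts they are re-prefixed with the file id to form full keys. A tiny illustration with plain tuples (a hypothetical helper, not bzrlib code):

def parent_keys(key, parent_revision_ids):
    # File-text keys are (file_id, revision_id); everything else is 1-tuples.
    prefix = key[:1] if len(key) == 2 else ()
    return [prefix + (parent,) for parent in parent_revision_ids]

assert parent_keys(('file-a', 'rev-2'), ['rev-1']) == [('file-a', 'rev-1')]
assert parent_keys(('rev-2',), ['rev-1']) == [('rev-1',)]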

+    def _install_inventory_records(self, records):
         if self._info['serializer'] == self._repository._serializer.format_num:
-            return self._install_mp_records(vf, [(revision_id, metadata,
-                                                  text)])
-        parents = [self._repository.get_inventory(p)
-                   for p in parent_ids]
-        parent_texts = [self._source_serializer.write_inventory_to_string(p)
-                        for p in parents]
-        target_lines = multiparent.MultiParent.from_patch(text).to_lines(
-            parent_texts)
-        sha1 = osutils.sha_strings(target_lines)
-        if sha1 != metadata['sha1']:
-            raise errors.BadBundle("Can't convert to target format")
-        target_inv = self._source_serializer.read_inventory_from_string(
-            ''.join(target_lines))
-        self._handle_root(target_inv, parent_ids)
-        try:
-            self._repository.add_inventory(revision_id, target_inv, parent_ids)
-        except errors.UnsupportedInventoryKind:
-            raise errors.IncompatibleRevision(repr(self._repository))
+            return self._install_mp_records_keys(self._repository.inventories,
+                                                 records)
+        for key, metadata, bytes in records:
+            revision_id = key[-1]
+            parent_ids = metadata['parents']
+            parents = [self._repository.get_inventory(p)
+                       for p in parent_ids]
+            p_texts = [self._source_serializer.write_inventory_to_string(p)
+                       for p in parents]
+            target_lines = multiparent.MultiParent.from_patch(bytes).to_lines(
+                p_texts)
+            sha1 = osutils.sha_strings(target_lines)
+            if sha1 != metadata['sha1']:
+                raise errors.BadBundle("Can't convert to target format")
+            target_inv = self._source_serializer.read_inventory_from_string(
+                ''.join(target_lines))
+            self._handle_root(target_inv, parent_ids)
+            try:
+                self._repository.add_inventory(revision_id, target_inv,
+                                               parent_ids)
+            except errors.UnsupportedInventoryKind:
+                raise errors.IncompatibleRevision(repr(self._repository))
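
Before the reconstructed inventory text is deserialized, its SHA-1 is compared against the value recorded in the bundle metadata. A small sketch of that check, with hashlib standing in for bzrlib's osutils.sha_strings:

import hashlib

def sha_strings(lines):
    # Same idea as osutils.sha_strings: SHA-1 of the concatenated lines.
    s = hashlib.sha1()
    for line in lines:
        s.update(line)
    return s.hexdigest()

target_lines = [b'line one\n', b'line two\n']
assert sha_strings(target_lines) == hashlib.sha1(b''.join(target_lines)).hexdigest()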

     def _handle_root(self, target_inv, parent_ids):
         revision_id = target_inv.revision_id
         if self.update_root:
-            target_inv.root.revision = revision_id
-            store = self._repository.weave_store
-            transaction = self._repository.get_transaction()
-            vf = store.get_weave_or_empty(target_inv.root.file_id, transaction)
-            vf.add_lines(revision_id, parent_ids, [])
+            text_key = (target_inv.root.file_id, revision_id)
+            parent_keys = [(target_inv.root.file_id, parent) for
+                           parent in parent_ids]
+            self._repository.texts.add_lines(text_key, parent_keys, [])
         elif not self._repository.supports_rich_root():
             if target_inv.root.revision != revision_id:
                 raise errors.IncompatibleRevision(repr(self._repository))

     def _install_revision(self, revision_id, metadata, text):
         if self._repository.has_revision(revision_id):
             return
-        self._repository._add_revision_text(revision_id, text)
+        revision = self._source_serializer.read_revision_from_string(text)
+        self._repository.add_revision(revision.revision_id, revision)

     def _install_signature(self, revision_id, metadata, text):
         transaction = self._repository.get_transaction()
-        if self._repository._revision_store.has_signature(revision_id,
-                                                          transaction):
+        if self._repository.has_signature_for_revision_id(revision_id):
             return
-        self._repository._revision_store.add_revision_signature_text(
-            revision_id, text, transaction)
+        self._repository.add_signature_text(revision_id, text)