/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/bundle/serializer/v4.py

merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
from cStringIO import StringIO
18
18
import bz2
27
27
    pack,
28
28
    revision as _mod_revision,
29
29
    trace,
30
 
    xml_serializer,
 
30
    serializer,
31
31
    )
32
 
from bzrlib.bundle import bundle_data, serializer
 
32
from bzrlib.bundle import bundle_data, serializer as bundle_serializer
33
33
from bzrlib import bencode
34
34
 
35
35
 
54
54
 
55
55
    def begin(self):
56
56
        """Start writing the bundle"""
57
 
        self._fileobj.write(serializer._get_bundle_header(
58
 
            serializer.v4_string))
 
57
        self._fileobj.write(bundle_serializer._get_bundle_header(
 
58
            bundle_serializer.v4_string))
59
59
        self._fileobj.write('#\n')
60
60
        self._container.begin()
61
61
 
107
107
    @staticmethod
108
108
    def encode_name(content_kind, revision_id, file_id=None):
109
109
        """Encode semantic ids as a container name"""
110
 
        assert content_kind in ('revision', 'file', 'inventory', 'signature',
111
 
                                'info')
112
 
 
 
110
        if content_kind not in ('revision', 'file', 'inventory', 'signature',
 
111
                'info'):
 
112
            raise ValueError(content_kind)
113
113
        if content_kind == 'file':
114
 
            assert file_id is not None
 
114
            if file_id is None:
 
115
                raise AssertionError()
115
116
        else:
116
 
            assert file_id is None
 
117
            if file_id is not None:
 
118
                raise AssertionError()
117
119
        if content_kind == 'info':
118
 
            assert revision_id is None
119
 
        else:
120
 
            assert revision_id is not None
 
120
            if revision_id is not None:
 
121
                raise AssertionError()
 
122
        elif revision_id is None:
 
123
            raise AssertionError()
121
124
        names = [n.replace('/', '//') for n in
122
125
                 (content_kind, revision_id, file_id) if n is not None]
123
126
        return '/'.join(names)
144
147
    body
145
148
    """
146
149
 
147
 
    def __init__(self, fileobj):
 
150
    def __init__(self, fileobj, stream_input=True):
 
151
        """Constructor
 
152
 
 
153
        :param fileobj: a file containing a bzip-encoded container
 
154
        :param stream_input: If True, the BundleReader stream input rather than
 
155
            reading it all into memory at once.  Reading it into memory all at
 
156
            once is (currently) faster.
 
157
        """
148
158
        line = fileobj.readline()
149
159
        if line != '\n':
150
160
            fileobj.readline()
151
161
        self.patch_lines = []
152
 
        self._container = pack.ContainerReader(
153
 
            iterablefile.IterableFile(self.iter_decode(fileobj)))
 
162
        if stream_input:
 
163
            source_file = iterablefile.IterableFile(self.iter_decode(fileobj))
 
164
        else:
 
165
            source_file = StringIO(bz2.decompress(fileobj.read()))
 
166
        self._container_file = source_file
154
167
 
155
168
    @staticmethod
156
169
    def iter_decode(fileobj):
157
170
        """Iterate through decoded fragments of the file"""
158
171
        decompressor = bz2.BZ2Decompressor()
159
172
        for line in fileobj:
160
 
            yield decompressor.decompress(line)
 
173
            try:
 
174
                yield decompressor.decompress(line)
 
175
            except EOFError:
 
176
                return
161
177
 
162
178
    @staticmethod
163
179
    def decode_name(name):
189
205
        :return: a generator of (bytes, metadata, content_kind, revision_id,
190
206
            file_id)
191
207
        """
192
 
        iterator = self._container.iter_records()
193
 
        for names, meta_bytes in iterator:
 
208
        iterator = pack.iter_records_from_file(self._container_file)
 
209
        for names, bytes in iterator:
194
210
            if len(names) != 1:
195
211
                raise errors.BadBundle('Record has %d names instead of 1'
196
212
                                       % len(names))
197
 
            metadata = bencode.bdecode(meta_bytes(None))
 
213
            metadata = bencode.bdecode(bytes)
198
214
            if metadata['storage_kind'] == 'header':
199
215
                bytes = None
200
216
            else:
201
217
                _unused, bytes = iterator.next()
202
 
                bytes = bytes(None)
203
218
            yield (bytes, metadata) + self.decode_name(names[0][0])
204
219
 
205
220
 
206
 
class BundleSerializerV4(serializer.BundleSerializer):
 
221
class BundleSerializerV4(bundle_serializer.BundleSerializer):
207
222
    """Implement the high-level bundle interface"""
208
223
 
209
224
    def write(self, repository, revision_ids, forced_bases, fileobj):
235
250
    @staticmethod
236
251
    def get_source_serializer(info):
237
252
        """Retrieve the serializer for a given info object"""
238
 
        return xml_serializer.format_registry.get(info['serializer'])
 
253
        return serializer.format_registry.get(info['serializer'])
239
254
 
240
255
 
241
256
class BundleWriteOperation(object):
255
270
        self.repository = repository
256
271
        bundle = BundleWriter(fileobj)
257
272
        self.bundle = bundle
258
 
        self.base_ancestry = set(repository.get_ancestry(base,
259
 
                                                         topo_sorted=False))
260
273
        if revision_ids is not None:
261
274
            self.revision_ids = revision_ids
262
275
        else:
263
 
            revision_ids = set(repository.get_ancestry(target,
264
 
                                                       topo_sorted=False))
265
 
            self.revision_ids = revision_ids.difference(self.base_ancestry)
 
276
            graph = repository.get_graph()
 
277
            revision_ids = graph.find_unique_ancestors(target, [base])
 
278
            # Strip ghosts
 
279
            parents = graph.get_parent_map(revision_ids)
 
280
            self.revision_ids = [r for r in revision_ids if r in parents]
 
281
        self.revision_keys = set([(revid,) for revid in self.revision_ids])
266
282
 
267
283
    def do_write(self):
268
284
        """Write all data to the bundle"""
269
 
        self.bundle.begin()
270
 
        self.write_info()
271
 
        self.write_files()
272
 
        self.write_revisions()
273
 
        self.bundle.end()
 
285
        trace.note('Bundling %d revision(s).', len(self.revision_ids))
 
286
        self.repository.lock_read()
 
287
        try:
 
288
            self.bundle.begin()
 
289
            self.write_info()
 
290
            self.write_files()
 
291
            self.write_revisions()
 
292
            self.bundle.end()
 
293
        finally:
 
294
            self.repository.unlock()
274
295
        return self.revision_ids
275
296
 
276
297
    def write_info(self):
281
302
        self.bundle.add_info_record(serializer=serializer_format,
282
303
                                    supports_rich_root=supports_rich_root)
283
304
 
284
 
    def iter_file_revisions(self):
285
 
        """Iterate through all relevant revisions of all files.
286
 
 
287
 
        This is the correct implementation, but is not compatible with bzr.dev,
288
 
        because certain old revisions were not converted correctly, and have
289
 
        the wrong "revision" marker in inventories.
290
 
        """
291
 
        transaction = self.repository.get_transaction()
292
 
        altered = self.repository.fileids_altered_by_revision_ids(
293
 
            self.revision_ids)
294
 
        for file_id, file_revision_ids in altered.iteritems():
295
 
            vf = self.repository.weave_store.get_weave(file_id, transaction)
296
 
            yield vf, file_id, file_revision_ids
297
 
 
298
 
    def iter_file_revisions_aggressive(self):
299
 
        """Iterate through all relevant revisions of all files.
300
 
 
301
 
        This uses the standard iter_file_revisions to determine what revisions
302
 
        are referred to by inventories, but then uses the versionedfile to
303
 
        determine what the build-dependencies of each required revision.
304
 
 
305
 
        All build dependencies which are not ancestors of the base revision
306
 
        are emitted.
307
 
        """
308
 
        for vf, file_id, file_revision_ids in self.iter_file_revisions():
309
 
            new_revision_ids = set()
310
 
            pending = list(file_revision_ids)
311
 
            while len(pending) > 0:
312
 
                revision_id = pending.pop()
313
 
                if revision_id in new_revision_ids:
314
 
                    continue
315
 
                if revision_id in self.base_ancestry:
316
 
                    continue
317
 
                new_revision_ids.add(revision_id)
318
 
                pending.extend(vf.get_parents(revision_id))
319
 
            yield vf, file_id, new_revision_ids
320
 
 
321
305
    def write_files(self):
322
306
        """Write bundle records for all revisions of all files"""
323
 
        for vf, file_id, revision_ids in self.iter_file_revisions_aggressive():
324
 
            self.add_mp_records('file', file_id, vf, revision_ids)
 
307
        text_keys = []
 
308
        altered_fileids = self.repository.fileids_altered_by_revision_ids(
 
309
                self.revision_ids)
 
310
        for file_id, revision_ids in altered_fileids.iteritems():
 
311
            for revision_id in revision_ids:
 
312
                text_keys.append((file_id, revision_id))
 
313
        self._add_mp_records_keys('file', self.repository.texts, text_keys)
325
314
 
326
315
    def write_revisions(self):
327
316
        """Write bundle records for all revisions and signatures"""
328
 
        inv_vf = self.repository.get_inventory_weave()
329
 
        revision_order = list(multiparent.topo_iter(inv_vf, self.revision_ids))
 
317
        inv_vf = self.repository.inventories
 
318
        revision_order = [key[-1] for key in multiparent.topo_iter_keys(inv_vf,
 
319
            self.revision_keys)]
330
320
        if self.target is not None and self.target in self.revision_ids:
331
321
            revision_order.remove(self.target)
332
322
            revision_order.append(self.target)
333
 
        self.add_mp_records('inventory', None, inv_vf, revision_order)
334
 
        parents_list = self.repository.get_parents(revision_order)
335
 
        for parents, revision_id in zip(parents_list, revision_order):
336
 
            revision_text = self.repository.get_revision_xml(revision_id)
 
323
        self._add_mp_records_keys('inventory', inv_vf, [(revid,) for revid in revision_order])
 
324
        parent_map = self.repository.get_parent_map(revision_order)
 
325
        revision_to_str = self.repository._serializer.write_revision_to_string
 
326
        revisions = self.repository.get_revisions(revision_order)
 
327
        for revision in revisions:
 
328
            revision_id = revision.revision_id
 
329
            parents = parent_map.get(revision_id, None)
 
330
            revision_text = revision_to_str(revision)
337
331
            self.bundle.add_fulltext_record(revision_text, parents,
338
332
                                       'revision', revision_id)
339
333
            try:
358
352
                base = parents[0]
359
353
        return base, target
360
354
 
361
 
    def add_mp_records(self, repo_kind, file_id, vf, revision_ids):
 
355
    def _add_mp_records_keys(self, repo_kind, vf, keys):
362
356
        """Add multi-parent diff records to a bundle"""
363
 
        revision_ids = list(multiparent.topo_iter(vf, revision_ids))
364
 
        mpdiffs = vf.make_mpdiffs(revision_ids)
365
 
        sha1s = vf.get_sha1s(revision_ids)
366
 
        for mpdiff, revision_id, sha1, in zip(mpdiffs, revision_ids, sha1s):
367
 
            parents = vf.get_parents(revision_id)
 
357
        ordered_keys = list(multiparent.topo_iter_keys(vf, keys))
 
358
        mpdiffs = vf.make_mpdiffs(ordered_keys)
 
359
        sha1s = vf.get_sha1s(ordered_keys)
 
360
        parent_map = vf.get_parent_map(ordered_keys)
 
361
        for mpdiff, item_key, in zip(mpdiffs, ordered_keys):
 
362
            sha1 = sha1s[item_key]
 
363
            parents = [key[-1] for key in parent_map[item_key]]
368
364
            text = ''.join(mpdiff.to_patch())
 
365
            # Infer file id records as appropriate.
 
366
            if len(item_key) == 2:
 
367
                file_id = item_key[0]
 
368
            else:
 
369
                file_id = None
369
370
            self.bundle.add_multiparent_record(text, sha1, parents, repo_kind,
370
 
                                               revision_id, file_id)
 
371
                                               item_key[-1], file_id)
371
372
 
372
373
 
373
374
class BundleInfoV4(object):
382
383
    def install(self, repository):
383
384
        return self.install_revisions(repository)
384
385
 
385
 
    def install_revisions(self, repository):
386
 
        """Install this bundle's revisions into the specified repository"""
 
386
    def install_revisions(self, repository, stream_input=True):
 
387
        """Install this bundle's revisions into the specified repository
 
388
 
 
389
        :param target_repo: The repository to install into
 
390
        :param stream_input: If True, will stream input rather than reading it
 
391
            all into memory at once.  Reading it into memory all at once is
 
392
            (currently) faster.
 
393
        """
387
394
        repository.lock_write()
388
395
        try:
389
 
            ri = RevisionInstaller(self.get_bundle_reader(),
 
396
            ri = RevisionInstaller(self.get_bundle_reader(stream_input),
390
397
                                   self._serializer, repository)
391
398
            return ri.install()
392
399
        finally:
399
406
        """
400
407
        return None, self.target, 'inapplicable'
401
408
 
402
 
    def get_bundle_reader(self):
 
409
    def get_bundle_reader(self, stream_input=True):
 
410
        """Return a new BundleReader for the associated bundle
 
411
 
 
412
        :param stream_input: If True, the BundleReader stream input rather than
 
413
            reading it all into memory at once.  Reading it into memory all at
 
414
            once is (currently) faster.
 
415
        """
403
416
        self._fileobj.seek(0)
404
 
        return BundleReader(self._fileobj)
 
417
        return BundleReader(self._fileobj, stream_input)
405
418
 
406
419
    def _get_real_revisions(self):
407
420
        if self.__real_revisions is None:
444
457
        self._info = None
445
458
 
446
459
    def install(self):
447
 
        """Perform the installation"""
 
460
        """Perform the installation.
 
461
 
 
462
        Must be called with the Repository locked.
 
463
        """
 
464
        self._repository.start_write_group()
 
465
        try:
 
466
            result = self._install_in_write_group()
 
467
        except:
 
468
            self._repository.abort_write_group()
 
469
            raise
 
470
        self._repository.commit_write_group()
 
471
        return result
 
472
 
 
473
    def _install_in_write_group(self):
448
474
        current_file = None
449
475
        current_versionedfile = None
450
476
        pending_file_records = []
 
477
        inventory_vf = None
 
478
        pending_inventory_records = []
451
479
        added_inv = set()
452
480
        target_revision = None
453
481
        for bytes, metadata, repo_kind, revision_id, file_id in\
454
482
            self._container.iter_records():
455
483
            if repo_kind == 'info':
456
 
                assert self._info is None
 
484
                if self._info is not None:
 
485
                    raise AssertionError()
457
486
                self._handle_info(metadata)
458
 
            if repo_kind != 'file':
459
 
                self._install_mp_records(current_versionedfile,
 
487
            if (pending_file_records and
 
488
                (repo_kind, file_id) != ('file', current_file)):
 
489
                # Flush the data for a single file - prevents memory
 
490
                # spiking due to buffering all files in memory.
 
491
                self._install_mp_records_keys(self._repository.texts,
460
492
                    pending_file_records)
461
493
                current_file = None
462
 
                current_versionedfile = None
463
 
                pending_file_records = []
464
 
                if repo_kind == 'inventory':
465
 
                    self._install_inventory(revision_id, metadata, bytes)
466
 
                if repo_kind == 'revision':
467
 
                    target_revision = revision_id
468
 
                    self._install_revision(revision_id, metadata, bytes)
469
 
                if repo_kind == 'signature':
470
 
                    self._install_signature(revision_id, metadata, bytes)
 
494
                del pending_file_records[:]
 
495
            if len(pending_inventory_records) > 0 and repo_kind != 'inventory':
 
496
                self._install_inventory_records(pending_inventory_records)
 
497
                pending_inventory_records = []
 
498
            if repo_kind == 'inventory':
 
499
                pending_inventory_records.append(((revision_id,), metadata, bytes))
 
500
            if repo_kind == 'revision':
 
501
                target_revision = revision_id
 
502
                self._install_revision(revision_id, metadata, bytes)
 
503
            if repo_kind == 'signature':
 
504
                self._install_signature(revision_id, metadata, bytes)
471
505
            if repo_kind == 'file':
472
 
                if file_id != current_file:
473
 
                    self._install_mp_records(current_versionedfile,
474
 
                        pending_file_records)
475
 
                    current_file = file_id
476
 
                    current_versionedfile = \
477
 
                        self._repository.weave_store.get_weave_or_empty(
478
 
                        file_id, self._repository.get_transaction())
479
 
                    pending_file_records = []
480
 
                if revision_id in current_versionedfile:
481
 
                    continue
482
 
                pending_file_records.append((revision_id, metadata, bytes))
483
 
        self._install_mp_records(current_versionedfile, pending_file_records)
 
506
                current_file = file_id
 
507
                pending_file_records.append(((file_id, revision_id), metadata, bytes))
 
508
        self._install_mp_records_keys(self._repository.texts, pending_file_records)
484
509
        return target_revision
485
510
 
486
511
    def _handle_info(self, info):
501
526
                      records if r not in versionedfile]
502
527
        versionedfile.add_mpdiffs(vf_records)
503
528
 
504
 
    def _install_inventory(self, revision_id, metadata, text):
505
 
        vf = self._repository.get_inventory_weave()
506
 
        if revision_id in vf:
507
 
            return
508
 
        parent_ids = metadata['parents']
 
529
    def _install_mp_records_keys(self, versionedfile, records):
 
530
        d_func = multiparent.MultiParent.from_patch
 
531
        vf_records = []
 
532
        for key, meta, text in records:
 
533
            # Adapt to tuple interface: A length two key is a file_id,
 
534
            # revision_id pair, a length 1 key is a
 
535
            # revision/signature/inventory. We need to do this because
 
536
            # the metadata extraction from the bundle has not yet been updated
 
537
            # to use the consistent tuple interface itself.
 
538
            if len(key) == 2:
 
539
                prefix = key[:1]
 
540
            else:
 
541
                prefix = ()
 
542
            parents = [prefix + (parent,) for parent in meta['parents']]
 
543
            vf_records.append((key, parents, meta['sha1'], d_func(text)))
 
544
        versionedfile.add_mpdiffs(vf_records)
 
545
 
 
546
    def _install_inventory_records(self, records):
509
547
        if self._info['serializer'] == self._repository._serializer.format_num:
510
 
            return self._install_mp_records(vf, [(revision_id, metadata,
511
 
                                                  text)])
512
 
        parents = [self._repository.get_inventory(p)
513
 
                   for p in parent_ids]
514
 
        parent_texts = [self._source_serializer.write_inventory_to_string(p)
515
 
                        for p in parents]
516
 
        target_lines = multiparent.MultiParent.from_patch(text).to_lines(
517
 
            parent_texts)
518
 
        sha1 = osutils.sha_strings(target_lines)
519
 
        if sha1 != metadata['sha1']:
520
 
            raise errors.BadBundle("Can't convert to target format")
521
 
        target_inv = self._source_serializer.read_inventory_from_string(
522
 
            ''.join(target_lines))
523
 
        self._handle_root(target_inv, parent_ids)
524
 
        try:
525
 
            self._repository.add_inventory(revision_id, target_inv, parent_ids)
526
 
        except errors.UnsupportedInventoryKind:
527
 
            raise errors.IncompatibleRevision(repr(self._repository))
 
548
            return self._install_mp_records_keys(self._repository.inventories,
 
549
                records)
 
550
        for key, metadata, bytes in records:
 
551
            revision_id = key[-1]
 
552
            parent_ids = metadata['parents']
 
553
            parents = [self._repository.get_inventory(p)
 
554
                       for p in parent_ids]
 
555
            p_texts = [self._source_serializer.write_inventory_to_string(p)
 
556
                       for p in parents]
 
557
            target_lines = multiparent.MultiParent.from_patch(bytes).to_lines(
 
558
                p_texts)
 
559
            sha1 = osutils.sha_strings(target_lines)
 
560
            if sha1 != metadata['sha1']:
 
561
                raise errors.BadBundle("Can't convert to target format")
 
562
            target_inv = self._source_serializer.read_inventory_from_string(
 
563
                ''.join(target_lines))
 
564
            self._handle_root(target_inv, parent_ids)
 
565
            try:
 
566
                self._repository.add_inventory(revision_id, target_inv,
 
567
                                               parent_ids)
 
568
            except errors.UnsupportedInventoryKind:
 
569
                raise errors.IncompatibleRevision(repr(self._repository))
528
570
 
529
571
    def _handle_root(self, target_inv, parent_ids):
530
572
        revision_id = target_inv.revision_id
531
573
        if self.update_root:
532
 
            target_inv.root.revision = revision_id
533
 
            store = self._repository.weave_store
534
 
            transaction = self._repository.get_transaction()
535
 
            vf = store.get_weave_or_empty(target_inv.root.file_id, transaction)
536
 
            vf.add_lines(revision_id, parent_ids, [])
 
574
            text_key = (target_inv.root.file_id, revision_id)
 
575
            parent_keys = [(target_inv.root.file_id, parent) for
 
576
                parent in parent_ids]
 
577
            self._repository.texts.add_lines(text_key, parent_keys, [])
537
578
        elif not self._repository.supports_rich_root():
538
579
            if target_inv.root.revision != revision_id:
539
580
                raise errors.IncompatibleRevision(repr(self._repository))
540
581
 
541
 
 
542
582
    def _install_revision(self, revision_id, metadata, text):
543
583
        if self._repository.has_revision(revision_id):
544
584
            return
545
 
        self._repository._add_revision_text(revision_id, text)
 
585
        revision = self._source_serializer.read_revision_from_string(text)
 
586
        self._repository.add_revision(revision.revision_id, revision)
546
587
 
547
588
    def _install_signature(self, revision_id, metadata, text):
548
589
        transaction = self._repository.get_transaction()
549
 
        if self._repository._revision_store.has_signature(revision_id,
550
 
                                                          transaction):
 
590
        if self._repository.has_signature_for_revision_id(revision_id):
551
591
            return
552
 
        self._repository._revision_store.add_revision_signature_text(
553
 
            revision_id, text, transaction)
 
592
        self._repository.add_signature_text(revision_id, text)