/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

  • Committer: Jelmer Vernooij
  • Date: 2011-11-30 20:02:16 UTC
  • mto: This revision was merged to the branch mainline in revision 6333.
  • Revision ID: jelmer@samba.org-20111130200216-aoju21pdl20d1gkd
Consistently pass tree path when exporting.

Show diffs side-by-side

added added

removed removed

Lines of Context:
22
22
import sys
23
23
import tempfile
24
24
import threading
 
25
import zlib
25
26
 
26
27
from bzrlib import (
27
28
    bencode,
28
29
    errors,
 
30
    estimate_compressed_size,
29
31
    graph,
30
32
    osutils,
31
33
    pack,
 
34
    trace,
32
35
    ui,
33
 
    versionedfile,
34
36
    )
35
37
from bzrlib.bzrdir import BzrDir
36
38
from bzrlib.smart.request import (
82
84
            recreate_search trusts that clients will look for missing things
83
85
            they expected and get it from elsewhere.
84
86
        """
 
87
        if search_bytes == 'everything':
 
88
            return graph.EverythingResult(repository), None
85
89
        lines = search_bytes.split('\n')
86
90
        if lines[0] == 'ancestry-of':
87
91
            heads = lines[1:]
141
145
            repository.unlock()
142
146
 
143
147
 
 
148
class SmartServerRepositoryBreakLock(SmartServerRepositoryRequest):
 
149
    """Break a repository lock."""
 
150
 
 
151
    def do_repository_request(self, repository):
 
152
        repository.break_lock()
 
153
        return SuccessfulSmartServerResponse(('ok', ))
 
154
 
 
155
 
 
156
_lsprof_count = 0
 
157
 
144
158
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
145
159
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
146
160
 
179
193
        finally:
180
194
            repository.unlock()
181
195
 
182
 
    def _do_repository_request(self, body_bytes):
183
 
        repository = self._repository
184
 
        revision_ids = set(self._revision_ids)
185
 
        include_missing = 'include-missing:' in revision_ids
186
 
        if include_missing:
187
 
            revision_ids.remove('include-missing:')
188
 
        body_lines = body_bytes.split('\n')
189
 
        search_result, error = self.recreate_search_from_recipe(
190
 
            repository, body_lines)
191
 
        if error is not None:
192
 
            return error
193
 
        # TODO might be nice to start up the search again; but thats not
194
 
        # written or tested yet.
195
 
        client_seen_revs = set(search_result.get_keys())
196
 
        # Always include the requested ids.
197
 
        client_seen_revs.difference_update(revision_ids)
198
 
        lines = []
199
 
        repo_graph = repository.get_graph()
 
196
    def _expand_requested_revs(self, repo_graph, revision_ids, client_seen_revs,
 
197
                               include_missing, max_size=65536):
200
198
        result = {}
201
199
        queried_revs = set()
202
 
        size_so_far = 0
 
200
        estimator = estimate_compressed_size.ZLibEstimator(max_size)
203
201
        next_revs = revision_ids
204
202
        first_loop_done = False
205
203
        while next_revs:
227
225
                    # add parents to the result
228
226
                    result[encoded_id] = parents
229
227
                    # Approximate the serialized cost of this revision_id.
230
 
                    size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
 
228
                    line = '%s %s\n' % (encoded_id, ' '.join(parents))
 
229
                    estimator.add_content(line)
231
230
            # get all the directly asked for parents, and then flesh out to
232
231
            # 64K (compressed) or so. We do one level of depth at a time to
233
232
            # stay in sync with the client. The 250000 magic number is
234
233
            # estimated compression ratio taken from bzr.dev itself.
235
 
            if self.no_extra_results or (
236
 
                first_loop_done and size_so_far > 250000):
 
234
            if self.no_extra_results or (first_loop_done and estimator.full()):
 
235
                trace.mutter('size: %d, z_size: %d'
 
236
                             % (estimator._uncompressed_size_added,
 
237
                                estimator._compressed_size_added))
237
238
                next_revs = set()
238
239
                break
239
240
            # don't query things we've already queried
240
 
            next_revs.difference_update(queried_revs)
 
241
            next_revs = next_revs.difference(queried_revs)
241
242
            first_loop_done = True
 
243
        return result
 
244
 
 
245
    def _do_repository_request(self, body_bytes):
 
246
        repository = self._repository
 
247
        revision_ids = set(self._revision_ids)
 
248
        include_missing = 'include-missing:' in revision_ids
 
249
        if include_missing:
 
250
            revision_ids.remove('include-missing:')
 
251
        body_lines = body_bytes.split('\n')
 
252
        search_result, error = self.recreate_search_from_recipe(
 
253
            repository, body_lines)
 
254
        if error is not None:
 
255
            return error
 
256
        # TODO might be nice to start up the search again; but thats not
 
257
        # written or tested yet.
 
258
        client_seen_revs = set(search_result.get_keys())
 
259
        # Always include the requested ids.
 
260
        client_seen_revs.difference_update(revision_ids)
 
261
 
 
262
        repo_graph = repository.get_graph()
 
263
        result = self._expand_requested_revs(repo_graph, revision_ids,
 
264
                                             client_seen_revs, include_missing)
242
265
 
243
266
        # sorting trivially puts lexographically similar revision ids together.
244
267
        # Compression FTW.
 
268
        lines = []
245
269
        for revision, parents in sorted(result.items()):
246
270
            lines.append(' '.join((revision, ) + tuple(parents)))
247
271
 
312
336
                ('history-incomplete', earliest_revno, earliest_revid))
313
337
 
314
338
 
 
339
class SmartServerRepositoryGetSerializerFormat(SmartServerRepositoryRequest):
 
340
 
 
341
    def do_repository_request(self, repository):
 
342
        """Return the serializer format for this repository.
 
343
 
 
344
        New in 2.5.0.
 
345
 
 
346
        :param repository: The repository to query
 
347
        :return: A smart server response ('ok', FORMAT)
 
348
        """
 
349
        serializer = repository.get_serializer_format()
 
350
        return SuccessfulSmartServerResponse(('ok', serializer))
 
351
 
 
352
 
315
353
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
316
354
 
317
355
    def do_repository_request(self, repository, revision_id):
319
357
 
320
358
        :param repository: The repository to query in.
321
359
        :param revision_id: The utf8 encoded revision_id to lookup.
322
 
        :return: A smart server response of ('ok', ) if the revision is
323
 
            present.
 
360
        :return: A smart server response of ('yes', ) if the revision is
 
361
            present. ('no', ) if it is missing.
324
362
        """
325
363
        if repository.has_revision(revision_id):
326
364
            return SuccessfulSmartServerResponse(('yes', ))
328
366
            return SuccessfulSmartServerResponse(('no', ))
329
367
 
330
368
 
 
369
class SmartServerRequestHasSignatureForRevisionId(
 
370
        SmartServerRepositoryRequest):
 
371
 
 
372
    def do_repository_request(self, repository, revision_id):
 
373
        """Return ok if a signature is present for a revision.
 
374
 
 
375
        Introduced in bzr 2.5.0.
 
376
 
 
377
        :param repository: The repository to query in.
 
378
        :param revision_id: The utf8 encoded revision_id to lookup.
 
379
        :return: A smart server response of ('yes', ) if a
 
380
            signature for the revision is present,
 
381
            ('no', ) if it is missing.
 
382
        """
 
383
        try:
 
384
            if repository.has_signature_for_revision_id(revision_id):
 
385
                return SuccessfulSmartServerResponse(('yes', ))
 
386
            else:
 
387
                return SuccessfulSmartServerResponse(('no', ))
 
388
        except errors.NoSuchRevision:
 
389
            return FailedSmartServerResponse(
 
390
                ('nosuchrevision', revision_id))
 
391
 
 
392
 
331
393
class SmartServerRepositoryGatherStats(SmartServerRepositoryRequest):
332
394
 
333
395
    def do_repository_request(self, repository, revid, committers):
353
415
            decoded_committers = True
354
416
        else:
355
417
            decoded_committers = None
356
 
        stats = repository.gather_stats(decoded_revision_id, decoded_committers)
 
418
        try:
 
419
            stats = repository.gather_stats(decoded_revision_id,
 
420
                decoded_committers)
 
421
        except errors.NoSuchRevision:
 
422
            return FailedSmartServerResponse(('nosuchrevision', revid))
357
423
 
358
424
        body = ''
359
425
        if stats.has_key('committers'):
370
436
        return SuccessfulSmartServerResponse(('ok', ), body)
371
437
 
372
438
 
 
439
class SmartServerRepositoryGetRevisionSignatureText(
 
440
        SmartServerRepositoryRequest):
 
441
    """Return the signature text of a revision.
 
442
 
 
443
    New in 2.5.
 
444
    """
 
445
 
 
446
    def do_repository_request(self, repository, revision_id):
 
447
        """Return the result of repository.get_signature_text().
 
448
 
 
449
        :param repository: The repository to query in.
 
450
        :return: A smart server response of with the signature text as
 
451
            body.
 
452
        """
 
453
        try:
 
454
            text = repository.get_signature_text(revision_id)
 
455
        except errors.NoSuchRevision, err:
 
456
            return FailedSmartServerResponse(
 
457
                ('nosuchrevision', err.revision))
 
458
        return SuccessfulSmartServerResponse(('ok', ), text)
 
459
 
 
460
 
373
461
class SmartServerRepositoryIsShared(SmartServerRepositoryRequest):
374
462
 
375
463
    def do_repository_request(self, repository):
385
473
            return SuccessfulSmartServerResponse(('no', ))
386
474
 
387
475
 
 
476
class SmartServerRepositoryMakeWorkingTrees(SmartServerRepositoryRequest):
 
477
 
 
478
    def do_repository_request(self, repository):
 
479
        """Return the result of repository.make_working_trees().
 
480
 
 
481
        Introduced in bzr 2.5.0.
 
482
 
 
483
        :param repository: The repository to query in.
 
484
        :return: A smart server response of ('yes', ) if the repository uses
 
485
            working trees, and ('no', ) if it is not.
 
486
        """
 
487
        if repository.make_working_trees():
 
488
            return SuccessfulSmartServerResponse(('yes', ))
 
489
        else:
 
490
            return SuccessfulSmartServerResponse(('no', ))
 
491
 
 
492
 
388
493
class SmartServerRepositoryLockWrite(SmartServerRepositoryRequest):
389
494
 
390
495
    def do_repository_request(self, repository, token=''):
392
497
        if token == '':
393
498
            token = None
394
499
        try:
395
 
            token = repository.lock_write(token=token)
 
500
            token = repository.lock_write(token=token).repository_token
396
501
        except errors.LockContention, e:
397
502
            return FailedSmartServerResponse(('LockContention',))
398
503
        except errors.UnlockableTransport:
413
518
    def do_repository_request(self, repository, to_network_name):
414
519
        """Get a stream for inserting into a to_format repository.
415
520
 
 
521
        The request body is 'search_bytes', a description of the revisions
 
522
        being requested.
 
523
 
 
524
        In 2.3 this verb added support for search_bytes == 'everything'.  Older
 
525
        implementations will respond with a BadSearch error, and clients should
 
526
        catch this and fallback appropriately.
 
527
 
416
528
        :param repository: The repository to stream from.
417
529
        :param to_network_name: The network name of the format of the target
418
530
            repository.
490
602
 
491
603
 
492
604
class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
 
605
    """The same as Repository.get_stream, but will return stream CHK formats to
 
606
    clients.
 
607
 
 
608
    See SmartServerRepositoryGetStream._should_fake_unknown.
 
609
    
 
610
    New in 1.19.
 
611
    """
493
612
 
494
613
    def _should_fake_unknown(self):
495
614
        """Returns False; we don't need to workaround bugs in 1.19+ clients."""
505
624
        for record in substream:
506
625
            if record.storage_kind in ('chunked', 'fulltext'):
507
626
                serialised = record_to_fulltext_bytes(record)
508
 
            elif record.storage_kind == 'inventory-delta':
509
 
                serialised = record_to_inventory_delta_bytes(record)
510
627
            elif record.storage_kind == 'absent':
511
628
                raise ValueError("Absent factory for %s" % (record.key,))
512
629
            else:
544
661
    :ivar first_bytes: The first bytes to give the next NetworkRecordStream.
545
662
    """
546
663
 
547
 
    def __init__(self, byte_stream):
 
664
    def __init__(self, byte_stream, record_counter):
548
665
        """Create a _ByteStreamDecoder."""
549
666
        self.stream_decoder = pack.ContainerPushParser()
550
667
        self.current_type = None
551
668
        self.first_bytes = None
552
669
        self.byte_stream = byte_stream
 
670
        self._record_counter = record_counter
 
671
        self.key_count = 0
553
672
 
554
673
    def iter_stream_decoder(self):
555
674
        """Iterate the contents of the pack from stream_decoder."""
580
699
 
581
700
    def record_stream(self):
582
701
        """Yield substream_type, substream from the byte stream."""
 
702
        def wrap_and_count(pb, rc, substream):
 
703
            """Yield records from stream while showing progress."""
 
704
            counter = 0
 
705
            if rc:
 
706
                if self.current_type != 'revisions' and self.key_count != 0:
 
707
                    # As we know the number of revisions now (in self.key_count)
 
708
                    # we can setup and use record_counter (rc).
 
709
                    if not rc.is_initialized():
 
710
                        rc.setup(self.key_count, self.key_count)
 
711
            for record in substream.read():
 
712
                if rc:
 
713
                    if rc.is_initialized() and counter == rc.STEP:
 
714
                        rc.increment(counter)
 
715
                        pb.update('Estimate', rc.current, rc.max)
 
716
                        counter = 0
 
717
                    if self.current_type == 'revisions':
 
718
                        # Total records is proportional to number of revs
 
719
                        # to fetch. With remote, we used self.key_count to
 
720
                        # track the number of revs. Once we have the revs
 
721
                        # counts in self.key_count, the progress bar changes
 
722
                        # from 'Estimating..' to 'Estimate' above.
 
723
                        self.key_count += 1
 
724
                        if counter == rc.STEP:
 
725
                            pb.update('Estimating..', self.key_count)
 
726
                            counter = 0
 
727
                counter += 1
 
728
                yield record
 
729
 
583
730
        self.seed_state()
 
731
        pb = ui.ui_factory.nested_progress_bar()
 
732
        rc = self._record_counter
584
733
        # Make and consume sub generators, one per substream type:
585
734
        while self.first_bytes is not None:
586
735
            substream = NetworkRecordStream(self.iter_substream_bytes())
587
736
            # after substream is fully consumed, self.current_type is set to
588
737
            # the next type, and self.first_bytes is set to the matching bytes.
589
 
            yield self.current_type, substream.read()
 
738
            yield self.current_type, wrap_and_count(pb, rc, substream)
 
739
        if rc:
 
740
            pb.update('Done', rc.max, rc.max)
 
741
        pb.finished()
590
742
 
591
743
    def seed_state(self):
592
744
        """Prepare the _ByteStreamDecoder to decode from the pack stream."""
597
749
        list(self.iter_substream_bytes())
598
750
 
599
751
 
600
 
def _byte_stream_to_stream(byte_stream):
 
752
def _byte_stream_to_stream(byte_stream, record_counter=None):
601
753
    """Convert a byte stream into a format and a stream.
602
754
 
603
755
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
604
756
    :return: (RepositoryFormat, stream_generator)
605
757
    """
606
 
    decoder = _ByteStreamDecoder(byte_stream)
 
758
    decoder = _ByteStreamDecoder(byte_stream, record_counter)
607
759
    for bytes in byte_stream:
608
760
        decoder.stream_decoder.accept_bytes(bytes)
609
761
        for record in decoder.stream_decoder.read_pending_records(max=1):
624
776
        return SuccessfulSmartServerResponse(('ok',))
625
777
 
626
778
 
 
779
class SmartServerRepositoryGetPhysicalLockStatus(SmartServerRepositoryRequest):
 
780
    """Get the physical lock status for a repository.
 
781
 
 
782
    New in 2.5.
 
783
    """
 
784
 
 
785
    def do_repository_request(self, repository):
 
786
        if repository.get_physical_lock_status():
 
787
            return SuccessfulSmartServerResponse(('yes', ))
 
788
        else:
 
789
            return SuccessfulSmartServerResponse(('no', ))
 
790
 
 
791
 
627
792
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
628
793
 
629
794
    def do_repository_request(self, repository, str_bool_new_value):
792
957
        self.do_insert_stream_request(repository, resume_tokens)
793
958
 
794
959
 
 
960
class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
 
961
    """Add a revision signature text.
 
962
 
 
963
    New in 2.5.
 
964
    """
 
965
 
 
966
    def do_repository_request(self, repository, lock_token, revision_id,
 
967
            *write_group_tokens):
 
968
        """Add a revision signature text.
 
969
 
 
970
        :param repository: Repository to operate on
 
971
        :param lock_token: Lock token
 
972
        :param revision_id: Revision for which to add signature
 
973
        :param write_group_tokens: Write group tokens
 
974
        """
 
975
        self._lock_token = lock_token
 
976
        self._revision_id = revision_id
 
977
        self._write_group_tokens = write_group_tokens
 
978
        return None
 
979
 
 
980
    def do_body(self, body_bytes):
 
981
        """Add a signature text.
 
982
 
 
983
        :param body_bytes: GPG signature text
 
984
        :return: SuccessfulSmartServerResponse with arguments 'ok' and
 
985
            the list of new write group tokens.
 
986
        """
 
987
        self._repository.lock_write(token=self._lock_token)
 
988
        try:
 
989
            self._repository.resume_write_group(self._write_group_tokens)
 
990
            try:
 
991
                self._repository.add_signature_text(self._revision_id,
 
992
                    body_bytes)
 
993
            finally:
 
994
                new_write_group_tokens = self._repository.suspend_write_group()
 
995
        finally:
 
996
            self._repository.unlock()
 
997
        return SuccessfulSmartServerResponse(
 
998
            ('ok', ) + tuple(new_write_group_tokens))
 
999
 
 
1000
 
 
1001
class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
 
1002
    """Start a write group.
 
1003
 
 
1004
    New in 2.5.
 
1005
    """
 
1006
 
 
1007
    def do_repository_request(self, repository, lock_token):
 
1008
        """Start a write group."""
 
1009
        repository.lock_write(token=lock_token)
 
1010
        try:
 
1011
            repository.start_write_group()
 
1012
            try:
 
1013
                tokens = repository.suspend_write_group()
 
1014
            except errors.UnsuspendableWriteGroup:
 
1015
                return FailedSmartServerResponse(('UnsuspendableWriteGroup',))
 
1016
        finally:
 
1017
            repository.unlock()
 
1018
        return SuccessfulSmartServerResponse(('ok', tokens))
 
1019
 
 
1020
 
 
1021
class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
 
1022
    """Commit a write group.
 
1023
 
 
1024
    New in 2.5.
 
1025
    """
 
1026
 
 
1027
    def do_repository_request(self, repository, lock_token,
 
1028
            write_group_tokens):
 
1029
        """Commit a write group."""
 
1030
        repository.lock_write(token=lock_token)
 
1031
        try:
 
1032
            try:
 
1033
                repository.resume_write_group(write_group_tokens)
 
1034
            except errors.UnresumableWriteGroup, e:
 
1035
                return FailedSmartServerResponse(
 
1036
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
 
1037
            try:
 
1038
                repository.commit_write_group()
 
1039
            except:
 
1040
                write_group_tokens = repository.suspend_write_group()
 
1041
                # FIXME JRV 2011-11-19: What if the write_group_tokens
 
1042
                # have changed?
 
1043
                raise
 
1044
        finally:
 
1045
            repository.unlock()
 
1046
        return SuccessfulSmartServerResponse(('ok', ))
 
1047
 
 
1048
 
 
1049
class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
 
1050
    """Abort a write group.
 
1051
 
 
1052
    New in 2.5.
 
1053
    """
 
1054
 
 
1055
    def do_repository_request(self, repository, lock_token, write_group_tokens):
 
1056
        """Abort a write group."""
 
1057
        repository.lock_write(token=lock_token)
 
1058
        try:
 
1059
            try:
 
1060
                repository.resume_write_group(write_group_tokens)
 
1061
            except errors.UnresumableWriteGroup, e:
 
1062
                return FailedSmartServerResponse(
 
1063
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
 
1064
                repository.abort_write_group()
 
1065
        finally:
 
1066
            repository.unlock()
 
1067
        return SuccessfulSmartServerResponse(('ok', ))
 
1068
 
 
1069
 
 
1070
class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
 
1071
    """Check that a write group is still valid.
 
1072
 
 
1073
    New in 2.5.
 
1074
    """
 
1075
 
 
1076
    def do_repository_request(self, repository, lock_token, write_group_tokens):
 
1077
        """Abort a write group."""
 
1078
        repository.lock_write(token=lock_token)
 
1079
        try:
 
1080
            try:
 
1081
                repository.resume_write_group(write_group_tokens)
 
1082
            except errors.UnresumableWriteGroup, e:
 
1083
                return FailedSmartServerResponse(
 
1084
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
 
1085
            else:
 
1086
                repository.suspend_write_group()
 
1087
        finally:
 
1088
            repository.unlock()
 
1089
        return SuccessfulSmartServerResponse(('ok', ))
 
1090
 
 
1091
 
 
1092
class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
 
1093
    """Retrieve all of the revision ids in a repository.
 
1094
 
 
1095
    New in 2.5.
 
1096
    """
 
1097
 
 
1098
    def do_repository_request(self, repository):
 
1099
        revids = repository.all_revision_ids()
 
1100
        return SuccessfulSmartServerResponse(("ok", ), "\n".join(revids))
 
1101
 
 
1102
 
 
1103
class SmartServerRepositoryPack(SmartServerRepositoryRequest):
 
1104
    """Pack a repository.
 
1105
 
 
1106
    New in 2.5.
 
1107
    """
 
1108
 
 
1109
    def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
 
1110
        self._repository = repository
 
1111
        self._lock_token = lock_token
 
1112
        if clean_obsolete_packs == 'True':
 
1113
            self._clean_obsolete_packs = True
 
1114
        else:
 
1115
            self._clean_obsolete_packs = False
 
1116
        return None
 
1117
 
 
1118
    def do_body(self, body_bytes):
 
1119
        if body_bytes == "":
 
1120
            hint = None
 
1121
        else:
 
1122
            hint = body_bytes.splitlines()
 
1123
        self._repository.lock_write(token=self._lock_token)
 
1124
        try:
 
1125
            self._repository.pack(hint, self._clean_obsolete_packs)
 
1126
        finally:
 
1127
            self._repository.unlock()
 
1128
        return SuccessfulSmartServerResponse(("ok", ), )
 
1129
 
 
1130
 
 
1131
class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
 
1132
    """Iterate over the contents of files.
 
1133
 
 
1134
    The client sends a list of desired files to stream, one
 
1135
    per line, and as tuples of file id and revision, separated by
 
1136
    \0.
 
1137
 
 
1138
    The server replies with a stream. Each entry is preceded by a header,
 
1139
    which can either be:
 
1140
 
 
1141
    * "ok\x00IDX\n" where IDX is the index of the entry in the desired files
 
1142
        list sent by the client. This header is followed by the contents of
 
1143
        the file, bzip2-compressed.
 
1144
    * "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
 
1145
        The client can then raise an appropriate RevisionNotPresent error
 
1146
        or check its fallback repositories.
 
1147
 
 
1148
    New in 2.5.
 
1149
    """
 
1150
 
 
1151
    def body_stream(self, repository, desired_files):
 
1152
        self._repository.lock_read()
 
1153
        try:
 
1154
            text_keys = {}
 
1155
            for i, key in enumerate(desired_files):
 
1156
                text_keys[key] = i
 
1157
            for record in repository.texts.get_record_stream(text_keys,
 
1158
                    'unordered', True):
 
1159
                identifier = text_keys[record.key]
 
1160
                if record.storage_kind == 'absent':
 
1161
                    yield "absent\0%s\0%s\0%d\n" % (record.key[0],
 
1162
                        record.key[1], identifier)
 
1163
                    # FIXME: Way to abort early?
 
1164
                    continue
 
1165
                yield "ok\0%d\n" % identifier
 
1166
                compressor = zlib.compressobj()
 
1167
                for bytes in record.get_bytes_as('chunked'):
 
1168
                    data = compressor.compress(bytes)
 
1169
                    if data:
 
1170
                        yield data
 
1171
                data = compressor.flush()
 
1172
                if data:
 
1173
                    yield data
 
1174
        finally:
 
1175
            self._repository.unlock()
 
1176
 
 
1177
    def do_body(self, body_bytes):
 
1178
        desired_files = [
 
1179
            tuple(l.split("\0")) for l in body_bytes.splitlines()]
 
1180
        return SuccessfulSmartServerResponse(('ok', ),
 
1181
            body_stream=self.body_stream(self._repository, desired_files))
 
1182
 
 
1183
    def do_repository_request(self, repository):
 
1184
        # Signal that we want a body
 
1185
        return None
 
1186
 
 
1187
 
 
1188
class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
 
1189
    """Stream a list of revisions.
 
1190
 
 
1191
    The client sends a list of newline-separated revision ids in the
 
1192
    body of the request and the server replies with the serializer format,
 
1193
    and a stream of bzip2-compressed revision texts (using the specified
 
1194
    serializer format).
 
1195
 
 
1196
    Any revisions the server does not have are omitted from the stream.
 
1197
 
 
1198
    New in 2.5.
 
1199
    """
 
1200
 
 
1201
    def do_repository_request(self, repository):
 
1202
        self._repository = repository
 
1203
        # Signal there is a body
 
1204
        return None
 
1205
 
 
1206
    def do_body(self, body_bytes):
 
1207
        revision_ids = body_bytes.split("\n")
 
1208
        return SuccessfulSmartServerResponse(
 
1209
            ('ok', self._repository.get_serializer_format()),
 
1210
            body_stream=self.body_stream(self._repository, revision_ids))
 
1211
 
 
1212
    def body_stream(self, repository, revision_ids):
 
1213
        self._repository.lock_read()
 
1214
        try:
 
1215
            for record in repository.revisions.get_record_stream(
 
1216
                [(revid,) for revid in revision_ids], 'unordered', True):
 
1217
                if record.storage_kind == 'absent':
 
1218
                    continue
 
1219
                yield zlib.compress(record.get_bytes_as('fulltext'))
 
1220
        finally:
 
1221
            self._repository.unlock()