/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/smart/repository.py

  • Committer: Jelmer Vernooij
  • Date: 2017-06-10 21:59:15 UTC
  • mto: This revision was merged to the branch mainline in revision 6690.
  • Revision ID: jelmer@jelmer.uk-20170610215915-zcpu0in3r1irx3ml
Move serializer to bzr.

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
 
"""Server-side repository related request implmentations."""
 
17
"""Server-side repository related request implementations."""
 
18
 
 
19
from __future__ import absolute_import
18
20
 
19
21
import bz2
 
22
import itertools
20
23
import os
21
 
import Queue
 
24
try:
 
25
    import queue
 
26
except ImportError:
 
27
    import Queue as queue
22
28
import sys
23
29
import tempfile
24
30
import threading
 
31
import zlib
25
32
 
26
 
from bzrlib import (
 
33
from .. import (
27
34
    bencode,
28
35
    errors,
29
 
    graph,
 
36
    estimate_compressed_size,
30
37
    osutils,
 
38
    trace,
 
39
    ui,
 
40
    )
 
41
from ..bzr import (
 
42
    inventory as _mod_inventory,
 
43
    inventory_delta,
31
44
    pack,
32
 
    ui,
33
 
    versionedfile,
 
45
    vf_search,
34
46
    )
35
 
from bzrlib.bzrdir import BzrDir
36
 
from bzrlib.smart.request import (
 
47
from ..bzr.bzrdir import BzrDir
 
48
from ..sixish import (
 
49
    reraise,
 
50
)
 
51
from .request import (
37
52
    FailedSmartServerResponse,
38
53
    SmartServerRequest,
39
54
    SuccessfulSmartServerResponse,
40
55
    )
41
 
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
42
 
from bzrlib import revision as _mod_revision
43
 
from bzrlib.versionedfile import (
 
56
from ..repository import _strip_NULL_ghosts, network_format_registry
 
57
from .. import revision as _mod_revision
 
58
from ..bzr.versionedfile import (
 
59
    ChunkedContentFactory,
44
60
    NetworkRecordStream,
45
61
    record_to_fulltext_bytes,
46
62
    )
82
98
            recreate_search trusts that clients will look for missing things
83
99
            they expected and get it from elsewhere.
84
100
        """
 
101
        if search_bytes == 'everything':
 
102
            return vf_search.EverythingResult(repository), None
85
103
        lines = search_bytes.split('\n')
86
104
        if lines[0] == 'ancestry-of':
87
105
            heads = lines[1:]
88
 
            search_result = graph.PendingAncestryResult(heads, repository)
 
106
            search_result = vf_search.PendingAncestryResult(heads, repository)
89
107
            return search_result, None
90
108
        elif lines[0] == 'search':
91
109
            return self.recreate_search_from_recipe(repository, lines[1:],
111
129
                start_keys)
112
130
            while True:
113
131
                try:
114
 
                    next_revs = search.next()
 
132
                    next_revs = next(search)
115
133
                except StopIteration:
116
134
                    break
117
135
                search.stop_searching_any(exclude_keys.intersection(next_revs))
118
 
            search_result = search.get_result()
119
 
            if (not discard_excess and
120
 
                search_result.get_recipe()[3] != revision_count):
 
136
            (started_keys, excludes, included_keys) = search.get_state()
 
137
            if (not discard_excess and len(included_keys) != revision_count):
121
138
                # we got back a different amount of data than expected, this
122
139
                # gets reported as NoSuchRevision, because less revisions
123
140
                # indicates missing revisions, and more should never happen as
124
141
                # the excludes list considers ghosts and ensures that ghost
125
142
                # filling races are not a problem.
126
143
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
 
144
            search_result = vf_search.SearchResult(started_keys, excludes,
 
145
                len(included_keys), included_keys)
127
146
            return (search_result, None)
128
147
        finally:
129
148
            repository.unlock()
141
160
            repository.unlock()
142
161
 
143
162
 
 
163
class SmartServerRepositoryBreakLock(SmartServerRepositoryRequest):
 
164
    """Break a repository lock."""
 
165
 
 
166
    def do_repository_request(self, repository):
 
167
        repository.break_lock()
 
168
        return SuccessfulSmartServerResponse(('ok', ))
 
169
 
 
170
 
 
171
_lsprof_count = 0
 
172
 
144
173
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
145
174
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
146
175
 
179
208
        finally:
180
209
            repository.unlock()
181
210
 
182
 
    def _do_repository_request(self, body_bytes):
183
 
        repository = self._repository
184
 
        revision_ids = set(self._revision_ids)
185
 
        include_missing = 'include-missing:' in revision_ids
186
 
        if include_missing:
187
 
            revision_ids.remove('include-missing:')
188
 
        body_lines = body_bytes.split('\n')
189
 
        search_result, error = self.recreate_search_from_recipe(
190
 
            repository, body_lines)
191
 
        if error is not None:
192
 
            return error
193
 
        # TODO might be nice to start up the search again; but thats not
194
 
        # written or tested yet.
195
 
        client_seen_revs = set(search_result.get_keys())
196
 
        # Always include the requested ids.
197
 
        client_seen_revs.difference_update(revision_ids)
198
 
        lines = []
199
 
        repo_graph = repository.get_graph()
 
211
    def _expand_requested_revs(self, repo_graph, revision_ids, client_seen_revs,
 
212
                               include_missing, max_size=65536):
200
213
        result = {}
201
214
        queried_revs = set()
202
 
        size_so_far = 0
 
215
        estimator = estimate_compressed_size.ZLibEstimator(max_size)
203
216
        next_revs = revision_ids
204
217
        first_loop_done = False
205
218
        while next_revs:
227
240
                    # add parents to the result
228
241
                    result[encoded_id] = parents
229
242
                    # Approximate the serialized cost of this revision_id.
230
 
                    size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
 
243
                    line = '%s %s\n' % (encoded_id, ' '.join(parents))
 
244
                    estimator.add_content(line)
231
245
            # get all the directly asked for parents, and then flesh out to
232
246
            # 64K (compressed) or so. We do one level of depth at a time to
233
247
            # stay in sync with the client. The 250000 magic number is
234
248
            # estimated compression ratio taken from bzr.dev itself.
235
 
            if self.no_extra_results or (
236
 
                first_loop_done and size_so_far > 250000):
 
249
            if self.no_extra_results or (first_loop_done and estimator.full()):
 
250
                trace.mutter('size: %d, z_size: %d'
 
251
                             % (estimator._uncompressed_size_added,
 
252
                                estimator._compressed_size_added))
237
253
                next_revs = set()
238
254
                break
239
255
            # don't query things we've already queried
240
 
            next_revs.difference_update(queried_revs)
 
256
            next_revs = next_revs.difference(queried_revs)
241
257
            first_loop_done = True
 
258
        return result
 
259
 
 
260
    def _do_repository_request(self, body_bytes):
 
261
        repository = self._repository
 
262
        revision_ids = set(self._revision_ids)
 
263
        include_missing = 'include-missing:' in revision_ids
 
264
        if include_missing:
 
265
            revision_ids.remove('include-missing:')
 
266
        body_lines = body_bytes.split('\n')
 
267
        search_result, error = self.recreate_search_from_recipe(
 
268
            repository, body_lines)
 
269
        if error is not None:
 
270
            return error
 
271
        # TODO might be nice to start up the search again; but thats not
 
272
        # written or tested yet.
 
273
        client_seen_revs = set(search_result.get_keys())
 
274
        # Always include the requested ids.
 
275
        client_seen_revs.difference_update(revision_ids)
 
276
 
 
277
        repo_graph = repository.get_graph()
 
278
        result = self._expand_requested_revs(repo_graph, revision_ids,
 
279
                                             client_seen_revs, include_missing)
242
280
 
243
281
        # sorting trivially puts lexographically similar revision ids together.
244
282
        # Compression FTW.
 
283
        lines = []
245
284
        for revision, parents in sorted(result.items()):
246
285
            lines.append(' '.join((revision, ) + tuple(parents)))
247
286
 
271
310
        else:
272
311
            search_ids = repository.all_revision_ids()
273
312
        search = graph._make_breadth_first_searcher(search_ids)
274
 
        transitive_ids = set()
275
 
        map(transitive_ids.update, list(search))
 
313
        transitive_ids = set(itertools.chain.from_iterable(search))
276
314
        parent_map = graph.get_parent_map(transitive_ids)
277
315
        revision_graph = _strip_NULL_ghosts(parent_map)
278
316
        if revision_id and revision_id not in revision_graph:
297
335
        """
298
336
        try:
299
337
            found_flag, result = repository.get_rev_id_for_revno(revno, known_pair)
300
 
        except errors.RevisionNotPresent, err:
 
338
        except errors.RevisionNotPresent as err:
301
339
            if err.revision_id != known_pair[1]:
302
340
                raise AssertionError(
303
341
                    'get_rev_id_for_revno raised RevisionNotPresent for '
312
350
                ('history-incomplete', earliest_revno, earliest_revid))
313
351
 
314
352
 
 
353
class SmartServerRepositoryGetSerializerFormat(SmartServerRepositoryRequest):
 
354
 
 
355
    def do_repository_request(self, repository):
 
356
        """Return the serializer format for this repository.
 
357
 
 
358
        New in 2.5.0.
 
359
 
 
360
        :param repository: The repository to query
 
361
        :return: A smart server response ('ok', FORMAT)
 
362
        """
 
363
        serializer = repository.get_serializer_format()
 
364
        return SuccessfulSmartServerResponse(('ok', serializer))
 
365
 
 
366
 
315
367
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
316
368
 
317
369
    def do_repository_request(self, repository, revision_id):
319
371
 
320
372
        :param repository: The repository to query in.
321
373
        :param revision_id: The utf8 encoded revision_id to lookup.
322
 
        :return: A smart server response of ('ok', ) if the revision is
323
 
            present.
 
374
        :return: A smart server response of ('yes', ) if the revision is
 
375
            present. ('no', ) if it is missing.
324
376
        """
325
377
        if repository.has_revision(revision_id):
326
378
            return SuccessfulSmartServerResponse(('yes', ))
328
380
            return SuccessfulSmartServerResponse(('no', ))
329
381
 
330
382
 
 
383
class SmartServerRequestHasSignatureForRevisionId(
 
384
        SmartServerRepositoryRequest):
 
385
 
 
386
    def do_repository_request(self, repository, revision_id):
 
387
        """Return ok if a signature is present for a revision.
 
388
 
 
389
        Introduced in bzr 2.5.0.
 
390
 
 
391
        :param repository: The repository to query in.
 
392
        :param revision_id: The utf8 encoded revision_id to lookup.
 
393
        :return: A smart server response of ('yes', ) if a
 
394
            signature for the revision is present,
 
395
            ('no', ) if it is missing.
 
396
        """
 
397
        try:
 
398
            if repository.has_signature_for_revision_id(revision_id):
 
399
                return SuccessfulSmartServerResponse(('yes', ))
 
400
            else:
 
401
                return SuccessfulSmartServerResponse(('no', ))
 
402
        except errors.NoSuchRevision:
 
403
            return FailedSmartServerResponse(
 
404
                ('nosuchrevision', revision_id))
 
405
 
 
406
 
331
407
class SmartServerRepositoryGatherStats(SmartServerRepositoryRequest):
332
408
 
333
409
    def do_repository_request(self, repository, revid, committers):
353
429
            decoded_committers = True
354
430
        else:
355
431
            decoded_committers = None
356
 
        stats = repository.gather_stats(decoded_revision_id, decoded_committers)
 
432
        try:
 
433
            stats = repository.gather_stats(decoded_revision_id,
 
434
                decoded_committers)
 
435
        except errors.NoSuchRevision:
 
436
            return FailedSmartServerResponse(('nosuchrevision', revid))
357
437
 
358
438
        body = ''
359
 
        if stats.has_key('committers'):
 
439
        if 'committers' in stats:
360
440
            body += 'committers: %d\n' % stats['committers']
361
 
        if stats.has_key('firstrev'):
 
441
        if 'firstrev' in stats:
362
442
            body += 'firstrev: %.3f %d\n' % stats['firstrev']
363
 
        if stats.has_key('latestrev'):
 
443
        if 'latestrev' in stats:
364
444
             body += 'latestrev: %.3f %d\n' % stats['latestrev']
365
 
        if stats.has_key('revisions'):
 
445
        if 'revisions' in stats:
366
446
            body += 'revisions: %d\n' % stats['revisions']
367
 
        if stats.has_key('size'):
 
447
        if 'size' in stats:
368
448
            body += 'size: %d\n' % stats['size']
369
449
 
370
450
        return SuccessfulSmartServerResponse(('ok', ), body)
371
451
 
372
452
 
 
453
class SmartServerRepositoryGetRevisionSignatureText(
 
454
        SmartServerRepositoryRequest):
 
455
    """Return the signature text of a revision.
 
456
 
 
457
    New in 2.5.
 
458
    """
 
459
 
 
460
    def do_repository_request(self, repository, revision_id):
 
461
        """Return the result of repository.get_signature_text().
 
462
 
 
463
        :param repository: The repository to query in.
 
464
        :return: A smart server response of with the signature text as
 
465
            body.
 
466
        """
 
467
        try:
 
468
            text = repository.get_signature_text(revision_id)
 
469
        except errors.NoSuchRevision as err:
 
470
            return FailedSmartServerResponse(
 
471
                ('nosuchrevision', err.revision))
 
472
        return SuccessfulSmartServerResponse(('ok', ), text)
 
473
 
 
474
 
373
475
class SmartServerRepositoryIsShared(SmartServerRepositoryRequest):
374
476
 
375
477
    def do_repository_request(self, repository):
385
487
            return SuccessfulSmartServerResponse(('no', ))
386
488
 
387
489
 
 
490
class SmartServerRepositoryMakeWorkingTrees(SmartServerRepositoryRequest):
 
491
 
 
492
    def do_repository_request(self, repository):
 
493
        """Return the result of repository.make_working_trees().
 
494
 
 
495
        Introduced in bzr 2.5.0.
 
496
 
 
497
        :param repository: The repository to query in.
 
498
        :return: A smart server response of ('yes', ) if the repository uses
 
499
            working trees, and ('no', ) if it is not.
 
500
        """
 
501
        if repository.make_working_trees():
 
502
            return SuccessfulSmartServerResponse(('yes', ))
 
503
        else:
 
504
            return SuccessfulSmartServerResponse(('no', ))
 
505
 
 
506
 
388
507
class SmartServerRepositoryLockWrite(SmartServerRepositoryRequest):
389
508
 
390
509
    def do_repository_request(self, repository, token=''):
392
511
        if token == '':
393
512
            token = None
394
513
        try:
395
 
            token = repository.lock_write(token=token)
396
 
        except errors.LockContention, e:
 
514
            token = repository.lock_write(token=token).repository_token
 
515
        except errors.LockContention as e:
397
516
            return FailedSmartServerResponse(('LockContention',))
398
517
        except errors.UnlockableTransport:
399
518
            return FailedSmartServerResponse(('UnlockableTransport',))
400
 
        except errors.LockFailed, e:
 
519
        except errors.LockFailed as e:
401
520
            return FailedSmartServerResponse(('LockFailed',
402
521
                str(e.lock), str(e.why)))
403
522
        if token is not None:
413
532
    def do_repository_request(self, repository, to_network_name):
414
533
        """Get a stream for inserting into a to_format repository.
415
534
 
 
535
        The request body is 'search_bytes', a description of the revisions
 
536
        being requested.
 
537
 
 
538
        In 2.3 this verb added support for search_bytes == 'everything'.  Older
 
539
        implementations will respond with a BadSearch error, and clients should
 
540
        catch this and fallback appropriately.
 
541
 
416
542
        :param repository: The repository to stream from.
417
543
        :param to_network_name: The network name of the format of the target
418
544
            repository.
466
592
            source = repository._get_source(self._to_format)
467
593
            stream = source.get_stream(search_result)
468
594
        except Exception:
469
 
            exc_info = sys.exc_info()
470
595
            try:
471
596
                # On non-error, unlocking is done by the body stream handler.
472
597
                repository.unlock()
473
598
            finally:
474
 
                raise exc_info[0], exc_info[1], exc_info[2]
 
599
                raise
475
600
        return SuccessfulSmartServerResponse(('ok',),
476
601
            body_stream=self.body_stream(stream, repository))
477
602
 
480
605
        try:
481
606
            for bytes in byte_stream:
482
607
                yield bytes
483
 
        except errors.RevisionNotPresent, e:
 
608
        except errors.RevisionNotPresent as e:
484
609
            # This shouldn't be able to happen, but as we don't buffer
485
610
            # everything it can in theory happen.
486
611
            repository.unlock()
490
615
 
491
616
 
492
617
class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
 
618
    """The same as Repository.get_stream, but will return stream CHK formats to
 
619
    clients.
 
620
 
 
621
    See SmartServerRepositoryGetStream._should_fake_unknown.
 
622
    
 
623
    New in 1.19.
 
624
    """
493
625
 
494
626
    def _should_fake_unknown(self):
495
627
        """Returns False; we don't need to workaround bugs in 1.19+ clients."""
505
637
        for record in substream:
506
638
            if record.storage_kind in ('chunked', 'fulltext'):
507
639
                serialised = record_to_fulltext_bytes(record)
508
 
            elif record.storage_kind == 'inventory-delta':
509
 
                serialised = record_to_inventory_delta_bytes(record)
510
640
            elif record.storage_kind == 'absent':
511
641
                raise ValueError("Absent factory for %s" % (record.key,))
512
642
            else:
544
674
    :ivar first_bytes: The first bytes to give the next NetworkRecordStream.
545
675
    """
546
676
 
547
 
    def __init__(self, byte_stream):
 
677
    def __init__(self, byte_stream, record_counter):
548
678
        """Create a _ByteStreamDecoder."""
549
679
        self.stream_decoder = pack.ContainerPushParser()
550
680
        self.current_type = None
551
681
        self.first_bytes = None
552
682
        self.byte_stream = byte_stream
 
683
        self._record_counter = record_counter
 
684
        self.key_count = 0
553
685
 
554
686
    def iter_stream_decoder(self):
555
687
        """Iterate the contents of the pack from stream_decoder."""
580
712
 
581
713
    def record_stream(self):
582
714
        """Yield substream_type, substream from the byte stream."""
 
715
        def wrap_and_count(pb, rc, substream):
 
716
            """Yield records from stream while showing progress."""
 
717
            counter = 0
 
718
            if rc:
 
719
                if self.current_type != 'revisions' and self.key_count != 0:
 
720
                    # As we know the number of revisions now (in self.key_count)
 
721
                    # we can setup and use record_counter (rc).
 
722
                    if not rc.is_initialized():
 
723
                        rc.setup(self.key_count, self.key_count)
 
724
            for record in substream.read():
 
725
                if rc:
 
726
                    if rc.is_initialized() and counter == rc.STEP:
 
727
                        rc.increment(counter)
 
728
                        pb.update('Estimate', rc.current, rc.max)
 
729
                        counter = 0
 
730
                    if self.current_type == 'revisions':
 
731
                        # Total records is proportional to number of revs
 
732
                        # to fetch. With remote, we used self.key_count to
 
733
                        # track the number of revs. Once we have the revs
 
734
                        # counts in self.key_count, the progress bar changes
 
735
                        # from 'Estimating..' to 'Estimate' above.
 
736
                        self.key_count += 1
 
737
                        if counter == rc.STEP:
 
738
                            pb.update('Estimating..', self.key_count)
 
739
                            counter = 0
 
740
                counter += 1
 
741
                yield record
 
742
 
583
743
        self.seed_state()
584
 
        # Make and consume sub generators, one per substream type:
585
 
        while self.first_bytes is not None:
586
 
            substream = NetworkRecordStream(self.iter_substream_bytes())
587
 
            # after substream is fully consumed, self.current_type is set to
588
 
            # the next type, and self.first_bytes is set to the matching bytes.
589
 
            yield self.current_type, substream.read()
 
744
        pb = ui.ui_factory.nested_progress_bar()
 
745
        rc = self._record_counter
 
746
        try:
 
747
            # Make and consume sub generators, one per substream type:
 
748
            while self.first_bytes is not None:
 
749
                substream = NetworkRecordStream(self.iter_substream_bytes())
 
750
                # after substream is fully consumed, self.current_type is set
 
751
                # to the next type, and self.first_bytes is set to the matching
 
752
                # bytes.
 
753
                yield self.current_type, wrap_and_count(pb, rc, substream)
 
754
        finally:
 
755
            if rc:
 
756
                pb.update('Done', rc.max, rc.max)
 
757
            pb.finished()
590
758
 
591
759
    def seed_state(self):
592
760
        """Prepare the _ByteStreamDecoder to decode from the pack stream."""
597
765
        list(self.iter_substream_bytes())
598
766
 
599
767
 
600
 
def _byte_stream_to_stream(byte_stream):
 
768
def _byte_stream_to_stream(byte_stream, record_counter=None):
601
769
    """Convert a byte stream into a format and a stream.
602
770
 
603
771
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
604
772
    :return: (RepositoryFormat, stream_generator)
605
773
    """
606
 
    decoder = _ByteStreamDecoder(byte_stream)
 
774
    decoder = _ByteStreamDecoder(byte_stream, record_counter)
607
775
    for bytes in byte_stream:
608
776
        decoder.stream_decoder.accept_bytes(bytes)
609
777
        for record in decoder.stream_decoder.read_pending_records(max=1):
617
785
    def do_repository_request(self, repository, token):
618
786
        try:
619
787
            repository.lock_write(token=token)
620
 
        except errors.TokenMismatch, e:
 
788
        except errors.TokenMismatch as e:
621
789
            return FailedSmartServerResponse(('TokenMismatch',))
622
790
        repository.dont_leave_lock_in_place()
623
791
        repository.unlock()
624
792
        return SuccessfulSmartServerResponse(('ok',))
625
793
 
626
794
 
 
795
class SmartServerRepositoryGetPhysicalLockStatus(SmartServerRepositoryRequest):
 
796
    """Get the physical lock status for a repository.
 
797
 
 
798
    New in 2.5.
 
799
    """
 
800
 
 
801
    def do_repository_request(self, repository):
 
802
        if repository.get_physical_lock_status():
 
803
            return SuccessfulSmartServerResponse(('yes', ))
 
804
        else:
 
805
            return SuccessfulSmartServerResponse(('no', ))
 
806
 
 
807
 
627
808
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
628
809
 
629
810
    def do_repository_request(self, repository, str_bool_new_value):
657
838
 
658
839
    def _copy_to_tempdir(self, from_repo):
659
840
        tmp_dirname = osutils.mkdtemp(prefix='tmpbzrclone')
660
 
        tmp_bzrdir = from_repo.bzrdir._format.initialize(tmp_dirname)
 
841
        tmp_bzrdir = from_repo.controldir._format.initialize(tmp_dirname)
661
842
        tmp_repo = from_repo._format.initialize(tmp_bzrdir)
662
843
        from_repo.copy_content_into(tmp_repo)
663
844
        return tmp_dirname, tmp_repo
713
894
        tokens = [token for token in resume_tokens.split(' ') if token]
714
895
        self.tokens = tokens
715
896
        self.repository = repository
716
 
        self.queue = Queue.Queue()
 
897
        self.queue = queue.Queue()
717
898
        self.insert_thread = threading.Thread(target=self._inserter_thread)
718
899
        self.insert_thread.start()
719
900
 
744
925
        if self.insert_thread is not None:
745
926
            self.insert_thread.join()
746
927
        if not self.insert_ok:
747
 
            exc_info = self.insert_exception
748
 
            raise exc_info[0], exc_info[1], exc_info[2]
 
928
            try:
 
929
                reraise(*self.insert_exception)
 
930
            finally:
 
931
                del self.insert_exception
749
932
        write_group_tokens, missing_keys = self.insert_result
750
933
        if write_group_tokens or missing_keys:
751
934
            # bzip needed? missing keys should typically be a small set.
792
975
        self.do_insert_stream_request(repository, resume_tokens)
793
976
 
794
977
 
 
978
class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
 
979
    """Add a revision signature text.
 
980
 
 
981
    New in 2.5.
 
982
    """
 
983
 
 
984
    def do_repository_request(self, repository, lock_token, revision_id,
 
985
            *write_group_tokens):
 
986
        """Add a revision signature text.
 
987
 
 
988
        :param repository: Repository to operate on
 
989
        :param lock_token: Lock token
 
990
        :param revision_id: Revision for which to add signature
 
991
        :param write_group_tokens: Write group tokens
 
992
        """
 
993
        self._lock_token = lock_token
 
994
        self._revision_id = revision_id
 
995
        self._write_group_tokens = write_group_tokens
 
996
        return None
 
997
 
 
998
    def do_body(self, body_bytes):
 
999
        """Add a signature text.
 
1000
 
 
1001
        :param body_bytes: GPG signature text
 
1002
        :return: SuccessfulSmartServerResponse with arguments 'ok' and
 
1003
            the list of new write group tokens.
 
1004
        """
 
1005
        self._repository.lock_write(token=self._lock_token)
 
1006
        try:
 
1007
            self._repository.resume_write_group(self._write_group_tokens)
 
1008
            try:
 
1009
                self._repository.add_signature_text(self._revision_id,
 
1010
                    body_bytes)
 
1011
            finally:
 
1012
                new_write_group_tokens = self._repository.suspend_write_group()
 
1013
        finally:
 
1014
            self._repository.unlock()
 
1015
        return SuccessfulSmartServerResponse(
 
1016
            ('ok', ) + tuple(new_write_group_tokens))
 
1017
 
 
1018
 
 
1019
class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
 
1020
    """Start a write group.
 
1021
 
 
1022
    New in 2.5.
 
1023
    """
 
1024
 
 
1025
    def do_repository_request(self, repository, lock_token):
 
1026
        """Start a write group."""
 
1027
        repository.lock_write(token=lock_token)
 
1028
        try:
 
1029
            repository.start_write_group()
 
1030
            try:
 
1031
                tokens = repository.suspend_write_group()
 
1032
            except errors.UnsuspendableWriteGroup:
 
1033
                return FailedSmartServerResponse(('UnsuspendableWriteGroup',))
 
1034
        finally:
 
1035
            repository.unlock()
 
1036
        return SuccessfulSmartServerResponse(('ok', tokens))
 
1037
 
 
1038
 
 
1039
class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
 
1040
    """Commit a write group.
 
1041
 
 
1042
    New in 2.5.
 
1043
    """
 
1044
 
 
1045
    def do_repository_request(self, repository, lock_token,
 
1046
            write_group_tokens):
 
1047
        """Commit a write group."""
 
1048
        repository.lock_write(token=lock_token)
 
1049
        try:
 
1050
            try:
 
1051
                repository.resume_write_group(write_group_tokens)
 
1052
            except errors.UnresumableWriteGroup as e:
 
1053
                return FailedSmartServerResponse(
 
1054
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
 
1055
            try:
 
1056
                repository.commit_write_group()
 
1057
            except:
 
1058
                write_group_tokens = repository.suspend_write_group()
 
1059
                # FIXME JRV 2011-11-19: What if the write_group_tokens
 
1060
                # have changed?
 
1061
                raise
 
1062
        finally:
 
1063
            repository.unlock()
 
1064
        return SuccessfulSmartServerResponse(('ok', ))
 
1065
 
 
1066
 
 
1067
class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
 
1068
    """Abort a write group.
 
1069
 
 
1070
    New in 2.5.
 
1071
    """
 
1072
 
 
1073
    def do_repository_request(self, repository, lock_token, write_group_tokens):
 
1074
        """Abort a write group."""
 
1075
        repository.lock_write(token=lock_token)
 
1076
        try:
 
1077
            try:
 
1078
                repository.resume_write_group(write_group_tokens)
 
1079
            except errors.UnresumableWriteGroup as e:
 
1080
                return FailedSmartServerResponse(
 
1081
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
 
1082
                repository.abort_write_group()
 
1083
        finally:
 
1084
            repository.unlock()
 
1085
        return SuccessfulSmartServerResponse(('ok', ))
 
1086
 
 
1087
 
 
1088
class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
 
1089
    """Check that a write group is still valid.
 
1090
 
 
1091
    New in 2.5.
 
1092
    """
 
1093
 
 
1094
    def do_repository_request(self, repository, lock_token, write_group_tokens):
 
1095
        """Abort a write group."""
 
1096
        repository.lock_write(token=lock_token)
 
1097
        try:
 
1098
            try:
 
1099
                repository.resume_write_group(write_group_tokens)
 
1100
            except errors.UnresumableWriteGroup as e:
 
1101
                return FailedSmartServerResponse(
 
1102
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
 
1103
            else:
 
1104
                repository.suspend_write_group()
 
1105
        finally:
 
1106
            repository.unlock()
 
1107
        return SuccessfulSmartServerResponse(('ok', ))
 
1108
 
 
1109
 
 
1110
class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
 
1111
    """Retrieve all of the revision ids in a repository.
 
1112
 
 
1113
    New in 2.5.
 
1114
    """
 
1115
 
 
1116
    def do_repository_request(self, repository):
 
1117
        revids = repository.all_revision_ids()
 
1118
        return SuccessfulSmartServerResponse(("ok", ), "\n".join(revids))
 
1119
 
 
1120
 
 
1121
class SmartServerRepositoryReconcile(SmartServerRepositoryRequest):
 
1122
    """Reconcile a repository.
 
1123
 
 
1124
    New in 2.5.
 
1125
    """
 
1126
 
 
1127
    def do_repository_request(self, repository, lock_token):
 
1128
        try:
 
1129
            repository.lock_write(token=lock_token)
 
1130
        except errors.TokenLockingNotSupported as e:
 
1131
            return FailedSmartServerResponse(
 
1132
                ('TokenLockingNotSupported', ))
 
1133
        try:
 
1134
            reconciler = repository.reconcile()
 
1135
        finally:
 
1136
            repository.unlock()
 
1137
        body = [
 
1138
            "garbage_inventories: %d\n" % reconciler.garbage_inventories,
 
1139
            "inconsistent_parents: %d\n" % reconciler.inconsistent_parents,
 
1140
            ]
 
1141
        return SuccessfulSmartServerResponse(('ok', ), "".join(body))
 
1142
 
 
1143
 
 
1144
class SmartServerRepositoryPack(SmartServerRepositoryRequest):
 
1145
    """Pack a repository.
 
1146
 
 
1147
    New in 2.5.
 
1148
    """
 
1149
 
 
1150
    def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
 
1151
        self._repository = repository
 
1152
        self._lock_token = lock_token
 
1153
        if clean_obsolete_packs == 'True':
 
1154
            self._clean_obsolete_packs = True
 
1155
        else:
 
1156
            self._clean_obsolete_packs = False
 
1157
        return None
 
1158
 
 
1159
    def do_body(self, body_bytes):
 
1160
        if body_bytes == "":
 
1161
            hint = None
 
1162
        else:
 
1163
            hint = body_bytes.splitlines()
 
1164
        self._repository.lock_write(token=self._lock_token)
 
1165
        try:
 
1166
            self._repository.pack(hint, self._clean_obsolete_packs)
 
1167
        finally:
 
1168
            self._repository.unlock()
 
1169
        return SuccessfulSmartServerResponse(("ok", ), )
 
1170
 
 
1171
 
 
1172
class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
 
1173
    """Iterate over the contents of files.
 
1174
 
 
1175
    The client sends a list of desired files to stream, one
 
1176
    per line, and as tuples of file id and revision, separated by
 
1177
    \0.
 
1178
 
 
1179
    The server replies with a stream. Each entry is preceded by a header,
 
1180
    which can either be:
 
1181
 
 
1182
    * "ok\x00IDX\n" where IDX is the index of the entry in the desired files
 
1183
        list sent by the client. This header is followed by the contents of
 
1184
        the file, bzip2-compressed.
 
1185
    * "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
 
1186
        The client can then raise an appropriate RevisionNotPresent error
 
1187
        or check its fallback repositories.
 
1188
 
 
1189
    New in 2.5.
 
1190
    """
 
1191
 
 
1192
    def body_stream(self, repository, desired_files):
 
1193
        self._repository.lock_read()
 
1194
        try:
 
1195
            text_keys = {}
 
1196
            for i, key in enumerate(desired_files):
 
1197
                text_keys[key] = i
 
1198
            for record in repository.texts.get_record_stream(text_keys,
 
1199
                    'unordered', True):
 
1200
                identifier = text_keys[record.key]
 
1201
                if record.storage_kind == 'absent':
 
1202
                    yield "absent\0%s\0%s\0%d\n" % (record.key[0],
 
1203
                        record.key[1], identifier)
 
1204
                    # FIXME: Way to abort early?
 
1205
                    continue
 
1206
                yield "ok\0%d\n" % identifier
 
1207
                compressor = zlib.compressobj()
 
1208
                for bytes in record.get_bytes_as('chunked'):
 
1209
                    data = compressor.compress(bytes)
 
1210
                    if data:
 
1211
                        yield data
 
1212
                data = compressor.flush()
 
1213
                if data:
 
1214
                    yield data
 
1215
        finally:
 
1216
            self._repository.unlock()
 
1217
 
 
1218
    def do_body(self, body_bytes):
 
1219
        desired_files = [
 
1220
            tuple(l.split("\0")) for l in body_bytes.splitlines()]
 
1221
        return SuccessfulSmartServerResponse(('ok', ),
 
1222
            body_stream=self.body_stream(self._repository, desired_files))
 
1223
 
 
1224
    def do_repository_request(self, repository):
 
1225
        # Signal that we want a body
 
1226
        return None
 
1227
 
 
1228
 
 
1229
class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
 
1230
    """Stream a list of revisions.
 
1231
 
 
1232
    The client sends a list of newline-separated revision ids in the
 
1233
    body of the request and the server replies with the serializer format,
 
1234
    and a stream of bzip2-compressed revision texts (using the specified
 
1235
    serializer format).
 
1236
 
 
1237
    Any revisions the server does not have are omitted from the stream.
 
1238
 
 
1239
    New in 2.5.
 
1240
    """
 
1241
 
 
1242
    def do_repository_request(self, repository):
 
1243
        self._repository = repository
 
1244
        # Signal there is a body
 
1245
        return None
 
1246
 
 
1247
    def do_body(self, body_bytes):
 
1248
        revision_ids = body_bytes.split("\n")
 
1249
        return SuccessfulSmartServerResponse(
 
1250
            ('ok', self._repository.get_serializer_format()),
 
1251
            body_stream=self.body_stream(self._repository, revision_ids))
 
1252
 
 
1253
    def body_stream(self, repository, revision_ids):
 
1254
        self._repository.lock_read()
 
1255
        try:
 
1256
            for record in repository.revisions.get_record_stream(
 
1257
                [(revid,) for revid in revision_ids], 'unordered', True):
 
1258
                if record.storage_kind == 'absent':
 
1259
                    continue
 
1260
                yield zlib.compress(record.get_bytes_as('fulltext'))
 
1261
        finally:
 
1262
            self._repository.unlock()
 
1263
 
 
1264
 
 
1265
class SmartServerRepositoryGetInventories(SmartServerRepositoryRequest):
 
1266
    """Get the inventory deltas for a set of revision ids.
 
1267
 
 
1268
    This accepts a list of revision ids, and then sends a chain
 
1269
    of deltas for the inventories of those revisions. The first
 
1270
    revision will be empty.
 
1271
 
 
1272
    The server writes back zlibbed serialized inventory deltas,
 
1273
    in the ordering specified. The base for each delta is the
 
1274
    inventory generated by the previous delta.
 
1275
 
 
1276
    New in 2.5.
 
1277
    """
 
1278
 
 
1279
    def _inventory_delta_stream(self, repository, ordering, revids):
 
1280
        prev_inv = _mod_inventory.Inventory(root_id=None,
 
1281
            revision_id=_mod_revision.NULL_REVISION)
 
1282
        serializer = inventory_delta.InventoryDeltaSerializer(
 
1283
            repository.supports_rich_root(),
 
1284
            repository._format.supports_tree_reference)
 
1285
        repository.lock_read()
 
1286
        try:
 
1287
            for inv, revid in repository._iter_inventories(revids, ordering):
 
1288
                if inv is None:
 
1289
                    continue
 
1290
                inv_delta = inv._make_delta(prev_inv)
 
1291
                lines = serializer.delta_to_lines(
 
1292
                    prev_inv.revision_id, inv.revision_id, inv_delta)
 
1293
                yield ChunkedContentFactory(inv.revision_id, None, None, lines)
 
1294
                prev_inv = inv
 
1295
        finally:
 
1296
            repository.unlock()
 
1297
 
 
1298
    def body_stream(self, repository, ordering, revids):
 
1299
        substream = self._inventory_delta_stream(repository,
 
1300
            ordering, revids)
 
1301
        return _stream_to_byte_stream([('inventory-deltas', substream)],
 
1302
            repository._format)
 
1303
 
 
1304
    def do_body(self, body_bytes):
 
1305
        return SuccessfulSmartServerResponse(('ok', ),
 
1306
            body_stream=self.body_stream(self._repository, self._ordering,
 
1307
                body_bytes.splitlines()))
 
1308
 
 
1309
    def do_repository_request(self, repository, ordering):
 
1310
        if ordering == 'unordered':
 
1311
            # inventory deltas for a topologically sorted stream
 
1312
            # are likely to be smaller
 
1313
            ordering = 'topological'
 
1314
        self._ordering = ordering
 
1315
        # Signal that we want a body
 
1316
        return None