/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/bzr/smart/repository.py

  • Committer: Jelmer Vernooij
  • Date: 2018-03-31 17:16:06 UTC
  • mto: (6883.23.7 bundle-git)
  • mto: This revision was merged to the branch mainline in revision 6955.
  • Revision ID: jelmer@jelmer.uk-20180331171606-kc0lxw0uoxinxx4p
add --lossy option to 'bzr push'.

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
 
"""Server-side repository related request implmentations."""
 
17
"""Server-side repository related request implementations."""
 
18
 
 
19
from __future__ import absolute_import
18
20
 
19
21
import bz2
 
22
import itertools
20
23
import os
21
 
import Queue
 
24
try:
 
25
    import queue
 
26
except ImportError:
 
27
    import Queue as queue
22
28
import sys
23
29
import tempfile
24
30
import threading
 
31
import zlib
25
32
 
26
 
from bzrlib import (
 
33
from ... import (
27
34
    bencode,
28
35
    errors,
29
 
    graph,
 
36
    estimate_compressed_size,
30
37
    osutils,
 
38
    trace,
 
39
    ui,
 
40
    )
 
41
from .. import (
 
42
    inventory as _mod_inventory,
 
43
    inventory_delta,
31
44
    pack,
32
 
    ui,
33
 
    versionedfile,
 
45
    vf_search,
34
46
    )
35
 
from bzrlib.bzrdir import BzrDir
36
 
from bzrlib.smart.request import (
 
47
from ..bzrdir import BzrDir
 
48
from ...sixish import (
 
49
    reraise,
 
50
)
 
51
from .request import (
37
52
    FailedSmartServerResponse,
38
53
    SmartServerRequest,
39
54
    SuccessfulSmartServerResponse,
40
55
    )
41
 
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
42
 
from bzrlib import revision as _mod_revision
43
 
from bzrlib.versionedfile import (
 
56
from ...repository import _strip_NULL_ghosts, network_format_registry
 
57
from ... import revision as _mod_revision
 
58
from ..versionedfile import (
 
59
    ChunkedContentFactory,
44
60
    NetworkRecordStream,
45
61
    record_to_fulltext_bytes,
46
62
    )
82
98
            recreate_search trusts that clients will look for missing things
83
99
            they expected and get it from elsewhere.
84
100
        """
 
101
        if search_bytes == 'everything':
 
102
            return vf_search.EverythingResult(repository), None
85
103
        lines = search_bytes.split('\n')
86
104
        if lines[0] == 'ancestry-of':
87
105
            heads = lines[1:]
88
 
            search_result = graph.PendingAncestryResult(heads, repository)
 
106
            search_result = vf_search.PendingAncestryResult(heads, repository)
89
107
            return search_result, None
90
108
        elif lines[0] == 'search':
91
109
            return self.recreate_search_from_recipe(repository, lines[1:],
105
123
        start_keys = set(lines[0].split(' '))
106
124
        exclude_keys = set(lines[1].split(' '))
107
125
        revision_count = int(lines[2])
108
 
        repository.lock_read()
109
 
        try:
 
126
        with repository.lock_read():
110
127
            search = repository.get_graph()._make_breadth_first_searcher(
111
128
                start_keys)
112
129
            while True:
113
130
                try:
114
 
                    next_revs = search.next()
 
131
                    next_revs = next(search)
115
132
                except StopIteration:
116
133
                    break
117
134
                search.stop_searching_any(exclude_keys.intersection(next_revs))
118
 
            search_result = search.get_result()
119
 
            if (not discard_excess and
120
 
                search_result.get_recipe()[3] != revision_count):
 
135
            (started_keys, excludes, included_keys) = search.get_state()
 
136
            if (not discard_excess and len(included_keys) != revision_count):
121
137
                # we got back a different amount of data than expected, this
122
138
                # gets reported as NoSuchRevision, because less revisions
123
139
                # indicates missing revisions, and more should never happen as
124
140
                # the excludes list considers ghosts and ensures that ghost
125
141
                # filling races are not a problem.
126
142
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
 
143
            search_result = vf_search.SearchResult(started_keys, excludes,
 
144
                len(included_keys), included_keys)
127
145
            return (search_result, None)
128
 
        finally:
129
 
            repository.unlock()
130
146
 
131
147
 
132
148
class SmartServerRepositoryReadLocked(SmartServerRepositoryRequest):
134
150
 
135
151
    def do_repository_request(self, repository, *args):
136
152
        """Read lock a repository for do_readlocked_repository_request."""
137
 
        repository.lock_read()
138
 
        try:
 
153
        with repository.lock_read():
139
154
            return self.do_readlocked_repository_request(repository, *args)
140
 
        finally:
141
 
            repository.unlock()
142
 
 
 
155
 
 
156
 
 
157
class SmartServerRepositoryBreakLock(SmartServerRepositoryRequest):
 
158
    """Break a repository lock."""
 
159
 
 
160
    def do_repository_request(self, repository):
 
161
        repository.break_lock()
 
162
        return SuccessfulSmartServerResponse(('ok', ))
 
163
 
 
164
 
 
165
_lsprof_count = 0
143
166
 
144
167
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
145
168
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
173
196
            compressed.
174
197
        """
175
198
        repository = self._repository
176
 
        repository.lock_read()
177
 
        try:
 
199
        with repository.lock_read():
178
200
            return self._do_repository_request(body_bytes)
179
 
        finally:
180
 
            repository.unlock()
181
201
 
182
 
    def _do_repository_request(self, body_bytes):
183
 
        repository = self._repository
184
 
        revision_ids = set(self._revision_ids)
185
 
        include_missing = 'include-missing:' in revision_ids
186
 
        if include_missing:
187
 
            revision_ids.remove('include-missing:')
188
 
        body_lines = body_bytes.split('\n')
189
 
        search_result, error = self.recreate_search_from_recipe(
190
 
            repository, body_lines)
191
 
        if error is not None:
192
 
            return error
193
 
        # TODO might be nice to start up the search again; but thats not
194
 
        # written or tested yet.
195
 
        client_seen_revs = set(search_result.get_keys())
196
 
        # Always include the requested ids.
197
 
        client_seen_revs.difference_update(revision_ids)
198
 
        lines = []
199
 
        repo_graph = repository.get_graph()
 
202
    def _expand_requested_revs(self, repo_graph, revision_ids, client_seen_revs,
 
203
                               include_missing, max_size=65536):
200
204
        result = {}
201
205
        queried_revs = set()
202
 
        size_so_far = 0
 
206
        estimator = estimate_compressed_size.ZLibEstimator(max_size)
203
207
        next_revs = revision_ids
204
208
        first_loop_done = False
205
209
        while next_revs:
227
231
                    # add parents to the result
228
232
                    result[encoded_id] = parents
229
233
                    # Approximate the serialized cost of this revision_id.
230
 
                    size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
 
234
                    line = '%s %s\n' % (encoded_id, ' '.join(parents))
 
235
                    estimator.add_content(line)
231
236
            # get all the directly asked for parents, and then flesh out to
232
237
            # 64K (compressed) or so. We do one level of depth at a time to
233
238
            # stay in sync with the client. The 250000 magic number is
234
239
            # estimated compression ratio taken from bzr.dev itself.
235
 
            if self.no_extra_results or (
236
 
                first_loop_done and size_so_far > 250000):
 
240
            if self.no_extra_results or (first_loop_done and estimator.full()):
 
241
                trace.mutter('size: %d, z_size: %d'
 
242
                             % (estimator._uncompressed_size_added,
 
243
                                estimator._compressed_size_added))
237
244
                next_revs = set()
238
245
                break
239
246
            # don't query things we've already queried
240
 
            next_revs.difference_update(queried_revs)
 
247
            next_revs = next_revs.difference(queried_revs)
241
248
            first_loop_done = True
 
249
        return result
 
250
 
 
251
    def _do_repository_request(self, body_bytes):
 
252
        repository = self._repository
 
253
        revision_ids = set(self._revision_ids)
 
254
        include_missing = 'include-missing:' in revision_ids
 
255
        if include_missing:
 
256
            revision_ids.remove('include-missing:')
 
257
        body_lines = body_bytes.split('\n')
 
258
        search_result, error = self.recreate_search_from_recipe(
 
259
            repository, body_lines)
 
260
        if error is not None:
 
261
            return error
 
262
        # TODO might be nice to start up the search again; but thats not
 
263
        # written or tested yet.
 
264
        client_seen_revs = set(search_result.get_keys())
 
265
        # Always include the requested ids.
 
266
        client_seen_revs.difference_update(revision_ids)
 
267
 
 
268
        repo_graph = repository.get_graph()
 
269
        result = self._expand_requested_revs(repo_graph, revision_ids,
 
270
                                             client_seen_revs, include_missing)
242
271
 
243
272
        # sorting trivially puts lexographically similar revision ids together.
244
273
        # Compression FTW.
 
274
        lines = []
245
275
        for revision, parents in sorted(result.items()):
246
276
            lines.append(' '.join((revision, ) + tuple(parents)))
247
277
 
271
301
        else:
272
302
            search_ids = repository.all_revision_ids()
273
303
        search = graph._make_breadth_first_searcher(search_ids)
274
 
        transitive_ids = set()
275
 
        map(transitive_ids.update, list(search))
 
304
        transitive_ids = set(itertools.chain.from_iterable(search))
276
305
        parent_map = graph.get_parent_map(transitive_ids)
277
306
        revision_graph = _strip_NULL_ghosts(parent_map)
278
307
        if revision_id and revision_id not in revision_graph:
297
326
        """
298
327
        try:
299
328
            found_flag, result = repository.get_rev_id_for_revno(revno, known_pair)
300
 
        except errors.RevisionNotPresent, err:
 
329
        except errors.RevisionNotPresent as err:
301
330
            if err.revision_id != known_pair[1]:
302
331
                raise AssertionError(
303
332
                    'get_rev_id_for_revno raised RevisionNotPresent for '
312
341
                ('history-incomplete', earliest_revno, earliest_revid))
313
342
 
314
343
 
 
344
class SmartServerRepositoryGetSerializerFormat(SmartServerRepositoryRequest):
 
345
 
 
346
    def do_repository_request(self, repository):
 
347
        """Return the serializer format for this repository.
 
348
 
 
349
        New in 2.5.0.
 
350
 
 
351
        :param repository: The repository to query
 
352
        :return: A smart server response ('ok', FORMAT)
 
353
        """
 
354
        serializer = repository.get_serializer_format()
 
355
        return SuccessfulSmartServerResponse(('ok', serializer))
 
356
 
 
357
 
315
358
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
316
359
 
317
360
    def do_repository_request(self, repository, revision_id):
319
362
 
320
363
        :param repository: The repository to query in.
321
364
        :param revision_id: The utf8 encoded revision_id to lookup.
322
 
        :return: A smart server response of ('ok', ) if the revision is
323
 
            present.
 
365
        :return: A smart server response of ('yes', ) if the revision is
 
366
            present. ('no', ) if it is missing.
324
367
        """
325
368
        if repository.has_revision(revision_id):
326
369
            return SuccessfulSmartServerResponse(('yes', ))
328
371
            return SuccessfulSmartServerResponse(('no', ))
329
372
 
330
373
 
 
374
class SmartServerRequestHasSignatureForRevisionId(
 
375
        SmartServerRepositoryRequest):
 
376
 
 
377
    def do_repository_request(self, repository, revision_id):
 
378
        """Return ok if a signature is present for a revision.
 
379
 
 
380
        Introduced in bzr 2.5.0.
 
381
 
 
382
        :param repository: The repository to query in.
 
383
        :param revision_id: The utf8 encoded revision_id to lookup.
 
384
        :return: A smart server response of ('yes', ) if a
 
385
            signature for the revision is present,
 
386
            ('no', ) if it is missing.
 
387
        """
 
388
        try:
 
389
            if repository.has_signature_for_revision_id(revision_id):
 
390
                return SuccessfulSmartServerResponse(('yes', ))
 
391
            else:
 
392
                return SuccessfulSmartServerResponse(('no', ))
 
393
        except errors.NoSuchRevision:
 
394
            return FailedSmartServerResponse(
 
395
                ('nosuchrevision', revision_id))
 
396
 
 
397
 
331
398
class SmartServerRepositoryGatherStats(SmartServerRepositoryRequest):
332
399
 
333
400
    def do_repository_request(self, repository, revid, committers):
353
420
            decoded_committers = True
354
421
        else:
355
422
            decoded_committers = None
356
 
        stats = repository.gather_stats(decoded_revision_id, decoded_committers)
 
423
        try:
 
424
            stats = repository.gather_stats(decoded_revision_id,
 
425
                decoded_committers)
 
426
        except errors.NoSuchRevision:
 
427
            return FailedSmartServerResponse(('nosuchrevision', revid))
357
428
 
358
429
        body = ''
359
 
        if stats.has_key('committers'):
 
430
        if 'committers' in stats:
360
431
            body += 'committers: %d\n' % stats['committers']
361
 
        if stats.has_key('firstrev'):
 
432
        if 'firstrev' in stats:
362
433
            body += 'firstrev: %.3f %d\n' % stats['firstrev']
363
 
        if stats.has_key('latestrev'):
 
434
        if 'latestrev' in stats:
364
435
             body += 'latestrev: %.3f %d\n' % stats['latestrev']
365
 
        if stats.has_key('revisions'):
 
436
        if 'revisions' in stats:
366
437
            body += 'revisions: %d\n' % stats['revisions']
367
 
        if stats.has_key('size'):
 
438
        if 'size' in stats:
368
439
            body += 'size: %d\n' % stats['size']
369
440
 
370
441
        return SuccessfulSmartServerResponse(('ok', ), body)
371
442
 
372
443
 
 
444
class SmartServerRepositoryGetRevisionSignatureText(
 
445
        SmartServerRepositoryRequest):
 
446
    """Return the signature text of a revision.
 
447
 
 
448
    New in 2.5.
 
449
    """
 
450
 
 
451
    def do_repository_request(self, repository, revision_id):
 
452
        """Return the result of repository.get_signature_text().
 
453
 
 
454
        :param repository: The repository to query in.
 
455
        :return: A smart server response of with the signature text as
 
456
            body.
 
457
        """
 
458
        try:
 
459
            text = repository.get_signature_text(revision_id)
 
460
        except errors.NoSuchRevision as err:
 
461
            return FailedSmartServerResponse(
 
462
                ('nosuchrevision', err.revision))
 
463
        return SuccessfulSmartServerResponse(('ok', ), text)
 
464
 
 
465
 
373
466
class SmartServerRepositoryIsShared(SmartServerRepositoryRequest):
374
467
 
375
468
    def do_repository_request(self, repository):
385
478
            return SuccessfulSmartServerResponse(('no', ))
386
479
 
387
480
 
 
481
class SmartServerRepositoryMakeWorkingTrees(SmartServerRepositoryRequest):
 
482
 
 
483
    def do_repository_request(self, repository):
 
484
        """Return the result of repository.make_working_trees().
 
485
 
 
486
        Introduced in bzr 2.5.0.
 
487
 
 
488
        :param repository: The repository to query in.
 
489
        :return: A smart server response of ('yes', ) if the repository uses
 
490
            working trees, and ('no', ) if it is not.
 
491
        """
 
492
        if repository.make_working_trees():
 
493
            return SuccessfulSmartServerResponse(('yes', ))
 
494
        else:
 
495
            return SuccessfulSmartServerResponse(('no', ))
 
496
 
 
497
 
388
498
class SmartServerRepositoryLockWrite(SmartServerRepositoryRequest):
389
499
 
390
500
    def do_repository_request(self, repository, token=''):
392
502
        if token == '':
393
503
            token = None
394
504
        try:
395
 
            token = repository.lock_write(token=token)
396
 
        except errors.LockContention, e:
 
505
            token = repository.lock_write(token=token).repository_token
 
506
        except errors.LockContention as e:
397
507
            return FailedSmartServerResponse(('LockContention',))
398
508
        except errors.UnlockableTransport:
399
509
            return FailedSmartServerResponse(('UnlockableTransport',))
400
 
        except errors.LockFailed, e:
 
510
        except errors.LockFailed as e:
401
511
            return FailedSmartServerResponse(('LockFailed',
402
512
                str(e.lock), str(e.why)))
403
513
        if token is not None:
413
523
    def do_repository_request(self, repository, to_network_name):
414
524
        """Get a stream for inserting into a to_format repository.
415
525
 
 
526
        The request body is 'search_bytes', a description of the revisions
 
527
        being requested.
 
528
 
 
529
        In 2.3 this verb added support for search_bytes == 'everything'.  Older
 
530
        implementations will respond with a BadSearch error, and clients should
 
531
        catch this and fallback appropriately.
 
532
 
416
533
        :param repository: The repository to stream from.
417
534
        :param to_network_name: The network name of the format of the target
418
535
            repository.
466
583
            source = repository._get_source(self._to_format)
467
584
            stream = source.get_stream(search_result)
468
585
        except Exception:
469
 
            exc_info = sys.exc_info()
470
586
            try:
471
587
                # On non-error, unlocking is done by the body stream handler.
472
588
                repository.unlock()
473
589
            finally:
474
 
                raise exc_info[0], exc_info[1], exc_info[2]
 
590
                raise
475
591
        return SuccessfulSmartServerResponse(('ok',),
476
592
            body_stream=self.body_stream(stream, repository))
477
593
 
480
596
        try:
481
597
            for bytes in byte_stream:
482
598
                yield bytes
483
 
        except errors.RevisionNotPresent, e:
 
599
        except errors.RevisionNotPresent as e:
484
600
            # This shouldn't be able to happen, but as we don't buffer
485
601
            # everything it can in theory happen.
486
602
            repository.unlock()
490
606
 
491
607
 
492
608
class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
 
609
    """The same as Repository.get_stream, but will return stream CHK formats to
 
610
    clients.
 
611
 
 
612
    See SmartServerRepositoryGetStream._should_fake_unknown.
 
613
    
 
614
    New in 1.19.
 
615
    """
493
616
 
494
617
    def _should_fake_unknown(self):
495
618
        """Returns False; we don't need to workaround bugs in 1.19+ clients."""
505
628
        for record in substream:
506
629
            if record.storage_kind in ('chunked', 'fulltext'):
507
630
                serialised = record_to_fulltext_bytes(record)
508
 
            elif record.storage_kind == 'inventory-delta':
509
 
                serialised = record_to_inventory_delta_bytes(record)
510
631
            elif record.storage_kind == 'absent':
511
632
                raise ValueError("Absent factory for %s" % (record.key,))
512
633
            else:
544
665
    :ivar first_bytes: The first bytes to give the next NetworkRecordStream.
545
666
    """
546
667
 
547
 
    def __init__(self, byte_stream):
 
668
    def __init__(self, byte_stream, record_counter):
548
669
        """Create a _ByteStreamDecoder."""
549
670
        self.stream_decoder = pack.ContainerPushParser()
550
671
        self.current_type = None
551
672
        self.first_bytes = None
552
673
        self.byte_stream = byte_stream
 
674
        self._record_counter = record_counter
 
675
        self.key_count = 0
553
676
 
554
677
    def iter_stream_decoder(self):
555
678
        """Iterate the contents of the pack from stream_decoder."""
580
703
 
581
704
    def record_stream(self):
582
705
        """Yield substream_type, substream from the byte stream."""
 
706
        def wrap_and_count(pb, rc, substream):
 
707
            """Yield records from stream while showing progress."""
 
708
            counter = 0
 
709
            if rc:
 
710
                if self.current_type != 'revisions' and self.key_count != 0:
 
711
                    # As we know the number of revisions now (in self.key_count)
 
712
                    # we can setup and use record_counter (rc).
 
713
                    if not rc.is_initialized():
 
714
                        rc.setup(self.key_count, self.key_count)
 
715
            for record in substream.read():
 
716
                if rc:
 
717
                    if rc.is_initialized() and counter == rc.STEP:
 
718
                        rc.increment(counter)
 
719
                        pb.update('Estimate', rc.current, rc.max)
 
720
                        counter = 0
 
721
                    if self.current_type == 'revisions':
 
722
                        # Total records is proportional to number of revs
 
723
                        # to fetch. With remote, we used self.key_count to
 
724
                        # track the number of revs. Once we have the revs
 
725
                        # counts in self.key_count, the progress bar changes
 
726
                        # from 'Estimating..' to 'Estimate' above.
 
727
                        self.key_count += 1
 
728
                        if counter == rc.STEP:
 
729
                            pb.update('Estimating..', self.key_count)
 
730
                            counter = 0
 
731
                counter += 1
 
732
                yield record
 
733
 
583
734
        self.seed_state()
584
 
        # Make and consume sub generators, one per substream type:
585
 
        while self.first_bytes is not None:
586
 
            substream = NetworkRecordStream(self.iter_substream_bytes())
587
 
            # after substream is fully consumed, self.current_type is set to
588
 
            # the next type, and self.first_bytes is set to the matching bytes.
589
 
            yield self.current_type, substream.read()
 
735
        with ui.ui_factory.nested_progress_bar() as pb:
 
736
            rc = self._record_counter
 
737
            try:
 
738
                # Make and consume sub generators, one per substream type:
 
739
                while self.first_bytes is not None:
 
740
                    substream = NetworkRecordStream(self.iter_substream_bytes())
 
741
                    # after substream is fully consumed, self.current_type is set
 
742
                    # to the next type, and self.first_bytes is set to the matching
 
743
                    # bytes.
 
744
                    yield self.current_type, wrap_and_count(pb, rc, substream)
 
745
            finally:
 
746
                if rc:
 
747
                    pb.update('Done', rc.max, rc.max)
590
748
 
591
749
    def seed_state(self):
592
750
        """Prepare the _ByteStreamDecoder to decode from the pack stream."""
597
755
        list(self.iter_substream_bytes())
598
756
 
599
757
 
600
 
def _byte_stream_to_stream(byte_stream):
 
758
def _byte_stream_to_stream(byte_stream, record_counter=None):
601
759
    """Convert a byte stream into a format and a stream.
602
760
 
603
761
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
604
762
    :return: (RepositoryFormat, stream_generator)
605
763
    """
606
 
    decoder = _ByteStreamDecoder(byte_stream)
 
764
    decoder = _ByteStreamDecoder(byte_stream, record_counter)
607
765
    for bytes in byte_stream:
608
766
        decoder.stream_decoder.accept_bytes(bytes)
609
767
        for record in decoder.stream_decoder.read_pending_records(max=1):
617
775
    def do_repository_request(self, repository, token):
618
776
        try:
619
777
            repository.lock_write(token=token)
620
 
        except errors.TokenMismatch, e:
 
778
        except errors.TokenMismatch as e:
621
779
            return FailedSmartServerResponse(('TokenMismatch',))
622
780
        repository.dont_leave_lock_in_place()
623
781
        repository.unlock()
624
782
        return SuccessfulSmartServerResponse(('ok',))
625
783
 
626
784
 
 
785
class SmartServerRepositoryGetPhysicalLockStatus(SmartServerRepositoryRequest):
 
786
    """Get the physical lock status for a repository.
 
787
 
 
788
    New in 2.5.
 
789
    """
 
790
 
 
791
    def do_repository_request(self, repository):
 
792
        if repository.get_physical_lock_status():
 
793
            return SuccessfulSmartServerResponse(('yes', ))
 
794
        else:
 
795
            return SuccessfulSmartServerResponse(('no', ))
 
796
 
 
797
 
627
798
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
628
799
 
629
800
    def do_repository_request(self, repository, str_bool_new_value):
657
828
 
658
829
    def _copy_to_tempdir(self, from_repo):
659
830
        tmp_dirname = osutils.mkdtemp(prefix='tmpbzrclone')
660
 
        tmp_bzrdir = from_repo.bzrdir._format.initialize(tmp_dirname)
 
831
        tmp_bzrdir = from_repo.controldir._format.initialize(tmp_dirname)
661
832
        tmp_repo = from_repo._format.initialize(tmp_bzrdir)
662
833
        from_repo.copy_content_into(tmp_repo)
663
834
        return tmp_dirname, tmp_repo
664
835
 
665
836
    def _tarfile_response(self, tmp_dirname, compression):
666
 
        temp = tempfile.NamedTemporaryFile()
667
 
        try:
 
837
        with tempfile.NamedTemporaryFile() as temp:
668
838
            self._tarball_of_dir(tmp_dirname, compression, temp.file)
669
839
            # all finished; write the tempfile out to the network
670
840
            temp.seek(0)
671
841
            return SuccessfulSmartServerResponse(('ok',), temp.read())
672
842
            # FIXME: Don't read the whole thing into memory here; rather stream
673
843
            # it out from the file onto the network. mbp 20070411
674
 
        finally:
675
 
            temp.close()
676
844
 
677
845
    def _tarball_of_dir(self, dirname, compression, ofile):
678
846
        import tarfile
713
881
        tokens = [token for token in resume_tokens.split(' ') if token]
714
882
        self.tokens = tokens
715
883
        self.repository = repository
716
 
        self.queue = Queue.Queue()
 
884
        self.queue = queue.Queue()
717
885
        self.insert_thread = threading.Thread(target=self._inserter_thread)
718
886
        self.insert_thread.start()
719
887
 
744
912
        if self.insert_thread is not None:
745
913
            self.insert_thread.join()
746
914
        if not self.insert_ok:
747
 
            exc_info = self.insert_exception
748
 
            raise exc_info[0], exc_info[1], exc_info[2]
 
915
            try:
 
916
                reraise(*self.insert_exception)
 
917
            finally:
 
918
                del self.insert_exception
749
919
        write_group_tokens, missing_keys = self.insert_result
750
920
        if write_group_tokens or missing_keys:
751
921
            # bzip needed? missing keys should typically be a small set.
792
962
        self.do_insert_stream_request(repository, resume_tokens)
793
963
 
794
964
 
 
965
class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
 
966
    """Add a revision signature text.
 
967
 
 
968
    New in 2.5.
 
969
    """
 
970
 
 
971
    def do_repository_request(self, repository, lock_token, revision_id,
 
972
            *write_group_tokens):
 
973
        """Add a revision signature text.
 
974
 
 
975
        :param repository: Repository to operate on
 
976
        :param lock_token: Lock token
 
977
        :param revision_id: Revision for which to add signature
 
978
        :param write_group_tokens: Write group tokens
 
979
        """
 
980
        self._lock_token = lock_token
 
981
        self._revision_id = revision_id
 
982
        self._write_group_tokens = write_group_tokens
 
983
        return None
 
984
 
 
985
    def do_body(self, body_bytes):
 
986
        """Add a signature text.
 
987
 
 
988
        :param body_bytes: GPG signature text
 
989
        :return: SuccessfulSmartServerResponse with arguments 'ok' and
 
990
            the list of new write group tokens.
 
991
        """
 
992
        self._repository.lock_write(token=self._lock_token)
 
993
        try:
 
994
            self._repository.resume_write_group(self._write_group_tokens)
 
995
            try:
 
996
                self._repository.add_signature_text(self._revision_id,
 
997
                    body_bytes)
 
998
            finally:
 
999
                new_write_group_tokens = self._repository.suspend_write_group()
 
1000
        finally:
 
1001
            self._repository.unlock()
 
1002
        return SuccessfulSmartServerResponse(
 
1003
            ('ok', ) + tuple(new_write_group_tokens))
 
1004
 
 
1005
 
 
1006
class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
 
1007
    """Start a write group.
 
1008
 
 
1009
    New in 2.5.
 
1010
    """
 
1011
 
 
1012
    def do_repository_request(self, repository, lock_token):
 
1013
        """Start a write group."""
 
1014
        repository.lock_write(token=lock_token)
 
1015
        try:
 
1016
            repository.start_write_group()
 
1017
            try:
 
1018
                tokens = repository.suspend_write_group()
 
1019
            except errors.UnsuspendableWriteGroup:
 
1020
                return FailedSmartServerResponse(('UnsuspendableWriteGroup',))
 
1021
        finally:
 
1022
            repository.unlock()
 
1023
        return SuccessfulSmartServerResponse(('ok', tokens))
 
1024
 
 
1025
 
 
1026
class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
 
1027
    """Commit a write group.
 
1028
 
 
1029
    New in 2.5.
 
1030
    """
 
1031
 
 
1032
    def do_repository_request(self, repository, lock_token,
 
1033
            write_group_tokens):
 
1034
        """Commit a write group."""
 
1035
        repository.lock_write(token=lock_token)
 
1036
        try:
 
1037
            try:
 
1038
                repository.resume_write_group(write_group_tokens)
 
1039
            except errors.UnresumableWriteGroup as e:
 
1040
                return FailedSmartServerResponse(
 
1041
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
 
1042
            try:
 
1043
                repository.commit_write_group()
 
1044
            except:
 
1045
                write_group_tokens = repository.suspend_write_group()
 
1046
                # FIXME JRV 2011-11-19: What if the write_group_tokens
 
1047
                # have changed?
 
1048
                raise
 
1049
        finally:
 
1050
            repository.unlock()
 
1051
        return SuccessfulSmartServerResponse(('ok', ))
 
1052
 
 
1053
 
 
1054
class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
 
1055
    """Abort a write group.
 
1056
 
 
1057
    New in 2.5.
 
1058
    """
 
1059
 
 
1060
    def do_repository_request(self, repository, lock_token, write_group_tokens):
 
1061
        """Abort a write group."""
 
1062
        repository.lock_write(token=lock_token)
 
1063
        try:
 
1064
            try:
 
1065
                repository.resume_write_group(write_group_tokens)
 
1066
            except errors.UnresumableWriteGroup as e:
 
1067
                return FailedSmartServerResponse(
 
1068
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
 
1069
                repository.abort_write_group()
 
1070
        finally:
 
1071
            repository.unlock()
 
1072
        return SuccessfulSmartServerResponse(('ok', ))
 
1073
 
 
1074
 
 
1075
class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
 
1076
    """Check that a write group is still valid.
 
1077
 
 
1078
    New in 2.5.
 
1079
    """
 
1080
 
 
1081
    def do_repository_request(self, repository, lock_token, write_group_tokens):
 
1082
        """Abort a write group."""
 
1083
        repository.lock_write(token=lock_token)
 
1084
        try:
 
1085
            try:
 
1086
                repository.resume_write_group(write_group_tokens)
 
1087
            except errors.UnresumableWriteGroup as e:
 
1088
                return FailedSmartServerResponse(
 
1089
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
 
1090
            else:
 
1091
                repository.suspend_write_group()
 
1092
        finally:
 
1093
            repository.unlock()
 
1094
        return SuccessfulSmartServerResponse(('ok', ))
 
1095
 
 
1096
 
 
1097
class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
 
1098
    """Retrieve all of the revision ids in a repository.
 
1099
 
 
1100
    New in 2.5.
 
1101
    """
 
1102
 
 
1103
    def do_repository_request(self, repository):
 
1104
        revids = repository.all_revision_ids()
 
1105
        return SuccessfulSmartServerResponse(("ok", ), "\n".join(revids))
 
1106
 
 
1107
 
 
1108
class SmartServerRepositoryReconcile(SmartServerRepositoryRequest):
 
1109
    """Reconcile a repository.
 
1110
 
 
1111
    New in 2.5.
 
1112
    """
 
1113
 
 
1114
    def do_repository_request(self, repository, lock_token):
 
1115
        try:
 
1116
            repository.lock_write(token=lock_token)
 
1117
        except errors.TokenLockingNotSupported as e:
 
1118
            return FailedSmartServerResponse(
 
1119
                ('TokenLockingNotSupported', ))
 
1120
        try:
 
1121
            reconciler = repository.reconcile()
 
1122
        finally:
 
1123
            repository.unlock()
 
1124
        body = [
 
1125
            "garbage_inventories: %d\n" % reconciler.garbage_inventories,
 
1126
            "inconsistent_parents: %d\n" % reconciler.inconsistent_parents,
 
1127
            ]
 
1128
        return SuccessfulSmartServerResponse(('ok', ), "".join(body))
 
1129
 
 
1130
 
 
1131
class SmartServerRepositoryPack(SmartServerRepositoryRequest):
 
1132
    """Pack a repository.
 
1133
 
 
1134
    New in 2.5.
 
1135
    """
 
1136
 
 
1137
    def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
 
1138
        self._repository = repository
 
1139
        self._lock_token = lock_token
 
1140
        if clean_obsolete_packs == 'True':
 
1141
            self._clean_obsolete_packs = True
 
1142
        else:
 
1143
            self._clean_obsolete_packs = False
 
1144
        return None
 
1145
 
 
1146
    def do_body(self, body_bytes):
 
1147
        if body_bytes == "":
 
1148
            hint = None
 
1149
        else:
 
1150
            hint = body_bytes.splitlines()
 
1151
        self._repository.lock_write(token=self._lock_token)
 
1152
        try:
 
1153
            self._repository.pack(hint, self._clean_obsolete_packs)
 
1154
        finally:
 
1155
            self._repository.unlock()
 
1156
        return SuccessfulSmartServerResponse(("ok", ), )
 
1157
 
 
1158
 
 
1159
class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
 
1160
    """Iterate over the contents of files.
 
1161
 
 
1162
    The client sends a list of desired files to stream, one
 
1163
    per line, and as tuples of file id and revision, separated by
 
1164
    \0.
 
1165
 
 
1166
    The server replies with a stream. Each entry is preceded by a header,
 
1167
    which can either be:
 
1168
 
 
1169
    * "ok\x00IDX\n" where IDX is the index of the entry in the desired files
 
1170
        list sent by the client. This header is followed by the contents of
 
1171
        the file, bzip2-compressed.
 
1172
    * "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
 
1173
        The client can then raise an appropriate RevisionNotPresent error
 
1174
        or check its fallback repositories.
 
1175
 
 
1176
    New in 2.5.
 
1177
    """
 
1178
 
 
1179
    def body_stream(self, repository, desired_files):
 
1180
        with self._repository.lock_read():
 
1181
            text_keys = {}
 
1182
            for i, key in enumerate(desired_files):
 
1183
                text_keys[key] = i
 
1184
            for record in repository.texts.get_record_stream(text_keys,
 
1185
                    'unordered', True):
 
1186
                identifier = text_keys[record.key]
 
1187
                if record.storage_kind == 'absent':
 
1188
                    yield "absent\0%s\0%s\0%d\n" % (record.key[0],
 
1189
                        record.key[1], identifier)
 
1190
                    # FIXME: Way to abort early?
 
1191
                    continue
 
1192
                yield "ok\0%d\n" % identifier
 
1193
                compressor = zlib.compressobj()
 
1194
                for bytes in record.get_bytes_as('chunked'):
 
1195
                    data = compressor.compress(bytes)
 
1196
                    if data:
 
1197
                        yield data
 
1198
                data = compressor.flush()
 
1199
                if data:
 
1200
                    yield data
 
1201
 
 
1202
    def do_body(self, body_bytes):
 
1203
        desired_files = [
 
1204
            tuple(l.split("\0")) for l in body_bytes.splitlines()]
 
1205
        return SuccessfulSmartServerResponse(('ok', ),
 
1206
            body_stream=self.body_stream(self._repository, desired_files))
 
1207
 
 
1208
    def do_repository_request(self, repository):
 
1209
        # Signal that we want a body
 
1210
        return None
 
1211
 
 
1212
 
 
1213
class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
 
1214
    """Stream a list of revisions.
 
1215
 
 
1216
    The client sends a list of newline-separated revision ids in the
 
1217
    body of the request and the server replies with the serializer format,
 
1218
    and a stream of bzip2-compressed revision texts (using the specified
 
1219
    serializer format).
 
1220
 
 
1221
    Any revisions the server does not have are omitted from the stream.
 
1222
 
 
1223
    New in 2.5.
 
1224
    """
 
1225
 
 
1226
    def do_repository_request(self, repository):
 
1227
        self._repository = repository
 
1228
        # Signal there is a body
 
1229
        return None
 
1230
 
 
1231
    def do_body(self, body_bytes):
 
1232
        revision_ids = body_bytes.split("\n")
 
1233
        return SuccessfulSmartServerResponse(
 
1234
            ('ok', self._repository.get_serializer_format()),
 
1235
            body_stream=self.body_stream(self._repository, revision_ids))
 
1236
 
 
1237
    def body_stream(self, repository, revision_ids):
 
1238
        with self._repository.lock_read():
 
1239
            for record in repository.revisions.get_record_stream(
 
1240
                [(revid,) for revid in revision_ids], 'unordered', True):
 
1241
                if record.storage_kind == 'absent':
 
1242
                    continue
 
1243
                yield zlib.compress(record.get_bytes_as('fulltext'))
 
1244
 
 
1245
 
 
1246
class SmartServerRepositoryGetInventories(SmartServerRepositoryRequest):
 
1247
    """Get the inventory deltas for a set of revision ids.
 
1248
 
 
1249
    This accepts a list of revision ids, and then sends a chain
 
1250
    of deltas for the inventories of those revisions. The first
 
1251
    revision will be empty.
 
1252
 
 
1253
    The server writes back zlibbed serialized inventory deltas,
 
1254
    in the ordering specified. The base for each delta is the
 
1255
    inventory generated by the previous delta.
 
1256
 
 
1257
    New in 2.5.
 
1258
    """
 
1259
 
 
1260
    def _inventory_delta_stream(self, repository, ordering, revids):
 
1261
        prev_inv = _mod_inventory.Inventory(root_id=None,
 
1262
            revision_id=_mod_revision.NULL_REVISION)
 
1263
        serializer = inventory_delta.InventoryDeltaSerializer(
 
1264
            repository.supports_rich_root(),
 
1265
            repository._format.supports_tree_reference)
 
1266
        with repository.lock_read():
 
1267
            for inv, revid in repository._iter_inventories(revids, ordering):
 
1268
                if inv is None:
 
1269
                    continue
 
1270
                inv_delta = inv._make_delta(prev_inv)
 
1271
                lines = serializer.delta_to_lines(
 
1272
                    prev_inv.revision_id, inv.revision_id, inv_delta)
 
1273
                yield ChunkedContentFactory(inv.revision_id, None, None, lines)
 
1274
                prev_inv = inv
 
1275
 
 
1276
    def body_stream(self, repository, ordering, revids):
 
1277
        substream = self._inventory_delta_stream(repository,
 
1278
            ordering, revids)
 
1279
        return _stream_to_byte_stream([('inventory-deltas', substream)],
 
1280
            repository._format)
 
1281
 
 
1282
    def do_body(self, body_bytes):
 
1283
        return SuccessfulSmartServerResponse(('ok', ),
 
1284
            body_stream=self.body_stream(self._repository, self._ordering,
 
1285
                body_bytes.splitlines()))
 
1286
 
 
1287
    def do_repository_request(self, repository, ordering):
 
1288
        if ordering == 'unordered':
 
1289
            # inventory deltas for a topologically sorted stream
 
1290
            # are likely to be smaller
 
1291
            ordering = 'topological'
 
1292
        self._ordering = ordering
 
1293
        # Signal that we want a body
 
1294
        return None