/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/smart/repository.py

  • Committer: Jelmer Vernooij
  • Date: 2017-06-10 16:40:42 UTC
  • mfrom: (6653.6.7 rename-controldir)
  • mto: This revision was merged to the branch mainline in revision 6690.
  • Revision ID: jelmer@jelmer.uk-20170610164042-zrxqgy2htyduvke2
MergeĀ rename-controldirĀ branch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006-2010 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Server-side repository related request implementations."""
 
18
 
 
19
from __future__ import absolute_import
 
20
 
 
21
import bz2
 
22
import itertools
 
23
import os
 
24
try:
 
25
    import queue
 
26
except ImportError:
 
27
    import Queue as queue
 
28
import sys
 
29
import tempfile
 
30
import threading
 
31
import zlib
 
32
 
 
33
from .. import (
 
34
    bencode,
 
35
    errors,
 
36
    estimate_compressed_size,
 
37
    osutils,
 
38
    trace,
 
39
    ui,
 
40
    )
 
41
from ..bzr import (
 
42
    inventory as _mod_inventory,
 
43
    inventory_delta,
 
44
    pack,
 
45
    vf_search,
 
46
    )
 
47
from ..bzr.bzrdir import BzrDir
 
48
from ..sixish import (
 
49
    reraise,
 
50
)
 
51
from .request import (
 
52
    FailedSmartServerResponse,
 
53
    SmartServerRequest,
 
54
    SuccessfulSmartServerResponse,
 
55
    )
 
56
from ..repository import _strip_NULL_ghosts, network_format_registry
 
57
from .. import revision as _mod_revision
 
58
from ..bzr.versionedfile import (
 
59
    ChunkedContentFactory,
 
60
    NetworkRecordStream,
 
61
    record_to_fulltext_bytes,
 
62
    )
 
63
 
 
64
 
 
65
class SmartServerRepositoryRequest(SmartServerRequest):
 
66
    """Common base class for Repository requests."""
 
67
 
 
68
    def do(self, path, *args):
 
69
        """Execute a repository request.
 
70
 
 
71
        All Repository requests take a path to the repository as their first
 
72
        argument.  The repository must be at the exact path given by the
 
73
        client - no searching is done.
 
74
 
 
75
        The actual logic is delegated to self.do_repository_request.
 
76
 
 
77
        :param client_path: The path for the repository as received from the
 
78
            client.
 
79
        :return: A SmartServerResponse from self.do_repository_request().
 
80
        """
 
81
        transport = self.transport_from_client_path(path)
 
82
        bzrdir = BzrDir.open_from_transport(transport)
 
83
        # Save the repository for use with do_body.
 
84
        self._repository = bzrdir.open_repository()
 
85
        return self.do_repository_request(self._repository, *args)
 
86
 
 
87
    def do_repository_request(self, repository, *args):
 
88
        """Override to provide an implementation for a verb."""
 
89
        # No-op for verbs that take bodies (None as a result indicates a body
 
90
        # is expected)
 
91
        return None
 
92
 
 
93
    def recreate_search(self, repository, search_bytes, discard_excess=False):
 
94
        """Recreate a search from its serialised form.
 
95
 
 
96
        :param discard_excess: If True, and the search refers to data we don't
 
97
            have, just silently accept that fact - the verb calling
 
98
            recreate_search trusts that clients will look for missing things
 
99
            they expected and get it from elsewhere.
 
100
        """
 
101
        if search_bytes == 'everything':
 
102
            return vf_search.EverythingResult(repository), None
 
103
        lines = search_bytes.split('\n')
 
104
        if lines[0] == 'ancestry-of':
 
105
            heads = lines[1:]
 
106
            search_result = vf_search.PendingAncestryResult(heads, repository)
 
107
            return search_result, None
 
108
        elif lines[0] == 'search':
 
109
            return self.recreate_search_from_recipe(repository, lines[1:],
 
110
                discard_excess=discard_excess)
 
111
        else:
 
112
            return (None, FailedSmartServerResponse(('BadSearch',)))
 
113
 
 
114
    def recreate_search_from_recipe(self, repository, lines,
 
115
        discard_excess=False):
 
116
        """Recreate a specific revision search (vs a from-tip search).
 
117
 
 
118
        :param discard_excess: If True, and the search refers to data we don't
 
119
            have, just silently accept that fact - the verb calling
 
120
            recreate_search trusts that clients will look for missing things
 
121
            they expected and get it from elsewhere.
 
122
        """
 
123
        start_keys = set(lines[0].split(' '))
 
124
        exclude_keys = set(lines[1].split(' '))
 
125
        revision_count = int(lines[2])
 
126
        repository.lock_read()
 
127
        try:
 
128
            search = repository.get_graph()._make_breadth_first_searcher(
 
129
                start_keys)
 
130
            while True:
 
131
                try:
 
132
                    next_revs = next(search)
 
133
                except StopIteration:
 
134
                    break
 
135
                search.stop_searching_any(exclude_keys.intersection(next_revs))
 
136
            (started_keys, excludes, included_keys) = search.get_state()
 
137
            if (not discard_excess and len(included_keys) != revision_count):
 
138
                # we got back a different amount of data than expected, this
 
139
                # gets reported as NoSuchRevision, because less revisions
 
140
                # indicates missing revisions, and more should never happen as
 
141
                # the excludes list considers ghosts and ensures that ghost
 
142
                # filling races are not a problem.
 
143
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
 
144
            search_result = vf_search.SearchResult(started_keys, excludes,
 
145
                len(included_keys), included_keys)
 
146
            return (search_result, None)
 
147
        finally:
 
148
            repository.unlock()
 
149
 
 
150
 
 
151
class SmartServerRepositoryReadLocked(SmartServerRepositoryRequest):
 
152
    """Calls self.do_readlocked_repository_request."""
 
153
 
 
154
    def do_repository_request(self, repository, *args):
 
155
        """Read lock a repository for do_readlocked_repository_request."""
 
156
        repository.lock_read()
 
157
        try:
 
158
            return self.do_readlocked_repository_request(repository, *args)
 
159
        finally:
 
160
            repository.unlock()
 
161
 
 
162
 
 
163
class SmartServerRepositoryBreakLock(SmartServerRepositoryRequest):
 
164
    """Break a repository lock."""
 
165
 
 
166
    def do_repository_request(self, repository):
 
167
        repository.break_lock()
 
168
        return SuccessfulSmartServerResponse(('ok', ))
 
169
 
 
170
 
 
171
_lsprof_count = 0
 
172
 
 
173
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
 
174
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
 
175
 
 
176
    no_extra_results = False
 
177
 
 
178
    def do_repository_request(self, repository, *revision_ids):
 
179
        """Get parent details for some revisions.
 
180
 
 
181
        All the parents for revision_ids are returned. Additionally up to 64KB
 
182
        of additional parent data found by performing a breadth first search
 
183
        from revision_ids is returned. The verb takes a body containing the
 
184
        current search state, see do_body for details.
 
185
 
 
186
        If 'include-missing:' is in revision_ids, ghosts encountered in the
 
187
        graph traversal for getting parent data are included in the result with
 
188
        a prefix of 'missing:'.
 
189
 
 
190
        :param repository: The repository to query in.
 
191
        :param revision_ids: The utf8 encoded revision_id to answer for.
 
192
        """
 
193
        self._revision_ids = revision_ids
 
194
        return None # Signal that we want a body.
 
195
 
 
196
    def do_body(self, body_bytes):
 
197
        """Process the current search state and perform the parent lookup.
 
198
 
 
199
        :return: A smart server response where the body contains an utf8
 
200
            encoded flattened list of the parents of the revisions (the same
 
201
            format as Repository.get_revision_graph) which has been bz2
 
202
            compressed.
 
203
        """
 
204
        repository = self._repository
 
205
        repository.lock_read()
 
206
        try:
 
207
            return self._do_repository_request(body_bytes)
 
208
        finally:
 
209
            repository.unlock()
 
210
 
 
211
    def _expand_requested_revs(self, repo_graph, revision_ids, client_seen_revs,
 
212
                               include_missing, max_size=65536):
 
213
        result = {}
 
214
        queried_revs = set()
 
215
        estimator = estimate_compressed_size.ZLibEstimator(max_size)
 
216
        next_revs = revision_ids
 
217
        first_loop_done = False
 
218
        while next_revs:
 
219
            queried_revs.update(next_revs)
 
220
            parent_map = repo_graph.get_parent_map(next_revs)
 
221
            current_revs = next_revs
 
222
            next_revs = set()
 
223
            for revision_id in current_revs:
 
224
                missing_rev = False
 
225
                parents = parent_map.get(revision_id)
 
226
                if parents is not None:
 
227
                    # adjust for the wire
 
228
                    if parents == (_mod_revision.NULL_REVISION,):
 
229
                        parents = ()
 
230
                    # prepare the next query
 
231
                    next_revs.update(parents)
 
232
                    encoded_id = revision_id
 
233
                else:
 
234
                    missing_rev = True
 
235
                    encoded_id = "missing:" + revision_id
 
236
                    parents = []
 
237
                if (revision_id not in client_seen_revs and
 
238
                    (not missing_rev or include_missing)):
 
239
                    # Client does not have this revision, give it to it.
 
240
                    # add parents to the result
 
241
                    result[encoded_id] = parents
 
242
                    # Approximate the serialized cost of this revision_id.
 
243
                    line = '%s %s\n' % (encoded_id, ' '.join(parents))
 
244
                    estimator.add_content(line)
 
245
            # get all the directly asked for parents, and then flesh out to
 
246
            # 64K (compressed) or so. We do one level of depth at a time to
 
247
            # stay in sync with the client. The 250000 magic number is
 
248
            # estimated compression ratio taken from bzr.dev itself.
 
249
            if self.no_extra_results or (first_loop_done and estimator.full()):
 
250
                trace.mutter('size: %d, z_size: %d'
 
251
                             % (estimator._uncompressed_size_added,
 
252
                                estimator._compressed_size_added))
 
253
                next_revs = set()
 
254
                break
 
255
            # don't query things we've already queried
 
256
            next_revs = next_revs.difference(queried_revs)
 
257
            first_loop_done = True
 
258
        return result
 
259
 
 
260
    def _do_repository_request(self, body_bytes):
 
261
        repository = self._repository
 
262
        revision_ids = set(self._revision_ids)
 
263
        include_missing = 'include-missing:' in revision_ids
 
264
        if include_missing:
 
265
            revision_ids.remove('include-missing:')
 
266
        body_lines = body_bytes.split('\n')
 
267
        search_result, error = self.recreate_search_from_recipe(
 
268
            repository, body_lines)
 
269
        if error is not None:
 
270
            return error
 
271
        # TODO might be nice to start up the search again; but thats not
 
272
        # written or tested yet.
 
273
        client_seen_revs = set(search_result.get_keys())
 
274
        # Always include the requested ids.
 
275
        client_seen_revs.difference_update(revision_ids)
 
276
 
 
277
        repo_graph = repository.get_graph()
 
278
        result = self._expand_requested_revs(repo_graph, revision_ids,
 
279
                                             client_seen_revs, include_missing)
 
280
 
 
281
        # sorting trivially puts lexographically similar revision ids together.
 
282
        # Compression FTW.
 
283
        lines = []
 
284
        for revision, parents in sorted(result.items()):
 
285
            lines.append(' '.join((revision, ) + tuple(parents)))
 
286
 
 
287
        return SuccessfulSmartServerResponse(
 
288
            ('ok', ), bz2.compress('\n'.join(lines)))
 
289
 
 
290
 
 
291
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):
 
292
 
 
293
    def do_readlocked_repository_request(self, repository, revision_id):
 
294
        """Return the result of repository.get_revision_graph(revision_id).
 
295
 
 
296
        Deprecated as of bzr 1.4, but supported for older clients.
 
297
 
 
298
        :param repository: The repository to query in.
 
299
        :param revision_id: The utf8 encoded revision_id to get a graph from.
 
300
        :return: A smart server response where the body contains an utf8
 
301
            encoded flattened list of the revision graph.
 
302
        """
 
303
        if not revision_id:
 
304
            revision_id = None
 
305
 
 
306
        lines = []
 
307
        graph = repository.get_graph()
 
308
        if revision_id:
 
309
            search_ids = [revision_id]
 
310
        else:
 
311
            search_ids = repository.all_revision_ids()
 
312
        search = graph._make_breadth_first_searcher(search_ids)
 
313
        transitive_ids = set(itertools.chain.from_iterable(search))
 
314
        parent_map = graph.get_parent_map(transitive_ids)
 
315
        revision_graph = _strip_NULL_ghosts(parent_map)
 
316
        if revision_id and revision_id not in revision_graph:
 
317
            # Note that we return an empty body, rather than omitting the body.
 
318
            # This way the client knows that it can always expect to find a body
 
319
            # in the response for this method, even in the error case.
 
320
            return FailedSmartServerResponse(('nosuchrevision', revision_id), '')
 
321
 
 
322
        for revision, parents in revision_graph.items():
 
323
            lines.append(' '.join((revision, ) + tuple(parents)))
 
324
 
 
325
        return SuccessfulSmartServerResponse(('ok', ), '\n'.join(lines))
 
326
 
 
327
 
 
328
class SmartServerRepositoryGetRevIdForRevno(SmartServerRepositoryReadLocked):
 
329
 
 
330
    def do_readlocked_repository_request(self, repository, revno,
 
331
            known_pair):
 
332
        """Find the revid for a given revno, given a known revno/revid pair.
 
333
        
 
334
        New in 1.17.
 
335
        """
 
336
        try:
 
337
            found_flag, result = repository.get_rev_id_for_revno(revno, known_pair)
 
338
        except errors.RevisionNotPresent as err:
 
339
            if err.revision_id != known_pair[1]:
 
340
                raise AssertionError(
 
341
                    'get_rev_id_for_revno raised RevisionNotPresent for '
 
342
                    'non-initial revision: ' + err.revision_id)
 
343
            return FailedSmartServerResponse(
 
344
                ('nosuchrevision', err.revision_id))
 
345
        if found_flag:
 
346
            return SuccessfulSmartServerResponse(('ok', result))
 
347
        else:
 
348
            earliest_revno, earliest_revid = result
 
349
            return SuccessfulSmartServerResponse(
 
350
                ('history-incomplete', earliest_revno, earliest_revid))
 
351
 
 
352
 
 
353
class SmartServerRepositoryGetSerializerFormat(SmartServerRepositoryRequest):
 
354
 
 
355
    def do_repository_request(self, repository):
 
356
        """Return the serializer format for this repository.
 
357
 
 
358
        New in 2.5.0.
 
359
 
 
360
        :param repository: The repository to query
 
361
        :return: A smart server response ('ok', FORMAT)
 
362
        """
 
363
        serializer = repository.get_serializer_format()
 
364
        return SuccessfulSmartServerResponse(('ok', serializer))
 
365
 
 
366
 
 
367
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
 
368
 
 
369
    def do_repository_request(self, repository, revision_id):
 
370
        """Return ok if a specific revision is in the repository at path.
 
371
 
 
372
        :param repository: The repository to query in.
 
373
        :param revision_id: The utf8 encoded revision_id to lookup.
 
374
        :return: A smart server response of ('yes', ) if the revision is
 
375
            present. ('no', ) if it is missing.
 
376
        """
 
377
        if repository.has_revision(revision_id):
 
378
            return SuccessfulSmartServerResponse(('yes', ))
 
379
        else:
 
380
            return SuccessfulSmartServerResponse(('no', ))
 
381
 
 
382
 
 
383
class SmartServerRequestHasSignatureForRevisionId(
 
384
        SmartServerRepositoryRequest):
 
385
 
 
386
    def do_repository_request(self, repository, revision_id):
 
387
        """Return ok if a signature is present for a revision.
 
388
 
 
389
        Introduced in bzr 2.5.0.
 
390
 
 
391
        :param repository: The repository to query in.
 
392
        :param revision_id: The utf8 encoded revision_id to lookup.
 
393
        :return: A smart server response of ('yes', ) if a
 
394
            signature for the revision is present,
 
395
            ('no', ) if it is missing.
 
396
        """
 
397
        try:
 
398
            if repository.has_signature_for_revision_id(revision_id):
 
399
                return SuccessfulSmartServerResponse(('yes', ))
 
400
            else:
 
401
                return SuccessfulSmartServerResponse(('no', ))
 
402
        except errors.NoSuchRevision:
 
403
            return FailedSmartServerResponse(
 
404
                ('nosuchrevision', revision_id))
 
405
 
 
406
 
 
407
class SmartServerRepositoryGatherStats(SmartServerRepositoryRequest):
 
408
 
 
409
    def do_repository_request(self, repository, revid, committers):
 
410
        """Return the result of repository.gather_stats().
 
411
 
 
412
        :param repository: The repository to query in.
 
413
        :param revid: utf8 encoded rev id or an empty string to indicate None
 
414
        :param committers: 'yes' or 'no'.
 
415
 
 
416
        :return: A SmartServerResponse ('ok',), a encoded body looking like
 
417
              committers: 1
 
418
              firstrev: 1234.230 0
 
419
              latestrev: 345.700 3600
 
420
              revisions: 2
 
421
 
 
422
              But containing only fields returned by the gather_stats() call
 
423
        """
 
424
        if revid == '':
 
425
            decoded_revision_id = None
 
426
        else:
 
427
            decoded_revision_id = revid
 
428
        if committers == 'yes':
 
429
            decoded_committers = True
 
430
        else:
 
431
            decoded_committers = None
 
432
        try:
 
433
            stats = repository.gather_stats(decoded_revision_id,
 
434
                decoded_committers)
 
435
        except errors.NoSuchRevision:
 
436
            return FailedSmartServerResponse(('nosuchrevision', revid))
 
437
 
 
438
        body = ''
 
439
        if 'committers' in stats:
 
440
            body += 'committers: %d\n' % stats['committers']
 
441
        if 'firstrev' in stats:
 
442
            body += 'firstrev: %.3f %d\n' % stats['firstrev']
 
443
        if 'latestrev' in stats:
 
444
             body += 'latestrev: %.3f %d\n' % stats['latestrev']
 
445
        if 'revisions' in stats:
 
446
            body += 'revisions: %d\n' % stats['revisions']
 
447
        if 'size' in stats:
 
448
            body += 'size: %d\n' % stats['size']
 
449
 
 
450
        return SuccessfulSmartServerResponse(('ok', ), body)
 
451
 
 
452
 
 
453
class SmartServerRepositoryGetRevisionSignatureText(
 
454
        SmartServerRepositoryRequest):
 
455
    """Return the signature text of a revision.
 
456
 
 
457
    New in 2.5.
 
458
    """
 
459
 
 
460
    def do_repository_request(self, repository, revision_id):
 
461
        """Return the result of repository.get_signature_text().
 
462
 
 
463
        :param repository: The repository to query in.
 
464
        :return: A smart server response of with the signature text as
 
465
            body.
 
466
        """
 
467
        try:
 
468
            text = repository.get_signature_text(revision_id)
 
469
        except errors.NoSuchRevision as err:
 
470
            return FailedSmartServerResponse(
 
471
                ('nosuchrevision', err.revision))
 
472
        return SuccessfulSmartServerResponse(('ok', ), text)
 
473
 
 
474
 
 
475
class SmartServerRepositoryIsShared(SmartServerRepositoryRequest):
 
476
 
 
477
    def do_repository_request(self, repository):
 
478
        """Return the result of repository.is_shared().
 
479
 
 
480
        :param repository: The repository to query in.
 
481
        :return: A smart server response of ('yes', ) if the repository is
 
482
            shared, and ('no', ) if it is not.
 
483
        """
 
484
        if repository.is_shared():
 
485
            return SuccessfulSmartServerResponse(('yes', ))
 
486
        else:
 
487
            return SuccessfulSmartServerResponse(('no', ))
 
488
 
 
489
 
 
490
class SmartServerRepositoryMakeWorkingTrees(SmartServerRepositoryRequest):
 
491
 
 
492
    def do_repository_request(self, repository):
 
493
        """Return the result of repository.make_working_trees().
 
494
 
 
495
        Introduced in bzr 2.5.0.
 
496
 
 
497
        :param repository: The repository to query in.
 
498
        :return: A smart server response of ('yes', ) if the repository uses
 
499
            working trees, and ('no', ) if it is not.
 
500
        """
 
501
        if repository.make_working_trees():
 
502
            return SuccessfulSmartServerResponse(('yes', ))
 
503
        else:
 
504
            return SuccessfulSmartServerResponse(('no', ))
 
505
 
 
506
 
 
507
class SmartServerRepositoryLockWrite(SmartServerRepositoryRequest):
 
508
 
 
509
    def do_repository_request(self, repository, token=''):
 
510
        # XXX: this probably should not have a token.
 
511
        if token == '':
 
512
            token = None
 
513
        try:
 
514
            token = repository.lock_write(token=token).repository_token
 
515
        except errors.LockContention as e:
 
516
            return FailedSmartServerResponse(('LockContention',))
 
517
        except errors.UnlockableTransport:
 
518
            return FailedSmartServerResponse(('UnlockableTransport',))
 
519
        except errors.LockFailed as e:
 
520
            return FailedSmartServerResponse(('LockFailed',
 
521
                str(e.lock), str(e.why)))
 
522
        if token is not None:
 
523
            repository.leave_lock_in_place()
 
524
        repository.unlock()
 
525
        if token is None:
 
526
            token = ''
 
527
        return SuccessfulSmartServerResponse(('ok', token))
 
528
 
 
529
 
 
530
class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):
 
531
 
 
532
    def do_repository_request(self, repository, to_network_name):
 
533
        """Get a stream for inserting into a to_format repository.
 
534
 
 
535
        The request body is 'search_bytes', a description of the revisions
 
536
        being requested.
 
537
 
 
538
        In 2.3 this verb added support for search_bytes == 'everything'.  Older
 
539
        implementations will respond with a BadSearch error, and clients should
 
540
        catch this and fallback appropriately.
 
541
 
 
542
        :param repository: The repository to stream from.
 
543
        :param to_network_name: The network name of the format of the target
 
544
            repository.
 
545
        """
 
546
        self._to_format = network_format_registry.get(to_network_name)
 
547
        if self._should_fake_unknown():
 
548
            return FailedSmartServerResponse(
 
549
                ('UnknownMethod', 'Repository.get_stream'))
 
550
        return None # Signal that we want a body.
 
551
 
 
552
    def _should_fake_unknown(self):
 
553
        """Return True if we should return UnknownMethod to the client.
 
554
        
 
555
        This is a workaround for bugs in pre-1.19 clients that claim to
 
556
        support receiving streams of CHK repositories.  The pre-1.19 client
 
557
        expects inventory records to be serialized in the format defined by
 
558
        to_network_name, but in pre-1.19 (at least) that format definition
 
559
        tries to use the xml5 serializer, which does not correctly handle
 
560
        rich-roots.  After 1.19 the client can also accept inventory-deltas
 
561
        (which avoids this issue), and those clients will use the
 
562
        Repository.get_stream_1.19 verb instead of this one.
 
563
        So: if this repository is CHK, and the to_format doesn't match,
 
564
        we should just fake an UnknownSmartMethod error so that the client
 
565
        will fallback to VFS, rather than sending it a stream we know it
 
566
        cannot handle.
 
567
        """
 
568
        from_format = self._repository._format
 
569
        to_format = self._to_format
 
570
        if not from_format.supports_chks:
 
571
            # Source not CHK: that's ok
 
572
            return False
 
573
        if (to_format.supports_chks and
 
574
            from_format.repository_class is to_format.repository_class and
 
575
            from_format._serializer == to_format._serializer):
 
576
            # Source is CHK, but target matches: that's ok
 
577
            # (e.g. 2a->2a, or CHK2->2a)
 
578
            return False
 
579
        # Source is CHK, and target is not CHK or incompatible CHK.  We can't
 
580
        # generate a compatible stream.
 
581
        return True
 
582
 
 
583
    def do_body(self, body_bytes):
 
584
        repository = self._repository
 
585
        repository.lock_read()
 
586
        try:
 
587
            search_result, error = self.recreate_search(repository, body_bytes,
 
588
                discard_excess=True)
 
589
            if error is not None:
 
590
                repository.unlock()
 
591
                return error
 
592
            source = repository._get_source(self._to_format)
 
593
            stream = source.get_stream(search_result)
 
594
        except Exception:
 
595
            try:
 
596
                # On non-error, unlocking is done by the body stream handler.
 
597
                repository.unlock()
 
598
            finally:
 
599
                raise
 
600
        return SuccessfulSmartServerResponse(('ok',),
 
601
            body_stream=self.body_stream(stream, repository))
 
602
 
 
603
    def body_stream(self, stream, repository):
 
604
        byte_stream = _stream_to_byte_stream(stream, repository._format)
 
605
        try:
 
606
            for bytes in byte_stream:
 
607
                yield bytes
 
608
        except errors.RevisionNotPresent as e:
 
609
            # This shouldn't be able to happen, but as we don't buffer
 
610
            # everything it can in theory happen.
 
611
            repository.unlock()
 
612
            yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
 
613
        else:
 
614
            repository.unlock()
 
615
 
 
616
 
 
617
class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
 
618
    """The same as Repository.get_stream, but will return stream CHK formats to
 
619
    clients.
 
620
 
 
621
    See SmartServerRepositoryGetStream._should_fake_unknown.
 
622
    
 
623
    New in 1.19.
 
624
    """
 
625
 
 
626
    def _should_fake_unknown(self):
 
627
        """Returns False; we don't need to workaround bugs in 1.19+ clients."""
 
628
        return False
 
629
 
 
630
 
 
631
def _stream_to_byte_stream(stream, src_format):
 
632
    """Convert a record stream to a self delimited byte stream."""
 
633
    pack_writer = pack.ContainerSerialiser()
 
634
    yield pack_writer.begin()
 
635
    yield pack_writer.bytes_record(src_format.network_name(), '')
 
636
    for substream_type, substream in stream:
 
637
        for record in substream:
 
638
            if record.storage_kind in ('chunked', 'fulltext'):
 
639
                serialised = record_to_fulltext_bytes(record)
 
640
            elif record.storage_kind == 'absent':
 
641
                raise ValueError("Absent factory for %s" % (record.key,))
 
642
            else:
 
643
                serialised = record.get_bytes_as(record.storage_kind)
 
644
            if serialised:
 
645
                # Some streams embed the whole stream into the wire
 
646
                # representation of the first record, which means that
 
647
                # later records have no wire representation: we skip them.
 
648
                yield pack_writer.bytes_record(serialised, [(substream_type,)])
 
649
    yield pack_writer.end()
 
650
 
 
651
 
 
652
class _ByteStreamDecoder(object):
 
653
    """Helper for _byte_stream_to_stream.
 
654
 
 
655
    The expected usage of this class is via the function _byte_stream_to_stream
 
656
    which creates a _ByteStreamDecoder, pops off the stream format and then
 
657
    yields the output of record_stream(), the main entry point to
 
658
    _ByteStreamDecoder.
 
659
 
 
660
    Broadly this class has to unwrap two layers of iterators:
 
661
    (type, substream)
 
662
    (substream details)
 
663
 
 
664
    This is complicated by wishing to return type, iterator_for_type, but
 
665
    getting the data for iterator_for_type when we find out type: we can't
 
666
    simply pass a generator down to the NetworkRecordStream parser, instead
 
667
    we have a little local state to seed each NetworkRecordStream instance,
 
668
    and gather the type that we'll be yielding.
 
669
 
 
670
    :ivar byte_stream: The byte stream being decoded.
 
671
    :ivar stream_decoder: A pack parser used to decode the bytestream
 
672
    :ivar current_type: The current type, used to join adjacent records of the
 
673
        same type into a single stream.
 
674
    :ivar first_bytes: The first bytes to give the next NetworkRecordStream.
 
675
    """
 
676
 
 
677
    def __init__(self, byte_stream, record_counter):
 
678
        """Create a _ByteStreamDecoder."""
 
679
        self.stream_decoder = pack.ContainerPushParser()
 
680
        self.current_type = None
 
681
        self.first_bytes = None
 
682
        self.byte_stream = byte_stream
 
683
        self._record_counter = record_counter
 
684
        self.key_count = 0
 
685
 
 
686
    def iter_stream_decoder(self):
 
687
        """Iterate the contents of the pack from stream_decoder."""
 
688
        # dequeue pending items
 
689
        for record in self.stream_decoder.read_pending_records():
 
690
            yield record
 
691
        # Pull bytes of the wire, decode them to records, yield those records.
 
692
        for bytes in self.byte_stream:
 
693
            self.stream_decoder.accept_bytes(bytes)
 
694
            for record in self.stream_decoder.read_pending_records():
 
695
                yield record
 
696
 
 
697
    def iter_substream_bytes(self):
 
698
        if self.first_bytes is not None:
 
699
            yield self.first_bytes
 
700
            # If we run out of pack records, single the outer layer to stop.
 
701
            self.first_bytes = None
 
702
        for record in self.iter_pack_records:
 
703
            record_names, record_bytes = record
 
704
            record_name, = record_names
 
705
            substream_type = record_name[0]
 
706
            if substream_type != self.current_type:
 
707
                # end of a substream, seed the next substream.
 
708
                self.current_type = substream_type
 
709
                self.first_bytes = record_bytes
 
710
                return
 
711
            yield record_bytes
 
712
 
 
713
    def record_stream(self):
 
714
        """Yield substream_type, substream from the byte stream."""
 
715
        def wrap_and_count(pb, rc, substream):
 
716
            """Yield records from stream while showing progress."""
 
717
            counter = 0
 
718
            if rc:
 
719
                if self.current_type != 'revisions' and self.key_count != 0:
 
720
                    # As we know the number of revisions now (in self.key_count)
 
721
                    # we can setup and use record_counter (rc).
 
722
                    if not rc.is_initialized():
 
723
                        rc.setup(self.key_count, self.key_count)
 
724
            for record in substream.read():
 
725
                if rc:
 
726
                    if rc.is_initialized() and counter == rc.STEP:
 
727
                        rc.increment(counter)
 
728
                        pb.update('Estimate', rc.current, rc.max)
 
729
                        counter = 0
 
730
                    if self.current_type == 'revisions':
 
731
                        # Total records is proportional to number of revs
 
732
                        # to fetch. With remote, we used self.key_count to
 
733
                        # track the number of revs. Once we have the revs
 
734
                        # counts in self.key_count, the progress bar changes
 
735
                        # from 'Estimating..' to 'Estimate' above.
 
736
                        self.key_count += 1
 
737
                        if counter == rc.STEP:
 
738
                            pb.update('Estimating..', self.key_count)
 
739
                            counter = 0
 
740
                counter += 1
 
741
                yield record
 
742
 
 
743
        self.seed_state()
 
744
        pb = ui.ui_factory.nested_progress_bar()
 
745
        rc = self._record_counter
 
746
        try:
 
747
            # Make and consume sub generators, one per substream type:
 
748
            while self.first_bytes is not None:
 
749
                substream = NetworkRecordStream(self.iter_substream_bytes())
 
750
                # after substream is fully consumed, self.current_type is set
 
751
                # to the next type, and self.first_bytes is set to the matching
 
752
                # bytes.
 
753
                yield self.current_type, wrap_and_count(pb, rc, substream)
 
754
        finally:
 
755
            if rc:
 
756
                pb.update('Done', rc.max, rc.max)
 
757
            pb.finished()
 
758
 
 
759
    def seed_state(self):
 
760
        """Prepare the _ByteStreamDecoder to decode from the pack stream."""
 
761
        # Set a single generator we can use to get data from the pack stream.
 
762
        self.iter_pack_records = self.iter_stream_decoder()
 
763
        # Seed the very first subiterator with content; after this each one
 
764
        # seeds the next.
 
765
        list(self.iter_substream_bytes())
 
766
 
 
767
 
 
768
def _byte_stream_to_stream(byte_stream, record_counter=None):
 
769
    """Convert a byte stream into a format and a stream.
 
770
 
 
771
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
 
772
    :return: (RepositoryFormat, stream_generator)
 
773
    """
 
774
    decoder = _ByteStreamDecoder(byte_stream, record_counter)
 
775
    for bytes in byte_stream:
 
776
        decoder.stream_decoder.accept_bytes(bytes)
 
777
        for record in decoder.stream_decoder.read_pending_records(max=1):
 
778
            record_names, src_format_name = record
 
779
            src_format = network_format_registry.get(src_format_name)
 
780
            return src_format, decoder.record_stream()
 
781
 
 
782
 
 
783
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
 
784
 
 
785
    def do_repository_request(self, repository, token):
 
786
        try:
 
787
            repository.lock_write(token=token)
 
788
        except errors.TokenMismatch as e:
 
789
            return FailedSmartServerResponse(('TokenMismatch',))
 
790
        repository.dont_leave_lock_in_place()
 
791
        repository.unlock()
 
792
        return SuccessfulSmartServerResponse(('ok',))
 
793
 
 
794
 
 
795
class SmartServerRepositoryGetPhysicalLockStatus(SmartServerRepositoryRequest):
 
796
    """Get the physical lock status for a repository.
 
797
 
 
798
    New in 2.5.
 
799
    """
 
800
 
 
801
    def do_repository_request(self, repository):
 
802
        if repository.get_physical_lock_status():
 
803
            return SuccessfulSmartServerResponse(('yes', ))
 
804
        else:
 
805
            return SuccessfulSmartServerResponse(('no', ))
 
806
 
 
807
 
 
808
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
 
809
 
 
810
    def do_repository_request(self, repository, str_bool_new_value):
 
811
        if str_bool_new_value == 'True':
 
812
            new_value = True
 
813
        else:
 
814
            new_value = False
 
815
        repository.set_make_working_trees(new_value)
 
816
        return SuccessfulSmartServerResponse(('ok',))
 
817
 
 
818
 
 
819
class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
 
820
    """Get the raw repository files as a tarball.
 
821
 
 
822
    The returned tarball contains a .bzr control directory which in turn
 
823
    contains a repository.
 
824
 
 
825
    This takes one parameter, compression, which currently must be
 
826
    "", "gz", or "bz2".
 
827
 
 
828
    This is used to implement the Repository.copy_content_into operation.
 
829
    """
 
830
 
 
831
    def do_repository_request(self, repository, compression):
 
832
        tmp_dirname, tmp_repo = self._copy_to_tempdir(repository)
 
833
        try:
 
834
            controldir_name = tmp_dirname + '/.bzr'
 
835
            return self._tarfile_response(controldir_name, compression)
 
836
        finally:
 
837
            osutils.rmtree(tmp_dirname)
 
838
 
 
839
    def _copy_to_tempdir(self, from_repo):
 
840
        tmp_dirname = osutils.mkdtemp(prefix='tmpbzrclone')
 
841
        tmp_bzrdir = from_repo.controldir._format.initialize(tmp_dirname)
 
842
        tmp_repo = from_repo._format.initialize(tmp_bzrdir)
 
843
        from_repo.copy_content_into(tmp_repo)
 
844
        return tmp_dirname, tmp_repo
 
845
 
 
846
    def _tarfile_response(self, tmp_dirname, compression):
 
847
        temp = tempfile.NamedTemporaryFile()
 
848
        try:
 
849
            self._tarball_of_dir(tmp_dirname, compression, temp.file)
 
850
            # all finished; write the tempfile out to the network
 
851
            temp.seek(0)
 
852
            return SuccessfulSmartServerResponse(('ok',), temp.read())
 
853
            # FIXME: Don't read the whole thing into memory here; rather stream
 
854
            # it out from the file onto the network. mbp 20070411
 
855
        finally:
 
856
            temp.close()
 
857
 
 
858
    def _tarball_of_dir(self, dirname, compression, ofile):
 
859
        import tarfile
 
860
        filename = os.path.basename(ofile.name)
 
861
        tarball = tarfile.open(fileobj=ofile, name=filename,
 
862
            mode='w|' + compression)
 
863
        try:
 
864
            # The tarball module only accepts ascii names, and (i guess)
 
865
            # packs them with their 8bit names.  We know all the files
 
866
            # within the repository have ASCII names so the should be safe
 
867
            # to pack in.
 
868
            dirname = dirname.encode(sys.getfilesystemencoding())
 
869
            # python's tarball module includes the whole path by default so
 
870
            # override it
 
871
            if not dirname.endswith('.bzr'):
 
872
                raise ValueError(dirname)
 
873
            tarball.add(dirname, '.bzr') # recursive by default
 
874
        finally:
 
875
            tarball.close()
 
876
 
 
877
 
 
878
class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
 
879
    """Insert a record stream from a RemoteSink into a repository.
 
880
 
 
881
    This gets bytes pushed to it by the network infrastructure and turns that
 
882
    into a bytes iterator using a thread. That is then processed by
 
883
    _byte_stream_to_stream.
 
884
 
 
885
    New in 1.14.
 
886
    """
 
887
 
 
888
    def do_repository_request(self, repository, resume_tokens, lock_token):
 
889
        """StreamSink.insert_stream for a remote repository."""
 
890
        repository.lock_write(token=lock_token)
 
891
        self.do_insert_stream_request(repository, resume_tokens)
 
892
 
 
893
    def do_insert_stream_request(self, repository, resume_tokens):
 
894
        tokens = [token for token in resume_tokens.split(' ') if token]
 
895
        self.tokens = tokens
 
896
        self.repository = repository
 
897
        self.queue = queue.Queue()
 
898
        self.insert_thread = threading.Thread(target=self._inserter_thread)
 
899
        self.insert_thread.start()
 
900
 
 
901
    def do_chunk(self, body_stream_chunk):
 
902
        self.queue.put(body_stream_chunk)
 
903
 
 
904
    def _inserter_thread(self):
 
905
        try:
 
906
            src_format, stream = _byte_stream_to_stream(
 
907
                self.blocking_byte_stream())
 
908
            self.insert_result = self.repository._get_sink().insert_stream(
 
909
                stream, src_format, self.tokens)
 
910
            self.insert_ok = True
 
911
        except:
 
912
            self.insert_exception = sys.exc_info()
 
913
            self.insert_ok = False
 
914
 
 
915
    def blocking_byte_stream(self):
 
916
        while True:
 
917
            bytes = self.queue.get()
 
918
            if bytes is StopIteration:
 
919
                return
 
920
            else:
 
921
                yield bytes
 
922
 
 
923
    def do_end(self):
 
924
        self.queue.put(StopIteration)
 
925
        if self.insert_thread is not None:
 
926
            self.insert_thread.join()
 
927
        if not self.insert_ok:
 
928
            try:
 
929
                reraise(*self.insert_exception)
 
930
            finally:
 
931
                del self.insert_exception
 
932
        write_group_tokens, missing_keys = self.insert_result
 
933
        if write_group_tokens or missing_keys:
 
934
            # bzip needed? missing keys should typically be a small set.
 
935
            # Should this be a streaming body response ?
 
936
            missing_keys = sorted(missing_keys)
 
937
            bytes = bencode.bencode((write_group_tokens, missing_keys))
 
938
            self.repository.unlock()
 
939
            return SuccessfulSmartServerResponse(('missing-basis', bytes))
 
940
        else:
 
941
            self.repository.unlock()
 
942
            return SuccessfulSmartServerResponse(('ok', ))
 
943
 
 
944
 
 
945
class SmartServerRepositoryInsertStream_1_19(SmartServerRepositoryInsertStreamLocked):
 
946
    """Insert a record stream from a RemoteSink into a repository.
 
947
 
 
948
    Same as SmartServerRepositoryInsertStreamLocked, except:
 
949
     - the lock token argument is optional
 
950
     - servers that implement this verb accept 'inventory-delta' records in the
 
951
       stream.
 
952
 
 
953
    New in 1.19.
 
954
    """
 
955
 
 
956
    def do_repository_request(self, repository, resume_tokens, lock_token=None):
 
957
        """StreamSink.insert_stream for a remote repository."""
 
958
        SmartServerRepositoryInsertStreamLocked.do_repository_request(
 
959
            self, repository, resume_tokens, lock_token)
 
960
 
 
961
 
 
962
class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
 
963
    """Insert a record stream from a RemoteSink into an unlocked repository.
 
964
 
 
965
    This is the same as SmartServerRepositoryInsertStreamLocked, except it
 
966
    takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
 
967
    like pack format) repository.
 
968
 
 
969
    New in 1.13.
 
970
    """
 
971
 
 
972
    def do_repository_request(self, repository, resume_tokens):
 
973
        """StreamSink.insert_stream for a remote repository."""
 
974
        repository.lock_write()
 
975
        self.do_insert_stream_request(repository, resume_tokens)
 
976
 
 
977
 
 
978
class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
 
979
    """Add a revision signature text.
 
980
 
 
981
    New in 2.5.
 
982
    """
 
983
 
 
984
    def do_repository_request(self, repository, lock_token, revision_id,
 
985
            *write_group_tokens):
 
986
        """Add a revision signature text.
 
987
 
 
988
        :param repository: Repository to operate on
 
989
        :param lock_token: Lock token
 
990
        :param revision_id: Revision for which to add signature
 
991
        :param write_group_tokens: Write group tokens
 
992
        """
 
993
        self._lock_token = lock_token
 
994
        self._revision_id = revision_id
 
995
        self._write_group_tokens = write_group_tokens
 
996
        return None
 
997
 
 
998
    def do_body(self, body_bytes):
 
999
        """Add a signature text.
 
1000
 
 
1001
        :param body_bytes: GPG signature text
 
1002
        :return: SuccessfulSmartServerResponse with arguments 'ok' and
 
1003
            the list of new write group tokens.
 
1004
        """
 
1005
        self._repository.lock_write(token=self._lock_token)
 
1006
        try:
 
1007
            self._repository.resume_write_group(self._write_group_tokens)
 
1008
            try:
 
1009
                self._repository.add_signature_text(self._revision_id,
 
1010
                    body_bytes)
 
1011
            finally:
 
1012
                new_write_group_tokens = self._repository.suspend_write_group()
 
1013
        finally:
 
1014
            self._repository.unlock()
 
1015
        return SuccessfulSmartServerResponse(
 
1016
            ('ok', ) + tuple(new_write_group_tokens))
 
1017
 
 
1018
 
 
1019
class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
 
1020
    """Start a write group.
 
1021
 
 
1022
    New in 2.5.
 
1023
    """
 
1024
 
 
1025
    def do_repository_request(self, repository, lock_token):
 
1026
        """Start a write group."""
 
1027
        repository.lock_write(token=lock_token)
 
1028
        try:
 
1029
            repository.start_write_group()
 
1030
            try:
 
1031
                tokens = repository.suspend_write_group()
 
1032
            except errors.UnsuspendableWriteGroup:
 
1033
                return FailedSmartServerResponse(('UnsuspendableWriteGroup',))
 
1034
        finally:
 
1035
            repository.unlock()
 
1036
        return SuccessfulSmartServerResponse(('ok', tokens))
 
1037
 
 
1038
 
 
1039
class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
 
1040
    """Commit a write group.
 
1041
 
 
1042
    New in 2.5.
 
1043
    """
 
1044
 
 
1045
    def do_repository_request(self, repository, lock_token,
 
1046
            write_group_tokens):
 
1047
        """Commit a write group."""
 
1048
        repository.lock_write(token=lock_token)
 
1049
        try:
 
1050
            try:
 
1051
                repository.resume_write_group(write_group_tokens)
 
1052
            except errors.UnresumableWriteGroup as e:
 
1053
                return FailedSmartServerResponse(
 
1054
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
 
1055
            try:
 
1056
                repository.commit_write_group()
 
1057
            except:
 
1058
                write_group_tokens = repository.suspend_write_group()
 
1059
                # FIXME JRV 2011-11-19: What if the write_group_tokens
 
1060
                # have changed?
 
1061
                raise
 
1062
        finally:
 
1063
            repository.unlock()
 
1064
        return SuccessfulSmartServerResponse(('ok', ))
 
1065
 
 
1066
 
 
1067
class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
 
1068
    """Abort a write group.
 
1069
 
 
1070
    New in 2.5.
 
1071
    """
 
1072
 
 
1073
    def do_repository_request(self, repository, lock_token, write_group_tokens):
 
1074
        """Abort a write group."""
 
1075
        repository.lock_write(token=lock_token)
 
1076
        try:
 
1077
            try:
 
1078
                repository.resume_write_group(write_group_tokens)
 
1079
            except errors.UnresumableWriteGroup as e:
 
1080
                return FailedSmartServerResponse(
 
1081
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
 
1082
                repository.abort_write_group()
 
1083
        finally:
 
1084
            repository.unlock()
 
1085
        return SuccessfulSmartServerResponse(('ok', ))
 
1086
 
 
1087
 
 
1088
class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
 
1089
    """Check that a write group is still valid.
 
1090
 
 
1091
    New in 2.5.
 
1092
    """
 
1093
 
 
1094
    def do_repository_request(self, repository, lock_token, write_group_tokens):
 
1095
        """Abort a write group."""
 
1096
        repository.lock_write(token=lock_token)
 
1097
        try:
 
1098
            try:
 
1099
                repository.resume_write_group(write_group_tokens)
 
1100
            except errors.UnresumableWriteGroup as e:
 
1101
                return FailedSmartServerResponse(
 
1102
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
 
1103
            else:
 
1104
                repository.suspend_write_group()
 
1105
        finally:
 
1106
            repository.unlock()
 
1107
        return SuccessfulSmartServerResponse(('ok', ))
 
1108
 
 
1109
 
 
1110
class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
 
1111
    """Retrieve all of the revision ids in a repository.
 
1112
 
 
1113
    New in 2.5.
 
1114
    """
 
1115
 
 
1116
    def do_repository_request(self, repository):
 
1117
        revids = repository.all_revision_ids()
 
1118
        return SuccessfulSmartServerResponse(("ok", ), "\n".join(revids))
 
1119
 
 
1120
 
 
1121
class SmartServerRepositoryReconcile(SmartServerRepositoryRequest):
 
1122
    """Reconcile a repository.
 
1123
 
 
1124
    New in 2.5.
 
1125
    """
 
1126
 
 
1127
    def do_repository_request(self, repository, lock_token):
 
1128
        try:
 
1129
            repository.lock_write(token=lock_token)
 
1130
        except errors.TokenLockingNotSupported as e:
 
1131
            return FailedSmartServerResponse(
 
1132
                ('TokenLockingNotSupported', ))
 
1133
        try:
 
1134
            reconciler = repository.reconcile()
 
1135
        finally:
 
1136
            repository.unlock()
 
1137
        body = [
 
1138
            "garbage_inventories: %d\n" % reconciler.garbage_inventories,
 
1139
            "inconsistent_parents: %d\n" % reconciler.inconsistent_parents,
 
1140
            ]
 
1141
        return SuccessfulSmartServerResponse(('ok', ), "".join(body))
 
1142
 
 
1143
 
 
1144
class SmartServerRepositoryPack(SmartServerRepositoryRequest):
 
1145
    """Pack a repository.
 
1146
 
 
1147
    New in 2.5.
 
1148
    """
 
1149
 
 
1150
    def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
 
1151
        self._repository = repository
 
1152
        self._lock_token = lock_token
 
1153
        if clean_obsolete_packs == 'True':
 
1154
            self._clean_obsolete_packs = True
 
1155
        else:
 
1156
            self._clean_obsolete_packs = False
 
1157
        return None
 
1158
 
 
1159
    def do_body(self, body_bytes):
 
1160
        if body_bytes == "":
 
1161
            hint = None
 
1162
        else:
 
1163
            hint = body_bytes.splitlines()
 
1164
        self._repository.lock_write(token=self._lock_token)
 
1165
        try:
 
1166
            self._repository.pack(hint, self._clean_obsolete_packs)
 
1167
        finally:
 
1168
            self._repository.unlock()
 
1169
        return SuccessfulSmartServerResponse(("ok", ), )
 
1170
 
 
1171
 
 
1172
class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
 
1173
    """Iterate over the contents of files.
 
1174
 
 
1175
    The client sends a list of desired files to stream, one
 
1176
    per line, and as tuples of file id and revision, separated by
 
1177
    \0.
 
1178
 
 
1179
    The server replies with a stream. Each entry is preceded by a header,
 
1180
    which can either be:
 
1181
 
 
1182
    * "ok\x00IDX\n" where IDX is the index of the entry in the desired files
 
1183
        list sent by the client. This header is followed by the contents of
 
1184
        the file, bzip2-compressed.
 
1185
    * "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
 
1186
        The client can then raise an appropriate RevisionNotPresent error
 
1187
        or check its fallback repositories.
 
1188
 
 
1189
    New in 2.5.
 
1190
    """
 
1191
 
 
1192
    def body_stream(self, repository, desired_files):
 
1193
        self._repository.lock_read()
 
1194
        try:
 
1195
            text_keys = {}
 
1196
            for i, key in enumerate(desired_files):
 
1197
                text_keys[key] = i
 
1198
            for record in repository.texts.get_record_stream(text_keys,
 
1199
                    'unordered', True):
 
1200
                identifier = text_keys[record.key]
 
1201
                if record.storage_kind == 'absent':
 
1202
                    yield "absent\0%s\0%s\0%d\n" % (record.key[0],
 
1203
                        record.key[1], identifier)
 
1204
                    # FIXME: Way to abort early?
 
1205
                    continue
 
1206
                yield "ok\0%d\n" % identifier
 
1207
                compressor = zlib.compressobj()
 
1208
                for bytes in record.get_bytes_as('chunked'):
 
1209
                    data = compressor.compress(bytes)
 
1210
                    if data:
 
1211
                        yield data
 
1212
                data = compressor.flush()
 
1213
                if data:
 
1214
                    yield data
 
1215
        finally:
 
1216
            self._repository.unlock()
 
1217
 
 
1218
    def do_body(self, body_bytes):
 
1219
        desired_files = [
 
1220
            tuple(l.split("\0")) for l in body_bytes.splitlines()]
 
1221
        return SuccessfulSmartServerResponse(('ok', ),
 
1222
            body_stream=self.body_stream(self._repository, desired_files))
 
1223
 
 
1224
    def do_repository_request(self, repository):
 
1225
        # Signal that we want a body
 
1226
        return None
 
1227
 
 
1228
 
 
1229
class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
 
1230
    """Stream a list of revisions.
 
1231
 
 
1232
    The client sends a list of newline-separated revision ids in the
 
1233
    body of the request and the server replies with the serializer format,
 
1234
    and a stream of bzip2-compressed revision texts (using the specified
 
1235
    serializer format).
 
1236
 
 
1237
    Any revisions the server does not have are omitted from the stream.
 
1238
 
 
1239
    New in 2.5.
 
1240
    """
 
1241
 
 
1242
    def do_repository_request(self, repository):
 
1243
        self._repository = repository
 
1244
        # Signal there is a body
 
1245
        return None
 
1246
 
 
1247
    def do_body(self, body_bytes):
 
1248
        revision_ids = body_bytes.split("\n")
 
1249
        return SuccessfulSmartServerResponse(
 
1250
            ('ok', self._repository.get_serializer_format()),
 
1251
            body_stream=self.body_stream(self._repository, revision_ids))
 
1252
 
 
1253
    def body_stream(self, repository, revision_ids):
 
1254
        self._repository.lock_read()
 
1255
        try:
 
1256
            for record in repository.revisions.get_record_stream(
 
1257
                [(revid,) for revid in revision_ids], 'unordered', True):
 
1258
                if record.storage_kind == 'absent':
 
1259
                    continue
 
1260
                yield zlib.compress(record.get_bytes_as('fulltext'))
 
1261
        finally:
 
1262
            self._repository.unlock()
 
1263
 
 
1264
 
 
1265
class SmartServerRepositoryGetInventories(SmartServerRepositoryRequest):
 
1266
    """Get the inventory deltas for a set of revision ids.
 
1267
 
 
1268
    This accepts a list of revision ids, and then sends a chain
 
1269
    of deltas for the inventories of those revisions. The first
 
1270
    revision will be empty.
 
1271
 
 
1272
    The server writes back zlibbed serialized inventory deltas,
 
1273
    in the ordering specified. The base for each delta is the
 
1274
    inventory generated by the previous delta.
 
1275
 
 
1276
    New in 2.5.
 
1277
    """
 
1278
 
 
1279
    def _inventory_delta_stream(self, repository, ordering, revids):
 
1280
        prev_inv = _mod_inventory.Inventory(root_id=None,
 
1281
            revision_id=_mod_revision.NULL_REVISION)
 
1282
        serializer = inventory_delta.InventoryDeltaSerializer(
 
1283
            repository.supports_rich_root(),
 
1284
            repository._format.supports_tree_reference)
 
1285
        repository.lock_read()
 
1286
        try:
 
1287
            for inv, revid in repository._iter_inventories(revids, ordering):
 
1288
                if inv is None:
 
1289
                    continue
 
1290
                inv_delta = inv._make_delta(prev_inv)
 
1291
                lines = serializer.delta_to_lines(
 
1292
                    prev_inv.revision_id, inv.revision_id, inv_delta)
 
1293
                yield ChunkedContentFactory(inv.revision_id, None, None, lines)
 
1294
                prev_inv = inv
 
1295
        finally:
 
1296
            repository.unlock()
 
1297
 
 
1298
    def body_stream(self, repository, ordering, revids):
 
1299
        substream = self._inventory_delta_stream(repository,
 
1300
            ordering, revids)
 
1301
        return _stream_to_byte_stream([('inventory-deltas', substream)],
 
1302
            repository._format)
 
1303
 
 
1304
    def do_body(self, body_bytes):
 
1305
        return SuccessfulSmartServerResponse(('ok', ),
 
1306
            body_stream=self.body_stream(self._repository, self._ordering,
 
1307
                body_bytes.splitlines()))
 
1308
 
 
1309
    def do_repository_request(self, repository, ordering):
 
1310
        if ordering == 'unordered':
 
1311
            # inventory deltas for a topologically sorted stream
 
1312
            # are likely to be smaller
 
1313
            ordering = 'topological'
 
1314
        self._ordering = ordering
 
1315
        # Signal that we want a body
 
1316
        return None