/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/bzr/smart/repository.py

  • Committer: Jelmer Vernooij
  • Date: 2020-05-24 00:42:36 UTC
  • mto: This revision was merged to the branch mainline in revision 7505.
  • Revision ID: jelmer@jelmer.uk-20200524004236-jdj6obo4k5lznqw2
Cleanup Windows functions.

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
 
"""Server-side repository related request implmentations."""
 
17
"""Server-side repository related request implementations."""
18
18
 
19
19
import bz2
 
20
import itertools
20
21
import os
21
 
import Queue
 
22
import queue
22
23
import sys
23
24
import tempfile
24
25
import threading
 
26
import zlib
25
27
 
26
 
from bzrlib import (
 
28
from ... import (
27
29
    bencode,
28
30
    errors,
29
 
    graph,
 
31
    estimate_compressed_size,
30
32
    osutils,
 
33
    trace,
 
34
    ui,
 
35
    )
 
36
from .. import (
 
37
    inventory as _mod_inventory,
 
38
    inventory_delta,
31
39
    pack,
32
 
    ui,
33
 
    versionedfile,
 
40
    vf_search,
34
41
    )
35
 
from bzrlib.bzrdir import BzrDir
36
 
from bzrlib.smart.request import (
 
42
from ..bzrdir import BzrDir
 
43
from .request import (
37
44
    FailedSmartServerResponse,
38
45
    SmartServerRequest,
39
46
    SuccessfulSmartServerResponse,
40
47
    )
41
 
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
42
 
from bzrlib import revision as _mod_revision
43
 
from bzrlib.versionedfile import (
 
48
from ...repository import (
 
49
    _strip_NULL_ghosts,
 
50
    network_format_registry,
 
51
    )
 
52
from ... import revision as _mod_revision
 
53
from ..versionedfile import (
 
54
    ChunkedContentFactory,
44
55
    NetworkRecordStream,
45
56
    record_to_fulltext_bytes,
46
57
    )
82
93
            recreate_search trusts that clients will look for missing things
83
94
            they expected and get it from elsewhere.
84
95
        """
85
 
        lines = search_bytes.split('\n')
86
 
        if lines[0] == 'ancestry-of':
 
96
        if search_bytes == b'everything':
 
97
            return vf_search.EverythingResult(repository), None
 
98
        lines = search_bytes.split(b'\n')
 
99
        if lines[0] == b'ancestry-of':
87
100
            heads = lines[1:]
88
 
            search_result = graph.PendingAncestryResult(heads, repository)
 
101
            search_result = vf_search.PendingAncestryResult(heads, repository)
89
102
            return search_result, None
90
 
        elif lines[0] == 'search':
 
103
        elif lines[0] == b'search':
91
104
            return self.recreate_search_from_recipe(repository, lines[1:],
92
 
                discard_excess=discard_excess)
 
105
                                                    discard_excess=discard_excess)
93
106
        else:
94
 
            return (None, FailedSmartServerResponse(('BadSearch',)))
 
107
            return (None, FailedSmartServerResponse((b'BadSearch',)))
95
108
 
96
109
    def recreate_search_from_recipe(self, repository, lines,
97
 
        discard_excess=False):
 
110
                                    discard_excess=False):
98
111
        """Recreate a specific revision search (vs a from-tip search).
99
112
 
100
113
        :param discard_excess: If True, and the search refers to data we don't
102
115
            recreate_search trusts that clients will look for missing things
103
116
            they expected and get it from elsewhere.
104
117
        """
105
 
        start_keys = set(lines[0].split(' '))
106
 
        exclude_keys = set(lines[1].split(' '))
107
 
        revision_count = int(lines[2])
108
 
        repository.lock_read()
109
 
        try:
 
118
        start_keys = set(lines[0].split(b' '))
 
119
        exclude_keys = set(lines[1].split(b' '))
 
120
        revision_count = int(lines[2].decode('ascii'))
 
121
        with repository.lock_read():
110
122
            search = repository.get_graph()._make_breadth_first_searcher(
111
123
                start_keys)
112
124
            while True:
113
125
                try:
114
 
                    next_revs = search.next()
 
126
                    next_revs = next(search)
115
127
                except StopIteration:
116
128
                    break
117
129
                search.stop_searching_any(exclude_keys.intersection(next_revs))
118
 
            search_result = search.get_result()
119
 
            if (not discard_excess and
120
 
                search_result.get_recipe()[3] != revision_count):
 
130
            (started_keys, excludes, included_keys) = search.get_state()
 
131
            if (not discard_excess and len(included_keys) != revision_count):
121
132
                # we got back a different amount of data than expected, this
122
133
                # gets reported as NoSuchRevision, because less revisions
123
134
                # indicates missing revisions, and more should never happen as
124
135
                # the excludes list considers ghosts and ensures that ghost
125
136
                # filling races are not a problem.
126
 
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
 
137
                return (None, FailedSmartServerResponse((b'NoSuchRevision',)))
 
138
            search_result = vf_search.SearchResult(started_keys, excludes,
 
139
                                                   len(included_keys), included_keys)
127
140
            return (search_result, None)
128
 
        finally:
129
 
            repository.unlock()
130
141
 
131
142
 
132
143
class SmartServerRepositoryReadLocked(SmartServerRepositoryRequest):
134
145
 
135
146
    def do_repository_request(self, repository, *args):
136
147
        """Read lock a repository for do_readlocked_repository_request."""
137
 
        repository.lock_read()
138
 
        try:
 
148
        with repository.lock_read():
139
149
            return self.do_readlocked_repository_request(repository, *args)
140
 
        finally:
141
 
            repository.unlock()
 
150
 
 
151
 
 
152
class SmartServerRepositoryBreakLock(SmartServerRepositoryRequest):
 
153
    """Break a repository lock."""
 
154
 
 
155
    def do_repository_request(self, repository):
 
156
        repository.break_lock()
 
157
        return SuccessfulSmartServerResponse((b'ok', ))
 
158
 
 
159
 
 
160
_lsprof_count = 0
142
161
 
143
162
 
144
163
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
162
181
        :param revision_ids: The utf8 encoded revision_id to answer for.
163
182
        """
164
183
        self._revision_ids = revision_ids
165
 
        return None # Signal that we want a body.
 
184
        return None  # Signal that we want a body.
166
185
 
167
186
    def do_body(self, body_bytes):
168
187
        """Process the current search state and perform the parent lookup.
173
192
            compressed.
174
193
        """
175
194
        repository = self._repository
176
 
        repository.lock_read()
177
 
        try:
 
195
        with repository.lock_read():
178
196
            return self._do_repository_request(body_bytes)
179
 
        finally:
180
 
            repository.unlock()
181
197
 
182
 
    def _do_repository_request(self, body_bytes):
183
 
        repository = self._repository
184
 
        revision_ids = set(self._revision_ids)
185
 
        include_missing = 'include-missing:' in revision_ids
186
 
        if include_missing:
187
 
            revision_ids.remove('include-missing:')
188
 
        body_lines = body_bytes.split('\n')
189
 
        search_result, error = self.recreate_search_from_recipe(
190
 
            repository, body_lines)
191
 
        if error is not None:
192
 
            return error
193
 
        # TODO might be nice to start up the search again; but thats not
194
 
        # written or tested yet.
195
 
        client_seen_revs = set(search_result.get_keys())
196
 
        # Always include the requested ids.
197
 
        client_seen_revs.difference_update(revision_ids)
198
 
        lines = []
199
 
        repo_graph = repository.get_graph()
 
198
    def _expand_requested_revs(self, repo_graph, revision_ids, client_seen_revs,
 
199
                               include_missing, max_size=65536):
200
200
        result = {}
201
201
        queried_revs = set()
202
 
        size_so_far = 0
 
202
        estimator = estimate_compressed_size.ZLibEstimator(max_size)
203
203
        next_revs = revision_ids
204
204
        first_loop_done = False
205
205
        while next_revs:
219
219
                    encoded_id = revision_id
220
220
                else:
221
221
                    missing_rev = True
222
 
                    encoded_id = "missing:" + revision_id
 
222
                    encoded_id = b"missing:" + revision_id
223
223
                    parents = []
224
 
                if (revision_id not in client_seen_revs and
225
 
                    (not missing_rev or include_missing)):
 
224
                if (revision_id not in client_seen_revs
 
225
                        and (not missing_rev or include_missing)):
226
226
                    # Client does not have this revision, give it to it.
227
227
                    # add parents to the result
228
228
                    result[encoded_id] = parents
229
229
                    # Approximate the serialized cost of this revision_id.
230
 
                    size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
 
230
                    line = encoded_id + b' ' + b' '.join(parents) + b'\n'
 
231
                    estimator.add_content(line)
231
232
            # get all the directly asked for parents, and then flesh out to
232
233
            # 64K (compressed) or so. We do one level of depth at a time to
233
234
            # stay in sync with the client. The 250000 magic number is
234
235
            # estimated compression ratio taken from bzr.dev itself.
235
 
            if self.no_extra_results or (
236
 
                first_loop_done and size_so_far > 250000):
 
236
            if self.no_extra_results or (first_loop_done and estimator.full()):
 
237
                trace.mutter('size: %d, z_size: %d'
 
238
                             % (estimator._uncompressed_size_added,
 
239
                                estimator._compressed_size_added))
237
240
                next_revs = set()
238
241
                break
239
242
            # don't query things we've already queried
240
 
            next_revs.difference_update(queried_revs)
 
243
            next_revs = next_revs.difference(queried_revs)
241
244
            first_loop_done = True
 
245
        return result
 
246
 
 
247
    def _do_repository_request(self, body_bytes):
 
248
        repository = self._repository
 
249
        revision_ids = set(self._revision_ids)
 
250
        include_missing = b'include-missing:' in revision_ids
 
251
        if include_missing:
 
252
            revision_ids.remove(b'include-missing:')
 
253
        body_lines = body_bytes.split(b'\n')
 
254
        search_result, error = self.recreate_search_from_recipe(
 
255
            repository, body_lines)
 
256
        if error is not None:
 
257
            return error
 
258
        # TODO might be nice to start up the search again; but thats not
 
259
        # written or tested yet.
 
260
        client_seen_revs = set(search_result.get_keys())
 
261
        # Always include the requested ids.
 
262
        client_seen_revs.difference_update(revision_ids)
 
263
 
 
264
        repo_graph = repository.get_graph()
 
265
        result = self._expand_requested_revs(repo_graph, revision_ids,
 
266
                                             client_seen_revs, include_missing)
242
267
 
243
268
        # sorting trivially puts lexographically similar revision ids together.
244
269
        # Compression FTW.
 
270
        lines = []
245
271
        for revision, parents in sorted(result.items()):
246
 
            lines.append(' '.join((revision, ) + tuple(parents)))
 
272
            lines.append(b' '.join((revision, ) + tuple(parents)))
247
273
 
248
274
        return SuccessfulSmartServerResponse(
249
 
            ('ok', ), bz2.compress('\n'.join(lines)))
 
275
            (b'ok', ), bz2.compress(b'\n'.join(lines)))
250
276
 
251
277
 
252
278
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):
271
297
        else:
272
298
            search_ids = repository.all_revision_ids()
273
299
        search = graph._make_breadth_first_searcher(search_ids)
274
 
        transitive_ids = set()
275
 
        map(transitive_ids.update, list(search))
 
300
        transitive_ids = set(itertools.chain.from_iterable(search))
276
301
        parent_map = graph.get_parent_map(transitive_ids)
277
302
        revision_graph = _strip_NULL_ghosts(parent_map)
278
303
        if revision_id and revision_id not in revision_graph:
279
304
            # Note that we return an empty body, rather than omitting the body.
280
305
            # This way the client knows that it can always expect to find a body
281
306
            # in the response for this method, even in the error case.
282
 
            return FailedSmartServerResponse(('nosuchrevision', revision_id), '')
 
307
            return FailedSmartServerResponse((b'nosuchrevision', revision_id), b'')
283
308
 
284
309
        for revision, parents in revision_graph.items():
285
 
            lines.append(' '.join((revision, ) + tuple(parents)))
 
310
            lines.append(b' '.join((revision, ) + tuple(parents)))
286
311
 
287
 
        return SuccessfulSmartServerResponse(('ok', ), '\n'.join(lines))
 
312
        return SuccessfulSmartServerResponse((b'ok', ), b'\n'.join(lines))
288
313
 
289
314
 
290
315
class SmartServerRepositoryGetRevIdForRevno(SmartServerRepositoryReadLocked):
291
316
 
292
317
    def do_readlocked_repository_request(self, repository, revno,
293
 
            known_pair):
 
318
                                         known_pair):
294
319
        """Find the revid for a given revno, given a known revno/revid pair.
295
 
        
 
320
 
296
321
        New in 1.17.
297
322
        """
298
323
        try:
299
 
            found_flag, result = repository.get_rev_id_for_revno(revno, known_pair)
300
 
        except errors.RevisionNotPresent, err:
301
 
            if err.revision_id != known_pair[1]:
 
324
            found_flag, result = repository.get_rev_id_for_revno(
 
325
                revno, known_pair)
 
326
        except errors.NoSuchRevision as err:
 
327
            if err.revision != known_pair[1]:
302
328
                raise AssertionError(
303
329
                    'get_rev_id_for_revno raised RevisionNotPresent for '
304
 
                    'non-initial revision: ' + err.revision_id)
305
 
            return FailedSmartServerResponse(
306
 
                ('nosuchrevision', err.revision_id))
 
330
                    'non-initial revision: ' + err.revision)
 
331
            return FailedSmartServerResponse(
 
332
                (b'nosuchrevision', err.revision))
 
333
        except errors.RevnoOutOfBounds as e:
 
334
            return FailedSmartServerResponse(
 
335
                (b'revno-outofbounds', e.revno, e.minimum, e.maximum))
307
336
        if found_flag:
308
 
            return SuccessfulSmartServerResponse(('ok', result))
 
337
            return SuccessfulSmartServerResponse((b'ok', result))
309
338
        else:
310
339
            earliest_revno, earliest_revid = result
311
340
            return SuccessfulSmartServerResponse(
312
 
                ('history-incomplete', earliest_revno, earliest_revid))
 
341
                (b'history-incomplete', earliest_revno, earliest_revid))
 
342
 
 
343
 
 
344
class SmartServerRepositoryGetSerializerFormat(SmartServerRepositoryRequest):
 
345
 
 
346
    def do_repository_request(self, repository):
 
347
        """Return the serializer format for this repository.
 
348
 
 
349
        New in 2.5.0.
 
350
 
 
351
        :param repository: The repository to query
 
352
        :return: A smart server response (b'ok', FORMAT)
 
353
        """
 
354
        serializer = repository.get_serializer_format()
 
355
        return SuccessfulSmartServerResponse((b'ok', serializer))
313
356
 
314
357
 
315
358
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
319
362
 
320
363
        :param repository: The repository to query in.
321
364
        :param revision_id: The utf8 encoded revision_id to lookup.
322
 
        :return: A smart server response of ('ok', ) if the revision is
323
 
            present.
 
365
        :return: A smart server response of ('yes', ) if the revision is
 
366
            present. ('no', ) if it is missing.
324
367
        """
325
368
        if repository.has_revision(revision_id):
326
 
            return SuccessfulSmartServerResponse(('yes', ))
 
369
            return SuccessfulSmartServerResponse((b'yes', ))
327
370
        else:
328
 
            return SuccessfulSmartServerResponse(('no', ))
 
371
            return SuccessfulSmartServerResponse((b'no', ))
 
372
 
 
373
 
 
374
class SmartServerRequestHasSignatureForRevisionId(
 
375
        SmartServerRepositoryRequest):
 
376
 
 
377
    def do_repository_request(self, repository, revision_id):
 
378
        """Return ok if a signature is present for a revision.
 
379
 
 
380
        Introduced in bzr 2.5.0.
 
381
 
 
382
        :param repository: The repository to query in.
 
383
        :param revision_id: The utf8 encoded revision_id to lookup.
 
384
        :return: A smart server response of ('yes', ) if a
 
385
            signature for the revision is present,
 
386
            ('no', ) if it is missing.
 
387
        """
 
388
        try:
 
389
            if repository.has_signature_for_revision_id(revision_id):
 
390
                return SuccessfulSmartServerResponse((b'yes', ))
 
391
            else:
 
392
                return SuccessfulSmartServerResponse((b'no', ))
 
393
        except errors.NoSuchRevision:
 
394
            return FailedSmartServerResponse(
 
395
                (b'nosuchrevision', revision_id))
329
396
 
330
397
 
331
398
class SmartServerRepositoryGatherStats(SmartServerRepositoryRequest):
337
404
        :param revid: utf8 encoded rev id or an empty string to indicate None
338
405
        :param committers: 'yes' or 'no'.
339
406
 
340
 
        :return: A SmartServerResponse ('ok',), a encoded body looking like
 
407
        :return: A SmartServerResponse (b'ok',), a encoded body looking like
341
408
              committers: 1
342
409
              firstrev: 1234.230 0
343
410
              latestrev: 345.700 3600
345
412
 
346
413
              But containing only fields returned by the gather_stats() call
347
414
        """
348
 
        if revid == '':
 
415
        if revid == b'':
349
416
            decoded_revision_id = None
350
417
        else:
351
418
            decoded_revision_id = revid
352
 
        if committers == 'yes':
 
419
        if committers == b'yes':
353
420
            decoded_committers = True
354
421
        else:
355
422
            decoded_committers = None
356
 
        stats = repository.gather_stats(decoded_revision_id, decoded_committers)
357
 
 
358
 
        body = ''
359
 
        if stats.has_key('committers'):
360
 
            body += 'committers: %d\n' % stats['committers']
361
 
        if stats.has_key('firstrev'):
362
 
            body += 'firstrev: %.3f %d\n' % stats['firstrev']
363
 
        if stats.has_key('latestrev'):
364
 
             body += 'latestrev: %.3f %d\n' % stats['latestrev']
365
 
        if stats.has_key('revisions'):
366
 
            body += 'revisions: %d\n' % stats['revisions']
367
 
        if stats.has_key('size'):
368
 
            body += 'size: %d\n' % stats['size']
369
 
 
370
 
        return SuccessfulSmartServerResponse(('ok', ), body)
 
423
        try:
 
424
            stats = repository.gather_stats(decoded_revision_id,
 
425
                                            decoded_committers)
 
426
        except errors.NoSuchRevision:
 
427
            return FailedSmartServerResponse((b'nosuchrevision', revid))
 
428
 
 
429
        body = b''
 
430
        if 'committers' in stats:
 
431
            body += b'committers: %d\n' % stats['committers']
 
432
        if 'firstrev' in stats:
 
433
            body += b'firstrev: %.3f %d\n' % stats['firstrev']
 
434
        if 'latestrev' in stats:
 
435
            body += b'latestrev: %.3f %d\n' % stats['latestrev']
 
436
        if 'revisions' in stats:
 
437
            body += b'revisions: %d\n' % stats['revisions']
 
438
        if 'size' in stats:
 
439
            body += b'size: %d\n' % stats['size']
 
440
 
 
441
        return SuccessfulSmartServerResponse((b'ok', ), body)
 
442
 
 
443
 
 
444
class SmartServerRepositoryGetRevisionSignatureText(
 
445
        SmartServerRepositoryRequest):
 
446
    """Return the signature text of a revision.
 
447
 
 
448
    New in 2.5.
 
449
    """
 
450
 
 
451
    def do_repository_request(self, repository, revision_id):
 
452
        """Return the result of repository.get_signature_text().
 
453
 
 
454
        :param repository: The repository to query in.
 
455
        :return: A smart server response of with the signature text as
 
456
            body.
 
457
        """
 
458
        try:
 
459
            text = repository.get_signature_text(revision_id)
 
460
        except errors.NoSuchRevision as err:
 
461
            return FailedSmartServerResponse(
 
462
                (b'nosuchrevision', err.revision))
 
463
        return SuccessfulSmartServerResponse((b'ok', ), text)
371
464
 
372
465
 
373
466
class SmartServerRepositoryIsShared(SmartServerRepositoryRequest):
380
473
            shared, and ('no', ) if it is not.
381
474
        """
382
475
        if repository.is_shared():
383
 
            return SuccessfulSmartServerResponse(('yes', ))
384
 
        else:
385
 
            return SuccessfulSmartServerResponse(('no', ))
 
476
            return SuccessfulSmartServerResponse((b'yes', ))
 
477
        else:
 
478
            return SuccessfulSmartServerResponse((b'no', ))
 
479
 
 
480
 
 
481
class SmartServerRepositoryMakeWorkingTrees(SmartServerRepositoryRequest):
 
482
 
 
483
    def do_repository_request(self, repository):
 
484
        """Return the result of repository.make_working_trees().
 
485
 
 
486
        Introduced in bzr 2.5.0.
 
487
 
 
488
        :param repository: The repository to query in.
 
489
        :return: A smart server response of ('yes', ) if the repository uses
 
490
            working trees, and ('no', ) if it is not.
 
491
        """
 
492
        if repository.make_working_trees():
 
493
            return SuccessfulSmartServerResponse((b'yes', ))
 
494
        else:
 
495
            return SuccessfulSmartServerResponse((b'no', ))
386
496
 
387
497
 
388
498
class SmartServerRepositoryLockWrite(SmartServerRepositoryRequest):
389
499
 
390
 
    def do_repository_request(self, repository, token=''):
 
500
    def do_repository_request(self, repository, token=b''):
391
501
        # XXX: this probably should not have a token.
392
 
        if token == '':
 
502
        if token == b'':
393
503
            token = None
394
504
        try:
395
505
            token = repository.lock_write(token=token).repository_token
396
 
        except errors.LockContention, e:
397
 
            return FailedSmartServerResponse(('LockContention',))
 
506
        except errors.LockContention as e:
 
507
            return FailedSmartServerResponse((b'LockContention',))
398
508
        except errors.UnlockableTransport:
399
 
            return FailedSmartServerResponse(('UnlockableTransport',))
400
 
        except errors.LockFailed, e:
401
 
            return FailedSmartServerResponse(('LockFailed',
402
 
                str(e.lock), str(e.why)))
 
509
            return FailedSmartServerResponse((b'UnlockableTransport',))
 
510
        except errors.LockFailed as e:
 
511
            return FailedSmartServerResponse((b'LockFailed',
 
512
                                              str(e.lock), str(e.why)))
403
513
        if token is not None:
404
514
            repository.leave_lock_in_place()
405
515
        repository.unlock()
406
516
        if token is None:
407
 
            token = ''
408
 
        return SuccessfulSmartServerResponse(('ok', token))
 
517
            token = b''
 
518
        return SuccessfulSmartServerResponse((b'ok', token))
409
519
 
410
520
 
411
521
class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):
413
523
    def do_repository_request(self, repository, to_network_name):
414
524
        """Get a stream for inserting into a to_format repository.
415
525
 
 
526
        The request body is 'search_bytes', a description of the revisions
 
527
        being requested.
 
528
 
 
529
        In 2.3 this verb added support for search_bytes == 'everything'.  Older
 
530
        implementations will respond with a BadSearch error, and clients should
 
531
        catch this and fallback appropriately.
 
532
 
416
533
        :param repository: The repository to stream from.
417
534
        :param to_network_name: The network name of the format of the target
418
535
            repository.
420
537
        self._to_format = network_format_registry.get(to_network_name)
421
538
        if self._should_fake_unknown():
422
539
            return FailedSmartServerResponse(
423
 
                ('UnknownMethod', 'Repository.get_stream'))
424
 
        return None # Signal that we want a body.
 
540
                (b'UnknownMethod', b'Repository.get_stream'))
 
541
        return None  # Signal that we want a body.
425
542
 
426
543
    def _should_fake_unknown(self):
427
544
        """Return True if we should return UnknownMethod to the client.
428
 
        
 
545
 
429
546
        This is a workaround for bugs in pre-1.19 clients that claim to
430
547
        support receiving streams of CHK repositories.  The pre-1.19 client
431
548
        expects inventory records to be serialized in the format defined by
444
561
        if not from_format.supports_chks:
445
562
            # Source not CHK: that's ok
446
563
            return False
447
 
        if (to_format.supports_chks and
448
 
            from_format.repository_class is to_format.repository_class and
449
 
            from_format._serializer == to_format._serializer):
 
564
        if (to_format.supports_chks
 
565
            and from_format.repository_class is to_format.repository_class
 
566
                and from_format._serializer == to_format._serializer):
450
567
            # Source is CHK, but target matches: that's ok
451
568
            # (e.g. 2a->2a, or CHK2->2a)
452
569
            return False
459
576
        repository.lock_read()
460
577
        try:
461
578
            search_result, error = self.recreate_search(repository, body_bytes,
462
 
                discard_excess=True)
 
579
                                                        discard_excess=True)
463
580
            if error is not None:
464
581
                repository.unlock()
465
582
                return error
466
583
            source = repository._get_source(self._to_format)
467
584
            stream = source.get_stream(search_result)
468
585
        except Exception:
469
 
            exc_info = sys.exc_info()
470
586
            try:
471
587
                # On non-error, unlocking is done by the body stream handler.
472
588
                repository.unlock()
473
589
            finally:
474
 
                raise exc_info[0], exc_info[1], exc_info[2]
475
 
        return SuccessfulSmartServerResponse(('ok',),
476
 
            body_stream=self.body_stream(stream, repository))
 
590
                raise
 
591
        return SuccessfulSmartServerResponse((b'ok',),
 
592
                                             body_stream=self.body_stream(stream, repository))
477
593
 
478
594
    def body_stream(self, stream, repository):
479
595
        byte_stream = _stream_to_byte_stream(stream, repository._format)
480
596
        try:
481
597
            for bytes in byte_stream:
482
598
                yield bytes
483
 
        except errors.RevisionNotPresent, e:
 
599
        except errors.RevisionNotPresent as e:
484
600
            # This shouldn't be able to happen, but as we don't buffer
485
601
            # everything it can in theory happen.
486
602
            repository.unlock()
487
 
            yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
 
603
            yield FailedSmartServerResponse((b'NoSuchRevision', e.revision_id))
488
604
        else:
489
605
            repository.unlock()
490
606
 
491
607
 
492
608
class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
 
609
    """The same as Repository.get_stream, but will return stream CHK formats to
 
610
    clients.
 
611
 
 
612
    See SmartServerRepositoryGetStream._should_fake_unknown.
 
613
 
 
614
    New in 1.19.
 
615
    """
493
616
 
494
617
    def _should_fake_unknown(self):
495
618
        """Returns False; we don't need to workaround bugs in 1.19+ clients."""
500
623
    """Convert a record stream to a self delimited byte stream."""
501
624
    pack_writer = pack.ContainerSerialiser()
502
625
    yield pack_writer.begin()
503
 
    yield pack_writer.bytes_record(src_format.network_name(), '')
 
626
    yield pack_writer.bytes_record(src_format.network_name(), b'')
504
627
    for substream_type, substream in stream:
505
628
        for record in substream:
506
629
            if record.storage_kind in ('chunked', 'fulltext'):
507
630
                serialised = record_to_fulltext_bytes(record)
508
 
            elif record.storage_kind == 'inventory-delta':
509
 
                serialised = record_to_inventory_delta_bytes(record)
510
631
            elif record.storage_kind == 'absent':
511
632
                raise ValueError("Absent factory for %s" % (record.key,))
512
633
            else:
515
636
                # Some streams embed the whole stream into the wire
516
637
                # representation of the first record, which means that
517
638
                # later records have no wire representation: we skip them.
518
 
                yield pack_writer.bytes_record(serialised, [(substream_type,)])
 
639
                yield pack_writer.bytes_record(serialised, [(substream_type.encode('ascii'),)])
519
640
    yield pack_writer.end()
520
641
 
521
642
 
544
665
    :ivar first_bytes: The first bytes to give the next NetworkRecordStream.
545
666
    """
546
667
 
547
 
    def __init__(self, byte_stream):
 
668
    def __init__(self, byte_stream, record_counter):
548
669
        """Create a _ByteStreamDecoder."""
549
670
        self.stream_decoder = pack.ContainerPushParser()
550
671
        self.current_type = None
551
672
        self.first_bytes = None
552
673
        self.byte_stream = byte_stream
 
674
        self._record_counter = record_counter
 
675
        self.key_count = 0
553
676
 
554
677
    def iter_stream_decoder(self):
555
678
        """Iterate the contents of the pack from stream_decoder."""
580
703
 
581
704
    def record_stream(self):
582
705
        """Yield substream_type, substream from the byte stream."""
 
706
        def wrap_and_count(pb, rc, substream):
 
707
            """Yield records from stream while showing progress."""
 
708
            counter = 0
 
709
            if rc:
 
710
                if self.current_type != 'revisions' and self.key_count != 0:
 
711
                    # As we know the number of revisions now (in self.key_count)
 
712
                    # we can setup and use record_counter (rc).
 
713
                    if not rc.is_initialized():
 
714
                        rc.setup(self.key_count, self.key_count)
 
715
            for record in substream.read():
 
716
                if rc:
 
717
                    if rc.is_initialized() and counter == rc.STEP:
 
718
                        rc.increment(counter)
 
719
                        pb.update('Estimate', rc.current, rc.max)
 
720
                        counter = 0
 
721
                    if self.current_type == 'revisions':
 
722
                        # Total records is proportional to number of revs
 
723
                        # to fetch. With remote, we used self.key_count to
 
724
                        # track the number of revs. Once we have the revs
 
725
                        # counts in self.key_count, the progress bar changes
 
726
                        # from 'Estimating..' to 'Estimate' above.
 
727
                        self.key_count += 1
 
728
                        if counter == rc.STEP:
 
729
                            pb.update('Estimating..', self.key_count)
 
730
                            counter = 0
 
731
                counter += 1
 
732
                yield record
 
733
 
583
734
        self.seed_state()
584
 
        # Make and consume sub generators, one per substream type:
585
 
        while self.first_bytes is not None:
586
 
            substream = NetworkRecordStream(self.iter_substream_bytes())
587
 
            # after substream is fully consumed, self.current_type is set to
588
 
            # the next type, and self.first_bytes is set to the matching bytes.
589
 
            yield self.current_type, substream.read()
 
735
        with ui.ui_factory.nested_progress_bar() as pb:
 
736
            rc = self._record_counter
 
737
            try:
 
738
                # Make and consume sub generators, one per substream type:
 
739
                while self.first_bytes is not None:
 
740
                    substream = NetworkRecordStream(
 
741
                        self.iter_substream_bytes())
 
742
                    # after substream is fully consumed, self.current_type is set
 
743
                    # to the next type, and self.first_bytes is set to the matching
 
744
                    # bytes.
 
745
                    yield self.current_type.decode('ascii'), wrap_and_count(pb, rc, substream)
 
746
            finally:
 
747
                if rc:
 
748
                    pb.update('Done', rc.max, rc.max)
590
749
 
591
750
    def seed_state(self):
592
751
        """Prepare the _ByteStreamDecoder to decode from the pack stream."""
597
756
        list(self.iter_substream_bytes())
598
757
 
599
758
 
600
 
def _byte_stream_to_stream(byte_stream):
 
759
def _byte_stream_to_stream(byte_stream, record_counter=None):
601
760
    """Convert a byte stream into a format and a stream.
602
761
 
603
762
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
604
763
    :return: (RepositoryFormat, stream_generator)
605
764
    """
606
 
    decoder = _ByteStreamDecoder(byte_stream)
 
765
    decoder = _ByteStreamDecoder(byte_stream, record_counter)
607
766
    for bytes in byte_stream:
608
767
        decoder.stream_decoder.accept_bytes(bytes)
609
768
        for record in decoder.stream_decoder.read_pending_records(max=1):
617
776
    def do_repository_request(self, repository, token):
618
777
        try:
619
778
            repository.lock_write(token=token)
620
 
        except errors.TokenMismatch, e:
621
 
            return FailedSmartServerResponse(('TokenMismatch',))
 
779
        except errors.TokenMismatch as e:
 
780
            return FailedSmartServerResponse((b'TokenMismatch',))
622
781
        repository.dont_leave_lock_in_place()
623
782
        repository.unlock()
624
 
        return SuccessfulSmartServerResponse(('ok',))
 
783
        return SuccessfulSmartServerResponse((b'ok',))
 
784
 
 
785
 
 
786
class SmartServerRepositoryGetPhysicalLockStatus(SmartServerRepositoryRequest):
 
787
    """Get the physical lock status for a repository.
 
788
 
 
789
    New in 2.5.
 
790
    """
 
791
 
 
792
    def do_repository_request(self, repository):
 
793
        if repository.get_physical_lock_status():
 
794
            return SuccessfulSmartServerResponse((b'yes', ))
 
795
        else:
 
796
            return SuccessfulSmartServerResponse((b'no', ))
625
797
 
626
798
 
627
799
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
628
800
 
629
801
    def do_repository_request(self, repository, str_bool_new_value):
630
 
        if str_bool_new_value == 'True':
 
802
        if str_bool_new_value == b'True':
631
803
            new_value = True
632
804
        else:
633
805
            new_value = False
634
806
        repository.set_make_working_trees(new_value)
635
 
        return SuccessfulSmartServerResponse(('ok',))
 
807
        return SuccessfulSmartServerResponse((b'ok',))
636
808
 
637
809
 
638
810
class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
657
829
 
658
830
    def _copy_to_tempdir(self, from_repo):
659
831
        tmp_dirname = osutils.mkdtemp(prefix='tmpbzrclone')
660
 
        tmp_bzrdir = from_repo.bzrdir._format.initialize(tmp_dirname)
 
832
        tmp_bzrdir = from_repo.controldir._format.initialize(tmp_dirname)
661
833
        tmp_repo = from_repo._format.initialize(tmp_bzrdir)
662
834
        from_repo.copy_content_into(tmp_repo)
663
835
        return tmp_dirname, tmp_repo
664
836
 
665
837
    def _tarfile_response(self, tmp_dirname, compression):
666
 
        temp = tempfile.NamedTemporaryFile()
667
 
        try:
 
838
        with tempfile.NamedTemporaryFile() as temp:
668
839
            self._tarball_of_dir(tmp_dirname, compression, temp.file)
669
840
            # all finished; write the tempfile out to the network
670
841
            temp.seek(0)
671
 
            return SuccessfulSmartServerResponse(('ok',), temp.read())
 
842
            return SuccessfulSmartServerResponse((b'ok',), temp.read())
672
843
            # FIXME: Don't read the whole thing into memory here; rather stream
673
844
            # it out from the file onto the network. mbp 20070411
674
 
        finally:
675
 
            temp.close()
676
845
 
677
846
    def _tarball_of_dir(self, dirname, compression, ofile):
678
847
        import tarfile
679
848
        filename = os.path.basename(ofile.name)
680
 
        tarball = tarfile.open(fileobj=ofile, name=filename,
681
 
            mode='w|' + compression)
682
 
        try:
 
849
        with tarfile.open(fileobj=ofile, name=filename,
 
850
                          mode='w|' + compression) as tarball:
683
851
            # The tarball module only accepts ascii names, and (i guess)
684
852
            # packs them with their 8bit names.  We know all the files
685
853
            # within the repository have ASCII names so the should be safe
689
857
            # override it
690
858
            if not dirname.endswith('.bzr'):
691
859
                raise ValueError(dirname)
692
 
            tarball.add(dirname, '.bzr') # recursive by default
693
 
        finally:
694
 
            tarball.close()
 
860
            tarball.add(dirname, '.bzr')  # recursive by default
695
861
 
696
862
 
697
863
class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
710
876
        self.do_insert_stream_request(repository, resume_tokens)
711
877
 
712
878
    def do_insert_stream_request(self, repository, resume_tokens):
713
 
        tokens = [token for token in resume_tokens.split(' ') if token]
 
879
        tokens = [token.decode('utf-8')
 
880
                  for token in resume_tokens.split(b' ') if token]
714
881
        self.tokens = tokens
715
882
        self.repository = repository
716
 
        self.queue = Queue.Queue()
 
883
        self.queue = queue.Queue()
717
884
        self.insert_thread = threading.Thread(target=self._inserter_thread)
718
885
        self.insert_thread.start()
719
886
 
744
911
        if self.insert_thread is not None:
745
912
            self.insert_thread.join()
746
913
        if not self.insert_ok:
747
 
            exc_info = self.insert_exception
748
 
            raise exc_info[0], exc_info[1], exc_info[2]
 
914
            (exc_type, exc_val, exc_tb) = self.insert_exception
 
915
            try:
 
916
                raise exc_val
 
917
            finally:
 
918
                del self.insert_exception
749
919
        write_group_tokens, missing_keys = self.insert_result
750
920
        if write_group_tokens or missing_keys:
751
921
            # bzip needed? missing keys should typically be a small set.
752
922
            # Should this be a streaming body response ?
753
 
            missing_keys = sorted(missing_keys)
754
 
            bytes = bencode.bencode((write_group_tokens, missing_keys))
 
923
            missing_keys = sorted(
 
924
                [(entry[0].encode('utf-8'),) + entry[1:] for entry in missing_keys])
 
925
            bytes = bencode.bencode((
 
926
                [token.encode('utf-8') for token in write_group_tokens], missing_keys))
755
927
            self.repository.unlock()
756
 
            return SuccessfulSmartServerResponse(('missing-basis', bytes))
 
928
            return SuccessfulSmartServerResponse((b'missing-basis', bytes))
757
929
        else:
758
930
            self.repository.unlock()
759
 
            return SuccessfulSmartServerResponse(('ok', ))
 
931
            return SuccessfulSmartServerResponse((b'ok', ))
760
932
 
761
933
 
762
934
class SmartServerRepositoryInsertStream_1_19(SmartServerRepositoryInsertStreamLocked):
792
964
        self.do_insert_stream_request(repository, resume_tokens)
793
965
 
794
966
 
 
967
class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
 
968
    """Add a revision signature text.
 
969
 
 
970
    New in 2.5.
 
971
    """
 
972
 
 
973
    def do_repository_request(self, repository, lock_token, revision_id,
 
974
                              *write_group_tokens):
 
975
        """Add a revision signature text.
 
976
 
 
977
        :param repository: Repository to operate on
 
978
        :param lock_token: Lock token
 
979
        :param revision_id: Revision for which to add signature
 
980
        :param write_group_tokens: Write group tokens
 
981
        """
 
982
        self._lock_token = lock_token
 
983
        self._revision_id = revision_id
 
984
        self._write_group_tokens = [token.decode(
 
985
            'utf-8') for token in write_group_tokens]
 
986
        return None
 
987
 
 
988
    def do_body(self, body_bytes):
 
989
        """Add a signature text.
 
990
 
 
991
        :param body_bytes: GPG signature text
 
992
        :return: SuccessfulSmartServerResponse with arguments 'ok' and
 
993
            the list of new write group tokens.
 
994
        """
 
995
        with self._repository.lock_write(token=self._lock_token):
 
996
            self._repository.resume_write_group(self._write_group_tokens)
 
997
            try:
 
998
                self._repository.add_signature_text(self._revision_id,
 
999
                                                    body_bytes)
 
1000
            finally:
 
1001
                new_write_group_tokens = self._repository.suspend_write_group()
 
1002
        return SuccessfulSmartServerResponse(
 
1003
            (b'ok', ) + tuple([token.encode('utf-8') for token in new_write_group_tokens]))
 
1004
 
 
1005
 
 
1006
class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
 
1007
    """Start a write group.
 
1008
 
 
1009
    New in 2.5.
 
1010
    """
 
1011
 
 
1012
    def do_repository_request(self, repository, lock_token):
 
1013
        """Start a write group."""
 
1014
        with repository.lock_write(token=lock_token):
 
1015
            repository.start_write_group()
 
1016
            try:
 
1017
                tokens = repository.suspend_write_group()
 
1018
            except errors.UnsuspendableWriteGroup:
 
1019
                return FailedSmartServerResponse((b'UnsuspendableWriteGroup',))
 
1020
        return SuccessfulSmartServerResponse((b'ok', tokens))
 
1021
 
 
1022
 
 
1023
class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
 
1024
    """Commit a write group.
 
1025
 
 
1026
    New in 2.5.
 
1027
    """
 
1028
 
 
1029
    def do_repository_request(self, repository, lock_token,
 
1030
                              write_group_tokens):
 
1031
        """Commit a write group."""
 
1032
        with repository.lock_write(token=lock_token):
 
1033
            try:
 
1034
                repository.resume_write_group(
 
1035
                    [token.decode('utf-8') for token in write_group_tokens])
 
1036
            except errors.UnresumableWriteGroup as e:
 
1037
                return FailedSmartServerResponse(
 
1038
                    (b'UnresumableWriteGroup', [token.encode('utf-8') for token
 
1039
                                                in e.write_groups], e.reason.encode('utf-8')))
 
1040
            try:
 
1041
                repository.commit_write_group()
 
1042
            except:
 
1043
                write_group_tokens = repository.suspend_write_group()
 
1044
                # FIXME JRV 2011-11-19: What if the write_group_tokens
 
1045
                # have changed?
 
1046
                raise
 
1047
        return SuccessfulSmartServerResponse((b'ok', ))
 
1048
 
 
1049
 
 
1050
class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
 
1051
    """Abort a write group.
 
1052
 
 
1053
    New in 2.5.
 
1054
    """
 
1055
 
 
1056
    def do_repository_request(self, repository, lock_token, write_group_tokens):
 
1057
        """Abort a write group."""
 
1058
        with repository.lock_write(token=lock_token):
 
1059
            try:
 
1060
                repository.resume_write_group(
 
1061
                    [token.decode('utf-8') for token in write_group_tokens])
 
1062
            except errors.UnresumableWriteGroup as e:
 
1063
                return FailedSmartServerResponse(
 
1064
                    (b'UnresumableWriteGroup',
 
1065
                        [token.encode('utf-8') for token in e.write_groups],
 
1066
                        e.reason.encode('utf-8')))
 
1067
                repository.abort_write_group()
 
1068
        return SuccessfulSmartServerResponse((b'ok', ))
 
1069
 
 
1070
 
 
1071
class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
 
1072
    """Check that a write group is still valid.
 
1073
 
 
1074
    New in 2.5.
 
1075
    """
 
1076
 
 
1077
    def do_repository_request(self, repository, lock_token, write_group_tokens):
 
1078
        """Abort a write group."""
 
1079
        with repository.lock_write(token=lock_token):
 
1080
            try:
 
1081
                repository.resume_write_group(
 
1082
                    [token.decode('utf-8') for token in write_group_tokens])
 
1083
            except errors.UnresumableWriteGroup as e:
 
1084
                return FailedSmartServerResponse(
 
1085
                    (b'UnresumableWriteGroup',
 
1086
                        [token.encode('utf-8') for token in e.write_groups],
 
1087
                        e.reason.encode('utf-8')))
 
1088
            else:
 
1089
                repository.suspend_write_group()
 
1090
        return SuccessfulSmartServerResponse((b'ok', ))
 
1091
 
 
1092
 
 
1093
class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
 
1094
    """Retrieve all of the revision ids in a repository.
 
1095
 
 
1096
    New in 2.5.
 
1097
    """
 
1098
 
 
1099
    def do_repository_request(self, repository):
 
1100
        revids = repository.all_revision_ids()
 
1101
        return SuccessfulSmartServerResponse((b"ok", ), b"\n".join(revids))
 
1102
 
 
1103
 
 
1104
class SmartServerRepositoryReconcile(SmartServerRepositoryRequest):
 
1105
    """Reconcile a repository.
 
1106
 
 
1107
    New in 2.5.
 
1108
    """
 
1109
 
 
1110
    def do_repository_request(self, repository, lock_token):
 
1111
        try:
 
1112
            repository.lock_write(token=lock_token)
 
1113
        except errors.TokenLockingNotSupported as e:
 
1114
            return FailedSmartServerResponse(
 
1115
                (b'TokenLockingNotSupported', ))
 
1116
        try:
 
1117
            reconciler = repository.reconcile()
 
1118
        finally:
 
1119
            repository.unlock()
 
1120
        body = [
 
1121
            b"garbage_inventories: %d\n" % reconciler.garbage_inventories,
 
1122
            b"inconsistent_parents: %d\n" % reconciler.inconsistent_parents,
 
1123
            ]
 
1124
        return SuccessfulSmartServerResponse((b'ok', ), b"".join(body))
 
1125
 
 
1126
 
 
1127
class SmartServerRepositoryPack(SmartServerRepositoryRequest):
 
1128
    """Pack a repository.
 
1129
 
 
1130
    New in 2.5.
 
1131
    """
 
1132
 
 
1133
    def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
 
1134
        self._repository = repository
 
1135
        self._lock_token = lock_token
 
1136
        if clean_obsolete_packs == b'True':
 
1137
            self._clean_obsolete_packs = True
 
1138
        else:
 
1139
            self._clean_obsolete_packs = False
 
1140
        return None
 
1141
 
 
1142
    def do_body(self, body_bytes):
 
1143
        if body_bytes == "":
 
1144
            hint = None
 
1145
        else:
 
1146
            hint = body_bytes.splitlines()
 
1147
        with self._repository.lock_write(token=self._lock_token):
 
1148
            self._repository.pack(hint, self._clean_obsolete_packs)
 
1149
        return SuccessfulSmartServerResponse((b"ok", ), )
 
1150
 
 
1151
 
 
1152
class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
 
1153
    """Iterate over the contents of files.
 
1154
 
 
1155
    The client sends a list of desired files to stream, one
 
1156
    per line, and as tuples of file id and revision, separated by
 
1157
    \0.
 
1158
 
 
1159
    The server replies with a stream. Each entry is preceded by a header,
 
1160
    which can either be:
 
1161
 
 
1162
    * "ok\x00IDX\n" where IDX is the index of the entry in the desired files
 
1163
        list sent by the client. This header is followed by the contents of
 
1164
        the file, bzip2-compressed.
 
1165
    * "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
 
1166
        The client can then raise an appropriate RevisionNotPresent error
 
1167
        or check its fallback repositories.
 
1168
 
 
1169
    New in 2.5.
 
1170
    """
 
1171
 
 
1172
    def body_stream(self, repository, desired_files):
 
1173
        with self._repository.lock_read():
 
1174
            text_keys = {}
 
1175
            for i, key in enumerate(desired_files):
 
1176
                text_keys[key] = i
 
1177
            for record in repository.texts.get_record_stream(text_keys,
 
1178
                                                             'unordered', True):
 
1179
                identifier = text_keys[record.key]
 
1180
                if record.storage_kind == 'absent':
 
1181
                    yield b"absent\0%s\0%s\0%d\n" % (record.key[0],
 
1182
                                                     record.key[1], identifier)
 
1183
                    # FIXME: Way to abort early?
 
1184
                    continue
 
1185
                yield b"ok\0%d\n" % identifier
 
1186
                compressor = zlib.compressobj()
 
1187
                for bytes in record.iter_bytes_as('chunked'):
 
1188
                    data = compressor.compress(bytes)
 
1189
                    if data:
 
1190
                        yield data
 
1191
                data = compressor.flush()
 
1192
                if data:
 
1193
                    yield data
 
1194
 
 
1195
    def do_body(self, body_bytes):
 
1196
        desired_files = [
 
1197
            tuple(l.split(b"\0")) for l in body_bytes.splitlines()]
 
1198
        return SuccessfulSmartServerResponse((b'ok', ),
 
1199
                                             body_stream=self.body_stream(self._repository, desired_files))
 
1200
 
 
1201
    def do_repository_request(self, repository):
 
1202
        # Signal that we want a body
 
1203
        return None
 
1204
 
 
1205
 
 
1206
class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
 
1207
    """Stream a list of revisions.
 
1208
 
 
1209
    The client sends a list of newline-separated revision ids in the
 
1210
    body of the request and the server replies with the serializer format,
 
1211
    and a stream of bzip2-compressed revision texts (using the specified
 
1212
    serializer format).
 
1213
 
 
1214
    Any revisions the server does not have are omitted from the stream.
 
1215
 
 
1216
    New in 2.5.
 
1217
    """
 
1218
 
 
1219
    def do_repository_request(self, repository):
 
1220
        self._repository = repository
 
1221
        # Signal there is a body
 
1222
        return None
 
1223
 
 
1224
    def do_body(self, body_bytes):
 
1225
        revision_ids = body_bytes.split(b"\n")
 
1226
        return SuccessfulSmartServerResponse(
 
1227
            (b'ok', self._repository.get_serializer_format()),
 
1228
            body_stream=self.body_stream(self._repository, revision_ids))
 
1229
 
 
1230
    def body_stream(self, repository, revision_ids):
 
1231
        with self._repository.lock_read():
 
1232
            for record in repository.revisions.get_record_stream(
 
1233
                    [(revid,) for revid in revision_ids], 'unordered', True):
 
1234
                if record.storage_kind == 'absent':
 
1235
                    continue
 
1236
                yield zlib.compress(record.get_bytes_as('fulltext'))
 
1237
 
 
1238
 
 
1239
class SmartServerRepositoryGetInventories(SmartServerRepositoryRequest):
 
1240
    """Get the inventory deltas for a set of revision ids.
 
1241
 
 
1242
    This accepts a list of revision ids, and then sends a chain
 
1243
    of deltas for the inventories of those revisions. The first
 
1244
    revision will be empty.
 
1245
 
 
1246
    The server writes back zlibbed serialized inventory deltas,
 
1247
    in the ordering specified. The base for each delta is the
 
1248
    inventory generated by the previous delta.
 
1249
 
 
1250
    New in 2.5.
 
1251
    """
 
1252
 
 
1253
    def _inventory_delta_stream(self, repository, ordering, revids):
 
1254
        prev_inv = _mod_inventory.Inventory(root_id=None,
 
1255
                                            revision_id=_mod_revision.NULL_REVISION)
 
1256
        serializer = inventory_delta.InventoryDeltaSerializer(
 
1257
            repository.supports_rich_root(),
 
1258
            repository._format.supports_tree_reference)
 
1259
        with repository.lock_read():
 
1260
            for inv, revid in repository._iter_inventories(revids, ordering):
 
1261
                if inv is None:
 
1262
                    continue
 
1263
                inv_delta = inv._make_delta(prev_inv)
 
1264
                lines = serializer.delta_to_lines(
 
1265
                    prev_inv.revision_id, inv.revision_id, inv_delta)
 
1266
                yield ChunkedContentFactory(
 
1267
                    inv.revision_id, None, None, lines,
 
1268
                    chunks_are_lines=True)
 
1269
                prev_inv = inv
 
1270
 
 
1271
    def body_stream(self, repository, ordering, revids):
 
1272
        substream = self._inventory_delta_stream(repository,
 
1273
                                                 ordering, revids)
 
1274
        return _stream_to_byte_stream([('inventory-deltas', substream)],
 
1275
                                      repository._format)
 
1276
 
 
1277
    def do_body(self, body_bytes):
 
1278
        return SuccessfulSmartServerResponse((b'ok', ),
 
1279
                                             body_stream=self.body_stream(self._repository, self._ordering,
 
1280
                                                                          body_bytes.splitlines()))
 
1281
 
 
1282
    def do_repository_request(self, repository, ordering):
 
1283
        ordering = ordering.decode('ascii')
 
1284
        if ordering == 'unordered':
 
1285
            # inventory deltas for a topologically sorted stream
 
1286
            # are likely to be smaller
 
1287
            ordering = 'topological'
 
1288
        self._ordering = ordering
 
1289
        # Signal that we want a body
 
1290
        return None
 
1291
 
 
1292
 
 
1293
class SmartServerRepositoryGetStreamForMissingKeys(SmartServerRepositoryRequest):
 
1294
 
 
1295
    def do_repository_request(self, repository, to_network_name):
 
1296
        """Get a stream for missing keys.
 
1297
 
 
1298
        :param repository: The repository to stream from.
 
1299
        :param to_network_name: The network name of the format of the target
 
1300
            repository.
 
1301
        """
 
1302
        try:
 
1303
            self._to_format = network_format_registry.get(to_network_name)
 
1304
        except KeyError:
 
1305
            return FailedSmartServerResponse(
 
1306
                (b'UnknownFormat', b'repository', to_network_name))
 
1307
        return None  # Signal that we want a body.
 
1308
 
 
1309
    def do_body(self, body_bytes):
 
1310
        repository = self._repository
 
1311
        repository.lock_read()
 
1312
        try:
 
1313
            source = repository._get_source(self._to_format)
 
1314
            keys = []
 
1315
            for entry in body_bytes.split(b'\n'):
 
1316
                (kind, revid) = entry.split(b'\t')
 
1317
                keys.append((kind.decode('utf-8'), revid))
 
1318
            stream = source.get_stream_for_missing_keys(keys)
 
1319
        except Exception:
 
1320
            try:
 
1321
                # On non-error, unlocking is done by the body stream handler.
 
1322
                repository.unlock()
 
1323
            finally:
 
1324
                raise
 
1325
        return SuccessfulSmartServerResponse((b'ok',),
 
1326
                                             body_stream=self.body_stream(stream, repository))
 
1327
 
 
1328
    def body_stream(self, stream, repository):
 
1329
        byte_stream = _stream_to_byte_stream(stream, repository._format)
 
1330
        try:
 
1331
            for bytes in byte_stream:
 
1332
                yield bytes
 
1333
        except errors.RevisionNotPresent as e:
 
1334
            # This shouldn't be able to happen, but as we don't buffer
 
1335
            # everything it can in theory happen.
 
1336
            repository.unlock()
 
1337
            yield FailedSmartServerResponse((b'NoSuchRevision', e.revision_id))
 
1338
        else:
 
1339
            repository.unlock()
 
1340
 
 
1341
 
 
1342
class SmartServerRepositoryRevisionArchive(SmartServerRepositoryRequest):
 
1343
 
 
1344
    def do_repository_request(self, repository, revision_id, format, name,
 
1345
                              root, subdir=None, force_mtime=None):
 
1346
        """Stream an archive file for a specific revision.
 
1347
        :param repository: The repository to stream from.
 
1348
        :param revision_id: Revision for which to export the tree
 
1349
        :param format: Format (tar, tgz, tbz2, etc)
 
1350
        :param name: Target file name
 
1351
        :param root: Name of root directory (or '')
 
1352
        :param subdir: Subdirectory to export, if not the root
 
1353
        """
 
1354
        tree = repository.revision_tree(revision_id)
 
1355
        if subdir is not None:
 
1356
            subdir = subdir.decode('utf-8')
 
1357
        if root is not None:
 
1358
            root = root.decode('utf-8')
 
1359
        name = name.decode('utf-8')
 
1360
        return SuccessfulSmartServerResponse((b'ok',),
 
1361
                                             body_stream=self.body_stream(
 
1362
            tree, format.decode(
 
1363
                'utf-8'), os.path.basename(name), root, subdir,
 
1364
            force_mtime))
 
1365
 
 
1366
    def body_stream(self, tree, format, name, root, subdir=None, force_mtime=None):
 
1367
        with tree.lock_read():
 
1368
            return tree.archive(format, name, root, subdir, force_mtime)
 
1369
 
 
1370
 
 
1371
class SmartServerRepositoryAnnotateFileRevision(SmartServerRepositoryRequest):
 
1372
 
 
1373
    def do_repository_request(self, repository, revision_id, tree_path,
 
1374
                              file_id=None, default_revision=None):
 
1375
        """Stream an archive file for a specific revision.
 
1376
 
 
1377
        :param repository: The repository to stream from.
 
1378
        :param revision_id: Revision for which to export the tree
 
1379
        :param tree_path: The path inside the tree
 
1380
        :param file_id: Optional file_id for the file
 
1381
        :param default_revision: Default revision
 
1382
        """
 
1383
        tree = repository.revision_tree(revision_id)
 
1384
        with tree.lock_read():
 
1385
            body = bencode.bencode(list(tree.annotate_iter(
 
1386
                tree_path.decode('utf-8'), default_revision)))
 
1387
            return SuccessfulSmartServerResponse((b'ok',), body=body)