/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

  • Committer: Jelmer Vernooij
  • Date: 2009-03-12 14:02:53 UTC
  • mfrom: (4135 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4137.
  • Revision ID: jelmer@samba.org-20090312140253-bmldbzlmsitfdrzf
Merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Server-side repository related request implmentations."""
 
18
 
 
19
import bz2
 
20
import os
 
21
import Queue
 
22
import struct
 
23
import sys
 
24
import tarfile
 
25
import tempfile
 
26
import threading
 
27
 
 
28
from bzrlib import (
 
29
    errors,
 
30
    graph,
 
31
    osutils,
 
32
    pack,
 
33
    )
 
34
from bzrlib.bzrdir import BzrDir
 
35
from bzrlib.smart.request import (
 
36
    FailedSmartServerResponse,
 
37
    SmartServerRequest,
 
38
    SuccessfulSmartServerResponse,
 
39
    )
 
40
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
 
41
from bzrlib import revision as _mod_revision
 
42
from bzrlib.util import bencode
 
43
from bzrlib.versionedfile import NetworkRecordStream, record_to_fulltext_bytes
 
44
 
 
45
 
 
46
class SmartServerRepositoryRequest(SmartServerRequest):
 
47
    """Common base class for Repository requests."""
 
48
 
 
49
    def do(self, path, *args):
 
50
        """Execute a repository request.
 
51
 
 
52
        All Repository requests take a path to the repository as their first
 
53
        argument.  The repository must be at the exact path given by the
 
54
        client - no searching is done.
 
55
 
 
56
        The actual logic is delegated to self.do_repository_request.
 
57
 
 
58
        :param client_path: The path for the repository as received from the
 
59
            client.
 
60
        :return: A SmartServerResponse from self.do_repository_request().
 
61
        """
 
62
        transport = self.transport_from_client_path(path)
 
63
        bzrdir = BzrDir.open_from_transport(transport)
 
64
        # Save the repository for use with do_body.
 
65
        self._repository = bzrdir.open_repository()
 
66
        return self.do_repository_request(self._repository, *args)
 
67
 
 
68
    def do_repository_request(self, repository, *args):
 
69
        """Override to provide an implementation for a verb."""
 
70
        # No-op for verbs that take bodies (None as a result indicates a body
 
71
        # is expected)
 
72
        return None
 
73
 
 
74
    def recreate_search(self, repository, search_bytes):
 
75
        lines = search_bytes.split('\n')
 
76
        if lines[0] == 'ancestry-of':
 
77
            heads = lines[1:]
 
78
            search_result = graph.PendingAncestryResult(heads, repository)
 
79
            return search_result, None
 
80
        elif lines[0] == 'search':
 
81
            return self.recreate_search_from_recipe(repository, lines[1:])
 
82
        else:
 
83
            return (None, FailedSmartServerResponse(('BadSearch',)))
 
84
 
 
85
    def recreate_search_from_recipe(self, repository, lines):
 
86
        start_keys = set(lines[0].split(' '))
 
87
        exclude_keys = set(lines[1].split(' '))
 
88
        revision_count = int(lines[2])
 
89
        repository.lock_read()
 
90
        try:
 
91
            search = repository.get_graph()._make_breadth_first_searcher(
 
92
                start_keys)
 
93
            while True:
 
94
                try:
 
95
                    next_revs = search.next()
 
96
                except StopIteration:
 
97
                    break
 
98
                search.stop_searching_any(exclude_keys.intersection(next_revs))
 
99
            search_result = search.get_result()
 
100
            if search_result.get_recipe()[2] != revision_count:
 
101
                # we got back a different amount of data than expected, this
 
102
                # gets reported as NoSuchRevision, because less revisions
 
103
                # indicates missing revisions, and more should never happen as
 
104
                # the excludes list considers ghosts and ensures that ghost
 
105
                # filling races are not a problem.
 
106
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
 
107
            return (search_result, None)
 
108
        finally:
 
109
            repository.unlock()
 
110
 
 
111
 
 
112
class SmartServerRepositoryReadLocked(SmartServerRepositoryRequest):
 
113
    """Calls self.do_readlocked_repository_request."""
 
114
 
 
115
    def do_repository_request(self, repository, *args):
 
116
        """Read lock a repository for do_readlocked_repository_request."""
 
117
        repository.lock_read()
 
118
        try:
 
119
            return self.do_readlocked_repository_request(repository, *args)
 
120
        finally:
 
121
            repository.unlock()
 
122
 
 
123
 
 
124
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
 
125
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
 
126
 
 
127
    no_extra_results = False
 
128
 
 
129
    def do_repository_request(self, repository, *revision_ids):
 
130
        """Get parent details for some revisions.
 
131
 
 
132
        All the parents for revision_ids are returned. Additionally up to 64KB
 
133
        of additional parent data found by performing a breadth first search
 
134
        from revision_ids is returned. The verb takes a body containing the
 
135
        current search state, see do_body for details.
 
136
 
 
137
        :param repository: The repository to query in.
 
138
        :param revision_ids: The utf8 encoded revision_id to answer for.
 
139
        """
 
140
        self._revision_ids = revision_ids
 
141
        return None # Signal that we want a body.
 
142
 
 
143
    def do_body(self, body_bytes):
 
144
        """Process the current search state and perform the parent lookup.
 
145
 
 
146
        :return: A smart server response where the body contains an utf8
 
147
            encoded flattened list of the parents of the revisions (the same
 
148
            format as Repository.get_revision_graph) which has been bz2
 
149
            compressed.
 
150
        """
 
151
        repository = self._repository
 
152
        repository.lock_read()
 
153
        try:
 
154
            return self._do_repository_request(body_bytes)
 
155
        finally:
 
156
            repository.unlock()
 
157
 
 
158
    def _do_repository_request(self, body_bytes):
 
159
        repository = self._repository
 
160
        revision_ids = set(self._revision_ids)
 
161
        body_lines = body_bytes.split('\n')
 
162
        search_result, error = self.recreate_search_from_recipe(
 
163
            repository, body_lines)
 
164
        if error is not None:
 
165
            return error
 
166
        # TODO might be nice to start up the search again; but thats not
 
167
        # written or tested yet.
 
168
        client_seen_revs = set(search_result.get_keys())
 
169
        # Always include the requested ids.
 
170
        client_seen_revs.difference_update(revision_ids)
 
171
        lines = []
 
172
        repo_graph = repository.get_graph()
 
173
        result = {}
 
174
        queried_revs = set()
 
175
        size_so_far = 0
 
176
        next_revs = revision_ids
 
177
        first_loop_done = False
 
178
        while next_revs:
 
179
            queried_revs.update(next_revs)
 
180
            parent_map = repo_graph.get_parent_map(next_revs)
 
181
            next_revs = set()
 
182
            for revision_id, parents in parent_map.iteritems():
 
183
                # adjust for the wire
 
184
                if parents == (_mod_revision.NULL_REVISION,):
 
185
                    parents = ()
 
186
                # prepare the next query
 
187
                next_revs.update(parents)
 
188
                if revision_id not in client_seen_revs:
 
189
                    # Client does not have this revision, give it to it.
 
190
                    # add parents to the result
 
191
                    result[revision_id] = parents
 
192
                    # Approximate the serialized cost of this revision_id.
 
193
                    size_so_far += 2 + len(revision_id) + sum(map(len, parents))
 
194
            # get all the directly asked for parents, and then flesh out to
 
195
            # 64K (compressed) or so. We do one level of depth at a time to
 
196
            # stay in sync with the client. The 250000 magic number is
 
197
            # estimated compression ratio taken from bzr.dev itself.
 
198
            if self.no_extra_results or (
 
199
                first_loop_done and size_so_far > 250000):
 
200
                next_revs = set()
 
201
                break
 
202
            # don't query things we've already queried
 
203
            next_revs.difference_update(queried_revs)
 
204
            first_loop_done = True
 
205
 
 
206
        # sorting trivially puts lexographically similar revision ids together.
 
207
        # Compression FTW.
 
208
        for revision, parents in sorted(result.items()):
 
209
            lines.append(' '.join((revision, ) + tuple(parents)))
 
210
 
 
211
        return SuccessfulSmartServerResponse(
 
212
            ('ok', ), bz2.compress('\n'.join(lines)))
 
213
 
 
214
 
 
215
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):
 
216
 
 
217
    def do_readlocked_repository_request(self, repository, revision_id):
 
218
        """Return the result of repository.get_revision_graph(revision_id).
 
219
 
 
220
        Deprecated as of bzr 1.4, but supported for older clients.
 
221
 
 
222
        :param repository: The repository to query in.
 
223
        :param revision_id: The utf8 encoded revision_id to get a graph from.
 
224
        :return: A smart server response where the body contains an utf8
 
225
            encoded flattened list of the revision graph.
 
226
        """
 
227
        if not revision_id:
 
228
            revision_id = None
 
229
 
 
230
        lines = []
 
231
        graph = repository.get_graph()
 
232
        if revision_id:
 
233
            search_ids = [revision_id]
 
234
        else:
 
235
            search_ids = repository.all_revision_ids()
 
236
        search = graph._make_breadth_first_searcher(search_ids)
 
237
        transitive_ids = set()
 
238
        map(transitive_ids.update, list(search))
 
239
        parent_map = graph.get_parent_map(transitive_ids)
 
240
        revision_graph = _strip_NULL_ghosts(parent_map)
 
241
        if revision_id and revision_id not in revision_graph:
 
242
            # Note that we return an empty body, rather than omitting the body.
 
243
            # This way the client knows that it can always expect to find a body
 
244
            # in the response for this method, even in the error case.
 
245
            return FailedSmartServerResponse(('nosuchrevision', revision_id), '')
 
246
 
 
247
        for revision, parents in revision_graph.items():
 
248
            lines.append(' '.join((revision, ) + tuple(parents)))
 
249
 
 
250
        return SuccessfulSmartServerResponse(('ok', ), '\n'.join(lines))
 
251
 
 
252
 
 
253
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
 
254
 
 
255
    def do_repository_request(self, repository, revision_id):
 
256
        """Return ok if a specific revision is in the repository at path.
 
257
 
 
258
        :param repository: The repository to query in.
 
259
        :param revision_id: The utf8 encoded revision_id to lookup.
 
260
        :return: A smart server response of ('ok', ) if the revision is
 
261
            present.
 
262
        """
 
263
        if repository.has_revision(revision_id):
 
264
            return SuccessfulSmartServerResponse(('yes', ))
 
265
        else:
 
266
            return SuccessfulSmartServerResponse(('no', ))
 
267
 
 
268
 
 
269
class SmartServerRepositoryGatherStats(SmartServerRepositoryRequest):
 
270
 
 
271
    def do_repository_request(self, repository, revid, committers):
 
272
        """Return the result of repository.gather_stats().
 
273
 
 
274
        :param repository: The repository to query in.
 
275
        :param revid: utf8 encoded rev id or an empty string to indicate None
 
276
        :param committers: 'yes' or 'no'.
 
277
 
 
278
        :return: A SmartServerResponse ('ok',), a encoded body looking like
 
279
              committers: 1
 
280
              firstrev: 1234.230 0
 
281
              latestrev: 345.700 3600
 
282
              revisions: 2
 
283
 
 
284
              But containing only fields returned by the gather_stats() call
 
285
        """
 
286
        if revid == '':
 
287
            decoded_revision_id = None
 
288
        else:
 
289
            decoded_revision_id = revid
 
290
        if committers == 'yes':
 
291
            decoded_committers = True
 
292
        else:
 
293
            decoded_committers = None
 
294
        stats = repository.gather_stats(decoded_revision_id, decoded_committers)
 
295
 
 
296
        body = ''
 
297
        if stats.has_key('committers'):
 
298
            body += 'committers: %d\n' % stats['committers']
 
299
        if stats.has_key('firstrev'):
 
300
            body += 'firstrev: %.3f %d\n' % stats['firstrev']
 
301
        if stats.has_key('latestrev'):
 
302
             body += 'latestrev: %.3f %d\n' % stats['latestrev']
 
303
        if stats.has_key('revisions'):
 
304
            body += 'revisions: %d\n' % stats['revisions']
 
305
        if stats.has_key('size'):
 
306
            body += 'size: %d\n' % stats['size']
 
307
 
 
308
        return SuccessfulSmartServerResponse(('ok', ), body)
 
309
 
 
310
 
 
311
class SmartServerRepositoryIsShared(SmartServerRepositoryRequest):
 
312
 
 
313
    def do_repository_request(self, repository):
 
314
        """Return the result of repository.is_shared().
 
315
 
 
316
        :param repository: The repository to query in.
 
317
        :return: A smart server response of ('yes', ) if the repository is
 
318
            shared, and ('no', ) if it is not.
 
319
        """
 
320
        if repository.is_shared():
 
321
            return SuccessfulSmartServerResponse(('yes', ))
 
322
        else:
 
323
            return SuccessfulSmartServerResponse(('no', ))
 
324
 
 
325
 
 
326
class SmartServerRepositoryLockWrite(SmartServerRepositoryRequest):
 
327
 
 
328
    def do_repository_request(self, repository, token=''):
 
329
        # XXX: this probably should not have a token.
 
330
        if token == '':
 
331
            token = None
 
332
        try:
 
333
            token = repository.lock_write(token=token)
 
334
        except errors.LockContention, e:
 
335
            return FailedSmartServerResponse(('LockContention',))
 
336
        except errors.UnlockableTransport:
 
337
            return FailedSmartServerResponse(('UnlockableTransport',))
 
338
        except errors.LockFailed, e:
 
339
            return FailedSmartServerResponse(('LockFailed',
 
340
                str(e.lock), str(e.why)))
 
341
        if token is not None:
 
342
            repository.leave_lock_in_place()
 
343
        repository.unlock()
 
344
        if token is None:
 
345
            token = ''
 
346
        return SuccessfulSmartServerResponse(('ok', token))
 
347
 
 
348
 
 
349
class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):
 
350
 
 
351
    def do_repository_request(self, repository, to_network_name):
 
352
        """Get a stream for inserting into a to_format repository.
 
353
 
 
354
        :param repository: The repository to stream from.
 
355
        :param to_network_name: The network name of the format of the target
 
356
            repository.
 
357
        """
 
358
        self._to_format = network_format_registry.get(to_network_name)
 
359
        return None # Signal that we want a body.
 
360
 
 
361
    def do_body(self, body_bytes):
 
362
        repository = self._repository
 
363
        repository.lock_read()
 
364
        try:
 
365
            search_result, error = self.recreate_search(repository, body_bytes)
 
366
            if error is not None:
 
367
                repository.unlock()
 
368
                return error
 
369
            source = repository._get_source(self._to_format)
 
370
            stream = source.get_stream(search_result)
 
371
        except Exception:
 
372
            exc_info = sys.exc_info()
 
373
            try:
 
374
                # On non-error, unlocking is done by the body stream handler.
 
375
                repository.unlock()
 
376
            finally:
 
377
                raise exc_info[0], exc_info[1], exc_info[2]
 
378
        return SuccessfulSmartServerResponse(('ok',),
 
379
            body_stream=self.body_stream(stream, repository))
 
380
 
 
381
    def body_stream(self, stream, repository):
 
382
        byte_stream = _stream_to_byte_stream(stream, repository._format)
 
383
        try:
 
384
            for bytes in byte_stream:
 
385
                yield bytes
 
386
        except errors.RevisionNotPresent, e:
 
387
            # This shouldn't be able to happen, but as we don't buffer
 
388
            # everything it can in theory happen.
 
389
            repository.unlock()
 
390
            yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
 
391
        else:
 
392
            repository.unlock()
 
393
 
 
394
 
 
395
def _stream_to_byte_stream(stream, src_format):
 
396
    """Convert a record stream to a self delimited byte stream."""
 
397
    pack_writer = pack.ContainerSerialiser()
 
398
    yield pack_writer.begin()
 
399
    yield pack_writer.bytes_record(src_format.network_name(), '')
 
400
    for substream_type, substream in stream:
 
401
        for record in substream:
 
402
            if record.storage_kind in ('chunked', 'fulltext'):
 
403
                serialised = record_to_fulltext_bytes(record)
 
404
            else:
 
405
                serialised = record.get_bytes_as(record.storage_kind)
 
406
            if serialised:
 
407
                # Some streams embed the whole stream into the wire
 
408
                # representation of the first record, which means that
 
409
                # later records have no wire representation: we skip them.
 
410
                yield pack_writer.bytes_record(serialised, [(substream_type,)])
 
411
    yield pack_writer.end()
 
412
 
 
413
 
 
414
def _byte_stream_to_stream(byte_stream):
 
415
    """Convert a byte stream into a format and a stream.
 
416
 
 
417
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
 
418
    :return: (RepositoryFormat, stream_generator)
 
419
    """
 
420
    stream_decoder = pack.ContainerPushParser()
 
421
    def record_stream():
 
422
        """Closure to return the substreams."""
 
423
        # May have fully parsed records already.
 
424
        for record in stream_decoder.read_pending_records():
 
425
            record_names, record_bytes = record
 
426
            record_name, = record_names
 
427
            substream_type = record_name[0]
 
428
            substream = NetworkRecordStream([record_bytes])
 
429
            yield substream_type, substream.read()
 
430
        for bytes in byte_stream:
 
431
            stream_decoder.accept_bytes(bytes)
 
432
            for record in stream_decoder.read_pending_records():
 
433
                record_names, record_bytes = record
 
434
                record_name, = record_names
 
435
                substream_type = record_name[0]
 
436
                substream = NetworkRecordStream([record_bytes])
 
437
                yield substream_type, substream.read()
 
438
    for bytes in byte_stream:
 
439
        stream_decoder.accept_bytes(bytes)
 
440
        for record in stream_decoder.read_pending_records(max=1):
 
441
            record_names, src_format_name = record
 
442
            src_format = network_format_registry.get(src_format_name)
 
443
            return src_format, record_stream()
 
444
 
 
445
 
 
446
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
 
447
 
 
448
    def do_repository_request(self, repository, token):
 
449
        try:
 
450
            repository.lock_write(token=token)
 
451
        except errors.TokenMismatch, e:
 
452
            return FailedSmartServerResponse(('TokenMismatch',))
 
453
        repository.dont_leave_lock_in_place()
 
454
        repository.unlock()
 
455
        return SuccessfulSmartServerResponse(('ok',))
 
456
 
 
457
 
 
458
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
 
459
 
 
460
    def do_repository_request(self, repository, str_bool_new_value):
 
461
        if str_bool_new_value == 'True':
 
462
            new_value = True
 
463
        else:
 
464
            new_value = False
 
465
        repository.set_make_working_trees(new_value)
 
466
        return SuccessfulSmartServerResponse(('ok',))
 
467
 
 
468
 
 
469
class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
 
470
    """Get the raw repository files as a tarball.
 
471
 
 
472
    The returned tarball contains a .bzr control directory which in turn
 
473
    contains a repository.
 
474
 
 
475
    This takes one parameter, compression, which currently must be
 
476
    "", "gz", or "bz2".
 
477
 
 
478
    This is used to implement the Repository.copy_content_into operation.
 
479
    """
 
480
 
 
481
    def do_repository_request(self, repository, compression):
 
482
        tmp_dirname, tmp_repo = self._copy_to_tempdir(repository)
 
483
        try:
 
484
            controldir_name = tmp_dirname + '/.bzr'
 
485
            return self._tarfile_response(controldir_name, compression)
 
486
        finally:
 
487
            osutils.rmtree(tmp_dirname)
 
488
 
 
489
    def _copy_to_tempdir(self, from_repo):
 
490
        tmp_dirname = osutils.mkdtemp(prefix='tmpbzrclone')
 
491
        tmp_bzrdir = from_repo.bzrdir._format.initialize(tmp_dirname)
 
492
        tmp_repo = from_repo._format.initialize(tmp_bzrdir)
 
493
        from_repo.copy_content_into(tmp_repo)
 
494
        return tmp_dirname, tmp_repo
 
495
 
 
496
    def _tarfile_response(self, tmp_dirname, compression):
 
497
        temp = tempfile.NamedTemporaryFile()
 
498
        try:
 
499
            self._tarball_of_dir(tmp_dirname, compression, temp.file)
 
500
            # all finished; write the tempfile out to the network
 
501
            temp.seek(0)
 
502
            return SuccessfulSmartServerResponse(('ok',), temp.read())
 
503
            # FIXME: Don't read the whole thing into memory here; rather stream
 
504
            # it out from the file onto the network. mbp 20070411
 
505
        finally:
 
506
            temp.close()
 
507
 
 
508
    def _tarball_of_dir(self, dirname, compression, ofile):
 
509
        filename = os.path.basename(ofile.name)
 
510
        tarball = tarfile.open(fileobj=ofile, name=filename,
 
511
            mode='w|' + compression)
 
512
        try:
 
513
            # The tarball module only accepts ascii names, and (i guess)
 
514
            # packs them with their 8bit names.  We know all the files
 
515
            # within the repository have ASCII names so the should be safe
 
516
            # to pack in.
 
517
            dirname = dirname.encode(sys.getfilesystemencoding())
 
518
            # python's tarball module includes the whole path by default so
 
519
            # override it
 
520
            if not dirname.endswith('.bzr'):
 
521
                raise ValueError(dirname)
 
522
            tarball.add(dirname, '.bzr') # recursive by default
 
523
        finally:
 
524
            tarball.close()
 
525
 
 
526
 
 
527
class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest):
 
528
    """Insert a record stream from a RemoteSink into a repository.
 
529
 
 
530
    This gets bytes pushed to it by the network infrastructure and turns that
 
531
    into a bytes iterator using a thread. That is then processed by
 
532
    _byte_stream_to_stream.
 
533
    """
 
534
 
 
535
    def do_repository_request(self, repository, resume_tokens):
 
536
        """StreamSink.insert_stream for a remote repository."""
 
537
        repository.lock_write()
 
538
        tokens = [token for token in resume_tokens.split(' ') if token]
 
539
        self.tokens = tokens
 
540
        self.repository = repository
 
541
        self.queue = Queue.Queue()
 
542
        self.insert_thread = threading.Thread(target=self._inserter_thread)
 
543
        self.insert_thread.start()
 
544
 
 
545
    def do_chunk(self, body_stream_chunk):
 
546
        self.queue.put(body_stream_chunk)
 
547
 
 
548
    def _inserter_thread(self):
 
549
        try:
 
550
            src_format, stream = _byte_stream_to_stream(
 
551
                self.blocking_byte_stream())
 
552
            self.insert_result = self.repository._get_sink().insert_stream(
 
553
                stream, src_format, self.tokens)
 
554
            self.insert_ok = True
 
555
        except:
 
556
            self.insert_exception = sys.exc_info()
 
557
            self.insert_ok = False
 
558
 
 
559
    def blocking_byte_stream(self):
 
560
        while True:
 
561
            bytes = self.queue.get()
 
562
            if bytes is StopIteration:
 
563
                return
 
564
            else:
 
565
                yield bytes
 
566
 
 
567
    def do_end(self):
 
568
        self.queue.put(StopIteration)
 
569
        if self.insert_thread is not None:
 
570
            self.insert_thread.join()
 
571
        if not self.insert_ok:
 
572
            exc_info = self.insert_exception
 
573
            raise exc_info[0], exc_info[1], exc_info[2]
 
574
        write_group_tokens, missing_keys = self.insert_result
 
575
        if write_group_tokens or missing_keys:
 
576
            # bzip needed? missing keys should typically be a small set.
 
577
            # Should this be a streaming body response ?
 
578
            missing_keys = sorted(missing_keys)
 
579
            bytes = bencode.bencode((write_group_tokens, missing_keys))
 
580
            self.repository.unlock()
 
581
            return SuccessfulSmartServerResponse(('missing-basis', bytes))
 
582
        else:
 
583
            self.repository.unlock()
 
584
            return SuccessfulSmartServerResponse(('ok', ))