/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

merge bzr.dev rev 4098

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 
19
19
import bz2
20
20
import os
 
21
import Queue
21
22
import struct
22
23
import sys
23
24
import tarfile
24
25
import tempfile
25
26
import threading
26
 
import Queue
27
27
 
28
28
from bzrlib import (
29
29
    errors,
 
30
    graph,
30
31
    osutils,
31
32
    pack,
32
33
    )
39
40
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
40
41
from bzrlib import revision as _mod_revision
41
42
from bzrlib.util import bencode
42
 
from bzrlib.versionedfile import NetworkRecordStream
 
43
from bzrlib.versionedfile import NetworkRecordStream, record_to_fulltext_bytes
43
44
 
44
45
 
45
46
class SmartServerRepositoryRequest(SmartServerRequest):
70
71
        # is expected)
71
72
        return None
72
73
 
73
 
    def recreate_search(self, repository, recipe_bytes):
74
 
        lines = recipe_bytes.split('\n')
 
74
    def recreate_search(self, repository, search_bytes):
 
75
        lines = search_bytes.split('\n')
 
76
        if lines[0] == 'ancestry-of':
 
77
            heads = lines[1:]
 
78
            search_result = graph.PendingAncestryResult(heads, repository)
 
79
            return search_result, None
 
80
        elif lines[0] == 'search':
 
81
            return self.recreate_search_from_recipe(repository, lines[1:])
 
82
        else:
 
83
            return (None, FailedSmartServerResponse(('BadSearch',)))
 
84
 
 
85
    def recreate_search_from_recipe(self, repository, lines):
75
86
        start_keys = set(lines[0].split(' '))
76
87
        exclude_keys = set(lines[1].split(' '))
77
88
        revision_count = int(lines[2])
93
104
                # the excludes list considers ghosts and ensures that ghost
94
105
                # filling races are not a problem.
95
106
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
96
 
            return (search, None)
 
107
            return (search_result, None)
97
108
        finally:
98
109
            repository.unlock()
99
110
 
113
124
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
114
125
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
115
126
 
 
127
    no_extra_results = False
 
128
 
116
129
    def do_repository_request(self, repository, *revision_ids):
117
130
        """Get parent details for some revisions.
118
131
 
145
158
    def _do_repository_request(self, body_bytes):
146
159
        repository = self._repository
147
160
        revision_ids = set(self._revision_ids)
148
 
        search, error = self.recreate_search(repository, body_bytes)
 
161
        body_lines = body_bytes.split('\n')
 
162
        search_result, error = self.recreate_search_from_recipe(
 
163
            repository, body_lines)
149
164
        if error is not None:
150
165
            return error
151
166
        # TODO might be nice to start up the search again; but thats not
152
167
        # written or tested yet.
153
 
        client_seen_revs = set(search.get_result().get_keys())
 
168
        client_seen_revs = set(search_result.get_keys())
154
169
        # Always include the requested ids.
155
170
        client_seen_revs.difference_update(revision_ids)
156
171
        lines = []
180
195
            # 64K (compressed) or so. We do one level of depth at a time to
181
196
            # stay in sync with the client. The 250000 magic number is
182
197
            # estimated compression ratio taken from bzr.dev itself.
183
 
            if first_loop_done and size_so_far > 250000:
 
198
            if self.no_extra_results or (
 
199
                first_loop_done and size_so_far > 250000):
184
200
                next_revs = set()
185
201
                break
186
202
            # don't query things we've already queried
330
346
        return SuccessfulSmartServerResponse(('ok', token))
331
347
 
332
348
 
 
349
class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):
 
350
 
 
351
    def do_repository_request(self, repository, to_network_name):
 
352
        """Get a stream for inserting into a to_format repository.
 
353
 
 
354
        :param repository: The repository to stream from.
 
355
        :param to_network_name: The network name of the format of the target
 
356
            repository.
 
357
        """
 
358
        self._to_format = network_format_registry.get(to_network_name)
 
359
        return None # Signal that we want a body.
 
360
 
 
361
    def do_body(self, body_bytes):
 
362
        repository = self._repository
 
363
        repository.lock_read()
 
364
        try:
 
365
            search_result, error = self.recreate_search(repository, body_bytes)
 
366
            if error is not None:
 
367
                repository.unlock()
 
368
                return error
 
369
            source = repository._get_source(self._to_format)
 
370
            stream = source.get_stream(search_result)
 
371
        except Exception:
 
372
            exc_info = sys.exc_info()
 
373
            try:
 
374
                # On non-error, unlocking is done by the body stream handler.
 
375
                repository.unlock()
 
376
            finally:
 
377
                raise exc_info[0], exc_info[1], exc_info[2]
 
378
        return SuccessfulSmartServerResponse(('ok',),
 
379
            body_stream=self.body_stream(stream, repository))
 
380
 
 
381
    def body_stream(self, stream, repository):
 
382
        byte_stream = _stream_to_byte_stream(stream, repository._format)
 
383
        try:
 
384
            for bytes in byte_stream:
 
385
                yield bytes
 
386
        except errors.RevisionNotPresent, e:
 
387
            # This shouldn't be able to happen, but as we don't buffer
 
388
            # everything it can in theory happen.
 
389
            repository.unlock()
 
390
            yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
 
391
        else:
 
392
            repository.unlock()
 
393
 
 
394
 
 
395
def _stream_to_byte_stream(stream, src_format):
    """Serialise a record stream into a self-delimiting byte stream.

    The output is a pack container. Its first record holds the network name
    of src_format and has no record names. Each following record holds one
    serialised stream record, named ``(substream_type,)``.

    :param stream: An iterable of (substream_type, records) pairs.
    :param src_format: The format of the repository the records came from.
    """
    serialiser = pack.ContainerSerialiser()
    yield serialiser.begin()
    yield serialiser.bytes_record(src_format.network_name(), '')
    for substream_type, substream in stream:
        for record in substream:
            kind = record.storage_kind
            if kind == 'chunked' or kind == 'fulltext':
                wire_bytes = record_to_fulltext_bytes(record)
            else:
                wire_bytes = record.get_bytes_as(kind)
            if not wire_bytes:
                # Some streams embed the whole stream in the wire form of
                # their first record, so later records serialise to nothing.
                # There is nothing to send for those.
                continue
            yield serialiser.bytes_record(wire_bytes, [(substream_type,)])
    yield serialiser.end()
 
412
 
 
413
 
 
414
def _byte_stream_to_stream(byte_stream):
    """Convert a byte stream into a format and a stream.

    :param byte_stream: An iterable of byte chunks, as output by
        _stream_to_byte_stream. It is consumed incrementally: only enough
        chunks to decode the format record are read before returning. The
        rest are read lazily by the returned stream generator.
    :return: (RepositoryFormat, stream_generator). The generator yields
        (substream_type, records) pairs.
    :raises errors.SmartProtocolError: if byte_stream ends before the format
        record has been received.
    """
    # Convert to a single iterator so that the format-reading loop below and
    # record_stream share one read position. Re-iterating a list would
    # re-feed chunks the parser has already consumed.
    byte_stream = iter(byte_stream)
    stream_decoder = pack.ContainerPushParser()

    def pending_substreams():
        """Yield (substream_type, records) for each fully parsed record."""
        for record_names, record_bytes in stream_decoder.read_pending_records():
            record_name, = record_names
            substream = NetworkRecordStream([record_bytes])
            yield record_name[0], substream.read()

    def record_stream():
        """Closure to return the substreams."""
        # The chunk containing the format record may also have contained
        # complete data records, so emit those before reading more bytes.
        for item in pending_substreams():
            yield item
        for chunk in byte_stream:
            stream_decoder.accept_bytes(chunk)
            for item in pending_substreams():
                yield item

    for chunk in byte_stream:
        stream_decoder.accept_bytes(chunk)
        for record in stream_decoder.read_pending_records(max=1):
            record_names, src_format_name = record
            src_format = network_format_registry.get(src_format_name)
            return src_format, record_stream()
    raise errors.SmartProtocolError(
        'byte stream ended before the repository format record')
 
444
 
 
445
 
333
446
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
334
447
 
335
448
    def do_repository_request(self, repository, token):
412
525
 
413
526
 
414
527
class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest):
 
528
    """Insert a record stream from a RemoteSink into a repository.
 
529
 
 
530
    This gets bytes pushed to it by the network infrastructure and turns that
 
531
    into a bytes iterator using a thread. That is then processed by
 
532
    _byte_stream_to_stream.
 
533
    """
415
534
 
416
535
    def do_repository_request(self, repository, resume_tokens):
417
536
        """StreamSink.insert_stream for a remote repository."""
418
537
        repository.lock_write()
419
538
        tokens = [token for token in resume_tokens.split(' ') if token]
420
 
        if tokens:
421
 
            repository.resume_write_group(tokens)
422
 
        else:
423
 
            repository.start_write_group()
 
539
        self.tokens = tokens
424
540
        self.repository = repository
425
 
        self.stream_decoder = pack.ContainerPushParser()
426
 
        self.src_format = None
427
541
        self.queue = Queue.Queue()
428
 
        self.insert_thread = None
 
542
        self.insert_thread = threading.Thread(target=self._inserter_thread)
 
543
        self.insert_thread.start()
429
544
 
430
545
    def do_chunk(self, body_stream_chunk):
431
 
        self.stream_decoder.accept_bytes(body_stream_chunk)
432
 
        for record in self.stream_decoder.read_pending_records():
433
 
            record_names, record_bytes = record
434
 
            if self.src_format is None:
435
 
                src_format_name = record_bytes
436
 
                src_format = network_format_registry.get(src_format_name)
437
 
                self.src_format = src_format
438
 
                self.insert_thread = threading.Thread(target=self._inserter_thread)
439
 
                self.insert_thread.start()
440
 
            else:
441
 
                record_name, = record_names
442
 
                substream_type = record_name[0]
443
 
                stream = NetworkRecordStream([record_bytes])
444
 
                for record in stream.read():
445
 
                    self.queue.put((substream_type, [record]))
 
546
        self.queue.put(body_stream_chunk)
446
547
 
447
548
    def _inserter_thread(self):
448
 
        self.repository._get_sink().insert_stream(self.blocking_read_stream(),
449
 
                self.src_format)
 
549
        try:
 
550
            src_format, stream = _byte_stream_to_stream(
 
551
                self.blocking_byte_stream())
 
552
            self.insert_result = self.repository._get_sink().insert_stream(
 
553
                stream, src_format, self.tokens)
 
554
            self.insert_ok = True
 
555
        except:
 
556
            self.insert_exception = sys.exc_info()
 
557
            self.insert_ok = False
450
558
 
451
 
    def blocking_read_stream(self):
 
559
    def blocking_byte_stream(self):
452
560
        while True:
453
 
            item = self.queue.get()
454
 
            if item is StopIteration:
 
561
            bytes = self.queue.get()
 
562
            if bytes is StopIteration:
455
563
                return
456
564
            else:
457
 
                yield item
 
565
                yield bytes
458
566
 
459
567
    def do_end(self):
460
568
        self.queue.put(StopIteration)
461
569
        if self.insert_thread is not None:
462
570
            self.insert_thread.join()
463
 
        try:
464
 
            missing_keys = set()
465
 
            for prefix, versioned_file in (
466
 
                ('texts', self.repository.texts),
467
 
                ('inventories', self.repository.inventories),
468
 
                ('revisions', self.repository.revisions),
469
 
                ('signatures', self.repository.signatures),
470
 
                ):
471
 
                missing_keys.update((prefix,) + key for key in
472
 
                    versioned_file.get_missing_compression_parent_keys())
473
 
        except NotImplementedError:
474
 
            # cannot even attempt suspending.
475
 
            pass
 
571
        if not self.insert_ok:
 
572
            exc_info = self.insert_exception
 
573
            raise exc_info[0], exc_info[1], exc_info[2]
 
574
        write_group_tokens, missing_keys = self.insert_result
 
575
        if write_group_tokens or missing_keys:
 
576
            # bzip needed? missing keys should typically be a small set.
 
577
            # Should this be a streaming body response ?
 
578
            missing_keys = sorted(missing_keys)
 
579
            bytes = bencode.bencode((write_group_tokens, missing_keys))
 
580
            self.repository.unlock()
 
581
            return SuccessfulSmartServerResponse(('missing-basis', bytes))
476
582
        else:
477
 
            if missing_keys:
478
 
                # suspend the write group and tell the caller what we is
479
 
                # missing. We know we can suspend or else we would not have
480
 
                # entered this code path. (All repositories that can handle
481
 
                # missing keys can handle suspending a write group).
482
 
                write_group_tokens = self.repository.suspend_write_group()
483
 
                # bzip needed? missing keys should typically be a small set.
484
 
                # Should this be a streaming body response ?
485
 
                missing_keys = sorted(missing_keys)
486
 
                bytes = bencode.bencode((write_group_tokens, missing_keys))
487
 
                return SuccessfulSmartServerResponse(('missing-basis', bytes))
488
 
        # All finished.
489
 
        self.repository.commit_write_group()
490
 
        self.repository.unlock()
491
 
        return SuccessfulSmartServerResponse(('ok', ))
 
583
            self.repository.unlock()
 
584
            return SuccessfulSmartServerResponse(('ok', ))