/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

merge bzr.dev r4154

Show diffs side-by-side

added added

removed removed

Lines of Context:
27
27
 
28
28
from bzrlib import (
29
29
    errors,
 
30
    graph,
30
31
    osutils,
31
32
    pack,
32
33
    )
39
40
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
40
41
from bzrlib import revision as _mod_revision
41
42
from bzrlib.util import bencode
42
 
from bzrlib.versionedfile import NetworkRecordStream
 
43
from bzrlib.versionedfile import NetworkRecordStream, record_to_fulltext_bytes
43
44
 
44
45
 
45
46
class SmartServerRepositoryRequest(SmartServerRequest):
70
71
        # is expected)
71
72
        return None
72
73
 
73
 
    def recreate_search(self, repository, recipe_bytes):
74
 
        lines = recipe_bytes.split('\n')
 
74
    def recreate_search(self, repository, search_bytes):
 
75
        lines = search_bytes.split('\n')
 
76
        if lines[0] == 'ancestry-of':
 
77
            heads = lines[1:]
 
78
            search_result = graph.PendingAncestryResult(heads, repository)
 
79
            return search_result, None
 
80
        elif lines[0] == 'search':
 
81
            return self.recreate_search_from_recipe(repository, lines[1:])
 
82
        else:
 
83
            return (None, FailedSmartServerResponse(('BadSearch',)))
 
84
 
 
85
    def recreate_search_from_recipe(self, repository, lines):
75
86
        start_keys = set(lines[0].split(' '))
76
87
        exclude_keys = set(lines[1].split(' '))
77
88
        revision_count = int(lines[2])
93
104
                # the excludes list considers ghosts and ensures that ghost
94
105
                # filling races are not a problem.
95
106
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
96
 
            return (search, None)
 
107
            return (search_result, None)
97
108
        finally:
98
109
            repository.unlock()
99
110
 
147
158
    def _do_repository_request(self, body_bytes):
148
159
        repository = self._repository
149
160
        revision_ids = set(self._revision_ids)
150
 
        search, error = self.recreate_search(repository, body_bytes)
 
161
        body_lines = body_bytes.split('\n')
 
162
        search_result, error = self.recreate_search_from_recipe(
 
163
            repository, body_lines)
151
164
        if error is not None:
152
165
            return error
153
166
        # TODO might be nice to start up the search again; but thats not
154
167
        # written or tested yet.
155
 
        client_seen_revs = set(search.get_result().get_keys())
 
168
        client_seen_revs = set(search_result.get_keys())
156
169
        # Always include the requested ids.
157
170
        client_seen_revs.difference_update(revision_ids)
158
171
        lines = []
333
346
        return SuccessfulSmartServerResponse(('ok', token))
334
347
 
335
348
 
 
349
class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):
 
350
 
 
351
    def do_repository_request(self, repository, to_network_name):
 
352
        """Get a stream for inserting into a to_format repository.
 
353
 
 
354
        :param repository: The repository to stream from.
 
355
        :param to_network_name: The network name of the format of the target
 
356
            repository.
 
357
        """
 
358
        self._to_format = network_format_registry.get(to_network_name)
 
359
        return None # Signal that we want a body.
 
360
 
 
361
    def do_body(self, body_bytes):
 
362
        repository = self._repository
 
363
        repository.lock_read()
 
364
        try:
 
365
            search_result, error = self.recreate_search(repository, body_bytes)
 
366
            if error is not None:
 
367
                repository.unlock()
 
368
                return error
 
369
            source = repository._get_source(self._to_format)
 
370
            stream = source.get_stream(search_result)
 
371
        except Exception:
 
372
            exc_info = sys.exc_info()
 
373
            try:
 
374
                # On non-error, unlocking is done by the body stream handler.
 
375
                repository.unlock()
 
376
            finally:
 
377
                raise exc_info[0], exc_info[1], exc_info[2]
 
378
        return SuccessfulSmartServerResponse(('ok',),
 
379
            body_stream=self.body_stream(stream, repository))
 
380
 
 
381
    def body_stream(self, stream, repository):
 
382
        byte_stream = _stream_to_byte_stream(stream, repository._format)
 
383
        try:
 
384
            for bytes in byte_stream:
 
385
                yield bytes
 
386
        except errors.RevisionNotPresent, e:
 
387
            # This shouldn't be able to happen, but as we don't buffer
 
388
            # everything it can in theory happen.
 
389
            repository.unlock()
 
390
            yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
 
391
        else:
 
392
            repository.unlock()
 
393
 
 
394
 
 
395
def _stream_to_byte_stream(stream, src_format):
 
396
    """Convert a record stream to a self delimited byte stream."""
 
397
    pack_writer = pack.ContainerSerialiser()
 
398
    yield pack_writer.begin()
 
399
    yield pack_writer.bytes_record(src_format.network_name(), '')
 
400
    for substream_type, substream in stream:
 
401
        for record in substream:
 
402
            if record.storage_kind in ('chunked', 'fulltext'):
 
403
                serialised = record_to_fulltext_bytes(record)
 
404
            else:
 
405
                serialised = record.get_bytes_as(record.storage_kind)
 
406
            if serialised:
 
407
                # Some streams embed the whole stream into the wire
 
408
                # representation of the first record, which means that
 
409
                # later records have no wire representation: we skip them.
 
410
                yield pack_writer.bytes_record(serialised, [(substream_type,)])
 
411
    yield pack_writer.end()
 
412
 
 
413
 
 
414
def _byte_stream_to_stream(byte_stream):
 
415
    """Convert a byte stream into a format and a stream.
 
416
 
 
417
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
 
418
    :return: (RepositoryFormat, stream_generator)
 
419
    """
 
420
    stream_decoder = pack.ContainerPushParser()
 
421
    def record_stream():
 
422
        """Closure to return the substreams."""
 
423
        # May have fully parsed records already.
 
424
        for record in stream_decoder.read_pending_records():
 
425
            record_names, record_bytes = record
 
426
            record_name, = record_names
 
427
            substream_type = record_name[0]
 
428
            substream = NetworkRecordStream([record_bytes])
 
429
            yield substream_type, substream.read()
 
430
        for bytes in byte_stream:
 
431
            stream_decoder.accept_bytes(bytes)
 
432
            for record in stream_decoder.read_pending_records():
 
433
                record_names, record_bytes = record
 
434
                record_name, = record_names
 
435
                substream_type = record_name[0]
 
436
                substream = NetworkRecordStream([record_bytes])
 
437
                yield substream_type, substream.read()
 
438
    for bytes in byte_stream:
 
439
        stream_decoder.accept_bytes(bytes)
 
440
        for record in stream_decoder.read_pending_records(max=1):
 
441
            record_names, src_format_name = record
 
442
            src_format = network_format_registry.get(src_format_name)
 
443
            return src_format, record_stream()
 
444
 
 
445
 
336
446
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
337
447
 
338
448
    def do_repository_request(self, repository, token):
414
524
            tarball.close()
415
525
 
416
526
 
417
 
class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest):
418
 
 
419
 
    def do_repository_request(self, repository, resume_tokens):
 
527
class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
 
528
    """Insert a record stream from a RemoteSink into a repository.
 
529
 
 
530
    This gets bytes pushed to it by the network infrastructure and turns that
 
531
    into a bytes iterator using a thread. That is then processed by
 
532
    _byte_stream_to_stream.
 
533
 
 
534
    New in 1.14.
 
535
    """
 
536
 
 
537
    def do_repository_request(self, repository, resume_tokens, lock_token):
420
538
        """StreamSink.insert_stream for a remote repository."""
421
 
        repository.lock_write()
 
539
        repository.lock_write(token=lock_token)
 
540
        self.do_insert_stream_request(repository, resume_tokens)
 
541
 
 
542
    def do_insert_stream_request(self, repository, resume_tokens):
422
543
        tokens = [token for token in resume_tokens.split(' ') if token]
423
544
        self.tokens = tokens
424
545
        self.repository = repository
425
 
        self.stream_decoder = pack.ContainerPushParser()
426
 
        self.src_format = None
427
546
        self.queue = Queue.Queue()
428
 
        self.insert_thread = None
 
547
        self.insert_thread = threading.Thread(target=self._inserter_thread)
 
548
        self.insert_thread.start()
429
549
 
430
550
    def do_chunk(self, body_stream_chunk):
431
 
        self.stream_decoder.accept_bytes(body_stream_chunk)
432
 
        for record in self.stream_decoder.read_pending_records():
433
 
            record_names, record_bytes = record
434
 
            if self.src_format is None:
435
 
                src_format_name = record_bytes
436
 
                src_format = network_format_registry.get(src_format_name)
437
 
                self.src_format = src_format
438
 
                self.insert_thread = threading.Thread(target=self._inserter_thread)
439
 
                self.insert_thread.start()
440
 
            else:
441
 
                record_name, = record_names
442
 
                substream_type = record_name[0]
443
 
                stream = NetworkRecordStream([record_bytes])
444
 
                for record in stream.read():
445
 
                    self.queue.put((substream_type, [record]))
 
551
        self.queue.put(body_stream_chunk)
446
552
 
447
553
    def _inserter_thread(self):
448
554
        try:
 
555
            src_format, stream = _byte_stream_to_stream(
 
556
                self.blocking_byte_stream())
449
557
            self.insert_result = self.repository._get_sink().insert_stream(
450
 
                self.blocking_read_stream(), self.src_format, self.tokens)
 
558
                stream, src_format, self.tokens)
451
559
            self.insert_ok = True
452
560
        except:
453
561
            self.insert_exception = sys.exc_info()
454
562
            self.insert_ok = False
455
563
 
456
 
    def blocking_read_stream(self):
 
564
    def blocking_byte_stream(self):
457
565
        while True:
458
 
            item = self.queue.get()
459
 
            if item is StopIteration:
 
566
            bytes = self.queue.get()
 
567
            if bytes is StopIteration:
460
568
                return
461
569
            else:
462
 
                yield item
 
570
                yield bytes
463
571
 
464
572
    def do_end(self):
465
573
        self.queue.put(StopIteration)
479
587
        else:
480
588
            self.repository.unlock()
481
589
            return SuccessfulSmartServerResponse(('ok', ))
 
590
 
 
591
 
 
592
class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
 
593
    """Insert a record stream from a RemoteSink into an unlocked repository.
 
594
 
 
595
    This is the same as SmartServerRepositoryInsertStreamLocked, except it
 
596
    takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
 
597
    like pack format) repository.
 
598
 
 
599
    New in 1.13.
 
600
    """
 
601
 
 
602
    def do_repository_request(self, repository, resume_tokens):
 
603
        """StreamSink.insert_stream for a remote repository."""
 
604
        repository.lock_write()
 
605
        self.do_insert_stream_request(repository, resume_tokens)
 
606
 
 
607