/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-02-24 10:31:49 UTC
  • mfrom: (4037.1.1 bzr.integration)
  • Revision ID: pqm@pqm.ubuntu.com-20090224103149-b9a60tx1qy68jtcj
(vila) Catching-up with review tweaks

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Server-side repository related request implmentations."""
 
18
 
 
19
import bz2
 
20
import os
 
21
import struct
 
22
import sys
 
23
import tarfile
 
24
import tempfile
 
25
import threading
 
26
import Queue
 
27
 
 
28
from bzrlib import (
 
29
    errors,
 
30
    osutils,
 
31
    pack,
 
32
    )
 
33
from bzrlib.bzrdir import BzrDir
 
34
from bzrlib.smart.request import (
 
35
    FailedSmartServerResponse,
 
36
    SmartServerRequest,
 
37
    SuccessfulSmartServerResponse,
 
38
    )
 
39
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
 
40
from bzrlib import revision as _mod_revision
 
41
from bzrlib.util import bencode
 
42
from bzrlib.versionedfile import NetworkRecordStream
 
43
 
 
44
 
 
45
class SmartServerRepositoryRequest(SmartServerRequest):
 
46
    """Common base class for Repository requests."""
 
47
 
 
48
    def do(self, path, *args):
 
49
        """Execute a repository request.
 
50
 
 
51
        All Repository requests take a path to the repository as their first
 
52
        argument.  The repository must be at the exact path given by the
 
53
        client - no searching is done.
 
54
 
 
55
        The actual logic is delegated to self.do_repository_request.
 
56
 
 
57
        :param client_path: The path for the repository as received from the
 
58
            client.
 
59
        :return: A SmartServerResponse from self.do_repository_request().
 
60
        """
 
61
        transport = self.transport_from_client_path(path)
 
62
        bzrdir = BzrDir.open_from_transport(transport)
 
63
        # Save the repository for use with do_body.
 
64
        self._repository = bzrdir.open_repository()
 
65
        return self.do_repository_request(self._repository, *args)
 
66
 
 
67
    def do_repository_request(self, repository, *args):
 
68
        """Override to provide an implementation for a verb."""
 
69
        # No-op for verbs that take bodies (None as a result indicates a body
 
70
        # is expected)
 
71
        return None
 
72
 
 
73
    def recreate_search(self, repository, recipe_bytes):
 
74
        lines = recipe_bytes.split('\n')
 
75
        start_keys = set(lines[0].split(' '))
 
76
        exclude_keys = set(lines[1].split(' '))
 
77
        revision_count = int(lines[2])
 
78
        repository.lock_read()
 
79
        try:
 
80
            search = repository.get_graph()._make_breadth_first_searcher(
 
81
                start_keys)
 
82
            while True:
 
83
                try:
 
84
                    next_revs = search.next()
 
85
                except StopIteration:
 
86
                    break
 
87
                search.stop_searching_any(exclude_keys.intersection(next_revs))
 
88
            search_result = search.get_result()
 
89
            if search_result.get_recipe()[2] != revision_count:
 
90
                # we got back a different amount of data than expected, this
 
91
                # gets reported as NoSuchRevision, because less revisions
 
92
                # indicates missing revisions, and more should never happen as
 
93
                # the excludes list considers ghosts and ensures that ghost
 
94
                # filling races are not a problem.
 
95
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
 
96
            return (search, None)
 
97
        finally:
 
98
            repository.unlock()
 
99
 
 
100
 
 
101
class SmartServerRepositoryReadLocked(SmartServerRepositoryRequest):
 
102
    """Calls self.do_readlocked_repository_request."""
 
103
 
 
104
    def do_repository_request(self, repository, *args):
 
105
        """Read lock a repository for do_readlocked_repository_request."""
 
106
        repository.lock_read()
 
107
        try:
 
108
            return self.do_readlocked_repository_request(repository, *args)
 
109
        finally:
 
110
            repository.unlock()
 
111
 
 
112
 
 
113
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
 
114
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
 
115
 
 
116
    def do_repository_request(self, repository, *revision_ids):
 
117
        """Get parent details for some revisions.
 
118
 
 
119
        All the parents for revision_ids are returned. Additionally up to 64KB
 
120
        of additional parent data found by performing a breadth first search
 
121
        from revision_ids is returned. The verb takes a body containing the
 
122
        current search state, see do_body for details.
 
123
 
 
124
        :param repository: The repository to query in.
 
125
        :param revision_ids: The utf8 encoded revision_id to answer for.
 
126
        """
 
127
        self._revision_ids = revision_ids
 
128
        return None # Signal that we want a body.
 
129
 
 
130
    def do_body(self, body_bytes):
 
131
        """Process the current search state and perform the parent lookup.
 
132
 
 
133
        :return: A smart server response where the body contains an utf8
 
134
            encoded flattened list of the parents of the revisions (the same
 
135
            format as Repository.get_revision_graph) which has been bz2
 
136
            compressed.
 
137
        """
 
138
        repository = self._repository
 
139
        repository.lock_read()
 
140
        try:
 
141
            return self._do_repository_request(body_bytes)
 
142
        finally:
 
143
            repository.unlock()
 
144
 
 
145
    def _do_repository_request(self, body_bytes):
 
146
        repository = self._repository
 
147
        revision_ids = set(self._revision_ids)
 
148
        search, error = self.recreate_search(repository, body_bytes)
 
149
        if error is not None:
 
150
            return error
 
151
        # TODO might be nice to start up the search again; but thats not
 
152
        # written or tested yet.
 
153
        client_seen_revs = set(search.get_result().get_keys())
 
154
        # Always include the requested ids.
 
155
        client_seen_revs.difference_update(revision_ids)
 
156
        lines = []
 
157
        repo_graph = repository.get_graph()
 
158
        result = {}
 
159
        queried_revs = set()
 
160
        size_so_far = 0
 
161
        next_revs = revision_ids
 
162
        first_loop_done = False
 
163
        while next_revs:
 
164
            queried_revs.update(next_revs)
 
165
            parent_map = repo_graph.get_parent_map(next_revs)
 
166
            next_revs = set()
 
167
            for revision_id, parents in parent_map.iteritems():
 
168
                # adjust for the wire
 
169
                if parents == (_mod_revision.NULL_REVISION,):
 
170
                    parents = ()
 
171
                # prepare the next query
 
172
                next_revs.update(parents)
 
173
                if revision_id not in client_seen_revs:
 
174
                    # Client does not have this revision, give it to it.
 
175
                    # add parents to the result
 
176
                    result[revision_id] = parents
 
177
                    # Approximate the serialized cost of this revision_id.
 
178
                    size_so_far += 2 + len(revision_id) + sum(map(len, parents))
 
179
            # get all the directly asked for parents, and then flesh out to
 
180
            # 64K (compressed) or so. We do one level of depth at a time to
 
181
            # stay in sync with the client. The 250000 magic number is
 
182
            # estimated compression ratio taken from bzr.dev itself.
 
183
            if first_loop_done and size_so_far > 250000:
 
184
                next_revs = set()
 
185
                break
 
186
            # don't query things we've already queried
 
187
            next_revs.difference_update(queried_revs)
 
188
            first_loop_done = True
 
189
 
 
190
        # sorting trivially puts lexographically similar revision ids together.
 
191
        # Compression FTW.
 
192
        for revision, parents in sorted(result.items()):
 
193
            lines.append(' '.join((revision, ) + tuple(parents)))
 
194
 
 
195
        return SuccessfulSmartServerResponse(
 
196
            ('ok', ), bz2.compress('\n'.join(lines)))
 
197
 
 
198
 
 
199
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):
 
200
 
 
201
    def do_readlocked_repository_request(self, repository, revision_id):
 
202
        """Return the result of repository.get_revision_graph(revision_id).
 
203
 
 
204
        Deprecated as of bzr 1.4, but supported for older clients.
 
205
 
 
206
        :param repository: The repository to query in.
 
207
        :param revision_id: The utf8 encoded revision_id to get a graph from.
 
208
        :return: A smart server response where the body contains an utf8
 
209
            encoded flattened list of the revision graph.
 
210
        """
 
211
        if not revision_id:
 
212
            revision_id = None
 
213
 
 
214
        lines = []
 
215
        graph = repository.get_graph()
 
216
        if revision_id:
 
217
            search_ids = [revision_id]
 
218
        else:
 
219
            search_ids = repository.all_revision_ids()
 
220
        search = graph._make_breadth_first_searcher(search_ids)
 
221
        transitive_ids = set()
 
222
        map(transitive_ids.update, list(search))
 
223
        parent_map = graph.get_parent_map(transitive_ids)
 
224
        revision_graph = _strip_NULL_ghosts(parent_map)
 
225
        if revision_id and revision_id not in revision_graph:
 
226
            # Note that we return an empty body, rather than omitting the body.
 
227
            # This way the client knows that it can always expect to find a body
 
228
            # in the response for this method, even in the error case.
 
229
            return FailedSmartServerResponse(('nosuchrevision', revision_id), '')
 
230
 
 
231
        for revision, parents in revision_graph.items():
 
232
            lines.append(' '.join((revision, ) + tuple(parents)))
 
233
 
 
234
        return SuccessfulSmartServerResponse(('ok', ), '\n'.join(lines))
 
235
 
 
236
 
 
237
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
 
238
 
 
239
    def do_repository_request(self, repository, revision_id):
 
240
        """Return ok if a specific revision is in the repository at path.
 
241
 
 
242
        :param repository: The repository to query in.
 
243
        :param revision_id: The utf8 encoded revision_id to lookup.
 
244
        :return: A smart server response of ('ok', ) if the revision is
 
245
            present.
 
246
        """
 
247
        if repository.has_revision(revision_id):
 
248
            return SuccessfulSmartServerResponse(('yes', ))
 
249
        else:
 
250
            return SuccessfulSmartServerResponse(('no', ))
 
251
 
 
252
 
 
253
class SmartServerRepositoryGatherStats(SmartServerRepositoryRequest):
 
254
 
 
255
    def do_repository_request(self, repository, revid, committers):
 
256
        """Return the result of repository.gather_stats().
 
257
 
 
258
        :param repository: The repository to query in.
 
259
        :param revid: utf8 encoded rev id or an empty string to indicate None
 
260
        :param committers: 'yes' or 'no'.
 
261
 
 
262
        :return: A SmartServerResponse ('ok',), a encoded body looking like
 
263
              committers: 1
 
264
              firstrev: 1234.230 0
 
265
              latestrev: 345.700 3600
 
266
              revisions: 2
 
267
 
 
268
              But containing only fields returned by the gather_stats() call
 
269
        """
 
270
        if revid == '':
 
271
            decoded_revision_id = None
 
272
        else:
 
273
            decoded_revision_id = revid
 
274
        if committers == 'yes':
 
275
            decoded_committers = True
 
276
        else:
 
277
            decoded_committers = None
 
278
        stats = repository.gather_stats(decoded_revision_id, decoded_committers)
 
279
 
 
280
        body = ''
 
281
        if stats.has_key('committers'):
 
282
            body += 'committers: %d\n' % stats['committers']
 
283
        if stats.has_key('firstrev'):
 
284
            body += 'firstrev: %.3f %d\n' % stats['firstrev']
 
285
        if stats.has_key('latestrev'):
 
286
             body += 'latestrev: %.3f %d\n' % stats['latestrev']
 
287
        if stats.has_key('revisions'):
 
288
            body += 'revisions: %d\n' % stats['revisions']
 
289
        if stats.has_key('size'):
 
290
            body += 'size: %d\n' % stats['size']
 
291
 
 
292
        return SuccessfulSmartServerResponse(('ok', ), body)
 
293
 
 
294
 
 
295
class SmartServerRepositoryIsShared(SmartServerRepositoryRequest):
 
296
 
 
297
    def do_repository_request(self, repository):
 
298
        """Return the result of repository.is_shared().
 
299
 
 
300
        :param repository: The repository to query in.
 
301
        :return: A smart server response of ('yes', ) if the repository is
 
302
            shared, and ('no', ) if it is not.
 
303
        """
 
304
        if repository.is_shared():
 
305
            return SuccessfulSmartServerResponse(('yes', ))
 
306
        else:
 
307
            return SuccessfulSmartServerResponse(('no', ))
 
308
 
 
309
 
 
310
class SmartServerRepositoryLockWrite(SmartServerRepositoryRequest):
 
311
 
 
312
    def do_repository_request(self, repository, token=''):
 
313
        # XXX: this probably should not have a token.
 
314
        if token == '':
 
315
            token = None
 
316
        try:
 
317
            token = repository.lock_write(token=token)
 
318
        except errors.LockContention, e:
 
319
            return FailedSmartServerResponse(('LockContention',))
 
320
        except errors.UnlockableTransport:
 
321
            return FailedSmartServerResponse(('UnlockableTransport',))
 
322
        except errors.LockFailed, e:
 
323
            return FailedSmartServerResponse(('LockFailed',
 
324
                str(e.lock), str(e.why)))
 
325
        if token is not None:
 
326
            repository.leave_lock_in_place()
 
327
        repository.unlock()
 
328
        if token is None:
 
329
            token = ''
 
330
        return SuccessfulSmartServerResponse(('ok', token))
 
331
 
 
332
 
 
333
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
 
334
 
 
335
    def do_repository_request(self, repository, token):
 
336
        try:
 
337
            repository.lock_write(token=token)
 
338
        except errors.TokenMismatch, e:
 
339
            return FailedSmartServerResponse(('TokenMismatch',))
 
340
        repository.dont_leave_lock_in_place()
 
341
        repository.unlock()
 
342
        return SuccessfulSmartServerResponse(('ok',))
 
343
 
 
344
 
 
345
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
 
346
 
 
347
    def do_repository_request(self, repository, str_bool_new_value):
 
348
        if str_bool_new_value == 'True':
 
349
            new_value = True
 
350
        else:
 
351
            new_value = False
 
352
        repository.set_make_working_trees(new_value)
 
353
        return SuccessfulSmartServerResponse(('ok',))
 
354
 
 
355
 
 
356
class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
 
357
    """Get the raw repository files as a tarball.
 
358
 
 
359
    The returned tarball contains a .bzr control directory which in turn
 
360
    contains a repository.
 
361
 
 
362
    This takes one parameter, compression, which currently must be
 
363
    "", "gz", or "bz2".
 
364
 
 
365
    This is used to implement the Repository.copy_content_into operation.
 
366
    """
 
367
 
 
368
    def do_repository_request(self, repository, compression):
 
369
        tmp_dirname, tmp_repo = self._copy_to_tempdir(repository)
 
370
        try:
 
371
            controldir_name = tmp_dirname + '/.bzr'
 
372
            return self._tarfile_response(controldir_name, compression)
 
373
        finally:
 
374
            osutils.rmtree(tmp_dirname)
 
375
 
 
376
    def _copy_to_tempdir(self, from_repo):
 
377
        tmp_dirname = osutils.mkdtemp(prefix='tmpbzrclone')
 
378
        tmp_bzrdir = from_repo.bzrdir._format.initialize(tmp_dirname)
 
379
        tmp_repo = from_repo._format.initialize(tmp_bzrdir)
 
380
        from_repo.copy_content_into(tmp_repo)
 
381
        return tmp_dirname, tmp_repo
 
382
 
 
383
    def _tarfile_response(self, tmp_dirname, compression):
 
384
        temp = tempfile.NamedTemporaryFile()
 
385
        try:
 
386
            self._tarball_of_dir(tmp_dirname, compression, temp.file)
 
387
            # all finished; write the tempfile out to the network
 
388
            temp.seek(0)
 
389
            return SuccessfulSmartServerResponse(('ok',), temp.read())
 
390
            # FIXME: Don't read the whole thing into memory here; rather stream
 
391
            # it out from the file onto the network. mbp 20070411
 
392
        finally:
 
393
            temp.close()
 
394
 
 
395
    def _tarball_of_dir(self, dirname, compression, ofile):
 
396
        filename = os.path.basename(ofile.name)
 
397
        tarball = tarfile.open(fileobj=ofile, name=filename,
 
398
            mode='w|' + compression)
 
399
        try:
 
400
            # The tarball module only accepts ascii names, and (i guess)
 
401
            # packs them with their 8bit names.  We know all the files
 
402
            # within the repository have ASCII names so the should be safe
 
403
            # to pack in.
 
404
            dirname = dirname.encode(sys.getfilesystemencoding())
 
405
            # python's tarball module includes the whole path by default so
 
406
            # override it
 
407
            if not dirname.endswith('.bzr'):
 
408
                raise ValueError(dirname)
 
409
            tarball.add(dirname, '.bzr') # recursive by default
 
410
        finally:
 
411
            tarball.close()
 
412
 
 
413
 
 
414
class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest):
 
415
 
 
416
    def do_repository_request(self, repository, resume_tokens):
 
417
        """StreamSink.insert_stream for a remote repository."""
 
418
        repository.lock_write()
 
419
        tokens = [token for token in resume_tokens.split(' ') if token]
 
420
        if tokens:
 
421
            repository.resume_write_group(tokens)
 
422
        else:
 
423
            repository.start_write_group()
 
424
        self.repository = repository
 
425
        self.stream_decoder = pack.ContainerPushParser()
 
426
        self.src_format = None
 
427
        self.queue = Queue.Queue()
 
428
        self.insert_thread = None
 
429
 
 
430
    def do_chunk(self, body_stream_chunk):
 
431
        self.stream_decoder.accept_bytes(body_stream_chunk)
 
432
        for record in self.stream_decoder.read_pending_records():
 
433
            record_names, record_bytes = record
 
434
            if self.src_format is None:
 
435
                src_format_name = record_bytes
 
436
                src_format = network_format_registry.get(src_format_name)
 
437
                self.src_format = src_format
 
438
                self.insert_thread = threading.Thread(target=self._inserter_thread)
 
439
                self.insert_thread.start()
 
440
            else:
 
441
                record_name, = record_names
 
442
                substream_type = record_name[0]
 
443
                stream = NetworkRecordStream([record_bytes])
 
444
                for record in stream.read():
 
445
                    self.queue.put((substream_type, [record]))
 
446
 
 
447
    def _inserter_thread(self):
 
448
        self.repository._get_sink().insert_stream(self.blocking_read_stream(),
 
449
                self.src_format)
 
450
 
 
451
    def blocking_read_stream(self):
 
452
        while True:
 
453
            item = self.queue.get()
 
454
            if item is StopIteration:
 
455
                return
 
456
            else:
 
457
                yield item
 
458
 
 
459
    def do_end(self):
 
460
        self.queue.put(StopIteration)
 
461
        if self.insert_thread is not None:
 
462
            self.insert_thread.join()
 
463
        try:
 
464
            missing_keys = set()
 
465
            for prefix, versioned_file in (
 
466
                ('texts', self.repository.texts),
 
467
                ('inventories', self.repository.inventories),
 
468
                ('revisions', self.repository.revisions),
 
469
                ('signatures', self.repository.signatures),
 
470
                ):
 
471
                missing_keys.update((prefix,) + key for key in
 
472
                    versioned_file.get_missing_compression_parent_keys())
 
473
        except NotImplementedError:
 
474
            # cannot even attempt suspending.
 
475
            pass
 
476
        else:
 
477
            if missing_keys:
 
478
                # suspend the write group and tell the caller what we is
 
479
                # missing. We know we can suspend or else we would not have
 
480
                # entered this code path. (All repositories that can handle
 
481
                # missing keys can handle suspending a write group).
 
482
                write_group_tokens = self.repository.suspend_write_group()
 
483
                # bzip needed? missing keys should typically be a small set.
 
484
                # Should this be a streaming body response ?
 
485
                missing_keys = sorted(missing_keys)
 
486
                bytes = bencode.bencode((write_group_tokens, missing_keys))
 
487
                return SuccessfulSmartServerResponse(('missing-basis', bytes))
 
488
        # All finished.
 
489
        self.repository.commit_write_group()
 
490
        self.repository.unlock()
 
491
        return SuccessfulSmartServerResponse(('ok', ))