/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

  • Committer: Ian Clatworthy
  • Date: 2009-02-26 06:15:24 UTC
  • mto: (4157.1.1 ianc-integration)
  • mto: This revision was merged to the branch mainline in revision 4158.
  • Revision ID: ian.clatworthy@canonical.com-20090226061524-kpy3n8na3mk4ubuy
help xxx is full help; xxx -h is concise help

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Server-side repository related request implmentations."""
 
18
 
 
19
import bz2
 
20
import os
 
21
import Queue
 
22
import struct
 
23
import sys
 
24
import tarfile
 
25
import tempfile
 
26
import threading
 
27
 
 
28
from bzrlib import (
 
29
    errors,
 
30
    osutils,
 
31
    pack,
 
32
    )
 
33
from bzrlib.bzrdir import BzrDir
 
34
from bzrlib.smart.request import (
 
35
    FailedSmartServerResponse,
 
36
    SmartServerRequest,
 
37
    SuccessfulSmartServerResponse,
 
38
    )
 
39
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
 
40
from bzrlib import revision as _mod_revision
 
41
from bzrlib.util import bencode
 
42
from bzrlib.versionedfile import NetworkRecordStream
 
43
 
 
44
 
 
45
class SmartServerRepositoryRequest(SmartServerRequest):
 
46
    """Common base class for Repository requests."""
 
47
 
 
48
    def do(self, path, *args):
 
49
        """Execute a repository request.
 
50
 
 
51
        All Repository requests take a path to the repository as their first
 
52
        argument.  The repository must be at the exact path given by the
 
53
        client - no searching is done.
 
54
 
 
55
        The actual logic is delegated to self.do_repository_request.
 
56
 
 
57
        :param client_path: The path for the repository as received from the
 
58
            client.
 
59
        :return: A SmartServerResponse from self.do_repository_request().
 
60
        """
 
61
        transport = self.transport_from_client_path(path)
 
62
        bzrdir = BzrDir.open_from_transport(transport)
 
63
        # Save the repository for use with do_body.
 
64
        self._repository = bzrdir.open_repository()
 
65
        return self.do_repository_request(self._repository, *args)
 
66
 
 
67
    def do_repository_request(self, repository, *args):
 
68
        """Override to provide an implementation for a verb."""
 
69
        # No-op for verbs that take bodies (None as a result indicates a body
 
70
        # is expected)
 
71
        return None
 
72
 
 
73
    def recreate_search(self, repository, recipe_bytes):
 
74
        lines = recipe_bytes.split('\n')
 
75
        start_keys = set(lines[0].split(' '))
 
76
        exclude_keys = set(lines[1].split(' '))
 
77
        revision_count = int(lines[2])
 
78
        repository.lock_read()
 
79
        try:
 
80
            search = repository.get_graph()._make_breadth_first_searcher(
 
81
                start_keys)
 
82
            while True:
 
83
                try:
 
84
                    next_revs = search.next()
 
85
                except StopIteration:
 
86
                    break
 
87
                search.stop_searching_any(exclude_keys.intersection(next_revs))
 
88
            search_result = search.get_result()
 
89
            if search_result.get_recipe()[2] != revision_count:
 
90
                # we got back a different amount of data than expected, this
 
91
                # gets reported as NoSuchRevision, because less revisions
 
92
                # indicates missing revisions, and more should never happen as
 
93
                # the excludes list considers ghosts and ensures that ghost
 
94
                # filling races are not a problem.
 
95
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
 
96
            return (search, None)
 
97
        finally:
 
98
            repository.unlock()
 
99
 
 
100
 
 
101
class SmartServerRepositoryReadLocked(SmartServerRepositoryRequest):
 
102
    """Calls self.do_readlocked_repository_request."""
 
103
 
 
104
    def do_repository_request(self, repository, *args):
 
105
        """Read lock a repository for do_readlocked_repository_request."""
 
106
        repository.lock_read()
 
107
        try:
 
108
            return self.do_readlocked_repository_request(repository, *args)
 
109
        finally:
 
110
            repository.unlock()
 
111
 
 
112
 
 
113
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
 
114
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
 
115
 
 
116
    no_extra_results = False
 
117
 
 
118
    def do_repository_request(self, repository, *revision_ids):
 
119
        """Get parent details for some revisions.
 
120
 
 
121
        All the parents for revision_ids are returned. Additionally up to 64KB
 
122
        of additional parent data found by performing a breadth first search
 
123
        from revision_ids is returned. The verb takes a body containing the
 
124
        current search state, see do_body for details.
 
125
 
 
126
        :param repository: The repository to query in.
 
127
        :param revision_ids: The utf8 encoded revision_id to answer for.
 
128
        """
 
129
        self._revision_ids = revision_ids
 
130
        return None # Signal that we want a body.
 
131
 
 
132
    def do_body(self, body_bytes):
 
133
        """Process the current search state and perform the parent lookup.
 
134
 
 
135
        :return: A smart server response where the body contains an utf8
 
136
            encoded flattened list of the parents of the revisions (the same
 
137
            format as Repository.get_revision_graph) which has been bz2
 
138
            compressed.
 
139
        """
 
140
        repository = self._repository
 
141
        repository.lock_read()
 
142
        try:
 
143
            return self._do_repository_request(body_bytes)
 
144
        finally:
 
145
            repository.unlock()
 
146
 
 
147
    def _do_repository_request(self, body_bytes):
 
148
        repository = self._repository
 
149
        revision_ids = set(self._revision_ids)
 
150
        search, error = self.recreate_search(repository, body_bytes)
 
151
        if error is not None:
 
152
            return error
 
153
        # TODO might be nice to start up the search again; but thats not
 
154
        # written or tested yet.
 
155
        client_seen_revs = set(search.get_result().get_keys())
 
156
        # Always include the requested ids.
 
157
        client_seen_revs.difference_update(revision_ids)
 
158
        lines = []
 
159
        repo_graph = repository.get_graph()
 
160
        result = {}
 
161
        queried_revs = set()
 
162
        size_so_far = 0
 
163
        next_revs = revision_ids
 
164
        first_loop_done = False
 
165
        while next_revs:
 
166
            queried_revs.update(next_revs)
 
167
            parent_map = repo_graph.get_parent_map(next_revs)
 
168
            next_revs = set()
 
169
            for revision_id, parents in parent_map.iteritems():
 
170
                # adjust for the wire
 
171
                if parents == (_mod_revision.NULL_REVISION,):
 
172
                    parents = ()
 
173
                # prepare the next query
 
174
                next_revs.update(parents)
 
175
                if revision_id not in client_seen_revs:
 
176
                    # Client does not have this revision, give it to it.
 
177
                    # add parents to the result
 
178
                    result[revision_id] = parents
 
179
                    # Approximate the serialized cost of this revision_id.
 
180
                    size_so_far += 2 + len(revision_id) + sum(map(len, parents))
 
181
            # get all the directly asked for parents, and then flesh out to
 
182
            # 64K (compressed) or so. We do one level of depth at a time to
 
183
            # stay in sync with the client. The 250000 magic number is
 
184
            # estimated compression ratio taken from bzr.dev itself.
 
185
            if self.no_extra_results or (
 
186
                first_loop_done and size_so_far > 250000):
 
187
                next_revs = set()
 
188
                break
 
189
            # don't query things we've already queried
 
190
            next_revs.difference_update(queried_revs)
 
191
            first_loop_done = True
 
192
 
 
193
        # sorting trivially puts lexographically similar revision ids together.
 
194
        # Compression FTW.
 
195
        for revision, parents in sorted(result.items()):
 
196
            lines.append(' '.join((revision, ) + tuple(parents)))
 
197
 
 
198
        return SuccessfulSmartServerResponse(
 
199
            ('ok', ), bz2.compress('\n'.join(lines)))
 
200
 
 
201
 
 
202
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):
 
203
 
 
204
    def do_readlocked_repository_request(self, repository, revision_id):
 
205
        """Return the result of repository.get_revision_graph(revision_id).
 
206
 
 
207
        Deprecated as of bzr 1.4, but supported for older clients.
 
208
 
 
209
        :param repository: The repository to query in.
 
210
        :param revision_id: The utf8 encoded revision_id to get a graph from.
 
211
        :return: A smart server response where the body contains an utf8
 
212
            encoded flattened list of the revision graph.
 
213
        """
 
214
        if not revision_id:
 
215
            revision_id = None
 
216
 
 
217
        lines = []
 
218
        graph = repository.get_graph()
 
219
        if revision_id:
 
220
            search_ids = [revision_id]
 
221
        else:
 
222
            search_ids = repository.all_revision_ids()
 
223
        search = graph._make_breadth_first_searcher(search_ids)
 
224
        transitive_ids = set()
 
225
        map(transitive_ids.update, list(search))
 
226
        parent_map = graph.get_parent_map(transitive_ids)
 
227
        revision_graph = _strip_NULL_ghosts(parent_map)
 
228
        if revision_id and revision_id not in revision_graph:
 
229
            # Note that we return an empty body, rather than omitting the body.
 
230
            # This way the client knows that it can always expect to find a body
 
231
            # in the response for this method, even in the error case.
 
232
            return FailedSmartServerResponse(('nosuchrevision', revision_id), '')
 
233
 
 
234
        for revision, parents in revision_graph.items():
 
235
            lines.append(' '.join((revision, ) + tuple(parents)))
 
236
 
 
237
        return SuccessfulSmartServerResponse(('ok', ), '\n'.join(lines))
 
238
 
 
239
 
 
240
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
 
241
 
 
242
    def do_repository_request(self, repository, revision_id):
 
243
        """Return ok if a specific revision is in the repository at path.
 
244
 
 
245
        :param repository: The repository to query in.
 
246
        :param revision_id: The utf8 encoded revision_id to lookup.
 
247
        :return: A smart server response of ('ok', ) if the revision is
 
248
            present.
 
249
        """
 
250
        if repository.has_revision(revision_id):
 
251
            return SuccessfulSmartServerResponse(('yes', ))
 
252
        else:
 
253
            return SuccessfulSmartServerResponse(('no', ))
 
254
 
 
255
 
 
256
class SmartServerRepositoryGatherStats(SmartServerRepositoryRequest):
 
257
 
 
258
    def do_repository_request(self, repository, revid, committers):
 
259
        """Return the result of repository.gather_stats().
 
260
 
 
261
        :param repository: The repository to query in.
 
262
        :param revid: utf8 encoded rev id or an empty string to indicate None
 
263
        :param committers: 'yes' or 'no'.
 
264
 
 
265
        :return: A SmartServerResponse ('ok',), a encoded body looking like
 
266
              committers: 1
 
267
              firstrev: 1234.230 0
 
268
              latestrev: 345.700 3600
 
269
              revisions: 2
 
270
 
 
271
              But containing only fields returned by the gather_stats() call
 
272
        """
 
273
        if revid == '':
 
274
            decoded_revision_id = None
 
275
        else:
 
276
            decoded_revision_id = revid
 
277
        if committers == 'yes':
 
278
            decoded_committers = True
 
279
        else:
 
280
            decoded_committers = None
 
281
        stats = repository.gather_stats(decoded_revision_id, decoded_committers)
 
282
 
 
283
        body = ''
 
284
        if stats.has_key('committers'):
 
285
            body += 'committers: %d\n' % stats['committers']
 
286
        if stats.has_key('firstrev'):
 
287
            body += 'firstrev: %.3f %d\n' % stats['firstrev']
 
288
        if stats.has_key('latestrev'):
 
289
             body += 'latestrev: %.3f %d\n' % stats['latestrev']
 
290
        if stats.has_key('revisions'):
 
291
            body += 'revisions: %d\n' % stats['revisions']
 
292
        if stats.has_key('size'):
 
293
            body += 'size: %d\n' % stats['size']
 
294
 
 
295
        return SuccessfulSmartServerResponse(('ok', ), body)
 
296
 
 
297
 
 
298
class SmartServerRepositoryIsShared(SmartServerRepositoryRequest):
 
299
 
 
300
    def do_repository_request(self, repository):
 
301
        """Return the result of repository.is_shared().
 
302
 
 
303
        :param repository: The repository to query in.
 
304
        :return: A smart server response of ('yes', ) if the repository is
 
305
            shared, and ('no', ) if it is not.
 
306
        """
 
307
        if repository.is_shared():
 
308
            return SuccessfulSmartServerResponse(('yes', ))
 
309
        else:
 
310
            return SuccessfulSmartServerResponse(('no', ))
 
311
 
 
312
 
 
313
class SmartServerRepositoryLockWrite(SmartServerRepositoryRequest):
 
314
 
 
315
    def do_repository_request(self, repository, token=''):
 
316
        # XXX: this probably should not have a token.
 
317
        if token == '':
 
318
            token = None
 
319
        try:
 
320
            token = repository.lock_write(token=token)
 
321
        except errors.LockContention, e:
 
322
            return FailedSmartServerResponse(('LockContention',))
 
323
        except errors.UnlockableTransport:
 
324
            return FailedSmartServerResponse(('UnlockableTransport',))
 
325
        except errors.LockFailed, e:
 
326
            return FailedSmartServerResponse(('LockFailed',
 
327
                str(e.lock), str(e.why)))
 
328
        if token is not None:
 
329
            repository.leave_lock_in_place()
 
330
        repository.unlock()
 
331
        if token is None:
 
332
            token = ''
 
333
        return SuccessfulSmartServerResponse(('ok', token))
 
334
 
 
335
 
 
336
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
 
337
 
 
338
    def do_repository_request(self, repository, token):
 
339
        try:
 
340
            repository.lock_write(token=token)
 
341
        except errors.TokenMismatch, e:
 
342
            return FailedSmartServerResponse(('TokenMismatch',))
 
343
        repository.dont_leave_lock_in_place()
 
344
        repository.unlock()
 
345
        return SuccessfulSmartServerResponse(('ok',))
 
346
 
 
347
 
 
348
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
 
349
 
 
350
    def do_repository_request(self, repository, str_bool_new_value):
 
351
        if str_bool_new_value == 'True':
 
352
            new_value = True
 
353
        else:
 
354
            new_value = False
 
355
        repository.set_make_working_trees(new_value)
 
356
        return SuccessfulSmartServerResponse(('ok',))
 
357
 
 
358
 
 
359
class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
 
360
    """Get the raw repository files as a tarball.
 
361
 
 
362
    The returned tarball contains a .bzr control directory which in turn
 
363
    contains a repository.
 
364
 
 
365
    This takes one parameter, compression, which currently must be
 
366
    "", "gz", or "bz2".
 
367
 
 
368
    This is used to implement the Repository.copy_content_into operation.
 
369
    """
 
370
 
 
371
    def do_repository_request(self, repository, compression):
 
372
        tmp_dirname, tmp_repo = self._copy_to_tempdir(repository)
 
373
        try:
 
374
            controldir_name = tmp_dirname + '/.bzr'
 
375
            return self._tarfile_response(controldir_name, compression)
 
376
        finally:
 
377
            osutils.rmtree(tmp_dirname)
 
378
 
 
379
    def _copy_to_tempdir(self, from_repo):
 
380
        tmp_dirname = osutils.mkdtemp(prefix='tmpbzrclone')
 
381
        tmp_bzrdir = from_repo.bzrdir._format.initialize(tmp_dirname)
 
382
        tmp_repo = from_repo._format.initialize(tmp_bzrdir)
 
383
        from_repo.copy_content_into(tmp_repo)
 
384
        return tmp_dirname, tmp_repo
 
385
 
 
386
    def _tarfile_response(self, tmp_dirname, compression):
 
387
        temp = tempfile.NamedTemporaryFile()
 
388
        try:
 
389
            self._tarball_of_dir(tmp_dirname, compression, temp.file)
 
390
            # all finished; write the tempfile out to the network
 
391
            temp.seek(0)
 
392
            return SuccessfulSmartServerResponse(('ok',), temp.read())
 
393
            # FIXME: Don't read the whole thing into memory here; rather stream
 
394
            # it out from the file onto the network. mbp 20070411
 
395
        finally:
 
396
            temp.close()
 
397
 
 
398
    def _tarball_of_dir(self, dirname, compression, ofile):
 
399
        filename = os.path.basename(ofile.name)
 
400
        tarball = tarfile.open(fileobj=ofile, name=filename,
 
401
            mode='w|' + compression)
 
402
        try:
 
403
            # The tarball module only accepts ascii names, and (i guess)
 
404
            # packs them with their 8bit names.  We know all the files
 
405
            # within the repository have ASCII names so the should be safe
 
406
            # to pack in.
 
407
            dirname = dirname.encode(sys.getfilesystemencoding())
 
408
            # python's tarball module includes the whole path by default so
 
409
            # override it
 
410
            if not dirname.endswith('.bzr'):
 
411
                raise ValueError(dirname)
 
412
            tarball.add(dirname, '.bzr') # recursive by default
 
413
        finally:
 
414
            tarball.close()
 
415
 
 
416
 
 
417
class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest):
 
418
 
 
419
    def do_repository_request(self, repository, resume_tokens):
 
420
        """StreamSink.insert_stream for a remote repository."""
 
421
        repository.lock_write()
 
422
        tokens = [token for token in resume_tokens.split(' ') if token]
 
423
        self.tokens = tokens
 
424
        self.repository = repository
 
425
        self.stream_decoder = pack.ContainerPushParser()
 
426
        self.src_format = None
 
427
        self.queue = Queue.Queue()
 
428
        self.insert_thread = None
 
429
 
 
430
    def do_chunk(self, body_stream_chunk):
 
431
        self.stream_decoder.accept_bytes(body_stream_chunk)
 
432
        for record in self.stream_decoder.read_pending_records():
 
433
            record_names, record_bytes = record
 
434
            if self.src_format is None:
 
435
                src_format_name = record_bytes
 
436
                src_format = network_format_registry.get(src_format_name)
 
437
                self.src_format = src_format
 
438
                self.insert_thread = threading.Thread(target=self._inserter_thread)
 
439
                self.insert_thread.start()
 
440
            else:
 
441
                record_name, = record_names
 
442
                substream_type = record_name[0]
 
443
                stream = NetworkRecordStream([record_bytes])
 
444
                for record in stream.read():
 
445
                    self.queue.put((substream_type, [record]))
 
446
 
 
447
    def _inserter_thread(self):
 
448
        try:
 
449
            self.insert_result = self.repository._get_sink().insert_stream(
 
450
                self.blocking_read_stream(), self.src_format, self.tokens)
 
451
            self.insert_ok = True
 
452
        except:
 
453
            self.insert_exception = sys.exc_info()
 
454
            self.insert_ok = False
 
455
 
 
456
    def blocking_read_stream(self):
 
457
        while True:
 
458
            item = self.queue.get()
 
459
            if item is StopIteration:
 
460
                return
 
461
            else:
 
462
                yield item
 
463
 
 
464
    def do_end(self):
 
465
        self.queue.put(StopIteration)
 
466
        if self.insert_thread is not None:
 
467
            self.insert_thread.join()
 
468
        if not self.insert_ok:
 
469
            exc_info = self.insert_exception
 
470
            raise exc_info[0], exc_info[1], exc_info[2]
 
471
        write_group_tokens, missing_keys = self.insert_result
 
472
        if write_group_tokens or missing_keys:
 
473
            # bzip needed? missing keys should typically be a small set.
 
474
            # Should this be a streaming body response ?
 
475
            missing_keys = sorted(missing_keys)
 
476
            bytes = bencode.bencode((write_group_tokens, missing_keys))
 
477
            self.repository.unlock()
 
478
            return SuccessfulSmartServerResponse(('missing-basis', bytes))
 
479
        else:
 
480
            self.repository.unlock()
 
481
            return SuccessfulSmartServerResponse(('ok', ))