1
# Copyright (C) 2006-2012 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
from __future__ import absolute_import
26
config as _mod_config,
36
repository as _mod_repository,
37
revision as _mod_revision,
39
testament as _mod_testament,
44
bzrdir as _mod_bzrdir,
49
from .branch import BranchReferenceFormat
50
from ..branch import BranchWriteLockResult
51
from ..decorators import only_raises
52
from ..errors import (
56
from ..i18n import gettext
57
from .inventory import Inventory
58
from .inventorytree import InventoryRevisionTree
59
from ..lockable_files import LockableFiles
60
from ..sixish import (
64
from .smart import client, vfs, repository as smart_repo
65
from .smart.client import _SmartClient
66
from ..revision import NULL_REVISION
67
from ..repository import RepositoryWriteLockResult, _LazyListJoin
68
from .serializer import format_registry as serializer_format_registry
69
from ..trace import mutter, note, warning, log_exception_quietly
70
from .versionedfile import FulltextContentFactory
73
_DEFAULT_SEARCH_DEPTH = 100
76
class _RpcHelper(object):
77
"""Mixin class that helps with issuing RPCs."""
79
def _call(self, method, *args, **err_context):
81
return self._client.call(method, *args)
82
except errors.ErrorFromSmartServer as err:
83
self._translate_error(err, **err_context)
85
def _call_expecting_body(self, method, *args, **err_context):
87
return self._client.call_expecting_body(method, *args)
88
except errors.ErrorFromSmartServer as err:
89
self._translate_error(err, **err_context)
91
def _call_with_body_bytes(self, method, args, body_bytes, **err_context):
93
return self._client.call_with_body_bytes(method, args, body_bytes)
94
except errors.ErrorFromSmartServer as err:
95
self._translate_error(err, **err_context)
97
def _call_with_body_bytes_expecting_body(self, method, args, body_bytes,
100
return self._client.call_with_body_bytes_expecting_body(
101
method, args, body_bytes)
102
except errors.ErrorFromSmartServer as err:
103
self._translate_error(err, **err_context)
106
def response_tuple_to_repo_format(response):
107
"""Convert a response tuple describing a repository format to a format."""
108
format = RemoteRepositoryFormat()
109
format._rich_root_data = (response[0] == 'yes')
110
format._supports_tree_reference = (response[1] == 'yes')
111
format._supports_external_lookups = (response[2] == 'yes')
112
format._network_name = response[3]
116
# Note that RemoteBzrDirProber lives in breezy.bzrdir so breezy.bzr.remote
117
# does not have to be imported unless a remote format is involved.
119
class RemoteBzrDirFormat(_mod_bzrdir.BzrDirMetaFormat1):
120
"""Format representing bzrdirs accessed via a smart server"""
122
supports_workingtrees = False
124
colocated_branches = False
127
_mod_bzrdir.BzrDirMetaFormat1.__init__(self)
128
# XXX: It's a bit ugly that the network name is here, because we'd
129
# like to believe that format objects are stateless or at least
130
# immutable. However, we do at least avoid mutating the name after
131
# it's returned. See <https://bugs.launchpad.net/bzr/+bug/504102>
132
self._network_name = None
135
return "%s(_network_name=%r)" % (self.__class__.__name__,
138
def get_format_description(self):
139
if self._network_name:
141
real_format = controldir.network_format_registry.get(
146
return 'Remote: ' + real_format.get_format_description()
147
return 'bzr remote bzrdir'
149
def get_format_string(self):
    """Refuse to supply a format string for this format.

    :raises NotImplementedError: always; the exception argument is this
        bound method.
    """
    unsupported_call = self.get_format_string
    raise NotImplementedError(unsupported_call)
152
def network_name(self):
153
if self._network_name:
154
return self._network_name
156
raise AssertionError("No network name set.")
158
def initialize_on_transport(self, transport):
160
# hand off the request to the smart server
161
client_medium = transport.get_smart_medium()
162
except errors.NoSmartMedium:
163
# TODO: lookup the local format from a server hint.
164
local_dir_format = _mod_bzrdir.BzrDirMetaFormat1()
165
return local_dir_format.initialize_on_transport(transport)
166
client = _SmartClient(client_medium)
167
path = client.remote_path_from_transport(transport)
169
response = client.call('BzrDirFormat.initialize', path)
170
except errors.ErrorFromSmartServer as err:
171
_translate_error(err, path=path)
172
if response[0] != 'ok':
173
raise errors.SmartProtocolError('unexpected response code %s' % (response,))
174
format = RemoteBzrDirFormat()
175
self._supply_sub_formats_to(format)
176
return RemoteBzrDir(transport, format)
178
def parse_NoneTrueFalse(self, arg):
185
raise AssertionError("invalid arg %r" % arg)
187
def _serialize_NoneTrueFalse(self, arg):
194
def _serialize_NoneString(self, arg):
197
def initialize_on_transport_ex(self, transport, use_existing_dir=False,
198
create_prefix=False, force_new_repo=False, stacked_on=None,
199
stack_on_pwd=None, repo_format_name=None, make_working_trees=None,
202
# hand off the request to the smart server
203
client_medium = transport.get_smart_medium()
204
except errors.NoSmartMedium:
207
# Decline to open it if the server doesn't support our required
208
# version (3) so that the VFS-based transport will do it.
209
if client_medium.should_probe():
211
server_version = client_medium.protocol_version()
212
if server_version != '2':
216
except errors.SmartProtocolError:
217
# Apparently there's no usable smart server there, even though
218
# the medium supports the smart protocol.
223
client = _SmartClient(client_medium)
224
path = client.remote_path_from_transport(transport)
225
if client_medium._is_remote_before((1, 16)):
228
# TODO: lookup the local format from a server hint.
229
local_dir_format = _mod_bzrdir.BzrDirMetaFormat1()
230
self._supply_sub_formats_to(local_dir_format)
231
return local_dir_format.initialize_on_transport_ex(transport,
232
use_existing_dir=use_existing_dir, create_prefix=create_prefix,
233
force_new_repo=force_new_repo, stacked_on=stacked_on,
234
stack_on_pwd=stack_on_pwd, repo_format_name=repo_format_name,
235
make_working_trees=make_working_trees, shared_repo=shared_repo,
237
return self._initialize_on_transport_ex_rpc(client, path, transport,
238
use_existing_dir, create_prefix, force_new_repo, stacked_on,
239
stack_on_pwd, repo_format_name, make_working_trees, shared_repo)
241
def _initialize_on_transport_ex_rpc(self, client, path, transport,
242
use_existing_dir, create_prefix, force_new_repo, stacked_on,
243
stack_on_pwd, repo_format_name, make_working_trees, shared_repo):
245
args.append(self._serialize_NoneTrueFalse(use_existing_dir))
246
args.append(self._serialize_NoneTrueFalse(create_prefix))
247
args.append(self._serialize_NoneTrueFalse(force_new_repo))
248
args.append(self._serialize_NoneString(stacked_on))
249
# stack_on_pwd is often/usually our transport
252
stack_on_pwd = transport.relpath(stack_on_pwd)
255
except errors.PathNotChild:
257
args.append(self._serialize_NoneString(stack_on_pwd))
258
args.append(self._serialize_NoneString(repo_format_name))
259
args.append(self._serialize_NoneTrueFalse(make_working_trees))
260
args.append(self._serialize_NoneTrueFalse(shared_repo))
261
request_network_name = self._network_name or \
262
_mod_bzrdir.BzrDirFormat.get_default_format().network_name()
264
response = client.call('BzrDirFormat.initialize_ex_1.16',
265
request_network_name, path, *args)
266
except errors.UnknownSmartMethod:
267
client._medium._remember_remote_is_before((1, 16))
268
local_dir_format = _mod_bzrdir.BzrDirMetaFormat1()
269
self._supply_sub_formats_to(local_dir_format)
270
return local_dir_format.initialize_on_transport_ex(transport,
271
use_existing_dir=use_existing_dir, create_prefix=create_prefix,
272
force_new_repo=force_new_repo, stacked_on=stacked_on,
273
stack_on_pwd=stack_on_pwd, repo_format_name=repo_format_name,
274
make_working_trees=make_working_trees, shared_repo=shared_repo,
276
except errors.ErrorFromSmartServer as err:
277
_translate_error(err, path=path)
278
repo_path = response[0]
279
bzrdir_name = response[6]
280
require_stacking = response[7]
281
require_stacking = self.parse_NoneTrueFalse(require_stacking)
282
format = RemoteBzrDirFormat()
283
format._network_name = bzrdir_name
284
self._supply_sub_formats_to(format)
285
bzrdir = RemoteBzrDir(transport, format, _client=client)
287
repo_format = response_tuple_to_repo_format(response[1:])
291
repo_bzrdir_format = RemoteBzrDirFormat()
292
repo_bzrdir_format._network_name = response[5]
293
repo_bzr = RemoteBzrDir(transport.clone(repo_path),
297
final_stack = response[8] or None
298
final_stack_pwd = response[9] or None
300
final_stack_pwd = urlutils.join(
301
transport.base, final_stack_pwd)
302
remote_repo = RemoteRepository(repo_bzr, repo_format)
303
if len(response) > 10:
304
# Updated server verb that locks remotely.
305
repo_lock_token = response[10] or None
306
remote_repo.lock_write(repo_lock_token, _skip_rpc=True)
308
remote_repo.dont_leave_lock_in_place()
310
remote_repo.lock_write()
311
policy = _mod_bzrdir.UseExistingRepository(remote_repo,
312
final_stack, final_stack_pwd, require_stacking)
313
policy.acquire_repository()
317
bzrdir._format.set_branch_format(self.get_branch_format())
319
# The repo has already been created, but we need to make sure that
320
# we'll make a stackable branch.
321
bzrdir._format.require_stacking(_skip_repo=True)
322
return remote_repo, bzrdir, require_stacking, policy
324
def _open(self, transport):
    """Return a RemoteBzrDir built from ``transport`` and this format."""
    opened_dir = RemoteBzrDir(transport, self)
    return opened_dir
327
def __eq__(self, other):
328
if not isinstance(other, RemoteBzrDirFormat):
330
return self.get_format_description() == other.get_format_description()
332
def __return_repository_format(self):
333
# Always return a RemoteRepositoryFormat object, but if a specific bzr
334
# repository format has been asked for, tell the RemoteRepositoryFormat
335
# that it should use that for init() etc.
336
result = RemoteRepositoryFormat()
337
custom_format = getattr(self, '_repository_format', None)
339
if isinstance(custom_format, RemoteRepositoryFormat):
342
# We will use the custom format to create repositories over the
343
# wire; expose its details like rich_root_data for code to
345
result._custom_format = custom_format
348
def get_branch_format(self):
349
result = _mod_bzrdir.BzrDirMetaFormat1.get_branch_format(self)
350
if not isinstance(result, RemoteBranchFormat):
351
new_result = RemoteBranchFormat()
352
new_result._custom_format = result
354
self.set_branch_format(new_result)
358
repository_format = property(__return_repository_format,
359
_mod_bzrdir.BzrDirMetaFormat1._set_repository_format) #.im_func)
362
class RemoteControlStore(_mod_config.IniFileStore):
363
"""Control store which attempts to use HPSS calls to retrieve control store.
365
Note that this is specific to bzr-based formats.
368
def __init__(self, bzrdir):
369
super(RemoteControlStore, self).__init__()
370
self.controldir = bzrdir
371
self._real_store = None
373
def lock_write(self, token=None):
375
return self._real_store.lock_write(token)
379
return self._real_store.unlock()
382
with self.lock_write():
383
# We need to be able to override the undecorated implementation
384
self.save_without_locking()
386
def save_without_locking(self):
    """Call the parent class's ``save()`` directly.

    No lock is acquired in this method itself.
    """
    parent_view = super(RemoteControlStore, self)
    parent_view.save()
389
def _ensure_real(self):
    """Make sure ``self._real_store`` is populated.

    First asks the control directory to ensure its own real object, then
    builds a ``_mod_config.ControlStore`` for that control directory if no
    store has been cached yet.
    """
    self.controldir._ensure_real()
    if self._real_store is not None:
        # Already built on an earlier call; keep the cached store.
        return
    self._real_store = _mod_config.ControlStore(self.controldir)
394
def external_url(self):
    """Return the URL of this store's ``control.conf``.

    The URL is the control directory's ``user_url`` joined with
    'control.conf'.  This store only records ``self.controldir`` (set in
    ``__init__``) and never has a ``branch`` attribute, so the URL is
    derived from the control directory.
    """
    return urlutils.join(self.controldir.user_url, 'control.conf')
397
def _load_content(self):
398
medium = self.controldir._client._medium
399
path = self.controldir._path_for_remote_call(self.controldir._client)
401
response, handler = self.controldir._call_expecting_body(
402
'BzrDir.get_config_file', path)
403
except errors.UnknownSmartMethod:
405
return self._real_store._load_content()
406
if len(response) and response[0] != 'ok':
407
raise errors.UnexpectedSmartServerResponse(response)
408
return handler.read_body_bytes()
410
def _save_content(self, content):
411
# FIXME JRV 2011-11-22: Ideally this should use a
412
# HPSS call too, but at the moment it is not possible
413
# to write lock control directories.
415
return self._real_store._save_content(content)
418
class RemoteBzrDir(_mod_bzrdir.BzrDir, _RpcHelper):
419
"""Control directory on a remote server, accessed via bzr:// or similar."""
421
def __init__(self, transport, format, _client=None, _force_probe=False):
422
"""Construct a RemoteBzrDir.
424
:param _client: Private parameter for testing. Disables probing and the
425
use of a real bzrdir.
427
_mod_bzrdir.BzrDir.__init__(self, transport, format)
428
# this object holds a delegated bzrdir that uses file-level operations
429
# to talk to the other side
430
self._real_bzrdir = None
431
self._has_working_tree = None
432
# 1-shot cache for the call pattern 'create_branch; open_branch' - see
433
# create_branch for details.
434
self._next_open_branch_result = None
437
medium = transport.get_smart_medium()
438
self._client = client._SmartClient(medium)
440
self._client = _client
447
return '%s(%r)' % (self.__class__.__name__, self._client)
449
def _probe_bzrdir(self):
450
medium = self._client._medium
451
path = self._path_for_remote_call(self._client)
452
if medium._is_remote_before((2, 1)):
456
self._rpc_open_2_1(path)
458
except errors.UnknownSmartMethod:
459
medium._remember_remote_is_before((2, 1))
462
def _rpc_open_2_1(self, path):
463
response = self._call('BzrDir.open_2.1', path)
464
if response == ('no',):
465
raise errors.NotBranchError(path=self.root_transport.base)
466
elif response[0] == 'yes':
467
if response[1] == 'yes':
468
self._has_working_tree = True
469
elif response[1] == 'no':
470
self._has_working_tree = False
472
raise errors.UnexpectedSmartServerResponse(response)
474
raise errors.UnexpectedSmartServerResponse(response)
476
def _rpc_open(self, path):
477
response = self._call('BzrDir.open', path)
478
if response not in [('yes',), ('no',)]:
479
raise errors.UnexpectedSmartServerResponse(response)
480
if response == ('no',):
481
raise errors.NotBranchError(path=self.root_transport.base)
483
def _ensure_real(self):
484
"""Ensure that there is a _real_bzrdir set.
486
Used before calls to self._real_bzrdir.
488
if not self._real_bzrdir:
489
if 'hpssvfs' in debug.debug_flags:
491
warning('VFS BzrDir access triggered\n%s',
492
''.join(traceback.format_stack()))
493
self._real_bzrdir = _mod_bzrdir.BzrDir.open_from_transport(
494
self.root_transport, probers=[_mod_bzr.BzrProber])
495
self._format._network_name = \
496
self._real_bzrdir._format.network_name()
498
def _translate_error(self, err, **context):
    """Hand ``err`` to the module-level ``_translate_error`` function.

    ``bzrdir=self`` is passed in addition to the caller's ``context``
    keywords.
    """
    module_level_translate = _translate_error
    module_level_translate(err, bzrdir=self, **context)
501
def break_lock(self):
    """Drop the cached open_branch result, then call ``BzrDir.break_lock``.

    :return: whatever ``_mod_bzrdir.BzrDir.break_lock(self)`` returns.
    """
    # Clear the one-shot cache first so that a stale branch object cannot
    # be aliased by a later open_branch call (see create_branch for the
    # rationale behind the cache).
    self._next_open_branch_result = None
    base_break_lock = _mod_bzrdir.BzrDir.break_lock
    return base_break_lock(self)
507
def _vfs_checkout_metadir(self):
509
return self._real_bzrdir.checkout_metadir()
511
def checkout_metadir(self):
512
"""Retrieve the controldir format to use for checkouts of this one.
514
medium = self._client._medium
515
if medium._is_remote_before((2, 5)):
516
return self._vfs_checkout_metadir()
517
path = self._path_for_remote_call(self._client)
519
response = self._client.call('BzrDir.checkout_metadir',
521
except errors.UnknownSmartMethod:
522
medium._remember_remote_is_before((2, 5))
523
return self._vfs_checkout_metadir()
524
if len(response) != 3:
525
raise errors.UnexpectedSmartServerResponse(response)
526
control_name, repo_name, branch_name = response
528
format = controldir.network_format_registry.get(control_name)
530
raise errors.UnknownFormatError(kind='control',
534
repo_format = _mod_repository.network_format_registry.get(
537
raise errors.UnknownFormatError(kind='repository',
539
format.repository_format = repo_format
542
format.set_branch_format(
543
branch.network_format_registry.get(branch_name))
545
raise errors.UnknownFormatError(kind='branch',
549
def _vfs_cloning_metadir(self, require_stacking=False):
551
return self._real_bzrdir.cloning_metadir(
552
require_stacking=require_stacking)
554
def cloning_metadir(self, require_stacking=False):
555
medium = self._client._medium
556
if medium._is_remote_before((1, 13)):
557
return self._vfs_cloning_metadir(require_stacking=require_stacking)
558
verb = 'BzrDir.cloning_metadir'
563
path = self._path_for_remote_call(self._client)
565
response = self._call(verb, path, stacking)
566
except errors.UnknownSmartMethod:
567
medium._remember_remote_is_before((1, 13))
568
return self._vfs_cloning_metadir(require_stacking=require_stacking)
569
except errors.UnknownErrorFromSmartServer as err:
570
if err.error_tuple != ('BranchReference',):
572
# We need to resolve the branch reference to determine the
573
# cloning_metadir. This causes unnecessary RPCs to open the
574
# referenced branch (and bzrdir, etc) but only when the caller
575
# didn't already resolve the branch reference.
576
referenced_branch = self.open_branch()
577
return referenced_branch.controldir.cloning_metadir()
578
if len(response) != 3:
579
raise errors.UnexpectedSmartServerResponse(response)
580
control_name, repo_name, branch_info = response
581
if len(branch_info) != 2:
582
raise errors.UnexpectedSmartServerResponse(response)
583
branch_ref, branch_name = branch_info
585
format = controldir.network_format_registry.get(control_name)
587
raise errors.UnknownFormatError(kind='control', format=control_name)
591
format.repository_format = _mod_repository.network_format_registry.get(
594
raise errors.UnknownFormatError(kind='repository',
596
if branch_ref == 'ref':
597
# XXX: we need possible_transports here to avoid reopening the
598
# connection to the referenced location
599
ref_bzrdir = _mod_bzrdir.BzrDir.open(branch_name)
600
branch_format = ref_bzrdir.cloning_metadir().get_branch_format()
601
format.set_branch_format(branch_format)
602
elif branch_ref == 'branch':
605
branch_format = branch.network_format_registry.get(
608
raise errors.UnknownFormatError(kind='branch',
610
format.set_branch_format(branch_format)
612
raise errors.UnexpectedSmartServerResponse(response)
615
def create_repository(self, shared=False):
616
# as per meta1 formats - just delegate to the format object which may
618
result = self._format.repository_format.initialize(self, shared)
619
if not isinstance(result, RemoteRepository):
620
return self.open_repository()
624
def destroy_repository(self):
625
"""See BzrDir.destroy_repository"""
626
path = self._path_for_remote_call(self._client)
628
response = self._call('BzrDir.destroy_repository', path)
629
except errors.UnknownSmartMethod:
631
self._real_bzrdir.destroy_repository()
633
if response[0] != 'ok':
634
raise SmartProtocolError('unexpected response code %s' % (response,))
636
def create_branch(self, name=None, repository=None,
637
append_revisions_only=None):
639
name = self._get_selected_branch()
641
raise errors.NoColocatedBranchSupport(self)
642
# as per meta1 formats - just delegate to the format object which may
644
real_branch = self._format.get_branch_format().initialize(self,
645
name=name, repository=repository,
646
append_revisions_only=append_revisions_only)
647
if not isinstance(real_branch, RemoteBranch):
648
if not isinstance(repository, RemoteRepository):
649
raise AssertionError(
650
'need a RemoteRepository to use with RemoteBranch, got %r'
652
result = RemoteBranch(self, repository, real_branch, name=name)
655
# BzrDir.clone_on_transport() uses the result of create_branch but does
656
# not return it to its callers; we save approximately 8% of our round
657
# trips by handing the branch we created back to the first caller to
658
# open_branch rather than probing anew. Long term we need an API in
659
# bzrdir that doesn't discard result objects (like result_branch).
661
self._next_open_branch_result = result
664
def destroy_branch(self, name=None):
665
"""See BzrDir.destroy_branch"""
667
name = self._get_selected_branch()
669
raise errors.NoColocatedBranchSupport(self)
670
path = self._path_for_remote_call(self._client)
676
response = self._call('BzrDir.destroy_branch', path, *args)
677
except errors.UnknownSmartMethod:
679
self._real_bzrdir.destroy_branch(name=name)
680
self._next_open_branch_result = None
682
self._next_open_branch_result = None
683
if response[0] != 'ok':
684
raise SmartProtocolError('unexpected response code %s' % (response,))
686
def create_workingtree(self, revision_id=None, from_branch=None,
                       accelerator_tree=None, hardlink=False):
    """Always fail: none of the arguments are used.

    :raises errors.NotLocalUrl: always, carrying ``self.transport.base``.
    """
    location = self.transport.base
    raise errors.NotLocalUrl(location)
690
def find_branch_format(self, name=None):
691
"""Find the branch 'format' for this bzrdir.
693
This might be a synthetic object for e.g. RemoteBranch and SVN.
695
b = self.open_branch(name=name)
698
def get_branches(self, possible_transports=None, ignore_fallbacks=False):
699
path = self._path_for_remote_call(self._client)
701
response, handler = self._call_expecting_body(
702
'BzrDir.get_branches', path)
703
except errors.UnknownSmartMethod:
705
return self._real_bzrdir.get_branches()
706
if response[0] != "success":
707
raise errors.UnexpectedSmartServerResponse(response)
708
body = bencode.bdecode(handler.read_body_bytes())
710
for name, value in viewitems(body):
711
ret[name] = self._open_branch(name, value[0], value[1],
712
possible_transports=possible_transports,
713
ignore_fallbacks=ignore_fallbacks)
716
def set_branch_reference(self, target_branch, name=None):
717
"""See BzrDir.set_branch_reference()."""
719
name = self._get_selected_branch()
721
raise errors.NoColocatedBranchSupport(self)
723
return self._real_bzrdir.set_branch_reference(target_branch, name=name)
725
def get_branch_reference(self, name=None):
726
"""See BzrDir.get_branch_reference()."""
728
name = self._get_selected_branch()
730
raise errors.NoColocatedBranchSupport(self)
731
response = self._get_branch_reference()
732
if response[0] == 'ref':
737
def _get_branch_reference(self):
738
path = self._path_for_remote_call(self._client)
739
medium = self._client._medium
741
('BzrDir.open_branchV3', (2, 1)),
742
('BzrDir.open_branchV2', (1, 13)),
743
('BzrDir.open_branch', None),
745
for verb, required_version in candidate_calls:
746
if required_version and medium._is_remote_before(required_version):
749
response = self._call(verb, path)
750
except errors.UnknownSmartMethod:
751
if required_version is None:
753
medium._remember_remote_is_before(required_version)
756
if verb == 'BzrDir.open_branch':
757
if response[0] != 'ok':
758
raise errors.UnexpectedSmartServerResponse(response)
759
if response[1] != '':
760
return ('ref', response[1])
762
return ('branch', '')
763
if response[0] not in ('ref', 'branch'):
764
raise errors.UnexpectedSmartServerResponse(response)
767
def _get_tree_branch(self, name=None):
768
"""See BzrDir._get_tree_branch()."""
769
return None, self.open_branch(name=name)
771
def _open_branch(self, name, kind, location_or_format,
772
ignore_fallbacks=False, possible_transports=None):
774
# a branch reference, use the existing BranchReference logic.
775
format = BranchReferenceFormat()
776
return format.open(self, name=name, _found=True,
777
location=location_or_format, ignore_fallbacks=ignore_fallbacks,
778
possible_transports=possible_transports)
779
branch_format_name = location_or_format
780
if not branch_format_name:
781
branch_format_name = None
782
format = RemoteBranchFormat(network_name=branch_format_name)
783
return RemoteBranch(self, self.find_repository(), format=format,
784
setup_stacking=not ignore_fallbacks, name=name,
785
possible_transports=possible_transports)
787
def open_branch(self, name=None, unsupported=False,
788
ignore_fallbacks=False, possible_transports=None):
790
name = self._get_selected_branch()
792
raise errors.NoColocatedBranchSupport(self)
794
raise NotImplementedError('unsupported flag support not implemented yet.')
795
if self._next_open_branch_result is not None:
796
# See create_branch for details.
797
result = self._next_open_branch_result
798
self._next_open_branch_result = None
800
response = self._get_branch_reference()
801
return self._open_branch(name, response[0], response[1],
802
possible_transports=possible_transports,
803
ignore_fallbacks=ignore_fallbacks)
805
def _open_repo_v1(self, path):
806
verb = 'BzrDir.find_repository'
807
response = self._call(verb, path)
808
if response[0] != 'ok':
809
raise errors.UnexpectedSmartServerResponse(response)
810
# servers that only support the v1 method don't support external
813
repo = self._real_bzrdir.open_repository()
814
response = response + ('no', repo._format.network_name())
815
return response, repo
817
def _open_repo_v2(self, path):
818
verb = 'BzrDir.find_repositoryV2'
819
response = self._call(verb, path)
820
if response[0] != 'ok':
821
raise errors.UnexpectedSmartServerResponse(response)
823
repo = self._real_bzrdir.open_repository()
824
response = response + (repo._format.network_name(),)
825
return response, repo
827
def _open_repo_v3(self, path):
828
verb = 'BzrDir.find_repositoryV3'
829
medium = self._client._medium
830
if medium._is_remote_before((1, 13)):
831
raise errors.UnknownSmartMethod(verb)
833
response = self._call(verb, path)
834
except errors.UnknownSmartMethod:
835
medium._remember_remote_is_before((1, 13))
837
if response[0] != 'ok':
838
raise errors.UnexpectedSmartServerResponse(response)
839
return response, None
841
def open_repository(self):
842
path = self._path_for_remote_call(self._client)
844
for probe in [self._open_repo_v3, self._open_repo_v2,
847
response, real_repo = probe(path)
849
except errors.UnknownSmartMethod:
852
raise errors.UnknownSmartMethod('BzrDir.find_repository{3,2,}')
853
if response[0] != 'ok':
854
raise errors.UnexpectedSmartServerResponse(response)
855
if len(response) != 6:
856
raise SmartProtocolError('incorrect response length %s' % (response,))
857
if response[1] == '':
858
# repo is at this dir.
859
format = response_tuple_to_repo_format(response[2:])
860
# Used to support creating a real format instance when needed.
861
format._creating_bzrdir = self
862
remote_repo = RemoteRepository(self, format)
863
format._creating_repo = remote_repo
864
if real_repo is not None:
865
remote_repo._set_real_repository(real_repo)
868
raise errors.NoRepositoryPresent(self)
870
def has_workingtree(self):
871
if self._has_working_tree is None:
872
path = self._path_for_remote_call(self._client)
874
response = self._call('BzrDir.has_workingtree', path)
875
except errors.UnknownSmartMethod:
877
self._has_working_tree = self._real_bzrdir.has_workingtree()
879
if response[0] not in ('yes', 'no'):
880
raise SmartProtocolError('unexpected response code %s' % (response,))
881
self._has_working_tree = (response[0] == 'yes')
882
return self._has_working_tree
884
def open_workingtree(self, recommend_upgrade=True):
885
if self.has_workingtree():
886
raise errors.NotLocalUrl(self.root_transport)
888
raise errors.NoWorkingTree(self.root_transport.base)
890
def _path_for_remote_call(self, client):
    """Return the path to be used for this bzrdir in a remote call.

    :param client: object whose ``remote_path_from_transport`` is applied
        to ``self.root_transport``.
    :return: element 0 of ``urlutils.split_segment_parameters_raw`` applied
        to that remote path.
    """
    transport_path = client.remote_path_from_transport(self.root_transport)
    split_result = urlutils.split_segment_parameters_raw(transport_path)
    return split_result[0]
895
def get_branch_transport(self, branch_format, name=None):
897
return self._real_bzrdir.get_branch_transport(branch_format, name=name)
899
def get_repository_transport(self, repository_format):
901
return self._real_bzrdir.get_repository_transport(repository_format)
903
def get_workingtree_transport(self, workingtree_format):
905
return self._real_bzrdir.get_workingtree_transport(workingtree_format)
907
def can_convert_format(self):
908
"""Upgrading of remote bzrdirs is not supported yet."""
911
def needs_format_conversion(self, format):
912
"""Upgrading of remote bzrdirs is not supported yet."""
915
def _get_config(self):
    """Return a new RemoteBzrDirConfig constructed from this bzrdir."""
    config_object = RemoteBzrDirConfig(self)
    return config_object
918
def _get_config_store(self):
    """Return a new RemoteControlStore constructed from this bzrdir."""
    store = RemoteControlStore(self)
    return store
922
class RemoteRepositoryFormat(vf_repository.VersionedFileRepositoryFormat):
923
"""Format for repositories accessed over a _SmartClient.
925
Instances of this repository are represented by RemoteRepository
928
The RemoteRepositoryFormat is parameterized during construction
929
to reflect the capabilities of the real, remote format. Specifically
930
the attributes rich_root_data and supports_tree_reference are set
931
on a per instance basis, and are not set (and should not be) at
934
:ivar _custom_format: If set, a specific concrete repository format that
935
will be used when initializing a repository with this
936
RemoteRepositoryFormat.
937
:ivar _creating_repo: If set, the repository object that this
938
RemoteRepositoryFormat was created for: it can be called into
939
to obtain data like the network name.
942
_matchingcontroldir = RemoteBzrDirFormat()
943
supports_full_versioned_files = True
944
supports_leaving_lock = True
945
supports_overriding_transport = False
948
_mod_repository.RepositoryFormat.__init__(self)
949
self._custom_format = None
950
self._network_name = None
951
self._creating_bzrdir = None
952
self._revision_graph_can_have_wrong_parents = None
953
self._supports_chks = None
954
self._supports_external_lookups = None
955
self._supports_tree_reference = None
956
self._supports_funky_characters = None
957
self._supports_nesting_repositories = None
958
self._rich_root_data = None
961
return "%s(_network_name=%r)" % (self.__class__.__name__,
965
def fast_deltas(self):
967
return self._custom_format.fast_deltas
970
def rich_root_data(self):
971
if self._rich_root_data is None:
973
self._rich_root_data = self._custom_format.rich_root_data
974
return self._rich_root_data
977
def supports_chks(self):
978
if self._supports_chks is None:
980
self._supports_chks = self._custom_format.supports_chks
981
return self._supports_chks
984
def supports_external_lookups(self):
985
if self._supports_external_lookups is None:
987
self._supports_external_lookups = \
988
self._custom_format.supports_external_lookups
989
return self._supports_external_lookups
992
def supports_funky_characters(self):
993
if self._supports_funky_characters is None:
995
self._supports_funky_characters = \
996
self._custom_format.supports_funky_characters
997
return self._supports_funky_characters
1000
def supports_nesting_repositories(self):
1001
if self._supports_nesting_repositories is None:
1003
self._supports_nesting_repositories = \
1004
self._custom_format.supports_nesting_repositories
1005
return self._supports_nesting_repositories
1008
def supports_tree_reference(self):
1009
if self._supports_tree_reference is None:
1011
self._supports_tree_reference = \
1012
self._custom_format.supports_tree_reference
1013
return self._supports_tree_reference
1016
def revision_graph_can_have_wrong_parents(self):
1017
if self._revision_graph_can_have_wrong_parents is None:
1019
self._revision_graph_can_have_wrong_parents = \
1020
self._custom_format.revision_graph_can_have_wrong_parents
1021
return self._revision_graph_can_have_wrong_parents
1023
def _vfs_initialize(self, a_controldir, shared):
    """Helper for common code in initialize.

    :param a_controldir: The control directory to create the repository in.
    :param shared: Passed through as ``shared=`` to the creating call.
    :return: A RemoteRepository; a non-remote result is re-opened through
        ``self.open(a_controldir)``.
    """
    if self._custom_format:
        # Custom format requested
        result = self._custom_format.initialize(a_controldir, shared=shared)
    elif self._creating_bzrdir is not None:
        # Use the format of the repository we were created to back.
        prior_repo = self._creating_bzrdir.open_repository()
        prior_repo._ensure_real()
        result = prior_repo._real_repository._format.initialize(
            a_controldir, shared=shared)
    else:
        # assume that a_bzr is a RemoteBzrDir but the smart server didn't
        # support remote initialization.
        # We delegate to a real object at this point (as RemoteBzrDir
        # delegate to the repository format which would lead to infinite
        # recursion if we just called a_controldir.create_repository.
        a_controldir._ensure_real()
        result = a_controldir._real_bzrdir.create_repository(shared=shared)
    if not isinstance(result, RemoteRepository):
        return self.open(a_controldir)
    return result
1048
def initialize(self, a_controldir, shared=False):
    """Create a repository in a_controldir, preferring a single RPC.

    Falls back to ``_vfs_initialize`` when a_controldir is not a
    RemoteBzrDir, when the medium is known to predate (1, 13), or when the
    server does not know the ``BzrDir.create_repository`` verb.

    :param a_controldir: The control directory to create the repository in.
    :param shared: Whether the new repository should be shared.
    :return: The created repository.
    """
    # Being asked to create on a non RemoteBzrDir:
    if not isinstance(a_controldir, RemoteBzrDir):
        return self._vfs_initialize(a_controldir, shared)
    medium = a_controldir._client._medium
    if medium._is_remote_before((1, 13)):
        return self._vfs_initialize(a_controldir, shared)
    # Creating on a remote bzr dir.
    # 1) get the network name to use.
    if self._custom_format:
        network_name = self._custom_format.network_name()
    elif self._network_name:
        network_name = self._network_name
    else:
        # Select the current breezy default and ask for that.
        reference_bzrdir_format = controldir.format_registry.get('default')()
        reference_format = reference_bzrdir_format.repository_format
        network_name = reference_format.network_name()
    # 2) try direct creation via RPC
    path = a_controldir._path_for_remote_call(a_controldir._client)
    verb = 'BzrDir.create_repository'
    if shared:
        shared_str = 'True'
    else:
        shared_str = 'False'
    try:
        response = a_controldir._call(verb, path, network_name, shared_str)
    except errors.UnknownSmartMethod:
        # Fallback - use vfs methods
        medium._remember_remote_is_before((1, 13))
        return self._vfs_initialize(a_controldir, shared)
    # Turn the response into a RemoteRepository object.
    repo_format = response_tuple_to_repo_format(response[1:])
    # Used to support creating a real format instance when needed.
    repo_format._creating_bzrdir = a_controldir
    remote_repo = RemoteRepository(a_controldir, repo_format)
    repo_format._creating_repo = remote_repo
    return remote_repo
1088
def open(self, a_controldir):
    """Open the repository hosted by a_controldir.

    :param a_controldir: Must be a RemoteBzrDir.
    :raises AssertionError: If a_controldir is not a RemoteBzrDir.
    :return: The result of ``a_controldir.open_repository()``.
    """
    if not isinstance(a_controldir, RemoteBzrDir):
        raise AssertionError('%r is not a RemoteBzrDir' % (a_controldir,))
    return a_controldir.open_repository()
1093
def _ensure_real(self):
    """Populate ``_custom_format`` from the network format registry.

    Does nothing when ``_custom_format`` is already set.

    :raises errors.UnknownFormatError: If ``_network_name`` cannot be
        resolved.
    """
    if self._custom_format is None:
        try:
            self._custom_format = _mod_repository.network_format_registry.get(
                self._network_name)
        # NOTE(review): the except clause was lost from the extracted source;
        # KeyError is assumed to be what the registry raises for an
        # unregistered name - confirm against the registry implementation.
        except KeyError:
            raise errors.UnknownFormatError(kind='repository',
                                            format=self._network_name)
1103
@property
def _fetch_order(self):
    """Return ``_fetch_order`` of the (resolved) custom format."""
    self._ensure_real()
    return self._custom_format._fetch_order
1108
@property
def _fetch_uses_deltas(self):
    """Return ``_fetch_uses_deltas`` of the (resolved) custom format."""
    self._ensure_real()
    return self._custom_format._fetch_uses_deltas
1113
@property
def _fetch_reconcile(self):
    """Return ``_fetch_reconcile`` of the (resolved) custom format."""
    self._ensure_real()
    return self._custom_format._fetch_reconcile
1117
def get_format_description(self):
    """Return the custom format's description prefixed with 'Remote: '."""
    self._ensure_real()
    return 'Remote: ' + self._custom_format.get_format_description()
1121
def __eq__(self, other):
    """Two formats compare equal exactly when they are the same class."""
    return self.__class__ is other.__class__
1124
def network_name(self):
    """Return the network name of this format.

    Uses the stored ``_network_name`` when it is set; otherwise asks the
    real repository behind ``_creating_repo`` for its format's name.
    """
    if self._network_name:
        return self._network_name
    self._creating_repo._ensure_real()
    return self._creating_repo._real_repository._format.network_name()
1131
@property
def pack_compresses(self):
    """Return ``pack_compresses`` of the (resolved) custom format."""
    self._ensure_real()
    return self._custom_format.pack_compresses
1136
@property
def _serializer(self):
    """Return ``_serializer`` of the (resolved) custom format."""
    self._ensure_real()
    return self._custom_format._serializer
1141
class RemoteRepository(_mod_repository.Repository, _RpcHelper,
1142
lock._RelockDebugMixin):
1143
"""Repository accessed over rpc.
1145
For the moment most operations are performed using local transport-backed
1149
def __init__(self, remote_bzrdir, format, real_repository=None, _client=None):
    """Create a RemoteRepository instance.

    :param remote_bzrdir: The bzrdir hosting this repository.
    :param format: The RemoteFormat object to use.
    :param real_repository: If not None, a local implementation of the
        repository logic for the repository.
    :param _client: Private testing parameter - override the smart client
        to be used by the repository.
    """
    if real_repository:
        self._real_repository = real_repository
    else:
        self._real_repository = None
    self.controldir = remote_bzrdir
    if _client is None:
        self._client = remote_bzrdir._client
    else:
        self._client = _client
    self._format = format
    self._lock_mode = None
    self._lock_token = None
    self._write_group_tokens = None
    self._lock_count = 0
    self._leave_lock = False
    # Cache of revision parents; misses are cached during read locks, and
    # write locks when no _real_repository has been set.
    self._unstacked_provider = graph.CachingParentsProvider(
        get_parent_map=self._get_parent_map_rpc)
    self._unstacked_provider.disable_cache()
    # These depend on the actual remote format, so force them off for
    # maximum compatibility. XXX: In future these should depend on the
    # remote repository instance, but this is irrelevant until we perform
    # reconcile via an RPC call.
    self._reconcile_does_inventory_gc = False
    self._reconcile_fixes_text_parents = False
    self._reconcile_backsup_inventory = False
    self.base = self.controldir.transport.base
    # Additional places to query for data.
    self._fallback_repositories = []
1193
@property
def user_transport(self):
    """Return the ``user_transport`` of the hosting control directory."""
    return self.controldir.user_transport
1197
@property
def control_transport(self):
    """Return the repository transport obtained from the control directory."""
    # XXX: Normally you shouldn't directly get at the remote repository
    # transport, but I'm not sure it's worth making this method
    # optional -- mbp 2010-04-21
    return self.controldir.get_repository_transport(None)
1204
return "%s(%s)" % (self.__class__.__name__, self.base)
1208
def abort_write_group(self, suppress_errors=False):
    """Abort the current write group.

    Smart methods perform operations in a single step so this API
    is not really applicable except as a compatibility thunk
    for older plugins that don't use e.g. the CommitBuilder.

    Delegates to the real repository when one is set; otherwise issues the
    ``Repository.abort_write_group`` RPC.

    :param suppress_errors: see Repository.abort_write_group.
    """
    if self._real_repository:
        self._ensure_real()
        return self._real_repository.abort_write_group(
            suppress_errors=suppress_errors)
    if not self.is_in_write_group():
        if suppress_errors:
            mutter('(suppressed) not in write group')
            return
        raise errors.BzrError("not in write group")
    path = self.controldir._path_for_remote_call(self._client)
    try:
        response = self._call('Repository.abort_write_group', path,
                              self._lock_token, self._write_group_tokens)
    except Exception as exc:
        # NOTE(review): this clears ``_write_group`` while the rest of this
        # class tracks ``_write_group_tokens`` - confirm which was intended.
        self._write_group = None
        if not suppress_errors:
            raise
        mutter('abort_write_group failed')
        log_exception_quietly()
        note(gettext('bzr: ERROR (ignored): %s'), exc)
    else:
        if response != ('ok', ):
            raise errors.UnexpectedSmartServerResponse(response)
        self._write_group_tokens = None
1244
@property
def chk_bytes(self):
    """Decorate the real repository for now.

    In the long term a full blown network facility is needed to avoid
    creating a real repository object locally.
    """
    self._ensure_real()
    return self._real_repository.chk_bytes
1253
def commit_write_group(self):
    """Complete a write group on the decorated repository.

    Smart methods perform operations in a single step so this API
    is not really applicable except as a compatibility thunk
    for older plugins that don't use e.g. the CommitBuilder.

    Delegates to the real repository when one is set; otherwise issues the
    ``Repository.commit_write_group`` RPC and refreshes local data.
    """
    if self._real_repository:
        self._ensure_real()
        return self._real_repository.commit_write_group()
    if not self.is_in_write_group():
        raise errors.BzrError("not in write group")
    path = self.controldir._path_for_remote_call(self._client)
    response = self._call('Repository.commit_write_group', path,
                          self._lock_token, self._write_group_tokens)
    if response != ('ok', ):
        raise errors.UnexpectedSmartServerResponse(response)
    self._write_group_tokens = None
    # Refresh data after writing to the repository.
    self.refresh_data()
1275
def resume_write_group(self, tokens):
    """Resume a suspended write group identified by tokens.

    Delegates to the real repository when one is set. Otherwise the tokens
    are validated with the ``Repository.check_write_group`` RPC and stored;
    servers lacking that verb are handled through the real repository.

    :param tokens: Write group tokens to resume.
    """
    if self._real_repository:
        return self._real_repository.resume_write_group(tokens)
    path = self.controldir._path_for_remote_call(self._client)
    try:
        response = self._call('Repository.check_write_group', path,
                              self._lock_token, tokens)
    except errors.UnknownSmartMethod:
        self._ensure_real()
        return self._real_repository.resume_write_group(tokens)
    if response != ('ok', ):
        raise errors.UnexpectedSmartServerResponse(response)
    self._write_group_tokens = tokens
1289
def suspend_write_group(self):
    """Suspend the current write group and return its tokens.

    Delegates to the real repository when one is set; otherwise returns the
    stored tokens (an empty list when there are none) and clears them.
    """
    if self._real_repository:
        return self._real_repository.suspend_write_group()
    ret = self._write_group_tokens or []
    self._write_group_tokens = None
    return ret
1296
def get_missing_parent_inventories(self, check_for_missing_texts=True):
    """Delegate to the real repository's get_missing_parent_inventories.

    :param check_for_missing_texts: Passed through unchanged.
    """
    self._ensure_real()
    return self._real_repository.get_missing_parent_inventories(
        check_for_missing_texts=check_for_missing_texts)
1301
def _get_rev_id_for_revno_vfs(self, revno, known_pair):
    """VFS fallback: ask the real repository for get_rev_id_for_revno."""
    self._ensure_real()
    return self._real_repository.get_rev_id_for_revno(
        revno, known_pair)
1306
def get_rev_id_for_revno(self, revno, known_pair):
    """See Repository.get_rev_id_for_revno.

    Uses the ``Repository.get_rev_id_for_revno`` RPC, falling back to the
    VFS implementation for servers before (1, 17). On a
    'history-incomplete' response the fallback repositories are consulted
    in turn.

    :return: ``(True, revision_id)`` when found, otherwise
        ``(False, known_pair)`` with the closest known pair.
    """
    path = self.controldir._path_for_remote_call(self._client)
    try:
        if self._client._medium._is_remote_before((1, 17)):
            return self._get_rev_id_for_revno_vfs(revno, known_pair)
        response = self._call(
            'Repository.get_rev_id_for_revno', path, revno, known_pair)
    except errors.UnknownSmartMethod:
        self._client._medium._remember_remote_is_before((1, 17))
        return self._get_rev_id_for_revno_vfs(revno, known_pair)
    if response[0] == 'ok':
        return True, response[1]
    elif response[0] == 'history-incomplete':
        known_pair = response[1:3]
        for fallback in self._fallback_repositories:
            found, result = fallback.get_rev_id_for_revno(revno, known_pair)
            if found:
                return True, result
            else:
                # Carry the fallback's closest pair on to the next one.
                known_pair = result
        # Not found in any fallbacks
        return False, known_pair
    else:
        raise errors.UnexpectedSmartServerResponse(response)
1332
def _ensure_real(self):
    """Ensure that there is a _real_repository set.

    Used before calls to self._real_repository.

    Note that _ensure_real causes many roundtrips to the server which are
    not desirable, and prevents the use of smart one-roundtrip RPC's to
    perform complex operations (such as accessing parent data, streaming
    revisions etc). Adding calls to _ensure_real should only be done when
    bringing up new functionality, adding fallbacks for smart methods that
    require a fallback path, and never to replace an existing smart method
    invocation. If in doubt chat to the bzr network team.
    """
    if self._real_repository is None:
        if 'hpssvfs' in debug.debug_flags:
            import traceback
            warning('VFS Repository access triggered\n%s',
                    ''.join(traceback.format_stack()))
        self._unstacked_provider.missing_keys.clear()
        self.controldir._ensure_real()
        self._set_real_repository(
            self.controldir._real_bzrdir.open_repository())
1355
def _translate_error(self, err, **context):
    """Hand err to the control directory's translator, tagging this repo.

    :param err: The error to translate.
    :param context: Extra keyword context forwarded unchanged.
    """
    self.controldir._translate_error(err, repository=self, **context)
1358
def find_text_key_references(self):
    """Find the text key references within the repository.

    :return: A dictionary mapping text keys ((fileid, revision_id) tuples)
        to whether they were referred to by the inventory of the
        revision_id that they contain. The inventory texts from all present
        revision ids are assessed to generate this report.
    """
    self._ensure_real()
    return self._real_repository.find_text_key_references()
1369
def _generate_text_key_index(self):
    """Generate a new text key index for the repository.

    This is an expensive function that will take considerable time to run.

    :return: A dict mapping (file_id, revision_id) tuples to a list of
        parents, also (file_id, revision_id) tuples.
    """
    self._ensure_real()
    return self._real_repository._generate_text_key_index()
1380
def _get_revision_graph(self, revision_id):
    """Private method for using with old (< 1.2) servers to fallback.

    :param revision_id: Revision to query; None is sent as ''.
    :return: A dict mapping each revision id in the response body to a
        tuple of the remaining whitespace-separated fields on its line.
    """
    if revision_id is None:
        revision_id = ''
    elif _mod_revision.is_null(revision_id):
        return {}

    path = self.controldir._path_for_remote_call(self._client)
    response = self._call_expecting_body(
        'Repository.get_revision_graph', path, revision_id)
    response_tuple, response_handler = response
    if response_tuple[0] != 'ok':
        raise errors.UnexpectedSmartServerResponse(response_tuple)
    coded = response_handler.read_body_bytes()
    if coded == '':
        # no revisions in this repository!
        return {}
    lines = coded.split('\n')
    revision_graph = {}
    for line in lines:
        d = tuple(line.split())
        revision_graph[d[0]] = d[1:]

    return revision_graph
1405
def _get_sink(self):
    """See Repository._get_sink()."""
    return RemoteStreamSink(self)
1409
def _get_source(self, to_format):
    """Return a source for streaming from this repository."""
    return RemoteStreamSource(self, to_format)
1413
def get_file_graph(self):
    """Return a graph over ``self.texts``, built under a read lock."""
    with self.lock_read():
        return graph.Graph(self.texts)
1417
def has_revision(self, revision_id):
    """True if this repository has a copy of the revision."""
    # Copy of breezy.repository.Repository.has_revision
    with self.lock_read():
        return revision_id in self.has_revisions((revision_id,))
1423
def has_revisions(self, revision_ids):
    """Probe to find out the presence of multiple revisions.

    :param revision_ids: An iterable of revision_ids.
    :return: A set of the revision_ids that were present.
    """
    with self.lock_read():
        # Copy of breezy.repository.Repository.has_revisions
        parent_map = self.get_parent_map(revision_ids)
        result = set(parent_map)
        if _mod_revision.NULL_REVISION in revision_ids:
            result.add(_mod_revision.NULL_REVISION)
        return result
1437
def _has_same_fallbacks(self, other_repo):
    """Returns true if the repositories have the same fallbacks."""
    # XXX: copied from Repository; it should be unified into a base class
    # <https://bugs.launchpad.net/bzr/+bug/401622>
    my_fb = self._fallback_repositories
    other_fb = other_repo._fallback_repositories
    if len(my_fb) != len(other_fb):
        return False
    for f, g in zip(my_fb, other_fb):
        if not f.has_same_location(g):
            return False
    return True
1450
def has_same_location(self, other):
    """Return True when other is the same class at the same transport base."""
    # TODO: Move to RepositoryBase and unify with the regular Repository
    # one; unfortunately the tests rely on slightly different behaviour at
    # present -- mbp 20090710
    return (self.__class__ is other.__class__ and
            self.controldir.transport.base == other.controldir.transport.base)
1457
def get_graph(self, other_repository=None):
    """Return the graph for this repository format.

    :param other_repository: Optional repository handed to
        ``_make_parents_provider`` alongside this one.
    """
    parents_provider = self._make_parents_provider(other_repository)
    return graph.Graph(parents_provider)
1462
def get_known_graph_ancestry(self, revision_ids):
    """Return the known graph for a set of revision ids and their ancestors.

    :param revision_ids: Revision ids to start from; each is turned into
        an interned single-element key before querying ``self.revisions``.
    """
    with self.lock_read():
        st = static_tuple.StaticTuple
        revision_keys = [st(r_id).intern() for r_id in revision_ids]
        known_graph = self.revisions.get_known_graph_ancestry(revision_keys)
        return graph.GraphThunkIdsToKeys(known_graph)
1471
def gather_stats(self, revid=None, committers=None):
    """See Repository.gather_stats().

    Issues the ``Repository.gather_stats`` RPC and parses its
    ``key: value`` body lines.

    :param revid: Revision to gather stats for; None or the null revision
        is sent as ''.
    :param committers: Truthy to request committer counts ('yes'/'no').
    :return: A dict with integer 'revisions', 'size' and 'committers'
        entries and (float, int) 'firstrev'/'latestrev' entries, for
        whichever keys the server sent.
    """
    path = self.controldir._path_for_remote_call(self._client)
    # revid can be None to indicate no revisions, not just NULL_REVISION
    if revid is None or _mod_revision.is_null(revid):
        fmt_revid = ''
    else:
        fmt_revid = revid
    if committers is None or not committers:
        fmt_committers = 'no'
    else:
        fmt_committers = 'yes'
    response_tuple, response_handler = self._call_expecting_body(
        'Repository.gather_stats', path, fmt_revid, fmt_committers)
    if response_tuple[0] != 'ok':
        raise errors.UnexpectedSmartServerResponse(response_tuple)

    body = response_handler.read_body_bytes()
    result = {}
    for line in body.split('\n'):
        if not line:
            # Skip blank lines, e.g. the one after a trailing newline.
            continue
        key, val_text = line.split(':')
        if key in ('revisions', 'size', 'committers'):
            result[key] = int(val_text)
        elif key in ('firstrev', 'latestrev'):
            # Drop the empty field before the leading space.
            values = val_text.split(' ')[1:]
            result[key] = (float(values[0]), int(values[1]))

    return result
1502
def find_branches(self, using=False):
    """See Repository.find_branches()."""
    # should be an API call to the server.
    self._ensure_real()
    return self._real_repository.find_branches(using=using)
1508
def get_physical_lock_status(self):
    """See Repository.get_physical_lock_status().

    Uses the ``Repository.get_physical_lock_status`` RPC, falling back to
    the real repository when the server does not know the verb.

    :return: True when the server answers 'yes', False for 'no'.
    """
    path = self.controldir._path_for_remote_call(self._client)
    try:
        response = self._call('Repository.get_physical_lock_status', path)
    except errors.UnknownSmartMethod:
        self._ensure_real()
        return self._real_repository.get_physical_lock_status()
    if response[0] not in ('yes', 'no'):
        raise errors.UnexpectedSmartServerResponse(response)
    return (response[0] == 'yes')
1520
def is_in_write_group(self):
    """Return True if there is an open write group.

    write groups are only applicable locally for the smart server..

    Returns True when write group tokens are held; otherwise the real
    repository's answer when one is set, and None when neither applies.
    """
    if self._write_group_tokens is not None:
        return True
    if self._real_repository:
        return self._real_repository.is_in_write_group()
1530
def is_locked(self):
    """Return True while at least one lock is held on this object."""
    return self._lock_count >= 1
1533
def is_shared(self):
    """See Repository.is_shared().

    :return: True when the ``Repository.is_shared`` RPC answers 'yes'.
    :raises SmartProtocolError: On any answer other than 'yes' or 'no'.
    """
    path = self.controldir._path_for_remote_call(self._client)
    response = self._call('Repository.is_shared', path)
    if response[0] not in ('yes', 'no'):
        raise SmartProtocolError('unexpected response code %s' % (response,))
    return response[0] == 'yes'
1541
def is_write_locked(self):
    """Return True when the current lock mode is 'w'."""
    return self._lock_mode == 'w'
1544
def _warn_if_deprecated(self, branch=None):
    """Deliberately do nothing.

    :param branch: Unused.
    """
    # If we have a real repository, the check will be done there, if we
    # don't the check will be done remotely.
    pass
1549
def lock_read(self):
    """Lock the repository for read operations.

    The first lock switches to mode 'r', enables the parents cache
    (including misses) and read-locks the real repository, if set, and all
    fallback repositories. Nested calls only bump the lock count.

    :return: A breezy.lock.LogicalLockResult.
    """
    # wrong eventually - want a local lock cache context
    if not self._lock_mode:
        self._note_lock('r')
        self._lock_mode = 'r'
        self._lock_count = 1
        self._unstacked_provider.enable_cache(cache_misses=True)
        if self._real_repository is not None:
            self._real_repository.lock_read()
        for repo in self._fallback_repositories:
            repo.lock_read()
    else:
        self._lock_count += 1
    return lock.LogicalLockResult(self.unlock)
1568
def _remote_lock_write(self, token):
    """Take the write lock on the server via ``Repository.lock_write``.

    :param token: Existing lock token, or None (sent as '').
    :return: The lock token from the server's 'ok' response.
    :raises errors.UnexpectedSmartServerResponse: On any other response.
    """
    path = self.controldir._path_for_remote_call(self._client)
    if token is None:
        token = ''
    err_context = {'token': token}
    response = self._call('Repository.lock_write', path, token,
                          **err_context)
    if response[0] == 'ok':
        ok, token = response
        return token
    else:
        raise errors.UnexpectedSmartServerResponse(response)
1581
def lock_write(self, token=None, _skip_rpc=False):
    """Lock the repository for write operations.

    :param token: Optional existing lock token. When given, the lock is
        left in place on unlock.
    :param _skip_rpc: When true, no lock RPC is made; the given token is
        recorded after being checked against any token already held.
    :raises errors.TokenMismatch: With _skip_rpc, if token differs from the
        token already held.
    :raises errors.ReadOnlyError: If the repository is already read-locked.
    :return: A RepositoryWriteLockResult.
    """
    if not self._lock_mode:
        self._note_lock('w')
        if _skip_rpc:
            if self._lock_token is not None:
                if token != self._lock_token:
                    raise errors.TokenMismatch(token, self._lock_token)
            self._lock_token = token
        else:
            self._lock_token = self._remote_lock_write(token)
        # if self._lock_token is None, then this is something like packs or
        # svn where we don't get to lock the repo, or a weave style repository
        # where we cannot lock it over the wire and attempts to do so will
        # fail.
        if self._real_repository is not None:
            self._real_repository.lock_write(token=self._lock_token)
        if token is not None:
            self._leave_lock = True
        else:
            self._leave_lock = False
        self._lock_mode = 'w'
        self._lock_count = 1
        cache_misses = self._real_repository is None
        self._unstacked_provider.enable_cache(cache_misses=cache_misses)
        for repo in self._fallback_repositories:
            # Writes don't affect fallback repos
            repo.lock_read()
    elif self._lock_mode == 'r':
        raise errors.ReadOnlyError(self)
    else:
        self._lock_count += 1
    return RepositoryWriteLockResult(self.unlock, self._lock_token or None)
1614
def leave_lock_in_place(self):
    """Ask for the lock to be left in place when this object unlocks.

    :raises NotImplementedError: If no lock token is held.
    """
    if not self._lock_token:
        raise NotImplementedError(self.leave_lock_in_place)
    self._leave_lock = True
1619
def dont_leave_lock_in_place(self):
    """Ask for the lock to be released when this object unlocks.

    :raises NotImplementedError: If no lock token is held.
    """
    if not self._lock_token:
        raise NotImplementedError(self.dont_leave_lock_in_place)
    self._leave_lock = False
1624
def _set_real_repository(self, repository):
    """Set the _real_repository for this repository.

    The real repository is brought into line with this object's state:
    fallbacks, lock mode and any open write group.

    :param repository: The repository to fallback to for non-hpss
        implemented operations.
    :raises AssertionError: If a real repository is already set while
        locked, or if repository is itself a RemoteRepository.
    """
    if self._real_repository is not None:
        # Replacing an already set real repository.
        # We cannot do this [currently] if the repository is locked -
        # synchronised state might be lost.
        if self.is_locked():
            raise AssertionError('_real_repository is already set')
    if isinstance(repository, RemoteRepository):
        raise AssertionError()
    self._real_repository = repository
    # three code paths happen here:
    # 1) old servers, RemoteBranch.open() calls _ensure_real before setting
    # up stacking. In this case self._fallback_repositories is [], and the
    # real repo is already setup. Preserve the real repo and
    # RemoteRepository.add_fallback_repository will avoid adding
    # duplicates.
    # 2) new servers, RemoteBranch.open() sets up stacking, and when
    # ensure_real is triggered from a branch, the real repository to
    # set already has a matching list with separate instances, but
    # as they are also RemoteRepositories we don't worry about making the
    # lists be identical.
    # 3) new servers, RemoteRepository.ensure_real is triggered before
    # RemoteBranch.ensure real, in this case we get a repo with no fallbacks
    # and need to populate it.
    if (self._fallback_repositories and
            len(self._real_repository._fallback_repositories) !=
            len(self._fallback_repositories)):
        if len(self._real_repository._fallback_repositories):
            raise AssertionError(
                "cannot cleanly remove existing _fallback_repositories")
        # NOTE(review): nesting reconstructed from cases 2) and 3) above -
        # fallbacks are only copied when the lists differ in length.
        for fb in self._fallback_repositories:
            self._real_repository.add_fallback_repository(fb)
    if self._lock_mode == 'w':
        # if we are already locked, the real repository must be able to
        # acquire the lock with our token.
        self._real_repository.lock_write(self._lock_token)
    elif self._lock_mode == 'r':
        self._real_repository.lock_read()
    if self._write_group_tokens is not None:
        # if we are already in a write group, resume it
        self._real_repository.resume_write_group(self._write_group_tokens)
        self._write_group_tokens = None
1672
def start_write_group(self):
    """Start a write group on the decorated repository.

    Smart methods perform operations in a single step so this API
    is not really applicable except as a compatibility thunk
    for older plugins that don't use e.g. the CommitBuilder.

    Delegates to the real repository when one is set, or when the server
    cannot start a write group over RPC.

    :raises errors.NotWriteLocked: If the repository is not write locked.
    :raises errors.BzrError: If a write group is already open.
    """
    if self._real_repository:
        self._ensure_real()
        return self._real_repository.start_write_group()
    if not self.is_write_locked():
        raise errors.NotWriteLocked(self)
    if self._write_group_tokens is not None:
        raise errors.BzrError('already in a write group')
    path = self.controldir._path_for_remote_call(self._client)
    try:
        response = self._call('Repository.start_write_group', path,
                              self._lock_token)
    except (errors.UnknownSmartMethod, errors.UnsuspendableWriteGroup):
        self._ensure_real()
        return self._real_repository.start_write_group()
    if response[0] != 'ok':
        raise errors.UnexpectedSmartServerResponse(response)
    self._write_group_tokens = response[1]
1698
def _unlock(self, token):
    """Release the server-side lock via the ``Repository.unlock`` RPC.

    :param token: The lock token; when falsy no RPC is made.
    :raises errors.UnexpectedSmartServerResponse: If the server does not
        answer ('ok',).
    """
    path = self.controldir._path_for_remote_call(self._client)
    if not token:
        # with no token the remote repository is not persistently locked.
        return
    err_context = {'token': token}
    response = self._call('Repository.unlock', path, token,
                          **err_context)
    if response == ('ok',):
        return
    else:
        raise errors.UnexpectedSmartServerResponse(response)
1711
@only_raises(errors.LockNotHeld, errors.LockBroken)
def unlock(self):
    """Release one level of locking on this repository.

    Only the outermost unlock does real work: it disables the parents
    cache, unlocks the real repository (or aborts a pending RPC write
    group), releases the server-side write lock unless it is to be left in
    place, and unlocks the fallback repositories.
    """
    if not self._lock_count:
        return lock.cant_unlock_not_held(self)
    self._lock_count -= 1
    if self._lock_count > 0:
        return
    self._unstacked_provider.disable_cache()
    old_mode = self._lock_mode
    self._lock_mode = None
    try:
        # The real repository is responsible at present for raising an
        # exception if it's in an unfinished write group. However, it
        # normally will *not* actually remove the lock from disk - that's
        # done by the server on receiving the Repository.unlock call.
        # This is just to let the _real_repository stay up to date.
        if self._real_repository is not None:
            self._real_repository.unlock()
        elif self._write_group_tokens is not None:
            self.abort_write_group()
    finally:
        # The rpc-level lock should be released even if there was a
        # problem releasing the vfs-based lock.
        if old_mode == 'w':
            # Only write-locked repositories need to make a remote method
            # call to perform the unlock.
            old_token = self._lock_token
            self._lock_token = None
            if not self._leave_lock:
                self._unlock(old_token)
    # Fallbacks are always 'lock_read()' so we don't pay attention to
    # self._leave_lock
    # NOTE(review): indentation was lost in the extracted source; this loop
    # is assumed to sit after the try/finally rather than inside it.
    for repo in self._fallback_repositories:
        repo.unlock()
1746
def break_lock(self):
    """Break the repository lock via the ``Repository.break_lock`` RPC.

    Falls back to the real repository when the server does not know the
    verb.

    :raises errors.UnexpectedSmartServerResponse: If the server does not
        answer ('ok',).
    """
    # should hand off to the network
    path = self.controldir._path_for_remote_call(self._client)
    try:
        response = self._call("Repository.break_lock", path)
    except errors.UnknownSmartMethod:
        self._ensure_real()
        return self._real_repository.break_lock()
    if response != ('ok',):
        raise errors.UnexpectedSmartServerResponse(response)
1757
def _get_tarball(self, compression):
    """Return a TemporaryFile containing a repository tarball.

    Returns None if the server does not support sending tarballs.

    :param compression: Compression name passed to the
        ``Repository.tarball`` RPC.
    :raises errors.UnexpectedSmartServerResponse: If the response is not
        'ok'.
    """
    import tempfile
    path = self.controldir._path_for_remote_call(self._client)
    try:
        response, protocol = self._call_expecting_body(
            'Repository.tarball', path, compression)
    except errors.UnknownSmartMethod:
        # The call itself raised, so ``protocol`` was never bound; the
        # previous ``protocol.cancel_read_body()`` here could only fail
        # with UnboundLocalError. There is no body to cancel.
        return None
    if response[0] == 'ok':
        # Extract the tarball and return it
        t = tempfile.NamedTemporaryFile()
        # TODO: rpc layer should read directly into it...
        t.write(protocol.read_body_bytes())
        # Rewind so the caller reads from the start.
        t.seek(0)
        return t
    raise errors.UnexpectedSmartServerResponse(response)
1779
def sprout(self, to_bzrdir, revision_id=None):
    """Create a descendent repository for new development.

    Unlike clone, this does not copy the settings of the repository.

    :param to_bzrdir: Control directory to create the new repository in.
    :param revision_id: Passed to the new repository's ``fetch``.
    :return: The new repository, after fetching from this one.
    """
    with self.lock_read():
        dest_repo = self._create_sprouting_repo(to_bzrdir, shared=False)
        dest_repo.fetch(self, revision_id=revision_id)
        return dest_repo
1789
def _create_sprouting_repo(self, a_controldir, shared):
    """Create (or open) the destination repository for a sprout.

    :param a_controldir: Control directory to hold the new repository.
    :param shared: Passed to ``initialize`` when this format is used.
    :return: The destination repository.
    """
    if not isinstance(a_controldir._format, self.controldir._format.__class__):
        # use target default format.
        dest_repo = a_controldir.create_repository()
    else:
        # Most control formats need the repository to be specifically
        # created, but on some old all-in-one formats it's not needed
        try:
            dest_repo = self._format.initialize(a_controldir, shared=shared)
        except errors.UninitializableFormat:
            dest_repo = a_controldir.open_repository()
    return dest_repo
1802
### These methods are just thin shims to the VFS object for now.
1804
def revision_tree(self, revision_id):
    """Return the tree for revision_id.

    The null revision yields an InventoryRevisionTree over an empty
    Inventory; anything else comes from ``self.revision_trees``.
    """
    with self.lock_read():
        revision_id = _mod_revision.ensure_null(revision_id)
        if revision_id == _mod_revision.NULL_REVISION:
            return InventoryRevisionTree(
                self, Inventory(root_id=None), _mod_revision.NULL_REVISION)
        else:
            return list(self.revision_trees([revision_id]))[0]
1813
def get_serializer_format(self):
    """Return the serializer format name reported by the server.

    Uses the ``VersionedFileRepository.get_serializer_format`` RPC, falling
    back to the real repository when the server does not know the verb.

    :raises errors.UnexpectedSmartServerResponse: If the response does not
        start with 'ok'.
    """
    path = self.controldir._path_for_remote_call(self._client)
    try:
        response = self._call('VersionedFileRepository.get_serializer_format',
                              path)
    except errors.UnknownSmartMethod:
        self._ensure_real()
        return self._real_repository.get_serializer_format()
    if response[0] != 'ok':
        raise errors.UnexpectedSmartServerResponse(response)
    return response[1]
1825
def get_commit_builder(self, branch, parents, config, timestamp=None,
                       timezone=None, committer=None, revprops=None,
                       revision_id=None, lossy=False):
    """Obtain a CommitBuilder for this repository.

    A write group is started before the builder is returned.

    :param branch: Branch to commit to.
    :param parents: Revision ids of the parents of the new revision.
    :param config: Configuration to use.
    :param timestamp: Optional timestamp recorded for commit.
    :param timezone: Optional timezone for timestamp.
    :param committer: Optional committer to set for commit.
    :param revprops: Optional dictionary of revision properties.
    :param revision_id: Optional revision id.
    :param lossy: Whether to discard data that can not be natively
        represented, when pushing to a foreign VCS
    :raises errors.BzrError: If the repository has fallbacks and its format
        does not support chks.
    """
    if self._fallback_repositories and not self._format.supports_chks:
        raise errors.BzrError("Cannot commit directly to a stacked branch"
            " in pre-2a formats. See "
            "https://bugs.launchpad.net/bzr/+bug/375013 for details.")
    if self._format.rich_root_data:
        commit_builder_kls = vf_repository.VersionedFileRootCommitBuilder
    else:
        commit_builder_kls = vf_repository.VersionedFileCommitBuilder
    result = commit_builder_kls(self, parents, config,
                                timestamp, timezone, committer, revprops,
                                revision_id, lossy)
    self.start_write_group()
    return result
1855
def add_fallback_repository(self, repository):
    """Add a repository to use for looking up data not held locally.

    :param repository: A repository.
    :raises errors.UnstackableRepositoryFormat: If this repository's format
        does not support external lookups.
    """
    if not self._format.supports_external_lookups:
        raise errors.UnstackableRepositoryFormat(
            self._format.network_name(), self.base)
    # We need to accumulate additional repositories here.
    # Make the check before we lock: this raises an exception.
    self._check_fallback_repository(repository)
    if self.is_locked():
        # We will call fallback.unlock() when we transition to the unlocked
        # state, so always add a lock here. If a caller passes us a locked
        # repository, they are responsible for unlocking it later.
        repository.lock_read()
    self._fallback_repositories.append(repository)
    # If self._real_repository was parameterised already (e.g. because a
    # _real_branch had its get_stacked_on_url method called), then the
    # repository to be added may already be in the _real_repositories list.
    if self._real_repository is not None:
        fallback_locations = [repo.user_url for repo in
                              self._real_repository._fallback_repositories]
        if repository.user_url not in fallback_locations:
            self._real_repository.add_fallback_repository(repository)
1883
def _check_fallback_repository(self, repository):
    """Check that this repository can fallback to repository safely.

    Raise an error if not.

    :param repository: A repository to fallback to.
    """
    return _mod_repository.InterRepository._assert_same_model(
        self, repository)
1893
def add_inventory(self, revid, inv, parents):
    """Delegate to the real repository's add_inventory."""
    self._ensure_real()
    return self._real_repository.add_inventory(revid, inv, parents)
1897
def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
                           parents, basis_inv=None, propagate_caches=False):
    """Delegate to the real repository's add_inventory_by_delta.

    All arguments are passed through unchanged.
    """
    self._ensure_real()
    return self._real_repository.add_inventory_by_delta(
        basis_revision_id, delta, new_revision_id, parents,
        basis_inv=basis_inv, propagate_caches=propagate_caches)
1904
def add_revision(self, revision_id, rev, inv=None):
    """Add rev to the repository as revision_id.

    :param revision_id: Id of the revision; must not be a reserved id.
    :param rev: The revision object; its ``inventory_sha1`` is filled in.
    :param inv: Inventory to add when none is present yet for revision_id.
    :raises errors.WeaveRevisionNotPresent: If no inventory is present and
        inv is None.
    """
    _mod_revision.check_not_reserved_id(revision_id)
    key = (revision_id,)
    # check inventory present
    if not self.inventories.get_parent_map([key]):
        if inv is None:
            raise errors.WeaveRevisionNotPresent(revision_id,
                                                 self.inventories)
        else:
            # yes, this is not suitable for adding with ghosts.
            rev.inventory_sha1 = self.add_inventory(revision_id, inv,
                                                    rev.parent_ids)
    else:
        rev.inventory_sha1 = self.inventories.get_sha1s([key])[key]
    self._add_revision(rev)
1920
def _add_revision(self, rev):
    """Store rev, via the real repository or a one-record stream insert.

    Without a real repository the serialised revision is sent through the
    stream sink, and the write group tokens it returns are remembered.
    """
    if self._real_repository is not None:
        return self._real_repository._add_revision(rev)
    text = self._serializer.write_revision_to_string(rev)
    key = (rev.revision_id,)
    parents = tuple((parent,) for parent in rev.parent_ids)
    self._write_group_tokens, missing_keys = self._get_sink().insert_stream(
        [('revisions', [FulltextContentFactory(key, parents, None, text)])],
        self._format, self._write_group_tokens)
1930
def get_inventory(self, revision_id):
    """Return the inventory for the single revision ``revision_id``.

    A read lock is held while the inventory is fetched through
    ``iter_inventories``; any error raised by that method propagates.

    :param revision_id: Id of the revision whose inventory is wanted.
    :return: The first (and only requested) inventory produced by
        ``iter_inventories([revision_id])``.
    """
    with self.lock_read():
        # Materialise the iterator so it is fully consumed before the
        # lock is released.
        return list(self.iter_inventories([revision_id]))[0]
1934
def _iter_inventories_rpc(self, revision_ids, ordering):
1935
if ordering is None:
1936
ordering = 'unordered'
1937
path = self.controldir._path_for_remote_call(self._client)
1938
body = "\n".join(revision_ids)
1939
response_tuple, response_handler = (
1940
self._call_with_body_bytes_expecting_body(
1941
"VersionedFileRepository.get_inventories",
1942
(path, ordering), body))
1943
if response_tuple[0] != "ok":
1944
raise errors.UnexpectedSmartServerResponse(response_tuple)
1945
deserializer = inventory_delta.InventoryDeltaDeserializer()
1946
byte_stream = response_handler.read_streamed_body()
1947
decoded = smart_repo._byte_stream_to_stream(byte_stream)
1949
# no results whatsoever
1951
src_format, stream = decoded
1952
if src_format.network_name() != self._format.network_name():
1953
raise AssertionError(
1954
"Mismatched RemoteRepository and stream src %r, %r" % (
1955
src_format.network_name(), self._format.network_name()))
1956
# ignore the src format, it's not really relevant
1957
prev_inv = Inventory(root_id=None,
1958
revision_id=_mod_revision.NULL_REVISION)
1959
# there should be just one substream, with inventory deltas
1960
substream_kind, substream = next(stream)
1961
if substream_kind != "inventory-deltas":
1962
raise AssertionError(
1963
"Unexpected stream %r received" % substream_kind)
1964
for record in substream:
1965
(parent_id, new_id, versioned_root, tree_references, invdelta) = (
1966
deserializer.parse_text_bytes(record.get_bytes_as("fulltext")))
1967
if parent_id != prev_inv.revision_id:
1968
raise AssertionError("invalid base %r != %r" % (parent_id,
1969
prev_inv.revision_id))
1970
inv = prev_inv.create_by_apply_delta(invdelta, new_id)
1971
yield inv, inv.revision_id
1974
def _iter_inventories_vfs(self, revision_ids, ordering=None):
1976
return self._real_repository._iter_inventories(revision_ids, ordering)
1978
def iter_inventories(self, revision_ids, ordering=None):
1979
"""Get many inventories by revision_ids.
1981
This will buffer some or all of the texts used in constructing the
1982
inventories in memory, but will only parse a single inventory at a
1985
:param revision_ids: The expected revision ids of the inventories.
1986
:param ordering: optional ordering, e.g. 'topological'. If not
1987
specified, the order of revision_ids will be preserved (by
1988
buffering if necessary).
1989
:return: An iterator of inventories.
1991
if ((None in revision_ids)
1992
or (_mod_revision.NULL_REVISION in revision_ids)):
1993
raise ValueError('cannot get null revision inventory')
1994
for inv, revid in self._iter_inventories(revision_ids, ordering):
1996
raise errors.NoSuchRevision(self, revid)
1999
def _iter_inventories(self, revision_ids, ordering=None):
2000
if len(revision_ids) == 0:
2002
missing = set(revision_ids)
2003
if ordering is None:
2004
order_as_requested = True
2006
order = list(revision_ids)
2008
next_revid = order.pop()
2010
order_as_requested = False
2011
if ordering != 'unordered' and self._fallback_repositories:
2012
raise ValueError('unsupported ordering %r' % ordering)
2013
iter_inv_fns = [self._iter_inventories_rpc] + [
2014
fallback._iter_inventories for fallback in
2015
self._fallback_repositories]
2017
for iter_inv in iter_inv_fns:
2018
request = [revid for revid in revision_ids if revid in missing]
2019
for inv, revid in iter_inv(request, ordering):
2022
missing.remove(inv.revision_id)
2023
if ordering != 'unordered':
2027
if order_as_requested:
2028
# Yield as many results as we can while preserving order.
2029
while next_revid in invs:
2030
inv = invs.pop(next_revid)
2031
yield inv, inv.revision_id
2033
next_revid = order.pop()
2035
# We still want to fully consume the stream, just
2036
# in case it is not actually finished at this point
2039
except errors.UnknownSmartMethod:
2040
for inv, revid in self._iter_inventories_vfs(revision_ids, ordering):
2044
if order_as_requested:
2045
if next_revid is not None:
2046
yield None, next_revid
2049
yield invs.get(revid), revid
2052
yield None, missing.pop()
2054
def get_revision(self, revision_id):
    """Return the revision object for ``revision_id``.

    Holds a read lock while asking ``get_revisions`` for a one-element
    list and returns its first entry.

    :param revision_id: Id of the revision to fetch.
    :return: The first item returned by ``get_revisions([revision_id])``.
    """
    with self.lock_read():
        return self.get_revisions([revision_id])[0]
2058
def get_transaction(self):
2060
return self._real_repository.get_transaction()
2062
def clone(self, a_controldir, revision_id=None):
2063
with self.lock_read():
2064
dest_repo = self._create_sprouting_repo(
2065
a_controldir, shared=self.is_shared())
2066
self.copy_content_into(dest_repo, revision_id)
2069
def make_working_trees(self):
2070
"""See Repository.make_working_trees"""
2071
path = self.controldir._path_for_remote_call(self._client)
2073
response = self._call('Repository.make_working_trees', path)
2074
except errors.UnknownSmartMethod:
2076
return self._real_repository.make_working_trees()
2077
if response[0] not in ('yes', 'no'):
2078
raise SmartProtocolError('unexpected response code %s' % (response,))
2079
return response[0] == 'yes'
2081
def refresh_data(self):
    """Re-read any data needed to synchronise with disk.

    This method is intended to be called after another repository instance
    (such as one used by a smart server) has inserted data into the
    repository. On all repositories this will work outside of write groups.
    Some repository formats (pack and newer for breezy native formats)
    support refresh_data inside write groups. If called inside a write
    group on a repository that does not support refreshing in a write group
    IsInWriteGroupError will be raised.

    If ``self._real_repository`` has been set it is refreshed as well.
    """
    if self._real_repository is not None:
        self._real_repository.refresh_data()
    # Refresh the parents cache for this object: turning the cache off
    # and straight back on again.
    self._unstacked_provider.disable_cache()
    self._unstacked_provider.enable_cache()
2098
def revision_ids_to_search_result(self, result_set):
2099
"""Convert a set of revision ids to a graph SearchResult."""
2100
result_parents = set()
2101
for parents in viewvalues(self.get_graph().get_parent_map(result_set)):
2102
result_parents.update(parents)
2103
included_keys = result_set.intersection(result_parents)
2104
start_keys = result_set.difference(included_keys)
2105
exclude_keys = result_parents.difference(result_set)
2106
result = vf_search.SearchResult(start_keys, exclude_keys,
2107
len(result_set), result_set)
2110
def search_missing_revision_ids(self, other,
2111
find_ghosts=True, revision_ids=None, if_present_ids=None,
2113
"""Return the revision ids that other has that this does not.
2115
These are returned in topological order.
2117
revision_id: only return revision ids included by revision_id.
2119
with self.lock_read():
2120
inter_repo = _mod_repository.InterRepository.get(other, self)
2121
return inter_repo.search_missing_revision_ids(
2122
find_ghosts=find_ghosts, revision_ids=revision_ids,
2123
if_present_ids=if_present_ids, limit=limit)
2125
def fetch(self, source, revision_id=None, find_ghosts=False,
2127
# No base implementation to use as RemoteRepository is not a subclass
2128
# of Repository; so this is a copy of Repository.fetch().
2129
if fetch_spec is not None and revision_id is not None:
2130
raise AssertionError(
2131
"fetch_spec and revision_id are mutually exclusive.")
2132
if self.is_in_write_group():
2133
raise errors.InternalBzrError(
2134
"May not fetch while in a write group.")
2135
# fast path same-url fetch operations
2136
if (self.has_same_location(source)
2137
and fetch_spec is None
2138
and self._has_same_fallbacks(source)):
2139
# check that last_revision is in 'from' and then return a
2141
if (revision_id is not None and
2142
not _mod_revision.is_null(revision_id)):
2143
self.get_revision(revision_id)
2145
# if there is no specific appropriate InterRepository, this will get
2146
# the InterRepository base class, which raises an
2147
# IncompatibleRepositories when asked to fetch.
2148
inter = _mod_repository.InterRepository.get(source, self)
2149
if (fetch_spec is not None and
2150
not getattr(inter, "supports_fetch_spec", False)):
2151
raise errors.UnsupportedOperation(
2152
"fetch_spec not supported for %r" % inter)
2153
return inter.fetch(revision_id=revision_id,
2154
find_ghosts=find_ghosts, fetch_spec=fetch_spec)
2156
def create_bundle(self, target, base, fileobj, format=None):
2158
self._real_repository.create_bundle(target, base, fileobj, format)
2160
def fileids_altered_by_revision_ids(self, revision_ids):
2162
return self._real_repository.fileids_altered_by_revision_ids(revision_ids)
2164
def _get_versioned_file_checker(self, revisions, revision_versions_cache):
2166
return self._real_repository._get_versioned_file_checker(
2167
revisions, revision_versions_cache)
2169
def _iter_files_bytes_rpc(self, desired_files, absent):
2170
path = self.controldir._path_for_remote_call(self._client)
2173
for (file_id, revid, identifier) in desired_files:
2174
lines.append("%s\0%s" % (
2175
osutils.safe_file_id(file_id),
2176
osutils.safe_revision_id(revid)))
2177
identifiers.append(identifier)
2178
(response_tuple, response_handler) = (
2179
self._call_with_body_bytes_expecting_body(
2180
"Repository.iter_files_bytes", (path, ), "\n".join(lines)))
2181
if response_tuple != ('ok', ):
2182
response_handler.cancel_read_body()
2183
raise errors.UnexpectedSmartServerResponse(response_tuple)
2184
byte_stream = response_handler.read_streamed_body()
2185
def decompress_stream(start, byte_stream, unused):
2186
decompressor = zlib.decompressobj()
2187
yield decompressor.decompress(start)
2188
while decompressor.unused_data == "":
2190
data = next(byte_stream)
2191
except StopIteration:
2193
yield decompressor.decompress(data)
2194
yield decompressor.flush()
2195
unused.append(decompressor.unused_data)
2198
while not "\n" in unused:
2199
unused += next(byte_stream)
2200
header, rest = unused.split("\n", 1)
2201
args = header.split("\0")
2202
if args[0] == "absent":
2203
absent[identifiers[int(args[3])]] = (args[1], args[2])
2206
elif args[0] == "ok":
2209
raise errors.UnexpectedSmartServerResponse(args)
2211
yield (identifiers[idx],
2212
decompress_stream(rest, byte_stream, unused_chunks))
2213
unused = "".join(unused_chunks)
2215
def iter_files_bytes(self, desired_files):
2216
"""See Repository.iter_file_bytes.
2220
for (identifier, bytes_iterator) in self._iter_files_bytes_rpc(
2221
desired_files, absent):
2222
yield identifier, bytes_iterator
2223
for fallback in self._fallback_repositories:
2226
desired_files = [(key[0], key[1], identifier)
2227
for identifier, key in viewitems(absent)]
2228
for (identifier, bytes_iterator) in fallback.iter_files_bytes(desired_files):
2229
del absent[identifier]
2230
yield identifier, bytes_iterator
2232
# There may be more missing items, but raise an exception
2234
missing_identifier = next(iter(absent))
2235
missing_key = absent[missing_identifier]
2236
raise errors.RevisionNotPresent(revision_id=missing_key[1],
2237
file_id=missing_key[0])
2238
except errors.UnknownSmartMethod:
2240
for (identifier, bytes_iterator) in (
2241
self._real_repository.iter_files_bytes(desired_files)):
2242
yield identifier, bytes_iterator
2244
def get_cached_parent_map(self, revision_ids):
    """See breezy.CachingParentsProvider.get_cached_parent_map.

    The lookup is answered by ``self._unstacked_provider``.

    :param revision_ids: The revision ids to look up.
    :return: The provider's ``get_cached_parent_map`` result, unchanged.
    """
    return self._unstacked_provider.get_cached_parent_map(revision_ids)
2248
def get_parent_map(self, revision_ids):
    """See breezy.Graph.get_parent_map().

    A parents provider is built with ``_make_parents_provider()`` on
    every call and the query is forwarded to it.

    :param revision_ids: The revision ids whose parents are wanted.
    :return: The provider's ``get_parent_map`` result, unchanged.
    """
    return self._make_parents_provider().get_parent_map(revision_ids)
2252
def _get_parent_map_rpc(self, keys):
2253
"""Helper for get_parent_map that performs the RPC."""
2254
medium = self._client._medium
2255
if medium._is_remote_before((1, 2)):
2256
# We already found out that the server can't understand
2257
# Repository.get_parent_map requests, so just fetch the whole
2260
# Note that this reads the whole graph, when only some keys are
2261
# wanted. On this old server there's no way (?) to get them all
2262
# in one go, and the user probably will have seen a warning about
2263
# the server being old anyhow.
2264
rg = self._get_revision_graph(None)
2265
# There is an API discrepancy between get_parent_map and
2266
# get_revision_graph. Specifically, a "key:()" pair in
2267
# get_revision_graph just means a node has no parents. For
2268
# "get_parent_map" it means the node is a ghost. So fix up the
2269
# graph to correct this.
2270
# https://bugs.launchpad.net/bzr/+bug/214894
2271
# There is one other "bug" which is that ghosts in
2272
# get_revision_graph() are not returned at all. But we won't worry
2273
# about that for now.
2274
for node_id, parent_ids in viewitems(rg):
2275
if parent_ids == ():
2276
rg[node_id] = (NULL_REVISION,)
2277
rg[NULL_REVISION] = ()
2282
raise ValueError('get_parent_map(None) is not valid')
2283
if NULL_REVISION in keys:
2284
keys.discard(NULL_REVISION)
2285
found_parents = {NULL_REVISION:()}
2287
return found_parents
2290
# TODO(Needs analysis): We could assume that the keys being requested
2291
# from get_parent_map are in a breadth first search, so typically they
2292
# will all be depth N from some common parent, and we don't have to
2293
# have the server iterate from the root parent, but rather from the
2294
# keys we're searching; and just tell the server the keyspace we
2295
# already have; but this may be more traffic again.
2297
# Transform self._parents_map into a search request recipe.
2298
# TODO: Manage this incrementally to avoid covering the same path
2299
# repeatedly. (The server will have to on each request, but the less
2300
# work done the better).
2302
# Negative caching notes:
2303
# new server sends missing when a request including the revid
2304
# 'include-missing:' is present in the request.
2305
# missing keys are serialised as missing:X, and we then call
2306
# provider.note_missing(X) for-all X
2307
parents_map = self._unstacked_provider.get_cached_map()
2308
if parents_map is None:
2309
# Repository is not locked, so there's no cache.
2311
if _DEFAULT_SEARCH_DEPTH <= 0:
2312
(start_set, stop_keys,
2313
key_count) = vf_search.search_result_from_parent_map(
2314
parents_map, self._unstacked_provider.missing_keys)
2316
(start_set, stop_keys,
2317
key_count) = vf_search.limited_search_result_from_parent_map(
2318
parents_map, self._unstacked_provider.missing_keys,
2319
keys, depth=_DEFAULT_SEARCH_DEPTH)
2320
recipe = ('manual', start_set, stop_keys, key_count)
2321
body = self._serialise_search_recipe(recipe)
2322
path = self.controldir._path_for_remote_call(self._client)
2324
if not isinstance(key, str):
2326
"key %r not a plain string" % (key,))
2327
verb = 'Repository.get_parent_map'
2328
args = (path, 'include-missing:') + tuple(keys)
2330
response = self._call_with_body_bytes_expecting_body(
2332
except errors.UnknownSmartMethod:
2333
# Server does not support this method, so get the whole graph.
2334
# Worse, we have to force a disconnection, because the server now
2335
# doesn't realise it has a body on the wire to consume, so the
2336
# only way to recover is to abandon the connection.
2338
'Server is too old for fast get_parent_map, reconnecting. '
2339
'(Upgrade the server to Bazaar 1.2 to avoid this)')
2341
# To avoid having to disconnect repeatedly, we keep track of the
2342
# fact the server doesn't understand remote methods added in 1.2.
2343
medium._remember_remote_is_before((1, 2))
2344
# Recurse just once and we should use the fallback code.
2345
return self._get_parent_map_rpc(keys)
2346
response_tuple, response_handler = response
2347
if response_tuple[0] not in ['ok']:
2348
response_handler.cancel_read_body()
2349
raise errors.UnexpectedSmartServerResponse(response_tuple)
2350
if response_tuple[0] == 'ok':
2351
coded = bz2.decompress(response_handler.read_body_bytes())
2353
# no revisions found
2355
lines = coded.split('\n')
2358
d = tuple(line.split())
2360
revision_graph[d[0]] = d[1:]
2363
if d[0].startswith('missing:'):
2365
self._unstacked_provider.note_missing_key(revid)
2367
# no parents - so give the Graph result
2369
revision_graph[d[0]] = (NULL_REVISION,)
2370
return revision_graph
2372
def get_signature_text(self, revision_id):
2373
with self.lock_read():
2374
path = self.controldir._path_for_remote_call(self._client)
2376
response_tuple, response_handler = self._call_expecting_body(
2377
'Repository.get_revision_signature_text', path, revision_id)
2378
except errors.UnknownSmartMethod:
2380
return self._real_repository.get_signature_text(revision_id)
2381
except errors.NoSuchRevision as err:
2382
for fallback in self._fallback_repositories:
2384
return fallback.get_signature_text(revision_id)
2385
except errors.NoSuchRevision:
2389
if response_tuple[0] != 'ok':
2390
raise errors.UnexpectedSmartServerResponse(response_tuple)
2391
return response_handler.read_body_bytes()
2393
def _get_inventory_xml(self, revision_id):
2394
with self.lock_read():
2395
# This call is used by older working tree formats,
2396
# which stored a serialized basis inventory.
2398
return self._real_repository._get_inventory_xml(revision_id)
2400
def reconcile(self, other=None, thorough=False):
2401
from ..reconcile import RepoReconciler
2402
with self.lock_write():
2403
path = self.controldir._path_for_remote_call(self._client)
2405
response, handler = self._call_expecting_body(
2406
'Repository.reconcile', path, self._lock_token)
2407
except (errors.UnknownSmartMethod, errors.TokenLockingNotSupported):
2409
return self._real_repository.reconcile(other=other, thorough=thorough)
2410
if response != ('ok', ):
2411
raise errors.UnexpectedSmartServerResponse(response)
2412
body = handler.read_body_bytes()
2413
result = RepoReconciler(self)
2414
for line in body.split('\n'):
2417
key, val_text = line.split(':')
2418
if key == "garbage_inventories":
2419
result.garbage_inventories = int(val_text)
2420
elif key == "inconsistent_parents":
2421
result.inconsistent_parents = int(val_text)
2423
mutter("unknown reconcile key %r" % key)
2426
def all_revision_ids(self):
2427
path = self.controldir._path_for_remote_call(self._client)
2429
response_tuple, response_handler = self._call_expecting_body(
2430
"Repository.all_revision_ids", path)
2431
except errors.UnknownSmartMethod:
2433
return self._real_repository.all_revision_ids()
2434
if response_tuple != ("ok", ):
2435
raise errors.UnexpectedSmartServerResponse(response_tuple)
2436
revids = set(response_handler.read_body_bytes().splitlines())
2437
for fallback in self._fallback_repositories:
2438
revids.update(set(fallback.all_revision_ids()))
2441
def _filtered_revision_trees(self, revision_ids, file_ids):
2442
"""Return Tree for a revision on this branch with only some files.
2444
:param revision_ids: a sequence of revision-ids;
2445
a revision-id may not be None or 'null:'
2446
:param file_ids: if not None, the result is filtered
2447
so that only those file-ids, their parents and their
2448
children are included.
2450
inventories = self.iter_inventories(revision_ids)
2451
for inv in inventories:
2452
# Should we introduce a FilteredRevisionTree class rather
2453
# than pre-filter the inventory here?
2454
filtered_inv = inv.filter(file_ids)
2455
yield InventoryRevisionTree(self, filtered_inv, filtered_inv.revision_id)
2457
def get_deltas_for_revisions(self, revisions, specific_fileids=None):
2458
with self.lock_read():
2459
medium = self._client._medium
2460
if medium._is_remote_before((1, 2)):
2462
for delta in self._real_repository.get_deltas_for_revisions(
2463
revisions, specific_fileids):
2466
# Get the revision-ids of interest
2467
required_trees = set()
2468
for revision in revisions:
2469
required_trees.add(revision.revision_id)
2470
required_trees.update(revision.parent_ids[:1])
2472
# Get the matching filtered trees. Note that it's more
2473
# efficient to pass filtered trees to changes_from() rather
2474
# than doing the filtering afterwards. changes_from() could
2475
# arguably do the filtering itself but it's path-based, not
2476
# file-id based, so filtering before or afterwards is
2478
if specific_fileids is None:
2479
trees = dict((t.get_revision_id(), t) for
2480
t in self.revision_trees(required_trees))
2482
trees = dict((t.get_revision_id(), t) for
2483
t in self._filtered_revision_trees(required_trees,
2486
# Calculate the deltas
2487
for revision in revisions:
2488
if not revision.parent_ids:
2489
old_tree = self.revision_tree(_mod_revision.NULL_REVISION)
2491
old_tree = trees[revision.parent_ids[0]]
2492
yield trees[revision.revision_id].changes_from(old_tree)
2494
def get_revision_delta(self, revision_id, specific_fileids=None):
    """Return the delta for the single revision ``revision_id``.

    Under a read lock the revision is loaded with ``get_revision`` and
    handed to ``get_deltas_for_revisions``; the first delta it produces
    is returned.

    :param revision_id: Id of the revision to compute the delta for.
    :param specific_fileids: Passed through unchanged to
        ``get_deltas_for_revisions``.
    :return: The first delta yielded for the revision.
    """
    with self.lock_read():
        revision = self.get_revision(revision_id)
        deltas = self.get_deltas_for_revisions(
            [revision], specific_fileids=specific_fileids)
        # Materialise the iterator so it is exhausted while the lock is
        # still held.
        return list(deltas)[0]
2500
def revision_trees(self, revision_ids):
    """Yield an InventoryRevisionTree for each of ``revision_ids``.

    This is a generator: the read lock is taken when iteration starts
    and is held until the generator finishes or is closed.

    :param revision_ids: The revision ids to build trees for; passed to
        ``iter_inventories``.
    :return: A generator of InventoryRevisionTree objects, one per
        inventory produced by ``iter_inventories``.
    """
    with self.lock_read():
        inventories = self.iter_inventories(revision_ids)
        for inv in inventories:
            yield InventoryRevisionTree(self, inv, inv.revision_id)
2506
def get_revision_reconcile(self, revision_id):
2507
with self.lock_read():
2509
return self._real_repository.get_revision_reconcile(revision_id)
2511
def check(self, revision_ids=None, callback_refs=None, check_repo=True):
2512
with self.lock_read():
2514
return self._real_repository.check(revision_ids=revision_ids,
2515
callback_refs=callback_refs, check_repo=check_repo)
2517
def copy_content_into(self, destination, revision_id=None):
2518
"""Make a complete copy of the content in self into destination.
2520
This is a destructive operation! Do not use it on existing
2523
interrepo = _mod_repository.InterRepository.get(self, destination)
2524
return interrepo.copy_content(revision_id)
2526
def _copy_repository_tarball(self, to_bzrdir, revision_id=None):
2527
# get a tarball of the remote repository, and copy from that into the
2530
# TODO: Maybe a progress bar while streaming the tarball?
2531
note(gettext("Copying repository content as tarball..."))
2532
tar_file = self._get_tarball('bz2')
2533
if tar_file is None:
2535
destination = to_bzrdir.create_repository()
2537
tar = tarfile.open('repository', fileobj=tar_file,
2539
tmpdir = osutils.mkdtemp()
2541
tar.extractall(tmpdir)
2542
tmp_bzrdir = _mod_bzrdir.BzrDir.open(tmpdir)
2543
tmp_repo = tmp_bzrdir.open_repository()
2544
tmp_repo.copy_content_into(destination, revision_id)
2546
osutils.rmtree(tmpdir)
2550
# TODO: Suggestion from john: using external tar is much faster than
2551
# python's tarfile library, but it may not work on windows.
2554
def inventories(self):
2555
"""Decorate the real repository for now.
2557
In the long term a full blown network facility is needed to
2558
avoid creating a real repository object locally.
2561
return self._real_repository.inventories
2563
def pack(self, hint=None, clean_obsolete_packs=False):
2564
"""Compress the data within the repository.
2569
body = "".join([l+"\n" for l in hint])
2570
with self.lock_write():
2571
path = self.controldir._path_for_remote_call(self._client)
2573
response, handler = self._call_with_body_bytes_expecting_body(
2574
'Repository.pack', (path, self._lock_token,
2575
str(clean_obsolete_packs)), body)
2576
except errors.UnknownSmartMethod:
2578
return self._real_repository.pack(hint=hint,
2579
clean_obsolete_packs=clean_obsolete_packs)
2580
handler.cancel_read_body()
2581
if response != ('ok', ):
2582
raise errors.UnexpectedSmartServerResponse(response)
2585
def revisions(self):
2586
"""Decorate the real repository for now.
2588
In the long term a full blown network facility is needed.
2591
return self._real_repository.revisions
2593
def set_make_working_trees(self, new_value):
2595
new_value_str = "True"
2597
new_value_str = "False"
2598
path = self.controldir._path_for_remote_call(self._client)
2600
response = self._call(
2601
'Repository.set_make_working_trees', path, new_value_str)
2602
except errors.UnknownSmartMethod:
2604
self._real_repository.set_make_working_trees(new_value)
2606
if response[0] != 'ok':
2607
raise errors.UnexpectedSmartServerResponse(response)
2610
def signatures(self):
2611
"""Decorate the real repository for now.
2613
In the long term a full blown network facility is needed to avoid
2614
creating a real repository object locally.
2617
return self._real_repository.signatures
2619
def sign_revision(self, revision_id, gpg_strategy):
    """Sign the revision ``revision_id`` and store the signature.

    Under a write lock, builds the testament for the revision, takes its
    short text form as the plaintext and passes it on to
    ``store_revision_signature``.

    :param revision_id: Id of the revision to sign.
    :param gpg_strategy: Signing strategy, handed unchanged to
        ``store_revision_signature``.
    """
    with self.lock_write():
        testament = _mod_testament.Testament.from_revision(self, revision_id)
        plaintext = testament.as_short_text()
        self.store_revision_signature(gpg_strategy, plaintext, revision_id)
2627
"""Decorate the real repository for now.
2629
In the long term a full blown network facility is needed to avoid
2630
creating a real repository object locally.
2633
return self._real_repository.texts
2635
def _iter_revisions_rpc(self, revision_ids):
2636
body = "\n".join(revision_ids)
2637
path = self.controldir._path_for_remote_call(self._client)
2638
response_tuple, response_handler = (
2639
self._call_with_body_bytes_expecting_body(
2640
"Repository.iter_revisions", (path, ), body))
2641
if response_tuple[0] != "ok":
2642
raise errors.UnexpectedSmartServerResponse(response_tuple)
2643
serializer_format = response_tuple[1]
2644
serializer = serializer_format_registry.get(serializer_format)
2645
byte_stream = response_handler.read_streamed_body()
2646
decompressor = zlib.decompressobj()
2648
for bytes in byte_stream:
2649
chunks.append(decompressor.decompress(bytes))
2650
if decompressor.unused_data != "":
2651
chunks.append(decompressor.flush())
2652
yield serializer.read_revision_from_string("".join(chunks))
2653
unused = decompressor.unused_data
2654
decompressor = zlib.decompressobj()
2655
chunks = [decompressor.decompress(unused)]
2656
chunks.append(decompressor.flush())
2657
text = "".join(chunks)
2659
yield serializer.read_revision_from_string("".join(chunks))
2661
def iter_revisions(self, revision_ids):
2662
for rev_id in revision_ids:
2663
if not rev_id or not isinstance(rev_id, bytes):
2664
raise errors.InvalidRevisionId(
2665
revision_id=rev_id, branch=self)
2666
with self.lock_read():
2668
missing = set(revision_ids)
2669
for rev in self._iter_revisions_rpc(revision_ids):
2670
missing.remove(rev.revision_id)
2671
yield (rev.revision_id, rev)
2672
for fallback in self._fallback_repositories:
2675
for (revid, rev) in fallback.iter_revisions(missing):
2678
missing.remove(revid)
2679
for revid in missing:
2681
except errors.UnknownSmartMethod:
2683
for entry in self._real_repository.iter_revisions(revision_ids):
2686
def supports_rich_root(self):
    """Return the ``rich_root_data`` flag of this repository's format."""
    return self._format.rich_root_data
2690
def _serializer(self):
2691
return self._format._serializer
2693
def store_revision_signature(self, gpg_strategy, plaintext, revision_id):
    """Sign ``plaintext`` and record the result for ``revision_id``.

    Under a write lock, the plaintext is signed with
    ``gpg_strategy.sign`` in ``gpg.MODE_CLEAR`` and the resulting
    signature is stored through ``add_signature_text``.

    :param gpg_strategy: Object providing ``sign(plaintext, mode)``.
    :param plaintext: The text to sign.
    :param revision_id: Id of the revision the signature belongs to.
    """
    with self.lock_write():
        signature = gpg_strategy.sign(plaintext, gpg.MODE_CLEAR)
        self.add_signature_text(revision_id, signature)
2698
def add_signature_text(self, revision_id, signature):
2699
if self._real_repository:
2700
# If there is a real repository the write group will
2701
# be in the real repository as well, so use that:
2703
return self._real_repository.add_signature_text(
2704
revision_id, signature)
2705
path = self.controldir._path_for_remote_call(self._client)
2706
response, handler = self._call_with_body_bytes_expecting_body(
2707
'Repository.add_signature_text', (path, self._lock_token,
2708
revision_id) + tuple(self._write_group_tokens), signature)
2709
handler.cancel_read_body()
2711
if response[0] != 'ok':
2712
raise errors.UnexpectedSmartServerResponse(response)
2713
self._write_group_tokens = response[1:]
2715
def has_signature_for_revision_id(self, revision_id):
2716
path = self.controldir._path_for_remote_call(self._client)
2718
response = self._call('Repository.has_signature_for_revision_id',
2720
except errors.UnknownSmartMethod:
2722
return self._real_repository.has_signature_for_revision_id(
2724
if response[0] not in ('yes', 'no'):
2725
raise SmartProtocolError('unexpected response code %s' % (response,))
2726
if response[0] == 'yes':
2728
for fallback in self._fallback_repositories:
2729
if fallback.has_signature_for_revision_id(revision_id):
2733
def verify_revision_signature(self, revision_id, gpg_strategy):
2734
with self.lock_read():
2735
if not self.has_signature_for_revision_id(revision_id):
2736
return gpg.SIGNATURE_NOT_SIGNED, None
2737
signature = self.get_signature_text(revision_id)
2739
testament = _mod_testament.Testament.from_revision(self, revision_id)
2741
(status, key, signed_plaintext) = gpg_strategy.verify(signature)
2742
if testament.as_short_text() != signed_plaintext:
2743
return gpg.SIGNATURE_NOT_VALID, None
2744
return (status, key)
2746
def item_keys_introduced_by(self, revision_ids, _files_pb=None):
2748
return self._real_repository.item_keys_introduced_by(revision_ids,
2749
_files_pb=_files_pb)
2751
def _find_inconsistent_revision_parents(self, revisions_iterator=None):
2753
return self._real_repository._find_inconsistent_revision_parents(
2756
def _check_for_inconsistent_revision_parents(self):
2758
return self._real_repository._check_for_inconsistent_revision_parents()
2760
def _make_parents_provider(self, other=None):
    """Build a stacked parents provider for this repository.

    The stack consists of ``other`` (when given, placed first), then
    ``self._unstacked_provider``, then ``self._fallback_repositories``.
    The two lists are combined with ``_LazyListJoin`` rather than being
    concatenated here.

    :param other: Optional extra provider to put ahead of this
        repository's own provider.
    :return: A ``graph.StackedParentsProvider`` over the combined list.
    """
    providers = [self._unstacked_provider]
    if other is not None:
        providers.insert(0, other)
    return graph.StackedParentsProvider(_LazyListJoin(
        providers, self._fallback_repositories))
2767
def _serialise_search_recipe(self, recipe):
2768
"""Serialise a graph search recipe.
2770
:param recipe: A search recipe (start, stop, count).
2771
:return: Serialised bytes.
2773
start_keys = ' '.join(recipe[1])
2774
stop_keys = ' '.join(recipe[2])
2775
count = str(recipe[3])
2776
return '\n'.join((start_keys, stop_keys, count))
2778
def _serialise_search_result(self, search_result):
2779
parts = search_result.get_network_struct()
2780
return '\n'.join(parts)
2783
path = self.controldir._path_for_remote_call(self._client)
2785
response = self._call('PackRepository.autopack', path)
2786
except errors.UnknownSmartMethod:
2788
self._real_repository._pack_collection.autopack()
2791
if response[0] != 'ok':
2792
raise errors.UnexpectedSmartServerResponse(response)
2795
class RemoteStreamSink(vf_repository.StreamSink):
2797
def _insert_real(self, stream, src_format, resume_tokens):
2798
self.target_repo._ensure_real()
2799
sink = self.target_repo._real_repository._get_sink()
2800
result = sink.insert_stream(stream, src_format, resume_tokens)
2802
self.target_repo.autopack()
2805
def insert_stream(self, stream, src_format, resume_tokens):
2806
target = self.target_repo
2807
target._unstacked_provider.missing_keys.clear()
2808
candidate_calls = [('Repository.insert_stream_1.19', (1, 19))]
2809
if target._lock_token:
2810
candidate_calls.append(('Repository.insert_stream_locked', (1, 14)))
2811
lock_args = (target._lock_token or '',)
2813
candidate_calls.append(('Repository.insert_stream', (1, 13)))
2815
client = target._client
2816
medium = client._medium
2817
path = target.controldir._path_for_remote_call(client)
2818
# Probe for the verb to use with an empty stream before sending the
2819
# real stream to it. We do this both to avoid the risk of sending a
2820
# large request that is then rejected, and because we don't want to
2821
# implement a way to buffer, rewind, or restart the stream.
2823
for verb, required_version in candidate_calls:
2824
if medium._is_remote_before(required_version):
2827
# We've already done the probing (and set _is_remote_before) on
2828
# a previous insert.
2831
byte_stream = smart_repo._stream_to_byte_stream([], src_format)
2833
response = client.call_with_body_stream(
2834
(verb, path, '') + lock_args, byte_stream)
2835
except errors.UnknownSmartMethod:
2836
medium._remember_remote_is_before(required_version)
2842
return self._insert_real(stream, src_format, resume_tokens)
2843
self._last_inv_record = None
2844
self._last_substream = None
2845
if required_version < (1, 19):
2846
# Remote side doesn't support inventory deltas. Wrap the stream to
2847
# make sure we don't send any. If the stream contains inventory
2848
# deltas we'll interrupt the smart insert_stream request and
2850
stream = self._stop_stream_if_inventory_delta(stream)
2851
byte_stream = smart_repo._stream_to_byte_stream(
2853
resume_tokens = ' '.join(resume_tokens)
2854
response = client.call_with_body_stream(
2855
(verb, path, resume_tokens) + lock_args, byte_stream)
2856
if response[0][0] not in ('ok', 'missing-basis'):
2857
raise errors.UnexpectedSmartServerResponse(response)
2858
if self._last_substream is not None:
2859
# The stream included an inventory-delta record, but the remote
2860
# side isn't new enough to support them. So we need to send the
2861
# rest of the stream via VFS.
2862
self.target_repo.refresh_data()
2863
return self._resume_stream_with_vfs(response, src_format)
2864
if response[0][0] == 'missing-basis':
2865
tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
2866
resume_tokens = tokens
2867
return resume_tokens, set(missing_keys)
2869
self.target_repo.refresh_data()
2872
def _resume_stream_with_vfs(self, response, src_format):
    """Resume sending a stream via VFS, first resending the record and
    substream that couldn't be sent via an insert_stream verb.

    :param response: Response of the interrupted insert_stream call; only
        ``response[0]`` is inspected.
    :param src_format: Passed through unchanged to ``self._insert_real``.
    :return: Whatever ``self._insert_real`` returns.
    """
    if response[0][0] == 'missing-basis':
        # Ignore missing_keys, we haven't finished inserting yet.
        tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
    else:
        tokens = []

    def resume_substream():
        # Yield the substream that was interrupted.
        for record in self._last_substream:
            yield record
        self._last_substream = None

    def resume_stream():
        # Finish sending the interrupted substream
        yield ('inventory-deltas', resume_substream())
        # Then simply continue sending the rest of the stream.
        for substream_kind, substream in self._last_stream:
            yield substream_kind, substream

    return self._insert_real(resume_stream(), src_format, tokens)
2894
def _stop_stream_if_inventory_delta(self, stream):
    """Normally this just lets the original stream pass-through unchanged.

    However if any 'inventory-deltas' substream occurs it will stop
    streaming, and store the interrupted substream and stream in
    self._last_substream and self._last_stream so that the stream can be
    resumed by _resume_stream_with_vfs.

    :param stream: An iterable of (substream_kind, substream) pairs.
    """
    # Keep the iterator itself so that whatever has not been consumed yet
    # can be picked up again later.
    stream_iter = iter(stream)
    for substream_kind, substream in stream_iter:
        if substream_kind == 'inventory-deltas':
            self._last_substream = substream
            self._last_stream = stream_iter
            return
        else:
            yield substream_kind, substream
2913
class RemoteStreamSource(vf_repository.StreamSource):
    """Stream data from a remote server."""

    def get_stream(self, search):
        """Return a stream for ``search``.

        When the source repository has fallbacks and the target format's
        fetch order is 'topological', the stream comes from
        ``_real_stream``. Otherwise the source repository and every
        repository reachable through ``_fallback_repositories`` are chained
        by ``missing_parents_chain``.
        """
        if (self.from_repository._fallback_repositories and
                self.to_format._fetch_order == 'topological'):
            return self._real_stream(self.from_repository, search)
        # Walk the fallback graph, visiting each repository only once.
        sources = []
        seen = set()
        repos = [self.from_repository]
        while repos:
            repo = repos.pop(0)
            if repo in seen:
                continue
            seen.add(repo)
            repos.extend(repo._fallback_repositories)
            sources.append(repo)
        return self.missing_parents_chain(search, sources)

    def get_stream_for_missing_keys(self, missing_keys):
        """Delegate to the real repository's stream source."""
        self.from_repository._ensure_real()
        real_repo = self.from_repository._real_repository
        real_source = real_repo._get_source(self.to_format)
        return real_source.get_stream_for_missing_keys(missing_keys)

    def _real_stream(self, repo, search):
        """Get a stream for search from repo.

        This never called RemoteStreamSource.get_stream, and is a helper
        for RemoteStreamSource._get_stream to allow getting a stream
        reliably whether fallback back because of old servers or trying
        to stream from a non-RemoteRepository (which the stacked support
        code will do).
        """
        source = repo._get_source(self.to_format)
        if isinstance(source, RemoteStreamSource):
            # Use the real repository's source instead of a remote one.
            repo._ensure_real()
            source = repo._real_repository._get_source(self.to_format)
        return source.get_stream(search)

    def _get_stream(self, repo, search):
        """Core worker to get a stream from repo for search.

        This is used by both get_stream and the stacking support logic. It
        deliberately gets a stream for repo which does not need to be
        self.from_repository. In the event that repo is not Remote, or
        cannot do a smart stream, a fallback is made to the generic
        repository._get_stream() interface, via self._real_stream.

        In the event of stacking, streams from _get_stream will not
        contain all the data for search - this is normal (see get_stream).

        :param repo: A repository.
        :param search: A search.
        """
        # Fallbacks may be non-smart
        if not isinstance(repo, RemoteRepository):
            return self._real_stream(repo, search)
        client = repo._client
        medium = client._medium
        path = repo.controldir._path_for_remote_call(client)
        search_bytes = repo._serialise_search_result(search)
        args = (path, self.to_format.network_name())
        candidate_verbs = [
            ('Repository.get_stream_1.19', (1, 19)),
            ('Repository.get_stream', (1, 13))]

        found_verb = False
        for verb, version in candidate_verbs:
            if medium._is_remote_before(version):
                continue
            try:
                response = repo._call_with_body_bytes_expecting_body(
                    verb, args, search_bytes)
            except errors.UnknownSmartMethod:
                medium._remember_remote_is_before(version)
            except errors.UnknownErrorFromSmartServer as e:
                if isinstance(search, vf_search.EverythingResult):
                    error_verb = e.error_from_smart_server.error_verb
                    if error_verb == 'BadSearch':
                        # Pre-2.4 servers don't support this sort of search.
                        # XXX: perhaps falling back to VFS on BadSearch is a
                        # good idea in general? It might provide a little bit
                        # of protection against client-side bugs.
                        medium._remember_remote_is_before((2, 4))
                        break
                raise
            else:
                response_tuple, response_handler = response
                found_verb = True
                break
        if not found_verb:
            return self._real_stream(repo, search)
        if response_tuple[0] != 'ok':
            raise errors.UnexpectedSmartServerResponse(response_tuple)
        byte_stream = response_handler.read_streamed_body()
        src_format, stream = smart_repo._byte_stream_to_stream(
            byte_stream, self._record_counter)
        if src_format.network_name() != repo._format.network_name():
            raise AssertionError(
                "Mismatched RemoteRepository and stream src %r, %r" % (
                    src_format.network_name(), repo._format.network_name()))
        return stream

    def missing_parents_chain(self, search, sources):
        """Chain multiple streams together to handle stacking.

        :param search: The overall search to satisfy with streams.
        :param sources: A list of Repository objects to query.
        """
        self.from_serialiser = self.from_repository._format._serializer
        self.seen_revs = set()
        self.referenced_revs = set()
        # If there are heads in the search, or the key count is > 0, we are not
        # done.
        while not search.is_empty() and len(sources) > 1:
            source = sources.pop(0)
            stream = self._get_stream(source, search)
            for kind, substream in stream:
                if kind != 'revisions':
                    yield kind, substream
                else:
                    yield kind, self.missing_parents_rev_handler(substream)
            # Narrow the search using what this source provided and
            # referenced, then start collecting afresh.
            search = search.refine(self.seen_revs, self.referenced_revs)
            self.seen_revs = set()
            self.referenced_revs = set()
        if not search.is_empty():
            for kind, stream in self._get_stream(sources[0], search):
                yield kind, stream

    def missing_parents_rev_handler(self, substream):
        """Pass revision records through, recording each record's revision
        id in ``self.seen_revs`` and its parents in ``self.referenced_revs``.
        """
        for content in substream:
            revision_bytes = content.get_bytes_as('fulltext')
            revision = self.from_serialiser.read_revision_from_string(
                revision_bytes)
            self.seen_revs.add(content.key[-1])
            self.referenced_revs.update(revision.parent_ids)
            yield content
3053
class RemoteBranchLockableFiles(LockableFiles):
    """A 'LockableFiles' implementation that talks to a smart server.

    This is not a public interface class.
    """

    def __init__(self, bzrdir, _client):
        """Remember the control dir and client, then initialise the base
        class on the branch transport with a 'lock' LockDir.
        """
        self.controldir = bzrdir
        self._client = _client
        self._need_find_modes = True
        branch_transport = bzrdir.get_branch_transport(None)
        LockableFiles.__init__(self, branch_transport, 'lock', lockdir.LockDir)

    def _find_modes(self):
        # RemoteBranches don't let the client set the mode of control files.
        self._dir_mode = self._file_mode = None
3073
class RemoteBranchFormat(branch.BranchFormat):
    """Branch format proxy that defers to a real format looked up by its
    network name.
    """

    def __init__(self, network_name=None):
        super(RemoteBranchFormat, self).__init__()
        self._matchingcontroldir = RemoteBzrDirFormat()
        self._matchingcontroldir.set_branch_format(self)
        # The real format object; resolved lazily by _ensure_real().
        self._custom_format = None
        self._network_name = network_name

    def __eq__(self, other):
        return (isinstance(other, RemoteBranchFormat) and
                self.__dict__ == other.__dict__)

    def _ensure_real(self):
        """Resolve ``self._custom_format`` from the network format registry.

        :raises UnknownFormatError: if the network name is not registered.
        """
        if self._custom_format is None:
            try:
                self._custom_format = branch.network_format_registry.get(
                    self._network_name)
            except KeyError:
                raise errors.UnknownFormatError(kind='branch',
                                                format=self._network_name)

    def get_format_description(self):
        self._ensure_real()
        return 'Remote: ' + self._custom_format.get_format_description()

    def network_name(self):
        return self._network_name

    def open(self, a_controldir, name=None, ignore_fallbacks=False):
        return a_controldir.open_branch(name=name,
                                        ignore_fallbacks=ignore_fallbacks)

    def _vfs_initialize(self, a_controldir, name, append_revisions_only,
                        repository=None):
        # Initialisation when using a local bzrdir object, or a non-vfs init
        # method is not available on the server.
        # self._custom_format is always set - the start of initialize ensures
        # that.
        if isinstance(a_controldir, RemoteBzrDir):
            a_controldir._ensure_real()
            result = self._custom_format.initialize(
                a_controldir._real_bzrdir, name=name,
                append_revisions_only=append_revisions_only,
                repository=repository)
        else:
            # We assume the bzrdir is parameterised; it may not be.
            result = self._custom_format.initialize(
                a_controldir, name=name,
                append_revisions_only=append_revisions_only,
                repository=repository)
        if (isinstance(a_controldir, RemoteBzrDir) and
                not isinstance(result, RemoteBranch)):
            result = RemoteBranch(
                a_controldir, a_controldir.find_repository(), result,
                name=name)
        return result

    def initialize(self, a_controldir, name=None, repository=None,
                   append_revisions_only=None):
        """Create a branch in ``a_controldir``.

        Creation goes through the 'BzrDir.create_branch' RPC when the control
        dir is remote and the server is new enough; otherwise it falls back
        to ``_vfs_initialize``.

        :raises NoColocatedBranchSupport: if a non-empty branch name is
            requested on the RPC path.
        :raises UnexpectedSmartServerResponse: if the RPC does not answer
            'ok'.
        """
        if name is None:
            name = a_controldir._get_selected_branch()
        # 1) get the network name to use.
        if self._custom_format:
            network_name = self._custom_format.network_name()
        else:
            # Select the current breezy default and ask for that.
            reference_bzrdir_format = controldir.format_registry.get('default')()
            reference_format = reference_bzrdir_format.get_branch_format()
            self._custom_format = reference_format
            network_name = reference_format.network_name()
        # Being asked to create on a non RemoteBzrDir:
        if not isinstance(a_controldir, RemoteBzrDir):
            return self._vfs_initialize(
                a_controldir, name=name,
                append_revisions_only=append_revisions_only,
                repository=repository)
        medium = a_controldir._client._medium
        if medium._is_remote_before((1, 13)):
            return self._vfs_initialize(
                a_controldir, name=name,
                append_revisions_only=append_revisions_only,
                repository=repository)
        # Creating on a remote bzr dir.
        # 2) try direct creation via RPC
        path = a_controldir._path_for_remote_call(a_controldir._client)
        if name != "":
            # XXX JRV20100304: Support creating colocated branches
            raise errors.NoColocatedBranchSupport(self)
        verb = 'BzrDir.create_branch'
        try:
            response = a_controldir._call(verb, path, network_name)
        except errors.UnknownSmartMethod:
            # Fallback - use vfs methods
            medium._remember_remote_is_before((1, 13))
            return self._vfs_initialize(
                a_controldir, name=name,
                append_revisions_only=append_revisions_only,
                repository=repository)
        if response[0] != 'ok':
            raise errors.UnexpectedSmartServerResponse(response)
        # Turn the response into a RemoteRepository object.
        format = RemoteBranchFormat(network_name=response[1])
        repo_format = response_tuple_to_repo_format(response[3:])
        repo_path = response[2]
        if repository is not None:
            remote_repo_url = urlutils.join(a_controldir.user_url, repo_path)
            url_diff = urlutils.relative_url(repository.user_url,
                                             remote_repo_url)
            if url_diff != '.':
                raise AssertionError(
                    'repository.user_url %r does not match URL from server '
                    'response (%r + %r)'
                    % (repository.user_url, a_controldir.user_url, repo_path))
            remote_repo = repository
        else:
            if repo_path == '':
                repo_bzrdir = a_controldir
            else:
                repo_bzrdir = RemoteBzrDir(
                    a_controldir.root_transport.clone(repo_path),
                    a_controldir._format, a_controldir._client)
            remote_repo = RemoteRepository(repo_bzrdir, repo_format)
        remote_branch = RemoteBranch(a_controldir, remote_repo,
                                     format=format, setup_stacking=False,
                                     name=name)
        if append_revisions_only:
            remote_branch.set_append_revisions_only(append_revisions_only)
        # XXX: We know this is a new branch, so it must have revno 0, revid
        # NULL_REVISION. Creating the branch locked would make this be unable
        # to be wrong; here its simply very unlikely to be wrong. RBC 20090225
        remote_branch._last_revision_info_cache = 0, NULL_REVISION
        return remote_branch

    def make_tags(self, branch):
        self._ensure_real()
        return self._custom_format.make_tags(branch)

    def supports_tags(self):
        # Remote branches might support tags, but we won't know until we
        # access the real remote branch.
        self._ensure_real()
        return self._custom_format.supports_tags()

    def supports_stacking(self):
        self._ensure_real()
        return self._custom_format.supports_stacking()

    def supports_set_append_revisions_only(self):
        self._ensure_real()
        return self._custom_format.supports_set_append_revisions_only()

    def _use_default_local_heads_to_fetch(self):
        # If the branch format is a metadir format *and* its heads_to_fetch
        # implementation is not overridden vs the base class, we can use the
        # base class logic rather than use the heads_to_fetch RPC. This is
        # usually cheaper in terms of net round trips, as the last-revision and
        # tags info fetched is cached and would be fetched anyway.
        self._ensure_real()
        if isinstance(self._custom_format, bzrbranch.BranchFormatMetadir):
            branch_class = self._custom_format._branch_class()
            heads_to_fetch_impl = branch_class.heads_to_fetch.__func__
            if heads_to_fetch_impl is branch.Branch.heads_to_fetch.__func__:
                return True
        return False
3233
class RemoteBranchStore(_mod_config.IniFileStore):
    """Branch store which attempts to use HPSS calls to retrieve branch store.

    Note that this is specific to bzr-based formats.
    """

    def __init__(self, branch):
        super(RemoteBranchStore, self).__init__()
        self.branch = branch
        self.id = "branch"
        # VFS-backed store, created on demand by _ensure_real().
        self._real_store = None

    def external_url(self):
        return urlutils.join(self.branch.user_url, 'branch.conf')

    def _load_content(self):
        """Fetch the config bytes with 'Branch.get_config_file', using the
        real store when the server does not know that verb.
        """
        path = self.branch._remote_path()
        try:
            response, handler = self.branch._call_expecting_body(
                'Branch.get_config_file', path)
        except errors.UnknownSmartMethod:
            self._ensure_real()
            return self._real_store._load_content()
        if len(response) and response[0] != 'ok':
            raise errors.UnexpectedSmartServerResponse(response)
        return handler.read_body_bytes()

    def _save_content(self, content):
        """Send the config bytes with 'Branch.put_config_file', using the
        real store when the server does not know that verb.
        """
        path = self.branch._remote_path()
        try:
            response, handler = self.branch._call_with_body_bytes_expecting_body(
                'Branch.put_config_file',
                (path, self.branch._lock_token, self.branch._repo_lock_token),
                content)
        except errors.UnknownSmartMethod:
            self._ensure_real()
            return self._real_store._save_content(content)
        handler.cancel_read_body()
        if response != ('ok', ):
            raise errors.UnexpectedSmartServerResponse(response)

    def _ensure_real(self):
        self.branch._ensure_real()
        if self._real_store is None:
            self._real_store = _mod_config.BranchStore(self.branch)
3280
class RemoteBranch(branch.Branch, _RpcHelper, lock._RelockDebugMixin):
3281
"""Branch stored on a server accessed by HPSS RPC.
3283
At the moment most operations are mapped down to simple file operations.
3286
def __init__(self, remote_bzrdir, remote_repository, real_branch=None,
             _client=None, format=None, setup_stacking=True, name=None,
             possible_transports=None):
    """Create a RemoteBranch instance.

    :param remote_bzrdir: Control dir object; stored as ``controldir``.
    :param remote_repository: Repository object; stored as ``repository``.
    :param real_branch: An optional local implementation of the branch
        format, usually accessing the data via the VFS.
    :param _client: Private parameter for testing.
    :param format: A RemoteBranchFormat object, None to create one
        automatically. If supplied it should have a network_name already
        supplied.
    :param setup_stacking: If True make an RPC call to determine the
        stacked (or not) status of the branch. If False assume the branch
        is not stacked.
    :param name: Colocated branch name
    :param possible_transports: Passed to ``_setup_stacking``.
    """
    # We intentionally don't call the parent class's __init__, because it
    # will try to assign to self.tags, which is a property in this subclass.
    # And the parent's __init__ doesn't do much anyway.
    self.controldir = remote_bzrdir
    self.name = name
    if _client is not None:
        self._client = _client
    else:
        self._client = remote_bzrdir._client
    self.repository = remote_repository
    if real_branch is not None:
        self._real_branch = real_branch
        # Give the remote repository the matching real repo.
        real_repo = self._real_branch.repository
        if isinstance(real_repo, RemoteRepository):
            real_repo._ensure_real()
            real_repo = real_repo._real_repository
        self.repository._set_real_repository(real_repo)
        # Give the branch the remote repository to let fast-pathing happen.
        self._real_branch.repository = self.repository
    else:
        self._real_branch = None
    # Fill out expected attributes of branch for breezy API users.
    self._clear_cached_state()
    # TODO: deprecate self.base in favor of user_url
    self.base = self.controldir.user_url
    self._name = name
    self._control_files = None
    self._lock_mode = None
    self._lock_token = None
    self._repo_lock_token = None
    self._lock_count = 0
    self._leave_lock = False
    self.conf_store = None
    # Setup a format: note that we cannot call _ensure_real until all the
    # attributes above are set: This code cannot be moved higher up in this
    # function.
    if format is None:
        self._format = RemoteBranchFormat()
        if real_branch is not None:
            self._format._network_name = \
                self._real_branch._format.network_name()
    else:
        self._format = format
    # when we do _ensure_real we may need to pass ignore_fallbacks to the
    # branch.open_branch method.
    self._real_ignore_fallbacks = not setup_stacking
    if not self._format._network_name:
        # Did not get from open_branchV2 - old server.
        self._ensure_real()
        self._format._network_name = \
            self._real_branch._format.network_name()
    self.tags = self._format.make_tags(self)
    # The base class init is not called, so we duplicate this:
    hooks = branch.Branch.hooks['open']
    for hook in hooks:
        hook(self)
    self._is_stacked = False
    if setup_stacking:
        self._setup_stacking(possible_transports)
3363
def _setup_stacking(self, possible_transports):
    """Activate the stacked-on location as a fallback, if there is one.

    :param possible_transports: Optional iterable of transports; it is
        copied, never modified, and the control dir's root transport is
        appended to the copy.
    """
    # configure stacking into the remote repository, by reading it from
    # the vfs branch.
    try:
        fallback_url = self.get_stacked_on_url()
    except (errors.NotStacked, branch.UnstackableBranchFormat,
            errors.UnstackableRepositoryFormat):
        # Not stacked (or cannot be): nothing to configure.
        return
    self._is_stacked = True
    if possible_transports is None:
        possible_transports = []
    else:
        possible_transports = list(possible_transports)
    possible_transports.append(self.controldir.root_transport)
    self._activate_fallback_location(
        fallback_url, possible_transports=possible_transports)
3380
def _get_config(self):
    """Return a new RemoteBranchConfig wrapping this branch."""
    config = RemoteBranchConfig(self)
    return config
3383
def _get_config_store(self):
    """Return this branch's config store, creating it on first use."""
    store = self.conf_store
    if store is None:
        store = self.conf_store = RemoteBranchStore(self)
    return store
3388
def store_uncommitted(self, creator):
    """Delegate to the real branch's ``store_uncommitted``.

    :param creator: Passed through unchanged to the real branch.
    :return: Whatever the real branch returns.
    """
    self._ensure_real()
    return self._real_branch.store_uncommitted(creator)
3392
def get_unshelver(self, tree):
    """Delegate to the real branch's ``get_unshelver``.

    :param tree: Passed through unchanged to the real branch.
    :return: Whatever the real branch returns.
    """
    self._ensure_real()
    return self._real_branch.get_unshelver(tree)
3396
def _get_real_transport(self):
    """Return the real branch's ``_transport``."""
    # if we try vfs access, return the real branch's vfs transport
    self._ensure_real()
    return self._real_branch._transport

_transport = property(_get_real_transport)
3404
def __repr__(self):
    """Return '<class name>(<base url>)'."""
    return "%s(%s)" % (self.__class__.__name__, self.base)

__str__ = __repr__
3408
def _ensure_real(self):
    """Ensure that there is a _real_branch set.

    Used before calls to self._real_branch.
    """
    if self._real_branch is not None:
        return
    if not vfs.vfs_enabled():
        raise AssertionError('smart server vfs must be enabled '
                             'to use vfs implementation')
    self.controldir._ensure_real()
    real_branch = self.controldir._real_bzrdir.open_branch(
        ignore_fallbacks=self._real_ignore_fallbacks, name=self._name)
    self._real_branch = real_branch
    # The remote branch and the real branch shares the same store. If
    # we don't, there will always be cases where one of the stores
    # doesn't see an update made on the other.
    real_branch.conf_store = self.conf_store
    if self.repository._real_repository is None:
        # Give the remote repository the matching real repo.
        real_repo = real_branch.repository
        if isinstance(real_repo, RemoteRepository):
            real_repo._ensure_real()
            real_repo = real_repo._real_repository
        self.repository._set_real_repository(real_repo)
    # Give the real branch the remote repository to let fast-pathing
    # happen.
    real_branch.repository = self.repository
    # Mirror whatever lock this object already holds.
    if self._lock_mode == 'r':
        real_branch.lock_read()
    elif self._lock_mode == 'w':
        real_branch.lock_write(token=self._lock_token)
3439
def _translate_error(self, err, **context):
    """Hand ``err`` to the repository's ``_translate_error``, adding this
    branch to the context as ``branch``.
    """
    translate = self.repository._translate_error
    translate(err, branch=self, **context)
3442
def _clear_cached_state(self):
    """Clear the base-class caches, the cached tags bytes and, when a real
    branch is attached, that branch's cached state too.
    """
    super(RemoteBranch, self)._clear_cached_state()
    self._tags_bytes = None
    real_branch = self._real_branch
    if real_branch is not None:
        real_branch._clear_cached_state()
3448
def _clear_cached_state_of_remote_branch_only(self):
    """Like _clear_cached_state, but doesn't clear the cache of
    self._real_branch.

    This is useful when falling back to calling a method of
    self._real_branch that changes state. In that case the underlying
    branch changes, so we need to invalidate this RemoteBranch's cache of
    it. However, there's no need to invalidate the _real_branch's cache
    too, in fact doing so might harm performance.
    """
    base_clear = super(RemoteBranch, self)._clear_cached_state
    base_clear()
3461
@property
def control_files(self):
    """The RemoteBranchLockableFiles for this branch, created on first
    access.
    """
    # Defer actually creating RemoteBranchLockableFiles until its needed,
    # because it triggers an _ensure_real that we otherwise might not need.
    if self._control_files is None:
        self._control_files = RemoteBranchLockableFiles(
            self.controldir, self._client)
    return self._control_files
3469
def get_physical_lock_status(self):
    """See Branch.get_physical_lock_status().

    :return: True when the server answers 'yes', False when it answers
        'no'.
    :raises UnexpectedSmartServerResponse: for any other answer.
    """
    try:
        response = self._client.call('Branch.get_physical_lock_status',
                                     self._remote_path())
    except errors.UnknownSmartMethod:
        # The server does not know the verb: ask the real branch.
        self._ensure_real()
        return self._real_branch.get_physical_lock_status()
    if response[0] not in ('yes', 'no'):
        raise errors.UnexpectedSmartServerResponse(response)
    return (response[0] == 'yes')
3481
def get_stacked_on_url(self):
    """Get the URL this branch is stacked against.

    :raises NotStacked: If the branch is not stacked.
    :raises UnstackableBranchFormat: If the branch does not support
        stacking.
    :raises UnstackableRepositoryFormat: If the repository does not support
        stacking.
    """
    try:
        # there may not be a repository yet, so we can't use
        # self._translate_error, so we can't use self._call either.
        response = self._client.call('Branch.get_stacked_on_url',
                                     self._remote_path())
    except errors.ErrorFromSmartServer as err:
        # there may not be a repository yet, so we can't call through
        # its _translate_error
        _translate_error(err, branch=self)
    except errors.UnknownSmartMethod:
        self._ensure_real()
        return self._real_branch.get_stacked_on_url()
    if response[0] != 'ok':
        raise errors.UnexpectedSmartServerResponse(response)
    return response[1]
3506
def set_stacked_on_url(self, url):
    """Set the stacked-on URL, save the config store, and record whether
    the branch is now stacked.

    :param url: The new stacked-on URL; a false value leaves
        ``self._is_stacked`` False.
    """
    branch.Branch.set_stacked_on_url(self, url)
    # We need the stacked_on_url to be visible both locally (to not query
    # it repeatedly) and remotely (so smart verbs can get it server side)
    # Without the following line,
    # breezy.tests.per_branch.test_create_clone.TestCreateClone
    # .test_create_clone_on_transport_stacked_hooks_get_stacked_branch
    # fails for remote branches -- vila 2012-01-04
    self.conf_store.save_changes()
    if not url:
        self._is_stacked = False
    else:
        self._is_stacked = True
3520
def _vfs_get_tags_bytes(self):
    """Return the tags bytes reported by the real branch."""
    self._ensure_real()
    return self._real_branch._get_tags_bytes()
3524
def _get_tags_bytes(self):
    """Return the tags bytes under a read lock, fetching them through
    ``_get_tags_bytes_via_hpss`` only when none are cached.
    """
    with self.lock_read():
        cached = self._tags_bytes
        if cached is None:
            cached = self._tags_bytes = self._get_tags_bytes_via_hpss()
        return cached
3530
def _get_tags_bytes_via_hpss(self):
    """Fetch the tags bytes with the 'Branch.get_tags_bytes' RPC.

    The VFS path is used when the medium already records the server as
    older than 1.13, or when the server rejects the verb (that outcome is
    then remembered on the medium).

    :return: The first element of the RPC response, or the VFS result.
    """
    medium = self._client._medium
    if medium._is_remote_before((1, 13)):
        return self._vfs_get_tags_bytes()
    try:
        response = self._call('Branch.get_tags_bytes', self._remote_path())
    except errors.UnknownSmartMethod:
        medium._remember_remote_is_before((1, 13))
        return self._vfs_get_tags_bytes()
    return response[0]
3541
def _vfs_set_tags_bytes(self, bytes):
    """Store ``bytes`` as the tags bytes through the real branch."""
    self._ensure_real()
    return self._real_branch._set_tags_bytes(bytes)
3545
def _set_tags_bytes(self, bytes):
    """Store new tags bytes, by RPC when possible and by VFS otherwise.

    While the branch is locked the new value is also cached in
    ``self._tags_bytes``.

    :param bytes: The serialised tags to store.
    """
    if self.is_locked():
        self._tags_bytes = bytes
    medium = self._client._medium
    if medium._is_remote_before((1, 18)):
        self._vfs_set_tags_bytes(bytes)
        return
    try:
        args = (
            self._remote_path(), self._lock_token, self._repo_lock_token)
        response = self._call_with_body_bytes(
            'Branch.set_tags_bytes', args, bytes)
    except errors.UnknownSmartMethod:
        # Remember the verb is unavailable, then use VFS.
        medium._remember_remote_is_before((1, 18))
        self._vfs_set_tags_bytes(bytes)
3561
def lock_read(self):
    """Lock the branch for read operations.

    The repository is read-locked on every call. The first lock also
    read-locks the real branch when one is attached; later calls only bump
    the lock count.

    :return: A breezy.lock.LogicalLockResult.
    """
    self.repository.lock_read()
    if not self._lock_mode:
        self._note_lock('r')
        self._lock_mode = 'r'
        self._lock_count = 1
        if self._real_branch is not None:
            self._real_branch.lock_read()
    else:
        self._lock_count += 1
    return lock.LogicalLockResult(self.unlock)
3577
def _remote_lock_write(self, token):
    """Issue the 'Branch.lock_write' RPC.

    :param token: Branch lock token to reuse, or None to take a new lock.
        When a token is given, the repository is locked and unlocked
        locally first to obtain its token.
    :return: A (branch_token, repo_token) tuple taken from the response.
    :raises LockContention: re-raised with a lock URL derived from the
        repository base.
    :raises UnexpectedSmartServerResponse: if the response is not 'ok'.
    """
    if token is None:
        branch_token = repo_token = ''
    else:
        branch_token = token
        repo_token = self.repository.lock_write().repository_token
        self.repository.unlock()
    err_context = {'token': token}
    try:
        response = self._call(
            'Branch.lock_write', self._remote_path(), branch_token,
            repo_token or '', **err_context)
    except errors.LockContention:
        # The LockContention from the server doesn't have any
        # information about the lock_url. We re-raise LockContention
        # with valid lock_url.
        raise errors.LockContention(
            '(remote lock)', self.repository.base.split('.bzr/')[0])
    if response[0] != 'ok':
        raise errors.UnexpectedSmartServerResponse(response)
    ok, branch_token, repo_token = response
    return branch_token, repo_token
3600
def lock_write(self, token=None):
    """Lock the branch for writing.

    :param token: Optional existing lock token. On the first lock it is
        sent to the server and the lock is then left in place on unlock;
        on a re-lock it must equal the token already held.
    :return: A BranchWriteLockResult.
    :raises ReadOnlyError: if the branch is currently read-locked.
    :raises TokenMismatch: if ``token`` differs from the held token.
    """
    if not self._lock_mode:
        self._note_lock('w')
        # Lock the branch and repo in one remote call.
        remote_tokens = self._remote_lock_write(token)
        self._lock_token, self._repo_lock_token = remote_tokens
        if not self._lock_token:
            raise SmartProtocolError('Remote server did not return a token!')
        # Tell the self.repository object that it is locked.
        self.repository.lock_write(
            self._repo_lock_token, _skip_rpc=True)

        if self._real_branch is not None:
            self._real_branch.lock_write(token=self._lock_token)
        if token is not None:
            self._leave_lock = True
        else:
            self._leave_lock = False
        self._lock_mode = 'w'
        self._lock_count = 1
    elif self._lock_mode == 'r':
        raise errors.ReadOnlyError(self)
    else:
        if token is not None:
            # A token was given to lock_write, and we're relocking, so
            # check that the given token actually matches the one we
            # already have.
            if token != self._lock_token:
                raise errors.TokenMismatch(token, self._lock_token)
        self._lock_count += 1
        # Re-lock the repository too.
        self.repository.lock_write(self._repo_lock_token)
    return BranchWriteLockResult(self.unlock, self._lock_token or None)
3634
def _unlock(self, branch_token, repo_token):
    """Issue the 'Branch.unlock' RPC.

    :param branch_token: Branch lock token to release.
    :param repo_token: Repository lock token; a false value is sent as ''.
    :raises UnexpectedSmartServerResponse: unless the response is ('ok',).
    """
    err_context = {'token': str((branch_token, repo_token))}
    response = self._call(
        'Branch.unlock', self._remote_path(), branch_token,
        repo_token or '', **err_context)
    if response == ('ok',):
        return
    raise errors.UnexpectedSmartServerResponse(response)
3643
@only_raises(errors.LockNotHeld, errors.LockBroken)
def unlock(self):
    """Release one level of locking.

    When the count reaches zero: pending config changes are saved, cached
    state is cleared, the real branch (if any) is unlocked, and for a write
    lock the 'Branch.unlock' RPC is sent unless the lock is to be left in
    place. The repository is unlocked on every call, even on error.
    """
    try:
        self._lock_count -= 1
        if not self._lock_count:
            if self.conf_store is not None:
                self.conf_store.save_changes()
            self._clear_cached_state()
            mode = self._lock_mode
            self._lock_mode = None
            if self._real_branch is not None:
                if (not self._leave_lock and mode == 'w' and
                        self._repo_lock_token):
                    # If this RemoteBranch will remove the physical lock
                    # for the repository, make sure the _real_branch
                    # doesn't do it first. (Because the _real_branch's
                    # repository is set to be the RemoteRepository.)
                    self._real_branch.repository.leave_lock_in_place()
                self._real_branch.unlock()
            if mode != 'w':
                # Only write-locked branched need to make a remote method
                # call to perform the unlock.
                return
            if not self._lock_token:
                raise AssertionError('Locked, but no token!')
            branch_token = self._lock_token
            repo_token = self._repo_lock_token
            self._lock_token = None
            self._repo_lock_token = None
            if not self._leave_lock:
                self._unlock(branch_token, repo_token)
    finally:
        self.repository.unlock()
3677
def break_lock(self):
    """Break the branch lock with the 'Branch.break_lock' RPC, using the
    real branch when the server does not know that verb.

    :raises UnexpectedSmartServerResponse: unless the response is ('ok',).
    """
    try:
        response = self._call(
            'Branch.break_lock', self._remote_path())
    except errors.UnknownSmartMethod:
        self._ensure_real()
        return self._real_branch.break_lock()
    if response != ('ok',):
        raise errors.UnexpectedSmartServerResponse(response)
3687
def leave_lock_in_place(self):
    """Flag the lock to be left in place.

    :raises NotImplementedError: if no lock token is held.
    """
    if self._lock_token:
        self._leave_lock = True
        return
    raise NotImplementedError(self.leave_lock_in_place)
3692
def dont_leave_lock_in_place(self):
    """Clear the leave-in-place flag.

    :raises NotImplementedError: if no lock token is held.
    """
    if self._lock_token:
        self._leave_lock = False
        return
    raise NotImplementedError(self.dont_leave_lock_in_place)
3697
def get_rev_id(self, revno, history=None):
    """Return the revision id for ``revno``.

    :param revno: Revision number to look up; 0 yields NULL_REVISION.
    :param history: Not used by this implementation.
    :raises RevisionNotPresent: if the repository cannot resolve the
        revno because a revision is missing.
    """
    if revno == 0:
        return _mod_revision.NULL_REVISION
    with self.lock_read():
        last_revision_info = self.last_revision_info()
        ok, result = self.repository.get_rev_id_for_revno(
            revno, last_revision_info)
        if ok:
            return result
        missing_parent = result[1]
        # Either the revision named by the server is missing, or its parent
        # is. Call get_parent_map to determine which, so that we report a
        # useful error.
        parent_map = self.repository.get_parent_map([missing_parent])
        if missing_parent in parent_map:
            missing_parent = parent_map[missing_parent]
        raise errors.RevisionNotPresent(missing_parent, self.repository)
3715
def _read_last_revision_info(self):
    """Ask the server for the branch tip.

    :return: A (revno, last_revision) tuple, with revno converted to int.
    :raises SmartProtocolError: if the response does not start with 'ok'.
    """
    reply = self._call('Branch.last_revision_info', self._remote_path())
    if reply[0] != 'ok':
        raise SmartProtocolError('unexpected response code %s' % (reply,))
    return (int(reply[1]), reply[2])
3723
def _gen_revision_history(self):
    """See Branch._gen_revision_history().

    A stacked branch delegates to the real branch. Otherwise the
    'Branch.revision_history' RPC is used and its body is split on NUL
    bytes; an empty body yields an empty list.
    """
    if self._is_stacked:
        self._ensure_real()
        return self._real_branch._gen_revision_history()
    response_tuple, response_handler = self._call_expecting_body(
        'Branch.revision_history', self._remote_path())
    if response_tuple[0] != 'ok':
        raise errors.UnexpectedSmartServerResponse(response_tuple)
    result = response_handler.read_body_bytes().split('\x00')
    if result == ['']:
        return []
    return result
3737
def _remote_path(self):
    """Return the path the control dir computes for RPCs made with this
    branch's client.
    """
    client = self._client
    return self.controldir._path_for_remote_call(client)
3740
def _set_last_revision_descendant(self, revision_id, other_branch,
                                  allow_diverged=False,
                                  allow_overwrite_descendant=False):
    """Set the branch tip with the 'Branch.set_last_revision_ex' RPC.

    :param revision_id: The revision to make the new tip.
    :param other_branch: Supplied as error-translation context.
    :param allow_diverged: Sent to the server as an int flag.
    :param allow_overwrite_descendant: Sent to the server as an int flag.
    :raises UnexpectedSmartServerResponse: unless the response is a
        3-tuple starting with 'ok'.
    """
    # This performs additional work to meet the hook contract; while its
    # undesirable, we have to synthesise the revno to call the hook, and
    # not calling the hook is worse as it means changes can't be prevented.
    # Having calculated this though, we can't just call into
    # set_last_revision_info as a simple call, because there is a set_rh
    # hook that some folk may still be using.
    old_revno, old_revid = self.last_revision_info()
    history = self._lefthand_history(revision_id)
    self._run_pre_change_branch_tip_hooks(len(history), revision_id)
    err_context = {'other_branch': other_branch}
    response = self._call(
        'Branch.set_last_revision_ex',
        self._remote_path(), self._lock_token, self._repo_lock_token,
        revision_id, int(allow_diverged), int(allow_overwrite_descendant),
        **err_context)
    self._clear_cached_state()
    # Reject the response when EITHER the arity or the status is wrong;
    # requiring both to be wrong would let a malformed reply be cached.
    if len(response) != 3 or response[0] != 'ok':
        raise errors.UnexpectedSmartServerResponse(response)
    new_revno, new_revision_id = response[1:]
    self._last_revision_info_cache = new_revno, new_revision_id
    self._run_post_change_branch_tip_hooks(old_revno, old_revid)
    if self._real_branch is not None:
        cache = new_revno, new_revision_id
        self._real_branch._last_revision_info_cache = cache
3766
# Point the remote branch at ``revision_id`` using the
# 'Branch.set_last_revision' smart verb, running the pre- and post-change
# branch-tip hooks around the call.  Cached state is cleared before the call
# and any response other than ('ok',) raises UnexpectedSmartServerResponse.
#
# NOTE(review): indentation has been lost in this extract and the bare
# integers look like original line numbers.  They skip 3779, which is where
# the _call(...) argument list must have been closed -- its final argument(s)
# are not visible here.
def _set_last_revision(self, revision_id):
3767
old_revno, old_revid = self.last_revision_info()
3768
# This performs additional work to meet the hook contract; while its
3769
# undesirable, we have to synthesise the revno to call the hook, and
3770
# not calling the hook is worse as it means changes can't be prevented.
3771
# Having calculated this though, we can't just call into
3772
# set_last_revision_info as a simple call, because there is a set_rh
3773
# hook that some folk may still be using.
3774
history = self._lefthand_history(revision_id)
3775
self._run_pre_change_branch_tip_hooks(len(history), revision_id)
3776
self._clear_cached_state()
3777
response = self._call('Branch.set_last_revision',
3778
self._remote_path(), self._lock_token, self._repo_lock_token,
3780
if response != ('ok',):
3781
raise errors.UnexpectedSmartServerResponse(response)
3782
self._run_post_change_branch_tip_hooks(old_revno, old_revid)
3784
# Ask the server for this branch's parent location via 'Branch.get_parent'.
# Servers already known to predate 1.13 -- or that answer with
# UnknownSmartMethod, which is then remembered on the medium -- are served by
# _vfs_get_parent_location() instead.  A response that is not exactly one
# element long raises UnexpectedSmartServerResponse.
#
# NOTE(review): indentation has been lost in this extract and the bare
# integers look like original line numbers.  They skip 3788 and 3797, so the
# ``try:`` pairing with the ``except`` and the body of
# ``if parent_location == '':`` are not visible.  Presumably the empty string
# stands for "no parent" -- confirm in the full file.
def _get_parent_location(self):
3785
medium = self._client._medium
3786
if medium._is_remote_before((1, 13)):
3787
return self._vfs_get_parent_location()
3789
response = self._call('Branch.get_parent', self._remote_path())
3790
except errors.UnknownSmartMethod:
3791
medium._remember_remote_is_before((1, 13))
3792
return self._vfs_get_parent_location()
3793
if len(response) != 1:
3794
raise errors.UnexpectedSmartServerResponse(response)
3795
parent_location = response[0]
3796
if parent_location == '':
3798
return parent_location
3800
# Fallback for _get_parent_location(): delegate to the real branch's
# _get_parent_location().
# NOTE(review): the interleaved integers (apparently original line numbers)
# jump from 3800 for the ``def`` to 3802, so one line of the original body is
# not visible in this extract -- confirm what it did.
def _vfs_get_parent_location(self):
3802
return self._real_branch._get_parent_location()
3804
# Record ``url`` as this branch's parent with 'Branch.set_parent_location'.
# A falsy ``url`` is replaced by '' for the call; a value that is not a str
# trips the AssertionError.  Servers known to predate 1.15, or that raise
# UnknownSmartMethod (remembered on the medium), are handled by
# _vfs_set_parent_location(url).
#
# NOTE(review): indentation has been lost in this extract and the bare
# integers look like original line numbers.  They skip 3808, 3814 and 3818:
# the ``try:``, the end of the _call(...) argument list and whatever guards
# the final ``raise`` are not visible here.
def _set_parent_location(self, url):
3805
medium = self._client._medium
3806
if medium._is_remote_before((1, 15)):
3807
return self._vfs_set_parent_location(url)
3809
call_url = url or ''
3810
if not isinstance(call_url, str):
3811
raise AssertionError('url must be a str or None (%s)' % url)
3812
response = self._call('Branch.set_parent_location',
3813
self._remote_path(), self._lock_token, self._repo_lock_token,
3815
except errors.UnknownSmartMethod:
3816
medium._remember_remote_is_before((1, 15))
3817
return self._vfs_set_parent_location(url)
3819
raise errors.UnexpectedSmartServerResponse(response)
3821
# Fallback for _set_parent_location(): delegate to the real branch's
# _set_parent_location(url).
# NOTE(review): the interleaved integers (apparently original line numbers)
# jump from 3821 for the ``def`` to 3823, so one line of the original body is
# not visible in this extract -- confirm what it did.
def _vfs_set_parent_location(self, url):
3823
return self._real_branch._set_parent_location(url)
3825
# Pull from ``source`` by delegating to the real branch's pull() while holding
# a write lock.  _clear_cached_state_of_remote_branch_only() is called first,
# and this object is passed to the real branch as ``_override_hook_target``.
#
# NOTE(review): indentation has been lost in this extract and the bare
# integers look like original line numbers.  They skip 3826 (the rest of the
# signature; the body forwards ``**kwargs``, so it presumably declares them)
# and 3829 (a body line that is not visible here).
def pull(self, source, overwrite=False, stop_revision=None,
3827
with self.lock_write():
3828
self._clear_cached_state_of_remote_branch_only()
3830
return self._real_branch.pull(
3831
source, overwrite=overwrite, stop_revision=stop_revision,
3832
_override_hook_target=self, **kwargs)
3834
# Push to ``target`` by delegating to the real branch's push() while holding
# a read lock; this object is passed along as
# ``_override_hook_source_branch``.
#
# NOTE(review): indentation has been lost in this extract and the bare
# integers look like original line numbers.  They skip 3836, so one line of
# the ``with`` body is not visible here -- confirm what it did.
def push(self, target, overwrite=False, stop_revision=None, lossy=False):
3835
with self.lock_read():
3837
return self._real_branch.push(
3838
target, overwrite=overwrite, stop_revision=stop_revision, lossy=lossy,
3839
_override_hook_source_branch=self)
3841
def peek_lock_mode(self):
    """Return the lock mode currently recorded on this branch."""
    current_mode = self._lock_mode
    return current_mode
3844
def is_locked(self):
    """Return True when the lock count shows at least one lock held."""
    return 1 <= self._lock_count
3847
# Resolve ``revision_id`` to a dotted revno with the
# 'Branch.revision_id_to_revno' smart verb, under a read lock.  An 'ok'
# response has its remaining elements converted to a tuple of ints;
# UnknownSmartMethod is answered by the real branch; any other visible path
# raises UnexpectedSmartServerResponse.
#
# NOTE(review): indentation has been lost in this extract and the bare
# integers look like original line numbers.  They skip 3851, 3853, 3857 and
# 3861, so the docstring terminator, the ``try:``, one line of the ``except``
# body and the line before the final ``raise`` are not visible here.
def revision_id_to_dotted_revno(self, revision_id):
3848
"""Given a revision id, return its dotted revno.
3850
:return: a tuple like (1,) or (400,1,3).
3852
with self.lock_read():
3854
response = self._call('Branch.revision_id_to_revno',
3855
self._remote_path(), revision_id)
3856
except errors.UnknownSmartMethod:
3858
return self._real_branch.revision_id_to_dotted_revno(revision_id)
3859
if response[0] == 'ok':
3860
return tuple([int(x) for x in response[1:]])
3862
raise errors.UnexpectedSmartServerResponse(response)
3864
# Resolve a mainline ``revision_id`` to its revno with the
# 'Branch.revision_id_to_revno' smart verb, under a read lock.  An 'ok'
# response of length 2 yields int(response[1]); an 'ok' response of any other
# length raises NoSuchRevision.  UnknownSmartMethod is answered by the real
# branch, and the last visible statement raises UnexpectedSmartServerResponse.
#
# NOTE(review): indentation has been lost in this extract and the bare
# integers look like original line numbers.  They skip 3866-3868, 3870, 3874
# and 3880, so the rest of the docstring, the ``try:``, one line of the
# ``except`` body and the line before the final ``raise`` are not visible.
def revision_id_to_revno(self, revision_id):
3865
"""Given a revision id on the branch mainline, return its revno.
3869
with self.lock_read():
3871
response = self._call('Branch.revision_id_to_revno',
3872
self._remote_path(), revision_id)
3873
except errors.UnknownSmartMethod:
3875
return self._real_branch.revision_id_to_revno(revision_id)
3876
if response[0] == 'ok':
3877
if len(response) == 2:
3878
return int(response[1])
3879
raise NoSuchRevision(self, revision_id)
3881
raise errors.UnexpectedSmartServerResponse(response)
3883
# Set the branch tip to (``revno``, ``revision_id``) under a write lock using
# the 'Branch.set_last_revision_info' smart verb.  The pre-change hook runs
# first; a falsy or non-bytes ``revision_id`` raises InvalidRevisionId.  On
# an ('ok',) response the cached state is cleared, the new tip is cached here
# and on the real branch (when there is one), and the post-change hook runs.
# UnknownSmartMethod is handled by the real branch's set_last_revision_info().
#
# NOTE(review): indentation has been lost in this extract and the bare
# integers look like original line numbers.  They skip 3890, 3895, 3899 and
# 3908, so the ``try:``, one line of the ``except`` body, the line ending the
# fallback path and the line before the final ``raise`` are not visible here.
def set_last_revision_info(self, revno, revision_id):
3884
with self.lock_write():
3885
# XXX: These should be returned by the set_last_revision_info verb
3886
old_revno, old_revid = self.last_revision_info()
3887
self._run_pre_change_branch_tip_hooks(revno, revision_id)
3888
if not revision_id or not isinstance(revision_id, bytes):
3889
raise errors.InvalidRevisionId(revision_id=revision_id, branch=self)
3891
response = self._call('Branch.set_last_revision_info',
3892
self._remote_path(), self._lock_token, self._repo_lock_token,
3893
str(revno), revision_id)
3894
except errors.UnknownSmartMethod:
3896
self._clear_cached_state_of_remote_branch_only()
3897
self._real_branch.set_last_revision_info(revno, revision_id)
3898
self._last_revision_info_cache = revno, revision_id
3900
if response == ('ok',):
3901
self._clear_cached_state()
3902
self._last_revision_info_cache = revno, revision_id
3903
self._run_post_change_branch_tip_hooks(old_revno, old_revid)
3904
# Update the _real_branch's cache too.
3905
if self._real_branch is not None:
3906
cache = self._last_revision_info_cache
3907
self._real_branch._last_revision_info_cache = cache
3909
raise errors.UnexpectedSmartServerResponse(response)
3911
# Make ``revision_id`` the branch tip, under a write lock.  Servers not known
# to predate 1.6 are asked via _set_last_revision_descendant() with diverged
# and overwrite-descendant both allowed; UnknownSmartMethod is remembered on
# the medium.  The remaining visible code clears this branch's cached state,
# raises DivergedBranches when ``last_rev`` is given and is not an ancestor of
# ``revision_id``, computes the revno from the graph's distance to null and
# calls set_last_revision_info().
#
# NOTE(review): indentation has been lost in this extract and the bare
# integers look like original line numbers.  They skip 3912 (the rest of the
# signature; the body uses ``other_branch``), 3917, 3920 and 3929, so the
# ``try:``, the line after the smart call and the end of the
# ``known_revision_ids`` list are not visible here.
def generate_revision_history(self, revision_id, last_rev=None,
3913
with self.lock_write():
3914
medium = self._client._medium
3915
if not medium._is_remote_before((1, 6)):
3916
# Use a smart method for 1.6 and above servers
3918
self._set_last_revision_descendant(revision_id, other_branch,
3919
allow_diverged=True, allow_overwrite_descendant=True)
3921
except errors.UnknownSmartMethod:
3922
medium._remember_remote_is_before((1, 6))
3923
self._clear_cached_state_of_remote_branch_only()
3924
graph = self.repository.get_graph()
3925
(last_revno, last_revid) = self.last_revision_info()
3926
known_revision_ids = [
3927
(last_revid, last_revno),
3928
(_mod_revision.NULL_REVISION, 0),
3930
if last_rev is not None:
3931
if not graph.is_ancestor(last_rev, revision_id):
3932
# our previous tip is not merged into stop_revision
3933
raise errors.DivergedBranches(self, other_branch)
3934
revno = graph.find_distance_to_null(revision_id, known_revision_ids)
3935
self.set_last_revision_info(revno, revision_id)
3937
def set_push_location(self, location):
    """Store ``location`` under the 'push_location' config key."""
    option_name = 'push_location'
    self._set_config_location(option_name, location)
3940
def heads_to_fetch(self):
    """Return the heads to fetch for this branch.

    Three strategies are tried in order:

    * if the format reports that it uses the default local implementation,
      call ``branch.Branch.heads_to_fetch`` directly;
    * if the server is known to predate 2.4, use the VFS fallback;
    * otherwise use the RPC, falling back to VFS (and remembering that the
      server predates 2.4) when the verb is unknown.
    """
    if self._format._use_default_local_heads_to_fetch():
        # We recognise this format, and its heads-to-fetch implementation
        # is the default one (tip + tags).  In this case it's cheaper to
        # just use the default implementation rather than a special RPC as
        # the tip and tags data is cached.
        return branch.Branch.heads_to_fetch(self)
    medium = self._client._medium
    if medium._is_remote_before((2, 4)):
        return self._vfs_heads_to_fetch()
    # The ``try:`` was absent from the extracted source, leaving a dangling
    # ``except``; it can only sit here, around the RPC attempt.
    try:
        return self._rpc_heads_to_fetch()
    except errors.UnknownSmartMethod:
        medium._remember_remote_is_before((2, 4))
        return self._vfs_heads_to_fetch()
3956
def _rpc_heads_to_fetch(self):
    """Fetch the heads to fetch with the 'Branch.heads_to_fetch' verb.

    :return: a pair of sets built from the two elements of the reply.
    :raises errors.UnexpectedSmartServerResponse: if the reply does not
        have exactly two elements.
    """
    reply = self._call('Branch.heads_to_fetch', self._remote_path())
    if len(reply) == 2:
        required, optional = reply
        return set(required), set(optional)
    raise errors.UnexpectedSmartServerResponse(reply)
3963
# Fallback for heads_to_fetch(): delegate to the real branch's
# heads_to_fetch().
# NOTE(review): the interleaved integers (apparently original line numbers)
# jump from 3963 for the ``def`` to 3965, so one line of the original body is
# not visible in this extract -- confirm what it did.
def _vfs_heads_to_fetch(self):
3965
return self._real_branch.heads_to_fetch()
3968
# Base class for config objects whose data is read through smart verbs.
#
# Visible pieces:
#   * get_option() obtains a config object from self._get_configobj(), reads
#     ``name`` from a section object with ``default`` as fallback, falls back
#     to self._vfs_get_option() on UnknownSmartMethod, and then calls every
#     hook in _mod_config.OldConfigHooks['get'] with (self, name, value).
#   * _response_to_configobj() raises UnexpectedSmartServerResponse when the
#     first element of ``response`` is non-empty and does not start with
#     'ok'; otherwise it parses the response body lines as a UTF-8 ConfigObj
#     and iterates the OldConfigHooks['load'] hooks.
#
# NOTE(review): indentation has been lost in this extract and the bare
# integers look like original line numbers.  They skip many lines (for
# example 3974-3975, 3983-3984, 3986-3987, 3989-3990, 3992-3993, 3995-3996,
# 4002-4003 and 4010 onwards), so docstring terminators, the ``try:``, the
# section-selection branches and both methods' return statements are not
# visible here.  Confirm against the full file before editing.
class RemoteConfig(object):
3969
"""A Config that reads and writes from smart verbs.
3971
It is a low-level object that considers config data to be name/value pairs
3972
that may be associated with a section. Assigning meaning to the these
3973
values is done at higher levels like breezy.config.TreeConfig.
3976
def get_option(self, name, section=None, default=None):
3977
"""Return the value associated with a named option.
3979
:param name: The name of the value
3980
:param section: The section the option is in (if any)
3981
:param default: The value to return if the value is not set
3982
:return: The value or default value
3985
configobj = self._get_configobj()
3988
section_obj = configobj
3991
section_obj = configobj[section]
3994
if section_obj is None:
3997
value = section_obj.get(name, default)
3998
except errors.UnknownSmartMethod:
3999
value = self._vfs_get_option(name, section, default)
4000
for hook in _mod_config.OldConfigHooks['get']:
4001
hook(self, name, value)
4004
def _response_to_configobj(self, response):
4005
if len(response[0]) and response[0][0] != 'ok':
4006
raise errors.UnexpectedSmartServerResponse(response)
4007
lines = response[1].read_body_bytes().splitlines()
4008
conf = _mod_config.ConfigObj(lines, encoding='utf-8')
4009
for hook in _mod_config.OldConfigHooks['load']:
4014
# RemoteConfig specialisation that reads and writes a branch's configuration.
#
# Visible pieces:
#   * _get_configobj() calls 'Branch.get_config_file' for the branch's remote
#     path and hands the response to _response_to_configobj().
#   * set_option() uses the VFS path for servers before 1.14, and for dict
#     values on servers before 2.2; otherwise dict values go to
#     _set_config_option_dict() and everything else to _set_config_option().
#   * _set_config_option() / _set_config_option_dict() call
#     'Branch.set_config_option' / 'Branch.set_config_option_dict' with the
#     branch and repository lock tokens; on UnknownSmartMethod they remember
#     the server as older than 1.14 / 2.2 and use _vfs_set_option().
#   * _serialize_option_dict() UTF-8 encodes ``unicode`` keys and values and
#     bencodes the resulting mapping.
#   * _real_object() ensures the real branch exists and returns it;
#     _vfs_set_option() sets the option through that branch's config.
#
# NOTE(review): indentation has been lost in this extract and the bare
# integers look like original line numbers.  They skip 4032, 4040, 4044, 4053,
# 4057, 4067 and 4078 among others, so the set_option docstring terminator,
# both ``try:`` lines, whatever guards the two
# ``raise errors.UnexpectedSmartServerResponse`` statements and the
# initialisation of ``utf8_dict`` are not visible here.  Confirm against the
# full file before editing.
class RemoteBranchConfig(RemoteConfig):
4015
"""A RemoteConfig for Branches."""
4017
def __init__(self, branch):
4018
self._branch = branch
4020
def _get_configobj(self):
4021
path = self._branch._remote_path()
4022
response = self._branch._client.call_expecting_body(
4023
'Branch.get_config_file', path)
4024
return self._response_to_configobj(response)
4026
def set_option(self, value, name, section=None):
4027
"""Set the value associated with a named option.
4029
:param value: The value to set
4030
:param name: The name of the value to set
4031
:param section: The section the option is in (if any)
4033
medium = self._branch._client._medium
4034
if medium._is_remote_before((1, 14)):
4035
return self._vfs_set_option(value, name, section)
4036
if isinstance(value, dict):
4037
if medium._is_remote_before((2, 2)):
4038
return self._vfs_set_option(value, name, section)
4039
return self._set_config_option_dict(value, name, section)
4041
return self._set_config_option(value, name, section)
4043
def _set_config_option(self, value, name, section):
4045
path = self._branch._remote_path()
4046
response = self._branch._client.call('Branch.set_config_option',
4047
path, self._branch._lock_token, self._branch._repo_lock_token,
4048
value.encode('utf8'), name, section or '')
4049
except errors.UnknownSmartMethod:
4050
medium = self._branch._client._medium
4051
medium._remember_remote_is_before((1, 14))
4052
return self._vfs_set_option(value, name, section)
4054
raise errors.UnexpectedSmartServerResponse(response)
4056
def _serialize_option_dict(self, option_dict):
4058
for key, value in option_dict.items():
4059
if isinstance(key, unicode):
4060
key = key.encode('utf8')
4061
if isinstance(value, unicode):
4062
value = value.encode('utf8')
4063
utf8_dict[key] = value
4064
return bencode.bencode(utf8_dict)
4066
def _set_config_option_dict(self, value, name, section):
4068
path = self._branch._remote_path()
4069
serialised_dict = self._serialize_option_dict(value)
4070
response = self._branch._client.call(
4071
'Branch.set_config_option_dict',
4072
path, self._branch._lock_token, self._branch._repo_lock_token,
4073
serialised_dict, name, section or '')
4074
except errors.UnknownSmartMethod:
4075
medium = self._branch._client._medium
4076
medium._remember_remote_is_before((2, 2))
4077
return self._vfs_set_option(value, name, section)
4079
raise errors.UnexpectedSmartServerResponse(response)
4081
def _real_object(self):
4082
self._branch._ensure_real()
4083
return self._branch._real_branch
4085
def _vfs_set_option(self, value, name, section=None):
4086
return self._real_object()._get_config().set_option(
4087
value, name, section)
4090
class RemoteBzrDirConfig(RemoteConfig):
    """A RemoteConfig for BzrDirs."""

    def __init__(self, bzrdir):
        """Remember the remote ``bzrdir`` whose config is accessed."""
        self._bzrdir = bzrdir

    def _get_configobj(self):
        """Fetch the config via 'BzrDir.get_config_file' and parse it.

        :raises errors.UnknownSmartMethod: without contacting the server
            when the medium already knows the server predates 1.15.
        """
        medium = self._bzrdir._client._medium
        verb = 'BzrDir.get_config_file'
        if medium._is_remote_before((1, 15)):
            raise errors.UnknownSmartMethod(verb)
        path = self._bzrdir._path_for_remote_call(self._bzrdir._client)
        # NOTE(review): the extracted source lost the line carrying this
        # call's arguments; ``verb`` and ``path`` are the two values
        # prepared above for it.
        response = self._bzrdir._call_expecting_body(
            verb, path)
        return self._response_to_configobj(response)

    def _vfs_get_option(self, name, section, default):
        """Read an option through the real bzrdir's config."""
        return self._real_object()._get_config().get_option(
            name, section, default)

    def set_option(self, value, name, section=None):
        """Set the value associated with a named option.

        :param value: The value to set
        :param name: The name of the value to set
        :param section: The section the option is in (if any)
        """
        return self._real_object()._get_config().set_option(
            value, name, section)

    def _real_object(self):
        """Ensure the real bzrdir exists and return it."""
        self._bzrdir._ensure_real()
        return self._bzrdir._real_bzrdir
4125
# Registries mapping smart-server error verbs to translator callables.
# Entries in ``error_translators`` are invoked as
# translator(err, find, get_path); entries in
# ``no_context_error_translators`` are invoked as translator(err).
error_translators = registry.Registry()
no_context_error_translators = registry.Registry()
4129
# Translate an ErrorFromSmartServer into a more specific exception and raise
# it.
#
# Visible pieces: a lookup of ``context[name]`` that logs a missing key with
# mutter(); a path lookup that prefers ``context['path']`` and otherwise uses
# ``err.error_args[0]``; then a lookup of ``err.error_verb`` in
# ``error_translators`` (called with err, find, get_path) and in
# ``no_context_error_translators`` (called with err alone), with
# UnknownErrorFromSmartServer raised on one of the remaining paths.
#
# NOTE(review): indentation has been lost in this extract and the bare
# integers look like original line numbers.  They skip a large share of this
# function (for example 4133-4139, 4142-4144, 4148-4149, 4151-4153, 4156,
# 4159, 4161-4163, 4165-4167, 4169, 4171 and 4173).  The headers of the nested
# helpers -- presumably ``find`` and ``get_path``, the names passed to the
# translator -- and the try/except/else scaffolding around the registry
# lookups are not visible here.  Confirm against the full file before editing.
def _translate_error(err, **context):
4130
"""Translate an ErrorFromSmartServer into a more useful error.
4132
Possible context keys:
4140
If the error from the server doesn't match a known pattern, then
4141
UnknownErrorFromSmartServer is raised.
4145
return context[name]
4146
except KeyError as key_err:
4147
mutter('Missing key %r in context %r', key_err.args[0], context)
4150
"""Get the path from the context if present, otherwise use first error
4154
return context['path']
4155
except KeyError as key_err:
4157
return err.error_args[0]
4158
except IndexError as idx_err:
4160
'Missing key %r in context %r', key_err.args[0], context)
4164
translator = error_translators.get(err.error_verb)
4168
raise translator(err, find, get_path)
4170
translator = no_context_error_translators.get(err.error_verb)
4172
raise errors.UnknownErrorFromSmartServer(err)
4174
raise translator(err)
4177
# 'NoSuchRevision' is reported against the branch found in the call context;
# the lower-case 'nosuchrevision' verb is reported against the repository.
error_translators.register(
    'NoSuchRevision',
    lambda err, find, get_path: NoSuchRevision(
        find('branch'), err.error_args[0]))
error_translators.register(
    'nosuchrevision',
    lambda err, find, get_path: NoSuchRevision(
        find('repository'), err.error_args[0]))
4184
# Build a NotBranchError for the bzrdir found in the call context, using its
# root transport's base as the path.  When the server supplied at least one
# error argument, the first is captured as ``extra``.
#
# NOTE(review): indentation has been lost in this extract and the bare
# integers look like original line numbers.  They skip 4187-4188 and 4190, so
# the value ``extra`` takes when there are no error arguments and the rest of
# the NotBranchError(...) call are not visible here.
def _translate_nobranch_error(err, find, get_path):
4185
if len(err.error_args) >= 1:
4186
extra = err.error_args[0]
4189
return errors.NotBranchError(path=find('bzrdir').root_transport.base,
4192
# Register the context-aware translators for the 'nobranch' and
# 'norepository' error verbs.
# NOTE(review): the interleaved integers (apparently original line numbers)
# skip 4195, so the argument passed to NoRepositoryPresent(...) and the
# closing parentheses of the second registration are not visible here.
error_translators.register('nobranch', _translate_nobranch_error)
4193
error_translators.register('norepository',
4194
lambda err, find, get_path: errors.NoRepositoryPresent(
4196
# Context-aware translators: each pulls the objects it needs out of the call
# context through ``find``.
error_translators.register(
    'UnlockableTransport',
    lambda err, find, get_path: errors.UnlockableTransport(
        find('bzrdir').root_transport))
error_translators.register(
    'TokenMismatch',
    lambda err, find, get_path: errors.TokenMismatch(
        find('token'), '(remote token)'))
error_translators.register(
    'Diverged',
    lambda err, find, get_path: errors.DivergedBranches(
        find('branch'), find('other_branch')))
error_translators.register(
    'NotStacked',
    lambda err, find, get_path: errors.NotStacked(branch=find('branch')))
4208
# Build a PermissionDenied error for ``path``; when the server supplied at
# least two error arguments, the second is passed on as ``extra``.
#
# NOTE(review): indentation has been lost in this extract and the bare
# integers look like original line numbers.  They skip 4209 and 4212-4213, so
# the assignment of ``path`` (presumably from get_path()) and the value
# ``extra`` takes when fewer than two arguments are present are not visible.
def _translate_PermissionDenied(err, find, get_path):
4210
if len(err.error_args) >= 2:
4211
extra = err.error_args[1]
4214
return errors.PermissionDenied(path, extra=extra)
4216
# Translators that need the call context (or the path) to build the error.
error_translators.register(
    'PermissionDenied', _translate_PermissionDenied)
error_translators.register(
    'ReadError',
    lambda err, find, get_path: errors.ReadError(get_path()))
error_translators.register(
    'NoSuchFile',
    lambda err, find, get_path: errors.NoSuchFile(get_path()))
error_translators.register(
    'TokenLockingNotSupported',
    lambda err, find, get_path: errors.TokenLockingNotSupported(
        find('repository')))
error_translators.register(
    'UnsuspendableWriteGroup',
    lambda err, find, get_path: errors.UnsuspendableWriteGroup(
        repository=find('repository')))
error_translators.register(
    'UnresumableWriteGroup',
    lambda err, find, get_path: errors.UnresumableWriteGroup(
        repository=find('repository'), write_groups=err.error_args[0],
        reason=err.error_args[1]))

# Translators built from the server's error arguments alone.
no_context_error_translators.register(
    'IncompatibleRepositories',
    lambda err: errors.IncompatibleRepositories(
        err.error_args[0], err.error_args[1], err.error_args[2]))
no_context_error_translators.register(
    'LockContention',
    lambda err: errors.LockContention('(remote lock)'))
no_context_error_translators.register(
    'LockFailed',
    lambda err: errors.LockFailed(err.error_args[0], err.error_args[1]))
no_context_error_translators.register(
    'TipChangeRejected',
    lambda err: errors.TipChangeRejected(err.error_args[0].decode('utf8')))
no_context_error_translators.register(
    'UnstackableBranchFormat',
    lambda err: branch.UnstackableBranchFormat(*err.error_args))
no_context_error_translators.register(
    'UnstackableRepositoryFormat',
    lambda err: errors.UnstackableRepositoryFormat(*err.error_args))
no_context_error_translators.register(
    'FileExists',
    lambda err: errors.FileExists(err.error_args[0]))
no_context_error_translators.register(
    'DirectoryNotEmpty',
    lambda err: errors.DirectoryNotEmpty(err.error_args[0]))
4249
# Build a ShortReadvError from the server's error arguments: the first is
# passed through unchanged and the second and third are converted to int.
#
# NOTE(review): the interleaved integers (apparently original line numbers)
# skip 4252, so the remaining argument(s) and the closing parenthesis of the
# ShortReadvError(...) call are not visible in this extract.
def _translate_short_readv_error(err):
4250
args = err.error_args
4251
return errors.ShortReadvError(args[0], int(args[1]), int(args[2]),
4254
# 'ShortReadvError' is translated from the server's error arguments alone.
no_context_error_translators.register(
    'ShortReadvError', _translate_short_readv_error)
4257
def _translate_unicode_error(err):
    """Re-raise a UnicodeError reported by the smart server.

    ``err.error_args`` is read as (encoding, value, start, end, reason).
    ``value`` may carry a two-character prefix: 'u:' means the rest is
    UTF-8 encoded text, 's:' means the rest is base64-encoded bytes.  A
    value with neither prefix is passed through untouched.

    :param err: the server error; only ``error_verb`` and ``error_args``
        are used.
    :raises UnicodeDecodeError: when ``err.error_verb`` is
        'UnicodeDecodeError'.
    :raises UnicodeEncodeError: when ``err.error_verb`` is
        'UnicodeEncodeError'.
    :return: None for any other verb (nothing is raised).
    """
    # Local import: base64.b64decode replaces the Python-2-only
    # ``.decode('base64')`` codec call.
    import base64

    def as_native_str(arg):
        # The exception constructors want native strings; on Python 3
        # str(b'utf-8') would give "b'utf-8'", so decode bytes explicitly.
        if isinstance(arg, bytes) and not isinstance(arg, str):
            return arg.decode('ascii', 'replace')
        return str(arg)

    encoding = as_native_str(err.error_args[0])
    val = err.error_args[1]
    start = int(err.error_args[2])
    end = int(err.error_args[3])
    reason = as_native_str(err.error_args[4])
    if isinstance(val, bytes):
        # Wire values are byte strings; compare against byte prefixes so
        # this works on both Python 2 and Python 3.
        if val.startswith(b'u:'):
            val = val[2:].decode('utf-8')
        elif val.startswith(b's:'):
            val = base64.b64decode(val[2:])
    elif val.startswith(u'u:'):
        # Already text: only the marker needs removing.
        val = val[2:]
    elif val.startswith(u's:'):
        val = base64.b64decode(val[2:].encode('ascii'))
    if err.error_verb == 'UnicodeDecodeError':
        raise UnicodeDecodeError(encoding, val, start, end, reason)
    elif err.error_verb == 'UnicodeEncodeError':
        raise UnicodeEncodeError(encoding, val, start, end, reason)
4272
# Remaining translators that need only the server's error arguments.
no_context_error_translators.register(
    'UnicodeEncodeError', _translate_unicode_error)
no_context_error_translators.register(
    'UnicodeDecodeError', _translate_unicode_error)
no_context_error_translators.register(
    'ReadOnlyError',
    lambda err: errors.TransportNotPossible('readonly transport'))
no_context_error_translators.register(
    'MemoryError',
    lambda err: errors.BzrError(
        "remote server out of memory\n"
        "Retry non-remotely, or contact the server admin for details."))
no_context_error_translators.register(
    'RevisionNotPresent',
    lambda err: errors.RevisionNotPresent(
        err.error_args[0], err.error_args[1]))
no_context_error_translators.register(
    'BzrCheckError',
    lambda err: errors.BzrCheckError(msg=err.error_args[0]))