43
43
from bzrlib.trace import mutter, note, warning
46
class _RpcHelper(object):
47
"""Mixin class that helps with issuing RPCs."""
49
def _call(self, method, *args, **err_context):
51
return self._client.call(method, *args)
52
except errors.ErrorFromSmartServer, err:
53
self._translate_error(err, **err_context)
55
def _call_expecting_body(self, method, *args, **err_context):
57
return self._client.call_expecting_body(method, *args)
58
except errors.ErrorFromSmartServer, err:
59
self._translate_error(err, **err_context)
61
def _call_with_body_bytes_expecting_body(self, method, args, body_bytes,
64
return self._client.call_with_body_bytes_expecting_body(
65
method, args, body_bytes)
66
except errors.ErrorFromSmartServer, err:
67
self._translate_error(err, **err_context)
46
69
# Note: RemoteBzrDirFormat is in bzrdir.py
48
class RemoteBzrDir(BzrDir):
71
class RemoteBzrDir(BzrDir, _RpcHelper):
49
72
"""Control directory on a remote server, accessed via bzr:// or similar."""
51
74
def __init__(self, transport, _client=None):
69
92
path = self._path_for_remote_call(self._client)
70
response = self._client.call('BzrDir.open', path)
93
response = self._call('BzrDir.open', path)
71
94
if response not in [('yes',), ('no',)]:
72
95
raise errors.UnexpectedSmartServerResponse(response)
73
96
if response == ('no',):
82
105
self._real_bzrdir = BzrDir.open_from_transport(
83
106
self.root_transport, _server_formats=False)
108
def _translate_error(self, err, **context):
109
_translate_error(err, bzrdir=self, **context)
85
111
def cloning_metadir(self, stacked=False):
86
112
self._ensure_real()
87
113
return self._real_bzrdir.cloning_metadir(stacked)
89
def _translate_error(self, err, **context):
90
_translate_error(err, bzrdir=self, **context)
92
115
def create_repository(self, shared=False):
93
116
self._ensure_real()
94
117
self._real_bzrdir.create_repository(shared=shared)
123
146
def get_branch_reference(self):
124
147
"""See BzrDir.get_branch_reference()."""
125
148
path = self._path_for_remote_call(self._client)
127
response = self._client.call('BzrDir.open_branch', path)
128
except errors.ErrorFromSmartServer, err:
129
self._translate_error(err)
149
response = self._call('BzrDir.open_branch', path)
130
150
if response[0] == 'ok':
131
151
if response[1] == '':
132
152
# branch at this location.
157
177
path = self._path_for_remote_call(self._client)
158
178
verb = 'BzrDir.find_repositoryV2'
161
response = self._client.call(verb, path)
162
except errors.UnknownSmartMethod:
163
verb = 'BzrDir.find_repository'
164
response = self._client.call(verb, path)
165
except errors.ErrorFromSmartServer, err:
166
self._translate_error(err)
180
response = self._call(verb, path)
181
except errors.UnknownSmartMethod:
182
verb = 'BzrDir.find_repository'
183
response = self._call(verb, path)
167
184
if response[0] != 'ok':
168
185
raise errors.UnexpectedSmartServerResponse(response)
169
186
if verb == 'BzrDir.find_repository':
270
287
'Does not support nested trees', target_format)
273
class RemoteRepository(object):
290
class _UnstackedParentsProvider(object):
291
"""ParentsProvider for RemoteRepository that ignores stacking."""
293
def __init__(self, remote_repository):
294
self._remote_repository = remote_repository
296
def get_parent_map(self, revision_ids):
297
"""See RemoteRepository.get_parent_map."""
298
return self._remote_repository._get_parent_map(revision_ids)
301
class RemoteRepository(_RpcHelper):
274
302
"""Repository accessed over rpc.
276
304
For the moment most operations are performed using local transport-backed
324
352
__repr__ = __str__
326
def abort_write_group(self):
354
def abort_write_group(self, suppress_errors=False):
327
355
"""Complete a write group on the decorated repository.
329
357
Smart methods peform operations in a single step so this api
330
358
is not really applicable except as a compatibility thunk
331
359
for older plugins that don't use e.g. the CommitBuilder
362
:param suppress_errors: see Repository.abort_write_group.
334
364
self._ensure_real()
335
return self._real_repository.abort_write_group()
365
return self._real_repository.abort_write_group(
366
suppress_errors=suppress_errors)
337
368
def commit_write_group(self):
338
369
"""Complete a write group on the decorated repository.
398
429
path = self.bzrdir._path_for_remote_call(self._client)
400
response = self._client.call_expecting_body(
401
'Repository.get_revision_graph', path, revision_id)
402
except errors.ErrorFromSmartServer, err:
403
self._translate_error(err)
430
response = self._call_expecting_body(
431
'Repository.get_revision_graph', path, revision_id)
404
432
response_tuple, response_handler = response
405
433
if response_tuple[0] != 'ok':
406
434
raise errors.UnexpectedSmartServerResponse(response_tuple)
422
450
# The null revision is always present.
424
452
path = self.bzrdir._path_for_remote_call(self._client)
425
response = self._client.call(
426
'Repository.has_revision', path, revision_id)
453
response = self._call('Repository.has_revision', path, revision_id)
427
454
if response[0] not in ('yes', 'no'):
428
455
raise errors.UnexpectedSmartServerResponse(response)
429
456
if response[0] == 'yes':
446
473
def has_same_location(self, other):
447
474
return (self.__class__ == other.__class__ and
448
475
self.bzrdir.transport.base == other.bzrdir.transport.base)
450
477
def get_graph(self, other_repository=None):
451
478
"""Return the graph for this repository format"""
452
parents_provider = self
479
parents_provider = self._make_parents_provider()
453
480
if (other_repository is not None and
454
481
other_repository.bzrdir.transport.base !=
455
482
self.bzrdir.transport.base):
469
496
fmt_committers = 'no'
471
498
fmt_committers = 'yes'
472
response_tuple, response_handler = self._client.call_expecting_body(
499
response_tuple, response_handler = self._call_expecting_body(
473
500
'Repository.gather_stats', path, fmt_revid, fmt_committers)
474
501
if response_tuple[0] != 'ok':
475
502
raise errors.UnexpectedSmartServerResponse(response_tuple)
514
541
def is_shared(self):
515
542
"""See Repository.is_shared()."""
516
543
path = self.bzrdir._path_for_remote_call(self._client)
517
response = self._client.call('Repository.is_shared', path)
544
response = self._call('Repository.is_shared', path)
518
545
if response[0] not in ('yes', 'no'):
519
546
raise SmartProtocolError('unexpected response code %s' % (response,))
520
547
return response[0] == 'yes'
539
566
path = self.bzrdir._path_for_remote_call(self._client)
540
567
if token is None:
543
response = self._client.call('Repository.lock_write', path, token)
544
except errors.ErrorFromSmartServer, err:
545
self._translate_error(err, token=token)
569
err_context = {'token': token}
570
response = self._call('Repository.lock_write', path, token,
547
572
if response[0] == 'ok':
548
573
ok, token = response
627
652
# with no token the remote repository is not persistently locked.
630
response = self._client.call('Repository.unlock', path, token)
631
except errors.ErrorFromSmartServer, err:
632
self._translate_error(err, token=token)
654
err_context = {'token': token}
655
response = self._call('Repository.unlock', path, token,
633
657
if response == ('ok',):
677
701
path = self.bzrdir._path_for_remote_call(self._client)
679
response, protocol = self._client.call_expecting_body(
703
response, protocol = self._call_expecting_body(
680
704
'Repository.tarball', path, compression)
681
705
except errors.UnknownSmartMethod:
682
706
protocol.cancel_read_body()
799
823
return repository.InterRepository.get(
800
824
other, self).search_missing_revision_ids(revision_id, find_ghosts)
802
def fetch(self, source, revision_id=None, pb=None):
826
def fetch(self, source, revision_id=None, pb=None, find_ghosts=False):
827
# Not delegated to _real_repository so that InterRepository.get has a
828
# chance to find an InterRepository specialised for RemoteRepository.
803
829
if self.has_same_location(source):
804
830
# check that last_revision is in 'from' and then return a
807
833
not revision.is_null(revision_id)):
808
834
self.get_revision(revision_id)
811
return self._real_repository.fetch(
812
source, revision_id=revision_id, pb=pb)
836
inter = repository.InterRepository.get(source, self)
838
return inter.fetch(revision_id=revision_id, pb=pb, find_ghosts=find_ghosts)
839
except NotImplementedError:
840
raise errors.IncompatibleRepositories(source, self)
814
842
def create_bundle(self, target, base, fileobj, format=None):
815
843
self._ensure_real()
865
893
self._ensure_real()
866
894
return self._real_repository._fetch_reconcile
868
def get_parent_map(self, keys):
896
def get_parent_map(self, revision_ids):
869
897
"""See bzrlib.Graph.get_parent_map()."""
898
return self._make_parents_provider().get_parent_map(revision_ids)
900
def _get_parent_map(self, keys):
901
"""Implementation of get_parent_map() that ignores fallbacks."""
870
902
# Hack to build up the caching logic.
871
903
ancestry = self._parents_map
872
904
if ancestry is None:
877
909
missing_revisions = set(key for key in keys if key not in ancestry)
878
910
if missing_revisions:
879
parent_map = self._get_parent_map(missing_revisions)
911
parent_map = self._get_parent_map_rpc(missing_revisions)
880
912
if 'hpss' in debug.debug_flags:
881
913
mutter('retransmitted revisions: %d of %d',
882
914
len(set(ancestry).intersection(parent_map)),
890
922
100.0 * len(self._requested_parents) / len(ancestry))
891
923
return dict((k, ancestry[k]) for k in present_keys)
893
def _get_parent_map(self, keys):
925
def _get_parent_map_rpc(self, keys):
894
926
"""Helper for get_parent_map that performs the RPC."""
895
927
medium = self._client._medium
896
928
if medium._is_remote_before((1, 2)):
959
991
verb = 'Repository.get_parent_map'
960
992
args = (path,) + tuple(keys)
962
response = self._client.call_with_body_bytes_expecting_body(
963
verb, args, self._serialise_search_recipe(recipe))
994
response = self._call_with_body_bytes_expecting_body(
964
996
except errors.UnknownSmartMethod:
965
997
# Server does not support this method, so get the whole graph.
966
998
# Worse, we have to force a disconnection, because the server now
1191
1223
return self._real_repository._check_for_inconsistent_revision_parents()
1193
1225
def _make_parents_provider(self):
1226
providers = [_UnstackedParentsProvider(self)]
1227
providers.extend(r._make_parents_provider() for r in
1228
self._fallback_repositories)
1229
return graph._StackedParentsProvider(providers)
1196
1231
def _serialise_search_recipe(self, recipe):
1197
1232
"""Serialise a graph search recipe.
1204
1239
count = str(recipe[2])
1205
1240
return '\n'.join((start_keys, stop_keys, count))
1243
path = self.bzrdir._path_for_remote_call(self._client)
1245
response = self._call('PackRepository.autopack', path)
1246
except errors.UnknownSmartMethod:
1248
self._real_repository._pack_collection.autopack()
1250
if self._real_repository is not None:
1251
# Reset the real repository's cache of pack names.
1252
# XXX: At some point we may be able to skip this and just rely on
1253
# the automatic retry logic to do the right thing, but for now we
1254
# err on the side of being correct rather than being optimal.
1255
self._real_repository._pack_collection.reload_pack_names()
1256
if response[0] != 'ok':
1257
raise errors.UnexpectedSmartServerResponse(response)
1208
1260
class RemoteBranchLockableFiles(LockableFiles):
1209
1261
"""A 'LockableFiles' implementation that talks to a smart server.
1252
class RemoteBranch(branch.Branch):
1304
class RemoteBranch(branch.Branch, _RpcHelper):
1253
1305
"""Branch stored on a server accessed by HPSS RPC.
1255
1307
At the moment most operations are mapped down to simple file operations.
1315
1367
transports = [self.bzrdir.root_transport]
1316
1368
if self._real_branch is not None:
1317
1369
transports.append(self._real_branch._transport)
1318
fallback_bzrdir = BzrDir.open(fallback_url, transports)
1319
fallback_repo = fallback_bzrdir.open_repository()
1320
self.repository.add_fallback_repository(fallback_repo)
1370
stacked_on = branch.Branch.open(fallback_url,
1371
possible_transports=transports)
1372
self.repository.add_fallback_repository(stacked_on.repository)
1322
1374
def _get_real_transport(self):
1323
1375
# if we try vfs access, return the real branch's vfs transport
1461
# there may not be a repository yet, so we can't use
1462
# self._translate_error, so we can't use self._call either.
1409
1463
response = self._client.call('Branch.get_stacked_on_url',
1410
1464
self._remote_path())
1411
if response[0] != 'ok':
1412
raise errors.UnexpectedSmartServerResponse(response)
1414
1465
except errors.ErrorFromSmartServer, err:
1415
1466
# there may not be a repository yet, so we can't call through
1416
1467
# its _translate_error
1418
1469
except errors.UnknownSmartMethod, err:
1419
1470
self._ensure_real()
1420
1471
return self._real_branch.get_stacked_on_url()
1472
if response[0] != 'ok':
1473
raise errors.UnexpectedSmartServerResponse(response)
1422
1476
def lock_read(self):
1423
1477
self.repository.lock_read()
1436
1490
branch_token = token
1437
1491
repo_token = self.repository.lock_write()
1438
1492
self.repository.unlock()
1440
response = self._client.call(
1441
'Branch.lock_write', self._remote_path(),
1442
branch_token, repo_token or '')
1443
except errors.ErrorFromSmartServer, err:
1444
self._translate_error(err, token=token)
1493
err_context = {'token': token}
1494
response = self._call(
1495
'Branch.lock_write', self._remote_path(), branch_token,
1496
repo_token or '', **err_context)
1445
1497
if response[0] != 'ok':
1446
1498
raise errors.UnexpectedSmartServerResponse(response)
1447
1499
ok, branch_token, repo_token = response
1481
1533
return self._lock_token or None
1483
1535
def _unlock(self, branch_token, repo_token):
1485
response = self._client.call('Branch.unlock', self._remote_path(), branch_token,
1487
except errors.ErrorFromSmartServer, err:
1488
self._translate_error(err, token=str((branch_token, repo_token)))
1536
err_context = {'token': str((branch_token, repo_token))}
1537
response = self._call(
1538
'Branch.unlock', self._remote_path(), branch_token,
1539
repo_token or '', **err_context)
1489
1540
if response == ('ok',):
1491
1542
raise errors.UnexpectedSmartServerResponse(response)
1536
1587
self._leave_lock = False
1538
1589
def _last_revision_info(self):
1539
response = self._client.call('Branch.last_revision_info', self._remote_path())
1590
response = self._call('Branch.last_revision_info', self._remote_path())
1540
1591
if response[0] != 'ok':
1541
1592
raise SmartProtocolError('unexpected response code %s' % (response,))
1542
1593
revno = int(response[1])
1546
1597
def _gen_revision_history(self):
1547
1598
"""See Branch._gen_revision_history()."""
1548
response_tuple, response_handler = self._client.call_expecting_body(
1599
response_tuple, response_handler = self._call_expecting_body(
1549
1600
'Branch.revision_history', self._remote_path())
1550
1601
if response_tuple[0] != 'ok':
1551
1602
raise errors.UnexpectedSmartServerResponse(response_tuple)
1560
1611
def _set_last_revision_descendant(self, revision_id, other_branch,
1561
1612
allow_diverged=False, allow_overwrite_descendant=False):
1563
response = self._client.call('Branch.set_last_revision_ex',
1564
self._remote_path(), self._lock_token, self._repo_lock_token, revision_id,
1565
int(allow_diverged), int(allow_overwrite_descendant))
1566
except errors.ErrorFromSmartServer, err:
1567
self._translate_error(err, other_branch=other_branch)
1613
err_context = {'other_branch': other_branch}
1614
response = self._call('Branch.set_last_revision_ex',
1615
self._remote_path(), self._lock_token, self._repo_lock_token,
1616
revision_id, int(allow_diverged), int(allow_overwrite_descendant),
1568
1618
self._clear_cached_state()
1569
1619
if len(response) != 3 and response[0] != 'ok':
1570
1620
raise errors.UnexpectedSmartServerResponse(response)
1577
1627
def _set_last_revision(self, revision_id):
1578
1628
self._clear_cached_state()
1580
response = self._client.call('Branch.set_last_revision',
1581
self._remote_path(), self._lock_token, self._repo_lock_token, revision_id)
1582
except errors.ErrorFromSmartServer, err:
1583
self._translate_error(err)
1629
response = self._call('Branch.set_last_revision',
1630
self._remote_path(), self._lock_token, self._repo_lock_token,
1584
1632
if response != ('ok',):
1585
1633
raise errors.UnexpectedSmartServerResponse(response)
1616
1664
return self._real_branch.set_stacked_on_url(stacked_location)
1618
1666
def sprout(self, to_bzrdir, revision_id=None):
1619
# Like Branch.sprout, except that it sprouts a branch in the default
1620
# format, because RemoteBranches can't be created at arbitrary URLs.
1621
# XXX: if to_bzrdir is a RemoteBranch, this should perhaps do
1622
# to_bzrdir.create_branch...
1624
result = self._real_branch._format.initialize(to_bzrdir)
1625
self.copy_content_into(result, revision_id=revision_id)
1626
result.set_parent(self.bzrdir.root_transport.base)
1667
branch_format = to_bzrdir._format._branch_format
1668
if (branch_format is None or
1669
isinstance(branch_format, RemoteBranchFormat)):
1670
# The to_bzrdir specifies RemoteBranchFormat (or no format, which
1671
# implies the same thing), but RemoteBranches can't be created at
1672
# arbitrary URLs. So create a branch in the same format as
1673
# _real_branch instead.
1674
# XXX: if to_bzrdir is a RemoteBzrDir, this should perhaps do
1675
# to_bzrdir.create_branch to create a RemoteBranch after all...
1677
result = self._real_branch._format.initialize(to_bzrdir)
1678
self.copy_content_into(result, revision_id=revision_id)
1679
result.set_parent(self.bzrdir.root_transport.base)
1681
result = branch.Branch.sprout(
1682
self, to_bzrdir, revision_id=revision_id)
1629
1685
@needs_write_lock
1654
1710
def set_last_revision_info(self, revno, revision_id):
1655
1711
revision_id = ensure_null(revision_id)
1657
response = self._client.call('Branch.set_last_revision_info',
1658
self._remote_path(), self._lock_token, self._repo_lock_token, str(revno), revision_id)
1713
response = self._call('Branch.set_last_revision_info',
1714
self._remote_path(), self._lock_token, self._repo_lock_token,
1715
str(revno), revision_id)
1659
1716
except errors.UnknownSmartMethod:
1660
1717
self._ensure_real()
1661
1718
self._clear_cached_state_of_remote_branch_only()
1662
1719
self._real_branch.set_last_revision_info(revno, revision_id)
1663
1720
self._last_revision_info_cache = revno, revision_id
1665
except errors.ErrorFromSmartServer, err:
1666
self._translate_error(err)
1667
1722
if response == ('ok',):
1668
1723
self._clear_cached_state()
1669
1724
self._last_revision_info_cache = revno, revision_id
1764
1820
def find(name):
1766
1822
return context[name]
1767
except KeyError, keyErr:
1768
mutter('Missing key %r in context %r', keyErr.args[0], context)
1823
except KeyError, key_err:
1824
mutter('Missing key %r in context %r', key_err.args[0], context)
1827
"""Get the path from the context if present, otherwise use first error
1831
return context['path']
1832
except KeyError, key_err:
1834
return err.error_args[0]
1835
except IndexError, idx_err:
1837
'Missing key %r in context %r', key_err.args[0], context)
1770
1840
if err.error_verb == 'NoSuchRevision':
1771
1841
raise NoSuchRevision(find('branch'), err.error_args[0])
1772
1842
elif err.error_verb == 'nosuchrevision':
1793
1863
raise errors.UnstackableRepositoryFormat(*err.error_args)
1794
1864
elif err.error_verb == 'NotStacked':
1795
1865
raise errors.NotStacked(branch=find('branch'))
1866
elif err.error_verb == 'PermissionDenied':
1868
if len(err.error_args) >= 2:
1869
extra = err.error_args[1]
1872
raise errors.PermissionDenied(path, extra=extra)
1873
elif err.error_verb == 'ReadError':
1875
raise errors.ReadError(path)
1876
elif err.error_verb == 'NoSuchFile':
1878
raise errors.NoSuchFile(path)
1879
elif err.error_verb == 'FileExists':
1880
raise errors.FileExists(err.error_args[0])
1881
elif err.error_verb == 'DirectoryNotEmpty':
1882
raise errors.DirectoryNotEmpty(err.error_args[0])
1883
elif err.error_verb == 'ShortReadvError':
1884
args = err.error_args
1885
raise errors.ShortReadvError(
1886
args[0], int(args[1]), int(args[2]), int(args[3]))
1887
elif err.error_verb in ('UnicodeEncodeError', 'UnicodeDecodeError'):
1888
encoding = str(err.error_args[0]) # encoding must always be a string
1889
val = err.error_args[1]
1890
start = int(err.error_args[2])
1891
end = int(err.error_args[3])
1892
reason = str(err.error_args[4]) # reason must always be a string
1893
if val.startswith('u:'):
1894
val = val[2:].decode('utf-8')
1895
elif val.startswith('s:'):
1896
val = val[2:].decode('base64')
1897
if err.error_verb == 'UnicodeDecodeError':
1898
raise UnicodeDecodeError(encoding, val, start, end, reason)
1899
elif err.error_verb == 'UnicodeEncodeError':
1900
raise UnicodeEncodeError(encoding, val, start, end, reason)
1901
elif err.error_verb == 'ReadOnlyError':
1902
raise errors.TransportNotPossible('readonly transport')
1796
1903
raise errors.UnknownErrorFromSmartServer(err)