/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/remote.py

merge bzr.dev rev 4098

Show diffs side-by-side

added added

removed removed

Lines of Context:
40
40
    SmartProtocolError,
41
41
    )
42
42
from bzrlib.lockable_files import LockableFiles
43
 
from bzrlib.smart import client, vfs
 
43
from bzrlib.smart import client, vfs, repository as smart_repo
44
44
from bzrlib.revision import ensure_null, NULL_REVISION
45
45
from bzrlib.trace import mutter, note, warning
46
46
from bzrlib.util import bencode
47
 
from bzrlib.versionedfile import record_to_fulltext_bytes
48
47
 
49
48
 
50
49
class _RpcHelper(object):
96
95
        # this object holds a delegated bzrdir that uses file-level operations
97
96
        # to talk to the other side
98
97
        self._real_bzrdir = None
 
98
        # 1-shot cache for the call pattern 'create_branch; open_branch' - see
 
99
        # create_branch for details.
 
100
        self._next_open_branch_result = None
99
101
 
100
102
        if _client is None:
101
103
            medium = transport.get_smart_medium()
119
121
        if not self._real_bzrdir:
120
122
            self._real_bzrdir = BzrDir.open_from_transport(
121
123
                self.root_transport, _server_formats=False)
 
124
            self._format._network_name = \
 
125
                self._real_bzrdir._format.network_name()
122
126
 
123
127
    def _translate_error(self, err, **context):
124
128
        _translate_error(err, bzrdir=self, **context)
125
129
 
126
 
    def cloning_metadir(self, stacked=False):
 
130
    def break_lock(self):
 
131
        # Prevent aliasing problems in the next_open_branch_result cache.
 
132
        # See create_branch for rationale.
 
133
        self._next_open_branch_result = None
 
134
        return BzrDir.break_lock(self)
 
135
 
 
136
    def _vfs_cloning_metadir(self, require_stacking=False):
127
137
        self._ensure_real()
128
 
        return self._real_bzrdir.cloning_metadir(stacked)
 
138
        return self._real_bzrdir.cloning_metadir(
 
139
            require_stacking=require_stacking)
 
140
 
 
141
    def cloning_metadir(self, require_stacking=False):
 
142
        medium = self._client._medium
 
143
        if medium._is_remote_before((1, 13)):
 
144
            return self._vfs_cloning_metadir(require_stacking=require_stacking)
 
145
        verb = 'BzrDir.cloning_metadir'
 
146
        if require_stacking:
 
147
            stacking = 'True'
 
148
        else:
 
149
            stacking = 'False'
 
150
        path = self._path_for_remote_call(self._client)
 
151
        try:
 
152
            response = self._call(verb, path, stacking)
 
153
        except errors.UnknownSmartMethod:
 
154
            medium._remember_remote_is_before((1, 13))
 
155
            return self._vfs_cloning_metadir(require_stacking=require_stacking)
 
156
        if len(response) != 3:
 
157
            raise errors.UnexpectedSmartServerResponse(response)
 
158
        control_name, repo_name, branch_info = response
 
159
        if len(branch_info) != 2:
 
160
            raise errors.UnexpectedSmartServerResponse(response)
 
161
        branch_ref, branch_name = branch_info
 
162
        format = bzrdir.network_format_registry.get(control_name)
 
163
        if repo_name:
 
164
            format.repository_format = repository.network_format_registry.get(
 
165
                repo_name)
 
166
        if branch_ref == 'ref':
 
167
            # XXX: we need possible_transports here to avoid reopening the
 
168
            # connection to the referenced location
 
169
            ref_bzrdir = BzrDir.open(branch_name)
 
170
            branch_format = ref_bzrdir.cloning_metadir().get_branch_format()
 
171
            format.set_branch_format(branch_format)
 
172
        elif branch_ref == 'branch':
 
173
            if branch_name:
 
174
                format.set_branch_format(
 
175
                    branch.network_format_registry.get(branch_name))
 
176
        else:
 
177
            raise errors.UnexpectedSmartServerResponse(response)
 
178
        return format
129
179
 
130
180
    def create_repository(self, shared=False):
131
181
        # as per meta1 formats - just delegate to the format object which may
146
196
        # be parameterised.
147
197
        real_branch = self._format.get_branch_format().initialize(self)
148
198
        if not isinstance(real_branch, RemoteBranch):
149
 
            return RemoteBranch(self, self.find_repository(), real_branch)
 
199
            result = RemoteBranch(self, self.find_repository(), real_branch)
150
200
        else:
151
 
            return real_branch
 
201
            result = real_branch
 
202
        # BzrDir.clone_on_transport() uses the result of create_branch but does
 
203
        # not return it to its callers; we save approximately 8% of our round
 
204
        # trips by handing the branch we created back to the first caller to
 
205
        # open_branch rather than probing anew. Long term we need a API in
 
206
        # bzrdir that doesn't discard result objects (like result_branch).
 
207
        # RBC 20090225
 
208
        self._next_open_branch_result = result
 
209
        return result
152
210
 
153
211
    def destroy_branch(self):
154
212
        """See BzrDir.destroy_branch"""
155
213
        self._ensure_real()
156
214
        self._real_bzrdir.destroy_branch()
 
215
        self._next_open_branch_result = None
157
216
 
158
217
    def create_workingtree(self, revision_id=None, from_branch=None):
159
218
        raise errors.NotLocalUrl(self.transport.base)
168
227
 
169
228
    def get_branch_reference(self):
170
229
        """See BzrDir.get_branch_reference()."""
 
230
        response = self._get_branch_reference()
 
231
        if response[0] == 'ref':
 
232
            return response[1]
 
233
        else:
 
234
            return None
 
235
 
 
236
    def _get_branch_reference(self):
171
237
        path = self._path_for_remote_call(self._client)
 
238
        medium = self._client._medium
 
239
        if not medium._is_remote_before((1, 13)):
 
240
            try:
 
241
                response = self._call('BzrDir.open_branchV2', path)
 
242
                if response[0] not in ('ref', 'branch'):
 
243
                    raise errors.UnexpectedSmartServerResponse(response)
 
244
                return response
 
245
            except errors.UnknownSmartMethod:
 
246
                medium._remember_remote_is_before((1, 13))
172
247
        response = self._call('BzrDir.open_branch', path)
173
 
        if response[0] == 'ok':
174
 
            if response[1] == '':
175
 
                # branch at this location.
176
 
                return None
177
 
            else:
178
 
                # a branch reference, use the existing BranchReference logic.
179
 
                return response[1]
 
248
        if response[0] != 'ok':
 
249
            raise errors.UnexpectedSmartServerResponse(response)
 
250
        if response[1] != '':
 
251
            return ('ref', response[1])
180
252
        else:
181
 
            raise errors.UnexpectedSmartServerResponse(response)
 
253
            return ('branch', '')
182
254
 
183
255
    def _get_tree_branch(self):
184
256
        """See BzrDir._get_tree_branch()."""
187
259
    def open_branch(self, _unsupported=False):
188
260
        if _unsupported:
189
261
            raise NotImplementedError('unsupported flag support not implemented yet.')
190
 
        reference_url = self.get_branch_reference()
191
 
        if reference_url is None:
192
 
            # branch at this location.
193
 
            return RemoteBranch(self, self.find_repository())
194
 
        else:
 
262
        if self._next_open_branch_result is not None:
 
263
            # See create_branch for details.
 
264
            result = self._next_open_branch_result
 
265
            self._next_open_branch_result = None
 
266
            return result
 
267
        response = self._get_branch_reference()
 
268
        if response[0] == 'ref':
195
269
            # a branch reference, use the existing BranchReference logic.
196
270
            format = BranchReferenceFormat()
197
 
            return format.open(self, _found=True, location=reference_url)
 
271
            return format.open(self, _found=True, location=response[1])
 
272
        branch_format_name = response[1]
 
273
        if not branch_format_name:
 
274
            branch_format_name = None
 
275
        format = RemoteBranchFormat(network_name=branch_format_name)
 
276
        return RemoteBranch(self, self.find_repository(), format=format)
 
277
 
 
278
    def _open_repo_v1(self, path):
 
279
        verb = 'BzrDir.find_repository'
 
280
        response = self._call(verb, path)
 
281
        if response[0] != 'ok':
 
282
            raise errors.UnexpectedSmartServerResponse(response)
 
283
        # servers that only support the v1 method don't support external
 
284
        # references either.
 
285
        self._ensure_real()
 
286
        repo = self._real_bzrdir.open_repository()
 
287
        response = response + ('no', repo._format.network_name())
 
288
        return response, repo
 
289
 
 
290
    def _open_repo_v2(self, path):
 
291
        verb = 'BzrDir.find_repositoryV2'
 
292
        response = self._call(verb, path)
 
293
        if response[0] != 'ok':
 
294
            raise errors.UnexpectedSmartServerResponse(response)
 
295
        self._ensure_real()
 
296
        repo = self._real_bzrdir.open_repository()
 
297
        response = response + (repo._format.network_name(),)
 
298
        return response, repo
 
299
 
 
300
    def _open_repo_v3(self, path):
 
301
        verb = 'BzrDir.find_repositoryV3'
 
302
        medium = self._client._medium
 
303
        if medium._is_remote_before((1, 13)):
 
304
            raise errors.UnknownSmartMethod(verb)
 
305
        try:
 
306
            response = self._call(verb, path)
 
307
        except errors.UnknownSmartMethod:
 
308
            medium._remember_remote_is_before((1, 13))
 
309
            raise
 
310
        if response[0] != 'ok':
 
311
            raise errors.UnexpectedSmartServerResponse(response)
 
312
        return response, None
198
313
 
199
314
    def open_repository(self):
200
315
        path = self._path_for_remote_call(self._client)
201
 
        verb = 'BzrDir.find_repositoryV2'
202
 
        try:
203
 
            response = self._call(verb, path)
204
 
        except errors.UnknownSmartMethod:
205
 
            verb = 'BzrDir.find_repository'
206
 
            response = self._call(verb, path)
 
316
        response = None
 
317
        for probe in [self._open_repo_v3, self._open_repo_v2,
 
318
            self._open_repo_v1]:
 
319
            try:
 
320
                response, real_repo = probe(path)
 
321
                break
 
322
            except errors.UnknownSmartMethod:
 
323
                pass
 
324
        if response is None:
 
325
            raise errors.UnknownSmartMethod('BzrDir.find_repository{3,2,}')
207
326
        if response[0] != 'ok':
208
327
            raise errors.UnexpectedSmartServerResponse(response)
209
 
        if verb == 'BzrDir.find_repository':
210
 
            # servers that don't support the V2 method don't support external
211
 
            # references either.
212
 
            response = response + ('no', )
213
 
        if not (len(response) == 5):
 
328
        if len(response) != 6:
214
329
            raise SmartProtocolError('incorrect response length %s' % (response,))
215
330
        if response[1] == '':
216
 
            format = RemoteRepositoryFormat()
217
 
            format.rich_root_data = (response[2] == 'yes')
218
 
            format.supports_tree_reference = (response[3] == 'yes')
219
 
            # No wire format to check this yet.
220
 
            format.supports_external_lookups = (response[4] == 'yes')
 
331
            # repo is at this dir.
 
332
            format = response_tuple_to_repo_format(response[2:])
221
333
            # Used to support creating a real format instance when needed.
222
334
            format._creating_bzrdir = self
223
335
            remote_repo = RemoteRepository(self, format)
224
336
            format._creating_repo = remote_repo
 
337
            if real_repo is not None:
 
338
                remote_repo._set_real_repository(real_repo)
225
339
            return remote_repo
226
340
        else:
227
341
            raise errors.NoRepositoryPresent(self)
351
465
            response = a_bzrdir._call(verb, path, network_name, shared_str)
352
466
        except errors.UnknownSmartMethod:
353
467
            # Fallback - use vfs methods
 
468
            medium._remember_remote_is_before((1, 13))
354
469
            return self._vfs_initialize(a_bzrdir, shared)
355
470
        else:
356
471
            # Turn the response into a RemoteRepository object.
366
481
            raise AssertionError('%r is not a RemoteBzrDir' % (a_bzrdir,))
367
482
        return a_bzrdir.open_repository()
368
483
 
 
484
    def _ensure_real(self):
 
485
        if self._custom_format is None:
 
486
            self._custom_format = repository.network_format_registry.get(
 
487
                self._network_name)
 
488
 
 
489
    @property
 
490
    def _fetch_order(self):
 
491
        self._ensure_real()
 
492
        return self._custom_format._fetch_order
 
493
 
 
494
    @property
 
495
    def _fetch_uses_deltas(self):
 
496
        self._ensure_real()
 
497
        return self._custom_format._fetch_uses_deltas
 
498
 
 
499
    @property
 
500
    def _fetch_reconcile(self):
 
501
        self._ensure_real()
 
502
        return self._custom_format._fetch_reconcile
 
503
 
369
504
    def get_format_description(self):
370
505
        return 'bzr remote repository'
371
506
 
372
507
    def __eq__(self, other):
373
 
        return self.__class__ == other.__class__
 
508
        return self.__class__ is other.__class__
374
509
 
375
510
    def check_conversion_target(self, target_format):
376
511
        if self.rich_root_data and not target_format.rich_root_data:
389
524
 
390
525
    @property
391
526
    def _serializer(self):
392
 
        if self._custom_format is not None:
393
 
            return self._custom_format._serializer
394
 
        elif self._network_name is not None:
395
 
            self._custom_format = repository.network_format_registry.get(
396
 
                self._network_name)
397
 
            return self._custom_format._serializer
398
 
        else:
399
 
            # We should only be getting asked for the serializer for
400
 
            # RemoteRepositoryFormat objects when the RemoteRepositoryFormat object
401
 
            # is a concrete instance for a RemoteRepository. In this case we know
402
 
            # the creating_repo and can use it to supply the serializer.
403
 
            self._creating_repo._ensure_real()
404
 
            return self._creating_repo._real_repository._format._serializer
 
527
        self._ensure_real()
 
528
        return self._custom_format._serializer
405
529
 
406
530
 
407
531
class RemoteRepository(_RpcHelper):
561
685
        """See Repository._get_sink()."""
562
686
        return RemoteStreamSink(self)
563
687
 
 
688
    def _get_source(self, to_format):
 
689
        """Return a source for streaming from this repository."""
 
690
        return RemoteStreamSource(self, to_format)
 
691
 
564
692
    def has_revision(self, revision_id):
565
693
        """See Repository.has_revision()."""
566
694
        if revision_id == NULL_REVISION:
588
716
        return result
589
717
 
590
718
    def has_same_location(self, other):
591
 
        return (self.__class__ == other.__class__ and
 
719
        return (self.__class__ is other.__class__ and
592
720
                self.bzrdir.transport.base == other.bzrdir.transport.base)
593
721
 
594
722
    def get_graph(self, other_repository=None):
730
858
            implemented operations.
731
859
        """
732
860
        if self._real_repository is not None:
733
 
            raise AssertionError('_real_repository is already set')
 
861
            # Replacing an already set real repository.
 
862
            # We cannot do this [currently] if the repository is locked -
 
863
            # synchronised state might be lost.
 
864
            if self.is_locked():
 
865
                raise AssertionError('_real_repository is already set')
734
866
        if isinstance(repository, RemoteRepository):
735
867
            raise AssertionError()
736
868
        self._real_repository = repository
 
869
        # If the _real_repository has _fallback_repositories, clear them out,
 
870
        # because we want it to have the same set as this repository.  This is
 
871
        # reasonable to do because the fallbacks we clear here are from a
 
872
        # "real" branch, and we're about to replace them with the equivalents
 
873
        # from a RemoteBranch.
 
874
        self._real_repository._fallback_repositories = []
737
875
        for fb in self._fallback_repositories:
738
876
            self._real_repository.add_fallback_repository(fb)
739
877
        if self._lock_mode == 'w':
865
1003
        #
866
1004
        # We need to accumulate additional repositories here, to pass them in
867
1005
        # on various RPC's.
 
1006
        #
868
1007
        self._fallback_repositories.append(repository)
869
 
        # They are also seen by the fallback repository.  If it doesn't exist
870
 
        # yet they'll be added then.  This implicitly copies them.
871
 
        self._ensure_real()
 
1008
        # If self._real_repository was parameterised already (e.g. because a
 
1009
        # _real_branch had its get_stacked_on_url method called), then the
 
1010
        # repository to be added may already be in the _real_repositories list.
 
1011
        if self._real_repository is not None:
 
1012
            if repository not in self._real_repository._fallback_repositories:
 
1013
                self._real_repository.add_fallback_repository(repository)
 
1014
        else:
 
1015
            # They are also seen by the fallback repository.  If it doesn't
 
1016
            # exist yet they'll be added then.  This implicitly copies them.
 
1017
            self._ensure_real()
872
1018
 
873
1019
    def add_inventory(self, revid, inv, parents):
874
1020
        self._ensure_real()
937
1083
        return repository.InterRepository.get(
938
1084
            other, self).search_missing_revision_ids(revision_id, find_ghosts)
939
1085
 
940
 
    def fetch(self, source, revision_id=None, pb=None, find_ghosts=False):
 
1086
    def fetch(self, source, revision_id=None, pb=None, find_ghosts=False,
 
1087
            fetch_spec=None):
 
1088
        if fetch_spec is not None and revision_id is not None:
 
1089
            raise AssertionError(
 
1090
                "fetch_spec and revision_id are mutually exclusive.")
941
1091
        # Not delegated to _real_repository so that InterRepository.get has a
942
1092
        # chance to find an InterRepository specialised for RemoteRepository.
943
 
        if self.has_same_location(source):
 
1093
        if self.has_same_location(source) and fetch_spec is None:
944
1094
            # check that last_revision is in 'from' and then return a
945
1095
            # no-operation.
946
1096
            if (revision_id is not None and
949
1099
            return 0, []
950
1100
        inter = repository.InterRepository.get(source, self)
951
1101
        try:
952
 
            return inter.fetch(revision_id=revision_id, pb=pb, find_ghosts=find_ghosts)
 
1102
            return inter.fetch(revision_id=revision_id, pb=pb,
 
1103
                    find_ghosts=find_ghosts, fetch_spec=fetch_spec)
953
1104
        except NotImplementedError:
954
1105
            raise errors.IncompatibleRepositories(source, self)
955
1106
 
977
1128
        self._ensure_real()
978
1129
        return self._real_repository.iter_files_bytes(desired_files)
979
1130
 
980
 
    @property
981
 
    def _fetch_order(self):
982
 
        """Decorate the real repository for now.
983
 
 
984
 
        In the long term getting this back from the remote repository as part
985
 
        of open would be more efficient.
986
 
        """
987
 
        self._ensure_real()
988
 
        return self._real_repository._fetch_order
989
 
 
990
 
    @property
991
 
    def _fetch_uses_deltas(self):
992
 
        """Decorate the real repository for now.
993
 
 
994
 
        In the long term getting this back from the remote repository as part
995
 
        of open would be more efficient.
996
 
        """
997
 
        self._ensure_real()
998
 
        return self._real_repository._fetch_uses_deltas
999
 
 
1000
 
    @property
1001
 
    def _fetch_reconcile(self):
1002
 
        """Decorate the real repository for now.
1003
 
 
1004
 
        In the long term getting this back from the remote repository as part
1005
 
        of open would be more efficient.
1006
 
        """
1007
 
        self._ensure_real()
1008
 
        return self._real_repository._fetch_reconcile
1009
 
 
1010
1131
    def get_parent_map(self, revision_ids):
1011
1132
        """See bzrlib.Graph.get_parent_map()."""
1012
1133
        return self._make_parents_provider().get_parent_map(revision_ids)
1280
1401
        return self._real_repository.get_revisions(revision_ids)
1281
1402
 
1282
1403
    def supports_rich_root(self):
1283
 
        self._ensure_real()
1284
 
        return self._real_repository.supports_rich_root()
 
1404
        return self._format.rich_root_data
1285
1405
 
1286
1406
    def iter_reverse_revision_history(self, revision_id):
1287
1407
        self._ensure_real()
1289
1409
 
1290
1410
    @property
1291
1411
    def _serializer(self):
1292
 
        self._ensure_real()
1293
 
        return self._real_repository._serializer
 
1412
        return self._format._serializer
1294
1413
 
1295
1414
    def store_revision_signature(self, gpg_strategy, plaintext, revision_id):
1296
1415
        self._ensure_real()
1342
1461
        count = str(recipe[2])
1343
1462
        return '\n'.join((start_keys, stop_keys, count))
1344
1463
 
 
1464
    def _serialise_search_result(self, search_result):
 
1465
        if isinstance(search_result, graph.PendingAncestryResult):
 
1466
            parts = ['ancestry-of']
 
1467
            parts.extend(search_result.heads)
 
1468
        else:
 
1469
            recipe = search_result.get_recipe()
 
1470
            parts = ['search', self._serialise_search_recipe(recipe)]
 
1471
        return '\n'.join(parts)
 
1472
 
1345
1473
    def autopack(self):
1346
1474
        path = self.bzrdir._path_for_remote_call(self._client)
1347
1475
        try:
1362
1490
 
1363
1491
class RemoteStreamSink(repository.StreamSink):
1364
1492
 
1365
 
    def __init__(self, target_repo):
1366
 
        repository.StreamSink.__init__(self, target_repo)
1367
 
        self._resume_tokens = []
1368
 
 
1369
 
    def _insert_real(self, stream, src_format):
 
1493
    def _insert_real(self, stream, src_format, resume_tokens):
1370
1494
        self.target_repo._ensure_real()
1371
1495
        sink = self.target_repo._real_repository._get_sink()
1372
 
        result = sink.insert_stream(stream, src_format)
 
1496
        result = sink.insert_stream(stream, src_format, resume_tokens)
1373
1497
        if not result:
1374
1498
            self.target_repo.autopack()
1375
1499
        return result
1376
1500
 
1377
 
    def insert_stream(self, stream, src_format):
 
1501
    def insert_stream(self, stream, src_format, resume_tokens):
1378
1502
        repo = self.target_repo
1379
1503
        client = repo._client
1380
1504
        medium = client._medium
1381
1505
        if medium._is_remote_before((1, 13)):
1382
1506
            # No possible way this can work.
1383
 
            return self._insert_real(stream, src_format)
 
1507
            return self._insert_real(stream, src_format, resume_tokens)
1384
1508
        path = repo.bzrdir._path_for_remote_call(client)
1385
 
        if not self._resume_tokens:
 
1509
        if not resume_tokens:
1386
1510
            # XXX: Ugly but important for correctness, *will* be fixed during
1387
1511
            # 1.13 cycle. Pushing a stream that is interrupted results in a
1388
1512
            # fallback to the _real_repositories sink *with a partial stream*.
1391
1515
            # do not fallback when actually pushing the stream. A cleanup patch
1392
1516
            # is going to look at rewinding/restarting the stream/partial
1393
1517
            # buffering etc.
1394
 
            byte_stream = self._stream_to_byte_stream([], src_format)
 
1518
            byte_stream = smart_repo._stream_to_byte_stream([], src_format)
1395
1519
            try:
1396
 
                resume_tokens = ''
1397
1520
                response = client.call_with_body_stream(
1398
 
                    ('Repository.insert_stream', path, resume_tokens), byte_stream)
 
1521
                    ('Repository.insert_stream', path, ''), byte_stream)
1399
1522
            except errors.UnknownSmartMethod:
1400
1523
                medium._remember_remote_is_before((1,13))
1401
 
                return self._insert_real(stream, src_format)
1402
 
        byte_stream = self._stream_to_byte_stream(stream, src_format)
1403
 
        resume_tokens = ' '.join(self._resume_tokens)
 
1524
                return self._insert_real(stream, src_format, resume_tokens)
 
1525
        byte_stream = smart_repo._stream_to_byte_stream(
 
1526
            stream, src_format)
 
1527
        resume_tokens = ' '.join(resume_tokens)
1404
1528
        response = client.call_with_body_stream(
1405
1529
            ('Repository.insert_stream', path, resume_tokens), byte_stream)
1406
1530
        if response[0][0] not in ('ok', 'missing-basis'):
1407
1531
            raise errors.UnexpectedSmartServerResponse(response)
1408
1532
        if response[0][0] == 'missing-basis':
1409
1533
            tokens, missing_keys = bencode.bdecode_as_tuple(response[0][1])
1410
 
            self._resume_tokens = tokens
1411
 
            return missing_keys
 
1534
            resume_tokens = tokens
 
1535
            return resume_tokens, missing_keys
1412
1536
        else:
1413
1537
            if self.target_repo._real_repository is not None:
1414
1538
                collection = getattr(self.target_repo._real_repository,
1415
1539
                    '_pack_collection', None)
1416
1540
                if collection is not None:
1417
1541
                    collection.reload_pack_names()
1418
 
            return []
1419
 
 
1420
 
    def _stream_to_byte_stream(self, stream, src_format):
1421
 
        bytes = []
1422
 
        pack_writer = pack.ContainerWriter(bytes.append)
1423
 
        pack_writer.begin()
1424
 
        pack_writer.add_bytes_record(src_format.network_name(), '')
1425
 
        adapters = {}
1426
 
        def get_adapter(adapter_key):
1427
 
            try:
1428
 
                return adapters[adapter_key]
1429
 
            except KeyError:
1430
 
                adapter_factory = adapter_registry.get(adapter_key)
1431
 
                adapter = adapter_factory(self)
1432
 
                adapters[adapter_key] = adapter
1433
 
                return adapter
1434
 
        for substream_type, substream in stream:
1435
 
            for record in substream:
1436
 
                if record.storage_kind in ('chunked', 'fulltext'):
1437
 
                    serialised = record_to_fulltext_bytes(record)
1438
 
                else:
1439
 
                    serialised = record.get_bytes_as(record.storage_kind)
1440
 
                pack_writer.add_bytes_record(serialised, [(substream_type,)])
1441
 
                for b in bytes:
1442
 
                    yield b
1443
 
                del bytes[:]
1444
 
        pack_writer.end()
1445
 
        for b in bytes:
1446
 
            yield b
 
1542
            return [], set()
 
1543
 
 
1544
 
 
1545
class RemoteStreamSource(repository.StreamSource):
 
1546
    """Stream data from a remote server."""
 
1547
 
 
1548
    def get_stream(self, search):
 
1549
        # streaming with fallback repositories is not well defined yet: The
 
1550
        # remote repository cannot see the fallback repositories, and thus
 
1551
        # cannot satisfy the entire search in the general case. Likewise the
 
1552
        # fallback repositories cannot reify the search to determine what they
 
1553
        # should send. It likely needs a return value in the stream listing the
 
1554
        # edge of the search to resume from in fallback repositories.
 
1555
        if self.from_repository._fallback_repositories:
 
1556
            return repository.StreamSource.get_stream(self, search)
 
1557
        repo = self.from_repository
 
1558
        client = repo._client
 
1559
        medium = client._medium
 
1560
        if medium._is_remote_before((1, 13)):
 
1561
            # No possible way this can work.
 
1562
            return repository.StreamSource.get_stream(self, search)
 
1563
        path = repo.bzrdir._path_for_remote_call(client)
 
1564
        try:
 
1565
            search_bytes = repo._serialise_search_result(search)
 
1566
            response = repo._call_with_body_bytes_expecting_body(
 
1567
                'Repository.get_stream',
 
1568
                (path, self.to_format.network_name()), search_bytes)
 
1569
            response_tuple, response_handler = response
 
1570
        except errors.UnknownSmartMethod:
 
1571
            medium._remember_remote_is_before((1,13))
 
1572
            return repository.StreamSource.get_stream(self, search)
 
1573
        if response_tuple[0] != 'ok':
 
1574
            raise errors.UnexpectedSmartServerResponse(response_tuple)
 
1575
        byte_stream = response_handler.read_streamed_body()
 
1576
        src_format, stream = smart_repo._byte_stream_to_stream(byte_stream)
 
1577
        if src_format.network_name() != repo._format.network_name():
 
1578
            raise AssertionError(
 
1579
                "Mismatched RemoteRepository and stream src %r, %r" % (
 
1580
                src_format.network_name(), repo._format.network_name()))
 
1581
        return stream
1447
1582
 
1448
1583
 
1449
1584
class RemoteBranchLockableFiles(LockableFiles):
1468
1603
 
1469
1604
class RemoteBranchFormat(branch.BranchFormat):
1470
1605
 
1471
 
    def __init__(self):
 
1606
    def __init__(self, network_name=None):
1472
1607
        super(RemoteBranchFormat, self).__init__()
1473
1608
        self._matchingbzrdir = RemoteBzrDirFormat()
1474
1609
        self._matchingbzrdir.set_branch_format(self)
1475
1610
        self._custom_format = None
 
1611
        self._network_name = network_name
1476
1612
 
1477
1613
    def __eq__(self, other):
1478
1614
        return (isinstance(other, RemoteBranchFormat) and
1479
1615
            self.__dict__ == other.__dict__)
1480
1616
 
 
1617
    def _ensure_real(self):
 
1618
        if self._custom_format is None:
 
1619
            self._custom_format = branch.network_format_registry.get(
 
1620
                self._network_name)
 
1621
 
1481
1622
    def get_format_description(self):
1482
1623
        return 'Remote BZR Branch'
1483
1624
 
1527
1668
            response = a_bzrdir._call(verb, path, network_name)
1528
1669
        except errors.UnknownSmartMethod:
1529
1670
            # Fallback - use vfs methods
 
1671
            medium._remember_remote_is_before((1, 13))
1530
1672
            return self._vfs_initialize(a_bzrdir)
1531
1673
        if response[0] != 'ok':
1532
1674
            raise errors.UnexpectedSmartServerResponse(response)
1533
1675
        # Turn the response into a RemoteRepository object.
1534
 
        format = RemoteBranchFormat()
1535
 
        format._network_name = response[1]
 
1676
        format = RemoteBranchFormat(network_name=response[1])
1536
1677
        repo_format = response_tuple_to_repo_format(response[3:])
1537
1678
        if response[2] == '':
1538
1679
            repo_bzrdir = a_bzrdir
1543
1684
        remote_repo = RemoteRepository(repo_bzrdir, repo_format)
1544
1685
        remote_branch = RemoteBranch(a_bzrdir, remote_repo,
1545
1686
            format=format, setup_stacking=False)
 
1687
        # XXX: We know this is a new branch, so it must have revno 0, revid
 
1688
        # NULL_REVISION. Creating the branch locked would make this be unable
 
1689
        # to be wrong; here its simply very unlikely to be wrong. RBC 20090225
 
1690
        remote_branch._last_revision_info_cache = 0, NULL_REVISION
1546
1691
        return remote_branch
1547
1692
 
 
1693
    def make_tags(self, branch):
 
1694
        self._ensure_real()
 
1695
        return self._custom_format.make_tags(branch)
 
1696
 
1548
1697
    def supports_tags(self):
1549
1698
        # Remote branches might support tags, but we won't know until we
1550
1699
        # access the real remote branch.
1551
 
        return True
 
1700
        self._ensure_real()
 
1701
        return self._custom_format.supports_tags()
1552
1702
 
1553
1703
 
1554
1704
class RemoteBranch(branch.Branch, _RpcHelper):
1613
1763
            if real_branch is not None:
1614
1764
                self._format._network_name = \
1615
1765
                    self._real_branch._format.network_name()
1616
 
            #else:
1617
 
            #    # XXX: Need to get this from BzrDir.open_branch's return value.
1618
 
            #    self._ensure_real()
1619
 
            #    self._format._network_name = \
1620
 
            #        self._real_branch._format.network_name()
1621
1766
        else:
1622
1767
            self._format = format
 
1768
        if not self._format._network_name:
 
1769
            # Did not get from open_branchV2 - old server.
 
1770
            self._ensure_real()
 
1771
            self._format._network_name = \
 
1772
                self._real_branch._format.network_name()
 
1773
        self.tags = self._format.make_tags(self)
1623
1774
        # The base class init is not called, so we duplicate this:
1624
1775
        hooks = branch.Branch.hooks['open']
1625
1776
        for hook in hooks:
1638
1789
        # it's relative to this branch...
1639
1790
        fallback_url = urlutils.join(self.base, fallback_url)
1640
1791
        transports = [self.bzrdir.root_transport]
1641
 
        if self._real_branch is not None:
1642
 
            transports.append(self._real_branch._transport)
1643
1792
        stacked_on = branch.Branch.open(fallback_url,
1644
1793
                                        possible_transports=transports)
1645
1794
        self.repository.add_fallback_repository(stacked_on.repository)
1746
1895
            raise errors.UnexpectedSmartServerResponse(response)
1747
1896
        return response[1]
1748
1897
 
 
1898
    def _vfs_get_tags_bytes(self):
 
1899
        self._ensure_real()
 
1900
        return self._real_branch._get_tags_bytes()
 
1901
 
 
1902
    def _get_tags_bytes(self):
 
1903
        medium = self._client._medium
 
1904
        if medium._is_remote_before((1, 13)):
 
1905
            return self._vfs_get_tags_bytes()
 
1906
        try:
 
1907
            response = self._call('Branch.get_tags_bytes', self._remote_path())
 
1908
        except errors.UnknownSmartMethod:
 
1909
            medium._remember_remote_is_before((1, 13))
 
1910
            return self._vfs_get_tags_bytes()
 
1911
        return response[0]
 
1912
 
1749
1913
    def lock_read(self):
1750
1914
        self.repository.lock_read()
1751
1915
        if not self._lock_mode:
1805
1969
            self.repository.lock_write(self._repo_lock_token)
1806
1970
        return self._lock_token or None
1807
1971
 
 
1972
    def _set_tags_bytes(self, bytes):
 
1973
        self._ensure_real()
 
1974
        return self._real_branch._set_tags_bytes(bytes)
 
1975
 
1808
1976
    def _unlock(self, branch_token, repo_token):
1809
1977
        err_context = {'token': str((branch_token, repo_token))}
1810
1978
        response = self._call(
1939
2107
            hook(self, rev_history)
1940
2108
        self._cache_revision_history(rev_history)
1941
2109
 
1942
 
    def get_parent(self):
1943
 
        self._ensure_real()
1944
 
        return self._real_branch.get_parent()
1945
 
 
1946
2110
    def _get_parent_location(self):
1947
 
        # Used by tests, when checking normalisation of given vs stored paths.
 
2111
        medium = self._client._medium
 
2112
        if medium._is_remote_before((1, 13)):
 
2113
            return self._vfs_get_parent_location()
 
2114
        try:
 
2115
            response = self._call('Branch.get_parent', self._remote_path())
 
2116
        except errors.UnknownSmartMethod:
 
2117
            medium._remember_remote_is_before((1, 13))
 
2118
            return self._vfs_get_parent_location()
 
2119
        if len(response) != 1:
 
2120
            raise errors.UnexpectedSmartServerResponse(response)
 
2121
        parent_location = response[0]
 
2122
        if parent_location == '':
 
2123
            return None
 
2124
        return parent_location
 
2125
 
 
2126
    def _vfs_get_parent_location(self):
1948
2127
        self._ensure_real()
1949
2128
        return self._real_branch._get_parent_location()
1950
2129
 
2038
2217
        self.set_revision_history(self._lefthand_history(revision_id,
2039
2218
            last_rev=last_rev,other_branch=other_branch))
2040
2219
 
2041
 
    @property
2042
 
    def tags(self):
2043
 
        self._ensure_real()
2044
 
        return self._real_branch.tags
2045
 
 
2046
2220
    def set_push_location(self, location):
2047
2221
        self._ensure_real()
2048
2222
        return self._real_branch.set_push_location(location)
2049
2223
 
2050
 
    @needs_write_lock
2051
 
    def update_revisions(self, other, stop_revision=None, overwrite=False,
2052
 
                         graph=None):
2053
 
        """See Branch.update_revisions."""
2054
 
        other.lock_read()
2055
 
        try:
2056
 
            if stop_revision is None:
2057
 
                stop_revision = other.last_revision()
2058
 
                if revision.is_null(stop_revision):
2059
 
                    # if there are no commits, we're done.
2060
 
                    return
2061
 
            self.fetch(other, stop_revision)
2062
 
 
2063
 
            if overwrite:
2064
 
                # Just unconditionally set the new revision.  We don't care if
2065
 
                # the branches have diverged.
2066
 
                self._set_last_revision(stop_revision)
2067
 
            else:
2068
 
                medium = self._client._medium
2069
 
                if not medium._is_remote_before((1, 6)):
2070
 
                    try:
2071
 
                        self._set_last_revision_descendant(stop_revision, other)
2072
 
                        return
2073
 
                    except errors.UnknownSmartMethod:
2074
 
                        medium._remember_remote_is_before((1, 6))
2075
 
                # Fallback for pre-1.6 servers: check for divergence
2076
 
                # client-side, then do _set_last_revision.
2077
 
                last_rev = revision.ensure_null(self.last_revision())
2078
 
                if graph is None:
2079
 
                    graph = self.repository.get_graph()
2080
 
                if self._check_if_descendant_or_diverged(
2081
 
                        stop_revision, last_rev, graph, other):
2082
 
                    # stop_revision is a descendant of last_rev, but we aren't
2083
 
                    # overwriting, so we're done.
2084
 
                    return
2085
 
                self._set_last_revision(stop_revision)
2086
 
        finally:
2087
 
            other.unlock()
2088
 
 
2089
2224
 
2090
2225
def _extract_tar(tar, to_dir):
2091
2226
    """Extract all the contents of a tarfile object.