/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/fetch.py

  • Committer: John Arbash Meinel
  • Date: 2011-04-20 14:27:19 UTC
  • mto: This revision was merged to the branch mainline in revision 5837.
  • Revision ID: john@arbash-meinel.com-20110420142719-advs1k5vztqzbrgv
Fix bug #767177. Be more agressive with file.close() calls.

Our test suite gets a number of thread leaks and failures because it happens to get async
SFTPFile.close() calls. (if an SFTPFile closes due to __del__ it is done as an async request,
while if you call SFTPFile.close() it is done as a synchronous request.)
We have a couple other cases, probably. Namely SFTPTransport.get() also does an async
prefetch of the content, so if you don't .read() you'll also leak threads that think they
are doing work that you want.

The biggest change here, though, is using a try/finally in a generator, which is not 
python2.4 compatible.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
28
28
from bzrlib.lazy_import import lazy_import
29
29
lazy_import(globals(), """
30
30
from bzrlib import (
 
31
    graph,
31
32
    tsort,
32
33
    versionedfile,
33
34
    )
34
35
""")
35
 
import bzrlib
36
36
from bzrlib import (
37
37
    errors,
38
38
    ui,
54
54
 
55
55
        :param last_revision: If set, try to limit to the data this revision
56
56
            references.
 
57
        :param fetch_spec: A SearchResult specifying which revisions to fetch.
 
58
            If set, this overrides last_revision.
57
59
        :param find_ghosts: If True search the entire history for ghosts.
58
60
        """
59
61
        # repository.fetch has the responsibility for short-circuiting
92
94
        pb.show_pct = pb.show_count = False
93
95
        try:
94
96
            pb.update("Finding revisions", 0, 2)
95
 
            search = self._revids_to_fetch()
96
 
            if search is None:
 
97
            search_result = self._revids_to_fetch()
 
98
            mutter('fetching: %s', search_result)
 
99
            if search_result.is_empty():
97
100
                return
98
101
            pb.update("Fetching revisions", 1, 2)
99
 
            self._fetch_everything_for_search(search)
 
102
            self._fetch_everything_for_search(search_result)
100
103
        finally:
101
104
            pb.finished()
102
105
 
125
128
            pb.update("Inserting stream")
126
129
            resume_tokens, missing_keys = self.sink.insert_stream(
127
130
                stream, from_format, [])
128
 
            if self.to_repository._fallback_repositories:
129
 
                missing_keys.update(
130
 
                    self._parent_inventories(search.get_keys()))
131
131
            if missing_keys:
132
132
                pb.update("Missing keys")
133
133
                stream = source.get_stream_for_missing_keys(missing_keys)
151
151
        """Determines the exact revisions needed from self.from_repository to
152
152
        install self._last_revision in self.to_repository.
153
153
 
154
 
        If no revisions need to be fetched, then this just returns None.
 
154
        :returns: A SearchResult of some sort.  (Possibly a
 
155
            PendingAncestryResult, EmptySearchResult, etc.)
155
156
        """
156
157
        if self._fetch_spec is not None:
 
158
            # The fetch spec is already a concrete search result.
157
159
            return self._fetch_spec
158
 
        mutter('fetch up to rev {%s}', self._last_revision)
159
 
        if self._last_revision is NULL_REVISION:
 
160
        elif self._last_revision == NULL_REVISION:
 
161
            # fetch_spec is None + last_revision is null => empty fetch.
160
162
            # explicit limit of no revisions needed
161
 
            return None
162
 
        return self.to_repository.search_missing_revision_ids(
163
 
            self.from_repository, self._last_revision,
164
 
            find_ghosts=self.find_ghosts)
165
 
 
166
 
    def _parent_inventories(self, revision_ids):
167
 
        # Find all the parent revisions referenced by the stream, but
168
 
        # not present in the stream, and make sure we send their
169
 
        # inventories.
170
 
        parent_maps = self.to_repository.get_parent_map(revision_ids)
171
 
        parents = set()
172
 
        map(parents.update, parent_maps.itervalues())
173
 
        parents.discard(NULL_REVISION)
174
 
        parents.difference_update(revision_ids)
175
 
        missing_keys = set(('inventories', rev_id) for rev_id in parents)
176
 
        return missing_keys
 
163
            return graph.EmptySearchResult()
 
164
        elif self._last_revision is not None:
 
165
            return graph.NotInOtherForRevs(self.to_repository,
 
166
                self.from_repository, [self._last_revision],
 
167
                find_ghosts=self.find_ghosts).execute()
 
168
        else: # self._last_revision is None:
 
169
            return graph.EverythingNotInOther(self.to_repository,
 
170
                self.from_repository,
 
171
                find_ghosts=self.find_ghosts).execute()
177
172
 
178
173
 
179
174
class Inter1and2Helper(object):
182
177
    This is for use by fetchers and converters.
183
178
    """
184
179
 
 
180
    # This is a class variable so that the test suite can override it.
 
181
    known_graph_threshold = 100
 
182
 
185
183
    def __init__(self, source):
186
184
        """Constructor.
187
185
 
243
241
        # yet, and are unlikely to in non-rich-root environments anyway.
244
242
        root_id_order.sort(key=operator.itemgetter(0))
245
243
        # Create a record stream containing the roots to create.
246
 
        if len(revs) > 100:
247
 
            # XXX: not covered by tests, should have a flag to always run
248
 
            # this. -- mbp 20100129
249
 
            graph = self.source_repo.get_known_graph_ancestry(revs)
 
244
        if len(revs) > self.known_graph_threshold:
 
245
            graph = self.source.get_known_graph_ancestry(revs)
250
246
        new_roots_stream = _new_root_data_stream(
251
247
            root_id_order, rev_id_to_root_id, parent_map, self.source, graph)
252
248
        return [('texts', new_roots_stream)]
253
249
 
254
250
 
255
 
def _get_rich_root_heads_graph(source_repo, revision_ids):
256
 
    """Get a Graph object suitable for asking heads() for new rich roots."""
257
 
    return 
258
 
 
259
 
 
260
251
def _new_root_data_stream(
261
252
    root_keys_to_create, rev_id_to_root_id_map, parent_map, repo, graph=None):
262
253
    """Generate a texts substream of synthesised root entries.
326
317
                pass
327
318
            else:
328
319
                try:
329
 
                    parent_ids.append(tree.inventory[root_id].revision)
 
320
                    parent_ids.append(tree.get_file_revision(root_id))
330
321
                except errors.NoSuchId:
331
322
                    # not in the tree
332
323
                    pass
340
331
            selected_ids.append(parent_id)
341
332
    parent_keys = [(root_id, parent_id) for parent_id in selected_ids]
342
333
    return parent_keys
 
334
 
 
335
 
 
336
class TargetRepoKinds(object):
 
337
    """An enum-like set of constants.
 
338
    
 
339
    They are the possible values of FetchSpecFactory.target_repo_kinds.
 
340
    """
 
341
    
 
342
    PREEXISTING = 'preexisting'
 
343
    STACKED = 'stacked'
 
344
    EMPTY = 'empty'
 
345
 
 
346
 
 
347
class FetchSpecFactory(object):
 
348
    """A helper for building the best fetch spec for a sprout call.
 
349
 
 
350
    Factors that go into determining the sort of fetch to perform:
 
351
     * did the caller specify any revision IDs?
 
352
     * did the caller specify a source branch (need to fetch its
 
353
       heads_to_fetch(), usually the tip + tags)
 
354
     * is there an existing target repo (don't need to refetch revs it
 
355
       already has)
 
356
     * target is stacked?  (similar to pre-existing target repo: even if
 
357
       the target itself is new don't want to refetch existing revs)
 
358
 
 
359
    :ivar source_branch: the source branch if one specified, else None.
 
360
    :ivar source_branch_stop_revision_id: fetch up to this revision of
 
361
        source_branch, rather than its tip.
 
362
    :ivar source_repo: the source repository if one found, else None.
 
363
    :ivar target_repo: the target repository acquired by sprout.
 
364
    :ivar target_repo_kind: one of the TargetRepoKinds constants.
 
365
    """
 
366
 
 
367
    def __init__(self):
 
368
        self._explicit_rev_ids = set()
 
369
        self.source_branch = None
 
370
        self.source_branch_stop_revision_id = None
 
371
        self.source_repo = None
 
372
        self.target_repo = None
 
373
        self.target_repo_kind = None
 
374
 
 
375
    def add_revision_ids(self, revision_ids):
 
376
        """Add revision_ids to the set of revision_ids to be fetched."""
 
377
        self._explicit_rev_ids.update(revision_ids)
 
378
        
 
379
    def make_fetch_spec(self):
 
380
        """Build a SearchResult or PendingAncestryResult or etc."""
 
381
        if self.target_repo_kind is None or self.source_repo is None:
 
382
            raise AssertionError(
 
383
                'Incomplete FetchSpecFactory: %r' % (self.__dict__,))
 
384
        if len(self._explicit_rev_ids) == 0 and self.source_branch is None:
 
385
            # Caller hasn't specified any revisions or source branch
 
386
            if self.target_repo_kind == TargetRepoKinds.EMPTY:
 
387
                return graph.EverythingResult(self.source_repo)
 
388
            else:
 
389
                # We want everything not already in the target (or target's
 
390
                # fallbacks).
 
391
                return graph.EverythingNotInOther(
 
392
                    self.target_repo, self.source_repo).execute()
 
393
        heads_to_fetch = set(self._explicit_rev_ids)
 
394
        if self.source_branch is not None:
 
395
            must_fetch, if_present_fetch = self.source_branch.heads_to_fetch()
 
396
            if self.source_branch_stop_revision_id is not None:
 
397
                # Replace the tip rev from must_fetch with the stop revision
 
398
                # XXX: this might be wrong if the tip rev is also in the
 
399
                # must_fetch set for other reasons (e.g. it's the tip of
 
400
                # multiple loom threads?), but then it's pretty unclear what it
 
401
                # should mean to specify a stop_revision in that case anyway.
 
402
                must_fetch.discard(self.source_branch.last_revision())
 
403
                must_fetch.add(self.source_branch_stop_revision_id)
 
404
            heads_to_fetch.update(must_fetch)
 
405
        else:
 
406
            if_present_fetch = set()
 
407
        if self.target_repo_kind == TargetRepoKinds.EMPTY:
 
408
            # PendingAncestryResult does not raise errors if a requested head
 
409
            # is absent.  Ideally it would support the
 
410
            # required_ids/if_present_ids distinction, but in practice
 
411
            # heads_to_fetch will almost certainly be present so this doesn't
 
412
            # matter much.
 
413
            all_heads = heads_to_fetch.union(if_present_fetch)
 
414
            return graph.PendingAncestryResult(all_heads, self.source_repo)
 
415
        return graph.NotInOtherForRevs(self.target_repo, self.source_repo,
 
416
            required_ids=heads_to_fetch, if_present_ids=if_present_fetch
 
417
            ).execute()
 
418
 
 
419