/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/url_policy_open.py

  • Committer: John Arbash Meinel
  • Date: 2006-04-25 15:05:42 UTC
  • mfrom: (1185.85.85 bzr-encoding)
  • mto: This revision was merged to the branch mainline in revision 1752.
  • Revision ID: john@arbash-meinel.com-20060425150542-c7b518dca9928691
[merge] the old bzr-encoding changes, reparenting them on bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2011 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Branch opening with URL-based restrictions."""
18
 
 
19
 
from __future__ import absolute_import
20
 
 
21
 
import threading
22
 
 
23
 
from . import (
24
 
    errors,
25
 
    trace,
26
 
    urlutils,
27
 
    )
28
 
from .branch import Branch
29
 
from .controldir import (
30
 
    ControlDir,
31
 
    ControlDirFormat,
32
 
    )
33
 
from .transport import (
34
 
    do_catching_redirections,
35
 
    get_transport,
36
 
    )
37
 
 
38
 
 
39
 
class BadUrl(errors.BzrError):
40
 
 
41
 
    _fmt = "Tried to access a branch from bad URL %(url)s."
42
 
 
43
 
 
44
 
class BranchReferenceForbidden(errors.BzrError):
45
 
 
46
 
    _fmt = ("Trying to mirror a branch reference and the branch type "
47
 
            "does not allow references.")
48
 
 
49
 
 
50
 
class BranchLoopError(errors.BzrError):
51
 
    """Encountered a branch cycle.
52
 
 
53
 
    A URL may point to a branch reference or it may point to a stacked branch.
54
 
    In either case, it's possible for there to be a cycle in these references,
55
 
    and this exception is raised when we detect such a cycle.
56
 
    """
57
 
 
58
 
    _fmt = "Encountered a branch cycle"""
59
 
 
60
 
 
61
 
class BranchOpenPolicy(object):
62
 
    """Policy on how to open branches.
63
 
 
64
 
    In particular, a policy determines which branches are okay to open by
65
 
    checking their URLs and deciding whether or not to follow branch
66
 
    references.
67
 
    """
68
 
 
69
 
    def should_follow_references(self):
70
 
        """Whether we traverse references when mirroring.
71
 
 
72
 
        Subclasses must override this method.
73
 
 
74
 
        If we encounter a branch reference and this returns false, an error is
75
 
        raised.
76
 
 
77
 
        :returns: A boolean to indicate whether to follow a branch reference.
78
 
        """
79
 
        raise NotImplementedError(self.should_follow_references)
80
 
 
81
 
    def transform_fallback_location(self, branch, url):
82
 
        """Validate, maybe modify, 'url' to be used as a stacked-on location.
83
 
 
84
 
        :param branch:  The branch that is being opened.
85
 
        :param url: The URL that the branch provides for its stacked-on
86
 
            location.
87
 
        :return: (new_url, check) where 'new_url' is the URL of the branch to
88
 
            actually open and 'check' is true if 'new_url' needs to be
89
 
            validated by check_and_follow_branch_reference.
90
 
        """
91
 
        raise NotImplementedError(self.transform_fallback_location)
92
 
 
93
 
    def check_one_url(self, url):
94
 
        """Check a URL.
95
 
 
96
 
        Subclasses must override this method.
97
 
 
98
 
        :param url: The source URL to check.
99
 
        :raise BadUrl: subclasses are expected to raise this or a subclass
100
 
            when it finds a URL it deems to be unacceptable.
101
 
        """
102
 
        raise NotImplementedError(self.check_one_url)
103
 
 
104
 
 
105
 
class _BlacklistPolicy(BranchOpenPolicy):
106
 
    """Branch policy that forbids certain URLs.
107
 
 
108
 
    This doesn't cope with various alternative spellings of URLs,
109
 
    with e.g. url encoding. It's mostly useful for tests.
110
 
    """
111
 
 
112
 
    def __init__(self, should_follow_references, bad_urls=None):
113
 
        if bad_urls is None:
114
 
            bad_urls = set()
115
 
        self._bad_urls = bad_urls
116
 
        self._should_follow_references = should_follow_references
117
 
 
118
 
    def should_follow_references(self):
119
 
        return self._should_follow_references
120
 
 
121
 
    def check_one_url(self, url):
122
 
        if url in self._bad_urls:
123
 
            raise BadUrl(url)
124
 
 
125
 
    def transform_fallback_location(self, branch, url):
126
 
        """See `BranchOpenPolicy.transform_fallback_location`.
127
 
 
128
 
        This class is not used for testing our smarter stacking features so we
129
 
        just do the simplest thing: return the URL that would be used anyway
130
 
        and don't check it.
131
 
        """
132
 
        return urlutils.join(branch.base, url), False
133
 
 
134
 
 
135
 
class AcceptAnythingPolicy(_BlacklistPolicy):
136
 
    """Accept anything, to make testing easier."""
137
 
 
138
 
    def __init__(self):
139
 
        super(AcceptAnythingPolicy, self).__init__(True, set())
140
 
 
141
 
 
142
 
class WhitelistPolicy(BranchOpenPolicy):
143
 
    """Branch policy that only allows certain URLs."""
144
 
 
145
 
    def __init__(self, should_follow_references, allowed_urls=None,
146
 
                 check=False):
147
 
        if allowed_urls is None:
148
 
            allowed_urls = []
149
 
        self.allowed_urls = set(url.rstrip('/') for url in allowed_urls)
150
 
        self.check = check
151
 
 
152
 
    def should_follow_references(self):
153
 
        return self._should_follow_references
154
 
 
155
 
    def check_one_url(self, url):
156
 
        if url.rstrip('/') not in self.allowed_urls:
157
 
            raise BadUrl(url)
158
 
 
159
 
    def transform_fallback_location(self, branch, url):
160
 
        """See `BranchOpenPolicy.transform_fallback_location`.
161
 
 
162
 
        Here we return the URL that would be used anyway and optionally check
163
 
        it.
164
 
        """
165
 
        return urlutils.join(branch.base, url), self.check
166
 
 
167
 
 
168
 
class SingleSchemePolicy(BranchOpenPolicy):
169
 
    """Branch open policy that rejects URLs not on the given scheme."""
170
 
 
171
 
    def __init__(self, allowed_scheme):
172
 
        self.allowed_scheme = allowed_scheme
173
 
 
174
 
    def should_follow_references(self):
175
 
        return True
176
 
 
177
 
    def transform_fallback_location(self, branch, url):
178
 
        return urlutils.join(branch.base, url), True
179
 
 
180
 
    def check_one_url(self, url):
181
 
        """Check that `url` is okay to open."""
182
 
        if urlutils.URL.from_string(str(url)).scheme != self.allowed_scheme:
183
 
            raise BadUrl(url)
184
 
 
185
 
 
186
 
class BranchOpener(object):
187
 
    """Branch opener which uses a URL policy.
188
 
 
189
 
    All locations that are opened (stacked-on branches, references) are
190
 
    checked against a policy object.
191
 
 
192
 
    The policy object is expected to have the following methods:
193
 
    * check_one_url
194
 
    * should_follow_references
195
 
    * transform_fallback_location
196
 
    """
197
 
 
198
 
    _threading_data = threading.local()
199
 
 
200
 
    def __init__(self, policy, probers=None):
201
 
        """Create a new BranchOpener.
202
 
 
203
 
        :param policy: The opener policy to use.
204
 
        :param probers: Optional list of probers to allow.
205
 
            Defaults to local and remote bzr probers.
206
 
        """
207
 
        self.policy = policy
208
 
        self._seen_urls = set()
209
 
        if probers is None:
210
 
            probers = ControlDirFormat.all_probers()
211
 
        self.probers = probers
212
 
 
213
 
    @classmethod
214
 
    def install_hook(cls):
215
 
        """Install the ``transform_fallback_location`` hook.
216
 
 
217
 
        This is done at module import time, but transform_fallback_locationHook
218
 
        doesn't do anything unless the `_active_openers` threading.Local
219
 
        object has a 'opener' attribute in this thread.
220
 
 
221
 
        This is in a module-level function rather than performed at module
222
 
        level so that it can be called in setUp for testing `BranchOpener`
223
 
        as breezy.tests.TestCase.setUp clears hooks.
224
 
        """
225
 
        Branch.hooks.install_named_hook(
226
 
            'transform_fallback_location',
227
 
            cls.transform_fallback_locationHook,
228
 
            'BranchOpener.transform_fallback_locationHook')
229
 
 
230
 
    def check_and_follow_branch_reference(self, url):
231
 
        """Check URL (and possibly the referenced URL).
232
 
 
233
 
        This method checks that `url` passes the policy's `check_one_url`
234
 
        method, and if `url` refers to a branch reference, it checks whether
235
 
        references are allowed and whether the reference's URL passes muster
236
 
        also -- recursively, until a real branch is found.
237
 
 
238
 
        :param url: URL to check
239
 
        :raise BranchLoopError: If the branch references form a loop.
240
 
        :raise BranchReferenceForbidden: If this opener forbids branch
241
 
            references.
242
 
        """
243
 
        while True:
244
 
            if url in self._seen_urls:
245
 
                raise BranchLoopError()
246
 
            self._seen_urls.add(url)
247
 
            self.policy.check_one_url(url)
248
 
            next_url = self.follow_reference(url)
249
 
            if next_url is None:
250
 
                return url
251
 
            url = next_url
252
 
            if not self.policy.should_follow_references():
253
 
                raise BranchReferenceForbidden(url)
254
 
 
255
 
    @classmethod
256
 
    def transform_fallback_locationHook(cls, branch, url):
257
 
        """Installed as the 'transform_fallback_location' Branch hook.
258
 
 
259
 
        This method calls `transform_fallback_location` on the policy object
260
 
        and either returns the url it provides or passes it back to
261
 
        check_and_follow_branch_reference.
262
 
        """
263
 
        try:
264
 
            opener = getattr(cls._threading_data, "opener")
265
 
        except AttributeError:
266
 
            return url
267
 
        new_url, check = opener.policy.transform_fallback_location(branch, url)
268
 
        if check:
269
 
            return opener.check_and_follow_branch_reference(new_url)
270
 
        else:
271
 
            return new_url
272
 
 
273
 
    def run_with_transform_fallback_location_hook_installed(
274
 
            self, callable, *args, **kw):
275
 
        if (self.transform_fallback_locationHook not in
276
 
                Branch.hooks['transform_fallback_location']):
277
 
            raise AssertionError("hook not installed")
278
 
        self._threading_data.opener = self
279
 
        try:
280
 
            return callable(*args, **kw)
281
 
        finally:
282
 
            del self._threading_data.opener
283
 
            # We reset _seen_urls here to avoid multiple calls to open giving
284
 
            # spurious loop exceptions.
285
 
            self._seen_urls = set()
286
 
 
287
 
    def _open_dir(self, url):
288
 
        """Simple BzrDir.open clone that only uses specific probers.
289
 
 
290
 
        :param url: URL to open
291
 
        :return: ControlDir instance
292
 
        """
293
 
        def redirected(transport, e, redirection_notice):
294
 
            self.policy.check_one_url(e.target)
295
 
            redirected_transport = transport._redirected_to(
296
 
                e.source, e.target)
297
 
            if redirected_transport is None:
298
 
                raise errors.NotBranchError(e.source)
299
 
            trace.note(
300
 
                '%s is%s redirected to %s',
301
 
                transport.base, e.permanently, redirected_transport.base)
302
 
            return redirected_transport
303
 
 
304
 
        def find_format(transport):
305
 
            last_error = errors.NotBranchError(transport.base)
306
 
            for prober_kls in self.probers:
307
 
                prober = prober_kls()
308
 
                try:
309
 
                    return transport, prober.probe_transport(transport)
310
 
                except errors.NotBranchError as e:
311
 
                    last_error = e
312
 
            else:
313
 
                raise last_error
314
 
        transport = get_transport(url)
315
 
        transport, format = do_catching_redirections(
316
 
            find_format, transport, redirected)
317
 
        return format.open(transport)
318
 
 
319
 
    def follow_reference(self, url):
320
 
        """Get the branch-reference value at the specified url.
321
 
 
322
 
        This exists as a separate method only to be overriden in unit tests.
323
 
        """
324
 
        controldir = self._open_dir(url)
325
 
        return controldir.get_branch_reference()
326
 
 
327
 
    def open(self, url, ignore_fallbacks=False):
328
 
        """Open the Bazaar branch at url, first checking it.
329
 
 
330
 
        What is acceptable means is defined by the policy's `follow_reference`
331
 
        and `check_one_url` methods.
332
 
        """
333
 
        if not isinstance(url, str):
334
 
            raise TypeError
335
 
 
336
 
        url = self.check_and_follow_branch_reference(url)
337
 
 
338
 
        def open_branch(url, ignore_fallbacks):
339
 
            dir = self._open_dir(url)
340
 
            return dir.open_branch(ignore_fallbacks=ignore_fallbacks)
341
 
        return self.run_with_transform_fallback_location_hook_installed(
342
 
            open_branch, url, ignore_fallbacks)
343
 
 
344
 
 
345
 
def open_only_scheme(allowed_scheme, url):
346
 
    """Open the branch at `url`, only accessing URLs on `allowed_scheme`.
347
 
 
348
 
    :raises BadUrl: An attempt was made to open a URL that was not on
349
 
        `allowed_scheme`.
350
 
    """
351
 
    return BranchOpener(SingleSchemePolicy(allowed_scheme)).open(url)
352
 
 
353
 
 
354
 
BranchOpener.install_hook()