/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/url_policy_open.py

  • Committer: Jelmer Vernooij
  • Date: 2020-02-18 01:57:45 UTC
  • mto: This revision was merged to the branch mainline in revision 7493.
  • Revision ID: jelmer@jelmer.uk-20200218015745-q2ss9tsk74h4nh61
drop use of future.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Branch opening with URL-based restrictions."""
 
18
 
 
19
import threading
 
20
 
 
21
from . import (
 
22
    errors,
 
23
    trace,
 
24
    urlutils,
 
25
    )
 
26
from .branch import Branch
 
27
from .controldir import (
 
28
    ControlDir,
 
29
    ControlDirFormat,
 
30
    )
 
31
from .transport import (
 
32
    do_catching_redirections,
 
33
    get_transport,
 
34
    )
 
35
 
 
36
 
 
37
class BadUrl(errors.BzrError):
 
38
 
 
39
    _fmt = "Tried to access a branch from bad URL %(url)s."
 
40
 
 
41
 
 
42
class BranchReferenceForbidden(errors.BzrError):
 
43
 
 
44
    _fmt = ("Trying to mirror a branch reference and the branch type "
 
45
            "does not allow references.")
 
46
 
 
47
 
 
48
class BranchLoopError(errors.BzrError):
 
49
    """Encountered a branch cycle.
 
50
 
 
51
    A URL may point to a branch reference or it may point to a stacked branch.
 
52
    In either case, it's possible for there to be a cycle in these references,
 
53
    and this exception is raised when we detect such a cycle.
 
54
    """
 
55
 
 
56
    _fmt = "Encountered a branch cycle"""
 
57
 
 
58
 
 
59
class BranchOpenPolicy(object):
 
60
    """Policy on how to open branches.
 
61
 
 
62
    In particular, a policy determines which branches are okay to open by
 
63
    checking their URLs and deciding whether or not to follow branch
 
64
    references.
 
65
    """
 
66
 
 
67
    def should_follow_references(self):
 
68
        """Whether we traverse references when mirroring.
 
69
 
 
70
        Subclasses must override this method.
 
71
 
 
72
        If we encounter a branch reference and this returns false, an error is
 
73
        raised.
 
74
 
 
75
        :returns: A boolean to indicate whether to follow a branch reference.
 
76
        """
 
77
        raise NotImplementedError(self.should_follow_references)
 
78
 
 
79
    def transform_fallback_location(self, branch, url):
 
80
        """Validate, maybe modify, 'url' to be used as a stacked-on location.
 
81
 
 
82
        :param branch:  The branch that is being opened.
 
83
        :param url: The URL that the branch provides for its stacked-on
 
84
            location.
 
85
        :return: (new_url, check) where 'new_url' is the URL of the branch to
 
86
            actually open and 'check' is true if 'new_url' needs to be
 
87
            validated by check_and_follow_branch_reference.
 
88
        """
 
89
        raise NotImplementedError(self.transform_fallback_location)
 
90
 
 
91
    def check_one_url(self, url):
 
92
        """Check a URL.
 
93
 
 
94
        Subclasses must override this method.
 
95
 
 
96
        :param url: The source URL to check.
 
97
        :raise BadUrl: subclasses are expected to raise this or a subclass
 
98
            when it finds a URL it deems to be unacceptable.
 
99
        """
 
100
        raise NotImplementedError(self.check_one_url)
 
101
 
 
102
 
 
103
class _BlacklistPolicy(BranchOpenPolicy):
 
104
    """Branch policy that forbids certain URLs.
 
105
 
 
106
    This doesn't cope with various alternative spellings of URLs,
 
107
    with e.g. url encoding. It's mostly useful for tests.
 
108
    """
 
109
 
 
110
    def __init__(self, should_follow_references, bad_urls=None):
 
111
        if bad_urls is None:
 
112
            bad_urls = set()
 
113
        self._bad_urls = bad_urls
 
114
        self._should_follow_references = should_follow_references
 
115
 
 
116
    def should_follow_references(self):
 
117
        return self._should_follow_references
 
118
 
 
119
    def check_one_url(self, url):
 
120
        if url in self._bad_urls:
 
121
            raise BadUrl(url)
 
122
 
 
123
    def transform_fallback_location(self, branch, url):
 
124
        """See `BranchOpenPolicy.transform_fallback_location`.
 
125
 
 
126
        This class is not used for testing our smarter stacking features so we
 
127
        just do the simplest thing: return the URL that would be used anyway
 
128
        and don't check it.
 
129
        """
 
130
        return urlutils.join(branch.base, url), False
 
131
 
 
132
 
 
133
class AcceptAnythingPolicy(_BlacklistPolicy):
 
134
    """Accept anything, to make testing easier."""
 
135
 
 
136
    def __init__(self):
 
137
        super(AcceptAnythingPolicy, self).__init__(True, set())
 
138
 
 
139
 
 
140
class WhitelistPolicy(BranchOpenPolicy):
 
141
    """Branch policy that only allows certain URLs."""
 
142
 
 
143
    def __init__(self, should_follow_references, allowed_urls=None,
 
144
                 check=False):
 
145
        if allowed_urls is None:
 
146
            allowed_urls = []
 
147
        self.allowed_urls = set(url.rstrip('/') for url in allowed_urls)
 
148
        self.check = check
 
149
 
 
150
    def should_follow_references(self):
 
151
        return self._should_follow_references
 
152
 
 
153
    def check_one_url(self, url):
 
154
        if url.rstrip('/') not in self.allowed_urls:
 
155
            raise BadUrl(url)
 
156
 
 
157
    def transform_fallback_location(self, branch, url):
 
158
        """See `BranchOpenPolicy.transform_fallback_location`.
 
159
 
 
160
        Here we return the URL that would be used anyway and optionally check
 
161
        it.
 
162
        """
 
163
        return urlutils.join(branch.base, url), self.check
 
164
 
 
165
 
 
166
class SingleSchemePolicy(BranchOpenPolicy):
 
167
    """Branch open policy that rejects URLs not on the given scheme."""
 
168
 
 
169
    def __init__(self, allowed_scheme):
 
170
        self.allowed_scheme = allowed_scheme
 
171
 
 
172
    def should_follow_references(self):
 
173
        return True
 
174
 
 
175
    def transform_fallback_location(self, branch, url):
 
176
        return urlutils.join(branch.base, url), True
 
177
 
 
178
    def check_one_url(self, url):
 
179
        """Check that `url` is okay to open."""
 
180
        if urlutils.URL.from_string(str(url)).scheme != self.allowed_scheme:
 
181
            raise BadUrl(url)
 
182
 
 
183
 
 
184
class BranchOpener(object):
 
185
    """Branch opener which uses a URL policy.
 
186
 
 
187
    All locations that are opened (stacked-on branches, references) are
 
188
    checked against a policy object.
 
189
 
 
190
    The policy object is expected to have the following methods:
 
191
    * check_one_url
 
192
    * should_follow_references
 
193
    * transform_fallback_location
 
194
    """
 
195
 
 
196
    _threading_data = threading.local()
 
197
 
 
198
    def __init__(self, policy, probers=None):
 
199
        """Create a new BranchOpener.
 
200
 
 
201
        :param policy: The opener policy to use.
 
202
        :param probers: Optional list of probers to allow.
 
203
            Defaults to local and remote bzr probers.
 
204
        """
 
205
        self.policy = policy
 
206
        self._seen_urls = set()
 
207
        if probers is None:
 
208
            probers = ControlDirFormat.all_probers()
 
209
        self.probers = probers
 
210
 
 
211
    @classmethod
 
212
    def install_hook(cls):
 
213
        """Install the ``transform_fallback_location`` hook.
 
214
 
 
215
        This is done at module import time, but transform_fallback_locationHook
 
216
        doesn't do anything unless the `_active_openers` threading.Local
 
217
        object has a 'opener' attribute in this thread.
 
218
 
 
219
        This is in a module-level function rather than performed at module
 
220
        level so that it can be called in setUp for testing `BranchOpener`
 
221
        as breezy.tests.TestCase.setUp clears hooks.
 
222
        """
 
223
        Branch.hooks.install_named_hook(
 
224
            'transform_fallback_location',
 
225
            cls.transform_fallback_locationHook,
 
226
            'BranchOpener.transform_fallback_locationHook')
 
227
 
 
228
    def check_and_follow_branch_reference(self, url):
 
229
        """Check URL (and possibly the referenced URL).
 
230
 
 
231
        This method checks that `url` passes the policy's `check_one_url`
 
232
        method, and if `url` refers to a branch reference, it checks whether
 
233
        references are allowed and whether the reference's URL passes muster
 
234
        also -- recursively, until a real branch is found.
 
235
 
 
236
        :param url: URL to check
 
237
        :raise BranchLoopError: If the branch references form a loop.
 
238
        :raise BranchReferenceForbidden: If this opener forbids branch
 
239
            references.
 
240
        """
 
241
        while True:
 
242
            if url in self._seen_urls:
 
243
                raise BranchLoopError()
 
244
            self._seen_urls.add(url)
 
245
            self.policy.check_one_url(url)
 
246
            next_url = self.follow_reference(url)
 
247
            if next_url is None:
 
248
                return url
 
249
            url = next_url
 
250
            if not self.policy.should_follow_references():
 
251
                raise BranchReferenceForbidden(url)
 
252
 
 
253
    @classmethod
 
254
    def transform_fallback_locationHook(cls, branch, url):
 
255
        """Installed as the 'transform_fallback_location' Branch hook.
 
256
 
 
257
        This method calls `transform_fallback_location` on the policy object
 
258
        and either returns the url it provides or passes it back to
 
259
        check_and_follow_branch_reference.
 
260
        """
 
261
        try:
 
262
            opener = getattr(cls._threading_data, "opener")
 
263
        except AttributeError:
 
264
            return url
 
265
        new_url, check = opener.policy.transform_fallback_location(branch, url)
 
266
        if check:
 
267
            return opener.check_and_follow_branch_reference(new_url)
 
268
        else:
 
269
            return new_url
 
270
 
 
271
    def run_with_transform_fallback_location_hook_installed(
 
272
            self, callable, *args, **kw):
 
273
        if (self.transform_fallback_locationHook not in
 
274
                Branch.hooks['transform_fallback_location']):
 
275
            raise AssertionError("hook not installed")
 
276
        self._threading_data.opener = self
 
277
        try:
 
278
            return callable(*args, **kw)
 
279
        finally:
 
280
            del self._threading_data.opener
 
281
            # We reset _seen_urls here to avoid multiple calls to open giving
 
282
            # spurious loop exceptions.
 
283
            self._seen_urls = set()
 
284
 
 
285
    def _open_dir(self, url):
 
286
        """Simple BzrDir.open clone that only uses specific probers.
 
287
 
 
288
        :param url: URL to open
 
289
        :return: ControlDir instance
 
290
        """
 
291
        def redirected(transport, e, redirection_notice):
 
292
            self.policy.check_one_url(e.target)
 
293
            redirected_transport = transport._redirected_to(
 
294
                e.source, e.target)
 
295
            if redirected_transport is None:
 
296
                raise errors.NotBranchError(e.source)
 
297
            trace.note(
 
298
                '%s is%s redirected to %s',
 
299
                transport.base, e.permanently, redirected_transport.base)
 
300
            return redirected_transport
 
301
 
 
302
        def find_format(transport):
 
303
            last_error = errors.NotBranchError(transport.base)
 
304
            for prober_kls in self.probers:
 
305
                prober = prober_kls()
 
306
                try:
 
307
                    return transport, prober.probe_transport(transport)
 
308
                except errors.NotBranchError as e:
 
309
                    last_error = e
 
310
            else:
 
311
                raise last_error
 
312
        transport = get_transport(url)
 
313
        transport, format = do_catching_redirections(
 
314
            find_format, transport, redirected)
 
315
        return format.open(transport)
 
316
 
 
317
    def follow_reference(self, url):
 
318
        """Get the branch-reference value at the specified url.
 
319
 
 
320
        This exists as a separate method only to be overriden in unit tests.
 
321
        """
 
322
        controldir = self._open_dir(url)
 
323
        return controldir.get_branch_reference()
 
324
 
 
325
    def open(self, url, ignore_fallbacks=False):
 
326
        """Open the Bazaar branch at url, first checking it.
 
327
 
 
328
        What is acceptable means is defined by the policy's `follow_reference`
 
329
        and `check_one_url` methods.
 
330
        """
 
331
        if not isinstance(url, str):
 
332
            raise TypeError
 
333
 
 
334
        url = self.check_and_follow_branch_reference(url)
 
335
 
 
336
        def open_branch(url, ignore_fallbacks):
 
337
            dir = self._open_dir(url)
 
338
            return dir.open_branch(ignore_fallbacks=ignore_fallbacks)
 
339
        return self.run_with_transform_fallback_location_hook_installed(
 
340
            open_branch, url, ignore_fallbacks)
 
341
 
 
342
 
 
343
def open_only_scheme(allowed_scheme, url):
 
344
    """Open the branch at `url`, only accessing URLs on `allowed_scheme`.
 
345
 
 
346
    :raises BadUrl: An attempt was made to open a URL that was not on
 
347
        `allowed_scheme`.
 
348
    """
 
349
    return BranchOpener(SingleSchemePolicy(allowed_scheme)).open(url)
 
350
 
 
351
 
 
352
BranchOpener.install_hook()