/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/url_policy_open.py

  • Committer: Jelmer Vernooij
  • Date: 2020-04-05 19:11:34 UTC
  • mto: (7490.7.16 work)
  • mto: This revision was merged to the branch mainline in revision 7501.
  • Revision ID: jelmer@jelmer.uk-20200405191134-0aebh8ikiwygxma5
Populate the .gitignore file.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Branch opening with URL-based restrictions."""
 
18
 
 
19
from __future__ import absolute_import
 
20
 
 
21
import threading
 
22
 
 
23
from . import (
 
24
    errors,
 
25
    trace,
 
26
    urlutils,
 
27
    )
 
28
from .branch import Branch
 
29
from .controldir import (
 
30
    ControlDir,
 
31
    ControlDirFormat,
 
32
    )
 
33
from .transport import (
 
34
    do_catching_redirections,
 
35
    get_transport,
 
36
    )
 
37
 
 
38
 
 
39
class BadUrl(errors.BzrError):
 
40
 
 
41
    _fmt = "Tried to access a branch from bad URL %(url)s."
 
42
 
 
43
 
 
44
class BranchReferenceForbidden(errors.BzrError):
 
45
 
 
46
    _fmt = ("Trying to mirror a branch reference and the branch type "
 
47
            "does not allow references.")
 
48
 
 
49
 
 
50
class BranchLoopError(errors.BzrError):
 
51
    """Encountered a branch cycle.
 
52
 
 
53
    A URL may point to a branch reference or it may point to a stacked branch.
 
54
    In either case, it's possible for there to be a cycle in these references,
 
55
    and this exception is raised when we detect such a cycle.
 
56
    """
 
57
 
 
58
    _fmt = "Encountered a branch cycle"""
 
59
 
 
60
 
 
61
class BranchOpenPolicy(object):
 
62
    """Policy on how to open branches.
 
63
 
 
64
    In particular, a policy determines which branches are okay to open by
 
65
    checking their URLs and deciding whether or not to follow branch
 
66
    references.
 
67
    """
 
68
 
 
69
    def should_follow_references(self):
 
70
        """Whether we traverse references when mirroring.
 
71
 
 
72
        Subclasses must override this method.
 
73
 
 
74
        If we encounter a branch reference and this returns false, an error is
 
75
        raised.
 
76
 
 
77
        :returns: A boolean to indicate whether to follow a branch reference.
 
78
        """
 
79
        raise NotImplementedError(self.should_follow_references)
 
80
 
 
81
    def transform_fallback_location(self, branch, url):
 
82
        """Validate, maybe modify, 'url' to be used as a stacked-on location.
 
83
 
 
84
        :param branch:  The branch that is being opened.
 
85
        :param url: The URL that the branch provides for its stacked-on
 
86
            location.
 
87
        :return: (new_url, check) where 'new_url' is the URL of the branch to
 
88
            actually open and 'check' is true if 'new_url' needs to be
 
89
            validated by check_and_follow_branch_reference.
 
90
        """
 
91
        raise NotImplementedError(self.transform_fallback_location)
 
92
 
 
93
    def check_one_url(self, url):
 
94
        """Check a URL.
 
95
 
 
96
        Subclasses must override this method.
 
97
 
 
98
        :param url: The source URL to check.
 
99
        :raise BadUrl: subclasses are expected to raise this or a subclass
 
100
            when it finds a URL it deems to be unacceptable.
 
101
        """
 
102
        raise NotImplementedError(self.check_one_url)
 
103
 
 
104
 
 
105
class _BlacklistPolicy(BranchOpenPolicy):
 
106
    """Branch policy that forbids certain URLs.
 
107
 
 
108
    This doesn't cope with various alternative spellings of URLs,
 
109
    with e.g. url encoding. It's mostly useful for tests.
 
110
    """
 
111
 
 
112
    def __init__(self, should_follow_references, bad_urls=None):
 
113
        if bad_urls is None:
 
114
            bad_urls = set()
 
115
        self._bad_urls = bad_urls
 
116
        self._should_follow_references = should_follow_references
 
117
 
 
118
    def should_follow_references(self):
 
119
        return self._should_follow_references
 
120
 
 
121
    def check_one_url(self, url):
 
122
        if url in self._bad_urls:
 
123
            raise BadUrl(url)
 
124
 
 
125
    def transform_fallback_location(self, branch, url):
 
126
        """See `BranchOpenPolicy.transform_fallback_location`.
 
127
 
 
128
        This class is not used for testing our smarter stacking features so we
 
129
        just do the simplest thing: return the URL that would be used anyway
 
130
        and don't check it.
 
131
        """
 
132
        return urlutils.join(branch.base, url), False
 
133
 
 
134
 
 
135
class AcceptAnythingPolicy(_BlacklistPolicy):
 
136
    """Accept anything, to make testing easier."""
 
137
 
 
138
    def __init__(self):
 
139
        super(AcceptAnythingPolicy, self).__init__(True, set())
 
140
 
 
141
 
 
142
class WhitelistPolicy(BranchOpenPolicy):
 
143
    """Branch policy that only allows certain URLs."""
 
144
 
 
145
    def __init__(self, should_follow_references, allowed_urls=None,
 
146
                 check=False):
 
147
        if allowed_urls is None:
 
148
            allowed_urls = []
 
149
        self.allowed_urls = set(url.rstrip('/') for url in allowed_urls)
 
150
        self.check = check
 
151
 
 
152
    def should_follow_references(self):
 
153
        return self._should_follow_references
 
154
 
 
155
    def check_one_url(self, url):
 
156
        if url.rstrip('/') not in self.allowed_urls:
 
157
            raise BadUrl(url)
 
158
 
 
159
    def transform_fallback_location(self, branch, url):
 
160
        """See `BranchOpenPolicy.transform_fallback_location`.
 
161
 
 
162
        Here we return the URL that would be used anyway and optionally check
 
163
        it.
 
164
        """
 
165
        return urlutils.join(branch.base, url), self.check
 
166
 
 
167
 
 
168
class SingleSchemePolicy(BranchOpenPolicy):
 
169
    """Branch open policy that rejects URLs not on the given scheme."""
 
170
 
 
171
    def __init__(self, allowed_scheme):
 
172
        self.allowed_scheme = allowed_scheme
 
173
 
 
174
    def should_follow_references(self):
 
175
        return True
 
176
 
 
177
    def transform_fallback_location(self, branch, url):
 
178
        return urlutils.join(branch.base, url), True
 
179
 
 
180
    def check_one_url(self, url):
 
181
        """Check that `url` is okay to open."""
 
182
        if urlutils.URL.from_string(str(url)).scheme != self.allowed_scheme:
 
183
            raise BadUrl(url)
 
184
 
 
185
 
 
186
class BranchOpener(object):
 
187
    """Branch opener which uses a URL policy.
 
188
 
 
189
    All locations that are opened (stacked-on branches, references) are
 
190
    checked against a policy object.
 
191
 
 
192
    The policy object is expected to have the following methods:
 
193
    * check_one_url
 
194
    * should_follow_references
 
195
    * transform_fallback_location
 
196
    """
 
197
 
 
198
    _threading_data = threading.local()
 
199
 
 
200
    def __init__(self, policy, probers=None):
 
201
        """Create a new BranchOpener.
 
202
 
 
203
        :param policy: The opener policy to use.
 
204
        :param probers: Optional list of probers to allow.
 
205
            Defaults to local and remote bzr probers.
 
206
        """
 
207
        self.policy = policy
 
208
        self._seen_urls = set()
 
209
        if probers is None:
 
210
            probers = ControlDirFormat.all_probers()
 
211
        self.probers = probers
 
212
 
 
213
    @classmethod
 
214
    def install_hook(cls):
 
215
        """Install the ``transform_fallback_location`` hook.
 
216
 
 
217
        This is done at module import time, but transform_fallback_locationHook
 
218
        doesn't do anything unless the `_active_openers` threading.Local
 
219
        object has a 'opener' attribute in this thread.
 
220
 
 
221
        This is in a module-level function rather than performed at module
 
222
        level so that it can be called in setUp for testing `BranchOpener`
 
223
        as breezy.tests.TestCase.setUp clears hooks.
 
224
        """
 
225
        Branch.hooks.install_named_hook(
 
226
            'transform_fallback_location',
 
227
            cls.transform_fallback_locationHook,
 
228
            'BranchOpener.transform_fallback_locationHook')
 
229
 
 
230
    def check_and_follow_branch_reference(self, url):
 
231
        """Check URL (and possibly the referenced URL).
 
232
 
 
233
        This method checks that `url` passes the policy's `check_one_url`
 
234
        method, and if `url` refers to a branch reference, it checks whether
 
235
        references are allowed and whether the reference's URL passes muster
 
236
        also -- recursively, until a real branch is found.
 
237
 
 
238
        :param url: URL to check
 
239
        :raise BranchLoopError: If the branch references form a loop.
 
240
        :raise BranchReferenceForbidden: If this opener forbids branch
 
241
            references.
 
242
        """
 
243
        while True:
 
244
            if url in self._seen_urls:
 
245
                raise BranchLoopError()
 
246
            self._seen_urls.add(url)
 
247
            self.policy.check_one_url(url)
 
248
            next_url = self.follow_reference(url)
 
249
            if next_url is None:
 
250
                return url
 
251
            url = next_url
 
252
            if not self.policy.should_follow_references():
 
253
                raise BranchReferenceForbidden(url)
 
254
 
 
255
    @classmethod
 
256
    def transform_fallback_locationHook(cls, branch, url):
 
257
        """Installed as the 'transform_fallback_location' Branch hook.
 
258
 
 
259
        This method calls `transform_fallback_location` on the policy object
 
260
        and either returns the url it provides or passes it back to
 
261
        check_and_follow_branch_reference.
 
262
        """
 
263
        try:
 
264
            opener = getattr(cls._threading_data, "opener")
 
265
        except AttributeError:
 
266
            return url
 
267
        new_url, check = opener.policy.transform_fallback_location(branch, url)
 
268
        if check:
 
269
            return opener.check_and_follow_branch_reference(new_url)
 
270
        else:
 
271
            return new_url
 
272
 
 
273
    def run_with_transform_fallback_location_hook_installed(
 
274
            self, callable, *args, **kw):
 
275
        if (self.transform_fallback_locationHook not in
 
276
                Branch.hooks['transform_fallback_location']):
 
277
            raise AssertionError("hook not installed")
 
278
        self._threading_data.opener = self
 
279
        try:
 
280
            return callable(*args, **kw)
 
281
        finally:
 
282
            del self._threading_data.opener
 
283
            # We reset _seen_urls here to avoid multiple calls to open giving
 
284
            # spurious loop exceptions.
 
285
            self._seen_urls = set()
 
286
 
 
287
    def _open_dir(self, url):
 
288
        """Simple BzrDir.open clone that only uses specific probers.
 
289
 
 
290
        :param url: URL to open
 
291
        :return: ControlDir instance
 
292
        """
 
293
        def redirected(transport, e, redirection_notice):
 
294
            self.policy.check_one_url(e.target)
 
295
            redirected_transport = transport._redirected_to(
 
296
                e.source, e.target)
 
297
            if redirected_transport is None:
 
298
                raise errors.NotBranchError(e.source)
 
299
            trace.note(
 
300
                '%s is%s redirected to %s',
 
301
                transport.base, e.permanently, redirected_transport.base)
 
302
            return redirected_transport
 
303
 
 
304
        def find_format(transport):
 
305
            last_error = errors.NotBranchError(transport.base)
 
306
            for prober_kls in self.probers:
 
307
                prober = prober_kls()
 
308
                try:
 
309
                    return transport, prober.probe_transport(transport)
 
310
                except errors.NotBranchError as e:
 
311
                    last_error = e
 
312
            else:
 
313
                raise last_error
 
314
        transport = get_transport(url)
 
315
        transport, format = do_catching_redirections(
 
316
            find_format, transport, redirected)
 
317
        return format.open(transport)
 
318
 
 
319
    def follow_reference(self, url):
 
320
        """Get the branch-reference value at the specified url.
 
321
 
 
322
        This exists as a separate method only to be overriden in unit tests.
 
323
        """
 
324
        controldir = self._open_dir(url)
 
325
        return controldir.get_branch_reference()
 
326
 
 
327
    def open(self, url, ignore_fallbacks=False):
 
328
        """Open the Bazaar branch at url, first checking it.
 
329
 
 
330
        What is acceptable means is defined by the policy's `follow_reference`
 
331
        and `check_one_url` methods.
 
332
        """
 
333
        if not isinstance(url, str):
 
334
            raise TypeError
 
335
 
 
336
        url = self.check_and_follow_branch_reference(url)
 
337
 
 
338
        def open_branch(url, ignore_fallbacks):
 
339
            dir = self._open_dir(url)
 
340
            return dir.open_branch(ignore_fallbacks=ignore_fallbacks)
 
341
        return self.run_with_transform_fallback_location_hook_installed(
 
342
            open_branch, url, ignore_fallbacks)
 
343
 
 
344
 
 
345
def open_only_scheme(allowed_scheme, url):
 
346
    """Open the branch at `url`, only accessing URLs on `allowed_scheme`.
 
347
 
 
348
    :raises BadUrl: An attempt was made to open a URL that was not on
 
349
        `allowed_scheme`.
 
350
    """
 
351
    return BranchOpener(SingleSchemePolicy(allowed_scheme)).open(url)
 
352
 
 
353
 
 
354
BranchOpener.install_hook()