1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Implementation of Transport that prevents access to locations above a set
20
from urlparse import urlparse
22
from bzrlib import errors, urlutils
23
from bzrlib.transport import (
30
from bzrlib.transport.decorator import TransportDecorator, DecoratorServer
31
from bzrlib.transport.memory import MemoryTransport
34
class ChrootServer(Server):
35
"""Server for chroot transports."""
37
def __init__(self, backing_transport):
38
self.backing_transport = backing_transport
40
def _factory(self, url):
41
assert url.startswith(self.scheme)
42
return ChrootTransport(self, url)
48
self.scheme = 'chroot-%d:///' % id(self)
49
register_transport(self.scheme, self._factory)
52
unregister_transport(self.scheme, self._factory)
55
class ChrootTransport(Transport):
57
def __init__(self, server, base):
59
if not base.endswith('/'):
61
Transport.__init__(self, base)
62
self.base_path = self.base[len(self.server.scheme)-1:]
63
self.scheme = self.server.scheme
65
def _call(self, methodname, relpath, *args):
66
method = getattr(self.server.backing_transport, methodname)
67
return method(self._safe_relpath(relpath), *args)
69
def _safe_relpath(self, relpath):
70
safe_relpath = self._combine_paths(self.base_path, relpath)
71
assert safe_relpath.startswith('/')
72
return safe_relpath[1:]
75
def abspath(self, relpath):
76
return self.scheme + self._safe_relpath(relpath)
78
def append_file(self, relpath, f, mode=None):
79
return self._call('append_file', relpath, f, mode)
81
def clone(self, relpath):
82
return ChrootTransport(self.server, self.abspath(relpath))
84
def delete(self, relpath):
85
return self._call('delete', relpath)
87
def delete_tree(self, relpath):
88
return self._call('delete_tree', relpath)
90
def get(self, relpath):
91
return self._call('get', relpath)
93
def has(self, relpath):
94
return self._call('has', relpath)
96
def iter_files_recursive(self):
97
backing_transport = self.server.backing_transport.clone(
98
self._safe_relpath('.'))
99
return backing_transport.iter_files_recursive()
102
return self.server.backing_transport.listable()
104
def list_dir(self, relpath):
105
return self._call('list_dir', relpath)
107
def lock_read(self, relpath):
108
return self._call('lock_read', relpath)
110
def lock_write(self, relpath):
111
return self._call('lock_write', relpath)
113
def mkdir(self, relpath, mode=None):
114
return self._call('mkdir', relpath, mode)
116
def put_file(self, relpath, f, mode=None):
117
return self._call('put_file', relpath, f, mode)
119
def rename(self, rel_from, rel_to):
120
return self._call('rename', rel_from, self._safe_relpath(rel_to))
122
def rmdir(self, relpath):
123
return self._call('rmdir', relpath)
125
def stat(self, relpath):
126
return self._call('stat', relpath)
129
class TestingChrootServer(ChrootServer):
132
ChrootServer.__init__(self, get_transport('.'))
135
def get_test_permutations():
136
"""Return the permutations to be used in testing."""
137
return [(ChrootTransport, TestingChrootServer),