1
# Copyright (C) 2005-2010 Canonical Ltd
1
# Copyright (C) 2005-2011, 2016 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
20
20
so this is primarily useful for testing.
23
from __future__ import absolute_import
26
from stat import S_IFREG, S_IFDIR
27
from cStringIO import StringIO
31
from stat import S_IFREG, S_IFDIR, S_IFLNK
34
from bzrlib.errors import (
37
from ..errors import (
37
40
InProcessTransport,
41
from bzrlib.trace import mutter
42
from bzrlib.transport import (
44
from ..transport import (
43
45
AppendBasedFileStream,
50
51
class MemoryStat(object):
52
def __init__(self, size, is_dir, perms):
53
def __init__(self, size, kind, perms):
53
54
self.st_size = size
57
58
self.st_mode = S_IFREG | perms
59
elif kind == 'directory':
61
62
self.st_mode = S_IFDIR | perms
63
elif kind == 'symlink':
64
self.st_mode = S_IFLNK | 0o644
66
raise AssertionError('unknown kind %r' % kind)
64
69
class MemoryTransport(transport.Transport):
75
80
self._scheme = url[:split]
76
81
self._cwd = url[split:]
77
82
# dictionaries from absolute path to file mode
78
self._dirs = {'/':None}
83
self._dirs = {'/': None}
82
88
def clone(self, offset=None):
83
89
"""See Transport.clone()."""
84
path = self._combine_paths(self._cwd, offset)
90
path = urlutils.URL._combine_paths(self._cwd, offset)
85
91
if len(path) == 0 or path[-1] != '/':
87
93
url = self._scheme + path
88
94
result = self.__class__(url)
89
95
result._dirs = self._dirs
96
result._symlinks = self._symlinks
90
97
result._files = self._files
91
98
result._locks = self._locks
106
113
"""See Transport.append_file()."""
107
114
_abspath = self._abspath(relpath)
108
115
self._check_parent(_abspath)
109
orig_content, orig_mode = self._files.get(_abspath, ("", None))
116
orig_content, orig_mode = self._files.get(_abspath, (b"", None))
112
119
self._files[_abspath] = (orig_content + f.read(), mode)
115
122
def _check_parent(self, _abspath):
116
123
dir = os.path.dirname(_abspath)
118
if not dir in self._dirs:
125
if dir not in self._dirs:
119
126
raise NoSuchFile(_abspath)
121
128
def has(self, relpath):
122
129
"""See Transport.has()."""
123
130
_abspath = self._abspath(relpath)
124
return (_abspath in self._files) or (_abspath in self._dirs)
131
return ((_abspath in self._files)
132
or (_abspath in self._dirs)
133
or (_abspath in self._symlinks))
126
135
def delete(self, relpath):
127
136
"""See Transport.delete()."""
128
137
_abspath = self._abspath(relpath)
129
if not _abspath in self._files:
138
if _abspath not in self._files:
130
139
raise NoSuchFile(relpath)
131
140
del self._files[_abspath]
133
142
def external_url(self):
134
"""See bzrlib.transport.Transport.external_url."""
143
"""See breezy.transport.Transport.external_url."""
135
144
# MemoryTransport's are only accessible in-process
136
145
# so we raise here
137
146
raise InProcessTransport(self)
139
148
def get(self, relpath):
140
149
"""See Transport.get()."""
141
150
_abspath = self._abspath(relpath)
142
if not _abspath in self._files:
151
if _abspath not in self._files:
143
152
if _abspath in self._dirs:
144
153
return LateReadError(relpath)
146
155
raise NoSuchFile(relpath)
147
return StringIO(self._files[_abspath][0])
156
return BytesIO(self._files[_abspath][0])
149
158
def put_file(self, relpath, f, mode=None):
150
159
"""See Transport.put_file()."""
151
160
_abspath = self._abspath(relpath)
152
161
self._check_parent(_abspath)
154
if type(bytes) is not str:
155
# Although not strictly correct, we raise UnicodeEncodeError to be
156
# compatible with other transports.
157
raise UnicodeEncodeError(
158
'undefined', bytes, 0, 1,
159
'put_file must be given a file of bytes, not unicode.')
160
self._files[_abspath] = (bytes, mode)
163
self._files[_abspath] = (raw_bytes, mode)
164
return len(raw_bytes)
163
166
def mkdir(self, relpath, mode=None):
164
167
"""See Transport.mkdir()."""
166
169
self._check_parent(_abspath)
167
170
if _abspath in self._dirs:
168
171
raise FileExists(relpath)
169
self._dirs[_abspath]=mode
172
self._dirs[_abspath] = mode
171
174
def open_write_stream(self, relpath, mode=None):
172
175
"""See Transport.open_write_stream."""
173
self.put_bytes(relpath, "", mode)
176
self.put_bytes(relpath, b"", mode)
174
177
result = AppendBasedFileStream(self, relpath)
175
178
_file_streams[self.abspath(relpath)] = result
199
202
if path.startswith(_abspath):
200
203
trailing = path[len(_abspath):]
201
204
if trailing and '/' not in trailing:
202
result.append(trailing)
203
return map(urlutils.escape, result)
205
result.append(urlutils.escape(trailing))
205
208
def rename(self, rel_from, rel_to):
206
209
"""Rename a file or directory; fail if the destination exists"""
207
210
abs_from = self._abspath(rel_from)
208
211
abs_to = self._abspath(rel_to)
210
214
if x == abs_from:
212
216
elif x.startswith(abs_from + '/'):
213
217
x = abs_to + x[len(abs_from):]
215
220
def do_renames(container):
216
222
for path in container:
217
223
new_path = replace(path)
218
224
if new_path != path:
219
225
if new_path in container:
220
226
raise FileExists(new_path)
221
container[new_path] = container[path]
223
do_renames(self._files)
224
do_renames(self._dirs)
227
renames.append((path, new_path))
228
for path, new_path in renames:
229
container[new_path] = container[path]
232
# If we modify the existing dicts, we're not atomic anymore and may
233
# fail differently depending on dict order. So work on copy, fail on
234
# error on only replace dicts if all goes well.
235
renamed_files = self._files.copy()
236
renamed_dirs = self._dirs.copy()
237
do_renames(renamed_files)
238
do_renames(renamed_dirs)
239
# We may have been cloned so modify in place
241
self._files.update(renamed_files)
243
self._dirs.update(renamed_dirs)
226
245
def rmdir(self, relpath):
227
246
"""See Transport.rmdir."""
235
254
for path in self._dirs:
236
255
if path.startswith(_abspath + '/') and path != _abspath:
237
self._translate_error(IOError(errno.ENOTEMPTY, relpath), relpath)
238
if not _abspath in self._dirs:
256
self._translate_error(
257
IOError(errno.ENOTEMPTY, relpath), relpath)
258
if _abspath not in self._dirs:
239
259
raise NoSuchFile(relpath)
240
260
del self._dirs[_abspath]
243
263
"""See Transport.stat()."""
244
264
_abspath = self._abspath(relpath)
245
265
if _abspath in self._files:
246
return MemoryStat(len(self._files[_abspath][0]), False,
266
return MemoryStat(len(self._files[_abspath][0]), 'file',
247
267
self._files[_abspath][1])
248
268
elif _abspath in self._dirs:
249
return MemoryStat(0, True, self._dirs[_abspath])
269
return MemoryStat(0, 'directory', self._dirs[_abspath])
270
elif _abspath in self._symlinks:
271
return MemoryStat(0, 'symlink', 0)
251
273
raise NoSuchFile(_abspath)
272
294
raise ValueError("illegal relpath %r under %r"
273
% (relpath, self._cwd))
295
% (relpath, self._cwd))
275
297
elif i == '.' or i == '':
301
r = self._symlinks.get('/'.join(r), r)
279
302
return '/' + '/'.join(r)
304
def symlink(self, source, link_name):
305
"""Create a symlink pointing to source named link_name."""
306
_abspath = self._abspath(link_name)
307
self._check_parent(_abspath)
308
self._symlinks[_abspath] = source.split('/')
310
def readlink(self, link_name):
311
_abspath = self._abspath(link_name)
313
return '/'.join(self._symlinks[_abspath])
315
raise NoSuchFile(link_name)
282
318
class _MemoryLock(object):
283
319
"""This makes a lock."""
289
325
raise LockError('File %r already locked' % (self.path,))
290
326
self.transport._locks[self.path] = self
293
# Should this warn, or actually try to cleanup?
295
warnings.warn("MemoryLock %r not explicitly unlocked" % (self.path,))
298
328
def unlock(self):
299
329
del self.transport._locks[self.path]
300
330
self.transport = None
304
334
"""Server for the MemoryTransport for testing with."""
306
336
def start_server(self):
307
self._dirs = {'/':None}
337
self._dirs = {'/': None}
310
340
self._scheme = "memory+%s:///" % id(self)
311
342
def memory_factory(url):
312
from bzrlib.transport import memory
313
344
result = memory.MemoryTransport(url)
314
345
result._dirs = self._dirs
315
346
result._files = self._files