52
52
class MemoryStat(object):
54
def __init__(self, size, kind, perms):
54
def __init__(self, size, kind, perms=None):
55
55
self.st_size = size
59
self.st_mode = S_IFREG | perms
60
elif kind == 'directory':
59
self.st_mode = kind | perms
63
self.st_mode = S_IFDIR | perms
64
elif kind == 'symlink':
65
self.st_mode = S_IFLNK | 0o644
67
raise AssertionError('unknown kind %r' % kind)
63
self.st_mode = kind | perms
70
66
class MemoryTransport(transport.Transport):
113
109
def append_file(self, relpath, f, mode=None):
114
110
"""See Transport.append_file()."""
115
_abspath = self._abspath(relpath)
111
_abspath = self._resolve_symlinks(relpath)
116
112
self._check_parent(_abspath)
117
113
orig_content, orig_mode = self._files.get(_abspath, (b"", None))
123
119
def _check_parent(self, _abspath):
124
120
dir = os.path.dirname(_abspath)
126
if not dir in self._dirs:
122
if dir not in self._dirs:
127
123
raise NoSuchFile(_abspath)
129
125
def has(self, relpath):
130
126
"""See Transport.has()."""
131
127
_abspath = self._abspath(relpath)
132
return ((_abspath in self._files) or
133
(_abspath in self._dirs) or
134
(_abspath in self._symlinks))
128
for container in (self._files, self._dirs, self._symlinks):
129
if _abspath in container.keys():
136
133
def delete(self, relpath):
137
134
"""See Transport.delete()."""
138
135
_abspath = self._abspath(relpath)
139
if not _abspath in self._files:
136
if _abspath in self._files:
137
del self._files[_abspath]
138
elif _abspath in self._symlinks:
139
del self._symlinks[_abspath]
140
141
raise NoSuchFile(relpath)
141
del self._files[_abspath]
143
143
def external_url(self):
144
144
"""See breezy.transport.Transport.external_url."""
159
159
def put_file(self, relpath, f, mode=None):
    """See Transport.put_file().

    Store the entire content of ``f`` under ``relpath``.

    :param relpath: Path of the file to write; symlinks on the path are
        followed via ``_resolve_symlinks`` before the content is stored.
    :param f: File-like object; it is read to exhaustion with ``f.read()``.
    :param mode: Permission value stored alongside the content, unchanged.
    :return: The number of bytes stored.
    """
    # _resolve_symlinks() starts from self._abspath(relpath) itself, so a
    # separate _abspath() call beforehand would only be overwritten.
    _abspath = self._resolve_symlinks(relpath)
    # The parent is checked before anything is stored, so a failed check
    # leaves self._files untouched.
    self._check_parent(_abspath)
    raw_bytes = f.read()
    self._files[_abspath] = (raw_bytes, mode)
    return len(raw_bytes)
167
def symlink(self, source, target):
    """Record a symbolic link at ``target`` that points to ``source``.

    :param source: Path the link refers to; stored as the absolute path
        produced by ``_abspath``.
    :param target: Path at which the link is created; existing symlinks
        at that path are followed first via ``_resolve_symlinks``.
    """
    _abspath = self._resolve_symlinks(target)
    # The parent is checked before the link is recorded, so a failed check
    # leaves self._symlinks untouched.
    self._check_parent(_abspath)
    self._symlinks[_abspath] = self._abspath(source)
167
172
def mkdir(self, relpath, mode=None):
    """See Transport.mkdir().

    :param relpath: Directory to create; symlinks on the path are followed
        via ``_resolve_symlinks`` before the directory is registered.
    :param mode: Permission value stored for the new directory, unchanged.
    :raises FileExists: If the resolved path is already a known directory.
    """
    # _resolve_symlinks() starts from self._abspath(relpath) itself, so a
    # separate _abspath() call beforehand would only be overwritten.
    _abspath = self._resolve_symlinks(relpath)
    self._check_parent(_abspath)
    if _abspath in self._dirs:
        raise FileExists(relpath)
    self._dirs[_abspath] = mode
175
180
def open_write_stream(self, relpath, mode=None):
176
181
"""See Transport.open_write_stream."""
186
191
def iter_files_recursive(self):
    """Yield the escaped relative path of each file and symlink below cwd.

    Every key of ``self._files`` and ``self._symlinks`` that starts with
    ``self._cwd`` is yielded with that prefix removed and passed through
    ``urlutils.escape``. Directories are not yielded.
    """
    prefix = self._cwd
    for path in itertools.chain(self._files, self._symlinks):
        if path.startswith(prefix):
            yield urlutils.escape(path[len(prefix):])
191
196
def list_dir(self, relpath):
192
197
"""See Transport.list_dir()."""
193
_abspath = self._abspath(relpath)
198
_abspath = self._resolve_symlinks(relpath)
194
199
if _abspath != '/' and _abspath not in self._dirs:
195
200
raise NoSuchFile(relpath)
209
214
def rename(self, rel_from, rel_to):
210
215
"""Rename a file or directory; fail if the destination exists"""
211
abs_from = self._abspath(rel_from)
212
abs_to = self._abspath(rel_to)
216
abs_from = self._resolve_symlinks(rel_from)
217
abs_to = self._resolve_symlinks(rel_to)
215
220
if x == abs_from:
234
239
# fail differently depending on dict order. So work on copy, fail on
235
240
# error on only replace dicts if all goes well.
236
241
renamed_files = self._files.copy()
242
renamed_symlinks = self._symlinks.copy()
237
243
renamed_dirs = self._dirs.copy()
238
244
do_renames(renamed_files)
245
do_renames(renamed_symlinks)
239
246
do_renames(renamed_dirs)
240
247
# We may have been cloned so modify in place
241
248
self._files.clear()
242
249
self._files.update(renamed_files)
250
self._symlinks.clear()
251
self._symlinks.update(renamed_symlinks)
243
252
self._dirs.clear()
244
253
self._dirs.update(renamed_dirs)
246
255
def rmdir(self, relpath):
247
256
"""See Transport.rmdir."""
248
_abspath = self._abspath(relpath)
257
_abspath = self._resolve_symlinks(relpath)
249
258
if _abspath in self._files:
250
259
self._translate_error(IOError(errno.ENOTDIR, relpath), relpath)
251
for path in self._files:
260
for path in itertools.chain(self._files, self._symlinks):
252
261
if path.startswith(_abspath + '/'):
253
262
self._translate_error(IOError(errno.ENOTEMPTY, relpath),
255
264
for path in self._dirs:
256
265
if path.startswith(_abspath + '/') and path != _abspath:
257
self._translate_error(IOError(errno.ENOTEMPTY, relpath), relpath)
258
if not _abspath in self._dirs:
266
self._translate_error(
267
IOError(errno.ENOTEMPTY, relpath), relpath)
268
if _abspath not in self._dirs:
259
269
raise NoSuchFile(relpath)
260
270
del self._dirs[_abspath]
262
272
def stat(self, relpath):
263
273
"""See Transport.stat()."""
264
274
_abspath = self._abspath(relpath)
265
if _abspath in self._files:
266
return MemoryStat(len(self._files[_abspath][0]), 'file',
275
if _abspath in self._files.keys():
276
return MemoryStat(len(self._files[_abspath][0]), S_IFREG,
267
277
self._files[_abspath][1])
268
elif _abspath in self._dirs:
269
return MemoryStat(0, 'directory', self._dirs[_abspath])
270
elif _abspath in self._symlinks:
271
return MemoryStat(0, 'symlink', 0)
278
elif _abspath in self._dirs.keys():
279
return MemoryStat(0, S_IFDIR, self._dirs[_abspath])
280
elif _abspath in self._symlinks.keys():
281
return MemoryStat(0, S_IFLNK)
273
283
raise NoSuchFile(_abspath)
280
290
"""See Transport.lock_write()."""
281
291
return _MemoryLock(self._abspath(relpath), self)
293
def _resolve_symlinks(self, relpath):
294
path = self._abspath(relpath)
295
while path in self._symlinks.keys():
296
path = self._symlinks[path]
283
299
def _abspath(self, relpath):
284
300
"""Generate an internal absolute path."""
285
301
relpath = urlutils.unescape(relpath)
334
350
"""Server for the MemoryTransport for testing with."""
336
352
def start_server(self):
337
self._dirs = {'/':None}
353
self._dirs = {'/': None}
340
357
self._scheme = "memory+%s:///" % id(self)
341
359
def memory_factory(url):
342
360
from . import memory
343
361
result = memory.MemoryTransport(url)
344
362
result._dirs = self._dirs
345
363
result._files = self._files
364
result._symlinks = self._symlinks
346
365
result._locks = self._locks
348
367
self._memory_factory = memory_factory