1
# Copyright (C) 2005-2010 Canonical Ltd
1
# Copyright (C) 2005-2011, 2016 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
20
20
so this is primarily useful for testing.
23
from __future__ import absolute_import
26
31
from stat import S_IFREG, S_IFDIR
27
from cStringIO import StringIO
34
from bzrlib.errors import (
37
from ..errors import (
37
40
InProcessTransport,
41
from bzrlib.trace import mutter
42
from bzrlib.transport import (
43
from ..transport import (
43
44
AppendBasedFileStream,
82
83
def clone(self, offset=None):
83
84
"""See Transport.clone()."""
84
path = self._combine_paths(self._cwd, offset)
85
path = urlutils.URL._combine_paths(self._cwd, offset)
85
86
if len(path) == 0 or path[-1] != '/':
87
88
url = self._scheme + path
106
107
"""See Transport.append_file()."""
107
108
_abspath = self._abspath(relpath)
108
109
self._check_parent(_abspath)
109
orig_content, orig_mode = self._files.get(_abspath, ("", None))
110
orig_content, orig_mode = self._files.get(_abspath, (b"", None))
112
113
self._files[_abspath] = (orig_content + f.read(), mode)
131
132
del self._files[_abspath]
133
134
def external_url(self):
134
"""See bzrlib.transport.Transport.external_url."""
135
"""See breezy.transport.Transport.external_url."""
135
136
# MemoryTransport's are only accessible in-process
136
137
# so we raise here
137
138
raise InProcessTransport(self)
144
145
return LateReadError(relpath)
146
147
raise NoSuchFile(relpath)
147
return StringIO(self._files[_abspath][0])
148
return BytesIO(self._files[_abspath][0])
149
150
def put_file(self, relpath, f, mode=None):
150
151
"""See Transport.put_file()."""
151
152
_abspath = self._abspath(relpath)
152
153
self._check_parent(_abspath)
154
if type(bytes) is not str:
155
# Although not strictly correct, we raise UnicodeEncodeError to be
156
# compatible with other transports.
157
raise UnicodeEncodeError(
158
'undefined', bytes, 0, 1,
159
'put_file must be given a file of bytes, not unicode.')
160
self._files[_abspath] = (bytes, mode)
155
self._files[_abspath] = (raw_bytes, mode)
156
return len(raw_bytes)
163
158
def mkdir(self, relpath, mode=None):
164
159
"""See Transport.mkdir()."""
171
166
def open_write_stream(self, relpath, mode=None):
172
167
"""See Transport.open_write_stream."""
173
self.put_bytes(relpath, "", mode)
168
self.put_bytes(relpath, b"", mode)
174
169
result = AppendBasedFileStream(self, relpath)
175
170
_file_streams[self.abspath(relpath)] = result
199
194
if path.startswith(_abspath):
200
195
trailing = path[len(_abspath):]
201
196
if trailing and '/' not in trailing:
202
result.append(trailing)
203
return map(urlutils.escape, result)
197
result.append(urlutils.escape(trailing))
205
200
def rename(self, rel_from, rel_to):
206
201
"""Rename a file or directory; fail if the destination exists"""
207
202
abs_from = self._abspath(rel_from)
208
203
abs_to = self._abspath(rel_to)
210
206
if x == abs_from:
212
208
elif x.startswith(abs_from + '/'):
213
209
x = abs_to + x[len(abs_from):]
215
212
def do_renames(container):
216
214
for path in container:
217
215
new_path = replace(path)
218
216
if new_path != path:
219
217
if new_path in container:
220
218
raise FileExists(new_path)
221
container[new_path] = container[path]
223
do_renames(self._files)
224
do_renames(self._dirs)
219
renames.append((path, new_path))
220
for path, new_path in renames:
221
container[new_path] = container[path]
224
# If we modify the existing dicts, we're not atomic anymore and may
225
# fail differently depending on dict order. So work on copy, fail on
226
# error on only replace dicts if all goes well.
227
renamed_files = self._files.copy()
228
renamed_dirs = self._dirs.copy()
229
do_renames(renamed_files)
230
do_renames(renamed_dirs)
231
# We may have been cloned so modify in place
233
self._files.update(renamed_files)
235
self._dirs.update(renamed_dirs)
226
237
def rmdir(self, relpath):
227
238
"""See Transport.rmdir."""
289
300
raise LockError('File %r already locked' % (self.path,))
290
301
self.transport._locks[self.path] = self
293
# Should this warn, or actually try to cleanup?
295
warnings.warn("MemoryLock %r not explicitly unlocked" % (self.path,))
298
303
def unlock(self):
299
304
del self.transport._locks[self.path]
300
305
self.transport = None
310
315
self._scheme = "memory+%s:///" % id(self)
311
316
def memory_factory(url):
312
from bzrlib.transport import memory
313
318
result = memory.MemoryTransport(url)
314
319
result._dirs = self._dirs
315
320
result._files = self._files