/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/transport/memory.py

  • Committer: Jelmer Vernooij
  • Date: 2018-07-08 10:56:06 UTC
  • mto: This revision was merged to the branch mainline in revision 7030.
  • Revision ID: jelmer@jelmer.uk-20180708105606-d53hkks89qq88twu
Use separate .as_bytes method rather than __bytes__.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
20
20
so this is primarily useful for testing.
21
21
"""
22
22
 
 
23
from __future__ import absolute_import
 
24
 
 
25
import contextlib
 
26
from io import (
 
27
    BytesIO,
 
28
    )
23
29
import os
24
30
import errno
25
 
import re
26
31
from stat import S_IFREG, S_IFDIR
27
 
from cStringIO import StringIO
28
 
import warnings
29
32
 
30
 
from bzrlib import (
 
33
from .. import (
31
34
    transport,
32
35
    urlutils,
33
36
    )
34
 
from bzrlib.errors import (
 
37
from ..errors import (
35
38
    FileExists,
36
39
    LockError,
37
40
    InProcessTransport,
38
41
    NoSuchFile,
39
 
    TransportError,
40
42
    )
41
 
from bzrlib.trace import mutter
42
 
from bzrlib.transport import (
 
43
from ..transport import (
43
44
    AppendBasedFileStream,
44
45
    _file_streams,
45
46
    LateReadError,
53
54
        self.st_size = size
54
55
        if not is_dir:
55
56
            if perms is None:
56
 
                perms = 0644
 
57
                perms = 0o644
57
58
            self.st_mode = S_IFREG | perms
58
59
        else:
59
60
            if perms is None:
60
 
                perms = 0755
 
61
                perms = 0o755
61
62
            self.st_mode = S_IFDIR | perms
62
63
 
63
64
 
81
82
 
82
83
    def clone(self, offset=None):
83
84
        """See Transport.clone()."""
84
 
        path = self._combine_paths(self._cwd, offset)
 
85
        path = urlutils.URL._combine_paths(self._cwd, offset)
85
86
        if len(path) == 0 or path[-1] != '/':
86
87
            path += '/'
87
88
        url = self._scheme + path
106
107
        """See Transport.append_file()."""
107
108
        _abspath = self._abspath(relpath)
108
109
        self._check_parent(_abspath)
109
 
        orig_content, orig_mode = self._files.get(_abspath, ("", None))
 
110
        orig_content, orig_mode = self._files.get(_abspath, (b"", None))
110
111
        if mode is None:
111
112
            mode = orig_mode
112
113
        self._files[_abspath] = (orig_content + f.read(), mode)
131
132
        del self._files[_abspath]
132
133
 
133
134
    def external_url(self):
134
 
        """See bzrlib.transport.Transport.external_url."""
 
135
        """See breezy.transport.Transport.external_url."""
135
136
        # MemoryTransport's are only accessible in-process
136
137
        # so we raise here
137
138
        raise InProcessTransport(self)
144
145
                return LateReadError(relpath)
145
146
            else:
146
147
                raise NoSuchFile(relpath)
147
 
        return StringIO(self._files[_abspath][0])
 
148
        return BytesIO(self._files[_abspath][0])
148
149
 
149
150
    def put_file(self, relpath, f, mode=None):
150
151
        """See Transport.put_file()."""
151
152
        _abspath = self._abspath(relpath)
152
153
        self._check_parent(_abspath)
153
 
        bytes = f.read()
154
 
        if type(bytes) is not str:
155
 
            # Although not strictly correct, we raise UnicodeEncodeError to be
156
 
            # compatible with other transports.
157
 
            raise UnicodeEncodeError(
158
 
                'undefined', bytes, 0, 1,
159
 
                'put_file must be given a file of bytes, not unicode.')
160
 
        self._files[_abspath] = (bytes, mode)
161
 
        return len(bytes)
 
154
        raw_bytes = f.read()
 
155
        self._files[_abspath] = (raw_bytes, mode)
 
156
        return len(raw_bytes)
162
157
 
163
158
    def mkdir(self, relpath, mode=None):
164
159
        """See Transport.mkdir()."""
170
165
 
171
166
    def open_write_stream(self, relpath, mode=None):
172
167
        """See Transport.open_write_stream."""
173
 
        self.put_bytes(relpath, "", mode)
 
168
        self.put_bytes(relpath, b"", mode)
174
169
        result = AppendBasedFileStream(self, relpath)
175
170
        _file_streams[self.abspath(relpath)] = result
176
171
        return result
199
194
                if path.startswith(_abspath):
200
195
                    trailing = path[len(_abspath):]
201
196
                    if trailing and '/' not in trailing:
202
 
                        result.append(trailing)
203
 
        return map(urlutils.escape, result)
 
197
                        result.append(urlutils.escape(trailing))
 
198
        return result
204
199
 
205
200
    def rename(self, rel_from, rel_to):
206
201
        """Rename a file or directory; fail if the destination exists"""
207
202
        abs_from = self._abspath(rel_from)
208
203
        abs_to = self._abspath(rel_to)
 
204
 
209
205
        def replace(x):
210
206
            if x == abs_from:
211
207
                x = abs_to
212
208
            elif x.startswith(abs_from + '/'):
213
209
                x = abs_to + x[len(abs_from):]
214
210
            return x
 
211
 
215
212
        def do_renames(container):
 
213
            renames = []
216
214
            for path in container:
217
215
                new_path = replace(path)
218
216
                if new_path != path:
219
217
                    if new_path in container:
220
218
                        raise FileExists(new_path)
221
 
                    container[new_path] = container[path]
222
 
                    del container[path]
223
 
        do_renames(self._files)
224
 
        do_renames(self._dirs)
 
219
                    renames.append((path, new_path))
 
220
            for path, new_path in renames:
 
221
                container[new_path] = container[path]
 
222
                del container[path]
 
223
 
 
224
        # If we modify the existing dicts, we're not atomic anymore and may
 
225
        # fail differently depending on dict order. So work on copy, fail on
 
226
        # error on only replace dicts if all goes well.
 
227
        renamed_files = self._files.copy()
 
228
        renamed_dirs = self._dirs.copy()
 
229
        do_renames(renamed_files)
 
230
        do_renames(renamed_dirs)
 
231
        # We may have been cloned so modify in place
 
232
        self._files.clear()
 
233
        self._files.update(renamed_files)
 
234
        self._dirs.clear()
 
235
        self._dirs.update(renamed_dirs)
225
236
 
226
237
    def rmdir(self, relpath):
227
238
        """See Transport.rmdir."""
289
300
            raise LockError('File %r already locked' % (self.path,))
290
301
        self.transport._locks[self.path] = self
291
302
 
292
 
    def __del__(self):
293
 
        # Should this warn, or actually try to cleanup?
294
 
        if self.transport:
295
 
            warnings.warn("MemoryLock %r not explicitly unlocked" % (self.path,))
296
 
            self.unlock()
297
 
 
298
303
    def unlock(self):
299
304
        del self.transport._locks[self.path]
300
305
        self.transport = None
309
314
        self._locks = {}
310
315
        self._scheme = "memory+%s:///" % id(self)
311
316
        def memory_factory(url):
312
 
            from bzrlib.transport import memory
 
317
            from . import memory
313
318
            result = memory.MemoryTransport(url)
314
319
            result._dirs = self._dirs
315
320
            result._files = self._files
323
328
        transport.unregister_transport(self._scheme, self._memory_factory)
324
329
 
325
330
    def get_url(self):
326
 
        """See bzrlib.transport.Server.get_url."""
 
331
        """See breezy.transport.Server.get_url."""
327
332
        return self._scheme
328
333
 
329
334
    def get_bogus_url(self):