39
39
revision as _mod_revision,
42
from ..tree import InterTree
41
from .. import lazy_import
42
lazy_import.lazy_import(globals(),
44
from breezy.bzr.smart.request import request_handlers as smart_request_handlers
45
from breezy.bzr.smart import vfs
47
from ..sixish import (
44
51
from testtools.matchers import Equals, Mismatch, Matcher
129
136
Matcher.__init__(self)
130
137
self.entries = entries
132
# NOTE(review): two revisions of this method are interleaved below -- one
# signature takes ``include_file_ids``, the other takes only ``tree``.
def get_tree_layout(self, tree, include_file_ids):
139
def get_tree_layout(self, tree):
133
140
"""Get the (path, file_id) pairs for the current tree."""
134
141
# The tree is read-locked for the duration of the iteration.
with tree.lock_read():
135
142
for path, ie in tree.iter_entries_by_dir():
137
# One revision appends the entry's kind character to the path in place
# and then yields the (path, file_id) pair.
path += ie.kind_character()
139
yield (path, ie.file_id)
143
# The other revision yields an empty path for an entry that has no parent.
if ie.parent_id is None:
144
yield (u"", ie.file_id)
146
# NOTE(review): the line introducing this branch is not visible here
# (presumably an ``else:``); the path is suffixed with the kind character.
yield (path+ie.kind_character(), ie.file_id)
144
149
def _strip_unreferenced_directories(entries):
167
172
return 'HasLayout(%r)' % self.entries
169
174
# Compares the layout produced by get_tree_layout(tree) with the expected
# entries via testtools' Equals matcher.
# NOTE(review): two revisions of the body are interleaved below.
def match(self, tree):
170
# One revision: decide whether file ids are wanted from the type of the
# first expected entry and pass that on to get_tree_layout.
include_file_ids = self.entries and not isinstance(
171
self.entries[0], str)
172
actual = list(self.get_tree_layout(
173
tree, include_file_ids=include_file_ids))
175
# Other revision: always collect (path, file_id) pairs, then keep only the
# paths when the first expected entry is a string.
actual = list(self.get_tree_layout(tree))
176
if self.entries and isinstance(self.entries[0], (str, text_type)):
177
actual = [path for (path, fileid) in actual]
174
178
# For trees without versioned directories the expected entries are first
# passed through _strip_unreferenced_directories.
if not tree.has_versioned_directories():
175
179
entries = list(self._strip_unreferenced_directories(self.entries))
178
182
# NOTE(review): the assignment of ``entries`` for the other case is not
# visible here -- confirm against the full file.
return Equals(entries).match(actual)
181
class HasPathRelations(Matcher):
182
"""Matcher verifies that paths have a relation to those in another tree.
184
:ivar previous_tree: tree to compare to
185
:ivar previous_entries: List of expected entries, as (path, previous_path) pairs.
188
def __init__(self, previous_tree, previous_entries):
189
Matcher.__init__(self)
190
self.previous_tree = previous_tree
191
self.previous_entries = previous_entries
193
def get_path_map(self, tree):
194
"""Get the (path, previous_path) pairs for the current tree."""
195
previous_intertree = InterTree.get(self.previous_tree, tree)
196
# Both trees are read-locked while the entries are walked.
with tree.lock_read(), self.previous_tree.lock_read():
197
for path, ie in tree.iter_entries_by_dir():
198
# With rename tracking the previous path is looked up through the InterTree.
if tree.supports_rename_tracking():
199
previous_path = previous_intertree.find_source_path(path)
201
# NOTE(review): the lines around this check are not visible here; it tests
# whether the same path is versioned in the previous tree.
if self.previous_tree.is_versioned(path):
206
# Kind of the entry at previous_path, as reported by the previous tree.
kind = self.previous_tree.kind(previous_path)
207
if kind == 'directory':
210
yield (u"", previous_path)
212
# The current path is suffixed with the entry's kind character.
yield (path + ie.kind_character(), previous_path)
215
def _strip_unreferenced_directories(entries):
216
"""Strip all directories that don't (in)directly contain any files.
218
:param entries: List of (path, previous_path) tuples to process
220
directory_used = set()
222
# First pass over the entries: an empty path or a trailing "/" marks a
# directory entry, which is collected in ``directories``.
for (path, previous_path) in entries:
223
if not path or path[-1] == "/":
225
directories.append((path, previous_path))
227
# Yield the referenced parent directories
228
for direntry in directories:
229
if osutils.is_inside(direntry[0], path):
230
directory_used.add(direntry[0])
231
# Second pass: yield every entry whose path does not end in "/", and only
# those "/"-terminated entries that were recorded in directory_used.
for (path, previous_path) in entries:
232
if (not path.endswith("/")) or path in directory_used:
233
yield (path, previous_path)
236
return 'HasPathRelations(%r, %r)' % (self.previous_tree, self.previous_entries)
238
def match(self, tree):
239
actual = list(self.get_path_map(tree))
240
# Trees without versioned directories get the expected entries filtered
# through _strip_unreferenced_directories.
if not tree.has_versioned_directories():
241
entries = list(self._strip_unreferenced_directories(
242
self.previous_entries))
244
entries = self.previous_entries
245
# Without rename tracking the expected previous path becomes the path
# itself when the previous tree versions it, and None otherwise.
if not tree.supports_rename_tracking():
247
(path, path if self.previous_tree.is_versioned(path) else None)
248
for (path, previous_path) in entries]
249
return Equals(entries).match(actual)
252
185
class RevisionHistoryMatches(Matcher):
253
186
"""A matcher that checks if a branch has a specific revision history.
269
202
branch.last_revision(), [_mod_revision.NULL_REVISION]))
270
203
# NOTE(review): the statement that builds ``history`` is only partly visible
# above. The list is reversed in place before being compared with
# self.expected.
history.reverse()
271
204
return Equals(self.expected).match(history)
207
class _NoVfsCallsMismatch(Mismatch):
208
"""Mismatch describing a list of HPSS calls which includes VFS requests."""
210
# vfs_calls: the offending calls; each one is read for ``.method`` and
# ``.args`` when the message below is built.
def __init__(self, vfs_calls):
211
self.vfs_calls = vfs_calls
214
# NOTE(review): the ``def`` line owning this return is not visible here.
# Each call is rendered as method(repr(arg), ...) and the calls are joined
# with commas.
return "no VFS calls expected, got: %s" % ",".join([
215
"%s(%s)" % (c.method,
216
", ".join([repr(a) for a in c.args])) for c in self.vfs_calls])
219
class ContainsNoVfsCalls(Matcher):
220
"""Ensure that none of the specified HPSS calls are VFS calls."""
223
return 'ContainsNoVfsCalls()'
226
def match(cls, hpss_calls):
228
# NOTE(review): ``vfs_calls`` is appended to below, but its initialisation
# is not among the visible lines -- confirm against the full file.
for call in hpss_calls:
230
# Look up the registered request handler for the call's method name.
request_method = smart_request_handlers.get(call.call.method)
232
# A method we don't know about doesn't count as a VFS method.
234
# Handlers that subclass vfs.VfsRequest are collected as VFS calls.
if issubclass(request_method, vfs.VfsRequest):
235
vfs_calls.append(call.call)
236
# NOTE(review): the statement guarded by this check is not visible here;
# presumably it reports "no mismatch" when nothing was collected.
if len(vfs_calls) == 0:
238
return _NoVfsCallsMismatch(vfs_calls)