1
from email import Message
2
from StringIO import StringIO
9
revision as _mod_revision,
14
from bzrlib.bundle import serializer as bundle_serializer
17
class MergeDirective(object):
19
_format_string = 'Bazaar merge directive format experimental-1'
21
def __init__(self, revision_id, testament_sha1, time, timezone,
22
target_branch, patch=None, patch_type=None,
23
source_branch=None, message=None):
24
assert patch_type in (None, 'diff', 'bundle')
25
if patch_type != 'bundle' and source_branch is None:
26
raise errors.NoMergeSource()
27
if patch_type is not None and patch is None:
28
raise errors.PatchMissing(patch_type)
29
self.revision_id = revision_id
30
self.testament_sha1 = testament_sha1
32
self.timezone = timezone
33
self.target_branch = target_branch
35
self.patch_type = patch_type
36
self.source_branch = source_branch
37
self.message = message
40
def from_lines(klass, lines):
41
assert lines[0].startswith('# ' + klass._format_string + '\n')
42
line_iter = iter(lines[1:])
43
stanza = rio.read_patch_stanza(line_iter)
44
patch_lines = list(line_iter)
45
if len(patch_lines) == 0:
48
patch = ''.join(patch_lines)
50
bundle_serializer.read_bundle(StringIO(patch))
51
except errors.NotABundle:
55
time, timezone = timestamp.parse_patch_date(stanza.get('timestamp'))
57
for key in ('revision_id', 'testament_sha1', 'target_branch',
58
'source_branch', 'message'):
60
kwargs[key] = stanza.get(key)
63
return MergeDirective(time=time, timezone=timezone,
64
patch_type=patch_type, patch=patch, **kwargs)
67
time_str = timestamp.format_patch_date(self.time, self.timezone)
68
stanza = rio.Stanza(revision_id=self.revision_id, timestamp=time_str,
69
target_branch=self.target_branch,
70
testament_sha1=self.testament_sha1)
71
for key in ('source_branch', 'message'):
72
if self.__dict__[key] is not None:
73
stanza.add(key, self.__dict__[key])
74
lines = ['# ' + self._format_string + '\n']
75
lines.extend(rio.to_patch_lines(stanza))
77
if self.patch is not None:
78
lines.extend(self.patch.splitlines(True))
81
def to_signed(self, branch):
82
my_gpg = gpg.GPGStrategy(branch.get_config())
83
return my_gpg.sign(''.join(self.to_lines()))
85
def to_email(self, mail_to, branch, sign=False):
86
mail_from = branch.get_config().username()
87
message = Message.Message()
88
message['To'] = mail_to
89
message['From'] = mail_from
90
if self.message is not None:
91
message['Subject'] = self.message
93
revision = branch.repository.get_revision(self.revision_id)
94
message['Subject'] = revision.message
96
body = self.to_signed(branch)
98
body = ''.join(self.to_lines())
99
message.set_payload(body)
103
def from_objects(klass, repository, revision_id, time, timezone,
104
target_branch, patch_type='bundle',
105
local_target_branch=None, public_branch=None, message=None):
106
t = testament.StrictTestament3.from_revision(repository, revision_id)
107
if patch_type is None:
110
submit_branch = _mod_branch.Branch.open(target_branch)
111
submit_revision_id = submit_branch.last_revision()
112
repository.fetch(submit_branch.repository, submit_revision_id)
113
ancestor_id = _mod_revision.common_ancestor(revision_id,
116
if patch_type == 'bundle':
118
bundle_serializer.write_bundle(repository, revision_id,
121
elif patch_type == 'diff':
122
patch = klass._generate_diff(repository, revision_id,
125
if public_branch is not None and patch_type != 'bundle':
126
public_branch_obj = _mod_branch.Branch.open(public_branch)
127
if not public_branch_obj.repository.has_revision(revision_id):
128
raise errors.PublicBranchOutOfDate(public_branch,
131
return MergeDirective(revision_id, t.as_sha1(), time, timezone,
132
target_branch, patch, patch_type, public_branch,
136
def _generate_diff(repository, revision_id, ancestor_id):
137
tree_1 = repository.revision_tree(ancestor_id)
138
tree_2 = repository.revision_tree(revision_id)
140
diff.show_diff_trees(tree_1, tree_2, s)