# Copyright (C) 2008 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Import processor that supports all Bazaar repository formats."""

import time

from bzrlib import delta, errors, osutils, progress
from bzrlib.repofmt import pack_repo
from bzrlib.trace import note, mutter
import bzrlib.util.configobj.configobj as configobj
from bzrlib.plugins.fastimport import (
    branch_updater,
    bzr_commit_handler,
    cache_manager,
    commands,
    errors as plugin_errors,
    helpers,
    idmapfile,
    marks_file,
    processor,
    revision_store,
    )


# How many commits before automatically reporting progress
_DEFAULT_AUTO_PROGRESS = 1000

# How many commits before automatically checkpointing
_DEFAULT_AUTO_CHECKPOINT = 10000

# How many checkpoints before automatically packing
_DEFAULT_AUTO_PACK = 4

# How many inventories to cache
_DEFAULT_INV_CACHE_SIZE = 10
_DEFAULT_CHK_INV_CACHE_SIZE = 100
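
# These defaults can be overridden per import through the processor
# parameters documented on GenericProcessor below, e.g. a caller might pass
# params={'checkpoint': '1000', 'autopack': '2'}. Values typically arrive as
# strings from the command line and are passed through int() before use.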


class GenericProcessor(processor.ImportProcessor):
    """An import processor that handles basic imports.

    Current features supported:

    * blobs are cached in memory
    * file and symlink commits are supported
    * checkpoints automatically happen at a configurable frequency
      over and above the stream requested checkpoints
    * timestamped progress reporting, both automatic and stream requested
    * some basic statistics are dumped on completion.

    At checkpoints and on completion, the commit-id -> revision-id map is
    saved to a file called 'fastimport-id-map'. If the import crashes
    or is interrupted, it can be started again and this file will be
    used to skip over already loaded revisions. The format of each line
    is "commit-id revision-id" so commit-ids cannot include spaces.

    Here are the supported parameters:

    * info - name of a hints file holding the analysis generated
      by running the fast-import-info processor in verbose mode. When
      importing large repositories, this parameter is needed so
      that the importer knows what blobs to intelligently cache.

    * trees - update the working trees before completing.
      By default, the importer updates the repository
      and branches and the user needs to run 'bzr update' for the
      branches of interest afterwards.

    * count - only import this many commits then exit. If not set
      or negative, all commits are imported.

    * checkpoint - automatically checkpoint every n commits over and
      above any checkpoints contained in the import stream.
      The default is 10000.

    * autopack - pack every n checkpoints. The default is 4.

    * inv-cache - number of inventories to cache.
      If not set, the default is 100 for CHK formats and 10 otherwise.

    * mode - import algorithm to use: default, experimental or classic.

    * import-marks - name of file to read to load mark information from

    * export-marks - name of file to write to save mark information to
    """
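
    # A minimal sketch of how a driver might wire this processor to the
    # fast-import parser. The 'parser' module location, ImportParser's
    # signature, the process() entry point and the use of sys.stdin as the
    # command stream are assumptions for illustration, not guarantees made
    # by this module:
    #
    #   import sys
    #   from bzrlib import bzrdir
    #   from bzrlib.plugins.fastimport import parser
    #
    #   control, _ = bzrdir.BzrDir.open_containing('.')
    #   proc = GenericProcessor(control, params={'trees': True}, verbose=True)
    #   proc.process(parser.ImportParser(sys.stdin).iter_commands)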

    def __init__(self, bzrdir, params=None, verbose=False, outf=None,
            prune_empty_dirs=True):
        processor.ImportProcessor.__init__(self, bzrdir, params, verbose)
        self.prune_empty_dirs = prune_empty_dirs

    def pre_process(self):
        self._start_time = time.time()
        self._load_info_and_params()
        if self.total_commits:
            self.note("Starting import of %d commits ..." %
                (self.total_commits,))
        else:
            self.note("Starting import ...")
        self.cache_mgr = cache_manager.CacheManager(self.info, self.verbose,
            self.inventory_cache_size)

        if self.params.get("import-marks") is not None:
            mark_info = marks_file.import_marks(self.params.get("import-marks"))
            if mark_info is not None:
                self.cache_mgr.revision_ids = mark_info[0]
            self.skip_total = False
            self.first_incremental_commit = True
        else:
            self.first_incremental_commit = False
            self.skip_total = self._init_id_map()
            if self.skip_total:
                self.note("Found %d commits already loaded - "
                    "skipping over these ...", self.skip_total)
        self._revision_count = 0

        # mapping of tag name to revision_id
        self.tags = {}

        # Create the revision store to use for committing, if any
        self.rev_store = self._revision_store_factory()

        # Disable autopacking if the repo format supports it.
        # THIS IS A HACK - there is no sanctioned way of doing this yet.
        if isinstance(self.repo, pack_repo.KnitPackRepository):
            self._original_max_pack_count = \
                self.repo._pack_collection._max_pack_count
            def _max_pack_count_for_import(total_revisions):
                return total_revisions + 1
            self.repo._pack_collection._max_pack_count = \
                _max_pack_count_for_import
        else:
            self._original_max_pack_count = None

        # Make groupcompress use the fast algorithm during importing.
        # We want to repack at the end anyhow when more information
        # is available to do a better job of saving space.
        from bzrlib import groupcompress
        groupcompress._FAST = True

        # Create a write group. This is committed at the end of the import.
        # Checkpointing closes the current one and starts a new one.
        self.repo.start_write_group()

    def _load_info_and_params(self):
        self._mode = self.params.get('mode', 'default')
        self._experimental = self._mode == 'experimental'

        # This is currently hard-coded but might be configurable via
        # parameters one day if that's needed
        repo_transport = self.repo.control_files._transport
        self.id_map_path = repo_transport.local_abspath("fastimport-id-map")

        # Load the info file, if any
        info_path = self.params.get('info')
        if info_path is not None:
            self.info = configobj.ConfigObj(info_path)
        else:
            self.info = None

        # Decide which CommitHandler to use
        self.supports_chk = getattr(self.repo._format, 'supports_chks', False)
        if self.supports_chk and self._mode == 'classic':
            note("Cannot use classic algorithm on CHK repositories"
                " - using default one instead")
            self._mode = 'default'
        if self._mode == 'classic':
            self.commit_handler_factory = \
                bzr_commit_handler.InventoryCommitHandler
        else:
            self.commit_handler_factory = \
                bzr_commit_handler.InventoryDeltaCommitHandler

        # Decide how often to automatically report progress
        # (not a parameter yet)
        self.progress_every = _DEFAULT_AUTO_PROGRESS
        if self.verbose:
            self.progress_every = self.progress_every / 10

        # Decide how often (# of commits) to automatically checkpoint
        self.checkpoint_every = int(self.params.get('checkpoint',
            _DEFAULT_AUTO_CHECKPOINT))

        # Decide how often (# of checkpoints) to automatically pack
        self.checkpoint_count = 0
        self.autopack_every = int(self.params.get('autopack',
            _DEFAULT_AUTO_PACK))

        # Decide how big to make the inventory cache
        cache_size = int(self.params.get('inv-cache', -1))
        if cache_size == -1:
            if self.supports_chk:
                cache_size = _DEFAULT_CHK_INV_CACHE_SIZE
            else:
                cache_size = _DEFAULT_INV_CACHE_SIZE
        self.inventory_cache_size = cache_size

        # Find the maximum number of commits to import (None means all)
        # and prepare progress reporting. Just in case the info file
        # has an outdated count of commits, we store the max counts
        # at which we need to terminate separately to the total used
        # for progress tracking.
        try:
            self.max_commits = int(self.params['count'])
            if self.max_commits < 0:
                self.max_commits = None
        except KeyError:
            self.max_commits = None
        if self.info is not None:
            self.total_commits = int(self.info['Command counts']['commit'])
            if (self.max_commits is not None and
                    self.total_commits > self.max_commits):
                self.total_commits = self.max_commits
        else:
            self.total_commits = self.max_commits

    def _revision_store_factory(self):
        """Make a RevisionStore based on what the repository supports."""
        new_repo_api = hasattr(self.repo, 'revisions')
        if new_repo_api:
            return revision_store.RevisionStore2(self.repo)
        elif not self._experimental:
            return revision_store.RevisionStore1(self.repo)
        else:
            def fulltext_when(count):
                total = self.total_commits
                if total is not None and count == total:
                    fulltext = True
                else:
                    # Create an inventory fulltext every 200 revisions
                    fulltext = count % 200 == 0
                if fulltext:
                    self.note("%d commits - storing inventory as full-text",
                        count)
                return fulltext

            return revision_store.ImportRevisionStore1(
                self.repo, self.inventory_cache_size,
                fulltext_when=fulltext_when)
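
    # For example, with the experimental store and a stream of 1000 commits,
    # fulltext_when() above asks for an inventory full-text at commits 200,
    # 400, 600, 800 and 1000 (the final commit always gets one); every other
    # commit stores an inventory delta instead.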

    def _process(self, command_iter):
        # if anything goes wrong, abort the write group if any
        try:
            processor.ImportProcessor._process(self, command_iter)
        finally:
            if self.repo is not None and self.repo.is_in_write_group():
                self.repo.abort_write_group()

    def post_process(self):
        # Commit the current write group and checkpoint the id map
        self.repo.commit_write_group()
        self._save_id_map()

        if self.params.get("export-marks") is not None:
            marks_file.export_marks(self.params.get("export-marks"),
                self.cache_mgr.revision_ids)

        if self.cache_mgr.last_ref is None:
            # Nothing to refresh
            return

        # Update the branches
        self.note("Updating branch information ...")
        updater = branch_updater.BranchUpdater(self.repo, self.branch,
            self.cache_mgr, helpers.invert_dictset(self.cache_mgr.heads),
            self.cache_mgr.last_ref, self.tags)
        branches_updated, branches_lost = updater.update()
        self._branch_count = len(branches_updated)

        # Tell the user about branches that were not created
        if branches_lost:
            if not self.repo.is_shared():
                self.warning("Cannot import multiple branches into "
                    "a standalone branch")
            self.warning("Not creating branches for these head revisions:")
            for lost_info in branches_lost:
                head_revision = lost_info[1]
                branch_name = lost_info[0]
                self.note("\t %s = %s", head_revision, branch_name)

        # Update the working trees as requested
        self._tree_count = 0
        remind_about_update = True
        if self._branch_count == 0:
            self.note("no branches to update")
            self.note("no working trees to update")
            remind_about_update = False
        elif self.params.get('trees', False):
            trees = self._get_working_trees(branches_updated)
            if trees:
                self._update_working_trees(trees)
                remind_about_update = False
            else:
                self.warning("No working trees available to update")
        else:
            # Update just the trunk. (This is always the first branch
            # returned by the branch updater.)
            trunk_branch = branches_updated[0]
            trees = self._get_working_trees([trunk_branch])
            if trees:
                self._update_working_trees(trees)
                remind_about_update = self._branch_count > 1

        # Dump the cache stats now because we clear it before the final pack
        if self.verbose:
            self.cache_mgr.dump_stats()
        if self._original_max_pack_count:
            # We earlier disabled autopacking, creating one pack every
            # checkpoint instead. We now pack the repository to optimise
            # how data is stored.
            self.cache_mgr.clear_all()
            self._pack_repository()

        # Finish up by dumping stats & telling the user what to do next.
        self.dump_stats()
        if remind_about_update:
            # This message is explicitly not timestamped.
            note("To refresh the working tree for other branches, "
                "use 'bzr update' inside that branch.")

    def _update_working_trees(self, trees):
        if self.verbose:
            reporter = delta._ChangeReporter()
        else:
            reporter = None
        for wt in trees:
            self.note("Updating the working tree for %s ...", wt.basedir)
            wt.update(reporter)
            self._tree_count += 1

    def _pack_repository(self, final=True):
        # Before packing, free whatever memory we can and ensure
        # that groupcompress is configured to optimise disk space
        import gc
        gc.collect()
        from bzrlib import groupcompress
        groupcompress._FAST = False

        self.note("Packing repository ...")
        self.repo.pack()

        # To be conservative, packing puts the old packs and
        # indices in obsolete_packs. We err on the side of
        # optimism and clear out that directory to save space.
        self.note("Removing obsolete packs ...")
        # TODO: Use a public API for this once one exists
        repo_transport = self.repo._pack_collection.transport
        repo_transport.clone('obsolete_packs').delete_multi(
            repo_transport.list_dir('obsolete_packs'))

        # If we're not done, free whatever memory we can
        if not final:
            gc.collect()

    def _get_working_trees(self, branches):
        """Get the working trees for branches in the repository."""
        result = []
        wt_expected = self.repo.make_working_trees()
        for br in branches:
            if br is None:
                continue
            elif br == self.branch:
                if self.working_tree:
                    result.append(self.working_tree)
            elif wt_expected:
                try:
                    result.append(br.bzrdir.open_workingtree())
                except errors.NoWorkingTree:
                    self.warning("No working tree for branch %s", br)
        return result

    def dump_stats(self):
        time_required = progress.str_tdelta(time.time() - self._start_time)
        rc = self._revision_count - self.skip_total
        bc = self._branch_count
        wtc = self._tree_count
        self.note("Imported %d %s, updating %d %s and %d %s in %s",
            rc, helpers.single_plural(rc, "revision", "revisions"),
            bc, helpers.single_plural(bc, "branch", "branches"),
            wtc, helpers.single_plural(wtc, "tree", "trees"),
            time_required)

    def _init_id_map(self):
        """Load the id-map and check it matches the repository.

        :return: the number of entries in the map
        """
        # Currently, we just check the size. In the future, we might
        # decide to be more paranoid and check that the revision-ids
        # are identical as well.
        self.cache_mgr.revision_ids, known = idmapfile.load_id_map(
            self.id_map_path)
        existing_count = len(self.repo.all_revision_ids())
        if existing_count < known:
            raise plugin_errors.BadRepositorySize(known, existing_count)
        return known

    def _save_id_map(self):
        """Save the id-map."""
        # Save the whole lot every time. If this proves a problem, we can
        # change to 'append just the new ones' at a later time.
        idmapfile.save_id_map(self.id_map_path, self.cache_mgr.revision_ids)
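
    # The id-map file is plain text, one mapping per line, in the
    # "commit-id revision-id" form described in the class docstring.
    # A hypothetical line (both values made up for illustration):
    #
    #   :100 joe@example.com-20080815063612-abcdef0123456789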

    def blob_handler(self, cmd):
        """Process a BlobCommand."""
        if cmd.mark is not None:
            dataref = cmd.id
        else:
            dataref = osutils.sha_strings(cmd.data)
        self.cache_mgr.store_blob(dataref, cmd.data)

    def checkpoint_handler(self, cmd):
        """Process a CheckpointCommand."""
        # Commit the current write group and start a new one
        self.repo.commit_write_group()
        self._save_id_map()
        # track the number of automatic checkpoints done
        if cmd is None:
            self.checkpoint_count += 1
            if self.checkpoint_count % self.autopack_every == 0:
                self._pack_repository(final=False)
        self.repo.start_write_group()
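
    # With the default settings (_DEFAULT_AUTO_CHECKPOINT = 10000 commits per
    # automatic checkpoint and _DEFAULT_AUTO_PACK = 4 checkpoints per pack),
    # the repository is automatically packed roughly once every 40000
    # imported commits.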

    def commit_handler(self, cmd):
        """Process a CommitCommand."""
        if self.skip_total and self._revision_count < self.skip_total:
            self.cache_mgr.track_heads(cmd)
            # Check that we really do know about this commit-id
            if not self.cache_mgr.revision_ids.has_key(cmd.id):
                raise plugin_errors.BadRestart(cmd.id)
            # Consume the file commands and free any non-sticky blobs
            for fc in cmd.file_iter():
                pass
            self.cache_mgr._blobs = {}
            self._revision_count += 1
            return
        if self.first_incremental_commit:
            self.first_incremental_commit = None
            parents = self.cache_mgr.track_heads(cmd)

        # 'Commit' the revision and report progress
        handler = self.commit_handler_factory(cmd, self.cache_mgr,
            self.rev_store, verbose=self.verbose,
            prune_empty_dirs=self.prune_empty_dirs)
        try:
            handler.process()
        except:
            print "ABORT: exception occurred processing commit %s" % (cmd.id)
            raise
        self.cache_mgr.revision_ids[cmd.id] = handler.revision_id
        self._revision_count += 1
        self.report_progress("(%s)" % cmd.id)

        # Check if we should finish up or automatically checkpoint
        if (self.max_commits is not None and
                self._revision_count >= self.max_commits):
            self.note("Stopping after reaching requested count of commits")
            self.finished = True
        elif self._revision_count % self.checkpoint_every == 0:
            self.note("%d commits - automatic checkpoint triggered",
                self._revision_count)
            self.checkpoint_handler(None)

    def report_progress(self, details=''):
        if self._revision_count % self.progress_every == 0:
            if self.total_commits is not None:
                counts = "%d/%d" % (self._revision_count, self.total_commits)
            else:
                counts = "%d" % (self._revision_count,)
            minutes = (time.time() - self._start_time) / 60
            revisions_added = self._revision_count - self.skip_total
            rate = revisions_added * 1.0 / minutes
            if rate > 10:
                rate_str = "at %.0f/minute " % rate
            else:
                rate_str = "at %.1f/minute " % rate
            self.note("%s commits processed %s%s" % (counts, rate_str, details))

    def progress_handler(self, cmd):
        """Process a ProgressCommand."""
        # We could use a progress bar here instead
        self.note("progress %s" % (cmd.message,))

    def reset_handler(self, cmd):
        """Process a ResetCommand."""
        if cmd.ref.startswith('refs/tags/'):
            tag_name = cmd.ref[len('refs/tags/'):]
            if cmd.from_ is not None:
                self._set_tag(tag_name, cmd.from_)
            else:
                self.warning("ignoring reset refs/tags/%s - no from clause"
                    % tag_name)
            return

        if cmd.from_ is not None:
            self.cache_mgr.track_heads_for_ref(cmd.ref, cmd.from_)

    def tag_handler(self, cmd):
        """Process a TagCommand."""
        if cmd.from_ is not None:
            self._set_tag(cmd.id, cmd.from_)
        else:
            self.warning("ignoring tag %s - no from clause" % cmd.id)

    def _set_tag(self, name, from_):
        """Define a tag given a name and import 'from' reference."""
        bzr_tag_name = name.decode('utf-8', 'replace')
        bzr_rev_id = self.cache_mgr.revision_ids[from_]
        self.tags[bzr_tag_name] = bzr_rev_id

    def feature_handler(self, cmd):
        """Process a FeatureCommand."""
        feature = cmd.feature_name
        if feature not in commands.FEATURE_NAMES:
            raise plugin_errors.UnknownFeature(feature)