/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to processors/generic_processor.py

fix date parsing bug found while importing samba

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Import processor that supports all Bazaar repository formats."""
 
18
 
 
19
 
 
20
import time
 
21
from bzrlib import (
 
22
    bzrdir,
 
23
    delta,
 
24
    errors,
 
25
    osutils,
 
26
    progress,
 
27
    )
 
28
from bzrlib.repofmt import pack_repo
 
29
from bzrlib.trace import note, mutter
 
30
import bzrlib.util.configobj.configobj as configobj
 
31
from bzrlib.plugins.fastimport import (
 
32
    branch_updater,
 
33
    bzr_commit_handler,
 
34
    cache_manager,
 
35
    commands,
 
36
    errors as plugin_errors,
 
37
    helpers,
 
38
    idmapfile,
 
39
    marks_file,
 
40
    processor,
 
41
    revision_store,
 
42
    )
 
43
 
 
44
 
 
45
# How many commits before automatically reporting progress
 
46
_DEFAULT_AUTO_PROGRESS = 1000
 
47
 
 
48
# How many commits before automatically checkpointing
 
49
_DEFAULT_AUTO_CHECKPOINT = 10000
 
50
 
 
51
# How many checkpoints before automatically packing
 
52
_DEFAULT_AUTO_PACK = 4
 
53
 
 
54
# How many inventories to cache
 
55
_DEFAULT_INV_CACHE_SIZE = 10
 
56
_DEFAULT_CHK_INV_CACHE_SIZE = 100
 
57
 
 
58
 
 
59
class GenericProcessor(processor.ImportProcessor):
 
60
    """An import processor that handles basic imports.
 
61
 
 
62
    Current features supported:
 
63
 
 
64
    * blobs are cached in memory
 
65
    * files and symlinks commits are supported
 
66
    * checkpoints automatically happen at a configurable frequency
 
67
      over and above the stream requested checkpoints
 
68
    * timestamped progress reporting, both automatic and stream requested
 
69
    * some basic statistics are dumped on completion.
 
70
 
 
71
    At checkpoints and on completion, the commit-id -> revision-id map is
 
72
    saved to a file called 'fastimport-id-map'. If the import crashes
 
73
    or is interrupted, it can be started again and this file will be
 
74
    used to skip over already loaded revisions. The format of each line
 
75
    is "commit-id revision-id" so commit-ids cannot include spaces.
 
76
 
 
77
    Here are the supported parameters:
 
78
 
 
79
    * info - name of a hints file holding the analysis generated
 
80
      by running the fast-import-info processor in verbose mode. When
 
81
      importing large repositories, this parameter is needed so
 
82
      that the importer knows what blobs to intelligently cache.
 
83
 
 
84
    * trees - update the working trees before completing.
 
85
      By default, the importer updates the repository
 
86
      and branches and the user needs to run 'bzr update' for the
 
87
      branches of interest afterwards.
 
88
 
 
89
    * count - only import this many commits then exit. If not set
 
90
      or negative, all commits are imported.
 
91
    
 
92
    * checkpoint - automatically checkpoint every n commits over and
 
93
      above any checkpoints contained in the import stream.
 
94
      The default is 10000.
 
95
 
 
96
    * autopack - pack every n checkpoints. The default is 4.
 
97
 
 
98
    * inv-cache - number of inventories to cache.
 
99
      If not set, the default is 100 for CHK formats and 10 otherwise.
 
100
 
 
101
    * mode - import algorithm to use: default, experimental or classic.
 
102
 
 
103
    * import-marks - name of file to read to load mark information from
 
104
 
 
105
    * export-marks - name of file to write to save mark information to
 
106
    """
 
107
 
 
108
    known_params = [
 
109
        'info',
 
110
        'trees',
 
111
        'count',
 
112
        'checkpoint',
 
113
        'autopack',
 
114
        'inv-cache',
 
115
        'mode',
 
116
        'import-marks',
 
117
        'export-marks',
 
118
        ]
 
119
 
 
120
    def __init__(self, bzrdir, params=None, verbose=False, outf=None,
 
121
            prune_empty_dirs=True):
 
122
        processor.ImportProcessor.__init__(self, bzrdir, params, verbose)
 
123
        self.prune_empty_dirs = prune_empty_dirs
 
124
 
 
125
    def pre_process(self):
 
126
        self._start_time = time.time()
 
127
        self._load_info_and_params()
 
128
        if self.total_commits:
 
129
            self.note("Starting import of %d commits ..." %
 
130
                (self.total_commits,))
 
131
        else:
 
132
            self.note("Starting import ...")
 
133
        self.cache_mgr = cache_manager.CacheManager(self.info, self.verbose,
 
134
            self.inventory_cache_size)
 
135
        
 
136
        if self.params.get("import-marks") is not None:
 
137
            mark_info = marks_file.import_marks(self.params.get("import-marks"))
 
138
            if mark_info is not None:
 
139
                self.cache_mgr.revision_ids = mark_info[0]
 
140
            self.skip_total = False
 
141
            self.first_incremental_commit = True
 
142
        else:
 
143
            self.first_incremental_commit = False
 
144
            self.skip_total = self._init_id_map()
 
145
            if self.skip_total:
 
146
                self.note("Found %d commits already loaded - "
 
147
                    "skipping over these ...", self.skip_total)
 
148
        self._revision_count = 0
 
149
 
 
150
        # mapping of tag name to revision_id
 
151
        self.tags = {}
 
152
 
 
153
        # Create the revision store to use for committing, if any
 
154
        self.rev_store = self._revision_store_factory()
 
155
 
 
156
        # Disable autopacking if the repo format supports it.
 
157
        # THIS IS A HACK - there is no sanctioned way of doing this yet.
 
158
        if isinstance(self.repo, pack_repo.KnitPackRepository):
 
159
            self._original_max_pack_count = \
 
160
                self.repo._pack_collection._max_pack_count
 
161
            def _max_pack_count_for_import(total_revisions):
 
162
                return total_revisions + 1
 
163
            self.repo._pack_collection._max_pack_count = \
 
164
                _max_pack_count_for_import
 
165
        else:
 
166
            self._original_max_pack_count = None
 
167
 
 
168
        # Make groupcompress use the fast algorithm during importing.
 
169
        # We want to repack at the end anyhow when more information
 
170
        # is available to do a better job of saving space.
 
171
        try:
 
172
            from bzrlib import groupcompress
 
173
            groupcompress._FAST = True
 
174
        except ImportError:
 
175
            pass
 
176
 
 
177
        # Create a write group. This is committed at the end of the import.
 
178
        # Checkpointing closes the current one and starts a new one.
 
179
        self.repo.start_write_group()
 
180
 
 
181
    def _load_info_and_params(self):
 
182
        self._mode = bool(self.params.get('mode', 'default'))
 
183
        self._experimental = self._mode == 'experimental'
 
184
 
 
185
        # This is currently hard-coded but might be configurable via
 
186
        # parameters one day if that's needed
 
187
        repo_transport = self.repo.control_files._transport
 
188
        self.id_map_path = repo_transport.local_abspath("fastimport-id-map")
 
189
 
 
190
        # Load the info file, if any
 
191
        info_path = self.params.get('info')
 
192
        if info_path is not None:
 
193
            self.info = configobj.ConfigObj(info_path)
 
194
        else:
 
195
            self.info = None
 
196
 
 
197
        # Decide which CommitHandler to use
 
198
        self.supports_chk = getattr(self.repo._format, 'supports_chks', False)
 
199
        if self.supports_chk and self._mode == 'classic':
 
200
            note("Cannot use classic algorithm on CHK repositories"
 
201
                 " - using default one instead")
 
202
            self._mode = 'default'
 
203
        if self._mode == 'classic':
 
204
            self.commit_handler_factory = \
 
205
                bzr_commit_handler.InventoryCommitHandler
 
206
        else:
 
207
            self.commit_handler_factory = \
 
208
                bzr_commit_handler.InventoryDeltaCommitHandler
 
209
 
 
210
        # Decide how often to automatically report progress
 
211
        # (not a parameter yet)
 
212
        self.progress_every = _DEFAULT_AUTO_PROGRESS
 
213
        if self.verbose:
 
214
            self.progress_every = self.progress_every / 10
 
215
 
 
216
        # Decide how often (# of commits) to automatically checkpoint
 
217
        self.checkpoint_every = int(self.params.get('checkpoint',
 
218
            _DEFAULT_AUTO_CHECKPOINT))
 
219
 
 
220
        # Decide how often (# of checkpoints) to automatically pack
 
221
        self.checkpoint_count = 0
 
222
        self.autopack_every = int(self.params.get('autopack',
 
223
            _DEFAULT_AUTO_PACK))
 
224
 
 
225
        # Decide how big to make the inventory cache
 
226
        cache_size = int(self.params.get('inv-cache', -1))
 
227
        if cache_size == -1:
 
228
            if self.supports_chk:
 
229
                cache_size = _DEFAULT_CHK_INV_CACHE_SIZE
 
230
            else:
 
231
                cache_size = _DEFAULT_INV_CACHE_SIZE
 
232
        self.inventory_cache_size = cache_size
 
233
 
 
234
        # Find the maximum number of commits to import (None means all)
 
235
        # and prepare progress reporting. Just in case the info file
 
236
        # has an outdated count of commits, we store the max counts
 
237
        # at which we need to terminate separately to the total used
 
238
        # for progress tracking.
 
239
        try:
 
240
            self.max_commits = int(self.params['count'])
 
241
            if self.max_commits < 0:
 
242
                self.max_commits = None
 
243
        except KeyError:
 
244
            self.max_commits = None
 
245
        if self.info is not None:
 
246
            self.total_commits = int(self.info['Command counts']['commit'])
 
247
            if (self.max_commits is not None and
 
248
                self.total_commits > self.max_commits):
 
249
                self.total_commits = self.max_commits
 
250
        else:
 
251
            self.total_commits = self.max_commits
 
252
 
 
253
    def _revision_store_factory(self):
 
254
        """Make a RevisionStore based on what the repository supports."""
 
255
        new_repo_api = hasattr(self.repo, 'revisions')
 
256
        if new_repo_api:
 
257
            return revision_store.RevisionStore2(self.repo)
 
258
        elif not self._experimental:
 
259
            return revision_store.RevisionStore1(self.repo)
 
260
        else:
 
261
            def fulltext_when(count):
 
262
                total = self.total_commits
 
263
                if total is not None and count == total:
 
264
                    fulltext = True
 
265
                else:
 
266
                    # Create an inventory fulltext every 200 revisions
 
267
                    fulltext = count % 200 == 0
 
268
                if fulltext:
 
269
                    self.note("%d commits - storing inventory as full-text",
 
270
                        count)
 
271
                return fulltext
 
272
 
 
273
            return revision_store.ImportRevisionStore1(
 
274
                self.repo, self.inventory_cache_size,
 
275
                fulltext_when=fulltext_when)
 
276
 
 
277
    def _process(self, command_iter):
 
278
        # if anything goes wrong, abort the write group if any
 
279
        try:
 
280
            processor.ImportProcessor._process(self, command_iter)
 
281
        except:
 
282
            if self.repo is not None and self.repo.is_in_write_group():
 
283
                self.repo.abort_write_group()
 
284
            raise
 
285
 
 
286
    def post_process(self):
 
287
        # Commit the current write group and checkpoint the id map
 
288
        self.repo.commit_write_group()
 
289
        self._save_id_map()
 
290
 
 
291
        if self.params.get("export-marks") is not None:
 
292
            marks_file.export_marks(self.params.get("export-marks"),
 
293
                self.cache_mgr.revision_ids)
 
294
 
 
295
        if self.cache_mgr.last_ref == None:
 
296
            """Nothing to refresh"""
 
297
            return
 
298
 
 
299
        # Update the branches
 
300
        self.note("Updating branch information ...")
 
301
        updater = branch_updater.BranchUpdater(self.repo, self.branch,
 
302
            self.cache_mgr, helpers.invert_dictset(self.cache_mgr.heads),
 
303
            self.cache_mgr.last_ref, self.tags)
 
304
        branches_updated, branches_lost = updater.update()
 
305
        self._branch_count = len(branches_updated)
 
306
 
 
307
        # Tell the user about branches that were not created
 
308
        if branches_lost:
 
309
            if not self.repo.is_shared():
 
310
                self.warning("Cannot import multiple branches into "
 
311
                    "a standalone branch")
 
312
            self.warning("Not creating branches for these head revisions:")
 
313
            for lost_info in branches_lost:
 
314
                head_revision = lost_info[1]
 
315
                branch_name = lost_info[0]
 
316
                self.note("\t %s = %s", head_revision, branch_name)
 
317
 
 
318
        # Update the working trees as requested
 
319
        self._tree_count = 0
 
320
        remind_about_update = True
 
321
        if self._branch_count == 0:
 
322
            self.note("no branches to update")
 
323
            self.note("no working trees to update")
 
324
            remind_about_update = False
 
325
        elif self.params.get('trees', False):
 
326
            trees = self._get_working_trees(branches_updated)
 
327
            if trees:
 
328
                self._update_working_trees(trees)
 
329
                remind_about_update = False
 
330
            else:
 
331
                self.warning("No working trees available to update")
 
332
        else:
 
333
            # Update just the trunk. (This is always the first branch
 
334
            # returned by the branch updater.)
 
335
            trunk_branch = branches_updated[0]
 
336
            trees = self._get_working_trees([trunk_branch])
 
337
            if trees:
 
338
                self._update_working_trees(trees)
 
339
                remind_about_update = self._branch_count > 1
 
340
 
 
341
        # Dump the cache stats now because we clear it before the final pack
 
342
        if self.verbose:
 
343
            self.cache_mgr.dump_stats()
 
344
        if self._original_max_pack_count:
 
345
            # We earlier disabled autopacking, creating one pack every
 
346
            # checkpoint instead. We now pack the repository to optimise
 
347
            # how data is stored.
 
348
            self.cache_mgr.clear_all()
 
349
            self._pack_repository()
 
350
 
 
351
        # Finish up by dumping stats & telling the user what to do next.
 
352
        self.dump_stats()
 
353
        if remind_about_update:
 
354
            # This message is explicitly not timestamped.
 
355
            note("To refresh the working tree for other branches, "
 
356
                "use 'bzr update' inside that branch.")
 
357
 
 
358
    def _update_working_trees(self, trees):
 
359
        if self.verbose:
 
360
            reporter = delta._ChangeReporter()
 
361
        else:
 
362
            reporter = None
 
363
        for wt in trees:
 
364
            self.note("Updating the working tree for %s ...", wt.basedir)
 
365
            wt.update(reporter)
 
366
            self._tree_count += 1
 
367
 
 
368
    def _pack_repository(self, final=True):
 
369
        # Before packing, free whatever memory we can and ensure
 
370
        # that groupcompress is configured to optimise disk space
 
371
        import gc
 
372
        if final:
 
373
            try:
 
374
                from bzrlib import groupcompress
 
375
            except ImportError:
 
376
                pass
 
377
            else:
 
378
                groupcompress._FAST = False
 
379
        gc.collect()
 
380
        self.note("Packing repository ...")
 
381
        self.repo.pack()
 
382
 
 
383
        # To be conservative, packing puts the old packs and
 
384
        # indices in obsolete_packs. We err on the side of
 
385
        # optimism and clear out that directory to save space.
 
386
        self.note("Removing obsolete packs ...")
 
387
        # TODO: Use a public API for this once one exists
 
388
        repo_transport = self.repo._pack_collection.transport
 
389
        repo_transport.clone('obsolete_packs').delete_multi(
 
390
            repo_transport.list_dir('obsolete_packs'))
 
391
 
 
392
        # If we're not done, free whatever memory we can
 
393
        if not final:
 
394
            gc.collect()
 
395
 
 
396
    def _get_working_trees(self, branches):
 
397
        """Get the working trees for branches in the repository."""
 
398
        result = []
 
399
        wt_expected = self.repo.make_working_trees()
 
400
        for br in branches:
 
401
            if br is None:
 
402
                continue
 
403
            elif br == self.branch:
 
404
                if self.working_tree:
 
405
                    result.append(self.working_tree)
 
406
            elif wt_expected:
 
407
                try:
 
408
                    result.append(br.bzrdir.open_workingtree())
 
409
                except errors.NoWorkingTree:
 
410
                    self.warning("No working tree for branch %s", br)
 
411
        return result
 
412
 
 
413
    def dump_stats(self):
 
414
        time_required = progress.str_tdelta(time.time() - self._start_time)
 
415
        rc = self._revision_count - self.skip_total
 
416
        bc = self._branch_count
 
417
        wtc = self._tree_count
 
418
        self.note("Imported %d %s, updating %d %s and %d %s in %s",
 
419
            rc, helpers.single_plural(rc, "revision", "revisions"),
 
420
            bc, helpers.single_plural(bc, "branch", "branches"),
 
421
            wtc, helpers.single_plural(wtc, "tree", "trees"),
 
422
            time_required)
 
423
 
 
424
    def _init_id_map(self):
 
425
        """Load the id-map and check it matches the repository.
 
426
        
 
427
        :return: the number of entries in the map
 
428
        """
 
429
        # Currently, we just check the size. In the future, we might
 
430
        # decide to be more paranoid and check that the revision-ids
 
431
        # are identical as well.
 
432
        self.cache_mgr.revision_ids, known = idmapfile.load_id_map(
 
433
            self.id_map_path)
 
434
        existing_count = len(self.repo.all_revision_ids())
 
435
        if existing_count < known:
 
436
            raise plugin_errors.BadRepositorySize(known, existing_count)
 
437
        return known
 
438
 
 
439
    def _save_id_map(self):
 
440
        """Save the id-map."""
 
441
        # Save the whole lot every time. If this proves a problem, we can
 
442
        # change to 'append just the new ones' at a later time.
 
443
        idmapfile.save_id_map(self.id_map_path, self.cache_mgr.revision_ids)
 
444
 
 
445
    def blob_handler(self, cmd):
 
446
        """Process a BlobCommand."""
 
447
        if cmd.mark is not None:
 
448
            dataref = cmd.id
 
449
        else:
 
450
            dataref = osutils.sha_strings(cmd.data)
 
451
        self.cache_mgr.store_blob(dataref, cmd.data)
 
452
 
 
453
    def checkpoint_handler(self, cmd):
 
454
        """Process a CheckpointCommand."""
 
455
        # Commit the current write group and start a new one
 
456
        self.repo.commit_write_group()
 
457
        self._save_id_map()
 
458
        # track the number of automatic checkpoints done
 
459
        if cmd is None:
 
460
            self.checkpoint_count += 1
 
461
            if self.checkpoint_count % self.autopack_every == 0:
 
462
                self._pack_repository(final=False)
 
463
        self.repo.start_write_group()
 
464
 
 
465
    def commit_handler(self, cmd):
 
466
        """Process a CommitCommand."""
 
467
        if self.skip_total and self._revision_count < self.skip_total:
 
468
            self.cache_mgr.track_heads(cmd)
 
469
            # Check that we really do know about this commit-id
 
470
            if not self.cache_mgr.revision_ids.has_key(cmd.id):
 
471
                raise plugin_errors.BadRestart(cmd.id)
 
472
            # Consume the file commands and free any non-sticky blobs
 
473
            for fc in cmd.file_iter():
 
474
                pass
 
475
            self.cache_mgr._blobs = {}
 
476
            self._revision_count += 1
 
477
            return
 
478
        if self.first_incremental_commit:
 
479
            self.first_incremental_commit = None
 
480
            parents = self.cache_mgr.track_heads(cmd)
 
481
 
 
482
        # 'Commit' the revision and report progress
 
483
        handler = self.commit_handler_factory(cmd, self.cache_mgr,
 
484
            self.rev_store, verbose=self.verbose,
 
485
            prune_empty_dirs=self.prune_empty_dirs)
 
486
        try:
 
487
            handler.process()
 
488
        except:
 
489
            print "ABORT: exception occurred processing commit %s" % (cmd.id)
 
490
            raise
 
491
        self.cache_mgr.revision_ids[cmd.id] = handler.revision_id
 
492
        self._revision_count += 1
 
493
        self.report_progress("(%s)" % cmd.id)
 
494
 
 
495
        # Check if we should finish up or automatically checkpoint
 
496
        if (self.max_commits is not None and
 
497
            self._revision_count >= self.max_commits):
 
498
            self.note("Stopping after reaching requested count of commits")
 
499
            self.finished = True
 
500
        elif self._revision_count % self.checkpoint_every == 0:
 
501
            self.note("%d commits - automatic checkpoint triggered",
 
502
                self._revision_count)
 
503
            self.checkpoint_handler(None)
 
504
 
 
505
    def report_progress(self, details=''):
 
506
        if self._revision_count % self.progress_every == 0:
 
507
            if self.total_commits is not None:
 
508
                counts = "%d/%d" % (self._revision_count, self.total_commits)
 
509
            else:
 
510
                counts = "%d" % (self._revision_count,)
 
511
            minutes = (time.time() - self._start_time) / 60
 
512
            revisions_added = self._revision_count - self.skip_total
 
513
            rate = revisions_added * 1.0 / minutes
 
514
            if rate > 10:
 
515
                rate_str = "at %.0f/minute " % rate
 
516
            else:
 
517
                rate_str = "at %.1f/minute " % rate
 
518
            self.note("%s commits processed %s%s" % (counts, rate_str, details))
 
519
 
 
520
    def progress_handler(self, cmd):
 
521
        """Process a ProgressCommand."""
 
522
        # We could use a progress bar here instead
 
523
        self.note("progress %s" % (cmd.message,))
 
524
 
 
525
    def reset_handler(self, cmd):
 
526
        """Process a ResetCommand."""
 
527
        if cmd.ref.startswith('refs/tags/'):
 
528
            tag_name = cmd.ref[len('refs/tags/'):]
 
529
            if cmd.from_ is not None:
 
530
                self._set_tag(tag_name, cmd.from_)
 
531
            elif self.verbose:
 
532
                self.warning("ignoring reset refs/tags/%s - no from clause"
 
533
                    % tag_name)
 
534
            return
 
535
 
 
536
        if cmd.from_ is not None:
 
537
            self.cache_mgr.track_heads_for_ref(cmd.ref, cmd.from_)
 
538
 
 
539
    def tag_handler(self, cmd):
 
540
        """Process a TagCommand."""
 
541
        if cmd.from_ is not None:
 
542
            self._set_tag(cmd.id, cmd.from_)
 
543
        else:
 
544
            self.warning("ignoring tag %s - no from clause" % cmd.id)
 
545
 
 
546
    def _set_tag(self, name, from_):
 
547
        """Define a tag given a name and import 'from' reference."""
 
548
        bzr_tag_name = name.decode('utf-8', 'replace')
 
549
        bzr_rev_id = self.cache_mgr.revision_ids[from_]
 
550
        self.tags[bzr_tag_name] = bzr_rev_id
 
551
 
 
552
    def feature_handler(self, cmd):
 
553
        """Process a FeatureCommand."""
 
554
        feature = cmd.feature_name
 
555
        if feature not in commands.FEATURE_NAMES:
 
556
            raise plugin_errors.UnknownFeature(feature)