/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar


Viewing changes to breezy/bzr/pack_repo.py

  • Committer: Jelmer Vernooij
  • Date: 2017-07-23 22:06:41 UTC
  • mfrom: (6738 trunk)
  • mto: This revision was merged to the branch mainline in revision 6739.
  • Revision ID: jelmer@jelmer.uk-20170723220641-69eczax9bmv8d6kk
Merge trunk, address review comments.

=== modified file 'breezy/bzr/pack_repo.py'
--- breezy/bzr/pack_repo.py (before)
+++ breezy/bzr/pack_repo.py (after)
@@ -14,27 +14,33 @@
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
+from __future__ import absolute_import
+
 import re
 import sys
 
 from ..lazy_import import lazy_import
 lazy_import(globals(), """
-import contextlib
+from itertools import izip
 import time
 
 from breezy import (
+    cleanup,
     config,
     debug,
     graph,
     osutils,
     transactions,
+    tsort,
     ui,
     )
 from breezy.bzr import (
+    chk_map,
     pack,
     )
 from breezy.bzr.index import (
     CombinedGraphIndex,
+    GraphIndexPrefixAdapter,
     )
 """)
 from .. import (
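Everything inside the triple-quoted block above is handled by breezy's lazy_import, which installs placeholder objects in the module namespace and only performs the real imports on first use, keeping startup cheap. A rough sketch of the idea, assuming nothing about breezy's actual implementation beyond that behaviour:

import importlib

class _LazyModule(object):
    """Stand-in that imports the real module on first attribute access."""

    def __init__(self, name):
        self._name = name
        self._mod = None

    def __getattr__(self, attr):
        if self._mod is None:
            self._mod = importlib.import_module(self._name)
        return getattr(self._mod, attr)

# Usage: the real `time` module is only imported when time.time() runs.
time = _LazyModule('time')
print(time.time())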
@@ -47,6 +53,8 @@
     )
 
 from ..decorators import (
+    needs_read_lock,
+    needs_write_lock,
     only_raises,
     )
 from ..lock import LogicalLockResult
@@ -58,10 +66,14 @@
     MetaDirRepository,
     RepositoryFormatMetaDir,
     )
+from ..sixish import (
+    reraise,
+    )
 from ..bzr.vf_repository import (
     MetaDirVersionedFileRepository,
     MetaDirVersionedFileRepositoryFormat,
     VersionedFileCommitBuilder,
+    VersionedFileRootCommitBuilder,
     )
 from ..trace import (
     mutter,
@@ -81,8 +93,30 @@
                  timezone=None, committer=None, revprops=None,
                  revision_id=None, lossy=False):
         VersionedFileCommitBuilder.__init__(self, repository, parents, config,
-                                            timestamp=timestamp, timezone=timezone, committer=committer,
-                                            revprops=revprops, revision_id=revision_id, lossy=lossy)
+            timestamp=timestamp, timezone=timezone, committer=committer,
+            revprops=revprops, revision_id=revision_id, lossy=lossy)
+        self._file_graph = graph.Graph(
+            repository._pack_collection.text_index.combined_index)
+
+    def _heads(self, file_id, revision_ids):
+        keys = [(file_id, revision_id) for revision_id in revision_ids]
+        return {key[1] for key in self._file_graph.heads(keys)}
+
+
+class PackRootCommitBuilder(VersionedFileRootCommitBuilder):
+    """A subclass of RootCommitBuilder to add texts with pack semantics.
+
+    Specifically this uses one knit object rather than one knit object per
+    added text, reducing memory and object pressure.
+    """
+
+    def __init__(self, repository, parents, config, timestamp=None,
+                 timezone=None, committer=None, revprops=None,
+                 revision_id=None, lossy=False):
+        super(PackRootCommitBuilder, self).__init__(repository, parents,
+            config, timestamp=timestamp, timezone=timezone,
+            committer=committer, revprops=revprops, revision_id=revision_id,
+            lossy=lossy)
         self._file_graph = graph.Graph(
             repository._pack_collection.text_index.combined_index)
 
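The `_heads` helper added above asks the per-file graph which of the candidate (file_id, revision_id) keys are heads, i.e. not ancestors of any other candidate. A self-contained sketch of that computation over a plain parent map (illustrative names, not breezy's graph API):

def heads(parent_map, candidates):
    """Candidates that are not ancestors of any other candidate; a
    simplified model of what graph.Graph.heads computes for _heads."""
    candidates = set(candidates)
    ancestors = set()
    for node in candidates:
        stack = list(parent_map.get(node, ()))
        while stack:
            cur = stack.pop()
            if cur not in ancestors:
                ancestors.add(cur)
                stack.extend(parent_map.get(cur, ()))
    return candidates - ancestors

# ('f', 'r2') descends from ('f', 'r1'), so only the r2 key is a head.
pm = {('f', 'r1'): (), ('f', 'r2'): (('f', 'r1'),)}
assert heads(pm, [('f', 'r1'), ('f', 'r2')]) == {('f', 'r2')}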
@@ -109,7 +143,7 @@
         }
 
     def __init__(self, revision_index, inventory_index, text_index,
-                 signature_index, chk_index=None):
+        signature_index, chk_index=None):
         """Create a pack instance.
 
         :param revision_index: A GraphIndex for determining what revisions are
@@ -145,13 +179,13 @@
         """
         missing_items = {}
         for (index_name, external_refs, index) in [
-                ('texts',
-                    self._get_external_refs(self.text_index),
-                    self._pack_collection.text_index.combined_index),
-                ('inventories',
-                    self._get_external_refs(self.inventory_index),
-                    self._pack_collection.inventory_index.combined_index),
-                ]:
+            ('texts',
+                self._get_external_refs(self.text_index),
+                self._pack_collection.text_index.combined_index),
+            ('inventories',
+                self._get_external_refs(self.inventory_index),
+                self._pack_collection.inventory_index.combined_index),
+            ]:
             missing = external_refs.difference(
                 k for (idx, k, v, r) in
                 index.iter_entries(external_refs))
@@ -200,40 +234,30 @@
         if index_type == 'chk':
             unlimited_cache = True
         index = self.index_class(self.index_transport,
-                                 self.index_name(index_type, self.name),
-                                 self.index_sizes[self.index_offset(
-                                     index_type)],
-                                 unlimited_cache=unlimited_cache)
+                    self.index_name(index_type, self.name),
+                    self.index_sizes[self.index_offset(index_type)],
+                    unlimited_cache=unlimited_cache)
         if index_type == 'chk':
             index._leaf_factory = btree_index._gcchk_factory
         setattr(self, index_type + '_index', index)
 
-    def __lt__(self, other):
-        if not isinstance(other, Pack):
-            raise TypeError(other)
-        return (id(self) < id(other))
-
-    def __hash__(self):
-        return hash((type(self), self.revision_index, self.inventory_index,
-                     self.text_index, self.signature_index, self.chk_index))
-
 
 class ExistingPack(Pack):
     """An in memory proxy for an existing .pack and its disk indices."""
 
     def __init__(self, pack_transport, name, revision_index, inventory_index,
-                 text_index, signature_index, chk_index=None):
+        text_index, signature_index, chk_index=None):
         """Create an ExistingPack object.
 
         :param pack_transport: The transport where the pack file resides.
         :param name: The name of the pack on disk in the pack_transport.
         """
         Pack.__init__(self, revision_index, inventory_index, text_index,
-                      signature_index, chk_index)
+            signature_index, chk_index)
         self.name = name
         self.pack_transport = pack_transport
         if None in (revision_index, inventory_index, text_index,
-                    signature_index, name, pack_transport):
+                signature_index, name, pack_transport):
             raise AssertionError()
 
     def __eq__(self, other):
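The last hunk above strips `__lt__` and `__hash__` from `Pack`, and the next hunk below does the same for `ExistingPack.__hash__`. The side of the merge that defines them needs them on Python 3: defining `__eq__` (as `ExistingPack` does) sets `__hash__` to None there, and objects without `__lt__` cannot be sorted, whereas Python 2 supplied permissive defaults for both. A minimal illustration of the hashability rule:

class Key(object):
    def __init__(self, name):
        self.name = name

    def __eq__(self, other):
        return isinstance(other, Key) and self.name == other.name

    # Without this, defining __eq__ makes Key unhashable on Python 3:
    def __hash__(self):
        return hash((type(self), self.name))

assert Key('a') in {Key('a')}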
@@ -247,19 +271,16 @@
             self.__class__.__module__, self.__class__.__name__, id(self),
             self.pack_transport, self.name)
 
-    def __hash__(self):
-        return hash((type(self), self.name))
-
 
 class ResumedPack(ExistingPack):
 
     def __init__(self, name, revision_index, inventory_index, text_index,
-                 signature_index, upload_transport, pack_transport, index_transport,
-                 pack_collection, chk_index=None):
+        signature_index, upload_transport, pack_transport, index_transport,
+        pack_collection, chk_index=None):
         """Create a ResumedPack object."""
         ExistingPack.__init__(self, pack_transport, name, revision_index,
-                              inventory_index, text_index, signature_index,
-                              chk_index=chk_index)
+            inventory_index, text_index, signature_index,
+            chk_index=chk_index)
         self.upload_transport = upload_transport
         self.index_transport = index_transport
         self.index_sizes = [None, None, None, None]
@@ -291,7 +312,7 @@
     def abort(self):
         self.upload_transport.delete(self.file_name())
         indices = [self.revision_index, self.inventory_index, self.text_index,
-                   self.signature_index]
+            self.signature_index]
         if self.chk_index is not None:
             indices.append(self.chk_index)
         for index in indices:
@@ -339,22 +360,22 @@
         else:
             chk_index = None
         Pack.__init__(self,
-                      # Revisions: parents list, no text compression.
-                      index_builder_class(reference_lists=1),
-                      # Inventory: We want to map compression only, but currently the
-                      # knit code hasn't been updated enough to understand that, so we
-                      # have a regular 2-list index giving parents and compression
-                      # source.
-                      index_builder_class(reference_lists=2),
-                      # Texts: compression and per file graph, for all fileids - so two
-                      # reference lists and two elements in the key tuple.
-                      index_builder_class(reference_lists=2, key_elements=2),
-                      # Signatures: Just blobs to store, no compression, no parents
-                      # listing.
-                      index_builder_class(reference_lists=0),
-                      # CHK based storage - just blobs, no compression or parents.
-                      chk_index=chk_index
-                      )
+            # Revisions: parents list, no text compression.
+            index_builder_class(reference_lists=1),
+            # Inventory: We want to map compression only, but currently the
+            # knit code hasn't been updated enough to understand that, so we
+            # have a regular 2-list index giving parents and compression
+            # source.
+            index_builder_class(reference_lists=2),
+            # Texts: compression and per file graph, for all fileids - so two
+            # reference lists and two elements in the key tuple.
+            index_builder_class(reference_lists=2, key_elements=2),
+            # Signatures: Just blobs to store, no compression, no parents
+            # listing.
+            index_builder_class(reference_lists=0),
+            # CHK based storage - just blobs, no compression or parents.
+            chk_index=chk_index
+            )
         self._pack_collection = pack_collection
         # When we make readonly indices, we need this.
         self.index_class = pack_collection._index_class
@@ -385,8 +406,8 @@
             self.random_name, mode=self._file_mode)
         if 'pack' in debug.debug_flags:
             mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
-                   time.ctime(), self.upload_transport.base, self.random_name,
-                   time.time() - self.start_time)
+                time.ctime(), self.upload_transport.base, self.random_name,
+                time.time() - self.start_time)
         # A list of byte sequences to be written to the new pack, and the
         # aggregate size of them.  Stored as a list rather than separate
         # variables so that the _write_data closure below can update them.
@@ -396,9 +417,8 @@
         # robertc says- this is a closure rather than a method on the object
         # so that the variables are locals, and faster than accessing object
         # members.
-
         def _write_data(bytes, flush=False, _buffer=self._buffer,
-                        _write=self.write_stream.write, _update=self._hash.update):
+            _write=self.write_stream.write, _update=self._hash.update):
             _buffer[0].append(bytes)
             _buffer[1] += len(bytes)
             # buffer cap
@@ -437,17 +457,17 @@
     def data_inserted(self):
         """True if data has been added to this pack."""
         return bool(self.get_revision_count() or
-                    self.inventory_index.key_count() or
-                    self.text_index.key_count() or
-                    self.signature_index.key_count() or
-                    (self.chk_index is not None and self.chk_index.key_count()))
+            self.inventory_index.key_count() or
+            self.text_index.key_count() or
+            self.signature_index.key_count() or
+            (self.chk_index is not None and self.chk_index.key_count()))
 
     def finish_content(self):
         if self.name is not None:
             return
         self._writer.end()
         if self._buffer[1]:
-            self._write_data(b'', flush=True)
+            self._write_data('', flush=True)
         self.name = self._hash.hexdigest()
 
     def finish(self, suspend=False):
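The `b''` versus `''` flip in `finish_content` above is a Python 2/3 fault line: every chunk passed to `_write_data` is fed to `self._hash.update`, and on Python 3 hashlib only accepts bytes, while on Python 2 plain `''` was already a byte string. A quick, runnable illustration:

import hashlib

h = hashlib.md5()
h.update(b'')        # fine everywhere: bytes in, digest state updated
try:
    h.update(u'')    # TypeError on Python 3; tolerated on Python 2
except TypeError as e:
    print('str rejected:', e)
print(h.hexdigest())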
@@ -471,16 +491,16 @@
         # they're in the names list.
         self.index_sizes = [None, None, None, None]
         self._write_index('revision', self.revision_index, 'revision',
-                          suspend)
+            suspend)
         self._write_index('inventory', self.inventory_index, 'inventory',
-                          suspend)
+            suspend)
         self._write_index('text', self.text_index, 'file texts', suspend)
         self._write_index('signature', self.signature_index,
-                          'revision signatures', suspend)
+            'revision signatures', suspend)
         if self.chk_index is not None:
             self.index_sizes.append(None)
             self._write_index('chk', self.chk_index,
-                              'content hash bytes', suspend)
+                'content hash bytes', suspend)
         self.write_stream.close(
             want_fdatasync=self._pack_collection.config_stack.get('repository.fdatasync'))
         # Note that this will clobber an existing pack with the same name,
@@ -502,8 +522,8 @@
         if 'pack' in debug.debug_flags:
             # XXX: size might be interesting?
             mutter('%s: create_pack: pack finished: %s%s->%s t+%6.3fs',
-                   time.ctime(), self.upload_transport.base, self.random_name,
-                   new_name, time.time() - self.start_time)
+                time.ctime(), self.upload_transport.base, self.random_name,
+                new_name, time.time() - self.start_time)
 
     def flush(self):
         """Flush any current data."""
@@ -534,7 +554,7 @@
         index_tempfile = index.finish()
         index_bytes = index_tempfile.read()
         write_stream = transport.open_write_stream(index_name,
-                                                   mode=self._file_mode)
+            mode=self._file_mode)
         write_stream.write(index_bytes)
         write_stream.close(
             want_fdatasync=self._pack_collection.config_stack.get('repository.fdatasync'))
@@ -542,8 +562,8 @@
         if 'pack' in debug.debug_flags:
             # XXX: size might be interesting?
             mutter('%s: create_pack: wrote %s index: %s%s t+%6.3fs',
-                   time.ctime(), label, self.upload_transport.base,
-                   self.random_name, time.time() - self.start_time)
+                time.ctime(), label, self.upload_transport.base,
+                self.random_name, time.time() - self.start_time)
         # Replace the writable index on this object with a readonly,
         # presently unloaded index. We should alter
         # the index layer to make its finish() error if add_node is
@@ -606,7 +626,7 @@
         """
         if self.add_callback is not None:
             raise AssertionError(
-                "%s already has a writable index through %s" %
+                "%s already has a writable index through %s" % \
                 (self, self.add_callback))
         # allow writing: queue writes to a new index
         self.add_index(index, pack)
@@ -632,7 +652,7 @@
         del self.combined_index._indices[pos]
         del self.combined_index._index_names[pos]
         if (self.add_callback is not None and
-                getattr(index, 'add_nodes', None) == self.add_callback):
+            getattr(index, 'add_nodes', None) == self.add_callback):
             self.add_callback = None
             self.data_access.set_writer(None, None, (None, None))
 
@@ -696,7 +716,7 @@
             else:
                 self.revision_ids = frozenset(self.revision_ids)
                 self.revision_keys = frozenset((revid,) for revid in
-                                               self.revision_ids)
+                    self.revision_ids)
         if pb is None:
             self.pb = ui.ui_factory.nested_progress_bar()
         else:
@@ -710,8 +730,8 @@
     def open_pack(self):
         """Open a pack for the pack we are creating."""
         new_pack = self._pack_collection.pack_factory(self._pack_collection,
-                                                      upload_suffix=self.suffix,
-                                                      file_mode=self._pack_collection.repo.controldir._get_file_mode())
+                upload_suffix=self.suffix,
+                file_mode=self._pack_collection.repo.controldir._get_file_mode())
         # We know that we will process all nodes in order, and don't need to
         # query, so don't combine any indices spilled to disk until we are done
         new_pack.revision_index.set_optimize(combine_backing_indices=False)
@@ -742,10 +762,10 @@
     def _log_copied_texts(self):
         if 'pack' in debug.debug_flags:
             mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
-                   time.ctime(), self._pack_collection._upload_transport.base,
-                   self.new_pack.random_name,
-                   self.new_pack.text_index.key_count(),
-                   time.time() - self.new_pack.start_time)
+                time.ctime(), self._pack_collection._upload_transport.base,
+                self.new_pack.random_name,
+                self.new_pack.text_index.key_count(),
+                time.time() - self.new_pack.start_time)
 
     def _use_pack(self, new_pack):
         """Return True if new_pack should be used.
@@ -791,7 +811,7 @@
         self._index_builder_class = index_builder_class
         self._index_class = index_class
         self._suffix_offsets = {'.rix': 0, '.iix': 1, '.tix': 2, '.six': 3,
-                                '.cix': 4}
+            '.cix': 4}
         self.packs = []
         # name:Pack mapping
         self._names = None
@@ -807,7 +827,7 @@
         self.text_index = AggregateIndex(self.reload_pack_names, flush)
         self.signature_index = AggregateIndex(self.reload_pack_names, flush)
         all_indices = [self.revision_index, self.inventory_index,
-                       self.text_index, self.signature_index]
+                self.text_index, self.signature_index]
         if use_chk_index:
             self.chk_index = AggregateIndex(self.reload_pack_names, flush)
             all_indices.append(self.chk_index)
@@ -910,17 +930,16 @@
         num_old_packs = sum([len(po[1]) for po in pack_operations])
         num_revs_affected = sum([po[0] for po in pack_operations])
         mutter('Auto-packing repository %s, which has %d pack files, '
-               'containing %d revisions. Packing %d files into %d affecting %d'
-               ' revisions', str(
-                   self), total_packs, total_revisions, num_old_packs,
-               num_new_packs, num_revs_affected)
+            'containing %d revisions. Packing %d files into %d affecting %d'
+            ' revisions', self, total_packs, total_revisions, num_old_packs,
+            num_new_packs, num_revs_affected)
         result = self._execute_pack_operations(pack_operations, packer_class=self.normal_packer_class,
-                                               reload_func=self._restart_autopack)
-        mutter('Auto-packing repository %s completed', str(self))
+                                      reload_func=self._restart_autopack)
+        mutter('Auto-packing repository %s completed', self)
         return result
 
     def _execute_pack_operations(self, pack_operations, packer_class,
-                                 reload_func=None):
+            reload_func=None):
         """Execute a series of pack operations.
 
         :param pack_operations: A list of [revision_count, packs_to_combine].
@@ -932,7 +951,7 @@
             if len(packs) == 0:
                 continue
             packer = packer_class(self, packs, '.autopack',
-                                  reload_func=reload_func)
+                                   reload_func=reload_func)
             try:
                 result = packer.pack()
             except errors.RetryWithNewPacks:
@@ -982,8 +1001,8 @@
         # XXX: the following may want to be a class, to pack with a given
         # policy.
         mutter('Packing repository %s, which has %d pack files, '
-               'containing %d revisions with hint %r.', str(self), total_packs,
-               total_revisions, hint)
+            'containing %d revisions with hint %r.', self, total_packs,
+            total_revisions, hint)
         while True:
             try:
                 self._try_pack_operations(hint)
@@ -1007,8 +1026,8 @@
                 pack_operations[-1][0] += pack.get_revision_count()
                 pack_operations[-1][1].append(pack)
         self._execute_pack_operations(pack_operations,
-                                      packer_class=self.optimising_packer_class,
-                                      reload_func=self._restart_pack_operations)
+            packer_class=self.optimising_packer_class,
+            reload_func=self._restart_pack_operations)
 
     def plan_autopack_combinations(self, existing_packs, pack_distribution):
         """Plan a pack operation.
@@ -1058,7 +1077,7 @@
             final_pack_list.extend(pack_files)
         if len(final_pack_list) == 1:
             raise AssertionError('We somehow generated an autopack with a'
-                                 ' single pack file being moved.')
+                ' single pack file being moved.')
             return []
         return [[final_rev_count, final_pack_list]]
 
@@ -1075,9 +1094,9 @@
             self._names = {}
             self._packs_at_load = set()
             for index, key, value in self._iter_disk_pack_index():
-                name = key[0].decode('ascii')
+                name = key[0]
                 self._names[name] = self._parse_index_sizes(value)
-                self._packs_at_load.add((name, value))
+                self._packs_at_load.add((key, value))
             result = True
         else:
             result = False
@@ -1087,7 +1106,7 @@
 
     def _parse_index_sizes(self, value):
         """Parse a string of index sizes."""
-        return tuple(int(digits) for digits in value.split(b' '))
+        return tuple([int(digits) for digits in value.split(' ')])
 
     def get_pack_by_name(self, name):
         """Get a Pack object by name.
@@ -1107,7 +1126,7 @@
             else:
                 chk_index = None
             result = ExistingPack(self._pack_transport, name, rev_index,
-                                  inv_index, txt_index, sig_index, chk_index)
+                inv_index, txt_index, sig_index, chk_index)
             self.add_pack_to_memory(result)
             return result
 
@@ -1133,9 +1152,9 @@
             else:
                 chk_index = None
             result = self.resumed_pack_factory(name, rev_index, inv_index,
-                                               txt_index, sig_index, self._upload_transport,
-                                               self._pack_transport, self._index_transport, self,
-                                               chk_index=chk_index)
+                txt_index, sig_index, self._upload_transport,
+                self._pack_transport, self._index_transport, self,
+                chk_index=chk_index)
         except errors.NoSuchFile as e:
             raise errors.UnresumableWriteGroup(self.repo, [name], str(e))
         self.add_pack_to_memory(result)
@@ -1163,7 +1182,7 @@
         :return: An iterator of the index contents.
         """
         return self._index_class(self.transport, 'pack-names', None
-                                 ).iter_all_entries()
+                ).iter_all_entries()
 
     def _make_index(self, name, suffix, resume=False, is_chk=False):
         size_offset = self._suffix_offsets[suffix]
@@ -1176,7 +1195,7 @@
             index_size = self._names[name][size_offset]
         index = self._index_class(transport, index_name, index_size,
                                   unlimited_cache=is_chk)
-        if is_chk and self._index_class is btree_index.BTreeGraphIndex:
+        if is_chk and self._index_class is btree_index.BTreeGraphIndex: 
             index._leaf_factory = btree_index._gcchk_factory
         return index
 
@@ -1215,7 +1234,7 @@
             try:
                 try:
                     pack.pack_transport.move(pack.file_name(),
-                                             '../obsolete_packs/' + pack.file_name())
+                        '../obsolete_packs/' + pack.file_name())
                 except errors.NoSuchFile:
                     # perhaps obsolete_packs was removed? Let's create it and
                     # try again
@@ -1224,7 +1243,7 @@
                     except errors.FileExists:
                         pass
                     pack.pack_transport.move(pack.file_name(),
-                                             '../obsolete_packs/' + pack.file_name())
+                        '../obsolete_packs/' + pack.file_name())
             except (errors.PathError, errors.TransportError) as e:
                 # TODO: Should these be warnings or mutters?
                 mutter("couldn't rename obsolete pack, skipping it:\n%s"
@@ -1238,7 +1257,7 @@
             for suffix in suffixes:
                 try:
                     self._index_transport.move(pack.name + suffix,
-                                               '../obsolete_packs/' + pack.name + suffix)
+                        '../obsolete_packs/' + pack.name + suffix)
                 except (errors.PathError, errors.TransportError) as e:
                     mutter("couldn't rename obsolete index, skipping it:\n%s"
                            % (e,))
@@ -1326,14 +1345,14 @@
         # load the disk nodes across
         disk_nodes = set()
         for index, key, value in self._iter_disk_pack_index():
-            disk_nodes.add((key[0].decode('ascii'), value))
+            disk_nodes.add((key, value))
         orig_disk_nodes = set(disk_nodes)
 
         # do a two-way diff against our original content
         current_nodes = set()
         for name, sizes in self._names.items():
             current_nodes.add(
-                (name, b' '.join(b'%d' % size for size in sizes)))
+                ((name, ), ' '.join(str(size) for size in sizes)))
 
         # Packs no longer present in the repository, which were present when we
         # locked the repository
@@ -1363,11 +1382,12 @@
         new_names = dict(disk_nodes)
         # drop no longer present nodes
         for pack in self.all_packs():
-            if pack.name not in new_names:
+            if (pack.name,) not in new_names:
                 removed.append(pack.name)
                 self._remove_pack_from_memory(pack)
         # add new nodes/refresh existing ones
-        for name, value in disk_nodes:
+        for key, value in disk_nodes:
+            name = key[0]
             sizes = self._parse_index_sizes(value)
             if name in self._names:
                 # existing
@@ -1414,10 +1434,10 @@
             # TODO: handle same-name, index-size-changes here -
             # e.g. use the value from disk, not ours, *unless* we're the one
            # changing it.
-            for name, value in disk_nodes:
-                builder.add_node((name.encode('ascii'), ), value)
+            for key, value in disk_nodes:
+                builder.add_node(key, value)
             self.transport.put_file('pack-names', builder.finish(),
-                                    mode=self.repo.controldir._get_file_mode())
+                mode=self.repo.controldir._get_file_mode())
             self._packs_at_load = disk_nodes
             if clear_obsolete_packs:
                 to_preserve = None
@@ -1437,7 +1457,7 @@
             obsolete_packs = [o for o in obsolete_packs
                               if o.name not in already_obsolete]
             self._obsolete_packs(obsolete_packs)
-        return [new_node[0] for new_node in new_nodes]
+        return [new_node[0][0] for new_node in new_nodes]
 
     def reload_pack_names(self):
         """Sync our pack listing with what is present in the repository.
@@ -1449,7 +1469,7 @@
         :return: True if the in-memory list of packs has been altered at all.
         """
         # The ensure_loaded call is to handle the case where the first call
-        # made involving the collection was to reload_pack_names, where we
+        # made involving the collection was to reload_pack_names, where we 
        # don't have a view of disk contents. It's a bit of a bandaid, and
         # causes two reads of pack-names, but it's a rare corner case not
         # struck with regular push/pull etc.
@@ -1517,23 +1537,22 @@
         if not self.repo.is_write_locked():
             raise errors.NotWriteLocked(self)
         self._new_pack = self.pack_factory(self, upload_suffix='.pack',
-                                           file_mode=self.repo.controldir._get_file_mode())
+            file_mode=self.repo.controldir._get_file_mode())
         # allow writing: queue writes to a new index
         self.revision_index.add_writable_index(self._new_pack.revision_index,
-                                               self._new_pack)
+            self._new_pack)
         self.inventory_index.add_writable_index(self._new_pack.inventory_index,
-                                                self._new_pack)
+            self._new_pack)
         self.text_index.add_writable_index(self._new_pack.text_index,
-                                           self._new_pack)
+            self._new_pack)
         self._new_pack.text_index.set_optimize(combine_backing_indices=False)
         self.signature_index.add_writable_index(self._new_pack.signature_index,
-                                                self._new_pack)
+            self._new_pack)
         if self.chk_index is not None:
             self.chk_index.add_writable_index(self._new_pack.chk_index,
-                                              self._new_pack)
+                self._new_pack)
             self.repo.chk_bytes._index._add_callback = self.chk_index.add_callback
-            self._new_pack.chk_index.set_optimize(
-                combine_backing_indices=False)
+            self._new_pack.chk_index.set_optimize(combine_backing_indices=False)
 
         self.repo.inventories._index._add_callback = self.inventory_index.add_callback
         self.repo.revisions._index._add_callback = self.revision_index.add_callback
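The `_start_write_group` wiring above hooks each new pack's writable index into the matching `AggregateIndex` and publishes its `add_nodes` as the `add_callback` that the repository's versioned files write through (the `getattr(index, 'add_nodes', None) == self.add_callback` test in an earlier hunk is the matching teardown check). A toy model of that arrangement, with illustrative names rather than breezy's real classes:

class AggregateIndexSketch(object):
    """Several finished, read-only indices plus at most one writable one."""

    def __init__(self):
        self.indices = []         # search newest data first
        self.add_callback = None  # set only while a pack is being written

    def add_writable_index(self, index, pack):
        # New nodes go to this index; readers keep seeing the old ones too.
        self.indices.insert(0, index)
        self.add_callback = index.add_nodes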
@@ -1544,21 +1563,21 @@
         # FIXME: just drop the transient index.
         # forget what names there are
         if self._new_pack is not None:
-            with contextlib.ExitStack() as stack:
-                stack.callback(setattr, self, '_new_pack', None)
-                # If we aborted while in the middle of finishing the write
-                # group, _remove_pack_indices could fail because the indexes are
-                # already gone.  But they're not there we shouldn't fail in this
-                # case, so we pass ignore_missing=True.
-                stack.callback(self._remove_pack_indices, self._new_pack,
-                               ignore_missing=True)
-                self._new_pack.abort()
+            operation = cleanup.OperationWithCleanups(self._new_pack.abort)
+            operation.add_cleanup(setattr, self, '_new_pack', None)
+            # If we aborted while in the middle of finishing the write
+            # group, _remove_pack_indices could fail because the indexes are
+            # already gone.  But they're not there we shouldn't fail in this
+            # case, so we pass ignore_missing=True.
+            operation.add_cleanup(self._remove_pack_indices, self._new_pack,
+                ignore_missing=True)
+            operation.run_simple()
         for resumed_pack in self._resumed_packs:
-            with contextlib.ExitStack() as stack:
-                # See comment in previous finally block.
-                stack.callback(self._remove_pack_indices, resumed_pack,
-                               ignore_missing=True)
-                resumed_pack.abort()
+            operation = cleanup.OperationWithCleanups(resumed_pack.abort)
+            # See comment in previous finally block.
+            operation.add_cleanup(self._remove_pack_indices, resumed_pack,
+                ignore_missing=True)
+            operation.run_simple()
         del self._resumed_packs[:]
 
     def _remove_resumed_pack_indices(self):
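This hunk trades between two cleanup idioms: breezy's `cleanup.OperationWithCleanups`, which runs the wrapped `abort()` and then its registered cleanups, and the stdlib `contextlib.ExitStack`, where callbacks registered with `stack.callback` fire in LIFO order when the block exits, even if the body raises. A runnable demonstration of the ExitStack side:

import contextlib

def abort_demo():
    with contextlib.ExitStack() as stack:
        # Registered first, runs last (LIFO), like
        # stack.callback(setattr, self, '_new_pack', None) above.
        stack.callback(print, 'cleanup 2: forget the pack')
        stack.callback(print, 'cleanup 1: remove pack indices')
        print('body: abort the pack')  # cleanups fire even if this raises

abort_demo()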
@@ -1575,7 +1594,7 @@
         # The base implementation does no checks.  GCRepositoryPackCollection
         # overrides this.
         return []
-
+        
     def _commit_write_group(self):
         all_missing = set()
         for prefix, versioned_file in (
@@ -1589,7 +1608,7 @@
         if all_missing:
             raise errors.BzrCheckError(
                 "Repository %s has missing compression parent(s) %r "
-                % (self.repo, sorted(all_missing)))
+                 % (self.repo, sorted(all_missing)))
         problems = self._check_new_inventories()
         if problems:
             problems_summary = '\n'.join(problems)
@@ -1676,7 +1695,7 @@
     _serializer = None
 
     def __init__(self, _format, a_controldir, control_files, _commit_builder_class,
-                 _serializer):
+        _serializer):
         MetaDirRepository.__init__(self, _format, a_controldir, control_files)
         self._commit_builder_class = _commit_builder_class
         self._serializer = _serializer
@@ -1688,10 +1707,10 @@
             self._unstacked_provider = graph.CachingParentsProvider(self)
         self._unstacked_provider.disable_cache()
 
+    @needs_read_lock
     def _all_revision_ids(self):
         """See Repository.all_revision_ids()."""
-        with self.lock_read():
-            return [key[0] for key in self.revisions.keys()]
+        return [key[0] for key in self.revisions.keys()]
 
     def _abort_write_group(self):
         self.revisions._index._key_dependencies.clear()
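The final hunk above trades an explicit `with self.lock_read():` block for the `@needs_read_lock` decorator imported in an earlier hunk. The decorator's job is the same: hold the object's read lock for the duration of the call. A minimal sketch of a decorator with that shape (breezy's real one lives in `breezy.decorators` and is more elaborate):

from functools import wraps

def needs_read_lock(unbound):
    """Acquire self's read lock around the call, release it afterwards."""
    @wraps(unbound)
    def wrapped(self, *args, **kwargs):
        self.lock_read()
        try:
            return unbound(self, *args, **kwargs)
        finally:
            self.unlock()
    return wrapped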
@@ -1801,22 +1820,22 @@
         # not supported - raise an error
         raise NotImplementedError(self.dont_leave_lock_in_place)
 
+    @needs_write_lock
     def pack(self, hint=None, clean_obsolete_packs=False):
         """Compress the data within the repository.
 
         This will pack all the data to a single pack. In future it may
         recompress deltas or do other such expensive operations.
         """
-        with self.lock_write():
-            self._pack_collection.pack(
-                hint=hint, clean_obsolete_packs=clean_obsolete_packs)
+        self._pack_collection.pack(hint=hint, clean_obsolete_packs=clean_obsolete_packs)
 
+    @needs_write_lock
     def reconcile(self, other=None, thorough=False):
         """Reconcile this repository."""
-        from .reconcile import PackReconciler
-        with self.lock_write():
-            reconciler = PackReconciler(self, thorough=thorough)
-            return reconciler.reconcile()
+        from breezy.reconcile import PackReconciler
+        reconciler = PackReconciler(self, thorough=thorough)
+        reconciler.reconcile()
+        return reconciler
 
     def _reconcile_pack(self, collection, packs, extension, revs, pb):
         raise NotImplementedError(self._reconcile_pack)
@@ -1897,10 +1916,10 @@
         dirs = ['indices', 'obsolete_packs', 'packs', 'upload']
         builder = self.index_builder_class()
         files = [('pack-names', builder.finish())]
-        utf8_files = [('format', self.get_format_string())]
+        # GZ 2017-06-09: Where should format strings get decoded...
+        utf8_files = [('format', self.get_format_string().encode('ascii'))]
 
-        self._upload_blank_content(
-            a_controldir, dirs, files, utf8_files, shared)
+        self._upload_blank_content(a_controldir, dirs, files, utf8_files, shared)
         repository = self.open(a_controldir=a_controldir, _found=True)
         self._run_post_repo_init_hooks(repository, a_controldir, shared)
         return repository
@@ -1919,12 +1938,12 @@
         else:
             repo_transport = a_controldir.get_repository_transport(None)
         control_files = lockable_files.LockableFiles(repo_transport,
-                                                     'lock', lockdir.LockDir)
+                                'lock', lockdir.LockDir)
         return self.repository_class(_format=self,
-                                     a_controldir=a_controldir,
-                                     control_files=control_files,
-                                     _commit_builder_class=self._commit_builder_class,
-                                     _serializer=self._serializer)
+                              a_controldir=a_controldir,
+                              control_files=control_files,
+                              _commit_builder_class=self._commit_builder_class,
+                              _serializer=self._serializer)
 
 
 class RetryPackOperations(errors.RetryWithNewPacks):
@@ -1958,23 +1977,6 @@
         self._reload_func = reload_func
         self._flush_func = flush_func
 
-    def add_raw_record(self, key, size, raw_data):
-        """Add raw knit bytes to a storage area.
-
-        The data is spooled to the container writer in one bytes-record per
-        raw data item.
-
-        :param key: key of the data segment
-        :param size: length of the data segment
-        :param raw_data: A bytestring containing the data.
-        :return: An opaque index memo For _DirectPackAccess the memo is
-            (index, pos, length), where the index field is the write_index
-            object supplied to the PackAccess object.
-        """
-        p_offset, p_length = self._container_writer.add_bytes_record(
-            raw_data, size, [])
-        return (self._write_index, p_offset, p_length)
-
     def add_raw_records(self, key_sizes, raw_data):
         """Add raw knit bytes to a storage area.
 
@@ -1989,16 +1991,16 @@
             length), where the index field is the write_index object supplied
             to the PackAccess object.
         """
-        raw_data = b''.join(raw_data)
         if not isinstance(raw_data, bytes):
             raise AssertionError(
                 'data must be plain bytes was %s' % type(raw_data))
         result = []
         offset = 0
         for key, size in key_sizes:
-            result.append(
-                self.add_raw_record(key, size, [raw_data[offset:offset + size]]))
+            p_offset, p_length = self._container_writer.add_bytes_record(
+                raw_data[offset:offset+size], [])
             offset += size
+            result.append((self._write_index, p_offset, p_length))
         return result
 
     def flush(self):
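In `add_raw_records`, one side of the diff delegates each record to an `add_raw_record` helper while the other inlines the `_container_writer.add_bytes_record` call, but both walk the concatenated `raw_data` with the same offset arithmetic. That slicing, isolated and runnable:

def slice_records(raw_data, sizes):
    """Split one concatenated byte string into per-record chunks."""
    chunks, offset = [], 0
    for size in sizes:
        chunks.append(raw_data[offset:offset + size])
        offset += size
    return chunks

assert slice_records(b'aabbbc', [2, 3, 1]) == [b'aa', b'bbb', b'c']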
@@ -2093,4 +2095,4 @@
                 is_error = True
         if is_error:
             # GZ 2017-03-27: No real reason this needs the original traceback.
-            raise retry_exc.exc_info[1]
+            reraise(*retry_exc.exc_info)
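The closing hunk swaps a bare `raise retry_exc.exc_info[1]` for `reraise(*retry_exc.exc_info)` from `..sixish` (imported in the first hunk), which re-raises a `(type, value, traceback)` triple portably across Python 2 and 3. On Python 3 alone, a helper of that shape reduces to roughly this sketch (the Python 2 spelling needs `exec`, since the three-argument raise statement is a syntax error on Python 3):

import sys

def reraise(tp, value, tb=None):
    # Re-raise `value` with its original traceback, so the error still
    # points at the first raise site rather than at this helper.
    if value is None:
        value = tp()
    if value.__traceback__ is not tb:
        raise value.with_traceback(tb)
    raise value

try:
    try:
        1 / 0
    except ZeroDivisionError:
        exc_info = sys.exc_info()
    reraise(*exc_info)
except ZeroDivisionError as e:
    print('re-raised with original traceback:', e)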