|  | @@ -175,11 +175,11 @@ class Repository:
 | 
											
												
													
														|  |              # the repository instance lives on - even if exceptions happened.
 |  |              # the repository instance lives on - even if exceptions happened.
 | 
											
												
													
														|  |              self._active_txn = False
 |  |              self._active_txn = False
 | 
											
												
													
														|  |              raise
 |  |              raise
 | 
											
												
													
														|  | -        if not self.index:
 |  | 
 | 
											
												
													
														|  | 
 |  | +        if not self.index or transaction_id is None:
 | 
											
												
													
														|  |              self.index = self.open_index(transaction_id)
 |  |              self.index = self.open_index(transaction_id)
 | 
											
												
													
														|  |          if transaction_id is None:
 |  |          if transaction_id is None:
 | 
											
												
													
														|  | -            self.segments = {}
 |  | 
 | 
											
												
													
														|  | -            self.compact = set()
 |  | 
 | 
											
												
													
														|  | 
 |  | +            self.segments = {}  # XXX bad name: usage_count_of_segment_x = self.segments[x]
 | 
											
												
													
														|  | 
 |  | +            self.compact = set()  # XXX bad name: segments_needing_compaction = self.compact
 | 
											
												
													
														|  |          else:
 |  |          else:
 | 
											
												
													
														|  |              if do_cleanup:
 |  |              if do_cleanup:
 | 
											
												
													
														|  |                  self.io.cleanup(transaction_id)
 |  |                  self.io.cleanup(transaction_id)
 | 
											
										
											
												
													
														|  | @@ -237,38 +237,49 @@ class Repository:
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |      def replay_segments(self, index_transaction_id, segments_transaction_id):
 |  |      def replay_segments(self, index_transaction_id, segments_transaction_id):
 | 
											
												
													
														|  |          self.prepare_txn(index_transaction_id, do_cleanup=False)
 |  |          self.prepare_txn(index_transaction_id, do_cleanup=False)
 | 
											
												
													
														|  | -        for segment, filename in self.io.segment_iterator():
 |  | 
 | 
											
												
													
														|  | -            if index_transaction_id is not None and segment <= index_transaction_id:
 |  | 
 | 
											
												
													
														|  | -                continue
 |  | 
 | 
											
												
													
														|  | -            if segment > segments_transaction_id:
 |  | 
 | 
											
												
													
														|  | -                break
 |  | 
 | 
											
												
													
														|  | -            self.segments[segment] = 0
 |  | 
 | 
											
												
													
														|  | -            for tag, key, offset in self.io.iter_objects(segment):
 |  | 
 | 
											
												
													
														|  | -                if tag == TAG_PUT:
 |  | 
 | 
											
												
													
														|  | -                    try:
 |  | 
 | 
											
												
													
														|  | -                        s, _ = self.index[key]
 |  | 
 | 
											
												
													
														|  | -                        self.compact.add(s)
 |  | 
 | 
											
												
													
														|  | -                        self.segments[s] -= 1
 |  | 
 | 
											
												
													
														|  | -                    except KeyError:
 |  | 
 | 
											
												
													
														|  | -                        pass
 |  | 
 | 
											
												
													
														|  | -                    self.index[key] = segment, offset
 |  | 
 | 
											
												
													
														|  | -                    self.segments[segment] += 1
 |  | 
 | 
											
												
													
														|  | -                elif tag == TAG_DELETE:
 |  | 
 | 
											
												
													
														|  | -                    try:
 |  | 
 | 
											
												
													
														|  | -                        s, _ = self.index.pop(key)
 |  | 
 | 
											
												
													
														|  | -                        self.segments[s] -= 1
 |  | 
 | 
											
												
													
														|  | -                        self.compact.add(s)
 |  | 
 | 
											
												
													
														|  | -                    except KeyError:
 |  | 
 | 
											
												
													
														|  | -                        pass
 |  | 
 | 
											
												
													
														|  | -                    self.compact.add(segment)
 |  | 
 | 
											
												
													
														|  | -                elif tag == TAG_COMMIT:
 |  | 
 | 
											
												
													
														|  | 
 |  | +        try:
 | 
											
												
													
														|  | 
 |  | +            for segment, filename in self.io.segment_iterator():
 | 
											
												
													
														|  | 
 |  | +                if index_transaction_id is not None and segment <= index_transaction_id:
 | 
											
												
													
														|  |                      continue
 |  |                      continue
 | 
											
												
													
														|  | -                else:
 |  | 
 | 
											
												
													
														|  | -                    raise self.CheckNeeded(self.path)
 |  | 
 | 
											
												
													
														|  | -            if self.segments[segment] == 0:
 |  | 
 | 
											
												
													
														|  | 
 |  | +                if segment > segments_transaction_id:
 | 
											
												
													
														|  | 
 |  | +                    break
 | 
											
												
													
														|  | 
 |  | +                objects = self.io.iter_objects(segment)
 | 
											
												
													
														|  | 
 |  | +                self._update_index(segment, objects)
 | 
											
												
													
														|  | 
 |  | +            self.write_index()
 | 
											
												
													
														|  | 
 |  | +        finally:
 | 
											
												
													
														|  | 
 |  | +            self.rollback()
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +    def _update_index(self, segment, objects, report=None):
 | 
											
												
													
														|  | 
 |  | +        """some code shared between replay_segments and check"""
 | 
											
												
													
														|  | 
 |  | +        self.segments[segment] = 0
 | 
											
												
													
														|  | 
 |  | +        for tag, key, offset in objects:
 | 
											
												
													
														|  | 
 |  | +            if tag == TAG_PUT:
 | 
											
												
													
														|  | 
 |  | +                try:
 | 
											
												
													
														|  | 
 |  | +                    s, _ = self.index[key]
 | 
											
												
													
														|  | 
 |  | +                    self.compact.add(s)
 | 
											
												
													
														|  | 
 |  | +                    self.segments[s] -= 1
 | 
											
												
													
														|  | 
 |  | +                except KeyError:
 | 
											
												
													
														|  | 
 |  | +                    pass
 | 
											
												
													
														|  | 
 |  | +                self.index[key] = segment, offset
 | 
											
												
													
														|  | 
 |  | +                self.segments[segment] += 1
 | 
											
												
													
														|  | 
 |  | +            elif tag == TAG_DELETE:
 | 
											
												
													
														|  | 
 |  | +                try:
 | 
											
												
													
														|  | 
 |  | +                    s, _ = self.index.pop(key)
 | 
											
												
													
														|  | 
 |  | +                    self.segments[s] -= 1
 | 
											
												
													
														|  | 
 |  | +                    self.compact.add(s)
 | 
											
												
													
														|  | 
 |  | +                except KeyError:
 | 
											
												
													
														|  | 
 |  | +                    pass
 | 
											
												
													
														|  |                  self.compact.add(segment)
 |  |                  self.compact.add(segment)
 | 
											
												
													
														|  | -        self.write_index()
 |  | 
 | 
											
												
													
														|  | -        self.rollback()
 |  | 
 | 
											
												
													
														|  | 
 |  | +            elif tag == TAG_COMMIT:
 | 
											
												
													
														|  | 
 |  | +                continue
 | 
											
												
													
														|  | 
 |  | +            else:
 | 
											
												
													
														|  | 
 |  | +                msg = 'Unexpected tag {} in segment {}'.format(tag, segment)
 | 
											
												
													
														|  | 
 |  | +                if report is None:
 | 
											
												
													
														|  | 
 |  | +                    raise self.CheckNeeded(msg)
 | 
											
												
													
														|  | 
 |  | +                else:
 | 
											
												
													
														|  | 
 |  | +                    report(msg)
 | 
											
												
													
														|  | 
 |  | +        if self.segments[segment] == 0:
 | 
											
												
													
														|  | 
 |  | +            self.compact.add(segment)
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |      def check(self, repair=False):
 |  |      def check(self, repair=False):
 | 
											
												
													
														|  |          """Check repository consistency
 |  |          """Check repository consistency
 | 
											
										
											
												
													
														|  | @@ -297,7 +308,7 @@ class Repository:
 | 
											
												
													
														|  |          if repair:
 |  |          if repair:
 | 
											
												
													
														|  |              self.io.cleanup(transaction_id)
 |  |              self.io.cleanup(transaction_id)
 | 
											
												
													
														|  |          segments_transaction_id = self.io.get_segments_transaction_id()
 |  |          segments_transaction_id = self.io.get_segments_transaction_id()
 | 
											
												
													
														|  | -        self.prepare_txn(None)
 |  | 
 | 
											
												
													
														|  | 
 |  | +        self.prepare_txn(None)  # self.index, self.compact, self.segments all empty now!
 | 
											
												
													
														|  |          for segment, filename in self.io.segment_iterator():
 |  |          for segment, filename in self.io.segment_iterator():
 | 
											
												
													
														|  |              if segment > transaction_id:
 |  |              if segment > transaction_id:
 | 
											
												
													
														|  |                  continue
 |  |                  continue
 | 
											
										
											
												
													
														|  | @@ -309,35 +320,16 @@ class Repository:
 | 
											
												
													
														|  |                  if repair:
 |  |                  if repair:
 | 
											
												
													
														|  |                      self.io.recover_segment(segment, filename)
 |  |                      self.io.recover_segment(segment, filename)
 | 
											
												
													
														|  |                      objects = list(self.io.iter_objects(segment))
 |  |                      objects = list(self.io.iter_objects(segment))
 | 
											
												
													
														|  | -            self.segments[segment] = 0
 |  | 
 | 
											
												
													
														|  | -            for tag, key, offset in objects:
 |  | 
 | 
											
												
													
														|  | -                if tag == TAG_PUT:
 |  | 
 | 
											
												
													
														|  | -                    try:
 |  | 
 | 
											
												
													
														|  | -                        s, _ = self.index[key]
 |  | 
 | 
											
												
													
														|  | -                        self.compact.add(s)
 |  | 
 | 
											
												
													
														|  | -                        self.segments[s] -= 1
 |  | 
 | 
											
												
													
														|  | -                    except KeyError:
 |  | 
 | 
											
												
													
														|  | -                        pass
 |  | 
 | 
											
												
													
														|  | -                    self.index[key] = segment, offset
 |  | 
 | 
											
												
													
														|  | -                    self.segments[segment] += 1
 |  | 
 | 
											
												
													
														|  | -                elif tag == TAG_DELETE:
 |  | 
 | 
											
												
													
														|  | -                    try:
 |  | 
 | 
											
												
													
														|  | -                        s, _ = self.index.pop(key)
 |  | 
 | 
											
												
													
														|  | -                        self.segments[s] -= 1
 |  | 
 | 
											
												
													
														|  | -                        self.compact.add(s)
 |  | 
 | 
											
												
													
														|  | -                    except KeyError:
 |  | 
 | 
											
												
													
														|  | -                        pass
 |  | 
 | 
											
												
													
														|  | -                    self.compact.add(segment)
 |  | 
 | 
											
												
													
														|  | -                elif tag == TAG_COMMIT:
 |  | 
 | 
											
												
													
														|  | -                    continue
 |  | 
 | 
											
												
													
														|  | -                else:
 |  | 
 | 
											
												
													
														|  | -                    report_error('Unexpected tag {} in segment {}'.format(tag, segment))
 |  | 
 | 
											
												
													
														|  | 
 |  | +            self._update_index(segment, objects, report_error)
 | 
											
												
													
														|  | 
 |  | +        # self.index, self.segments, self.compact now reflect the state of the segment files up to <transaction_id>
 | 
											
												
													
														|  |          # We might need to add a commit tag if no committed segment is found
 |  |          # We might need to add a commit tag if no committed segment is found
 | 
											
												
													
														|  |          if repair and segments_transaction_id is None:
 |  |          if repair and segments_transaction_id is None:
 | 
											
												
													
														|  |              report_error('Adding commit tag to segment {}'.format(transaction_id))
 |  |              report_error('Adding commit tag to segment {}'.format(transaction_id))
 | 
											
												
													
														|  |              self.io.segment = transaction_id + 1
 |  |              self.io.segment = transaction_id + 1
 | 
											
												
													
														|  |              self.io.write_commit()
 |  |              self.io.write_commit()
 | 
											
												
													
														|  |          if current_index and not repair:
 |  |          if current_index and not repair:
 | 
											
												
													
														|  | 
 |  | +            # current_index = "as found on disk"
 | 
											
												
													
														|  | 
 |  | +            # self.index = "as rebuilt in-memory from segments"
 | 
											
												
													
														|  |              if len(current_index) != len(self.index):
 |  |              if len(current_index) != len(self.index):
 | 
											
												
													
														|  |                  report_error('Index object count mismatch. {} != {}'.format(len(current_index), len(self.index)))
 |  |                  report_error('Index object count mismatch. {} != {}'.format(len(current_index), len(self.index)))
 | 
											
												
													
														|  |              elif current_index:
 |  |              elif current_index:
 |