|
@@ -175,7 +175,7 @@ class Repository:
|
|
|
# the repository instance lives on - even if exceptions happened.
|
|
|
self._active_txn = False
|
|
|
raise
|
|
|
- if not self.index:
|
|
|
+ if not self.index or transaction_id is None:
|
|
|
self.index = self.open_index(transaction_id)
|
|
|
if transaction_id is None:
|
|
|
self.segments = {}
|
|
@@ -237,38 +237,41 @@ class Repository:
|
|
|
|
|
|
def replay_segments(self, index_transaction_id, segments_transaction_id):
|
|
|
self.prepare_txn(index_transaction_id, do_cleanup=False)
|
|
|
- for segment, filename in self.io.segment_iterator():
|
|
|
- if index_transaction_id is not None and segment <= index_transaction_id:
|
|
|
- continue
|
|
|
- if segment > segments_transaction_id:
|
|
|
- break
|
|
|
- self.segments[segment] = 0
|
|
|
- for tag, key, offset in self.io.iter_objects(segment):
|
|
|
- if tag == TAG_PUT:
|
|
|
- try:
|
|
|
- s, _ = self.index[key]
|
|
|
- self.compact.add(s)
|
|
|
- self.segments[s] -= 1
|
|
|
- except KeyError:
|
|
|
- pass
|
|
|
- self.index[key] = segment, offset
|
|
|
- self.segments[segment] += 1
|
|
|
- elif tag == TAG_DELETE:
|
|
|
- try:
|
|
|
- s, _ = self.index.pop(key)
|
|
|
- self.segments[s] -= 1
|
|
|
- self.compact.add(s)
|
|
|
- except KeyError:
|
|
|
- pass
|
|
|
- self.compact.add(segment)
|
|
|
- elif tag == TAG_COMMIT:
|
|
|
+ try:
|
|
|
+ for segment, filename in self.io.segment_iterator():
|
|
|
+ if index_transaction_id is not None and segment <= index_transaction_id:
|
|
|
continue
|
|
|
- else:
|
|
|
- raise self.CheckNeeded(self.path)
|
|
|
- if self.segments[segment] == 0:
|
|
|
- self.compact.add(segment)
|
|
|
- self.write_index()
|
|
|
- self.rollback()
|
|
|
+ if segment > segments_transaction_id:
|
|
|
+ break
|
|
|
+ # code duplication below?? vvv (see similar code in check())
|
|
|
+ self.segments[segment] = 0
|
|
|
+ for tag, key, offset in self.io.iter_objects(segment):
|
|
|
+ if tag == TAG_PUT:
|
|
|
+ try:
|
|
|
+ s, _ = self.index[key]
|
|
|
+ self.compact.add(s)
|
|
|
+ self.segments[s] -= 1
|
|
|
+ except KeyError:
|
|
|
+ pass
|
|
|
+ self.index[key] = segment, offset
|
|
|
+ self.segments[segment] += 1
|
|
|
+ elif tag == TAG_DELETE:
|
|
|
+ try:
|
|
|
+ s, _ = self.index.pop(key)
|
|
|
+ self.segments[s] -= 1
|
|
|
+ self.compact.add(s)
|
|
|
+ except KeyError:
|
|
|
+ pass
|
|
|
+ self.compact.add(segment)
|
|
|
+ elif tag == TAG_COMMIT:
|
|
|
+ continue
|
|
|
+ else:
|
|
|
+ raise self.CheckNeeded(self.path)
|
|
|
+ if self.segments[segment] == 0:
|
|
|
+ self.compact.add(segment)
|
|
|
+ self.write_index()
|
|
|
+ finally:
|
|
|
+ self.rollback()
|
|
|
|
|
|
def check(self, repair=False):
|
|
|
"""Check repository consistency
|
|
@@ -297,7 +300,7 @@ class Repository:
|
|
|
if repair:
|
|
|
self.io.cleanup(transaction_id)
|
|
|
segments_transaction_id = self.io.get_segments_transaction_id()
|
|
|
- self.prepare_txn(None)
|
|
|
+ self.prepare_txn(None) # self.index, self.compact, self.segments all empty now!
|
|
|
for segment, filename in self.io.segment_iterator():
|
|
|
if segment > transaction_id:
|
|
|
continue
|