Procházet zdrojové kódy

Automatically replay segments to rebuild missing repository index

Jonas Borgström před 11 roky
rodič
revize
bd22bc8cb2
Změněny 3 soubory, celkem 133 přidání a 45 odebrání
  1. attic/repository.py (+54 -15)
  2. attic/testsuite/mock.py (+5 -0)
  3. attic/testsuite/repository.py (+74 -30)

+ 54 - 15
attic/repository.py

@@ -45,7 +45,6 @@ class Repository(object):
     class CheckNeeded(Error):
         '''Inconsistency detected. Please run "attic check {}"'''
 
-
     def __init__(self, path, create=False):
         self.path = path
         self.io = None
@@ -88,6 +87,12 @@ class Repository(object):
     def get_transaction_id(self):
         index_transaction_id = self.get_index_transaction_id()
         segments_transaction_id = self.io.get_segments_transaction_id()
+        # Attempt to automatically rebuild index if we crashed between commit
+        # tag write and index save
+        if (index_transaction_id if index_transaction_id is not None else -1) < (segments_transaction_id if segments_transaction_id is not None else -1):
+            self.replay_segments(index_transaction_id, segments_transaction_id)
+            index_transaction_id = self.get_index_transaction_id()
+
         if index_transaction_id != segments_transaction_id:
             raise self.CheckNeeded(self.path)
         return index_transaction_id
@@ -127,14 +132,16 @@ class Repository(object):
             return {}
         return NSIndex((os.path.join(self.path, 'index.%d') % transaction_id).encode('utf-8'), readonly=True)
 
-    def get_index(self, transaction_id):
+    def get_index(self, transaction_id, do_cleanup=True):
+        self._active_txn = True
         self.lock.upgrade()
         if transaction_id is None:
             self.index = NSIndex.create(os.path.join(self.path, 'index.tmp').encode('utf-8'))
             self.segments = {}
             self.compact = set()
         else:
-            self.io.cleanup(transaction_id)
+            if do_cleanup:
+                self.io.cleanup(transaction_id)
             shutil.copy(os.path.join(self.path, 'index.%d' % transaction_id),
                         os.path.join(self.path, 'index.tmp'))
             self.index = NSIndex(os.path.join(self.path, 'index.tmp').encode('utf-8'))
@@ -161,6 +168,7 @@ class Repository(object):
             if name.endswith(current):
                 continue
             os.unlink(os.path.join(self.path, name))
+        self.index = None
 
     def compact_segments(self):
         """Compact sparse segments by copying data into new segments
@@ -186,6 +194,41 @@ class Repository(object):
             self.io.delete_segment(segment)
         self.compact = set()
 
+    def replay_segments(self, index_transaction_id, segments_transaction_id):
+        self.get_index(index_transaction_id, do_cleanup=False)
+        for segment, filename in self.io.segment_iterator():
+            if index_transaction_id is not None and segment <= index_transaction_id:
+                continue
+            if segment > segments_transaction_id:
+                break
+            self.segments[segment] = 0
+            for tag, key, offset in self.io.iter_objects(segment):
+                if tag == TAG_PUT:
+                    try:
+                        s, _ = self.index[key]
+                        self.compact.add(s)
+                        self.segments[s] -= 1
+                    except KeyError:
+                        pass
+                    self.index[key] = segment, offset
+                    self.segments[segment] += 1
+                elif tag == TAG_DELETE:
+                    try:
+                        s, _ = self.index.pop(key)
+                    except KeyError:
+                        raise self.CheckNeeded(self.path)
+                    self.segments[s] -= 1
+                    self.compact.add(s)
+                    self.compact.add(segment)
+                elif tag == TAG_COMMIT:
+                    continue
+                else:
+                    raise self.CheckNeeded(self.path)
+            if self.segments[segment] == 0:
+                self.compact.add(segment)
+        self.write_index()
+        self.rollback()
+
     def check(self, progress=False, repair=False):
         """Check repository consistency
 
@@ -220,11 +263,6 @@ class Repository(object):
 
         for segment, filename in self.io.segment_iterator():
             if segment > transaction_id:
-                if repair:
-                    report_progress('Deleting uncommitted segment {}'.format(segment), error=True)
-                    self.io.delete_segment(segment)
-                else:
-                    report_progress('Uncommitted segment {} found'.format(segment), error=True)
                 continue
             try:
                 objects = list(self.io.iter_objects(segment))
@@ -241,7 +279,6 @@ class Repository(object):
                         s, _ = self.index[key]
                         self.compact.add(s)
                         self.segments[s] -= 1
-                        report_progress('Key found in more than one segment. Segment={}, key={}'.format(segment, hexlify(key)), error=True)
                     except KeyError:
                         pass
                     self.index[key] = segment, offset
@@ -264,15 +301,19 @@ class Repository(object):
             self.io.segment = transaction_id + 1
             self.io.write_commit()
             self.io.close_segment()
-        if current_index and len(current_index) != len(self.index):
-            report_progress('Index object count mismatch. {} != {}'.format(len(current_index), len(self.index)), error=True)
+        if current_index and not repair:
+            if len(current_index) != len(self.index) and False:
+                report_progress('Index object count mismatch. {} != {}'.format(len(current_index), len(self.index)), error=True)
+            elif current_index:
+                for key, value in self.index.iteritems():
+                    if current_index.get(key, (-1, -1)) != value:
+                        report_progress('Index mismatch for key {}. {} != {}'.format(key, value, current_index.get(key, (-1, -1))), error=True)
         if not error_found:
             report_progress('Repository check complete, no problems found.')
         if repair:
+            self.compact_segments()
             self.write_index()
         else:
-            # Delete temporary index file
-            self.index = None
             os.unlink(os.path.join(self.path, 'index.tmp'))
         self.rollback()
         return not error_found or repair
@@ -309,7 +350,6 @@ class Repository(object):
     def put(self, id, data, wait=True):
         if not self._active_txn:
             self.get_index(self.get_transaction_id())
-            self._active_txn = True
         try:
             segment, _ = self.index[id]
             self.segments[segment] -= 1
@@ -327,7 +367,6 @@ class Repository(object):
     def delete(self, id, wait=True):
         if not self._active_txn:
             self.get_index(self.get_transaction_id())
-            self._active_txn = True
         try:
             segment, offset = self.index.pop(id)
             self.segments[segment] -= 1

+ 5 - 0
attic/testsuite/mock.py

@@ -0,0 +1,5 @@
+try:
+    # Only available in python 3.3+
+    from unittest.mock import *
+except ImportError:
+    from mock import *

+ 74 - 30
attic/testsuite/repository.py

@@ -1,14 +1,15 @@
 import os
 import shutil
 import tempfile
+from attic.testsuite.mock import patch
 from attic.hashindex import NSIndex
-from attic.helpers import Location, IntegrityError
+from attic.helpers import Location, IntegrityError, UpgradableLock
 from attic.remote import RemoteRepository
 from attic.repository import Repository
 from attic.testsuite import AtticTestCase
 
 
-class RepositoryTestCase(AtticTestCase):
+class RepositoryTestCaseBase(AtticTestCase):
 
     def open(self, create=False):
         return Repository(os.path.join(self.tmppath, 'repository'), create=create)
@@ -21,6 +22,14 @@ class RepositoryTestCase(AtticTestCase):
         self.repository.close()
         shutil.rmtree(self.tmppath)
 
+    def reopen(self):
+        if self.repository:
+            self.repository.close()
+        self.repository = self.open()
+
+
+class RepositoryTestCase(RepositoryTestCaseBase):
+
     def test1(self):
         for x in range(100):
             self.repository.put(('%-32d' % x).encode('ascii'), b'SOMEDATA')
@@ -101,23 +110,72 @@ class RepositoryTestCase(AtticTestCase):
         self.assert_equal(len(self.repository.list(limit=50)), 50)
 
 
-class RepositoryCheckTestCase(AtticTestCase):
+class RepositoryCommitTestCase(RepositoryTestCaseBase):
 
-    def open(self, create=False):
-        return Repository(os.path.join(self.tmppath, 'repository'), create=create)
+    def add_keys(self):
+        self.repository.put(b'00000000000000000000000000000000', b'foo')
+        self.repository.put(b'00000000000000000000000000000001', b'bar')
+        self.repository.commit()
+        self.repository.put(b'00000000000000000000000000000001', b'bar2')
+        self.repository.put(b'00000000000000000000000000000002', b'boo')
+
+    def test_replay_of_missing_index(self):
+        self.add_keys()
+        for name in os.listdir(self.repository.path):
+            if name.startswith('index.'):
+                os.unlink(os.path.join(self.repository.path, name))
+        self.reopen()
+        self.assert_equal(len(self.repository), 2)
+        self.assert_equal(self.repository.check(), True)
+
+    def test_crash_before_compact_segments(self):
+        self.add_keys()
+        self.repository.compact_segments = None
+        try:
+            self.repository.commit()
+        except TypeError:
+            pass
+        self.reopen()
+        self.assert_equal(len(self.repository), 3)
+        self.assert_equal(self.repository.check(), True)
+
+    def test_replay_of_readonly_repository(self):
+        self.add_keys()
+        for name in os.listdir(self.repository.path):
+            if name.startswith('index.'):
+                os.unlink(os.path.join(self.repository.path, name))
+        with patch.object(UpgradableLock, 'upgrade', side_effect=UpgradableLock.LockUpgradeFailed) as upgrade:
+            self.reopen()
+            self.assert_raises(UpgradableLock.LockUpgradeFailed, lambda: len(self.repository))
+            upgrade.assert_called_once()
+
+
+    def test_crash_before_write_index(self):
+        self.add_keys()
+        self.repository.write_index = None
+        try:
+            self.repository.commit()
+        except TypeError:
+            pass
+        self.reopen()
+        self.assert_equal(len(self.repository), 3)
+        self.assert_equal(self.repository.check(), True)
+
+    def test_crash_before_deleting_compacted_segments(self):
+        self.add_keys()
+        self.repository.io.delete_segment = None
+        try:
+            self.repository.commit()
+        except TypeError:
+            pass
+        self.reopen()
+        self.assert_equal(len(self.repository), 3)
+        self.assert_equal(self.repository.check(), True)
+        self.assert_equal(len(self.repository), 3)
 
-    def reopen(self):
-        if self.repository:
-            self.repository.close()
-        self.repository = self.open()
 
-    def setUp(self):
-        self.tmppath = tempfile.mkdtemp()
-        self.repository = self.open(create=True)
 
-    def tearDown(self):
-        self.repository.close()
-        shutil.rmtree(self.tmppath)
+class RepositoryCheckTestCase(RepositoryTestCaseBase):
 
     def list_indices(self):
         return [name for name in os.listdir(os.path.join(self.tmppath, 'repository')) if name.startswith('index.')]
@@ -161,7 +219,7 @@ class RepositoryCheckTestCase(AtticTestCase):
                   os.path.join(self.tmppath, 'repository', new_name))
 
     def list_objects(self):
-        return set((int(key) for key, _ in list(self.open_index().iteritems())))
+        return set(int(key) for key in self.repository.list())
 
     def test_repair_corrupted_segment(self):
         self.add_objects([[1, 2, 3], [4, 5, 6]])
@@ -228,22 +286,8 @@ class RepositoryCheckTestCase(AtticTestCase):
     def test_repair_missing_index(self):
         self.add_objects([[1, 2, 3], [4, 5, 6]])
         self.delete_index()
-        self.assert_raises(Repository.CheckNeeded, lambda: self.get_objects(4))
-        self.check(status=False)
-        self.check(repair=True, status=True)
-        self.check(status=True)
-        self.get_objects(4)
-        self.assert_equal(set([1, 2, 3, 4, 5, 6]), self.list_objects())
-
-    def test_repair_index_too_old(self):
-        self.add_objects([[1, 2, 3], [4, 5, 6]])
-        self.assert_equal(self.list_indices(), ['index.1'])
-        self.rename_index('index.0')
-        self.assert_equal(self.list_indices(), ['index.0'])
-        self.assert_raises(Repository.CheckNeeded, lambda: self.get_objects(4))
         self.check(status=False)
         self.check(repair=True, status=True)
-        self.assert_equal(self.list_indices(), ['index.1'])
         self.check(status=True)
         self.get_objects(4)
         self.assert_equal(set([1, 2, 3, 4, 5, 6]), self.list_objects())