Преглед изворни кода

Transaction safe band cleanup

Jonas Borgström пре 14 година
родитељ
комит
ef032d30b1
2 измењених фајлова са 42 додато и 5 уклоњено
  1. 16 0
      darc/helpers.py
  2. 26 5
      darc/store.py

+ 16 - 0
darc/helpers.py

@@ -6,6 +6,22 @@ import os
 import pwd
 import re
 import stat
+import struct
+
+
+def read_set(path):
+    """Read set from disk (as int32s)
+    """
+    with open(path, 'rb') as fd:
+        data = fd.read()
+        return set(struct.unpack('<%di' % (len(data) / 4), data))
+
+
+def write_set(s, path):
+    """Write set to disk (as int32s)
+    """
+    with open(path, 'wb') as fd:
+        fd.write(struct.pack('<%di' % len(s), *s))
 
 
 def encode_long(v):

+ 26 - 5
darc/store.py

@@ -8,7 +8,7 @@ import unittest
 from zlib import crc32
 
 from .hashindex import NSIndex, BandIndex
-from .helpers import IntegrityError
+from .helpers import IntegrityError, read_set, write_set
 from .lrucache import LRUCache
 
 
@@ -76,7 +76,17 @@ class Store(object):
         self.rollback()
         self.io = BandIO(self.path, next_band, max_band_size, bands_per_dir)
 
+    def delete_bands(self):
+        delete_path = os.path.join(self.path, 'indexes', 'delete')
+        if os.path.exists(delete_path):
+            for band in read_set(delete_path):
+                assert self.bands.pop(band) == 0
+                self.io.delete_band(band)
+            os.unlink(delete_path)
+
     def begin_txn(self):
+        self.io.cleanup()
+        self.delete_bands()
         txn_dir = os.path.join(self.path, 'txn.tmp')
         # Initialize transaction snapshot
         os.mkdir(txn_dir)
@@ -111,8 +121,11 @@ class Store(object):
         shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
         self.indexes = {}
         self.txn_active = False
+        self.delete_bands()
 
     def compact_bands(self):
+        """Compact sparse bands by copying data into new bands
+        """
         if not self.compact:
             return
         self.io.close_band()
@@ -126,10 +139,7 @@ class Store(object):
                     self.bands[band] -= 1
                     self.bands.setdefault(new_band, 0)
                     self.bands[new_band] += 1
-
-        for band in self.compact:
-            assert self.bands.pop(band) == 0
-            self.io.delete_band(band)
+        write_set(self.compact, os.path.join(self.path, 'indexes', 'delete'))
 
     def rollback(self):
         """
@@ -206,6 +216,17 @@ class BandIO(object):
         for band in self.fds.keys():
             self.fds.pop(band).close()
 
+    def cleanup(self):
+        """Delete band files left by aborted transactions
+        """
+        band = self.band
+        while True:
+            filename = self.band_filename(band)
+            if not os.path.exists(filename):
+                break
+            os.unlink(filename)
+            band += 1
+
     def band_filename(self, band):
         return os.path.join(self.path, 'bands', str(band / self.bands_per_dir), str(band))