Explorar o código

Handle crash between commit and delete_bands

Jonas Borgström %!s(int64=14) %!d(string=hai) anos
pai
achega
fa6a45cb1c
Modificáronse 4 ficheiros con 67 adicións e 32 borrados
  1. 11 0
      darc/_hashindex.c
  2. 1 0
      darc/hashindex.h
  3. 14 4
      darc/hashindex.pyx
  4. 41 28
      darc/store.py

+ 11 - 0
darc/_hashindex.c

@@ -159,6 +159,17 @@ error:
     return NULL;
 }
 
+void
+hashindex_clear(HashIndex *index)
+{
+    int i;
+    for(i = 0; i < index->num_buckets; i++) {
+        BUCKET_MARK_DELETED(index, i);
+    }
+    index->num_entries = 0;
+    hashindex_resize(index, 16);
+}
+
 void
 hashindex_flush(HashIndex *index)
 {

+ 1 - 0
darc/hashindex.h

@@ -16,6 +16,7 @@ typedef struct {
 
 HashIndex *hashindex_open(const char *path);
 void hashindex_close(HashIndex *index);
+void hashindex_clear(HashIndex *index);
 void hashindex_flush(HashIndex *index);
 HashIndex *hashindex_create(const char *path, int capacity, int key_size, int value_size);
 const void *hashindex_get(HashIndex *index, const void *key);

+ 14 - 4
darc/hashindex.pyx

@@ -7,6 +7,7 @@ cdef extern from "hashindex.h":
     HashIndex *hashindex_open(char *path)
     HashIndex *hashindex_create(char *path, int capacity, int key_size, int value_size)
     int hashindex_get_size(HashIndex *index)
+    void hashindex_clear(HashIndex *index)
     void hashindex_close(HashIndex *index)
     void hashindex_flush(HashIndex *index)
     void *hashindex_get(HashIndex *index, void *key)
@@ -14,6 +15,7 @@ cdef extern from "hashindex.h":
     void hashindex_delete(HashIndex *index, void *key)
     void hashindex_set(HashIndex *index, void *key, void *value)
 
+_NoDefault = object()
 
 cdef class IndexBase:
     cdef HashIndex *index
@@ -26,6 +28,9 @@ cdef class IndexBase:
     def __dealloc__(self):
         hashindex_close(self.index)
 
+    def clear(self):
+        hashindex_clear(self.index)
+
     def flush(self):
         hashindex_flush(self.index)
 
@@ -39,10 +44,15 @@ cdef class IndexBase:
         except KeyError:
             return default
 
-    def pop(self, key):
-        value = self[key]
-        del self[key]
-        return value
+    def pop(self, key, default=_NoDefault):
+        try:
+            value = self[key]
+            del self[key]
+            return value
+        except KeyError:
+            if default != _NoDefault:
+                return default
+            raise
 
     def __len__(self):
         return hashindex_get_size(self.index)

+ 41 - 28
darc/store.py

@@ -1,4 +1,5 @@
 from ConfigParser import RawConfigParser
+import errno
 import fcntl
 import os
 import shutil
@@ -45,7 +46,6 @@ class Store(object):
             fd.write('This is a DARC store')
         os.mkdir(os.path.join(path, 'bands'))
         os.mkdir(os.path.join(path, 'indexes'))
-        BandIndex.create(os.path.join(path, 'indexes', 'bands'))
         config = RawConfigParser()
         config.add_section('store')
         config.set('store', 'version', '1')
@@ -75,18 +75,18 @@ class Store(object):
         max_band_size = self.config.getint('store', 'max_band_size')
         bands_per_dir = self.config.getint('store', 'bands_per_dir')
         self.io = BandIO(self.path, next_band, max_band_size, bands_per_dir)
+        self.io.cleanup()
 
     def delete_bands(self):
         delete_path = os.path.join(self.path, 'indexes', 'delete')
         if os.path.exists(delete_path):
+            bands = self.get_index('bands')
             for band in read_set(delete_path):
-                assert self.bands.pop(band) == 0
-                self.io.delete_band(band)
+                assert bands.pop(band, 0) == 0
+                self.io.delete_band(band, missing_ok=True)
             os.unlink(delete_path)
 
     def begin_txn(self):
-        self.io.cleanup()
-        self.delete_bands()
         txn_dir = os.path.join(self.path, 'txn.tmp')
         # Initialize transaction snapshot
         os.mkdir(txn_dir)
@@ -97,7 +97,6 @@ class Store(object):
                   os.path.join(self.path, 'txn.active'))
         self.compact = set()
         self.txn_active = True
-        self.bands = BandIndex(os.path.join(self.path, 'indexes', 'bands'))
 
     def close(self):
         self.rollback()
@@ -115,13 +114,11 @@ class Store(object):
             self.config.write(fd)
         for i in self.indexes.values():
             i.flush()
-        self.bands.flush()
+        # If we crash before this line, the transaction will be
+        # rolled back by open()
         os.rename(os.path.join(self.path, 'txn.active'),
-                  os.path.join(self.path, 'txn.tmp'))
-        shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
-        self.indexes = {}
-        self.txn_active = False
-        self.delete_bands()
+                  os.path.join(self.path, 'txn.commit'))
+        self.rollback()
 
     def compact_bands(self):
         """Compact sparse bands by copying data into new bands
@@ -131,22 +128,25 @@ class Store(object):
         self.io.close_band()
         def lookup(ns, key):
             return key in self.get_index(ns)
+        bands = self.get_index('bands')
         for band in self.compact:
-            if self.bands[band] > 0:
+            if bands[band] > 0:
                 for ns, key, data in self.io.iter_objects(band, lookup):
                     new_band, offset = self.io.write(ns, key, data)
                     self.indexes[ns][key] = new_band, offset
-                    self.bands[band] -= 1
-                    self.bands.setdefault(new_band, 0)
-                    self.bands[new_band] += 1
+                    bands[band] -= 1
+                    bands.setdefault(new_band, 0)
+                    bands[new_band] += 1
         write_set(self.compact, os.path.join(self.path, 'indexes', 'delete'))
 
     def rollback(self):
         """
         """
-        # Remove partial transaction
-        if os.path.exists(os.path.join(self.path, 'txn.tmp')):
-            shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
+        # Commit any half committed transaction
+        if os.path.exists(os.path.join(self.path, 'txn.commit')):
+            self.delete_bands()
+            os.rename(os.path.join(self.path, 'txn.commit'),
+                      os.path.join(self.path, 'txn.tmp'))
         # Roll back active transaction
         txn_dir = os.path.join(self.path, 'txn.active')
         if os.path.exists(txn_dir):
@@ -154,7 +154,10 @@ class Store(object):
             shutil.copytree(os.path.join(txn_dir, 'indexes'),
                             os.path.join(self.path, 'indexes'))
             shutil.copy(os.path.join(txn_dir, 'config'), self.path)
-            shutil.rmtree(txn_dir)
+            os.rename(txn_dir, os.path.join(self.path, 'txn.tmp'))
+        # Remove partially removed transaction
+        if os.path.exists(os.path.join(self.path, 'txn.tmp')):
+            shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
         self.indexes = {}
         self.txn_active = False
 
@@ -162,11 +165,16 @@ class Store(object):
         try:
             return self.indexes[ns]
         except KeyError:
-            filename = os.path.join(self.path, 'indexes', 'ns%d' % ns)
+            if ns == 'bands':
+                filename = os.path.join(self.path, 'indexes', 'bands')
+                cls = BandIndex
+            else:
+                filename = os.path.join(self.path, 'indexes', 'ns%d' % ns)
+                cls = NSIndex
             if os.path.exists(filename):
-                self.indexes[ns] = NSIndex(filename)
+                self.indexes[ns] = cls(filename)
             else:
-                self.indexes[ns] = NSIndex.create(filename)
+                self.indexes[ns] = cls.create(filename)
             return self.indexes[ns]
 
     def get(self, ns, id):
@@ -180,8 +188,9 @@ class Store(object):
         if not self.txn_active:
             self.begin_txn()
         band, offset = self.io.write(ns, id, data)
-        self.bands.setdefault(band, 0)
-        self.bands[band] += 1
+        bands = self.get_index('bands')
+        bands.setdefault(band, 0)
+        bands[band] += 1
         self.get_index(ns)[id] = band, offset
 
     def delete(self, ns, id):
@@ -189,7 +198,7 @@ class Store(object):
             self.begin_txn()
         try:
             band, offset = self.get_index(ns).pop(id)
-            self.bands[band] -= 1
+            self.get_index('bands')[band] -= 1
             self.compact.add(band)
         except KeyError:
             raise self.DoesNotExist
@@ -242,8 +251,12 @@ class BandIO(object):
             self.fds[band] = fd
             return fd
 
-    def delete_band(self, band):
-        os.unlink(self.band_filename(band))
+    def delete_band(self, band, missing_ok=False):
+        try:
+            os.unlink(self.band_filename(band))
+        except OSError, e:
+            if not missing_ok or e.errno != errno.ENOENT:
+                raise
 
     def read(self, band, offset):
         fd = self.get_fd(band)