Jelajahi Sumber

New "bands" store index

Jonas Borgström 14 tahun lalu
induk
melakukan
f9e63e302a
3 mengubah file dengan 259 tambahan dan 133 penghapusan
  1. 237 0
      darc/hashindex.py
  2. 20 132
      darc/store.py
  3. 2 1
      darc/test.py

+ 237 - 0
darc/hashindex.py

@@ -0,0 +1,237 @@
+import numpy
+import os
+import random
+import shutil
+import struct
+import tempfile
+import unittest
+from UserDict import DictMixin
+
+
+class HashIndexBase(DictMixin):
+    EMPTY, DELETED = -1, -2
+    FREE = (EMPTY, DELETED)
+
+    i_fmt    = struct.Struct('<i')
+    assert i_fmt.size == 4
+
+    def __init__(self, path):
+        self.path = path
+        self.fd = open(path, 'r+')
+        assert self.fd.read(len(self.MAGIC)) == self.MAGIC
+        self.num_entries = self.i_fmt.unpack(self.fd.read(4))[0]
+        self.buckets = numpy.memmap(self.fd, self.idx_type, offset=len(self.MAGIC) + 4)
+        self.limit = 3 * self.buckets.size / 4  # 75% fill rate
+
+    def flush(self):
+        self.fd.seek(len(self.MAGIC))
+        self.fd.write(self.i_fmt.pack(self.num_entries))
+        self.fd.flush()
+        self.buckets.flush()
+
+    @classmethod
+    def create(cls, path, capacity=1024):
+        with open(path, 'wb') as fd:
+            fd.write(cls.MAGIC + '\0\0\0\0')
+            a = numpy.zeros(capacity, cls.idx_type)
+            for i in xrange(capacity):
+                a[i][1] = cls.EMPTY
+            a.tofile(fd)
+        return cls(path)
+
+    def __contains__(self, key):
+        try:
+            self[key]
+            return True
+        except KeyError:
+            return False
+
+    def __delitem__(self, key):
+        self.buckets[self.lookup(key)][1] = self.DELETED
+        self.num_entries -= 1
+
+    def resize(self, capacity=0):
+        capacity = capacity or self.buckets.size * 2
+        if capacity < self.num_entries:
+            raise ValueError('Too small')
+        new = self.create(self.path + '.tmp', capacity)
+        for key, value in self.iteritems():
+            new[key] = value
+        new.flush()
+        os.unlink(self.path)
+        os.rename(self.path + '.tmp', self.path)
+        self.fd = new.fd
+        self.buckets = new.buckets
+        self.limit = 3 * self.buckets.size / 4
+
+
+class NSIndex(HashIndexBase):
+    MAGIC = 'NSINDEX'
+
+    idx_type = numpy.dtype('V32,<i4,<i4')
+    assert idx_type.itemsize == 40
+
+    def index(self, key):
+        hash = self.i_fmt.unpack(key[:4])[0]
+        return hash % self.buckets.size
+
+    def lookup(self, key):
+        didx = -1
+        idx = self.index(key)
+        while True:
+            while self.buckets[idx][1] == self.DELETED:
+                if didx == -1:
+                    didx = idx
+                idx = (idx + 1) % self.buckets.size
+            if self.buckets[idx][1] == self.EMPTY:
+                raise KeyError
+            if str(self.buckets[idx][0]) == key:
+                if didx != -1:
+                    self.buckets[didx] = self.buckets[idx]
+                    self.buckets[idx][1] = self.DELETED
+                    idx = didx
+                return idx
+            idx = (idx + 1) % self.buckets.size
+
+    def pop(self, key):
+        idx = self.lookup(key)
+        band = self.buckets[idx][1]
+        self.buckets[idx][1] = self.DELETED
+        self.num_entries -= 1
+        return band, self.buckets[idx][2]
+
+    def __getitem__(self, key):
+        idx = self.lookup(key)
+        return self.buckets[idx][1], self.buckets[idx][2]
+
+    def __setitem__(self, key, value):
+        if self.num_entries >= self.limit:
+            self.resize()
+        try:
+            idx = self.lookup(key)
+            self.buckets[idx][1], self.buckets[idx][2] = value
+            return
+        except KeyError:
+            idx = self.index(key)
+            while self.buckets[idx][1] not in self.FREE:
+                idx = (idx + 1) % self.buckets.size
+            self.buckets[idx][1], self.buckets[idx][2] = value
+            self.buckets[idx][0] = key
+            self.num_entries += 1
+
+    def iteritems(self, limit=0, marker=None):
+        n = 0
+        for idx in xrange(self.buckets.size):
+            if self.buckets[idx][1] in self.FREE:
+                continue
+            key = str(self.buckets[idx][0])
+            if marker and key != marker:
+                continue
+            elif marker:
+                marker = None
+            yield key, (self.buckets[idx][1], self.buckets[idx][2])
+            n += 1
+            if n == limit:
+                return
+
+
+class BandIndex(HashIndexBase):
+    MAGIC = 'BANDINDEX'
+    idx_type = numpy.dtype('<i4,<i2')
+    assert idx_type.itemsize == 6
+
+    def index(self, key):
+        return key % self.buckets.size
+
+    def lookup(self, key):
+        didx = -1
+        idx = self.index(key)
+        while True:
+            while self.buckets[idx][1] == self.DELETED:
+                if didx == -1:
+                    didx = idx
+                idx = (idx + 1) % self.buckets.size
+            if self.buckets[idx][1] == self.EMPTY:
+                raise KeyError
+            if self.buckets[idx][0] == key:
+                if didx != -1:
+                    self.buckets[didx] = self.buckets[idx]
+                    self.buckets[idx][1] = self.DELETED
+                    idx = didx
+                return idx
+            idx = (idx + 1) % self.buckets.size
+
+    def pop(self, key):
+        idx = self.lookup(key)
+        value = self.buckets[idx][1]
+        self.buckets[idx][1] = self.DELETED
+        self.num_entries -= 1
+        return value
+
+    def __getitem__(self, key):
+        idx = self.lookup(key)
+        return self.buckets[idx][1]
+
+    def __setitem__(self, key, value):
+        if self.num_entries >= self.limit:
+            self.resize()
+        try:
+            idx = self.lookup(key)
+            self.buckets[idx][1] = value
+            return
+        except KeyError:
+            idx = self.index(key)
+            while self.buckets[idx][1] not in self.FREE:
+                idx = (idx + 1) % self.buckets.size
+            self.buckets[idx][1] = value
+            self.buckets[idx][0] = key
+            self.num_entries += 1
+
+    def iteritems(self, limit=0, marker=None):
+        n = 0
+        for idx in xrange(self.buckets.size):
+            if self.buckets[idx][1] in self.FREE:
+                continue
+            key = self.buckets[idx][0]
+            if marker and key != marker:
+                continue
+            elif marker:
+                marker = None
+            yield key, self.buckets[idx][1]
+            n += 1
+            if n == limit:
+                return
+
+
+class HashIndexTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.tmppath = tempfile.mkdtemp()
+
+    def tearDown(self):
+        shutil.rmtree(self.tmppath)
+
+    def test_bandindex(self):
+        ref = {}
+        idx = BandIndex.create(os.path.join(self.tmppath, 'idx'), 16)
+        for x in range(1000):
+            band = random.randint(0, 100)
+            ref.setdefault(band, 0)
+            ref[band] += 1
+            idx.setdefault(band, 0)
+            idx[band] += 1
+        idx.flush()
+        idx2 = BandIndex(os.path.join(self.tmppath, 'idx'))
+        for key, value in ref.iteritems():
+            self.assertEqual(idx2[key], value)
+
+
+def suite():
+    return unittest.TestLoader().loadTestsFromTestCase(HashIndexTestCase)
+
+if __name__ == '__main__':
+    unittest.main()
+
+
+
+

+ 20 - 132
darc/store.py

@@ -1,14 +1,13 @@
 from ConfigParser import RawConfigParser
 import fcntl
-import numpy
 import os
 import shutil
 import struct
 import tempfile
 import unittest
-from UserDict import DictMixin
 
 from .lrucache import LRUCache
+from .hashindex import NSIndex, BandIndex
 
 
 class Store(object):
@@ -20,7 +19,7 @@ class Store(object):
     dir/bands/<X / BANDS_PER_DIR>/<X>
     dir/indexes/<NS>
     """
-    DEFAULT_MAX_BAND_SIZE = 10 * 1024 * 1024
+    DEFAULT_MAX_BAND_SIZE = 5 * 1024 * 1024
     DEFAULT_BANDS_PER_DIR = 10000
 
     class DoesNotExist(KeyError):
@@ -44,6 +43,7 @@ class Store(object):
             fd.write('This is a DARC store')
         os.mkdir(os.path.join(path, 'bands'))
         os.mkdir(os.path.join(path, 'indexes'))
+        BandIndex.create(os.path.join(path, 'indexes', 'bands'))
         config = RawConfigParser()
         config.add_section('store')
         config.set('store', 'version', '1')
@@ -85,6 +85,7 @@ class Store(object):
                   os.path.join(self.path, 'txn.active'))
         self.compact = set()
         self.txn_active = True
+        self.bands = BandIndex(os.path.join(self.path, 'indexes', 'bands'))
 
     def close(self):
         self.rollback()
@@ -102,6 +103,7 @@ class Store(object):
             self.config.write(fd)
         for i in self.indexes.values():
             i.flush()
+        self.bands.flush()
         os.rename(os.path.join(self.path, 'txn.active'),
                   os.path.join(self.path, 'txn.tmp'))
         shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
@@ -115,9 +117,16 @@ class Store(object):
         def lookup(ns, key):
             return key in self.indexes[ns]
         for band in self.compact:
-            for ns, key, data in self.io.iter_objects(band, lookup):
-                self.indexes[ns][key] = self.io.write(ns, key, data)
+            if self.bands[band] > 0:
+                for ns, key, data in self.io.iter_objects(band, lookup):
+                    new_band, offset = self.io.write(ns, key, data)
+                    self.indexes[ns][key] = new_band, offset
+                    self.bands[band] -= 1
+                    self.bands.setdefault(new_band, 0)
+                    self.bands[new_band] += 1
+
         for band in self.compact:
+            assert self.bands.pop(band) == 0
             self.io.delete_band(band)
 
     def rollback(self):
@@ -141,11 +150,11 @@ class Store(object):
         try:
             return self.indexes[ns]
         except KeyError:
-            filename = os.path.join(self.path, 'indexes', str(ns))
+            filename = os.path.join(self.path, 'indexes', 'ns%d' % ns)
             if os.path.exists(filename):
-                self.indexes[ns] = HashIndex(filename)
+                self.indexes[ns] = NSIndex(filename)
             else:
-                self.indexes[ns] = HashIndex.create(filename)
+                self.indexes[ns] = NSIndex.create(filename)
             return self.indexes[ns]
 
     def get(self, ns, id):
@@ -159,6 +168,8 @@ class Store(object):
         if not self.txn_active:
             self.begin_txn()
         band, offset = self.io.write(ns, id, data)
+        self.bands.setdefault(band, 0)
+        self.bands[band] += 1
         self.get_index(ns)[id] = band, offset
 
     def delete(self, ns, id):
@@ -166,6 +177,7 @@ class Store(object):
             self.begin_txn()
         try:
             band, offset = self.get_index(ns).pop(id)
+            self.bands[band] -= 1
             self.compact.add(band)
         except KeyError:
             raise self.DoesNotExist
@@ -175,130 +187,6 @@ class Store(object):
                 self.get_index(ns).iteritems(marker=marker, limit=limit)]
 
 
-class HashIndex(DictMixin):
-    """Hash Table with open addressing and lazy deletes
-    """
-    EMPTY, DELETED = -1, -2
-    FREE = (EMPTY, DELETED)
-
-    i_fmt    = struct.Struct('<i')
-    assert i_fmt.size == 4
-    idx_type = numpy.dtype('V32,<i,<i')
-    assert idx_type.itemsize == 40
-
-    def __init__(self, path):
-        self.path = path
-        self.fd = open(path, 'r+')
-        assert self.fd.read(8) == 'DARCHASH'
-        self.num_entries = self.i_fmt.unpack(self.fd.read(4))[0]
-        self.buckets = numpy.memmap(self.fd, self.idx_type, offset=12)
-        self.limit = 3 * self.buckets.size / 4  # 75% fill rate
-
-    def flush(self):
-        self.fd.seek(8)
-        self.fd.write(self.i_fmt.pack(self.num_entries))
-        self.fd.flush()
-        self.buckets.flush()
-
-    @classmethod
-    def create(cls, path, capacity=1024):
-        with open(path, 'wb') as fd:
-            fd.write('DARCHASH\0\0\0\0')
-            a = numpy.zeros(capacity, cls.idx_type)
-            for i in xrange(capacity):
-                a[i][1] = cls.EMPTY
-            a.tofile(fd)
-        return cls(path)
-
-    def index(self, key):
-        hash = self.i_fmt.unpack(key[:4])[0]
-        return hash % self.buckets.size
-
-    def lookup(self, key):
-        didx = -1
-        idx = self.index(key)
-        while True:
-            while self.buckets[idx][1] == self.DELETED:
-                if didx == -1:
-                    didx = idx
-                idx = (idx + 1) % self.buckets.size
-            if self.buckets[idx][1] == self.EMPTY:
-                raise KeyError
-            if str(self.buckets[idx][0]) == key:
-                if didx != -1:
-                    self.buckets[didx] = self.buckets[idx]
-                    self.buckets[idx][1] = self.DELETED
-                    idx = didx
-                return idx
-            idx = (idx + 1) % self.buckets.size
-
-    def __contains__(self, key):
-        try:
-            self[key]
-            return True
-        except KeyError:
-            return False
-
-    def pop(self, key):
-        idx = self.lookup(key)
-        band = self.buckets[idx][1]
-        self.buckets[idx][1] = self.DELETED
-        self.num_entries -= 1
-        return band, self.buckets[idx][2]
-
-    def __getitem__(self, key):
-        idx = self.lookup(key)
-        return self.buckets[idx][1], self.buckets[idx][2]
-
-    def __delitem__(self, key):
-        self.buckets[self.lookup(key)][1] = self.DELETED
-        self.num_entries -= 1
-
-    def __setitem__(self, key, value):
-        if self.num_entries >= self.limit:
-            self.resize()
-        try:
-            idx = self.lookup(key)
-            self.buckets[idx][1], self.buckets[idx][2] = value
-            return
-        except KeyError:
-            idx = self.index(key)
-            while self.buckets[idx][1] not in self.FREE:
-                idx = (idx + 1) % self.buckets.size
-            self.buckets[idx][1], self.buckets[idx][2] = value
-            self.buckets[idx][0] = key
-            self.num_entries += 1
-
-    def iteritems(self, limit=0, marker=None):
-        n = 0
-        for idx in xrange(self.buckets.size):
-            if self.buckets[idx][1] in self.FREE:
-                continue
-            key = str(self.buckets[idx][0])
-            if marker and key != marker:
-                continue
-            elif marker:
-                marker = None
-            yield key, (self.buckets[idx][1], self.buckets[idx][2])
-            n += 1
-            if n == limit:
-                return
-
-    def resize(self, capacity=0):
-        capacity = capacity or self.buckets.size * 2
-        if capacity < self.num_entries:
-            raise ValueError('HashIndex full')
-        new = HashIndex.create(self.path + '.tmp', capacity)
-        for key, value in self.iteritems():
-            new[key] = value
-        new.flush()
-        os.unlink(self.path)
-        os.rename(self.path + '.tmp', self.path)
-        self.fd = new.fd
-        self.buckets = new.buckets
-        self.limit = 3 * self.buckets.size / 4
-
-
 class BandIO(object):
 
     header_fmt = struct.Struct('<iBB32s')

+ 2 - 1
darc/test.py

@@ -11,7 +11,7 @@ from xattr import xattr, XATTR_NOFOLLOW
 import getpass
 getpass.getpass = lambda m: 'abc123'
 
-from . import store, helpers, lrucache
+from . import store, helpers, lrucache, hashindex
 from .archiver import Archiver
 
 
@@ -124,6 +124,7 @@ def suite():
     suite.addTest(store.suite())
     suite.addTest(doctest.DocTestSuite(helpers))
     suite.addTest(lrucache.suite())
+    suite.addTest(hashindex.suite())
     return suite
 
 if __name__ == '__main__':