Explorar el Código

New hashindex implementation.

Jonas Borgström hace 15 años
padre
commit
af614665ca
Se han modificado 6 ficheros con 426 adiciones y 241 borrados
  1. 243 0
      darc/_hashindex.c
  2. 27 0
      darc/hashindex.h
  3. 0 237
      darc/hashindex.py
  4. 149 0
      darc/hashindex.pyx
  5. 1 2
      darc/test.py
  6. 6 2
      setup.py

+ 243 - 0
darc/_hashindex.c

@@ -0,0 +1,243 @@
+#include <assert.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <sys/mman.h>
+
+#include "hashindex.h"
+
+typedef struct {
+    char magic[8];
+    int32_t num_entries;
+    int32_t num_buckets;
+    int8_t  key_size;
+    int8_t  value_size;
+} __attribute__((__packed__)) HashHeader;
+
+
+#define MAGIC "DARCHASH"
+#define EMPTY ((int32_t)-1)
+#define DELETED ((int32_t)-2)
+#define BUCKET_ADDR(index, idx) (index->buckets + (idx * index->bucket_size))
+
+#define BUCKET_IS_DELETED(index, idx) (*((int32_t *)(BUCKET_ADDR(index, idx) + index->key_size)) == DELETED)
+#define BUCKET_IS_EMPTY(index, idx) (*((int32_t *)(BUCKET_ADDR(index, idx) + index->key_size)) == EMPTY)
+
+#define BUCKET_MATCHES_KEY(index, idx, key) (memcmp(key, BUCKET_ADDR(index, idx), index->key_size) == 0)
+
+#define BUCKET_MARK_DELETED(index, idx) (*((int32_t *)(BUCKET_ADDR(index, idx) + index->key_size)) = DELETED)
+
+
+/* Private API */
+static int
+hashindex_index(HashIndex *index, const void *key)
+{
+    return *((uint32_t *)key) % index->num_buckets;
+}
+
+static int
+hashindex_lookup(HashIndex *index, const void *key)
+{
+    int didx = -1;
+    int idx = hashindex_index(index, key);
+    for(;;) {
+        while(BUCKET_IS_DELETED(index, idx)) {
+            if(didx == -1) {
+                didx = idx;
+            }
+            idx = (idx + 1) % index->num_buckets;
+        }
+        if(BUCKET_IS_EMPTY(index, idx))
+        {
+            return -1;
+        }
+        if(BUCKET_MATCHES_KEY(index, idx, key)) {
+            if (didx != -1) {
+                memcpy(BUCKET_ADDR(index, didx), BUCKET_ADDR(index, idx), index->bucket_size);
+                BUCKET_MARK_DELETED(index, idx);
+                idx = didx;
+            }
+            return idx;
+        }
+        idx = (idx + 1) % index->num_buckets;
+    }
+}
+
+static void
+hashindex_resize(HashIndex *index, int capacity)
+{
+    printf("Resizing => %d\n", capacity);
+    char *new_path = malloc(strlen(index->path) + 5);
+    strcpy(new_path, index->path);
+    strcat(new_path, ".tmp");
+    HashIndex *new = hashindex_create(new_path, capacity, index->key_size, index->value_size);
+    void *key = NULL;
+    while((key = hashindex_next_key(index, key))) {
+        hashindex_set(new, key, hashindex_get(index, key));
+    }
+    munmap(index->map_addr, index->map_length);
+    index->map_addr = new->map_addr;
+    index->map_length = new->map_length;
+    index->num_buckets = new->num_buckets;
+    index->limit = new->limit;
+    index->buckets = new->buckets;
+    unlink(index->path);
+    rename(new_path, index->path);
+    free(new_path);
+    free(new->path);
+    free(new);
+}
+
+/* Public API */
+HashIndex *
+hashindex_open(const char *path)
+{
+    int fd = open(path, O_RDWR);
+    if(fd < 0) {
+        printf("Failed to open %s\n", path);
+        return NULL;
+    }
+    off_t length = lseek(fd, 0, SEEK_END);
+    void *addr = mmap(0, length, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+    close(fd);
+    if(addr == MAP_FAILED) {
+        printf("Failed to mmap %s", path);
+    }
+    HashHeader *header = (HashHeader *)addr;
+    HashIndex *index = malloc(sizeof(HashIndex));
+    index->path = malloc(strlen(path) + 1);
+    strcpy(index->path, path);
+    index->map_addr = addr;
+    index->map_length = length;
+    index->num_entries = header->num_entries;
+    index->num_buckets = header->num_buckets;
+    index->key_size = header->key_size;
+    index->value_size = header->value_size;
+    index->bucket_size = index->key_size + index->value_size;
+    index->buckets = (addr + sizeof(HashHeader));
+    index->limit = (int)(index->num_buckets * .75);
+    return index;
+}
+
+HashIndex *
+hashindex_create(const char *path, int capacity, int key_size, int value_size)
+{
+    FILE *fd;
+    int i;
+    if(!(fd = fopen(path, "w"))) {
+        printf("Failed to create %s\n", path);
+        return NULL;
+    }
+    HashHeader header;
+    memcpy(header.magic, MAGIC, sizeof(MAGIC) - 1);
+    header.num_entries = 0;
+    header.num_buckets = capacity;
+    header.key_size = key_size;
+    header.value_size = value_size;
+    int bucket_size = key_size + value_size;
+    char *bucket = calloc(bucket_size, 1);
+    if(fwrite(&header, 1, sizeof(header), fd) != sizeof(header))
+        goto error;
+    *((int32_t *)(bucket + key_size)) = EMPTY;
+    for(i = 0; i < capacity; i++) {
+        if(fwrite(bucket, 1, bucket_size, fd) != bucket_size)
+            goto error;
+    }
+    free(bucket);
+    fclose(fd);
+    return hashindex_open(path);
+error:
+    fclose(fd);
+    free(bucket);
+    return NULL;
+}
+
+void
+hashindex_flush(HashIndex *index)
+{
+    *((int32_t *)(index->map_addr + 8)) = index->num_entries;
+    *((int32_t *)(index->map_addr + 12)) = index->num_buckets;
+    msync(index->map_addr, index->map_length, MS_SYNC);
+}
+
+void
+hashindex_close(HashIndex *index)
+{
+    hashindex_flush(index);
+    munmap(index->map_addr, index->map_length);
+    free(index->path);
+    free(index);
+}
+
+const void *
+hashindex_get(HashIndex *index, const void *key)
+{
+    int idx = hashindex_lookup(index, key);
+    if(idx < 0) {
+        return NULL;
+    }
+    return BUCKET_ADDR(index, idx) + index->key_size;
+}
+
+void
+hashindex_set(HashIndex *index, const void *key, const void *value)
+{
+    int idx = hashindex_lookup(index, key);
+    if(idx < 0)
+    {
+        if(index->num_entries > index->limit) {
+            hashindex_resize(index, index->num_buckets * 2);
+        }
+        idx = hashindex_index(index, key);
+        while(!BUCKET_IS_EMPTY(index, idx) && !BUCKET_IS_DELETED(index, idx)) {
+            idx = (idx + 1) % index->num_buckets;
+        }
+        memcpy(BUCKET_ADDR(index, idx), key, index->key_size);
+        memcpy(BUCKET_ADDR(index, idx) + index->key_size, value, index->value_size);
+        index->num_entries += 1;
+    }
+    else
+    {
+        memcpy(BUCKET_ADDR(index, idx) + index->key_size, value, index->value_size);
+    }
+}
+
+void
+hashindex_delete(HashIndex *index, const void *key)
+{
+    int idx = hashindex_lookup(index, key);
+    if (idx < 0) {
+        return;
+    }
+    BUCKET_MARK_DELETED(index, idx);
+    index->num_entries -= 1;
+}
+
+void *
+hashindex_next_key(HashIndex *index, const void *key)
+{
+    int idx = 0;
+    if(key) {
+        idx = 1 + (key - index->buckets) / index->bucket_size;
+    }
+    if (idx == index->num_buckets)
+        return NULL;
+    while(BUCKET_IS_EMPTY(index, idx) || BUCKET_IS_DELETED(index, idx)) {
+        idx ++;
+        if (idx == index->num_buckets)
+            return NULL;
+    }
+    return BUCKET_ADDR(index, idx);
+}
+
+int
+hashindex_get_size(HashIndex *index)
+{
+    return index->num_entries;
+}
+

+ 27 - 0
darc/hashindex.h

@@ -0,0 +1,27 @@
+#ifndef __HASHINDEX_H__
+#define __HASHINDEX_H__
+
+typedef struct {
+    char *path;
+    void *map_addr;
+    off_t map_length;
+    void *buckets;
+    int num_entries;
+    int num_buckets;
+    int key_size;
+    int value_size;
+    int bucket_size;
+    int limit;
+} HashIndex;
+
+HashIndex *hashindex_open(const char *path);
+void hashindex_close(HashIndex *index);
+void hashindex_flush(HashIndex *index);
+HashIndex *hashindex_create(const char *path, int capacity, int key_size, int value_size);
+const void *hashindex_get(HashIndex *index, const void *key);
+void hashindex_set(HashIndex *index, const void *key, const void *value);
+void hashindex_delete(HashIndex *index, const void *key);
+void *hashindex_next_key(HashIndex *index, const void *key);
+int hashindex_get_size(HashIndex *index);
+
+#endif

+ 0 - 237
darc/hashindex.py

@@ -1,237 +0,0 @@
-import numpy
-import os
-import random
-import shutil
-import struct
-import tempfile
-import unittest
-from UserDict import DictMixin
-
-
-class HashIndexBase(DictMixin):
-    EMPTY, DELETED = -1, -2
-    FREE = (EMPTY, DELETED)
-
-    i_fmt    = struct.Struct('<i')
-    assert i_fmt.size == 4
-
-    def __init__(self, path):
-        self.path = path
-        self.fd = open(path, 'r+')
-        assert self.fd.read(len(self.MAGIC)) == self.MAGIC
-        self.num_entries = self.i_fmt.unpack(self.fd.read(4))[0]
-        self.buckets = numpy.memmap(self.fd, self.idx_type, offset=len(self.MAGIC) + 4)
-        self.limit = 3 * self.buckets.size / 4  # 75% fill rate
-
-    def flush(self):
-        self.fd.seek(len(self.MAGIC))
-        self.fd.write(self.i_fmt.pack(self.num_entries))
-        self.fd.flush()
-        self.buckets.flush()
-
-    @classmethod
-    def create(cls, path, capacity=1024):
-        with open(path, 'wb') as fd:
-            fd.write(cls.MAGIC + '\0\0\0\0')
-            a = numpy.zeros(capacity, cls.idx_type)
-            for i in xrange(capacity):
-                a[i][1] = cls.EMPTY
-            a.tofile(fd)
-        return cls(path)
-
-    def __contains__(self, key):
-        try:
-            self[key]
-            return True
-        except KeyError:
-            return False
-
-    def __delitem__(self, key):
-        self.buckets[self.lookup(key)][1] = self.DELETED
-        self.num_entries -= 1
-
-    def resize(self, capacity=0):
-        capacity = capacity or self.buckets.size * 2
-        if capacity < self.num_entries:
-            raise ValueError('Too small')
-        new = self.create(self.path + '.tmp', capacity)
-        for key, value in self.iteritems():
-            new[key] = value
-        new.flush()
-        os.unlink(self.path)
-        os.rename(self.path + '.tmp', self.path)
-        self.fd = new.fd
-        self.buckets = new.buckets
-        self.limit = 3 * self.buckets.size / 4
-
-
-class NSIndex(HashIndexBase):
-    MAGIC = 'NSINDEX'
-
-    idx_type = numpy.dtype('V32,<i4,<i4')
-    assert idx_type.itemsize == 40
-
-    def index(self, key):
-        hash = self.i_fmt.unpack(key[:4])[0]
-        return hash % self.buckets.size
-
-    def lookup(self, key):
-        didx = -1
-        idx = self.index(key)
-        while True:
-            while self.buckets[idx][1] == self.DELETED:
-                if didx == -1:
-                    didx = idx
-                idx = (idx + 1) % self.buckets.size
-            if self.buckets[idx][1] == self.EMPTY:
-                raise KeyError
-            if str(self.buckets[idx][0]) == key:
-                if didx != -1:
-                    self.buckets[didx] = self.buckets[idx]
-                    self.buckets[idx][1] = self.DELETED
-                    idx = didx
-                return idx
-            idx = (idx + 1) % self.buckets.size
-
-    def pop(self, key):
-        idx = self.lookup(key)
-        band = self.buckets[idx][1]
-        self.buckets[idx][1] = self.DELETED
-        self.num_entries -= 1
-        return band, self.buckets[idx][2]
-
-    def __getitem__(self, key):
-        idx = self.lookup(key)
-        return self.buckets[idx][1], self.buckets[idx][2]
-
-    def __setitem__(self, key, value):
-        if self.num_entries >= self.limit:
-            self.resize()
-        try:
-            idx = self.lookup(key)
-            self.buckets[idx][1], self.buckets[idx][2] = value
-            return
-        except KeyError:
-            idx = self.index(key)
-            while self.buckets[idx][1] not in self.FREE:
-                idx = (idx + 1) % self.buckets.size
-            self.buckets[idx][1], self.buckets[idx][2] = value
-            self.buckets[idx][0] = key
-            self.num_entries += 1
-
-    def iteritems(self, limit=0, marker=None):
-        n = 0
-        for idx in xrange(self.buckets.size):
-            if self.buckets[idx][1] in self.FREE:
-                continue
-            key = str(self.buckets[idx][0])
-            if marker and key != marker:
-                continue
-            elif marker:
-                marker = None
-            yield key, (self.buckets[idx][1], self.buckets[idx][2])
-            n += 1
-            if n == limit:
-                return
-
-
-class BandIndex(HashIndexBase):
-    MAGIC = 'BANDINDEX'
-    idx_type = numpy.dtype('<i4,<i2')
-    assert idx_type.itemsize == 6
-
-    def index(self, key):
-        return key % self.buckets.size
-
-    def lookup(self, key):
-        didx = -1
-        idx = self.index(key)
-        while True:
-            while self.buckets[idx][1] == self.DELETED:
-                if didx == -1:
-                    didx = idx
-                idx = (idx + 1) % self.buckets.size
-            if self.buckets[idx][1] == self.EMPTY:
-                raise KeyError
-            if self.buckets[idx][0] == key:
-                if didx != -1:
-                    self.buckets[didx] = self.buckets[idx]
-                    self.buckets[idx][1] = self.DELETED
-                    idx = didx
-                return idx
-            idx = (idx + 1) % self.buckets.size
-
-    def pop(self, key):
-        idx = self.lookup(key)
-        value = self.buckets[idx][1]
-        self.buckets[idx][1] = self.DELETED
-        self.num_entries -= 1
-        return value
-
-    def __getitem__(self, key):
-        idx = self.lookup(key)
-        return self.buckets[idx][1]
-
-    def __setitem__(self, key, value):
-        if self.num_entries >= self.limit:
-            self.resize()
-        try:
-            idx = self.lookup(key)
-            self.buckets[idx][1] = value
-            return
-        except KeyError:
-            idx = self.index(key)
-            while self.buckets[idx][1] not in self.FREE:
-                idx = (idx + 1) % self.buckets.size
-            self.buckets[idx][1] = value
-            self.buckets[idx][0] = key
-            self.num_entries += 1
-
-    def iteritems(self, limit=0, marker=None):
-        n = 0
-        for idx in xrange(self.buckets.size):
-            if self.buckets[idx][1] in self.FREE:
-                continue
-            key = self.buckets[idx][0]
-            if marker and key != marker:
-                continue
-            elif marker:
-                marker = None
-            yield key, self.buckets[idx][1]
-            n += 1
-            if n == limit:
-                return
-
-
-class HashIndexTestCase(unittest.TestCase):
-
-    def setUp(self):
-        self.tmppath = tempfile.mkdtemp()
-
-    def tearDown(self):
-        shutil.rmtree(self.tmppath)
-
-    def test_bandindex(self):
-        ref = {}
-        idx = BandIndex.create(os.path.join(self.tmppath, 'idx'), 16)
-        for x in range(1000):
-            band = random.randint(0, 100)
-            ref.setdefault(band, 0)
-            ref[band] += 1
-            idx.setdefault(band, 0)
-            idx[band] += 1
-        idx.flush()
-        idx2 = BandIndex(os.path.join(self.tmppath, 'idx'))
-        for key, value in ref.iteritems():
-            self.assertEqual(idx2[key], value)
-
-
-def suite():
-    return unittest.TestLoader().loadTestsFromTestCase(HashIndexTestCase)
-
-if __name__ == '__main__':
-    unittest.main()
-
-
-
-

+ 149 - 0
darc/hashindex.pyx

@@ -0,0 +1,149 @@
+# -*- coding: utf-8 -*-
+
+cdef extern from "hashindex.h":
+    ctypedef struct HashIndex:
+        pass
+
+    HashIndex *hashindex_open(char *path)
+    HashIndex *hashindex_create(char *path, int capacity, int key_size, int value_size)
+    int hashindex_get_size(HashIndex *index)
+    void hashindex_close(HashIndex *index)
+    void hashindex_flush(HashIndex *index)
+    void *hashindex_get(HashIndex *index, void *key)
+    void *hashindex_next_key(HashIndex *index, void *key)
+    void hashindex_delete(HashIndex *index, void *key)
+    void hashindex_set(HashIndex *index, void *key, void *value)
+
+
+cdef class IndexBase:
+    cdef HashIndex *index
+
+    def __cinit__(self, path):
+        self.index = hashindex_open(path)
+        if not self.index:
+            raise Exception('Failed to open %s' % path)
+
+    def __dealloc__(self):
+        hashindex_close(self.index)
+
+    def flush(self):
+        hashindex_flush(self.index)
+
+    def setdefault(self, key, value):
+        if not key in self:
+            self[key] = value
+
+    def pop(self, key):
+        value = self[key]
+        del self[key]
+        return value
+
+    def __len__(self):
+        return hashindex_get_size(self.index)
+
+
+cdef class NSIndex(IndexBase):
+
+    @classmethod
+    def create(cls, path, capacity=16):
+        index = hashindex_create(path, capacity, 32, 8)
+        hashindex_close(index)
+        return cls(path)
+
+    def __getitem__(self, key):
+        assert len(key) == 32
+        data = <int *>hashindex_get(self.index, <char *>key)
+        if not data:
+            raise KeyError
+        return data[0], data[1]
+
+    def __delitem__(self, key):
+        assert len(key) == 32
+        hashindex_delete(self.index, <char *>key)
+
+    def __setitem__(self, key, value):
+        assert len(key) == 32
+        cdef int[2] data
+        data[0] = value[0]
+        data[1] = value[1]
+        hashindex_set(self.index, <char *>key, data)
+
+    def __contains__(self, key):
+        assert len(key) == 32
+        data = <int *>hashindex_get(self.index, <char *>key)
+        return data != NULL
+
+    def iteritems(self, marker=None, limit=0):
+        iter = NSKeyIterator()
+        iter.index = self.index
+        return iter
+
+
+cdef class NSKeyIterator:
+    cdef HashIndex *index
+    cdef char *key
+
+    def __cinit__(self):
+        self.key = NULL
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        self.key = <char *>hashindex_next_key(self.index, <char *>self.key)
+        if not self.key:
+            raise StopIteration
+        cdef int *value = <int *>(self.key + 32)
+        return self.key[:32], (value[0], value[1])
+
+
+cdef class BandIndex(IndexBase):
+
+    @classmethod
+    def create(cls, path, capacity=16):
+        index = hashindex_create(path, capacity, 4, 4)
+        hashindex_close(index)
+        return cls(path)
+
+    def __getitem__(self, key):
+        cdef int k = key
+        data = <int *>hashindex_get(self.index, &k)
+        if not data:
+            raise KeyError
+        return data[0]
+
+    def __delitem__(self, key):
+        cdef int k = key
+        hashindex_delete(self.index, &k)
+
+    def __setitem__(self, key, value):
+        cdef int k = key
+        cdef int v = value
+        hashindex_set(self.index, &k, &v)
+
+    def __contains__(self, key):
+        cdef int k = key
+        data = <int *>hashindex_get(self.index, &k)
+        return data != NULL
+
+    def iteritems(self, marker=None, limit=0):
+        iter = BandKeyIterator()
+        iter.index = self.index
+        return iter
+
+
+cdef class BandKeyIterator:
+    cdef HashIndex *index
+    cdef int *key
+
+    def __cinit__(self):
+        self.key = NULL
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        self.key = <int *>hashindex_next_key(self.index, <char *>self.key)
+        if not self.key:
+            raise StopIteration
+        return self.key[0], self.key[1]

+ 1 - 2
darc/test.py

@@ -11,7 +11,7 @@ from xattr import xattr, XATTR_NOFOLLOW
 import getpass
 getpass.getpass = lambda m: 'abc123'
 
-from . import store, helpers, lrucache, hashindex
+from . import store, helpers, lrucache
 from .archiver import Archiver
 
 
@@ -124,7 +124,6 @@ def suite():
     suite.addTest(store.suite())
     suite.addTest(doctest.DocTestSuite(helpers))
     suite.addTest(lrucache.suite())
-    suite.addTest(hashindex.suite())
     return suite
 
 if __name__ == '__main__':

+ 6 - 2
setup.py

@@ -2,8 +2,9 @@
 #!/usr/bin/env python
 import sys
 from setuptools import setup, Extension
+from Cython.Distutils import build_ext
 
-dependencies = ['pycrypto', 'msgpack-python', 'pbkdf2.py', 'xattr', 'paramiko', 'numpy']
+dependencies = ['pycrypto', 'msgpack-python', 'pbkdf2.py', 'xattr', 'paramiko']
 if sys.version_info < (2, 7):
     dependencies.append('argparse')
 
@@ -13,7 +14,10 @@ setup(name='darc',
       author=u'Jonas Borgström',
       author_email='jonas@borgstrom.se',
       packages=['darc'],
-      ext_modules=[Extension('darc._speedups', ['darc/_speedups.c'])],
+      cmdclass = {'build_ext': build_ext},
+      ext_modules=[
+      Extension('darc._speedups', ['darc/_speedups.c']),
+                   Extension('darc.hashindex', ['darc/hashindex.pyx', 'darc/_hashindex.c'])],
       install_requires=dependencies,
       entry_points = {
         'console_scripts': [