Explorar o código

Use smarter chunkifier algorithm (same as git and rsyncable gzip)

Jonas Borgström %!s(int64=14) %!d(string=hai) anos
pai
achega
2eea138c92
Modificáronse 4 ficheiros con 134 adicións e 178 borrados
  1. 53 72
      dedupestore/_speedups.c
  2. 6 6
      dedupestore/archiver.py
  3. 22 24
      dedupestore/cache.py
  4. 53 76
      dedupestore/chunkifier.py

+ 53 - 72
dedupestore/_speedups.c

@@ -1,5 +1,5 @@
-#include <Python/Python.h>
-#include <Python/structmember.h>
+#include <Python.h>
+#include <structmember.h>
 
 static unsigned long int
 checksum(const unsigned char *data, int len, unsigned long int sum)
@@ -28,15 +28,23 @@ roll_checksum(unsigned long int sum, unsigned char remove, unsigned char add, in
 
 typedef struct {
     PyObject_HEAD
-    int chunk_size, i, full_sum, done, buf_size, data_len;
-    PyObject *chunks, *fd, *extra;
-    unsigned long sum;
+    int chunk_size, window_size, i, last, eof, done, buf_size, data_len, initial;
+    PyObject *chunks, *fd;
+    unsigned long int sum;
     unsigned char *data, add, remove;
 } ChunkifyIter;
 
 static PyObject*
 ChunkifyIter_iter(PyObject *self)
 {
+    ChunkifyIter *c = (ChunkifyIter *)self;
+    c->data_len = 0;
+    c->done = 0;
+    c->eof = 0;
+    c->i = 0;
+    c->sum = 0;
+    c->last = -1;
+    c->initial = c->window_size;
     Py_INCREF(self);
     return self;
 }
@@ -46,7 +54,6 @@ ChunkifyIter_dealloc(PyObject *self)
 {
     ChunkifyIter *c = (ChunkifyIter *)self;
     Py_DECREF(c->fd);
-    Py_XDECREF(c->chunks);
     free(c->data);
     self->ob_type->tp_free(self);
 }
@@ -55,28 +62,25 @@ static PyObject*
 ChunkifyIter_iternext(PyObject *self)
 {
     ChunkifyIter *c = (ChunkifyIter *)self;
-    PyObject *pysum;
-    int o = 0;
     if(c->done)
     {
         PyErr_SetNone(PyExc_StopIteration);
         return NULL;
     }
-    if(c->extra)
-    {
-        c->done = 1;
-	Py_INCREF(c->extra);
-        return c->extra;
-    }
     for(;;)
     {
-        if(c->i > c->buf_size - c->chunk_size)
+        if(c->i == c->buf_size)
         {
-            memmove(c->data, c->data + c->i - o, c->data_len - c->i + o);
-            c->data_len -= c->i - o;
-            c->i = o;
+            int diff = c->last + 1 - c->window_size;
+            memmove(c->data, c->data + diff, c->buf_size - diff);
+            c->i -= diff;
+            c->last -= diff;
+            c->data_len -= diff;
+            assert(c->i >= 0);
+            assert(c->last >= -1);
+            assert(c->data_len >= 0);
         }
-        if(c->data_len - c->i < c->chunk_size)
+        if(c->i == c->data_len)
         {
             PyObject *data = PyObject_CallMethod(c->fd, "read", "i", c->buf_size - c->data_len);
             int n = PyString_Size(data);
@@ -86,59 +90,42 @@ ChunkifyIter_iternext(PyObject *self)
         }
         if(c->i == c->data_len)
         {
-            PyErr_SetNone(PyExc_StopIteration);
-            return NULL;
-        }
-        if(c->data_len - c->i < c->chunk_size) /* EOF ? */
-        {
-            if(o == 1)
-            {
+            if(c->last < c->i - 1) {
                 c->done = 1;
-                return PyString_FromStringAndSize((char *)(c->data + c->i - 1), c->data_len - c->i + 1);
-            }
-            else if(o > 1)
-            {
-                c->extra = PyString_FromStringAndSize((char *)(c->data + c->i - 1), c->chunk_size);
-                return PyString_FromStringAndSize((char *)(c->data + c->i - o), o - 1);
-            }
-            else
-            {
-                c->done = 1;
-                return PyString_FromStringAndSize((char *)(c->data + c->i), c->data_len - c->i);
+                return PyString_FromStringAndSize((char *)(c->data + c->last + 1),
+                                                  c->data_len - c->last - 1);
             }
+            PyErr_SetNone(PyExc_StopIteration);
+            return NULL;
         }
-        if(o == c->chunk_size)
+        if(c->initial)
         {
-            return PyString_FromStringAndSize((char *)(c->data + c->i - c->chunk_size), c->chunk_size);
+            c->initial--;
+            c->sum = checksum(c->data + c->i, 1, c->sum);
         }
-        if(c->full_sum || c->i + c->chunk_size > c->data_len)
+        else
         {
-            c->full_sum = 0;
-            c->sum = checksum(c->data + c->i, c->chunk_size, 0);
+            c->sum = roll_checksum(c->sum,
+                                   c->data[c->i - c->window_size],
+                                   c->data[c->i],
+                                   c->window_size);
         }
-        else
+        c->i++;
+        if(c->i == c->buf_size && c->last == c->window_size - 1)
         {
-            c->sum = roll_checksum(c->sum, c->remove, c->data[c->i + c->chunk_size - 1], c->chunk_size);
+            int old_last = c->last;
+            c->last = c->i - 1;
+            printf("Max chunk size reached %d\n", c->last - old_last);
+            return PyString_FromStringAndSize((char *)(c->data + old_last + 1),
+                                              c->last - old_last);
         }
-        c->remove = c->data[c->i];
-        pysum = PyInt_FromLong(c->sum);
-        if(PySequence_Contains(c->chunks, pysum) == 1)
+        else if((c->sum % c->chunk_size) == 0)
         {
-            Py_DECREF(pysum);
-            c->full_sum = 1;
-            if(o > 0)
-            {
-                return PyString_FromStringAndSize((char *)(c->data + c->i - o), o);
-            }
-            else
-            {
-                c->i += c->chunk_size;
-                return PyString_FromStringAndSize((char *)(c->data + c->i - c->chunk_size), c->chunk_size);
-            }
+            int old_last = c->last;
+            c->last = c->i - 1;
+            return PyString_FromStringAndSize((char *)(c->data + old_last + 1),
+                                              c->last - old_last);
         }
-        Py_DECREF(pysum);
-        o++;
-        c->i++;
     }
     PyErr_SetNone(PyExc_StopIteration);
     return NULL;
@@ -180,11 +167,11 @@ static PyTypeObject ChunkifyIterType = {
 static PyObject *
 chunkify(PyObject *self, PyObject *args)
 {
-    PyObject *fd, *chunks;
-    long int chunk_size;
+    PyObject *fd;
+    long int chunk_size, window_size;
     ChunkifyIter *c;
 
-    if (!PyArg_ParseTuple(args, "OiO", &fd, &chunk_size, &chunks))
+    if (!PyArg_ParseTuple(args, "Oii", &fd, &chunk_size, &window_size))
     {
         return NULL;
     }
@@ -193,18 +180,12 @@ chunkify(PyObject *self, PyObject *args)
         return NULL;
     }
     PyObject_Init((PyObject *)c, &ChunkifyIterType);
-    c->buf_size = chunk_size * 10;
+    c->buf_size = 10 * 1024 * 1024;
     c->data = malloc(c->buf_size);
-    c->data_len = 0;
-    c->i = 0;
-    c->full_sum = 1;
-    c->done = 0;
-    c->extra = NULL;
     c->fd = fd;
     c->chunk_size = chunk_size;
-    c->chunks = chunks;
+    c->window_size = window_size;
     Py_INCREF(fd);
-    Py_INCREF(chunks);
     return (PyObject *)c;
 }
 

+ 6 - 6
dedupestore/archiver.py

@@ -9,7 +9,7 @@ from cache import Cache, NS_ARCHIVES, NS_CHUNKS
 #from sqlitestore import SqliteStore
 from bandstore import BandStore
 
-CHUNK_SIZE = 256 * 1024
+CHUNK_SIZE = 55001
 
 
 class Archive(object):
@@ -22,12 +22,12 @@ class Archive(object):
         if name:
             self.open(name)
 
-    def add_chunk(self, id, sum, csize, osize):
+    def add_chunk(self, id, csize, osize):
         try:
             return self.chunk_idx[id]
         except KeyError:
             idx = len(self.chunks)
-            self.chunks.append((id, sum, csize, osize))
+            self.chunks.append((id, csize, osize))
             self.chunk_idx[id] = idx
             return idx
 
@@ -36,7 +36,7 @@ class Archive(object):
         self.items = archive['items']
         self.name = archive['name']
         self.chunks = archive['chunks']
-        for i, (id, sum, csize, osize) in enumerate(archive['chunks']):
+        for i, (id, csize, osize) in enumerate(archive['chunks']):
             self.chunk_idx[i] = id
 
     def save(self, name):
@@ -57,7 +57,7 @@ class Archive(object):
                     chunk_count.setdefault(id, 0)
                     chunk_count[id] += 1
         for id, c in chunk_count.items():
-            count, sum, csize, osize = cache.chunkmap[id]
+            count, csize, osize = cache.chunkmap[id]
             total_csize += csize
             if  c == count:
                 total_usize += csize
@@ -134,7 +134,7 @@ class Archive(object):
             path = path.lstrip('/\\:')
             chunks = []
             size = 0
-            for chunk in chunkify(fd, CHUNK_SIZE, cache.summap):
+            for chunk in chunkify(fd, CHUNK_SIZE, 30):
                 size += len(chunk)
                 chunks.append(self.add_chunk(*cache.add_chunk(chunk)))
         return {'type': 'FILE', 'path': path, 'chunks': chunks, 'size': size}

+ 22 - 24
dedupestore/cache.py

@@ -4,8 +4,6 @@ import os
 import sys
 import zlib
 
-from chunkifier import checksum
-
 NS_ARCHIVES = 'ARCHIVES'
 NS_CHUNKS = 'CHUNKS'
 
@@ -16,10 +14,12 @@ class Cache(object):
 
     def __init__(self, store):
         self.store = store
-        self.path = os.path.join(os.path.expanduser('~'), '.dedupestore', 'cache', 
+        self.path = os.path.join(os.path.expanduser('~'), '.dedupestore', 'cache',
                                  '%s.cache' % self.store.uuid)
         self.tid = -1
         self.open()
+        self.total = 0
+        self.max = 0
         if self.tid != self.store.tid:
             self.init()
 
@@ -32,7 +32,6 @@ class Cache(object):
             print >> sys.stderr, 'Cache UUID mismatch'
             return
         self.chunkmap = data['chunkmap']
-        self.summap = data['summap']
         self.archives = data['archives']
         self.tid = data['tid']
         print 'done'
@@ -56,14 +55,14 @@ class Cache(object):
                 if self.seen_chunk(id):
                     self.chunk_incref(id)
                 else:
-                    self.init_chunk(id, sum, csize, osize)
+                    self.init_chunk(id, csize, osize)
         print 'done'
 
     def save(self):
         assert self.store.state == self.store.OPEN
         print 'saving cache'
         data = {'uuid': self.store.uuid,
-                'chunkmap': self.chunkmap, 'summap': self.summap,
+                'chunkmap': self.chunkmap,
                 'tid': self.store.tid, 'archives': self.archives}
         print 'Saving cache as:', self.path
         cachedir = os.path.dirname(self.path)
@@ -74,41 +73,40 @@ class Cache(object):
         print 'done'
 
     def add_chunk(self, data):
-        sum = checksum(data)
         osize = len(data)
         data = zlib.compress(data)
         id = hashlib.sha1(data).digest()
+        self.total += 1
+        if osize == 55001* 4:
+            self.max += 1
+            print 'rate = %.2f' % (100.*self.max/self.total)
         if self.seen_chunk(id):
+            print 'yay %d bytes' % osize
             return self.chunk_incref(id)
         csize = len(data)
         self.store.put(NS_CHUNKS, id, data)
-        return self.init_chunk(id, sum, csize, osize)
+        return self.init_chunk(id, csize, osize)
 
-    def init_chunk(self, id, sum, csize, osize):
-        self.chunkmap[id] = (1, sum, csize, osize)
-        self.summap[sum] = self.summap.get(sum, 0) + 1
-        return id, sum, csize, osize
+    def init_chunk(self, id, csize, osize):
+        self.chunkmap[id] = (1, csize, osize)
+        return id, csize, osize
 
     def seen_chunk(self, id):
-        count, sum, csize, osize = self.chunkmap.get(id, (0, 0, 0, 0))
+        count, csize, osize = self.chunkmap.get(id, (0, 0, 0))
         return count
 
     def chunk_incref(self, id):
-        count, sum, csize, osize = self.chunkmap[id]
-        self.chunkmap[id] = (count + 1, sum, csize, osize)
-        self.summap[sum] += 1
-        return id, sum, csize, osize
+        count, csize, osize = self.chunkmap[id]
+        self.chunkmap[id] = (count + 1, csize, osize)
+        return id, csize, osize
 
     def chunk_decref(self, id):
-        count, sum, csize, osize = self.chunkmap[id]
-        sumcount = self.summap[sum]
-        if sumcount == 1:
-            del self.summap[sum]
-        else:
-            self.summap[sum] = sumcount - 1
+        count, csize, osize = self.chunkmap[id]
         if count == 1:
             del self.chunkmap[id]
             print 'deleting chunk: ', id.encode('hex')
             self.store.delete(NS_CHUNKS, id)
         else:
-            self.chunkmap[id] = (count - 1, sum, csize, osize)
+            self.chunkmap[id] = (count - 1, csize, osize)
+
+

+ 53 - 76
dedupestore/chunkifier.py

@@ -30,103 +30,79 @@ def roll_checksum(sum, remove, add, len):
 
 class ChunkifyIter(object):
 
-    def __init__(self, fd, chunk_size, chunks):
+    def __init__(self, fd, chunk_size, window_size):
         self.fd = fd
         self.chunk_size = chunk_size
-        self.chunks = chunks
+        self.window_size = window_size
+        self.buf_size = self.chunk_size * 10
 
     def __iter__(self):
         self.data = ''
-        self.i = 0
-        self.full_sum = True
-        self.extra = None
         self.done = False
-        self.buf_size = self.chunk_size * 10
+        self.i = 0
+        self.sum = 0
+        self.last = -1
+        self.initial = self.window_size
         return self
 
     def next(self):
-        o = 0
         if self.done:
             raise StopIteration
-        if self.extra:
-            self.done = True
-            return self.extra
         while True:
-            if self.i >  self.buf_size - self.chunk_size:
-                self.data = self.data[self.i - o:]
-                self.i = o
-            if len(self.data) - self.i < self.chunk_size:
+            if self.i == self.buf_size:
+                diff = self.last + 1 - self.window_size
+                if diff < 0:
+                    import ipdb
+                    ipdb.set_trace()
+                self.data = self.data[diff:]
+                self.last -= diff
+                self.i -= diff
+            if self.i == len(self.data):
                 self.data += self.fd.read(self.buf_size - len(self.data))
-            if len(self.data) == self.i:
-                raise StopIteration
-            if len(self.data) - self.i < self.chunk_size:  # EOF?
-                if o == 1:
-                    self.done = True
-                    return self.data[self.i - 1:]
-                elif o > 1:
-                    self.extra = self.data[-self.chunk_size:]
-                    return self.data[-self.chunk_size - o + 1:-self.chunk_size]
-                else:
+            if self.i == len(self.data):
+                if self.last < self.i - 1:
                     self.done = True
-                    return self.data[self.i:]
-            elif o == self.chunk_size:
-                return self.data[self.i-self.chunk_size:self.i]
-            if self.full_sum or len(self.data) - self.i < self.chunk_size:
-                self.sum = checksum(self.data[self.i:self.i + self.chunk_size])
-                self.full_sum = False
-                self.remove = self.data[self.i]
-            else:
-                self.sum = roll_checksum(self.sum, self.remove, self.data[self.i + self.chunk_size - 1], 
-                                         self.chunk_size)
-                self.remove = self.data[self.i]
-            if self.sum in self.chunks:
-                if o > 0:
-                    chunk = self.data[self.i - o:self.i]
-                else:
-                    chunk = self.data[self.i:self.i + self.chunk_size]
-                    self.i += self.chunk_size
-                self.full_sum = True
-                return chunk
+                    return self.data[self.last + 1:]
+                raise StopIteration
+            if self.initial:
+                self.initial -= 1
+                self.sum = checksum(self.data[self.i], self.sum)
             else:
-                self.i += 1
-                o += 1
+                self.sum = roll_checksum(self.sum,
+                                         self.data[self.i - self.window_size],
+                                         self.data[self.i],
+                                         self.window_size)
+            self.i += 1
+            if self.i == self.buf_size and self.last == self.window_size - 1:
+                old_last = self.last
+                self.last = self.i - 1
+                return self.data[old_last + 1:self.last + 1]
+            elif self.sum % self.chunk_size == 0:
+                old_last = self.last
+                self.last = self.i - 1
+                return self.data[old_last + 1:self.last + 1]
 
 
 def chunkify(fd, chunk_size, chunks):
     """
-    >>> list(chunkify(StringIO.StringIO('A'), 4, {}))
+    >>> list(chunkify(StringIO.StringIO(''), 5, 3))
+    []
+    >>> list(chunkify(StringIO.StringIO('A'), 5, 3))
     ['A']
-    >>> list(chunkify(StringIO.StringIO('AB'), 4, {}))
+    >>> list(chunkify(StringIO.StringIO('AB'), 5, 3))
     ['AB']
-    >>> list(chunkify(StringIO.StringIO('ABC'), 4, {}))
-    ['ABC']
-    >>> list(chunkify(StringIO.StringIO('ABCD'), 4, {}))
-    ['ABCD']
-    >>> list(chunkify(StringIO.StringIO('ABCDE'), 4, {}))
-    ['A', 'BCDE']
-    >>> list(chunkify(StringIO.StringIO('ABCDEF'), 4, {}))
-    ['AB', 'CDEF']
-    >>> list(chunkify(StringIO.StringIO('ABCDEFG'), 4, {}))
-    ['ABC', 'DEFG']
-    >>> list(chunkify(StringIO.StringIO('ABCDEFGH'), 4, {}))
-    ['ABCD', 'EFGH']
-    >>> list(chunkify(StringIO.StringIO('ABCDEFGHI'), 4, {}))
-    ['ABCD', 'E', 'FGHI']
-
-    >>> list(chunkify(StringIO.StringIO('ABCDEFGHIJKLMN'), 4, {}))
-    ['ABCD', 'EFGH', 'IJ', 'KLMN']
-
-    >>> chunks = {44564754: True} # 'BCDE'
-    >>> list(chunkify(StringIO.StringIO('ABCDEFGHIJKLMN'), 4, chunks))
-    ['A', 'BCDE', 'FGHI', 'J', 'KLMN']
-
-    >>> chunks = {44564754: True, 48496938: True} # 'BCDE', 'HIJK'
-    >>> list(chunkify(StringIO.StringIO('ABCDEFGHIJKLMN'), 4, chunks))
-    ['A', 'BCDE', 'FG', 'HIJK', 'LMN']
-
-    >>> chunks = {43909390: True, 50463030: True} # 'ABCD', 'KLMN'
-    >>> list(chunkify(StringIO.StringIO('ABCDEFGHIJKLMN'), 4, chunks))
-    ['ABCD', 'EFGH', 'IJ', 'KLMN']
+    >>> list(chunkify(StringIO.StringIO('1B'), 5, 3))
+    ['1', 'B']
+    >>> list(chunkify(StringIO.StringIO('ABCDEFGHIJKLMNOPQ'), 5, 3))
+    ['ABCD', 'EFGHI', 'JKLMN', 'OPQ']
+    >>> list(chunkify(StringIO.StringIO('1ABCDEFGHIJKLMNOPQ'), 5, 3))
+    ['1', 'ABCD', 'EFGHI', 'JKLMN', 'OPQ']
+    >>> list(chunkify(StringIO.StringIO('12ABCDEFGHIJKLMNOPQ'), 5, 3))
+    ['1', '2A', 'BCD', 'EFGHI', 'JKLMN', 'OPQ']
+    >>> list(chunkify(StringIO.StringIO('12ABCDEFGHIJKLMNOPQRSTUVWXYZ'), 5, 3))
+    ['1', '2A', 'BCD', 'EFGHI', 'JKLMN', 'OPQRS', 'TUVWX', 'YZ']
+    >>> list(chunkify(StringIO.StringIO('12ABCDEFGHIJKLMNOPQRSTUVWXYZ'), 5, 3))
+    ['1', '2A', 'BCD', 'EFGHI', 'JKLMN', 'OPQRS', 'TUVWX', 'YZ']
     """
     return ChunkifyIter(fd, chunk_size, chunks)
 
@@ -142,4 +118,5 @@ except ImportError:
 
 if __name__ == '__main__':
     import doctest
+    import StringIO
     doctest.testmod()