Switched to msgpack and made some BandStore improvements.

Jonas Borgström, 14 years ago
parent
commit
a8ff4362b8
5 changed files with 40 additions and 85 deletions
  1. dedupestore/__init__.py    +0 -37
  2. dedupestore/archiver.py    +5 -15
  3. dedupestore/bandstore.py   +26 -16
  4. dedupestore/cache.py       +8 -16
  5. dedupestore/test.py        +1 -1

+ 0 - 37
dedupestore/__init__.py

@@ -1,38 +1 @@
 # This is a python package
-
-ARCHIVE_SCHEMA = """
-{
-    "name": "Archive",
-    "type": "record",
-    "fields" : [
-        { "name": "name", "type": "string" },
-        { "name": "ts",   "type": "string" },
-        { "name": "chunks", "type": { "type": "array", "items":
-            { "type": "record",
-              "name": "Chunk",
-              "fields": [
-                { "name": "id", "type": {"type": "fixed", "size": 32, "name": "sha256" }},
-                { "name": "size", "type": "int" }
-              ]
-            }
-        }},
-        { "name": "items", "type": {"type": "array", "items":
-            {
-                "type": "record",
-                "name": "Item",
-                "fields": [
-                    { "name": "type", "type":
-                      { "name": "ItemType", "type": "enum", "symbols": ["FILE", "DIRECTORY"] } },
-                    { "name": "path", "type": "string" },
-                    { "name": "size", "type": ["null", "long"] },
-                    { "name": "chunks", "type": ["null",
-                        { "type": "array", "items": "int" }
-                    ]}
-                ]
-            }
-        }}
-    ]
-}
-"""
-from avro import schema
-archive_schema = schema.parse(ARCHIVE_SCHEMA)

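The Avro schema above can go away because msgpack is schemaless: the archive metadata dict is serialized as-is. A minimal round-trip sketch, assuming a current msgpack-python; the field names mirror Archive.save() and the sample values are made up:

import hashlib
import zlib

import msgpack

archive = {
    'name': 'test',
    'ts': '2010-10-30T12:00:00',
    'chunks': [(hashlib.sha256(b'some chunk data').digest(), 1234)],
    'items': [{'type': 'FILE', 'path': 'etc/passwd',
               'size': 1234, 'chunks': [0]}],
}

data = zlib.compress(msgpack.packb(archive))
restored = msgpack.unpackb(zlib.decompress(data))

# msgpack has no tuple type, so each (id, size) pair comes back as a
# list; hence the archiver below indexes chunk[0] instead of chunk['id'].
assert restored['chunks'][0][0] == archive['chunks'][0][0]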
+ 5 - 15
dedupestore/archiver.py

@@ -4,12 +4,9 @@ import logging
 import zlib
 import argparse
 import sys
-from cStringIO import StringIO
 from datetime import datetime
 
-from avro import io
-
-from dedupestore import archive_schema
+import msgpack
 from chunkifier import chunkify
 from cache import Cache, NS_ARCHIVES, NS_CHUNKS
 from bandstore import BandStore
@@ -45,15 +42,12 @@ class Archive(object):
         data = self.store.get(NS_ARCHIVES, id)
         if hashlib.sha256(data).digest() != id:
             raise Exception('Archive hash did not match')
-        buffer = StringIO(zlib.decompress(data))
-        reader = io.DatumReader(archive_schema)
-        decoder = io.BinaryDecoder(buffer)
-        archive = reader.read(decoder)
+        archive = msgpack.unpackb(zlib.decompress(data))
         self.items = archive['items']
         self.name = archive['name']
         self.chunks = archive['chunks']
         for i, chunk in enumerate(archive['chunks']):
-            self.chunk_idx[i] = chunk['id']
+            self.chunk_idx[i] = chunk[0]
 
     def save(self, name):
         archive = {
@@ -62,11 +56,7 @@ class Archive(object):
             'items': self.items,
             'chunks': self.chunks
         }
-        writer = StringIO()
-        encoder = io.BinaryEncoder(writer)
-        datum_writer = io.DatumWriter(archive_schema)
-        datum_writer.write(archive, encoder)
-        data = zlib.compress(writer.getvalue())
+        data = zlib.compress(msgpack.packb(archive))
         self.id = hashlib.sha256(data).digest()
         self.store.put(NS_ARCHIVES, self.id, data)
         self.store.commit()
@@ -76,7 +66,7 @@ class Archive(object):
             return self.chunk_idx[id]
         except KeyError:
             idx = len(self.chunks)
-            self.chunks.append(dict(id=id, size=size))
+            self.chunks.append((id, size))
             self.chunk_idx[id] = idx
             return idx
 

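The chunk records become plain (id, size) tuples. Pulled out of Archive for illustration (ChunkIndex is a hypothetical name; the logic mirrors add_chunk above):

class ChunkIndex(object):
    def __init__(self):
        self.chunks = []     # positional list of (id, size) tuples
        self.chunk_idx = {}  # chunk id -> position in self.chunks

    def add(self, id, size):
        # A chunk that is already indexed keeps its original position,
        # so duplicate data is referenced rather than stored again.
        try:
            return self.chunk_idx[id]
        except KeyError:
            idx = len(self.chunks)
            self.chunks.append((id, size))
            self.chunk_idx[id] = idx
            return idx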
+ 26 - 16
dedupestore/bandstore.py

@@ -21,9 +21,10 @@ class BandStore(object):
     IDLE = 'Idle'
     OPEN = 'Open'
     ACTIVE = 'Active'
-    BAND_LIMIT = 10 * 1024 * 1024
+    BAND_LIMIT = 1 * 1024 * 1024
 
     def __init__(self, path):
+        self.band_fd = None
         if not os.path.exists(path):
             self.create(path)
         self.open(path)
@@ -42,6 +43,9 @@ class BandStore(object):
         self._begin()
 
     def _begin(self):
+        if self.band_fd:
+            self.band_fd.close()
+            self.band_fd = None
         row = self.cursor.execute('SELECT uuid, tid, nextband, version, '
                                   'bandlimit FROM system').fetchone()
         self.uuid, self.tid, self.nextband, version, self.bandlimit = row
@@ -67,6 +71,7 @@ class BandStore(object):
         cnx.execute('CREATE UNIQUE INDEX objects_pk ON objects(ns, id)')
 
     def close(self):
+        self.rollback()
         self.cnx.close()
         self.lock_fd.close()
         os.unlink(os.path.join(self.path, 'lock'))
@@ -77,12 +82,13 @@ class BandStore(object):
         self.band = None
         for b in self.to_delete:
             objects = self.cursor.execute('SELECT ns, id, offset, size '
-                                          'FROM objects WHERE band=?', (b,)).fetchall()
+                                          'FROM objects WHERE band=? ORDER BY offset',
+                                          (b,)).fetchall()
             for o in objects:
                 band, offset, size = self.store_data(self.retrieve_data(b, *o[2:]))
                 self.cursor.execute('UPDATE objects SET band=?, offset=?, size=? '
                                     'WHERE ns=? AND id=?', (band, offset, size, o[0], o[1]))
-            os.unlink(os.path.join(self.path, 'bands', str(b)))
+            os.unlink(self.band_filename(b))
         self.cursor.execute('UPDATE system SET tid=tid+1, nextband=?',
                             (self.nextband,))
         self.cnx.commit()
@@ -107,25 +113,29 @@ class BandStore(object):
             raise self.DoesNotExist
 
     def band_filename(self, band):
-        return os.path.join(self.path, 'bands', str(band))
+        return os.path.join(self.path, 'bands', str(band / 1000), str(band))
 
     def retrieve_data(self, band, offset, size):
-        with open(self.band_filename(band), 'rb') as fd:
-            fd.seek(offset)
-            return fd.read(size)
+        if self.band != band:
+            self.band = band
+            self.band_fd = open(self.band_filename(band), 'rb')
+        self.band_fd.seek(offset)
+        return self.band_fd.read(size)
 
     def store_data(self, data):
-        if self.band is None:
+        if self.band_fd is None:
             self.band = self.nextband
-            assert not os.path.exists(self.band_filename(self.band))
             self.nextband += 1
-        band = self.band
-        with open(self.band_filename(band), 'ab') as fd:
-            offset = fd.tell()
-            fd.write(data)
-            if offset + len(data) > self.bandlimit:
-                self.band = None
-        return band, offset, len(data)
+            if self.band % 1000 == 0:
+                os.mkdir(os.path.join(self.path, 'bands', str(self.band / 1000)))
+            assert not os.path.exists(self.band_filename(self.band))
+            self.band_fd = open(self.band_filename(self.band), 'ab')
+        offset = self.band_fd.tell()
+        self.band_fd.write(data)
+        if offset + len(data) > self.bandlimit:
+            self.band_fd.close()
+            self.band_fd = None
+        return self.band, offset, len(data)
 
     def put(self, ns, id, data):
         """

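Band files now live in subdirectories of 1000 bands each instead of one flat bands/ directory, and a single band file descriptor is kept open across calls instead of reopening the file per object. A sketch of the layout rule, assuming integer band numbers (band / 1000 in the diff is Python 2 integer division; // is the portable spelling):

import os

def band_filename(path, band):
    # Band 0 maps to bands/0/0, band 1234 to bands/1/1234, and so on.
    return os.path.join(path, 'bands', str(band // 1000), str(band))

def ensure_band_dir(path, band):
    # A new subdirectory is needed exactly when the band number crosses
    # a multiple of 1000, matching the os.mkdir call in store_data().
    if band % 1000 == 0:
        os.mkdir(os.path.join(path, 'bands', str(band // 1000)))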
+ 8 - 16
dedupestore/cache.py

@@ -1,11 +1,7 @@
 import hashlib
 import os
 import zlib
-from avro import io
-from cStringIO import StringIO
-import cPickle
-
-from dedupestore import archive_schema
+import msgpack
 
 NS_ARCHIVES = 'ARCHIVES'
 NS_CHUNKS = 'CHUNKS'
@@ -32,7 +28,7 @@ class Cache(object):
         data = data[32:]
         if hashlib.sha256(data).digest() != id:
             raise Exception('Cache hash did not match')
-        data = cPickle.loads(zlib.decompress(data))
+        data = msgpack.unpackb(zlib.decompress(data))
         if data['uuid'] != self.store.uuid:
             raise Exception('Cache UUID mismatch')
         self.chunkmap = data['chunkmap']
@@ -51,23 +47,19 @@ class Cache(object):
             data = self.store.get(NS_ARCHIVES, id)
             if hashlib.sha256(data).digest() != id:
                 raise Exception('Archive hash did not match')
-
-            buffer = StringIO(zlib.decompress(data))
-            reader = io.DatumReader(archive_schema)
-            decoder = io.BinaryDecoder(buffer)
-            archive = reader.read(decoder)
+            archive = msgpack.unpackb(zlib.decompress(data))
             self.archives[archive['name']] = id
             for item in archive['items']:
                 if item['type'] != 'FILE':
                     continue
                 for idx in item['chunks']:
-                    chunk = archive['chunks'][idx]
-                    id = chunk['id']
+                    id, size = archive['chunks'][idx]
                     if self.seen_chunk(id):
                         self.chunk_incref(id)
                     else:
-                        self.init_chunk(id, chunk['size'])
-            self.save()
+                        self.init_chunk(id, size)
+        self.save()
+
 
     def save(self):
         assert self.store.state == self.store.OPEN
@@ -78,7 +70,7 @@ class Cache(object):
         if not os.path.exists(cachedir):
             os.makedirs(cachedir)
         with open(self.path, 'wb') as fd:
-            data = zlib.compress(cPickle.dumps(data))
+            data = zlib.compress(msgpack.packb(data))
             id = hashlib.sha256(data).digest()
             fd.write(id + data)
 

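The cache swaps cPickle for the same msgpack encoding, which also means opening a cache file no longer unpickles untrusted data. The on-disk shape is unchanged: a 32-byte SHA-256 of the compressed payload, then the payload itself. A sketch with hypothetical helper names (write_cache/read_cache); the logic mirrors Cache.save() and the verification in Cache.open():

import hashlib
import zlib

import msgpack

def write_cache(path, data):
    payload = zlib.compress(msgpack.packb(data))
    with open(path, 'wb') as fd:
        # 32-byte digest header, then the compressed msgpack payload.
        fd.write(hashlib.sha256(payload).digest() + payload)

def read_cache(path):
    with open(path, 'rb') as fd:
        blob = fd.read()
    digest, payload = blob[:32], blob[32:]
    if hashlib.sha256(payload).digest() != digest:
        raise Exception('Cache hash did not match')
    return msgpack.unpackb(zlib.decompress(payload))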
+ 1 - 1
dedupestore/test.py

@@ -36,7 +36,7 @@ class Test(unittest.TestCase):
     def test_corrupted_store(self):
         self.create_src_archive('test')
         self.dedupestore('verify', self.store_path + '::test')
-        fd = open(os.path.join(self.tmpdir, 'store', 'bands', '0'), 'r+')
+        fd = open(os.path.join(self.tmpdir, 'store', 'bands', '0', '0'), 'r+')
         fd.seek(1000)
         fd.write('X')
         fd.close()