Browse Source

Switched to using avro serialization for archives

Jonas Borgström 14 years ago
parent
commit
6eb65d07f9
3 changed files with 91 additions and 30 deletions
  1. 38 1
      dedupestore/__init__.py
  2. 31 14
      dedupestore/archiver.py
  3. 22 15
      dedupestore/cache.py

+ 38 - 1
dedupestore/__init__.py

@@ -1 +1,38 @@
-# This is a python package
+# This is a python package
+
+ARCHIVE_SCHEMA = """
+{
+    "name": "Archive",
+    "type": "record",
+    "fields" : [
+        { "name": "name", "type": "string" },
+        { "name": "ts",   "type": "string" },
+        { "name": "chunks", "type": { "type": "array", "items":
+            { "type": "record",
+              "name": "Chunk",
+              "fields": [
+                { "name": "id", "type": {"type": "fixed", "size": 32, "name": "sha256" }},
+                { "name": "size", "type": "int" }
+              ]
+            }
+        }},
+        { "name": "items", "type": {"type": "array", "items":
+            {
+                "type": "record",
+                "name": "Item",
+                "fields": [
+                    { "name": "type", "type":
+                      { "name": "ItemType", "type": "enum", "symbols": ["FILE", "DIRECTORY"] } },
+                    { "name": "path", "type": "string" },
+                    { "name": "size", "type": ["null", "long"] },
+                    { "name": "chunks", "type": ["null",
+                        { "type": "array", "items": "int" }
+                    ]}
+                ]
+            }
+        }}
+    ]
+}
+"""
+from avro import schema
+archive_schema = schema.parse(ARCHIVE_SCHEMA)

+ 31 - 14
dedupestore/archiver.py

@@ -2,10 +2,14 @@ import os
 import hashlib
 import hashlib
 import logging
 import logging
 import zlib
 import zlib
-import cPickle
 import argparse
 import argparse
 import sys
 import sys
+from cStringIO import StringIO
+from datetime import datetime
 
 
+from avro import io
+
+from dedupestore import archive_schema
 from chunkifier import chunkify
 from chunkifier import chunkify
 from cache import Cache, NS_ARCHIVES, NS_CHUNKS
 from cache import Cache, NS_ARCHIVES, NS_CHUNKS
 from bandstore import BandStore
 from bandstore import BandStore
@@ -41,26 +45,39 @@ class Archive(object):
         data = self.store.get(NS_ARCHIVES, id)
         data = self.store.get(NS_ARCHIVES, id)
         if hashlib.sha256(data).digest() != id:
         if hashlib.sha256(data).digest() != id:
             raise Exception('Archive hash did not match')
             raise Exception('Archive hash did not match')
-        archive = cPickle.loads(zlib.decompress(data))
+        buffer = StringIO(zlib.decompress(data))
+        reader = io.DatumReader(archive_schema)
+        decoder = io.BinaryDecoder(buffer)
+        archive = reader.read(decoder)
         self.items = archive['items']
         self.items = archive['items']
         self.name = archive['name']
         self.name = archive['name']
         self.chunks = archive['chunks']
         self.chunks = archive['chunks']
-        for i, (id, csize, osize) in enumerate(archive['chunks']):
-            self.chunk_idx[i] = id
+        for i, chunk in enumerate(archive['chunks']):
+            self.chunk_idx[i] = chunk['id']
 
 
     def save(self, name):
     def save(self, name):
-        archive = {'name': name, 'items': self.items, 'chunks': self.chunks}
-        data = zlib.compress(cPickle.dumps(archive))
+        archive = {
+            'name': name,
+            'ts': datetime.utcnow().isoformat(),
+            'items': self.items,
+            'chunks': self.chunks
+        }
+        writer = StringIO()
+        encoder = io.BinaryEncoder(writer)
+        datum_writer = io.DatumWriter(archive_schema)
+        datum_writer.write(archive, encoder)
+        data = zlib.compress(writer.getvalue())
+        print 'archive size: %d' % len(data)
         self.id = hashlib.sha256(data).digest()
         self.id = hashlib.sha256(data).digest()
         self.store.put(NS_ARCHIVES, self.id, data)
         self.store.put(NS_ARCHIVES, self.id, data)
         self.store.commit()
         self.store.commit()
 
 
-    def add_chunk(self, id, csize, osize):
+    def add_chunk(self, id, size):
         try:
         try:
             return self.chunk_idx[id]
             return self.chunk_idx[id]
         except KeyError:
         except KeyError:
             idx = len(self.chunks)
             idx = len(self.chunks)
-            self.chunks.append((id, csize, osize))
+            self.chunks.append(dict(id=id, size=size))
             self.chunk_idx[id] = idx
             self.chunk_idx[id] = idx
             return idx
             return idx
 
 
@@ -77,10 +94,10 @@ class Archive(object):
                     chunk_count.setdefault(id, 0)
                     chunk_count.setdefault(id, 0)
                     chunk_count[id] += 1
                     chunk_count[id] += 1
         for id, c in chunk_count.items():
         for id, c in chunk_count.items():
-            count, csize, osize = cache.chunkmap[id]
-            total_csize += csize
+            count, size = cache.chunkmap[id]
+            total_csize += size
             if  c == count:
             if  c == count:
-                total_usize += csize
+                total_usize += size
         return dict(osize=total_osize, csize=total_csize, usize=total_usize)
         return dict(osize=total_osize, csize=total_csize, usize=total_usize)
 
 
     def list(self):
     def list(self):
@@ -93,7 +110,7 @@ class Archive(object):
             assert item['path'][0] not in ('/', '\\', ':')
             assert item['path'][0] not in ('/', '\\', ':')
             path = os.path.join(dest, item['path'])
             path = os.path.join(dest, item['path'])
             logging.info(path)
             logging.info(path)
-            if item['type'] == 'DIR':
+            if item['type'] == 'DIRECTORY':
                 if not os.path.exists(path):
                 if not os.path.exists(path):
                     os.makedirs(path)
                     os.makedirs(path)
             if item['type'] == 'FILE':
             if item['type'] == 'FILE':
@@ -142,7 +159,7 @@ class Archive(object):
         if name in cache.archives:
         if name in cache.archives:
             raise NameError('Archive already exists')
             raise NameError('Archive already exists')
         for path in paths:
         for path in paths:
-            for root, dirs, files in os.walk(path):
+            for root, dirs, files in os.walk(unicode(path)):
                 for d in dirs:
                 for d in dirs:
                     p = os.path.join(root, d)
                     p = os.path.join(root, d)
                     self.items.append(self.process_dir(p, cache))
                     self.items.append(self.process_dir(p, cache))
@@ -158,7 +175,7 @@ class Archive(object):
     def process_dir(self, path, cache):
     def process_dir(self, path, cache):
         path = path.lstrip('/\\:')
         path = path.lstrip('/\\:')
         logging.info(path)
         logging.info(path)
-        return {'type': 'DIR', 'path': path}
+        return {'type': 'DIRECTORY', 'path': path}
 
 
     def process_file(self, path, cache):
     def process_file(self, path, cache):
         try:
         try:

+ 22 - 15
dedupestore/cache.py

@@ -1,8 +1,11 @@
-import cPickle
 import hashlib
 import hashlib
 import os
 import os
-import sys
 import zlib
 import zlib
+from avro import io
+from cStringIO import StringIO
+import cPickle
+
+from dedupestore import archive_schema
 
 
 NS_ARCHIVES = 'ARCHIVES'
 NS_ARCHIVES = 'ARCHIVES'
 NS_CHUNKS = 'CHUNKS'
 NS_CHUNKS = 'CHUNKS'
@@ -48,13 +51,17 @@ class Cache(object):
             data = self.store.get(NS_ARCHIVES, id)
             data = self.store.get(NS_ARCHIVES, id)
             if hashlib.sha256(data).digest() != id:
             if hashlib.sha256(data).digest() != id:
                 raise Exception('Archive hash did not match')
                 raise Exception('Archive hash did not match')
-            archive = cPickle.loads(zlib.decompress(data))
+
+            buffer = StringIO(zlib.decompress(data))
+            reader = io.DatumReader(archive_schema)
+            decoder = io.BinaryDecoder(buffer)
+            archive = reader.read(decoder)
             self.archives[archive['name']] = id
             self.archives[archive['name']] = id
-            for id, csize, osize in archive['chunks']:
+            for id, size in archive['chunks']:
                 if self.seen_chunk(id):
                 if self.seen_chunk(id):
                     self.chunk_incref(id)
                     self.chunk_incref(id)
                 else:
                 else:
-                    self.init_chunk(id, csize, osize)
+                    self.init_chunk(id, size)
 
 
     def save(self):
     def save(self):
         assert self.store.state == self.store.OPEN
         assert self.store.state == self.store.OPEN
@@ -78,27 +85,27 @@ class Cache(object):
         data = hashlib.sha256(data).digest() + data
         data = hashlib.sha256(data).digest() + data
         csize = len(data)
         csize = len(data)
         self.store.put(NS_CHUNKS, id, data)
         self.store.put(NS_CHUNKS, id, data)
-        return self.init_chunk(id, csize, osize)
+        return self.init_chunk(id, csize)
 
 
-    def init_chunk(self, id, csize, osize):
-        self.chunkmap[id] = (1, csize, osize)
-        return id, csize, osize
+    def init_chunk(self, id, size):
+        self.chunkmap[id] = (1, size)
+        return id, size
 
 
     def seen_chunk(self, id):
     def seen_chunk(self, id):
-        count, csize, osize = self.chunkmap.get(id, (0, 0, 0))
+        count, size = self.chunkmap.get(id, (0, 0))
         return count
         return count
 
 
     def chunk_incref(self, id):
     def chunk_incref(self, id):
-        count, csize, osize = self.chunkmap[id]
-        self.chunkmap[id] = (count + 1, csize, osize)
-        return id, csize, osize
+        count, size = self.chunkmap[id]
+        self.chunkmap[id] = (count + 1, size)
+        return id, size
 
 
     def chunk_decref(self, id):
     def chunk_decref(self, id):
-        count, csize, osize = self.chunkmap[id]
+        count, size = self.chunkmap[id]
         if count == 1:
         if count == 1:
             del self.chunkmap[id]
             del self.chunkmap[id]
             self.store.delete(NS_CHUNKS, id)
             self.store.delete(NS_CHUNKS, id)
         else:
         else:
-            self.chunkmap[id] = (count - 1, csize, osize)
+            self.chunkmap[id] = (count - 1, size)