Cache redesign and encryption preparation

Jonas Borgström, 14 years ago
commit f0240fbae6
4 changed files with 98 additions and 74 deletions:

  1. dedupestore/archive.py   +50 -44
  2. dedupestore/archiver.py  +17 -15
  3. dedupestore/cache.py     +10 -13
  4. dedupestore/helpers.py   +21 -2

dedupestore/archive.py  +50 -44

@@ -1,58 +1,59 @@
 from datetime import datetime
 import hashlib
 import logging
-import msgpack
 import os
 import stat
 import sys
 import zlib
 
-from .cache import NS_ARCHIVES, NS_CHUNKS
+from .cache import NS_ARCHIVES, NS_CHUNKS, NS_CINDEX
 from .chunkifier import chunkify
-from .helpers import uid2user, user2uid, gid2group, group2gid
+from .helpers import uid2user, user2uid, gid2group, group2gid, pack, unpack
 
 CHUNK_SIZE = 55001
 
 
 class Archive(object):
 
-    def __init__(self, store, cache, name=None):
+    def __init__(self, store, name=None):
         self.store = store
-        self.cache = cache
         self.items = []
         self.chunks = []
         self.chunk_idx = {}
         self.hard_links = {}
         if name:
-            self.open(name)
-
-    def open(self, name):
-        id = self.cache.archives[name]
-        data = self.store.get(NS_ARCHIVES, id)
-        if hashlib.sha256(data).digest() != id:
-            raise Exception('Archive hash did not match')
-        archive = msgpack.unpackb(zlib.decompress(data))
-        version = archive.get('version')
-        if version != 1:
-            raise Exception('Archive version %r not supported' % version)
+            self.load(hashlib.sha256(name).digest())
+
+    def load(self, id):
+        self.id = id
+        archive = unpack(self.store.get(NS_ARCHIVES, self.id))
+        if archive['version'] != 1:
+            raise Exception('Archive version %r not supported' % archive['version'])
         self.items = archive['items']
         self.name = archive['name']
-        self.chunks = archive['chunks']
-        for i, chunk in enumerate(archive['chunks']):
+        cindex = unpack(self.store.get(NS_CINDEX, self.id))
+        assert cindex['version'] == 1
+        self.chunks = cindex['chunks']
+        for i, chunk in enumerate(self.chunks):
             self.chunk_idx[i] = chunk[0]
 
     def save(self, name):
+        self.id = hashlib.sha256(name).digest()
         archive = {
             'version': 1,
             'name': name,
-            'cmdline': ' '.join(sys.argv),
+            'cmdline': sys.argv,
             'ts': datetime.utcnow().isoformat(),
             'items': self.items,
-            'chunks': self.chunks
         }
-        data = zlib.compress(msgpack.packb(archive))
-        self.id = hashlib.sha256(data).digest()
+        _, data = pack(archive)
         self.store.put(NS_ARCHIVES, self.id, data)
+        cindex = {
+            'version': 1,
+            'chunks': self.chunks,
+        }
+        _, data = pack(cindex)
+        self.store.put(NS_CINDEX, self.id, data)
         self.store.commit()
 
     def add_chunk(self, id, size):
@@ -71,7 +72,7 @@ class Archive(object):
                 osize += item['size']
         for id, size in self.chunks:
             csize += size
-            if self.cache.seen_chunk(id) == 1:
+            if cache.seen_chunk(id) == 1:
                 usize += size
         return osize, csize, usize
 
@@ -115,13 +116,10 @@ class Archive(object):
                 with open(path, 'wb') as fd:
                     for chunk in item['chunks']:
                         id = self.chunk_idx[chunk]
-                        data = self.store.get(NS_CHUNKS, id)
-                        cid = data[:32]
-                        data = data[32:]
-                        if hashlib.sha256(data).digest() != cid:
+                        try:
+                            fd.write(unpack(self.store.get(NS_CHUNKS, id)))
+                        except ValueError:
                             raise Exception('Invalid chunk checksum')
-                        data = zlib.decompress(data)
-                        fd.write(data)
                 self.restore_stat(path, item)
             else:
                 raise Exception('Unknown archive item type %r' % item['type'])
@@ -146,21 +144,20 @@ class Archive(object):
                 item['path'] = item['path'].decode('utf-8')
                 for chunk in item['chunks']:
                     id = self.chunk_idx[chunk]
-                    data = self.store.get(NS_CHUNKS, id)
-                    cid = data[:32]
-                    data = data[32:]
-                    if (hashlib.sha256(data).digest() != cid):
+                    try:
+                        unpack(self.store.get(NS_CHUNKS, id))
+                    except ValueError:
                         logging.error('%s ... ERROR', item['path'])
                         break
-                else:
-                    logging.info('%s ... OK', item['path'])
+                    else:
+                        logging.info('%s ... OK', item['path'])
 
     def delete(self, cache):
-        self.store.delete(NS_ARCHIVES, self.cache.archives[self.name])
+        self.store.delete(NS_ARCHIVES, self.id)
+        self.store.delete(NS_CINDEX, self.id)
         for id, size in self.chunks:
             cache.chunk_decref(id)
         self.store.commit()
-        del cache.archives[self.name]
         cache.save()
 
     def _walk(self, path):
@@ -172,7 +169,11 @@ class Archive(object):
                     yield x
 
     def create(self, name, paths, cache):
-        if name in cache.archives:
+        try:
+            self.store.get(NS_ARCHIVES, hashlib.sha256(name).digest())
+        except self.store.DoesNotExist:
+            pass
+        else:
             raise NameError('Archive already exists')
         for path in paths:
             for path, st in self._walk(unicode(path)):
@@ -181,11 +182,10 @@ class Archive(object):
                 elif stat.S_ISLNK(st.st_mode):
                     self.process_symlink(path, st)
                 elif stat.S_ISREG(st.st_mode):
-                    self.process_file(path, st)
+                    self.process_file(path, st, cache)
                 else:
                     logging.error('Unknown file type: %s', path)
         self.save(name)
-        cache.archives[name] = self.id
         cache.save()
 
     def process_dir(self, path, st):
@@ -210,7 +210,7 @@ class Archive(object):
             'gid': st.st_gid, 'group': gid2group(st.st_gid),
             'ctime': st.st_ctime, 'mtime': st.st_mtime,
         })
-    def process_file(self, path, st):
+    def process_file(self, path, st, cache):
         safe_path = path.lstrip('/\\:')
         if st.st_nlink > 1:
             source = self.hard_links.get((st.st_ino, st.st_dev))
@@ -231,7 +231,7 @@ class Archive(object):
             chunks = []
             size = 0
             for chunk in chunkify(fd, CHUNK_SIZE, 30):
-                chunks.append(self.process_chunk(chunk))
+                chunks.append(self.process_chunk(chunk, cache))
                 size += len(chunk)
         self.items.append({
             'type': 'FILE', 'path': safe_path, 'chunks': chunks, 'size': size,
@@ -241,14 +241,20 @@ class Archive(object):
             'ctime': st.st_ctime, 'mtime': st.st_mtime,
         })
 
-    def process_chunk(self, data):
+    def process_chunk(self, data, cache):
         id = hashlib.sha256(data).digest()
         try:
             return self.chunk_idx[id]
         except KeyError:
             idx = len(self.chunks)
-            size = self.cache.add_chunk(id, data)
+            size = cache.add_chunk(id, data)
             self.chunks.append((id, size))
             self.chunk_idx[id] = idx
             return idx
 
+    @staticmethod
+    def list_archives(store):
+        for id in store.list(NS_ARCHIVES):
+            archive = Archive(store)
+            archive.load(id)
+            yield archive

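The central change in archive.py: an archive's store id is now derived from its name, sha256(name), instead of from a hash of its serialized contents, and the chunk list moves out of the archive metadata into a separate NS_CINDEX record. A minimal sketch of reading the new layout, using only the store interface and helpers shown above (the two function names here are hypothetical):

    import hashlib

    from dedupestore.cache import NS_CINDEX
    from dedupestore.helpers import unpack

    def archive_id(name):
        # Ids are derived from the archive name, so the same name always
        # maps to the same key and create() can probe for duplicates with
        # a single store.get() instead of consulting the local cache.
        return hashlib.sha256(name).digest()

    def load_chunk_index(store, name):
        # The chunk list now lives in its own NS_CINDEX record; the
        # NS_ARCHIVES record carries only name, items, cmdline and ts.
        cindex = unpack(store.get(NS_CINDEX, archive_id(name)))
        assert cindex['version'] == 1
        return cindex['chunks']  # list of (chunk id, size) pairs

Keeping the index in its own record is what lets Cache.init() further down rebuild its chunk refcounts without decompressing any per-file metadata.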
dedupestore/archiver.py  +17 -15

@@ -12,49 +12,51 @@ class Archiver(object):
 
     def open_store(self, location):
         store = BandStore(location.path)
-        cache = Cache(store)
-        return store, cache
+        return store
 
     def exit_code_from_logger(self):
         return 1 if self.level_filter.count.get('ERROR') else 0
 
     def do_create(self, args):
-        store, cache = self.open_store(args.archive)
-        archive = Archive(store, cache)
+        store = self.open_store(args.archive)
+        cache = Cache(store)
+        archive = Archive(store)
         archive.create(args.archive.archive, args.paths, cache)
         return self.exit_code_from_logger()
 
     def do_extract(self, args):
-        store, cache = self.open_store(args.archive)
-        archive = Archive(store, cache, args.archive.archive)
+        store = self.open_store(args.archive)
+        archive = Archive(store, args.archive.archive)
         archive.extract(args.dest)
         return self.exit_code_from_logger()
 
     def do_delete(self, args):
-        store, cache = self.open_store(args.archive)
-        archive = Archive(store, cache, args.archive.archive)
+        store = self.open_store(args.archive)
+        cache = Cache(store)
+        archive = Archive(store, args.archive.archive)
         archive.delete(cache)
         return self.exit_code_from_logger()
 
     def do_list(self, args):
-        store, cache = self.open_store(args.src)
+        store = self.open_store(args.src)
         if args.src.archive:
-            archive = Archive(store, cache, args.src.archive)
+            archive = Archive(store, args.src.archive)
             archive.list()
         else:
-            for archive in sorted(cache.archives):
+            for archive in Archive.list_archives(store):
                 print archive
         return self.exit_code_from_logger()
 
     def do_verify(self, args):
-        store, cache = self.open_store(args.archive)
-        archive = Archive(store, cache, args.archive.archive)
+        store = self.open_store(args.archive)
+        archive = Archive(store, args.archive.archive)
         archive.verify()
         return self.exit_code_from_logger()
 
     def do_info(self, args):
-        store, cache = self.open_store(args.archive)
-        archive = Archive(store, cache, args.archive.archive)
+        store = self.open_store(args.archive)
+        cache = Cache(store)
+        archive = Archive(store, args.archive.archive)
         osize, csize, usize = archive.stats(cache)
         print 'Original size:', pretty_size(osize)
         print 'Compressed size:', pretty_size(csize)

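With the cache no longer built inside open_store(), only the subcommands that count or mutate chunks (create, delete, info) pay for constructing one; do_list enumerates the store directly through Archive.list_archives(). A usage sketch, assuming a BandStore import path and store location (both hypothetical), and printing archive.name to show what the listing is meant to produce:

    # Listing archives no longer touches the local cache at all.
    from dedupestore.archive import Archive
    from dedupestore.bandstore import BandStore  # import path assumed

    store = BandStore('/tmp/demo.store')         # hypothetical location
    for archive in Archive.list_archives(store):
        print archive.name                       # set by Archive.load()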
dedupestore/cache.py  +10 -13

@@ -4,8 +4,11 @@ import msgpack
 import os
 import zlib
 
+from .helpers import pack, unpack
+
 NS_ARCHIVES = 'A'
 NS_CHUNKS = 'C'
+NS_CINDEX = 'I'
 
 
 class Cache(object):
@@ -37,25 +40,19 @@ class Cache(object):
         if data['store'] != self.store.uuid:
             raise Exception('Cache UUID mismatch')
         self.chunkmap = data['chunkmap']
-        self.archives = data['archives']
         self.tid = data['tid']
 
     def init(self):
         """Initializes cache by fetching and reading all archive indicies
         """
-        logging.info('Initialzing cache...')
+        logging.info('Initializing cache...')
         self.chunkmap = {}
-        self.archives = {}
         self.tid = self.store.tid
         if self.store.tid == 0:
             return
-        for id in list(self.store.list(NS_ARCHIVES)):
-            data = self.store.get(NS_ARCHIVES, id)
-            if hashlib.sha256(data).digest() != id:
-                raise Exception('Archive hash did not match')
-            archive = msgpack.unpackb(zlib.decompress(data))
-            self.archives[archive['name']] = id
-            for id, size in archive['chunks']:
+        for id in list(self.store.list(NS_CINDEX)):
+            cindex = unpack(self.store.get(NS_CINDEX, id))
+            for id, size in cindex['chunks']:
                 try:
                     count, size = self.chunkmap[id]
                     self.chunkmap[id] = count + 1, size
@@ -68,7 +65,8 @@ class Cache(object):
         data = {'version': 1,
                 'store': self.store.uuid,
                 'chunkmap': self.chunkmap,
-                'tid': self.store.tid, 'archives': self.archives}
+                'tid': self.store.tid,
+        }
         cachedir = os.path.dirname(self.path)
         if not os.path.exists(cachedir):
             os.makedirs(cachedir)
@@ -80,8 +78,7 @@ class Cache(object):
     def add_chunk(self, id, data):
         if self.seen_chunk(id):
             return self.chunk_incref(id)
-        data = zlib.compress(data)
-        data = hashlib.sha256(data).digest() + data
+        _, data = pack(data)
         csize = len(data)
         self.store.put(NS_CHUNKS, id, data)
         self.chunkmap[id] = (1, csize)

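Cache.init() now rebuilds its state by walking every NS_CINDEX record and counting chunk references; chunkmap maps a chunk id to a (refcount, stored size) pair. A standalone model of that loop, equivalent to the try/except KeyError form in init() above:

    def build_chunkmap(cindexes):
        # cindexes: unpacked NS_CINDEX records, one per archive.
        # Returns {chunk id: (refcount, stored size)}.
        chunkmap = {}
        for cindex in cindexes:
            for id, size in cindex['chunks']:
                count, size = chunkmap.get(id, (0, size))
                chunkmap[id] = (count + 1, size)
        return chunkmap

A chunk whose refcount is exactly 1 is unique to a single archive, which is how Archive.stats() derives its usize figure via seen_chunk().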
dedupestore/helpers.py  +21 -2

@@ -1,8 +1,27 @@
-import logging
 import argparse
-import re
 import grp
+import hashlib
+import logging
+import msgpack
 import pwd
+import re
+import zlib
+
+
+def pack(data):
+    data = zlib.compress(msgpack.packb(data))
+    id = hashlib.sha256(data).digest()
+    tid = 0
+    return id, msgpack.packb((1, tid, id, data))
+
+
+def unpack(data):
+    version, tid, id, data = msgpack.unpackb(data)
+    assert version == 1
+    if hashlib.sha256(data).digest() != id:
+        raise ValueError
+    return msgpack.unpackb(zlib.decompress(data))
+
 
 def memoize(function):
     cache = {}
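pack() gives every stored object a uniform envelope: the payload is msgpack-encoded, zlib-compressed, hashed with SHA-256, then wrapped as msgpack((version, tid, id, payload)). tid is hardwired to 0 for now; together with the version field it is presumably the slot this commit reserves for the announced encryption metadata. A round-trip sketch:

    from dedupestore.helpers import pack, unpack

    id, blob = pack({'version': 1, 'name': 'demo'})
    assert unpack(blob) == {'version': 1, 'name': 'demo'}
    # unpack() re-hashes the payload and raises ValueError if the digest
    # no longer matches the embedded id; Archive.extract() and verify()
    # surface that as 'Invalid chunk checksum' and '... ERROR' respectively.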