Browse Source

Switched to a namespace base object store implementation.

Jonas Borgström 15 years ago
parent
commit
5cbafdfc20
2 changed files with 113 additions and 84 deletions
  1. 21 38
      dedupstore/archiver.py
  2. 92 46
      dedupstore/store.py

+ 21 - 38
dedupstore/archiver.py

@@ -8,7 +8,8 @@ from optparse import OptionParser
 from store import Store
 
 CHUNKSIZE = 256 * 1024
-
+NS_ARCHIVES = 'ARCHIVES'
+NS_CHUNKS  = 'CHUNKS'
 
 class Cache(object):
     """Client Side cache
@@ -16,14 +17,14 @@ class Cache(object):
     def __init__(self, path, store):
         self.store = store
         self.path = path
-        self.tid = 'unknown'
+        self.tid = -1
         self.open()
         if self.tid != self.store.tid:
-            print self.tid.encode('hex'), self.store.tid.encode('hex')
+            print self.tid, self.store.tid
             self.create()
 
     def open(self):
-        if self.store.tid == '':
+        if self.store.tid == -1:
             return
         filename = os.path.join(self.path, '%s.cache' % self.store.uuid)
         if not os.path.exists(filename):
@@ -32,31 +33,16 @@ class Cache(object):
         data = cPickle.loads(zlib.decompress(open(filename, 'rb').read()))
         self.chunkmap = data['chunkmap']
         self.tid = data['tid']
-        self.archives = data['archives']
         print 'done'
 
-    def update_manifest(self):
-        print 'old manifest', self.tid.encode('hex')
-        if self.tid:
-            self.chunk_decref(self.tid)
-        manifest = {'archives': self.archives.values()}
-        hash = self.add_chunk(zlib.compress(cPickle.dumps(manifest)))
-        print 'new manifest', hash.encode('hex')
-        self.store.commit(hash)
-
     def create(self):
-        self.archives = {}
         self.chunkmap = {}
         self.tid = self.store.tid
-        if self.store.tid == '':
+        if self.store.tid == 0:
             return
         print 'Recreating cache...'
-        self.chunk_incref(self.store.tid)
-        manifest = cPickle.loads(zlib.decompress(self.store.get(self.store.tid)))
-        for hash in manifest['archives']:
-            self.chunk_incref(hash)
-            archive = cPickle.loads(zlib.decompress(self.store.get(hash)))
-            self.archives[archive['name']] = hash
+        for id in self.store.list(NS_ARCHIVES):
+            archive = cPickle.loads(zlib.decompress(self.store.get(NS_ARCHIVES, id)))
             for item in archive['items']:
                 if item['type'] == 'FILE':
                     for c in item['chunks']:
@@ -66,7 +52,7 @@ class Cache(object):
     def save(self):
         assert self.store.state == Store.OPEN
         print 'saving cache'
-        data = {'chunkmap': self.chunkmap, 'tid': self.store.tid, 'archives': self.archives}
+        data = {'chunkmap': self.chunkmap, 'tid': self.store.tid}
         filename = os.path.join(self.path, '%s.cache' % self.store.uuid)
         print 'Saving cache as:', filename
         with open(filename, 'wb') as fd:
@@ -76,7 +62,7 @@ class Cache(object):
     def add_chunk(self, data):
         hash = hashlib.sha1(data).digest()
         if not self.seen_chunk(hash):
-            self.store.put(data, hash)
+            self.store.put(NS_CHUNKS, hash, data)
         else:
             print 'seen chunk', hash.encode('hex')
         self.chunk_incref(hash)
@@ -95,7 +81,7 @@ class Cache(object):
         self.chunkmap[hash] = count
         if not count:
             print 'deleting chunk: ', hash.encode('hex')
-            self.store.delete(hash)
+            self.store.delete(NS_CHUNKS, hash)
         return count
 
 
@@ -106,8 +92,8 @@ class Archiver(object):
         self.cache = Cache('/tmp/cache', self.store)
 
     def create_archive(self, archive_name, paths):
-        if archive_name in self.cache.archives:
-            raise Exception('Archive "%s" already exists' % archive_name)
+#        if archive_name in self.cache.archives:
+#            raise Exception('Archive "%s" already exists' % archive_name)
         items = []
         for path in paths:
             for root, dirs, files in os.walk(path):
@@ -117,24 +103,21 @@ class Archiver(object):
                 for f in files:
                     name = os.path.join(root, f)
                     items.append(self.process_file(name, self.cache))
-        archive = {'name': name, 'items': items}
-        hash = self.cache.add_chunk(zlib.compress(cPickle.dumps(archive)))
-        self.cache.archives[archive_name] = hash
-        self.cache.update_manifest()
+        archive = {'name': archive_name, 'items': items}
+        hash = self.store.put(NS_ARCHIVES, archive_name, zlib.compress(cPickle.dumps(archive)))
+        self.store.commit()
         self.cache.save()
 
     def delete_archive(self, archive_name):
-        hash = self.cache.archives.get(archive_name)
-        if not hash:
-            raise Exception('Archive "%s" does not exist' % archive_name)
-        archive = cPickle.loads(zlib.decompress(self.store.get(hash)))
-        self.cache.chunk_decref(hash)
+        archive = cPickle.loads(zlib.decompress(self.store.get(NS_ARCHIVES, archive_name)))
+        self.store.delete(NS_ARCHIVES, archive_name)
+#        if not hash:
+#            raise Exception('Archive "%s" does not exist' % archive_name)
         for item in archive['items']:
             if item['type'] == 'FILE':
                 for c in item['chunks']:
                     self.cache.chunk_decref(c)
-        del self.cache.archives[archive_name]
-        self.cache.update_manifest()
+        self.store.commit()
         self.cache.save()
 
     def process_dir(self, path, cache):

+ 92 - 46
dedupstore/store.py

@@ -17,7 +17,7 @@ class Store(object):
     VERSION = 'DEDUPSTORE VERSION 1'
 
     def __init__(self, path):
-        self.tid = ''
+        self.tid = '-1'
         self.state = Store.IDLE
         if not os.path.exists(path):
             self.create(path)
@@ -27,7 +27,7 @@ class Store(object):
         os.mkdir(path)
         open(os.path.join(path, 'version'), 'wb').write(self.VERSION)
         open(os.path.join(path, 'uuid'), 'wb').write(str(uuid.uuid4()))
-        open(os.path.join(path, 'tid'), 'wb').write('')
+        open(os.path.join(path, 'tid'), 'wb').write('0')
         os.mkdir(os.path.join(path, 'data'))
 
     def open(self, path):
@@ -40,7 +40,7 @@ class Store(object):
         self.uuid = open(os.path.join(path, 'uuid'), 'rb').read()
         self.lock_fd = open(os.path.join(path, 'lock'), 'w')
         fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
-        self.tid = open(os.path.join(path, 'tid'), 'r').read()
+        self.tid = int(open(os.path.join(path, 'tid'), 'r').read())
         self.recover()
 
     def recover(self):
@@ -59,7 +59,7 @@ class Store(object):
         self.lock_fd.close()
         self.state = Store.IDLE
 
-    def commit(self, tid):
+    def commit(self):
         """
         """
         if self.state == Store.OPEN:
@@ -70,28 +70,29 @@ class Store(object):
         with open(os.path.join(self.path, 'txn-active', 'write_index'), 'wb') as fd:
             fd.write('\n'.join(self.txn_write))
         with open(os.path.join(self.path, 'txn-active', 'tid'), 'wb') as fd:
-            fd.write(tid)
+            fd.write(str(self.tid + 1))
         os.rename(os.path.join(self.path, 'txn-active'),
                   os.path.join(self.path, 'txn-commit'))
         self.recover()
 
     def apply_txn(self):
         assert os.path.isdir(os.path.join(self.path, 'txn-commit'))
-        tid = open(os.path.join(self.path, 'txn-commit', 'tid'), 'rb').read()
+        tid = int(open(os.path.join(self.path, 'txn-commit', 'tid'), 'rb').read())
+        assert tid == self.tid + 1
         delete_list = [line.strip() for line in
                        open(os.path.join(self.path, 'txn-commit', 'delete_index'), 'rb').readlines()]
         for name in delete_list:
-            path = os.path.join(self.path, 'objects', name)
+            path = os.path.join(self.path, 'data', name)
             os.unlink(path)
         write_list = [line.strip() for line in
                       open(os.path.join(self.path, 'txn-commit', 'write_index'), 'rb').readlines()]
         for name in write_list:
-            destname = os.path.join(self.path, 'objects', name)
+            destname = os.path.join(self.path, 'data', name)
             if not os.path.exists(os.path.dirname(destname)):
                 os.makedirs(os.path.dirname(destname))
             os.rename(os.path.join(self.path, 'txn-commit', 'write', name), destname)
         with open(os.path.join(self.path, 'tid'), 'wb') as fd:
-            fd.write(tid)
+            fd.write(str(tid))
         os.rename(os.path.join(self.path, 'txn-commit'),
                   os.path.join(self.path, 'txn-applied'))
         shutil.rmtree(os.path.join(self.path, 'txn-applied'))
@@ -110,60 +111,90 @@ class Store(object):
             os.mkdir(os.path.join(self.path, 'txn-active', 'write'))
             self.state = Store.ACTIVE
 
-    def _filename(self, hash, base=''):
-        hex = hash.encode('hex')
-        return os.path.join(base, hex[:2], hex[2:4], hex[4:])
+    def _filename(self, ns, id, base=''):
+        ns = ns.encode('hex')
+        id = id.encode('hex')
+        return os.path.join(base, ns, id[:2], id[2:4], id[4:])
             
-    def get(self, hash):
+    def get(self, ns, id):
         """
         """
-        path = self._filename(hash)
+        path = self._filename(ns, id)
         if path in self.txn_write:
             filename = os.path.join(self.path, 'txn-active', 'write', path)
             return open(filename, 'rb').read()
-        filename = self._filename(hash, os.path.join(self.path, 'objects'))
+        if path in self.txn_delete:
+            raise Exception('Object %s does not exist' % hash.encode('hex'))
+        filename = self._filename(ns, id, os.path.join(self.path, 'data'))
         if os.path.exists(filename):
             return open(filename, 'rb').read()
         else:
             raise Exception('Object %s does not exist' % hash.encode('hex'))
 
-    def put(self, data, hash=None):
+    def put(self, ns, id, data):
         """
         """
-        if not hash:
-            hash = hashlib.sha1(data).digest()
         self.prepare_txn()
-        path = self._filename(hash)
-        filename = self._filename(hash, os.path.join(self.path, 'objects'))
+        path = self._filename(ns, id)
+        filename = self._filename(ns, id, os.path.join(self.path, 'data'))
         if (path in self.txn_write or
            (path not in self.txn_delete and os.path.exists(filename))):
-            raise Exception('Object already exists: %s' % hash.encode('hex'))
+            raise Exception('Object already exists: %s:%s' % (ns.encode('hex'), id.encode('hex')))
         if path in self.txn_delete:
             self.txn_delete.remove(path)
         if path not in self.txn_write:
             self.txn_write.append(path)
-        filename = self._filename(hash, os.path.join(self.path, 'txn-active', 'write'))
+        filename = self._filename(ns, id, os.path.join(self.path, 'txn-active', 'write'))
         if not os.path.exists(os.path.dirname(filename)):
             os.makedirs(os.path.dirname(filename))
         with open(filename, 'wb') as fd:
             fd.write(data)
-        return hash
 
-    def delete(self, hash):
+    def delete(self, ns, id):
         """
         """
         self.prepare_txn()
-        path = self._filename(hash)
+        path = self._filename(ns, id)
         if path in self.txn_write:
+            filename = self._filename(ns, id, os.path.join(self.path, 'txn-active', 'write'))
             self.txn_write.remove(path)
             os.unlink(filename)
         else:
-            filename = os.path.join(self.path, 'objects', path)
+            filename = os.path.join(self.path, 'data', path)
             if os.path.exists(filename):
                 self.txn_delete.append(path)
             else:
                 raise Exception('Object does not exist: %s' % hash.encode('hex'))
 
+    def list(self, ns, prefix='', marker=None, max_keys=1000000):
+        for x in self.foo(os.path.join(self.path, 'data', ns.encode('hex')), 
+                          prefix, marker, '', max_keys):
+            yield x
+        
+
+    def foo(self, path, prefix, marker, base, max_keys):
+        n = 0
+        for name in sorted(os.listdir(path)):
+            if n >= max_keys:
+                return
+            dirs = []
+            names = []
+            id = name.decode('hex')
+            if os.path.isdir(os.path.join(path, name)):
+                if prefix and not id.startswith(prefix[:len(id)]):
+                    continue
+                for x in self.foo(os.path.join(path, name),
+                                  prefix[len(id):], marker, 
+                                  base + id, max_keys - n):
+                    yield x
+                    n += 1
+            else:
+                if prefix and not id.startswith(prefix):
+                    continue
+                if not marker or base + id >= marker:
+                    yield base + id
+                    n += 1
+
 
 class StoreTestCase(unittest.TestCase):
     
@@ -175,32 +206,47 @@ class StoreTestCase(unittest.TestCase):
         shutil.rmtree(self.tmppath)
     
     def test1(self):
-        self.assertEqual(self.store.tid, '')
+        self.assertEqual(self.store.tid, 0)
         self.assertEqual(self.store.state, Store.OPEN)
-        SOMEDATA_hash = self.store.put('SOMEDATA')
-        self.assertRaises(Exception, lambda: self.store.put('SOMEDATA'))
-        self.assertEqual(self.store.get(SOMEDATA_hash), 'SOMEDATA')
+        self.store.put('SOMENS', 'SOMEID', 'SOMEDATA')
+        self.assertRaises(Exception, lambda: self.store.put('SOMENS', 'SOMEID', 'SOMEDATA'))
+        self.assertEqual(self.store.get('SOMENS', 'SOMEID'), 'SOMEDATA')
         self.store.rollback()
-        self.assertRaises(Exception, lambda: self.store.get('SOMEDATA'))
-        self.assertEqual(self.store.tid, '')
+        self.assertRaises(Exception, lambda: self.store.get('SOMENS', 'SOMEID'))
+        self.assertEqual(self.store.tid, 0)
 
     def test2(self):
-        self.assertEqual(self.store.tid, '')
+        self.assertEqual(self.store.tid, 0)
         self.assertEqual(self.store.state, Store.OPEN)
-        SOMEDATA_hash = self.store.put('SOMEDATA')
-        self.assertEqual(self.store.get(SOMEDATA_hash), 'SOMEDATA')
-        self.store.commit(SOMEDATA_hash)
-        self.assertEqual(self.store.tid, SOMEDATA_hash)
-        self.assertEqual(self.store.get(SOMEDATA_hash), 'SOMEDATA')
-        self.store.delete(SOMEDATA_hash)
-        self.assertRaises(Exception, lambda: self.store.get('SOMEDATA'))
+        self.store.put('SOMENS', 'SOMEID', 'SOMEDATA')
+        self.assertEqual(self.store.get('SOMENS', 'SOMEID'), 'SOMEDATA')
+        self.store.commit()
+        self.assertEqual(self.store.tid, 1)
+        self.assertEqual(self.store.get('SOMENS', 'SOMEID'), 'SOMEDATA')
+        self.store.delete('SOMENS', 'SOMEID')
+        self.assertRaises(Exception, lambda: self.store.get('SOMENS', 'SOMEID'))
         self.store.rollback()
-        self.assertEqual(self.store.get(SOMEDATA_hash), 'SOMEDATA')
-        self.store.delete(SOMEDATA_hash)
-        self.assertRaises(Exception, lambda: self.store.get('SOMEDATA'))
-        self.store.commit('Something Else')
-        self.assertEqual(self.store.tid, 'Something Else')
-        self.assertRaises(Exception, lambda: self.store.get('SOMEDATA'))
+        self.assertEqual(self.store.get('SOMENS', 'SOMEID'), 'SOMEDATA')
+        self.store.delete('SOMENS', 'SOMEID')
+        self.assertRaises(Exception, lambda: self.store.get('SOMENS', 'SOMEID'))
+        self.store.commit()
+        self.assertEqual(self.store.tid, 2)
+        self.assertRaises(Exception, lambda: self.store.get('SOMENS', 'SOMEID'))
+
+    def test_list(self):
+        self.store.put('SOMENS', 'SOMEID12', 'SOMEDATA')
+        self.store.put('SOMENS', 'SOMEID', 'SOMEDATA')
+        self.store.put('SOMENS', 'SOMEID1', 'SOMEDATA')
+        self.store.put('SOMENS', 'SOMEID123', 'SOMEDATA')
+        self.store.commit()
+        self.assertEqual(list(self.store.list('SOMENS', max_keys=3)), 
+            ['SOMEID', 'SOMEID1', 'SOMEID12'])
+        self.assertEqual(list(self.store.list('SOMENS', marker='SOMEID12')), 
+            ['SOMEID12', 'SOMEID123'])
+        self.assertEqual(list(self.store.list('SOMENS', prefix='SOMEID1', max_keys=2)), 
+            ['SOMEID1', 'SOMEID12'])
+        self.assertEqual(list(self.store.list('SOMENS', prefix='SOMEID1', marker='SOMEID12')), 
+            ['SOMEID12', 'SOMEID123'])
 
 
 if __name__ == '__main__':