Browse Source

Switch from Repository to Store.

Jonas Borgström 15 years ago
parent
commit
575efb5e0b
3 changed files with 116 additions and 309 deletions
  1. 76 47
      dedupstore/archiver.py
  2. 0 223
      dedupstore/repository.py
  3. 40 39
      dedupstore/store.py

+ 76 - 47
dedupstore/archiver.py

@@ -3,7 +3,7 @@ import sys
 import hashlib
 import zlib
 import cPickle
-from repository import Repository
+from store import Store
 
 CHUNKSIZE = 256 * 1024
 
@@ -11,21 +11,19 @@ CHUNKSIZE = 256 * 1024
 class Cache(object):
     """Client Side cache
     """
-    def __init__(self, path, repo):
-        self.repo = repo
+    def __init__(self, path, store):
+        self.store = store
         self.path = path
-        self.chunkmap = {}
-        self.archives = []
-        self.tid = -1
+        self.tid = 'unknown'
         self.open()
-        if self.tid != self.repo.tid:
-            print self.tid, self.repo.tid
+        if self.tid != self.store.tid:
+            print self.tid.encode('hex'), self.store.tid.encode('hex')
             self.create()
 
     def open(self):
-        if self.repo.tid == 0:
+        if self.store.tid == '':
             return
-        filename = os.path.join(self.path, '%s.cache' % self.repo.uuid)
+        filename = os.path.join(self.path, '%s.cache' % self.store.uuid)
         if not os.path.exists(filename):
             return
         print 'Reading cache: ', filename, '...'
@@ -35,59 +33,75 @@ class Cache(object):
         self.archives = data['archives']
         print 'done'
 
+    def update_manifest(self):
+        print 'old manifest', self.tid.encode('hex')
+        if self.tid:
+            self.chunk_decref(self.tid)
+        manifest = {'archives': self.archives.values()}
+        hash = self.add_chunk(zlib.compress(cPickle.dumps(manifest)))
+        print 'new manifest', hash.encode('hex')
+        self.store.commit(hash)
+
     def create(self):
+        self.archives = {}
+        self.chunkmap = {}
+        self.tid = self.store.tid
+        if self.store.tid == '':
+            return
         print 'Recreating cache...'
-        for archive in self.repo.listdir('archives'):
-            self.archives.append(archive)
-            data = self.repo.get_file(os.path.join('archives', archive))
-            a = cPickle.loads(zlib.decompress(data))
-            for item in a['items']:
+        self.chunk_incref(self.store.tid)
+        manifest = cPickle.loads(zlib.decompress(self.store.get(self.store.tid)))
+        for hash in manifest['archives']:
+            self.chunk_incref(hash)
+            archive = cPickle.loads(zlib.decompress(self.store.get(hash)))
+            self.archives[archive['name']] = hash
+            for item in archive['items']:
                 if item['type'] == 'FILE':
                     for c in item['chunks']:
                         self.chunk_incref(c)
-        self.tid = self.repo.tid
         print 'done'
 
     def save(self):
-        assert self.repo.state == Repository.OPEN
-        print 'saving',self.tid, self.repo.tid
-        data = {'chunkmap': self.chunkmap, 'tid': self.repo.tid, 'archives': self.archives}
-        filename = os.path.join(self.path, '%s.cache' % self.repo.uuid)
+        assert self.store.state == Store.OPEN
+        print 'saving cache'
+        data = {'chunkmap': self.chunkmap, 'tid': self.store.tid, 'archives': self.archives}
+        filename = os.path.join(self.path, '%s.cache' % self.store.uuid)
         print 'Saving cache as:', filename
         with open(filename, 'wb') as fd:
             fd.write(zlib.compress(cPickle.dumps(data)))
         print 'done'
 
-    def chunk_filename(self, sha):
-        hex = sha.encode('hex')
-        return 'chunks/%s/%s/%s' % (hex[:2], hex[2:4], hex[4:])
-
     def add_chunk(self, data):
-        sha = hashlib.sha1(data).digest()
-        if not self.seen_chunk(sha):
-            self.repo.put_file(self.chunk_filename(sha), data)
+        hash = hashlib.sha1(data).digest()
+        if not self.seen_chunk(hash):
+            self.store.put(data, hash)
         else:
-            print 'seen chunk', sha.encode('hex')
-        self.chunk_incref(sha)
-        return sha
+            print 'seen chunk', hash.encode('hex')
+        self.chunk_incref(hash)
+        return hash
+
+    def seen_chunk(self, hash):
+        return self.chunkmap.get(hash, 0) > 0
 
-    def seen_chunk(self, sha):
-        return self.chunkmap.get(sha, 0) > 0
+    def chunk_incref(self, hash):
+        self.chunkmap.setdefault(hash, 0)
+        self.chunkmap[hash] += 1
 
-    def chunk_incref(self, sha):
-        self.chunkmap.setdefault(sha, 0)
-        self.chunkmap[sha] += 1
+    def chunk_decref(self, hash):
+        count = self.chunkmap.get(hash, 0) - 1
+        assert count >= 0
+        self.chunkmap[hash] = count
+        if not count:
+            print 'deleting chunk: ', hash.encode('hex')
+            self.store.delete(hash)
+        return count
 
-    def chunk_decref(self, sha):
-        assert self.chunkmap.get(sha, 0) > 0
-        self.chunkmap[sha] -= 1
-        return self.chunkmap[sha]
 
 class Archiver(object):
 
     def __init__(self):
-        self.repo = Repository('/tmp/repo')
-        self.cache = Cache('/tmp/cache', self.repo)
+        self.store = Store('/tmp/store')
+        self.cache = Cache('/tmp/cache', self.store)
 
     def create_archive(self, archive_name, path):
         if archive_name in self.cache.archives:
@@ -101,11 +115,23 @@ class Archiver(object):
                 name = os.path.join(root, f)
                 items.append(self.process_file(name, self.cache))
         archive = {'name': name, 'items': items}
-        zdata = zlib.compress(cPickle.dumps(archive))
-        self.repo.put_file(os.path.join('archives', archive_name), zdata)
-        self.cache.archives.append(archive_name)
-        print 'Archive file size: %d' % len(zdata)
-        self.repo.commit()
+        hash = self.cache.add_chunk(zlib.compress(cPickle.dumps(archive)))
+        self.cache.archives[archive_name] = hash
+        self.cache.update_manifest()
+        self.cache.save()
+
+    def delete_archive(self, archive_name):
+        hash = self.cache.archives.get(archive_name)
+        if not hash:
+            raise Exception('Archive "%s" does not exist' % archive_name)
+        archive = cPickle.loads(zlib.decompress(self.store.get(hash)))
+        self.cache.chunk_decref(hash)
+        for item in archive['items']:
+            if item['type'] == 'FILE':
+                for c in item['chunks']:
+                    self.cache.chunk_decref(c)
+        del self.cache.archives[archive_name]
+        self.cache.update_manifest()
         self.cache.save()
 
     def process_dir(self, path, cache):
@@ -128,7 +154,10 @@ class Archiver(object):
 
 def main():
     archiver = Archiver()
-    archiver.create_archive(sys.argv[1], sys.argv[2])
+    if sys.argv[1] == 'delete':
+        archiver.delete_archive(sys.argv[2])
+    else:
+        archiver.create_archive(sys.argv[1], sys.argv[2])
 
 if __name__ == '__main__':
     main()

+ 0 - 223
dedupstore/repository.py

@@ -1,223 +0,0 @@
-#!/usr/bin/env python
-import fcntl
-import tempfile
-import logging
-import os
-import posixpath
-import shutil
-import unittest
-import uuid
-
-log = logging.getLogger('')
-
-
-class Repository(object):
-    """
-    """
-    IDLE = 'Idle'
-    OPEN = 'Open'
-    ACTIVE = 'Active'
-    VERSION = 'DEDUPSTORE REPOSITORY VERSION 1'
-
-    def __init__(self, path):
-        self.tid = -1
-        self.state = Repository.IDLE
-        if not os.path.exists(path):
-            self.create(path)
-        self.open(path)
-    
-    def create(self, path):
-        log.info('Initializing Repository at "%s"' % path)
-        os.mkdir(path)
-        open(os.path.join(path, 'VERSION'), 'wb').write(self.VERSION)
-        open(os.path.join(path, 'uuid'), 'wb').write(str(uuid.uuid4()))
-        open(os.path.join(path, 'tid'), 'wb').write('0')
-        os.mkdir(os.path.join(path, 'data'))
-
-    def open(self, path):
-        self.path = path
-        if not os.path.isdir(path):
-            raise Exception('%s Does not look like a repository')
-        version_path = os.path.join(path, 'version')
-        if not os.path.exists(version_path) or open(version_path, 'rb').read() != self.VERSION:
-            raise Exception('%s Does not look like a repository2')
-        self.uuid = open(os.path.join(path, 'uuid'), 'rb').read()
-        self.lock_fd = open(os.path.join(path, 'lock'), 'w')
-        fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
-        self.tid = int(open(os.path.join(path, 'tid'), 'r').read())
-        self.recover()
-
-    def recover(self):
-        if os.path.exists(os.path.join(self.path, 'txn-active')):
-            self.rollback()
-        if os.path.exists(os.path.join(self.path, 'txn-commit')):
-            self.apply_txn()
-        if os.path.exists(os.path.join(self.path, 'txn-applied')):
-            shutil.rmtree(os.path.join(self.path, 'txn-applied'))
-        self.state = Repository.OPEN
-
-    def close(self):
-        self.recover()
-        self.lock_fd.close()
-        self.state = Repository.IDLE
-
-    def commit(self):
-        """
-        """
-        if self.state == Repository.OPEN:
-            return
-        assert self.state == Repository.ACTIVE
-        remove_fd = open(os.path.join(self.path, 'txn-active', 'remove'), 'wb')
-        remove_fd.write('\n'.join(self.txn_removed))
-        remove_fd.close()
-        add_fd = open(os.path.join(self.path, 'txn-active', 'add_index'), 'wb')
-        add_fd.write('\n'.join(self.txn_added))
-        add_fd.close()
-        tid_fd = open(os.path.join(self.path, 'txn-active', 'tid'), 'wb')
-        tid_fd.write(str(self.tid + 1))
-        tid_fd.close()
-        os.rename(os.path.join(self.path, 'txn-active'),
-                  os.path.join(self.path, 'txn-commit'))
-        self.apply_txn()
-
-    def apply_txn(self):
-        assert os.path.isdir(os.path.join(self.path, 'txn-commit'))
-        tid = int(open(os.path.join(self.path, 'txn-commit', 'tid'), 'rb').read())
-        assert tid >= self.tid
-        remove_list = [line.strip() for line in
-                       open(os.path.join(self.path, 'txn-commit', 'remove'), 'rb').readlines()]
-        for name in remove_list:
-            path = os.path.join(self.path, 'data', name)
-            os.unlink(path)
-        add_list = [line.strip() for line in
-                    open(os.path.join(self.path, 'txn-commit', 'add_index'), 'rb').readlines()]
-        for name in add_list:
-            destname = os.path.join(self.path, 'data', name)
-            if not os.path.exists(os.path.dirname(destname)):
-                os.makedirs(os.path.dirname(destname))
-            shutil.move(os.path.join(self.path, 'txn-commit', 'add', name), destname)
-        tid_fd = open(os.path.join(self.path, 'tid'), 'wb')
-        tid_fd.write(str(tid))
-        tid_fd.close()
-        os.rename(os.path.join(self.path, 'txn-commit'),
-                  os.path.join(self.path, 'txn-applied'))
-        shutil.rmtree(os.path.join(self.path, 'txn-applied'))
-        self.tid = tid
-        self.state = Repository.OPEN
-
-    def rollback(self):
-        """
-        """
-        txn_path = os.path.join(self.path, 'txn-active')
-        if os.path.exists(txn_path):
-            shutil.rmtree(txn_path)
-        self.state = Repository.OPEN
-
-    def prepare_txn(self):
-        if self.state == Repository.ACTIVE:
-            return os.path.join(self.path, 'txn-active')
-        elif self.state == Repository.OPEN:
-            os.mkdir(os.path.join(self.path, 'txn-active'))
-            os.mkdir(os.path.join(self.path, 'txn-active', 'add'))
-            self.txn_removed = []
-            self.txn_added = []
-            self.state = Repository.ACTIVE
-            
-    def get_file(self, path):
-        """
-        """
-        if os.path.exists(os.path.join(self.path, 'txn-active', 'add', path)):
-            return open(os.path.join(self.path, 'txn-active', 'add', path), 'rb').read()
-        elif os.path.exists(os.path.join(self.path, 'data', path)):
-            return open(os.path.join(self.path, 'data', path), 'rb').read()
-        else:
-            raise Exception('FileNotFound: %s' % path)
-
-    def put_file(self, path, data):
-        """
-        """
-        self.prepare_txn()
-        if (path in self.txn_added or
-           (path not in self.txn_removed and os.path.exists(os.path.join(self.path, 'data', path)))):
-            raise Exception('FileAlreadyExists: %s' % path)
-        if path in self.txn_removed:
-            self.txn_removed.remove(path)
-        if path not in self.txn_added:
-            self.txn_added.append(path)
-        filename = os.path.join(self.path, 'txn-active', 'add', path)
-        if not os.path.exists(os.path.dirname(filename)):
-            os.makedirs(os.path.dirname(filename))  
-        fd = open(filename, 'wb')
-        try:
-            fd.write(data)
-        finally:
-            fd.close()
-
-    def delete(self, path):
-        """
-        """
-        self.prepare_txn()
-        if os.path.exists(os.path.join(self.path, 'txn-active', 'add', path)):
-            os.unlink(os.path.join(self.path, 'txn-active', 'add', path))
-        elif os.path.exists(os.path.join(self.path, 'data', path)):
-            self.txn_removed.append(path)
-        else:
-            raise Exception('FileNotFound: %s' % path)
-
-    def listdir(self, path):
-        """
-        """
-        entries = set(os.listdir(os.path.join(self.path, 'data', path)))
-        if self.state == Repository.ACTIVE:
-            txn_entries = set(os.listdir(os.path.join(self.path, 'txn-active', 'add', path)))
-            entries = entries.union(txn_entries)
-            for e in entries:
-                if posixpath.join(path, e) in self.txn_removed:
-                    entries.remove(e)
-        return list(entries)
-
-    def mkdir(self, path):
-        """
-        """
-
-    def rmdir(self, path):
-        """
-        """
-
-class RepositoryTestCase(unittest.TestCase):
-    
-    def setUp(self):
-        self.tmppath = tempfile.mkdtemp()
-        self.repo = Repository(os.path.join(self.tmppath, 'repo'))
-
-    def tearDown(self):
-        shutil.rmtree(self.tmppath)
-    
-    def test1(self):
-        self.assertEqual(self.repo.tid, 0)
-        self.assertEqual(self.repo.state, Repository.OPEN)
-        self.assertEqual(self.repo.listdir(''), [])
-        self.repo.put_file('foo', 'SOMEDATA')
-        self.assertRaises(Exception, lambda: self.repo.put_file('foo', 'SOMETHINGELSE'))
-        self.assertEqual(self.repo.get_file('foo'), 'SOMEDATA')
-        self.assertEqual(self.repo.listdir(''), ['foo'])
-        self.repo.rollback()
-        self.assertEqual(self.repo.listdir(''), [])
-
-    def test2(self):
-        self.repo.put_file('foo', 'SOMEDATA')
-        self.repo.put_file('bar', 'SOMEDATAbar')
-        self.assertEqual(self.repo.listdir(''), ['foo', 'bar'])
-        self.assertEqual(self.repo.get_file('foo'), 'SOMEDATA')
-        self.repo.delete('foo')
-        self.assertRaises(Exception, lambda: self.repo.get_file('foo'))
-        self.assertEqual(self.repo.listdir(''), ['bar'])
-        self.assertEqual(self.repo.state, Repository.ACTIVE)
-        self.assertEqual(os.path.exists(os.path.join(self.tmppath, 'repo', 'data', 'bar')), False)
-        self.repo.commit()
-        self.assertEqual(os.path.exists(os.path.join(self.tmppath, 'repo', 'data', 'bar')), True)
-        self.assertEqual(self.repo.listdir(''), ['bar'])
-        self.assertEqual(self.repo.state, Repository.IDLE)
-
-if __name__ == '__main__':
-    unittest.main()

+ 40 - 39
dedupstore/store.py

@@ -17,7 +17,7 @@ class Store(object):
     VERSION = 'DEDUPSTORE VERSION 1'
 
     def __init__(self, path):
-        self.tid = -1
+        self.tid = ''
         self.state = Store.IDLE
         if not os.path.exists(path):
             self.create(path)
@@ -27,7 +27,7 @@ class Store(object):
         os.mkdir(path)
         open(os.path.join(path, 'version'), 'wb').write(self.VERSION)
         open(os.path.join(path, 'uuid'), 'wb').write(str(uuid.uuid4()))
-        open(os.path.join(path, 'tid'), 'wb').write('0')
+        open(os.path.join(path, 'tid'), 'wb').write('')
         os.mkdir(os.path.join(path, 'data'))
 
     def open(self, path):
@@ -40,7 +40,7 @@ class Store(object):
         self.uuid = open(os.path.join(path, 'uuid'), 'rb').read()
         self.lock_fd = open(os.path.join(path, 'lock'), 'w')
         fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
-        self.tid = int(open(os.path.join(path, 'tid'), 'r').read())
+        self.tid = open(os.path.join(path, 'tid'), 'r').read()
         self.recover()
 
     def recover(self):
@@ -59,7 +59,7 @@ class Store(object):
         self.lock_fd.close()
         self.state = Store.IDLE
 
-    def commit(self):
+    def commit(self, tid):
         """
         """
         if self.state == Store.OPEN:
@@ -70,15 +70,14 @@ class Store(object):
         with open(os.path.join(self.path, 'txn-active', 'write_index'), 'wb') as fd:
             fd.write('\n'.join(self.txn_write))
         with open(os.path.join(self.path, 'txn-active', 'tid'), 'wb') as fd:
-            fd.write(str(self.tid + 1))
+            fd.write(tid)
         os.rename(os.path.join(self.path, 'txn-active'),
                   os.path.join(self.path, 'txn-commit'))
         self.recover()
 
     def apply_txn(self):
         assert os.path.isdir(os.path.join(self.path, 'txn-commit'))
-        tid = int(open(os.path.join(self.path, 'txn-commit', 'tid'), 'rb').read())
-        assert tid == self.tid + 1
+        tid = open(os.path.join(self.path, 'txn-commit', 'tid'), 'rb').read()
         delete_list = [line.strip() for line in
                        open(os.path.join(self.path, 'txn-commit', 'delete_index'), 'rb').readlines()]
         for name in delete_list:
@@ -92,7 +91,7 @@ class Store(object):
                 os.makedirs(os.path.dirname(destname))
             os.rename(os.path.join(self.path, 'txn-commit', 'write', name), destname)
         with open(os.path.join(self.path, 'tid'), 'wb') as fd:
-            fd.write(str(tid))
+            fd.write(tid)
         os.rename(os.path.join(self.path, 'txn-commit'),
                   os.path.join(self.path, 'txn-applied'))
         shutil.rmtree(os.path.join(self.path, 'txn-applied'))
@@ -111,50 +110,50 @@ class Store(object):
             os.mkdir(os.path.join(self.path, 'txn-active', 'write'))
             self.state = Store.ACTIVE
 
-    def _filename(self, sha, base=''):
-        hex = sha.encode('hex')
+    def _filename(self, hash, base=''):
+        hex = hash.encode('hex')
         return os.path.join(base, hex[:2], hex[2:4], hex[4:])
             
-    def get(self, sha):
+    def get(self, hash):
         """
         """
-        path = self._filename(sha)
+        path = self._filename(hash)
         if path in self.txn_write:
             filename = os.path.join(self.path, 'txn-active', 'write', path)
             return open(filename, 'rb').read()
-        filename = self._filename(sha, os.path.join(self.path, 'objects'))
+        filename = self._filename(hash, os.path.join(self.path, 'objects'))
         if os.path.exists(filename):
             return open(filename, 'rb').read()
         else:
-            raise Exception('Object %s does not exist' % sha.encode('hex'))
+            raise Exception('Object %s does not exist' % hash.encode('hex'))
 
-    def put(self, data, sha=None):
+    def put(self, data, hash=None):
         """
         """
-        if not sha:
-            sha = hashlib.sha1(data).digest()
+        if not hash:
+            hash = hashlib.sha1(data).digest()
         self.prepare_txn()
-        path = self._filename(sha)
-        filename = self._filename(sha, os.path.join(self.path, 'objects'))
+        path = self._filename(hash)
+        filename = self._filename(hash, os.path.join(self.path, 'objects'))
         if (path in self.txn_write or
            (path not in self.txn_delete and os.path.exists(filename))):
-            raise Exception('Object already exists: %s' % sha.encode('hex'))
+            raise Exception('Object already exists: %s' % hash.encode('hex'))
         if path in self.txn_delete:
             self.txn_delete.remove(path)
         if path not in self.txn_write:
             self.txn_write.append(path)
-        filename = self._filename(sha, os.path.join(self.path, 'txn-active', 'write'))
+        filename = self._filename(hash, os.path.join(self.path, 'txn-active', 'write'))
         if not os.path.exists(os.path.dirname(filename)):
             os.makedirs(os.path.dirname(filename))
         with open(filename, 'wb') as fd:
             fd.write(data)
-        return sha
+        return hash
 
-    def delete(self, sha):
+    def delete(self, hash):
         """
         """
         self.prepare_txn()
-        path = self._filename(sha)
+        path = self._filename(hash)
         if path in self.txn_write:
             self.txn_write.remove(path)
             os.unlink(filename)
@@ -163,7 +162,7 @@ class Store(object):
             if os.path.exists(filename):
                 self.txn_delete.append(path)
             else:
-                raise Exception('Object does not exist: %s' % sha.encode('hex'))
+                raise Exception('Object does not exist: %s' % hash.encode('hex'))
 
 
 class StoreTestCase(unittest.TestCase):
@@ -176,31 +175,33 @@ class StoreTestCase(unittest.TestCase):
         shutil.rmtree(self.tmppath)
     
     def test1(self):
-        self.assertEqual(self.store.tid, 0)
+        self.assertEqual(self.store.tid, '')
         self.assertEqual(self.store.state, Store.OPEN)
-        SOMEDATA_sha = self.store.put('SOMEDATA')
+        SOMEDATA_hash = self.store.put('SOMEDATA')
         self.assertRaises(Exception, lambda: self.store.put('SOMEDATA'))
-        self.assertEqual(self.store.get(SOMEDATA_sha), 'SOMEDATA')
+        self.assertEqual(self.store.get(SOMEDATA_hash), 'SOMEDATA')
         self.store.rollback()
         self.assertRaises(Exception, lambda: self.store.get('SOMEDATA'))
-        self.assertEqual(self.store.tid, 0)
+        self.assertEqual(self.store.tid, '')
 
     def test2(self):
-        self.assertEqual(self.store.tid, 0)
+        self.assertEqual(self.store.tid, '')
         self.assertEqual(self.store.state, Store.OPEN)
-        SOMEDATA_sha = self.store.put('SOMEDATA')
-        self.assertEqual(self.store.get(SOMEDATA_sha), 'SOMEDATA')
-        self.store.commit()
-        self.assertEqual(self.store.get(SOMEDATA_sha), 'SOMEDATA')
-        self.assertEqual(self.store.tid, 1)
-        self.store.delete(SOMEDATA_sha)
+        SOMEDATA_hash = self.store.put('SOMEDATA')
+        self.assertEqual(self.store.get(SOMEDATA_hash), 'SOMEDATA')
+        self.store.commit(SOMEDATA_hash)
+        self.assertEqual(self.store.tid, SOMEDATA_hash)
+        self.assertEqual(self.store.get(SOMEDATA_hash), 'SOMEDATA')
+        self.store.delete(SOMEDATA_hash)
         self.assertRaises(Exception, lambda: self.store.get('SOMEDATA'))
         self.store.rollback()
-        self.assertEqual(self.store.get(SOMEDATA_sha), 'SOMEDATA')
-        self.store.delete(SOMEDATA_sha)
+        self.assertEqual(self.store.get(SOMEDATA_hash), 'SOMEDATA')
+        self.store.delete(SOMEDATA_hash)
         self.assertRaises(Exception, lambda: self.store.get('SOMEDATA'))
-        self.store.commit()
+        self.store.commit('Something Else')
+        self.assertEqual(self.store.tid, 'Something Else')
         self.assertRaises(Exception, lambda: self.store.get('SOMEDATA'))
 
+
 if __name__ == '__main__':
     unittest.main()