浏览代码

Started work on archiver.py

Jonas Borgström 15 年之前
父节点
当前提交
695d5699ae
共有 2 个文件被更改,包括 111 次插入7 次删除
  1. 92 0
      dedupstore/archiver.py
  2. 19 7
      dedupstore/repository.py

+ 92 - 0
dedupstore/archiver.py

@@ -0,0 +1,92 @@
+import os
+import sys
+import hashlib
+import zlib
+from repository import Repository
+
+CHUNKSIZE = 256 * 1024
+
+class FileItem(object):
+    
+    def __init__(self):
+        """"""
+
+    def process_file(self, filename, cache):
+        self.filename = filename
+        fd = open(filename, 'rb')
+        self.size = 0
+        self.chunks = []
+        while True:
+            data = fd.read(CHUNKSIZE)
+            if not data:
+                break
+            self.size += len(data)
+            self.chunks.append(cache.add_chunk(zlib.compress(data)))
+        print '%s: %d chunks' % (filename, len(self.chunks))
+
+
+class Cache(object):
+    """Client Side cache
+    """
+    def __init__(self, repo):
+        self.repo = repo
+        self.chunkmap = {}
+
+    def chunk_filename(self, sha):
+        hex = sha.encode('hex')
+        return 'chunks/%s/%s/%s' % (hex[:2], hex[2:4], hex[4:])
+
+    def add_chunk(self, data):
+        sha = hashlib.sha1(data).digest()
+        if not self.seen_chunk(sha):
+            self.repo.put_file(self.chunk_filename(sha), data)
+        else:
+            print 'seen chunk', sha.encode('hex')
+        self.chunk_incref(sha)
+        return sha
+
+    def seen_chunk(self, sha):
+        return self.chunkmap.get(sha, 0) > 0
+
+    def chunk_incref(self, sha):
+        self.chunkmap.setdefault(sha, 0)
+        self.chunkmap[sha] += 1
+
+    def chunk_decref(self, sha):
+        assert self.chunkmap.get(sha, 0) > 0
+        self.chunkmap[sha] -= 1
+        return self.chunkmap[sha]
+
+
+class Archive(object):
+    """
+    """
+    def __init__(self):
+        self.items = []
+
+    def add_item(self, item):
+        self.items.append(item)
+
+
+class Archiver(object):
+
+    def __init__(self):
+        self.cache = Cache(Repository('/tmp/repo'))
+        self.archive = Archive()
+
+    def run(self, path):
+        for root, dirs, files in os.walk(path):
+            for f in files:
+                filename = os.path.join(root, f)
+                item = FileItem()
+                item.process_file(filename, self.cache)
+                self.archive.add_item(item)
+        self.cache.repo.commit()
+
+
+def main():
+    archiver = Archiver()
+    archiver.run(sys.argv[1])
+
+if __name__ == '__main__':
+    main()

+ 19 - 7
dedupstore/repository.py

@@ -7,8 +7,6 @@ import posixpath
 import shutil
 import unittest
 
-CHUNKSIZE = 256 * 1024
-
 log = logging.getLogger('')
 
 class Repository(object):
@@ -42,6 +40,7 @@ class Repository(object):
             raise Exception('%s Does not look like a repository2')
         self.lock_fd = open(os.path.join(path, 'lock'), 'w')
         fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
+        print 'locked'
         self.tid = int(open(os.path.join(path, 'tid'), 'r').read())
         self.recover()
 
@@ -68,6 +67,9 @@ class Repository(object):
         remove_fd = open(os.path.join(self.path, 'txn-active', 'remove'), 'wb')
         remove_fd.write('\n'.join(self.txn_removed))
         remove_fd.close()
+        add_fd = open(os.path.join(self.path, 'txn-active', 'add_index'), 'wb')
+        add_fd.write('\n'.join(self.txn_added))
+        add_fd.close()
         tid_fd = open(os.path.join(self.path, 'txn-active', 'tid'), 'wb')
         tid_fd.write(str(self.tid + 1))
         tid_fd.close()
@@ -84,10 +86,14 @@ class Repository(object):
         for name in remove_list:
             path = os.path.join(self.path, 'data', name)
             os.unlink(path)
+        add_list = [line.strip() for line in
+                    open(os.path.join(self.path, 'txn-commit', 'add_index'), 'rb').readlines()]
         add_path = os.path.join(self.path, 'txn-commit', 'add')
         for name in os.listdir(add_path):
-            shutil.move(os.path.join(add_path, name),
-                        os.path.join(self.path, 'data'))
+            destname = os.path.join(self.path, 'data', name)
+            if not os.path.exists(os.path.dirname(destname)):
+                os.makedirs(os.path.dirname(destname))
+            shutil.move(os.path.join(add_path, name), destname)
         tid_fd = open(os.path.join(self.path, 'tid'), 'wb')
         tid_fd.write(str(tid))
         tid_fd.close()
@@ -111,6 +117,7 @@ class Repository(object):
             os.mkdir(os.path.join(self.path, 'txn-active'))
             os.mkdir(os.path.join(self.path, 'txn-active', 'add'))
             self.txn_removed = []
+            self.txn_added = []
             self.state = Repository.ACTIVE
             
     def get_file(self, path):
@@ -127,11 +134,16 @@ class Repository(object):
         """
         """
         self.prepare_txn()
-        if path in self.txn_removed:
-            self.txn_removed.remove(path)
         if os.path.exists(os.path.join(self.path, 'txn-active', 'add', path)):
             raise Exception('FileAlreadyExists: %s' % path)
-        fd = open(os.path.join(self.path, 'txn-active', 'add', path), 'wb')
+        if path in self.txn_removed:
+            self.txn_removed.remove(path)
+        if path not in self.txn_added:
+                self.txn_added.append(path)
+        filename = os.path.join(self.path, 'txn-active', 'add', path)
+        if not os.path.exists(os.path.dirname(filename)):
+            os.makedirs(os.path.dirname(filename))  
+        fd = open(filename, 'wb')
         try:
             fd.write(data)
         finally: