Преглед изворног кода

First on-disk archive format.

Jonas Borgström пре 15 година
родитељ
комит
21069cc618
2 измењених фајлова са 58 додато и 44 уклоњено
  1. 53 39
      dedupstore/archiver.py
  2. 5 5
      dedupstore/repository.py

+ 53 - 39
dedupstore/archiver.py

@@ -2,35 +2,34 @@ import os
 import sys
 import hashlib
 import zlib
+import cPickle
 from repository import Repository
 
 CHUNKSIZE = 256 * 1024
 
-class FileItem(object):
-    
-    def __init__(self):
-        """"""
-
-    def process_file(self, filename, cache):
-        self.filename = filename
-        fd = open(filename, 'rb')
-        self.size = 0
-        self.chunks = []
-        while True:
-            data = fd.read(CHUNKSIZE)
-            if not data:
-                break
-            self.size += len(data)
-            self.chunks.append(cache.add_chunk(zlib.compress(data)))
-        print '%s: %d chunks' % (filename, len(self.chunks))
-
 
 class Cache(object):
     """Client Side cache
     """
-    def __init__(self, repo):
+    def __init__(self, path, repo):
+        """Initialise empty bookkeeping and load existing archives from repo.
+
+        path is only forwarded to open(); repo is the repository object
+        that is queried for the archives already stored in it.
+        """
         self.repo = repo
         self.chunkmap = {}
+        self.archives = []
+        self.open(path)
+
+    def open(self, path):
+        """Scan every archive stored in the repository and register its chunks.
+
+        Each entry of the repository's 'archives' directory is appended to
+        self.archives, decompressed and unpickled; chunk_incref() is then
+        called once for every chunk listed by an item of type 'FILE'.
+        The path argument is not used by this method.
+        """
+        for archive in self.repo.listdir('archives'):
+            self.archives.append(archive)
+            data = self.repo.get_file(os.path.join('archives', archive))
+            # NOTE(review): unpickling can execute arbitrary code; this is
+            # only safe while the repository contents are trusted -- confirm.
+            a = cPickle.loads(zlib.decompress(data))
+            for item in a['items']:
+                if item['type'] == 'FILE':
+                    for c in item['chunks']:
+                        # NOTE(review): looks like leftover debug output.
+                        print 'adding chunk', c.encode('hex')
+                        self.chunk_incref(c)
+
+    def save(self):
+        """Stub: only asserts that the repository is in the OPEN state.
+
+        Nothing is written anywhere by this method yet.
+        """
+        assert self.repo.state == Repository.OPEN
 
     def chunk_filename(self, sha):
         hex = sha.encode('hex')
@@ -57,36 +56,51 @@ class Cache(object):
         self.chunkmap[sha] -= 1
         return self.chunkmap[sha]
 
-
-class Archive(object):
-    """
-    """
-    def __init__(self):
-        self.items = []
-
-    def add_item(self, item):
-        self.items.append(item)
-
-
 class Archiver(object):
+    """Walk a directory tree and store it as a named archive in the repository."""
 
     def __init__(self):
-        self.cache = Cache(Repository('/tmp/repo'))
-        self.archive = Archive()
+        self.repo = Repository('/tmp/repo')
+        self.cache = Cache('/tmp/cache', self.repo)
 
-    def run(self, path):
+    def create_archive(self, archive_name, path):
+        """Archive the tree rooted at path under the name archive_name.
+
+        Every directory and file found by os.walk() becomes one item dict;
+        the resulting archive dict is pickled, zlib-compressed and stored
+        as 'archives/<archive_name>' before the repository is committed.
+
+        Raises Exception if archive_name is already listed in the cache.
+        """
+        if archive_name in self.cache.archives:
+            raise Exception('Archive "%s" already exists' % archive_name)
+        items = []
         for root, dirs, files in os.walk(path):
+            for d in dirs:
+                name = os.path.join(root, d)
+                items.append(self.process_dir(name, self.cache))
             for f in files:
-                filename = os.path.join(root, f)
-                item = FileItem()
-                item.process_file(filename, self.cache)
-                self.archive.add_item(item)
-        self.cache.repo.commit()
+                name = os.path.join(root, f)
+                items.append(self.process_file(name, self.cache))
+        # Record the archive's own name. The loop variable `name` holds the
+        # last walked path and is undefined when the tree is empty.
+        archive = {'name': archive_name, 'items': items}
+        zdata = zlib.compress(cPickle.dumps(archive))
+        self.repo.put_file(os.path.join('archives', archive_name), zdata)
+        print 'Archive file size: %d' % len(zdata)
+        self.repo.commit()
+        self.cache.save()
+
+    def process_dir(self, path, cache):
+        """Return the item dict describing the directory at path.
+
+        The cache argument is accepted but not used.
+        """
+        print 'Directory: %s' % (path)
+        return {'type': 'DIR', 'path': path}
+
+    def process_file(self, path, cache):
+        """Store the file at path chunk by chunk and return its item dict.
+
+        The file is read in CHUNKSIZE pieces; each piece is zlib-compressed
+        and handed to cache.add_chunk(), whose return values are collected
+        in the 'chunks' list. 'size' is the uncompressed byte count.
+        """
+        size = 0
+        chunks = []
+        fd = open(path, 'rb')
+        try:
+            while True:
+                data = fd.read(CHUNKSIZE)
+                if not data:
+                    break
+                size += len(data)
+                chunks.append(cache.add_chunk(zlib.compress(data)))
+        finally:
+            # Release the handle even if reading or chunk storage fails.
+            fd.close()
+        print 'File: %s (%d chunks)' % (path, len(chunks))
+        return {'type': 'FILE', 'path': path, 'size': size, 'chunks': chunks}
 
 
 def main():
+    """Create an archive named sys.argv[1] from the tree at sys.argv[2]."""
     archiver = Archiver()
-    archiver.run(sys.argv[1])
+    archiver.create_archive(sys.argv[1], sys.argv[2])
 
 if __name__ == '__main__':
     main()

+ 5 - 5
dedupstore/repository.py

@@ -9,6 +9,8 @@ import unittest
 
 log = logging.getLogger('')
 
+# FIXME: UUID
+
 class Repository(object):
     """
     """
@@ -40,7 +42,6 @@ class Repository(object):
             raise Exception('%s Does not look like a repository2')
         self.lock_fd = open(os.path.join(path, 'lock'), 'w')
         fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
-        print 'locked'
         self.tid = int(open(os.path.join(path, 'tid'), 'r').read())
         self.recover()
 
@@ -88,19 +89,18 @@ class Repository(object):
             os.unlink(path)
         add_list = [line.strip() for line in
                     open(os.path.join(self.path, 'txn-commit', 'add_index'), 'rb').readlines()]
-        add_path = os.path.join(self.path, 'txn-commit', 'add')
-        for name in os.listdir(add_path):
+        for name in add_list:
             destname = os.path.join(self.path, 'data', name)
             if not os.path.exists(os.path.dirname(destname)):
                 os.makedirs(os.path.dirname(destname))
-            shutil.move(os.path.join(add_path, name), destname)
+            shutil.move(os.path.join(self.path, 'txn-commit', 'add', name), destname)
         tid_fd = open(os.path.join(self.path, 'tid'), 'wb')
         tid_fd.write(str(tid))
         tid_fd.close()
         os.rename(os.path.join(self.path, 'txn-commit'),
                   os.path.join(self.path, 'txn-applied'))
         shutil.rmtree(os.path.join(self.path, 'txn-applied'))
-        self.state = Repository.IDLE
+        self.state = Repository.OPEN
 
     def rollback(self):
         """