Browse Source

Deduplicate the items stream the same way as ordinary files

Jonas Borgström 14 years ago
parent
commit
748401e21e
2 changed files with 26 additions and 34 deletions
  1. 25 33
      darc/archive.py
  2. 1 1
      darc/store.py

+ 25 - 33
darc/archive.py

@@ -6,7 +6,7 @@ import os
 import socket
 import socket
 import stat
 import stat
 import sys
 import sys
-from zlib import crc32
+from cStringIO import StringIO
 from xattr import xattr, XATTR_NOFOLLOW
 from xattr import xattr, XATTR_NOFOLLOW
 
 
 from . import NS_ARCHIVE_METADATA, NS_CHUNK
 from . import NS_ARCHIVE_METADATA, NS_CHUNK
@@ -29,8 +29,7 @@ class Archive(object):
         self.key = key
         self.key = key
         self.store = store
         self.store = store
         self.cache = cache
         self.cache = cache
-        self.items = ''
-        self.items_refs = []
+        self.items = StringIO()
         self.items_ids = []
         self.items_ids = []
         self.hard_links = {}
         self.hard_links = {}
         if name:
         if name:
@@ -55,44 +54,41 @@ class Archive(object):
         unpacker = msgpack.Unpacker()
         unpacker = msgpack.Unpacker()
         counter = Counter(0)
         counter = Counter(0)
         def cb(chunk, error, id):
         def cb(chunk, error, id):
+            assert not error
             counter.dec()
             counter.dec()
-            print len(chunk)
             data, items_hash = self.key.decrypt(chunk)
             data, items_hash = self.key.decrypt(chunk)
             assert self.key.id_hash(data) == id
             assert self.key.id_hash(data) == id
             unpacker.feed(data)
             unpacker.feed(data)
             for item in unpacker:
             for item in unpacker:
                 callback(item)
                 callback(item)
         for id, size, csize in self.metadata['items']:
         for id, size, csize in self.metadata['items']:
-            # Limit the number of concurrent items requests to 3
+            # Limit the number of concurrent items requests to 10
             self.store.flush_rpc(counter, 10)
             self.store.flush_rpc(counter, 10)
             counter.inc()
             counter.inc()
             self.store.get(NS_CHUNK, id, callback=cb, callback_data=id)
             self.store.get(NS_CHUNK, id, callback=cb, callback_data=id)
 
 
-    def add_item(self, item, refs=None):
-        data = msgpack.packb(item)
-        if crc32(item['path'].encode('utf-8')) % 1000 == 0:
+    def add_item(self, item):
+        self.items.write(msgpack.packb(item))
+        if self.items.tell() > 1024 * 1024:
             self.flush_items()
             self.flush_items()
-        if refs:
-            self.items_refs += refs
-        self.items += data
 
 
-    def flush_items(self):
-        if not self.items:
+    def flush_items(self, flush=False):
+        if self.items.tell() == 0:
             return
             return
-        print 'flush', len(self.items)
-        id = self.key.id_hash(self.items)
-        if self.cache.seen_chunk(id):
-            self.items_ids.append(self.cache.chunk_incref(id))
-            for id in self.items_refs:
-                self.cache.chunk_decref(id)
+        self.items.seek(0)
+        chunks = list(str(s) for s in chunkify(self.items, CHUNK_SIZE, WINDOW_SIZE, self.key.chunk_seed))
+        self.items.seek(0)
+        self.items.truncate()
+        for chunk in chunks[:-1]:
+            self.items_ids.append(self.cache.add_chunk(self.key.id_hash(chunk), chunk))
+        if flush or len(chunks) == 1:
+            self.items_ids.append(self.cache.add_chunk(self.key.id_hash(chunks[-1]), chunks[-1]))
         else:
         else:
-            self.items_ids.append(self.cache.add_chunk(id, self.items))
-        self.items = ''
-        self.items_refs = []
+            self.items.write(chunks[-1])
 
 
     def save(self, name, cache):
     def save(self, name, cache):
         self.id = self.key.archive_hash(name)
         self.id = self.key.archive_hash(name)
-        self.flush_items()
+        self.flush_items(flush=True)
         metadata = {
         metadata = {
             'version': 1,
             'version': 1,
             'name': name,
             'name': name,
@@ -110,7 +106,7 @@ class Archive(object):
     def stats(self, cache):
     def stats(self, cache):
         # This function is a bit evil since it abuses the cache to calculate
         # This function is a bit evil since it abuses the cache to calculate
         # the stats. The cache transaction must be rolled back afterwards
         # the stats. The cache transaction must be rolled back afterwards
-        def cb(chunk, error, (id, unique)):
+        def cb(chunk, error, id):
             assert not error
             assert not error
             data, items_hash = self.key.decrypt(chunk)
             data, items_hash = self.key.decrypt(chunk)
             assert self.key.id_hash(data) == id
             assert self.key.id_hash(data) == id
@@ -121,7 +117,7 @@ class Archive(object):
                         count, _, _ = self.cache.chunks[id]
                         count, _, _ = self.cache.chunks[id]
                         stats['osize'] += size
                         stats['osize'] += size
                         stats['csize'] += csize
                         stats['csize'] += csize
-                        if unique and count == 1:
+                        if count == 1:
                             stats['usize'] += csize
                             stats['usize'] += csize
                         self.cache.chunks[id] = count - 1, size, csize
                         self.cache.chunks[id] = count - 1, size, csize
                 except KeyError:
                 except KeyError:
@@ -132,10 +128,9 @@ class Archive(object):
         for id, size, csize in self.metadata['items']:
         for id, size, csize in self.metadata['items']:
             stats['osize'] += size
             stats['osize'] += size
             stats['csize'] += csize
             stats['csize'] += csize
-            unique = self.cache.seen_chunk(id) == 1
-            if unique:
+            if self.cache.seen_chunk(id) == 1:
                 stats['usize'] += csize
                 stats['usize'] += csize
-            self.store.get(NS_CHUNK, id, callback=cb, callback_data=(id, unique))
+            self.store.get(NS_CHUNK, id, callback=cb, callback_data=id)
             self.cache.chunk_decref(id)
             self.cache.chunk_decref(id)
         self.store.flush_rpc()
         self.store.flush_rpc()
         cache.rollback()
         cache.rollback()
@@ -256,10 +251,7 @@ class Archive(object):
             self.cache.chunk_decref(id)
             self.cache.chunk_decref(id)
         unpacker = msgpack.Unpacker()
         unpacker = msgpack.Unpacker()
         for id, size, csize in self.metadata['items']:
         for id, size, csize in self.metadata['items']:
-            if self.cache.seen_chunk(id) == 1:
-                self.store.get(NS_CHUNK, id, callback=callback, callback_data=id)
-            else:
-                self.cache.chunk_decref(id)
+            self.store.get(NS_CHUNK, id, callback=callback, callback_data=id)
         self.store.flush_rpc()
         self.store.flush_rpc()
         self.store.delete(NS_ARCHIVE_METADATA, self.id)
         self.store.delete(NS_ARCHIVE_METADATA, self.id)
         self.store.commit()
         self.store.commit()
@@ -335,7 +327,7 @@ class Archive(object):
             cache.memorize_file(path_hash, st, ids)
             cache.memorize_file(path_hash, st, ids)
         item = {'path': safe_path, 'chunks': chunks}
         item = {'path': safe_path, 'chunks': chunks}
         item.update(self.stat_attrs(st, path))
         item.update(self.stat_attrs(st, path))
-        self.add_item(item, ids)
+        self.add_item(item)
 
 
     @staticmethod
     @staticmethod
     def list_archives(store, key):
     def list_archives(store, key):

+ 1 - 1
darc/store.py

@@ -211,7 +211,7 @@ class Store(object):
     def list(self, ns, marker=None, limit=1000000):
     def list(self, ns, marker=None, limit=1000000):
         return [key for key, value in self.get_index(ns).iteritems(marker=marker, limit=limit)]
         return [key for key, value in self.get_index(ns).iteritems(marker=marker, limit=limit)]
 
 
-    def flush_rpc(self):
+    def flush_rpc(self, *args):
         pass
         pass