瀏覽代碼

Improved stats and delete performance on remote stores

Jonas Borgström 14 年之前
父節點
當前提交
6c77ce53d9
共有 2 個文件被更改,包括 42 次插入37 次删除
  1. 38 33
      darc/archive.py
  2. 4 4
      darc/archiver.py

+ 38 - 33
darc/archive.py

@@ -102,40 +102,39 @@ class Archive(object):
         self.store.commit()
         cache.commit()
 
-    def get_chunks(self):
-        for item in self.get_items():
-            try:
-                for chunk in item['chunks']:
-                    yield chunk
-            except KeyError:
-                pass
-
     def stats(self, cache):
         # This function is a bit evil since it abuses the cache to calculate
         # the stats. The cache transaction must be rolled back afterwards
-        unpacker = msgpack.Unpacker()
-        cache.begin_txn()
-        osize = zsize = usize = 0
-        for id, size, csize in self.metadata['items']:
-            osize += size
-            zsize += csize
-            unique = self.cache.seen_chunk(id) == 1
-            if unique:
-                usize += csize
-            data, items_hash = self.key.decrypt(self.store.get(NS_CHUNK, id))
+        def cb(chunk, error, (id, unique)):
+            assert not error
+            data, items_hash = self.key.decrypt(chunk)
             assert self.key.id_hash(data) == id
             unpacker.feed(data)
             for item in unpacker:
                 try:
                     for id, size, csize in item['chunks']:
-                        osize += size
-                        zsize += csize
-                        if unique and self.cache.seen_chunk(id) == 1:
-                            usize += csize
+                        count, _, _ = self.cache.chunks[id]
+                        stats['osize'] += size
+                        stats['csize'] += csize
+                        if unique and count == 1:
+                            stats['usize'] += csize
+                        self.cache.chunks[id] = count - 1, size, csize
                 except KeyError:
                     pass
+        unpacker = msgpack.Unpacker()
+        cache.begin_txn()
+        stats = {'osize': 0, 'csize': 0, 'usize': 0}
+        for id, size, csize in self.metadata['items']:
+            stats['osize'] += size
+            stats['csize'] += csize
+            unique = self.cache.seen_chunk(id) == 1
+            if unique:
+                stats['usize'] += csize
+            self.store.get(NS_CHUNK, id, callback=cb, callback_data=(id, unique))
+            self.cache.chunk_decref(id)
+        self.store.flush_rpc()
         cache.rollback()
-        return osize, zsize, usize
+        return stats
 
     def extract_item(self, item, dest=None, start_cb=None):
         dest = dest or os.getcwdu()
@@ -237,19 +236,25 @@ class Archive(object):
                 self.store.get(NS_CHUNK, id, callback=verify_chunk, callback_data=(id, i, i==n-1))
 
     def delete(self, cache):
+        def cb(chunk, error, id):
+            assert not error
+            data, items_hash = self.key.decrypt(chunk)
+            assert self.key.id_hash(data) == id
+            unpacker.feed(data)
+            for item in unpacker:
+                try:
+                    for chunk_id, size, csize in item['chunks']:
+                        self.cache.chunk_decref(chunk_id)
+                except KeyError:
+                    pass
+            self.cache.chunk_decref(id)
         unpacker = msgpack.Unpacker()
         for id, size, csize in self.metadata['items']:
             if self.cache.seen_chunk(id) == 1:
-                data, items_hash = self.key.decrypt(self.store.get(NS_CHUNK, id))
-                assert self.key.id_hash(data) == id
-                unpacker.feed(data)
-                for item in unpacker:
-                    try:
-                        for chunk_id, size, csize in item['chunks']:
-                            self.cache.chunk_decref(chunk_id)
-                    except KeyError:
-                        pass
-            self.cache.chunk_decref(id)
+                self.store.get(NS_CHUNK, id, callback=cb, callback_data=id)
+            else:
+                self.cache.chunk_decref(id)
+        self.store.flush_rpc()
         self.store.delete(NS_ARCHIVE_METADATA, self.id)
         self.store.commit()
         cache.commit()

+ 4 - 4
darc/archiver.py

@@ -196,15 +196,15 @@ class Archiver(object):
         key = Key(store)
         cache = Cache(store, key)
         archive = Archive(store, key, args.archive.archive, cache=cache)
-        osize, csize, usize = archive.stats(cache)
+        stats = archive.stats(cache)
         print 'Name:', archive.metadata['name']
         print 'Hostname:', archive.metadata['hostname']
         print 'Username:', archive.metadata['username']
         print 'Time:', archive.metadata['time']
         print 'Command line:', ' '.join(archive.metadata['cmdline'])
-        print 'Original size:', format_file_size(osize)
-        print 'Compressed size:', format_file_size(csize)
-        print 'Unique data:', format_file_size(usize)
+        print 'Original size:', format_file_size(stats['osize'])
+        print 'Compressed size:', format_file_size(stats['csize'])
+        print 'Unique data:', format_file_size(stats['usize'])
         return self.exit_code
 
     def run(self, args=None):