浏览代码

Pipeline improvements and minor code cleanup.

Jonas Borgström 12 年之前
父节点
当前提交
7245575fa2
共有 2 个文件被更改,包括 35 次插入和 32 次删除
  1. 25 19
      darc/archive.py
  2. 10 13
      darc/remote.py

+ 25 - 19
darc/archive.py

@@ -1,7 +1,7 @@
 from __future__ import with_statement
 from datetime import datetime, timedelta
 from getpass import getuser
-from itertools import izip
+from itertools import izip_longest
 import msgpack
 import os
 import socket
@@ -29,6 +29,7 @@ class ItemIter(object):
         self.unpacker = iter(unpacker)
         self.filter = filter
         self.stack = []
+        self.peeks = 0
         self._peek = None
         self._peek_iter = None
         global foo
@@ -39,9 +40,12 @@ class ItemIter(object):
 
     def next(self):
         if self.stack:
-            return self.stack.pop(0)
-        self._peek = None
-        return self.get_next()
+            item = self.stack.pop(0)
+        else:
+            self._peek = None
+            item = self.get_next()
+        self.peeks = max(0, self.peeks - len(item.get('chunks', [])))
+        return item
 
     def get_next(self):
         next = self.unpacker.next()
@@ -52,7 +56,7 @@ class ItemIter(object):
     def peek(self):
         while True:
             while not self._peek or not self._peek_iter:
-                if len(self.stack) > 100:
+                if self.peeks > 100:
                     raise StopIteration
                 self._peek = self.get_next()
                 self.stack.append(self._peek)
@@ -61,7 +65,9 @@ class ItemIter(object):
                 else:
                     self._peek_iter = None
             try:
-                return self._peek_iter.next()
+                item = self._peek_iter.next()
+                self.peeks += 1
+                return item
             except StopIteration:
                 self._peek = None
 
@@ -186,25 +192,26 @@ class Archive(object):
         self.cache.commit()
 
     def calc_stats(self, cache):
+        def add(id):
+            count, size, csize = self.cache.chunks[id]
+            stats.update(size, csize, count == 1)
+            self.cache.chunks[id] = count - 1, size, csize
         # This function is a bit evil since it abuses the cache to calculate
         # the stats. The cache transaction must be rolled back afterwards
         unpacker = msgpack.Unpacker()
         cache.begin_txn()
         stats = Statistics()
-        for id in self.metadata['items']:
-            unpacker.feed(self.key.decrypt(id, self.store.get(id)))
+        add(self.id)
+        for id, chunk in izip_longest(self.metadata['items'], self.store.get_many(self.metadata['items'])):
+            unpacker.feed(self.key.decrypt(id, chunk))
             for item in unpacker:
                 try:
                     for id, size, csize in item['chunks']:
-                        count, _, _ = self.cache.chunks[id]
-                        stats.update(size, csize, count == 1)
-                        stats.nfiles += 1
-                        self.cache.chunks[id] = count - 1, size, csize
+                        add(id)
+                    stats.nfiles += 1
                 except KeyError:
                     pass
-            count, size, csize = self.cache.chunks[id]
-            stats.update(size, csize, count == 1)
-            self.cache.chunks[id] = count - 1, size, csize
+            add(id)
         cache.rollback()
         return stats
 
@@ -237,7 +244,7 @@ class Archive(object):
                     fd = open(path, 'wb')
                     start_cb(item)
                     ids = [id for id, size, csize in item['chunks']]
-                    for id, chunk in izip(ids, self.store.get_many(ids, peek)):
+                    for id, chunk in izip_longest(ids, self.store.get_many(ids, peek)):
                         data = self.key.decrypt(id, chunk)
                         fd.write(data)
                     fd.close()
@@ -296,9 +303,8 @@ class Archive(object):
             start(item)
             ids = [id for id, size, csize in item['chunks']]
             try:
-                for id, chunk in izip(ids, self.store.get_many(ids, peek)):
-                    if chunk:
-                        self.key.decrypt(id, chunk)
+                for id, chunk in izip_longest(ids, self.store.get_many(ids, peek)):
+                    self.key.decrypt(id, chunk)
             except Exception, e:
                 result(item, False)
                 return

+ 10 - 13
darc/remote.py

@@ -75,7 +75,7 @@ class RemoteStore(object):
         self.cache = LRUCache(200)
         self.to_send = ''
         self.extra = {}
-        self.pending_cache = {}
+        self.pending = {}
         self.unpacker = msgpack.Unpacker()
         self.msgid = 0
         self.received_msgid = 0
@@ -106,13 +106,10 @@ class RemoteStore(object):
             self.received_msgid = msgid
             if error:
                 raise self.RPCError(error)
-            if msgid in self.pending_cache:
-                args = self.pending_cache.pop(msgid)
-                self.cache[args] = msgid, res
-            else:
-                print 'unknown response'
-            for args in self.extra.pop(msgid, []):
-                to_yield.append(self.cache[args][1])
+            args = self.pending.pop(msgid)
+            self.cache[args] = msgid, res
+            for args, resp in self.extra.pop(msgid, []):
+                to_yield.append(resp or self.cache[args][1])
         for res in to_yield:
             yield res
 
@@ -127,12 +124,12 @@ class RemoteStore(object):
             if not args in self.cache:
                 self.msgid += 1
                 msgid = self.msgid
-                self.pending_cache[msgid] = args
+                self.pending[msgid] = args
                 self.cache[args] = msgid, None
                 data.append(msgpack.packb((1, msgid, cmd, args)))
             msgid, resp = self.cache[args]
             m = max(m, msgid)
-            self.extra.setdefault(m, []).append(args)
+            self.extra.setdefault(m, []).append((args, resp))
         return ''.join(data)
 
     def gen_cache_requests(self, cmd, peek):
@@ -146,7 +143,7 @@ class RemoteStore(object):
                 continue
             self.msgid += 1
             msgid = self.msgid
-            self.pending_cache[msgid] = args
+            self.pending[msgid] = args
             self.cache[args] = msgid, None
             data.append(msgpack.packb((1, msgid, cmd, args)))
         return ''.join(data)
@@ -156,9 +153,9 @@ class RemoteStore(object):
         left = len(argsv)
         data = self.gen_request(cmd, argsv)
         self.to_send += data
-        for args in self.extra.pop(self.received_msgid, []):
+        for args, resp in self.extra.pop(self.received_msgid, []):
             left -= 1
-            yield self.cache[args][1]
+            yield resp or self.cache[args][1]
         while left:
             r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1)
             if x: