浏览代码

Some experimental remote store optimizations

Jonas Borgström 14 年之前
父节点
当前提交
c125ecc2f9
共有 3 个文件被更改,包括 68 次插入22 次删除
  1. 6 5
      darc/archive.py
  2. 58 17
      darc/remote.py
  3. 4 0
      darc/store.py

+ 6 - 5
darc/archive.py

@@ -6,6 +6,7 @@ import os
 import socket
 import stat
 import sys
+from itertools import izip
 from xattr import xattr, XATTR_NOFOLLOW
 
 from . import NS_ARCHIVE_METADATA, NS_ARCHIVE_ITEMS, NS_ARCHIVE_CHUNKS, NS_CHUNK, \
@@ -196,14 +197,14 @@ class Archive(object):
             os.utime(path, (item['atime'], item['mtime']))
 
     def verify_file(self, item):
-        for id in item['chunks']:
-            try:
-                magic, data, hash = self.keychain.decrypt(self.store.get(NS_CHUNK, id))
+        try:
+            for id, chunk in izip(item['chunks'], self.store.get_many(NS_CHUNK, item['chunks'])):
+                magic, data, hash = self.keychain.decrypt(chunk)
                 assert magic == PACKET_CHUNK
                 if self.keychain.id_hash(data) != id:
                     raise IntegrityError('chunk id did not match')
-            except IntegrityError:
-                return False
+        except IntegrityError:
+            return False
         return True
 
     def delete(self, cache):

+ 58 - 17
darc/remote.py

@@ -16,7 +16,7 @@ class ChannelNotifyer(object):
 
     def __init__(self, channel):
         self.channel = channel
-        self.enabled = True
+        self.enabled = 0
 
     def set(self):
         if self.enabled:
@@ -55,7 +55,12 @@ class StoreServer(object):
                         sys.stdout.write(msgpack.packb((1, msgid, e.__class__.__name__, None)))
                     else:
                         if method not in ('put', 'delete'):
-                            sys.stdout.write(msgpack.packb((1, msgid, None, res)))
+                            if hasattr(res, 'next'):
+                                for r in res:
+                                    sys.stdout.write(msgpack.packb((1, msgid, None, r)))
+                                sys.stdout.write(msgpack.packb((2, msgid, None, None)))
+                            else:
+                                sys.stdout.write(msgpack.packb((1, msgid, None, res)))
                     sys.stdout.flush()
             if es:
                 return
@@ -106,18 +111,12 @@ class RemoteStore(object):
         self.channel.in_stderr_buffer.set_event(self.notifier)
         self.channel.exec_command('darc serve')
         self.msgid = 0
-        self.id, self.tid = self._cmd('open', (location.path, create))
+        self.id, self.tid = self.cmd('open', (location.path, create))
 
-    def _cmd(self, *args, **kw):
-        self.notifier.enabled = True
-        try:
-            return self._cmd2(*args, **kw)
-        finally:
-            self.notifier.enabled = False
-
-    def _cmd2(self, cmd, args, defer=False):
+    def cmd(self, cmd, args, defer=False):
         self.msgid += 1
         odata = msgpack.packb((0, self.msgid, cmd, args))
+        self.notifier.enabled += 1
         while True:
             if self.channel.closed:
                 raise Exception('Connection closed')
@@ -126,6 +125,7 @@ class RemoteStore(object):
                 if n > 0:
                     odata = odata[n:]
                 if not odata and defer:
+                    self.notifier.enabled -= 1
                     return
             elif self.channel.recv_stderr_ready():
                 print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE)
@@ -134,21 +134,62 @@ class RemoteStore(object):
                 for type, msgid, error, res in self.unpacker:
                     if error:
                         raise self.RPCError(error)
+                    self.notifier.enabled -= 1
                     return res
             else:
                 with self.channel.lock:
                     self.channel.out_buffer_cv.wait(10)
 
+    def cmd_iter(self, cmd, args):
+        self.msgid += 1
+        odata = msgpack.packb((0, self.msgid, cmd, args))
+        self.notifier.enabled += 1
+        try:
+            while True:
+                if self.channel.closed:
+                    raise Exception('Connection closed')
+                if odata and self.channel.send_ready():
+                    n = self.channel.send(odata)
+                    if n > 0:
+                        odata = odata[n:]
+                if self.channel.recv_stderr_ready():
+                    print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE)
+                elif self.channel.recv_ready():
+                    self.unpacker.feed(self.channel.recv(BUFSIZE))
+                    for type, msgid, error, res in self.unpacker:
+                        if msgid < self.msgid:
+                            continue
+                        if error:
+                            raise self.RPCError(error)
+                        if type == 1:
+                            yield res
+                        else:
+                            return
+                else:
+                    with self.channel.lock:
+                        self.channel.out_buffer_cv.wait(10)
+        finally:
+            self.notifier.enabled -= 1
+
     def commit(self, *args):
-        self._cmd('commit', args)
+        self.cmd('commit', args)
         self.tid += 1
 
     def rollback(self, *args):
-        return self._cmd('rollback', args)
+        return self.cmd('rollback', args)
 
     def get(self, *args):
         try:
-            return self._cmd('get', args)
+            return self.cmd('get', args)
+        except self.RPCError, e:
+            print e.name
+            if e.name == 'DoesNotExist':
+                raise self.DoesNotExist
+            raise
+
+    def get_many(self, *args):
+        try:
+            return self.cmd_iter('get_many', args)
         except self.RPCError, e:
             if e.name == 'DoesNotExist':
                 raise self.DoesNotExist
@@ -156,14 +197,14 @@ class RemoteStore(object):
 
     def put(self, *args):
         try:
-            return self._cmd('put', args, defer=True)
+            return self.cmd('put', args, defer=True)
         except self.RPCError, e:
             if e.name == 'AlreadyExists':
                 raise self.AlreadyExists
 
     def delete(self, *args):
-        return self._cmd('delete', args, defer=True)
+        return self.cmd('delete', args, defer=True)
 
     def list(self, *args):
-        return self._cmd('list', args)
+        return self.cmd('list', args)
 

+ 4 - 0
darc/store.py

@@ -185,6 +185,10 @@ class Store(object):
         except KeyError:
             raise self.DoesNotExist
 
+    def get_many(self, ns, ids):
+        for id in ids:
+            yield self.get(ns, id)
+
     def put(self, ns, id, data):
         if not self.txn_active:
             self.begin_txn()