浏览代码

Improved remote upstream performance

Jonas Borgström 14 年之前
父节点
当前提交
f7ebd19640
共有 2 个文件被更改,包括 57 次插入27 次删除
  1. 56 25
      darc/remote.py
  2. 1 2
      darc/store.py

+ 56 - 25
darc/remote.py

@@ -12,6 +12,21 @@ from .store import Store
 BUFSIZE = 1024 * 1024
 BUFSIZE = 1024 * 1024
 
 
 
 
+class ChannelNotifyer(object):
+
+    def __init__(self, channel):
+        self.channel = channel
+        self.enabled = True
+
+    def set(self):
+        if self.enabled:
+            with self.channel.lock:
+                self.channel.out_buffer_cv.notifyAll()
+
+    def clear(self):
+        pass
+
+
 class StoreServer(object):
 class StoreServer(object):
 
 
     def __init__(self):
     def __init__(self):
@@ -39,7 +54,8 @@ class StoreServer(object):
                     except Exception, e:
                     except Exception, e:
                         sys.stdout.write(msgpack.packb((1, msgid, e.__class__.__name__, None)))
                         sys.stdout.write(msgpack.packb((1, msgid, e.__class__.__name__, None)))
                     else:
                     else:
-                        sys.stdout.write(msgpack.packb((1, msgid, None, res)))
+                        if method not in ('put', 'delete'):
+                            sys.stdout.write(msgpack.packb((1, msgid, None, res)))
                     sys.stdout.flush()
                     sys.stdout.flush()
             if es:
             if es:
                 return
                 return
@@ -75,38 +91,53 @@ class RemoteStore(object):
                 self.client.connect(**params)
                 self.client.connect(**params)
                 break
                 break
             except (paramiko.PasswordRequiredException,
             except (paramiko.PasswordRequiredException,
-		    paramiko.AuthenticationException,
-		    paramiko.SSHException):
-		if not 'password' in params:
-			params['password'] = getpass.getpass('Password for %(username)s@%(hostname)s:' % params)
-		else:
-		    raise
+                    paramiko.AuthenticationException,
+                    paramiko.SSHException):
+                if not 'password' in params:
+                    params['password'] = getpass.getpass('Password for %(username)s@%(hostname)s:' % params)
+                else:
+                    raise
 
 
         self.unpacker = msgpack.Unpacker()
         self.unpacker = msgpack.Unpacker()
         self.transport = self.client.get_transport()
         self.transport = self.client.get_transport()
         self.channel = self.transport.open_session()
         self.channel = self.transport.open_session()
+        self.notifier = ChannelNotifyer(self.channel)
+        self.channel.in_buffer.set_event(self.notifier)
+        self.channel.in_stderr_buffer.set_event(self.notifier)
         self.channel.exec_command('darc serve')
         self.channel.exec_command('darc serve')
         self.msgid = 0
         self.msgid = 0
         self.id, self.tid = self._cmd('open', (location.path, create))
         self.id, self.tid = self._cmd('open', (location.path, create))
 
 
-    def _cmd(self, cmd, args):
+    def _cmd(self, *args, **kw):
+        self.notifier.enabled = True
+        try:
+            return self._cmd2(*args, **kw)
+        finally:
+            self.notifier.enabled = False
+
+    def _cmd2(self, cmd, args, defer=False):
         self.msgid += 1
         self.msgid += 1
-        self.channel.sendall(msgpack.packb((0, self.msgid, cmd, args)))
+        odata = msgpack.packb((0, self.msgid, cmd, args))
         while True:
         while True:
-            r, w, e = select.select([self.channel], [], [self.channel], 10)
-            if r:
-                if self.channel.closed:
-                    raise Exception('Connection closed')
-                if self.channel.recv_stderr_ready():
-                    print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE)
-                elif self.channel.recv_ready():
-                    self.unpacker.feed(self.channel.recv(BUFSIZE))
-                    for type, msgid, error, res in self.unpacker:
-                        if error:
-                            raise self.RPCError(error)
-                        return res
-            if e:
-                raise Exception('ssh channel error')
+            if self.channel.closed:
+                raise Exception('Connection closed')
+            if odata and self.channel.send_ready():
+                n = self.channel.send(odata)
+                if n > 0:
+                    odata = odata[n:]
+                if not odata and defer:
+                    return
+            elif self.channel.recv_stderr_ready():
+                print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE)
+            elif self.channel.recv_ready():
+                self.unpacker.feed(self.channel.recv(BUFSIZE))
+                for type, msgid, error, res in self.unpacker:
+                    if error:
+                        raise self.RPCError(error)
+                    return res
+            else:
+                with self.channel.lock:
+                    self.channel.out_buffer_cv.wait(10)
 
 
     def commit(self, *args):
     def commit(self, *args):
         self._cmd('commit', args)
         self._cmd('commit', args)
@@ -125,13 +156,13 @@ class RemoteStore(object):
 
 
     def put(self, *args):
     def put(self, *args):
         try:
         try:
-            return self._cmd('put', args)
+            return self._cmd('put', args, defer=True)
         except self.RPCError, e:
         except self.RPCError, e:
             if e.name == 'AlreadyExists':
             if e.name == 'AlreadyExists':
                 raise self.AlreadyExists
                 raise self.AlreadyExists
 
 
     def delete(self, *args):
     def delete(self, *args):
-        return self._cmd('delete', args)
+        return self._cmd('delete', args, defer=True)
 
 
     def list(self, *args):
     def list(self, *args):
         return self._cmd('list', args)
         return self._cmd('list', args)

+ 1 - 2
darc/store.py

@@ -205,8 +205,7 @@ class Store(object):
             raise self.DoesNotExist
             raise self.DoesNotExist
 
 
     def list(self, ns, marker=None, limit=1000000):
     def list(self, ns, marker=None, limit=1000000):
-        for key, value in self.get_index(ns).iteritems(marker=marker, limit=limit):
-            yield key
+        return [key for key, value in self.get_index(ns).iteritems(marker=marker, limit=limit)]
 
 
 
 
 class BandIO(object):
 class BandIO(object):