Jelajahi Sumber

More remote store improvements

Jonas Borgström 14 tahun lalu
induk
melakukan
87271291a1
5 mengubah file dengan 62 tambahan dan 72 penghapusan
  1. 12 6
      darc/archive.py
  2. 3 2
      darc/cache.py
  3. 18 0
      darc/helpers.py
  4. 24 59
      darc/remote.py
  5. 5 5
      darc/store.py

+ 12 - 6
darc/archive.py

@@ -197,15 +197,21 @@ class Archive(object):
             os.utime(path, (item['atime'], item['mtime']))
 
     def verify_file(self, item):
+        def verify_chunk(chunk, error, id):
+            assert not error
+            magic, data, hash = self.keychain.decrypt(chunk)
+            assert magic == PACKET_CHUNK
+            if self.keychain.id_hash(data) != id:
+                raise IntegrityError()
         try:
-            for id, chunk in izip(item['chunks'], self.store.get_many(NS_CHUNK, item['chunks'])):
-                magic, data, hash = self.keychain.decrypt(chunk)
-                assert magic == PACKET_CHUNK
-                if self.keychain.id_hash(data) != id:
-                    raise IntegrityError('chunk id did not match')
+            for id in item['chunks'][:-1]:
+                self.store.get(NS_CHUNK, id, callback=verify_chunk, callback_data=id)
+            last = item['chunks'][-1]
+            chunk = self.store.get(NS_CHUNK, last)
+            verify_chunk(chunk, None, last)
+            return True
         except IntegrityError:
             return False
-        return True
 
     def delete(self, cache):
         for id, size in self.get_chunks():

+ 3 - 2
darc/cache.py

@@ -6,6 +6,7 @@ import os
 import shutil
 
 from . import NS_ARCHIVE_CHUNKS, NS_CHUNK, PACKET_ARCHIVE_CHUNKS, PACKET_CHUNK
+from .helpers import error_callback
 from .hashindex import NSIndex
 
 
@@ -146,7 +147,7 @@ class Cache(object):
             return self.chunk_incref(id)
         data, hash = self.keychain.encrypt(PACKET_CHUNK, data)
         csize = len(data)
-        self.store.put(NS_CHUNK, id, data)
+        self.store.put(NS_CHUNK, id, data, callback=error_callback)
         self.chunks[id] = (1000001, csize)
         return id
 
@@ -167,7 +168,7 @@ class Cache(object):
         count, size = self.chunks[id]
         if count == 1:
             del self.chunks[id]
-            self.store.delete(NS_CHUNK, id)
+            self.store.delete(NS_CHUNK, id, callback=error_callback)
         else:
             self.chunks[id] = (count - 1, size)
 

+ 18 - 0
darc/helpers.py

@@ -10,6 +10,24 @@ import stat
 import struct
 import time
 
+
+def deferrable(f):
+    def wrapper(*args, **kw):
+        callback = kw.pop('callback', None)
+        if callback:
+            data = kw.pop('callback_data', None)
+            try:
+                callback(f(*args, **kw), None, data)
+            except Exception, e:
+                callback(None, e, data)
+        else:
+            return f(*args, **kw)
+    return wrapper
+
+def error_callback(res, error, data):
+    if res:
+        raise res
+
 def to_localtime(ts):
     """Convert datetime object from UTC to local time zone"""
     return ts - timedelta(seconds=time.altzone)

+ 24 - 59
darc/remote.py

@@ -54,13 +54,7 @@ class StoreServer(object):
                     except Exception, e:
                         sys.stdout.write(msgpack.packb((1, msgid, e.__class__.__name__, None)))
                     else:
-                        if method not in ('put', 'delete'):
-                            if hasattr(res, 'next'):
-                                for r in res:
-                                    sys.stdout.write(msgpack.packb((1, msgid, None, r)))
-                                sys.stdout.write(msgpack.packb((2, msgid, None, None)))
-                            else:
-                                sys.stdout.write(msgpack.packb((1, msgid, None, res)))
+                        sys.stdout.write(msgpack.packb((1, msgid, None, res)))
                     sys.stdout.flush()
             if es:
                 return
@@ -110,13 +104,16 @@ class RemoteStore(object):
         self.channel.in_buffer.set_event(self.notifier)
         self.channel.in_stderr_buffer.set_event(self.notifier)
         self.channel.exec_command('darc serve')
+        self.callbacks = {}
         self.msgid = 0
         self.id, self.tid = self.cmd('open', (location.path, create))
 
-    def cmd(self, cmd, args, defer=False):
+    def cmd(self, cmd, args, callback=None, callback_data=None):
         self.msgid += 1
-        odata = msgpack.packb((0, self.msgid, cmd, args))
         self.notifier.enabled += 1
+        if callback:
+            self.callbacks[self.msgid] = callback, callback_data
+        odata = msgpack.packb((0, self.msgid, cmd, args))
         while True:
             if self.channel.closed:
                 raise Exception('Connection closed')
@@ -124,52 +121,28 @@ class RemoteStore(object):
                 n = self.channel.send(odata)
                 if n > 0:
                     odata = odata[n:]
-                if not odata and defer:
-                    self.notifier.enabled -= 1
+                if not odata and callback:
                     return
             elif self.channel.recv_stderr_ready():
                 print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE)
             elif self.channel.recv_ready():
                 self.unpacker.feed(self.channel.recv(BUFSIZE))
                 for type, msgid, error, res in self.unpacker:
-                    if error:
-                        raise self.RPCError(error)
                     self.notifier.enabled -= 1
-                    return res
-            else:
-                with self.channel.lock:
-                    self.channel.out_buffer_cv.wait(10)
-
-    def cmd_iter(self, cmd, args):
-        self.msgid += 1
-        odata = msgpack.packb((0, self.msgid, cmd, args))
-        self.notifier.enabled += 1
-        try:
-            while True:
-                if self.channel.closed:
-                    raise Exception('Connection closed')
-                if odata and self.channel.send_ready():
-                    n = self.channel.send(odata)
-                    if n > 0:
-                        odata = odata[n:]
-                if self.channel.recv_stderr_ready():
-                    print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE)
-                elif self.channel.recv_ready():
-                    self.unpacker.feed(self.channel.recv(BUFSIZE))
-                    for type, msgid, error, res in self.unpacker:
-                        if msgid < self.msgid:
-                            continue
+                    if msgid == self.msgid:
                         if error:
                             raise self.RPCError(error)
-                        if type == 1:
-                            yield res
-                        else:
-                            return
-                else:
-                    with self.channel.lock:
+                        return res
+                    else:
+                        c, d = self.callbacks.pop(msgid, (None, None))
+                        if c:
+                            c(res, error, d)
+            else:
+                with self.channel.lock:
+                    if (self.channel.out_window_size == 0 and
+                        not self.channel.recv_ready() and
+                        not self.channel.recv_stderr_ready()):
                         self.channel.out_buffer_cv.wait(10)
-        finally:
-            self.notifier.enabled -= 1
 
     def commit(self, *args):
         self.cmd('commit', args)
@@ -178,32 +151,24 @@ class RemoteStore(object):
     def rollback(self, *args):
         return self.cmd('rollback', args)
 
-    def get(self, *args):
+    def get(self, ns, id, callback=None, callback_data=None):
         try:
-            return self.cmd('get', args)
+            return self.cmd('get', (ns, id), callback, callback_data)
         except self.RPCError, e:
             print e.name
             if e.name == 'DoesNotExist':
                 raise self.DoesNotExist
             raise
 
-    def get_many(self, *args):
-        try:
-            return self.cmd_iter('get_many', args)
-        except self.RPCError, e:
-            if e.name == 'DoesNotExist':
-                raise self.DoesNotExist
-            raise
-
-    def put(self, *args):
+    def put(self, ns, id, data, callback=None, callback_data=None):
         try:
-            return self.cmd('put', args, defer=True)
+            return self.cmd('put', (ns, id, data), callback, callback_data)
         except self.RPCError, e:
             if e.name == 'AlreadyExists':
                 raise self.AlreadyExists
 
-    def delete(self, *args):
-        return self.cmd('delete', args, defer=True)
+    def delete(self, ns, id, callback=None, callback_data=None):
+        return self.cmd('delete', (ns, id), callback, callback_data)
 
     def list(self, *args):
         return self.cmd('list', args)

+ 5 - 5
darc/store.py

@@ -10,7 +10,7 @@ import unittest
 from zlib import crc32
 
 from .hashindex import NSIndex, BandIndex
-from .helpers import IntegrityError, read_set, write_set
+from .helpers import IntegrityError, read_set, write_set, deferrable
 from .lrucache import LRUCache
 
 
@@ -178,6 +178,7 @@ class Store(object):
                 self.indexes[ns] = cls.create(filename)
             return self.indexes[ns]
 
+    @deferrable
     def get(self, ns, id):
         try:
             band, offset = self.get_index(ns)[id]
@@ -185,10 +186,7 @@ class Store(object):
         except KeyError:
             raise self.DoesNotExist
 
-    def get_many(self, ns, ids):
-        for id in ids:
-            yield self.get(ns, id)
-
+    @deferrable
     def put(self, ns, id, data):
         if not self.txn_active:
             self.begin_txn()
@@ -198,6 +196,7 @@ class Store(object):
         bands[band] += 1
         self.get_index(ns)[id] = band, offset
 
+    @deferrable
     def delete(self, ns, id):
         if not self.txn_active:
             self.begin_txn()
@@ -208,6 +207,7 @@ class Store(object):
         except KeyError:
             raise self.DoesNotExist
 
+    @deferrable
     def list(self, ns, marker=None, limit=1000000):
         return [key for key, value in self.get_index(ns).iteritems(marker=marker, limit=limit)]