Jonas Borgström преди 14 години
родител
ревизия
2d6df6454e
променени са 3 файла, в които са добавени 70 реда и са изтрити 31 реда
  1. 20 16
      darc/archive.py
  2. 19 0
      darc/helpers.py
  3. 31 15
      darc/remote.py

+ 20 - 16
darc/archive.py

@@ -6,12 +6,12 @@ import os
 import socket
 import stat
 import sys
-from os.path import dirname
+from zlib import crc32
 from xattr import xattr, XATTR_NOFOLLOW
 
 from . import NS_ARCHIVE_METADATA, NS_CHUNK
 from ._speedups import chunkify
-from .helpers import uid2user, user2uid, gid2group, group2gid, IntegrityError
+from .helpers import uid2user, user2uid, gid2group, group2gid, IntegrityError, Counter
 
 CHUNK_SIZE = 64 * 1024
 WINDOW_SIZE = 4096
@@ -31,7 +31,6 @@ class Archive(object):
         self.cache = cache
         self.items = ''
         self.items_refs = []
-        self.items_prefix = ''
         self.items_ids = []
         self.hard_links = {}
         if name:
@@ -54,28 +53,33 @@ class Archive(object):
 
     def iter_items(self, callback):
         unpacker = msgpack.Unpacker()
+        counter = Counter(0)
         def cb(chunk, error, id):
+            counter.dec()
+            print len(chunk)
             data, items_hash = self.key.decrypt(chunk)
             assert self.key.id_hash(data) == id
             unpacker.feed(data)
             for item in unpacker:
                 callback(item)
         for id, size, csize in self.metadata['items']:
+            # Limit the number of concurrent items requests to 3
+            self.store.flush_rpc(counter, 10)
+            counter.inc()
             self.store.get(NS_CHUNK, id, callback=cb, callback_data=id)
 
     def add_item(self, item, refs=None):
         data = msgpack.packb(item)
-        prefix = dirname(item['path'])
-        if self.items_prefix and self.items_prefix != prefix:
+        if crc32(item['path'].encode('utf-8')) % 1000 == 0:
             self.flush_items()
         if refs:
             self.items_refs += refs
         self.items += data
-        self.items_prefix = prefix
 
     def flush_items(self):
         if not self.items:
             return
+        print 'flush', len(self.items)
         id = self.key.id_hash(self.items)
         if self.cache.seen_chunk(id):
             self.items_ids.append(self.cache.chunk_incref(id))
@@ -85,7 +89,6 @@ class Archive(object):
             self.items_ids.append(self.cache.add_chunk(id, self.items))
         self.items = ''
         self.items_refs = []
-        self.items_prefix = ''
 
     def save(self, name, cache):
         self.id = self.key.archive_hash(name)
@@ -171,27 +174,28 @@ class Archive(object):
                     os.unlink(path)
                 os.link(source, path)
             else:
-                def extract_cb(chunk, error, (id, i, last)):
-                    if i==0:
+                def extract_cb(chunk, error, (id, i)):
+                    if i == 0:
+                        state['fd'] = open(path, 'wb')
                         start_cb(item)
                     assert not error
                     data, hash = self.key.decrypt(chunk)
                     if self.key.id_hash(data) != id:
                         raise IntegrityError('chunk hash did not match')
-                    fd.write(data)
-                    if last:
-                        fd.close()
+                    state['fd'].write(data)
+                    if i == n - 1:
+                        state['fd'].close()
                         self.restore_attrs(path, item)
-
-                fd = open(path, 'wb')
+                state = {}
                 n = len(item['chunks'])
+                ## 0 chunks indicates an empty (0 bytes) file
                 if n == 0:
+                    open(path, 'wb').close()
                     start_cb(item)
                     self.restore_attrs(path, item)
-                    fd.close()
                 else:
                     for i, (id, size, csize) in enumerate(item['chunks']):
-                        self.store.get(NS_CHUNK, id, callback=extract_cb, callback_data=(id, i, i==n-1))
+                        self.store.get(NS_CHUNK, id, callback=extract_cb, callback_data=(id, i))
 
         else:
             raise Exception('Unknown archive item type %r' % item['mode'])

+ 19 - 0
darc/helpers.py

@@ -10,6 +10,25 @@ import stat
 import struct
 import time
 
+class Counter(object):
+
+    __slots__ = ('v',)
+
+    def __init__(self, value=0):
+        self.v = value
+
+    def inc(self, amount=1):
+        self.v += amount
+
+    def dec(self, amount=1):
+        self.v -= amount
+
+    def __cmp__(self, x):
+        return cmp(self.v, x)
+
+    def __repr__(self):
+        return '<Counter(%r)>' % self.v
+
 
 def deferrable(f):
     def wrapper(*args, **kw):

+ 31 - 15
darc/remote.py

@@ -7,6 +7,7 @@ import sys
 import getpass
 
 from .store import Store
+from .helpers import Counter
 
 
 BUFSIZE = 1024 * 1024
@@ -16,10 +17,10 @@ class ChannelNotifyer(object):
 
     def __init__(self, channel):
         self.channel = channel
-        self.enabled = 0
+        self.enabled = Counter()
 
     def set(self):
-        if self.enabled:
+        if self.enabled > 0:
             with self.channel.lock:
                 self.channel.out_buffer_cv.notifyAll()
 
@@ -106,6 +107,8 @@ class RemoteStore(object):
         self.channel.exec_command('darc serve')
         self.callbacks = {}
         self.msgid = 0
+        self.recursion = 0
+        self.odata = ''
         self.id, self.tid = self.cmd('open', (location.path, create))
 
     def wait(self, write=True):
@@ -113,39 +116,46 @@ class RemoteStore(object):
             if ((not write or self.channel.out_window_size == 0) and
                 len(self.channel.in_buffer._buffer) == 0 and
                 len(self.channel.in_stderr_buffer._buffer) == 0):
-                self.channel.out_buffer_cv.wait(10)
+                self.channel.out_buffer_cv.wait(1)
 
     def cmd(self, cmd, args, callback=None, callback_data=None):
         self.msgid += 1
-        self.notifier.enabled += 1
-        odata = msgpack.packb((0, self.msgid, cmd, args))
+        self.notifier.enabled.inc()
+        self.odata += msgpack.packb((0, self.msgid, cmd, args))
+        self.recursion += 1
         if callback:
             self.callbacks[self.msgid] = callback, callback_data
+            if self.recursion > 1:
+                self.recursion -= 1
+                return
         while True:
             if self.channel.closed:
+                self.recursion -= 1
                 raise Exception('Connection closed')
             elif self.channel.recv_stderr_ready():
                 print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE)
             elif self.channel.recv_ready():
                 self.unpacker.feed(self.channel.recv(BUFSIZE))
                 for type, msgid, error, res in self.unpacker:
-                    self.notifier.enabled -= 1
+                    self.notifier.enabled.dec()
                     if msgid == self.msgid:
                         if error:
                             raise self.RPCError(error)
+                        self.recursion -= 1
                         return res
                     else:
                         c, d = self.callbacks.pop(msgid, (None, None))
                         if c:
                             c(res, error, d)
-            elif odata and self.channel.send_ready():
-                n = self.channel.send(odata)
+            elif self.odata and self.channel.send_ready():
+                n = self.channel.send(self.odata)
                 if n > 0:
-                    odata = odata[n:]
-                if not odata and callback:
+                    self.odata = self.odata[n:]
+                if not self.odata and callback:
+                    self.recursion -= 1
                     return
             else:
-                self.wait(odata)
+                self.wait(self.odata)
 
     def commit(self, *args):
         self.cmd('commit', args)
@@ -176,20 +186,26 @@ class RemoteStore(object):
     def list(self, *args):
         return self.cmd('list', args)
 
-    def flush_rpc(self):
-        while True:
+    def flush_rpc(self, counter=None, backlog=0):
+        counter = counter or self.notifier.enabled
+        while counter > backlog:
             if self.channel.closed:
                 raise Exception('Connection closed')
+            elif self.odata and self.channel.send_ready():
+                n = self.channel.send(self.odata)
+                if n > 0:
+                    self.odata = self.odata[n:]
             elif self.channel.recv_stderr_ready():
                 print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE)
             elif self.channel.recv_ready():
                 self.unpacker.feed(self.channel.recv(BUFSIZE))
                 for type, msgid, error, res in self.unpacker:
-                    self.notifier.enabled -= 1
+                    self.notifier.enabled.dec()
                     c, d = self.callbacks.pop(msgid, (None, None))
                     if c:
                         c(res, error, d)
                     if msgid == self.msgid:
                         return
             else:
-                self.wait()
+                self.wait(self.odata)
+