瀏覽代碼

Asynchronous extract and verify mode

Jonas Borgström 14 年之前
父節點
當前提交
3f20f8ed7d
共有 4 個文件被更改,包括 88 次插入44 次删除
  1. 38 24
      darc/archive.py
  2. 15 8
      darc/archiver.py
  3. 32 12
      darc/remote.py
  4. 3 0
      darc/store.py

+ 38 - 24
darc/archive.py

@@ -126,7 +126,7 @@ class Archive(object):
                 usize += size
         return osize, csize, usize
 
-    def extract_item(self, item, dest=None):
+    def extract_item(self, item, dest=None, start_cb=None):
         dest = dest or os.getcwdu()
         dir_stat_queue = []
         assert item['path'][0] not in ('/', '\\', ':')
@@ -159,17 +159,29 @@ class Archive(object):
                     os.unlink(path)
                 os.link(source, path)
             else:
-                with open(path, 'wb') as fd:
-                    for id in item['chunks']:
-                        try:
-                            magic, data, hash = self.keychain.decrypt(self.store.get(NS_CHUNK, id))
-                            assert magic == PACKET_CHUNK
-                            if self.keychain.id_hash(data) != id:
-                                raise IntegrityError('chunk hash did not match')
-                            fd.write(data)
-                        except ValueError:
-                            raise Exception('Invalid chunk checksum')
-                self.restore_attrs(path, item)
+                def extract_cb(chunk, error, (id, i, last)):
+                    if i==0:
+                        start_cb(item)
+                    assert not error
+                    magic, data, hash = self.keychain.decrypt(chunk)
+                    assert magic == PACKET_CHUNK
+                    if self.keychain.id_hash(data) != id:
+                        raise IntegrityError('chunk hash did not match')
+                    fd.write(data)
+                    if last:
+                        self.restore_attrs(path, item)
+                        fd.close()
+
+                fd = open(path, 'wb')
+                n = len(item['chunks'])
+                if n == 0:
+                    start_cb(item)
+                    self.restore_attrs(path, item)
+                    fd.close()
+                else:
+                    for i, id in enumerate(item['chunks']):
+                        self.store.get(NS_CHUNK, id, callback=extract_cb, callback_data=(id, i, i==n-1))
+
         else:
             raise Exception('Unknown archive item type %r' % item['mode'])
 
@@ -196,22 +208,24 @@ class Archive(object):
             # FIXME: We should really call futimes here (c extension required)
             os.utime(path, (item['atime'], item['mtime']))
 
-    def verify_file(self, item):
-        def verify_chunk(chunk, error, id):
+    def verify_file(self, item, start, result):
+        def verify_chunk(chunk, error, (id, i, last)):
+            if i == 0:
+                start(item)
             assert not error
             magic, data, hash = self.keychain.decrypt(chunk)
             assert magic == PACKET_CHUNK
             if self.keychain.id_hash(data) != id:
-                raise IntegrityError()
-        try:
-            for id in item['chunks'][:-1]:
-                self.store.get(NS_CHUNK, id, callback=verify_chunk, callback_data=id)
-            last = item['chunks'][-1]
-            chunk = self.store.get(NS_CHUNK, last)
-            verify_chunk(chunk, None, last)
-            return True
-        except IntegrityError:
-            return False
+                result(item, False)
+            elif last:
+                result(item, True)
+        n = len(item['chunks'])
+        if n == 0:
+            start(item)
+            result(item, True)
+        else:
+            for i, id in enumerate(item['chunks']):
+                self.store.get(NS_CHUNK, id, callback=verify_chunk, callback_data=(id, i, i==n-1))
 
     def delete(self, cache):
         for id, size in self.get_chunks():

+ 15 - 8
darc/archiver.py

@@ -109,6 +109,8 @@ class Archiver(object):
             self.print_error('Unknown file type: %s', path)
 
     def do_extract(self, args):
+        def start_cb(item):
+            self.print_verbose(item['path'].decode('utf-8'))
         store = self.open_store(args.archive)
         keychain = Keychain(args.keychain)
         archive = Archive(store, keychain, args.archive.archive)
@@ -116,13 +118,13 @@ class Archiver(object):
         for item in archive.get_items():
             if exclude_path(item['path'], args.patterns):
                 continue
-            self.print_verbose(item['path'].decode('utf-8'))
-            archive.extract_item(item, args.dest)
+            archive.extract_item(item, args.dest, start_cb)
             if stat.S_ISDIR(item['mode']):
                 dirs.append(item)
             if dirs and not item['path'].startswith(dirs[-1]['path']):
                 # Extract directories twice to make sure mtime is correctly restored
                 archive.extract_item(dirs.pop(-1), args.dest)
+        store.flush_rpc()
         while dirs:
             archive.extract_item(dirs.pop(-1), args.dest)
         return self.exit_code
@@ -166,16 +168,21 @@ class Archiver(object):
         store = self.open_store(args.archive)
         keychain = Keychain(args.keychain)
         archive = Archive(store, keychain, args.archive.archive)
+        def start_cb(item):
+            self.print_verbose('%s ...', item['path'].decode('utf-8'), newline=False)
+        def result_cb(item, success):
+            if success:
+                self.print_verbose('OK')
+            else:
+                self.print_verbose('ERROR')
+                self.print_error('%s: verification failed' % item['path'])
+
         for item in archive.get_items():
             if exclude_path(item['path'], args.patterns):
                 continue
             if stat.S_ISREG(item['mode']) and not 'source' in item:
-                self.print_verbose('%s ...', item['path'].decode('utf-8'), newline=False)
-                if archive.verify_file(item):
-                    self.print_verbose('OK')
-                else:
-                    self.print_verbose('ERROR')
-                    self.print_error('%s: verification failed' % item['path'])
+                archive.verify_file(item, start_cb, result_cb)
+        store.flush_rpc()
         return self.exit_code
 
     def do_info(self, args):

+ 32 - 12
darc/remote.py

@@ -108,21 +108,22 @@ class RemoteStore(object):
         self.msgid = 0
         self.id, self.tid = self.cmd('open', (location.path, create))
 
+    def wait(self):
+        with self.channel.lock:
+            if (self.channel.out_window_size == 0 and
+                not self.channel.recv_ready() and
+                not self.channel.recv_stderr_ready()):
+                self.channel.out_buffer_cv.wait(10)
+
     def cmd(self, cmd, args, callback=None, callback_data=None):
         self.msgid += 1
         self.notifier.enabled += 1
+        odata = msgpack.packb((0, self.msgid, cmd, args))
         if callback:
             self.callbacks[self.msgid] = callback, callback_data
-        odata = msgpack.packb((0, self.msgid, cmd, args))
         while True:
             if self.channel.closed:
                 raise Exception('Connection closed')
-            if odata and self.channel.send_ready():
-                n = self.channel.send(odata)
-                if n > 0:
-                    odata = odata[n:]
-                if not odata and callback:
-                    return
             elif self.channel.recv_stderr_ready():
                 print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE)
             elif self.channel.recv_ready():
@@ -137,12 +138,14 @@ class RemoteStore(object):
                         c, d = self.callbacks.pop(msgid, (None, None))
                         if c:
                             c(res, error, d)
+            elif odata and self.channel.send_ready():
+                n = self.channel.send(odata)
+                if n > 0:
+                    odata = odata[n:]
+                if not odata and callback:
+                    return
             else:
-                with self.channel.lock:
-                    if (self.channel.out_window_size == 0 and
-                        not self.channel.recv_ready() and
-                        not self.channel.recv_stderr_ready()):
-                        self.channel.out_buffer_cv.wait(10)
+                self.wait()
 
     def commit(self, *args):
         self.cmd('commit', args)
@@ -173,3 +176,20 @@ class RemoteStore(object):
     def list(self, *args):
         return self.cmd('list', args)
 
+    def flush_rpc(self):
+        while True:
+            if self.channel.closed:
+                raise Exception('Connection closed')
+            elif self.channel.recv_stderr_ready():
+                print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE)
+            elif self.channel.recv_ready():
+                self.unpacker.feed(self.channel.recv(BUFSIZE))
+                for type, msgid, error, res in self.unpacker:
+                    self.notifier.enabled -= 1
+                    c, d = self.callbacks.pop(msgid, (None, None))
+                    if c:
+                        c(res, error, d)
+                    if msgid == self.msgid:
+                        return
+            else:
+                self.wait()

+ 3 - 0
darc/store.py

@@ -211,6 +211,9 @@ class Store(object):
     def list(self, ns, marker=None, limit=1000000):
         return [key for key, value in self.get_index(ns).iteritems(marker=marker, limit=limit)]
 
+    def flush_rpc(self):
+        pass
+
 
 class BandIO(object):