Browse Source

Optimize list and verify for remote stores

Jonas Borgström 14 years ago
parent
commit
2d47afe368
2 changed files with 43 additions and 34 deletions
  1. + 8 - 6
      darc/archive.py
  2. + 35 - 28
      darc/archiver.py

+ 8 - 6
darc/archive.py

@@ -52,14 +52,16 @@ class Archive(object):
         t, f = self.metadata['time'].split('.', 1)
         t, f = self.metadata['time'].split('.', 1)
         return datetime.strptime(t, '%Y-%m-%dT%H:%M:%S') + timedelta(seconds=float('.' + f))
         return datetime.strptime(t, '%Y-%m-%dT%H:%M:%S') + timedelta(seconds=float('.' + f))
 
 
-    def get_items(self):
+    def iter_items(self, callback):
         unpacker = msgpack.Unpacker()
         unpacker = msgpack.Unpacker()
-        for id, size, csize in self.metadata['items']:
-            data, items_hash = self.key.decrypt(self.store.get(NS_CHUNK, id))
+        def cb(chunk, error, id):
+            data, items_hash = self.key.decrypt(chunk)
             assert self.key.id_hash(data) == id
             assert self.key.id_hash(data) == id
             unpacker.feed(data)
             unpacker.feed(data)
             for item in unpacker:
             for item in unpacker:
-                yield item
+                callback(item)
+        for id, size, csize in self.metadata['items']:
+            self.store.get(NS_CHUNK, id, callback=cb, callback_data=id)
 
 
     def add_item(self, item, refs=None):
     def add_item(self, item, refs=None):
         data = msgpack.packb(item)
         data = msgpack.packb(item)
@@ -236,7 +238,7 @@ class Archive(object):
                 self.store.get(NS_CHUNK, id, callback=verify_chunk, callback_data=(id, i, i==n-1))
                 self.store.get(NS_CHUNK, id, callback=verify_chunk, callback_data=(id, i, i==n-1))
 
 
     def delete(self, cache):
     def delete(self, cache):
-        def cb(chunk, error, id):
+        def callback(chunk, error, id):
             assert not error
             assert not error
             data, items_hash = self.key.decrypt(chunk)
             data, items_hash = self.key.decrypt(chunk)
             assert self.key.id_hash(data) == id
             assert self.key.id_hash(data) == id
@@ -251,7 +253,7 @@ class Archive(object):
         unpacker = msgpack.Unpacker()
         unpacker = msgpack.Unpacker()
         for id, size, csize in self.metadata['items']:
         for id, size, csize in self.metadata['items']:
             if self.cache.seen_chunk(id) == 1:
             if self.cache.seen_chunk(id) == 1:
-                self.store.get(NS_CHUNK, id, callback=cb, callback_data=id)
+                self.store.get(NS_CHUNK, id, callback=callback, callback_data=id)
             else:
             else:
                 self.cache.chunk_decref(id)
                 self.cache.chunk_decref(id)
         self.store.flush_rpc()
         self.store.flush_rpc()

+ 35 - 28
darc/archiver.py

@@ -115,19 +115,20 @@ class Archiver(object):
     def do_extract(self, args):
     def do_extract(self, args):
         def start_cb(item):
         def start_cb(item):
             self.print_verbose(item['path'].decode('utf-8'))
             self.print_verbose(item['path'].decode('utf-8'))
-        store = self.open_store(args.archive)
-        key = Key(store)
-        archive = Archive(store, key, args.archive.archive)
-        dirs = []
-        for item in archive.get_items():
+        def extract_cb(item):
             if exclude_path(item['path'], args.patterns):
             if exclude_path(item['path'], args.patterns):
-                continue
+                return
             archive.extract_item(item, args.dest, start_cb)
             archive.extract_item(item, args.dest, start_cb)
             if stat.S_ISDIR(item['mode']):
             if stat.S_ISDIR(item['mode']):
                 dirs.append(item)
                 dirs.append(item)
             if dirs and not item['path'].startswith(dirs[-1]['path']):
             if dirs and not item['path'].startswith(dirs[-1]['path']):
                 # Extract directories twice to make sure mtime is correctly restored
                 # Extract directories twice to make sure mtime is correctly restored
                 archive.extract_item(dirs.pop(-1), args.dest)
                 archive.extract_item(dirs.pop(-1), args.dest)
+        store = self.open_store(args.archive)
+        key = Key(store)
+        archive = Archive(store, key, args.archive.archive)
+        dirs = []
+        archive.iter_items(extract_cb)
         store.flush_rpc()
         store.flush_rpc()
         while dirs:
         while dirs:
             archive.extract_item(dirs.pop(-1), args.dest)
             archive.extract_item(dirs.pop(-1), args.dest)
@@ -142,29 +143,35 @@ class Archiver(object):
         return self.exit_code
         return self.exit_code
 
 
     def do_list(self, args):
     def do_list(self, args):
+        def callback(item):
+            type = tmap.get(item['mode'] / 4096, '?')
+            mode = format_file_mode(item['mode'])
+            size = 0
+            if type == '-':
+                try:
+                    size = sum(size for _, size, _ in item['chunks'])
+                except KeyError:
+                    pass
+            mtime = format_time(datetime.fromtimestamp(item['mtime']))
+            if 'source' in item:
+                if type == 'l':
+                    extra = ' -> %s' % item['source']
+                else:
+                    type = 'h'
+                    extra = ' link to %s' % item['source']
+            else:
+                extra = ''
+            print '%s%s %-6s %-6s %8d %s %s%s' % (type, mode, item['user'],
+                                              item['group'], size, mtime,
+                                              item['path'], extra)
+
         store = self.open_store(args.src)
         store = self.open_store(args.src)
         key = Key(store)
         key = Key(store)
         if args.src.archive:
         if args.src.archive:
             tmap = {1: 'p', 2: 'c', 4: 'd', 6: 'b', 010: '-', 012: 'l', 014: 's'}
             tmap = {1: 'p', 2: 'c', 4: 'd', 6: 'b', 010: '-', 012: 'l', 014: 's'}
             archive = Archive(store, key, args.src.archive)
             archive = Archive(store, key, args.src.archive)
-            for item in archive.get_items():
-                type = tmap.get(item['mode'] / 4096, '?')
-                mode = format_file_mode(item['mode'])
-                size = 0
-                if type == '-':
-                    size = sum(size for _, size, _ in item['chunks'])
-                mtime = format_time(datetime.fromtimestamp(item['mtime']))
-                if 'source' in item:
-                    if type == 'l':
-                        extra = ' -> %s' % item['source']
-                    else:
-                        type = 'h'
-                        extra = ' link to %s' % item['source']
-                else:
-                    extra = ''
-                print '%s%s %-6s %-6s %8d %s %s%s' % (type, mode, item['user'],
-                                                  item['group'], size, mtime,
-                                                  item['path'], extra)
+            archive.iter_items(callback)
+            store.flush_rpc()
         else:
         else:
             for archive in sorted(Archive.list_archives(store, key), key=attrgetter('ts')):
             for archive in sorted(Archive.list_archives(store, key), key=attrgetter('ts')):
                 print '%-20s %s' % (archive.metadata['name'], to_localtime(archive.ts).strftime('%c'))
                 print '%-20s %s' % (archive.metadata['name'], to_localtime(archive.ts).strftime('%c'))
@@ -182,12 +189,12 @@ class Archiver(object):
             else:
             else:
                 self.print_verbose('ERROR')
                 self.print_verbose('ERROR')
                 self.print_error('%s: verification failed' % item['path'])
                 self.print_error('%s: verification failed' % item['path'])
-
-        for item in archive.get_items():
+        def callback(item):
             if exclude_path(item['path'], args.patterns):
             if exclude_path(item['path'], args.patterns):
-                continue
-            if stat.S_ISREG(item['mode']) and not 'source' in item:
+                return
+            if stat.S_ISREG(item['mode']) and 'chunks' in item:
                 archive.verify_file(item, start_cb, result_cb)
                 archive.verify_file(item, start_cb, result_cb)
+        archive.iter_items(callback)
         store.flush_rpc()
         store.flush_rpc()
         return self.exit_code
         return self.exit_code