Преглед изворни кода

Split large archive item packages into multiple smaller ones

Jonas Borgström пре 14 година
родитељ
комит
c9aca63d6d
2 измењених фајлова са 41 додато и 30 уклоњено
  1. 35 23
      darc/archive.py
  2. 6 7
      darc/archiver.py

+ 35 - 23
darc/archive.py

@@ -27,6 +27,7 @@ class Archive(object):
         self.keychain = keychain
         self.store = store
         self.items = []
+        self.items_ids = []
         self.hard_links = {}
         if name:
             self.load(self.keychain.id_hash(name))
@@ -40,17 +41,33 @@ class Archive(object):
         self.metadata = msgpack.unpackb(data)
         assert self.metadata['version'] == 1
 
-    def get_items(self):
+    def get_chunks(self):
         data, chunks_hash = self.keychain.decrypt(self.store.get(NS_ARCHIVE_CHUNKS, self.id))
         chunks = msgpack.unpackb(data)
         assert chunks['version'] == 1
         assert self.metadata['chunks_hash'] == chunks_hash
-        self.chunks = chunks['chunks']
-        data, items_hash = self.keychain.decrypt(self.store.get(NS_ARCHIVE_ITEMS, self.id))
-        items = msgpack.unpackb(data)
-        assert items['version'] == 1
-        assert self.metadata['items_hash'] == items_hash
-        self.items = items['items']
+        return chunks['chunks']
+
+    def get_items(self):
+        for id in self.metadata['items_ids']:
+            data, items_hash = self.keychain.decrypt(self.store.get(NS_ARCHIVE_ITEMS, id))
+            assert items_hash == id
+            items = msgpack.unpackb(data)
+            assert items['version'] == 1
+            for item in items['items']:
+                yield item
+
+    def add_item(self, item):
+        self.items.append(item)
+        if len(self.items) > 100000:
+            self.flush_items()
+
+    def flush_items(self):
+        items = {'version': 1, 'items': self.items}
+        data, items_hash = self.keychain.encrypt_read(msgpack.packb(items))
+        self.store.put(NS_ARCHIVE_ITEMS, items_hash, data)
+        self.items = []
+        self.items_ids.append(items_hash)
 
     def save(self, name, cache):
         self.id = self.keychain.id_hash(name)
@@ -58,14 +75,12 @@ class Archive(object):
         chunks = {'version': 1, 'chunks': self.chunks}
         data, chunks_hash = self.keychain.encrypt_create(msgpack.packb(chunks))
         self.store.put(NS_ARCHIVE_CHUNKS, self.id, data)
-        items = {'version': 1, 'items': self.items}
-        data, items_hash = self.keychain.encrypt_read(msgpack.packb(items))
-        self.store.put(NS_ARCHIVE_ITEMS, self.id, data)
+        self.flush_items()
         metadata = {
             'version': 1,
             'name': name,
             'chunks_hash': chunks_hash,
-            'items_hash': items_hash,
+            'items_ids': self.items_ids,
             'cmdline': sys.argv,
             'hostname': socket.gethostname(),
             'username': getuser(),
@@ -76,12 +91,11 @@ class Archive(object):
         self.store.commit()
 
     def stats(self, cache):
-        self.get_items()
         osize = csize = usize = 0
-        for item in self.items:
+        for item in self.get_items():
             if stat.S_ISREG(item['mode']) and not 'source' in item:
                 osize += item['size']
-        for id, size in self.chunks:
+        for id, size in self.get_chunks():
             csize += size
             if cache.seen_chunk(id) == 1:
                 usize += size
@@ -167,11 +181,10 @@ class Archive(object):
         return True
 
     def delete(self, cache):
-        self.get_items()
         self.store.delete(NS_ARCHIVE_CHUNKS, self.id)
         self.store.delete(NS_ARCHIVE_ITEMS, self.id)
         self.store.delete(NS_ARCHIVE_METADATA, self.id)
-        for id, size in self.chunks:
+        for id, size in self.get_chunks():
             cache.chunk_decref(id)
         self.store.commit()
         cache.save()
@@ -200,18 +213,18 @@ class Archive(object):
     def process_dir(self, path, st):
         item = {'path': path.lstrip('/\\:')}
         item.update(self.stat_attrs(st, path))
-        self.items.append(item)
+        self.add_item(item)
 
     def process_fifo(self, path, st):
         item = {'path': path.lstrip('/\\:')}
         item.update(self.stat_attrs(st, path))
-        self.items.append(item)
+        self.add_item(item)
 
     def process_symlink(self, path, st):
         source = os.readlink(path)
         item = {'path': path.lstrip('/\\:'), 'source': source}
         item.update(self.stat_attrs(st, path))
-        self.items.append(item)
+        self.add_item(item)
 
     def process_file(self, path, st, cache):
         safe_path = path.lstrip('/\\:')
@@ -219,9 +232,8 @@ class Archive(object):
         if st.st_nlink > 1:
             source = self.hard_links.get((st.st_ino, st.st_dev))
             if (st.st_ino, st.st_dev) in self.hard_links:
-                self.items.append({'mode': st.st_mode,
-                                   'path': path,
-                                   'source': source})
+                self.add_item({'mode': st.st_mode,
+                               'path': path, 'source': source})
                 return
             else:
                 self.hard_links[st.st_ino, st.st_dev] = safe_path
@@ -248,7 +260,7 @@ class Archive(object):
             cache.memorize_file_chunks(path_hash, st, ids)
         item = {'path': safe_path, 'chunks': ids, 'size': size}
         item.update(self.stat_attrs(st, path))
-        self.items.append(item)
+        self.add_item(item)
 
     @staticmethod
     def list_archives(store, keychain):

+ 6 - 7
darc/archiver.py

@@ -114,9 +114,8 @@ class Archiver(object):
         store = self.open_store(args.archive)
         keychain = Keychain(args.keychain)
         archive = Archive(store, keychain, args.archive.archive)
-        archive.get_items()
         dirs = []
-        for item in archive.items:
+        for item in archive.get_items():
             if exclude_path(item['path'], args.patterns):
                 continue
             self.print_verbose(item['path'].decode('utf-8'))
@@ -144,8 +143,7 @@ class Archiver(object):
         if args.src.archive:
             tmap = {1: 'p', 2: 'c', 4: 'd', 6: 'b', 010: '-', 012: 'l', 014: 's'}
             archive = Archive(store, keychain, args.src.archive)
-            archive.get_items()
-            for item in archive.items:
+            for item in archive.get_items():
                 type = tmap.get(item['mode'] / 4096, '?')
                 mode = format_file_mode(item['mode'])
                 size = item.get('size', 0)
@@ -158,11 +156,12 @@ class Archiver(object):
         return self.exit_code
 
     def do_verify(self, args):
+        import ipdb
+        ipdb.set_trace()
         store = self.open_store(args.archive)
         keychain = Keychain(args.keychain)
         archive = Archive(store, keychain, args.archive.archive)
-        archive.get_items()
-        for item in archive.items:
+        for item in archive.get_items():
             if stat.S_ISREG(item['mode']) and not 'source' in item:
                 self.print_verbose('%s ...', item['path'].decode('utf-8'), newline=False)
                 if archive.verify_file(item):
@@ -183,7 +182,7 @@ class Archiver(object):
         print 'Username:', archive.metadata['username']
         print 'Time:', archive.metadata['time']
         print 'Command line:', ' '.join(archive.metadata['cmdline'])
-        print 'Number of Files:', len(archive.items)
+        print 'Number of Files:', len(archive.get_items())
         print 'Original size:', format_file_size(osize)
         print 'Compressed size:', format_file_size(csize)
         print 'Unique data:', format_file_size(usize)