浏览代码

chunks index archive: remove all tar and compression related stuff and just use separate files in a directory

the compression was quite cpu intensive and didn't work that great anyway.
now the disk space usage is a bit higher, but it is much faster and less hard on the cpu.

disk space needs grow linearly with the amount and size of the archives, this
is a problem esp. if one has many and/or big archives (but this problem existed
before also because compression was not as effective as I believed).

the tar archive always needed a complete rebuild (and thus: decompression
and recompression) because deleting outdated archive indexes was not
possible in the tar file.

now we just have a directory chunks.archive.d and keep archive index files
there for all archives we already know.
if an archive does not exist any more in the repo, we just delete its index file.
if an archive is unknown still, we fetch the infos and build a new index file.

when merging, we avoid growing the hash table from zero, but just start
with the first archive's index as basis for merging.
Thomas Waldmann 9 年之前
父节点
当前提交
22dd925986
共有 1 个文件被更改,包括 84 次插入105 次删除
  1. 84 105
      borg/cache.py

+ 84 - 105
borg/cache.py

@@ -96,8 +96,7 @@ class Cache:
         with open(os.path.join(self.path, 'config'), 'w') as fd:
             config.write(fd)
         ChunkIndex().write(os.path.join(self.path, 'chunks').encode('utf-8'))
-        with open(os.path.join(self.path, 'chunks.archive'), 'wb') as fd:
-            pass  # empty file
+        os.makedirs(os.path.join(self.path, 'chunks.archive.d'))
         with open(os.path.join(self.path, 'files'), 'wb') as fd:
             pass  # empty file
 
@@ -153,7 +152,6 @@ class Cache:
         os.mkdir(txn_dir)
         shutil.copy(os.path.join(self.path, 'config'), txn_dir)
         shutil.copy(os.path.join(self.path, 'chunks'), txn_dir)
-        shutil.copy(os.path.join(self.path, 'chunks.archive'), txn_dir)
         shutil.copy(os.path.join(self.path, 'files'), txn_dir)
         os.rename(os.path.join(self.path, 'txn.tmp'),
                   os.path.join(self.path, 'txn.active'))
@@ -195,7 +193,6 @@ class Cache:
         if os.path.exists(txn_dir):
             shutil.copy(os.path.join(txn_dir, 'config'), self.path)
             shutil.copy(os.path.join(txn_dir, 'chunks'), self.path)
-            shutil.copy(os.path.join(txn_dir, 'chunks.archive'), self.path)
             shutil.copy(os.path.join(txn_dir, 'files'), self.path)
             os.rename(txn_dir, os.path.join(self.path, 'txn.tmp'))
             if os.path.exists(os.path.join(self.path, 'txn.tmp')):
@@ -206,53 +203,14 @@ class Cache:
     def sync(self):
         """Re-synchronize chunks cache with repository.
 
-        If present, uses a compressed tar archive of known backup archive
-        indices, so it only needs to fetch infos from repo and build a chunk
-        index once per backup archive.
-        If out of sync, the tar gets rebuilt from known + fetched chunk infos,
-        so it has complete and current information about all backup archives.
-        Finally, it builds the master chunks index by merging all indices from
-        the tar.
+        Maintains a directory with known backup archive indexes, so it only
+        needs to fetch infos from repo and build a chunk index once per backup
+        archive.
+        If out of sync, missing archive indexes get added, outdated indexes
+        get removed and a new master chunks index is built by merging all
+        archive indexes.
         """
-        in_archive_path = os.path.join(self.path, 'chunks.archive')
-        out_archive_path = os.path.join(self.path, 'chunks.archive.tmp')
-
-        def open_in_archive():
-            try:
-                tf = tarfile.open(in_archive_path, 'r')
-            except OSError as e:
-                if e.errno != errno.ENOENT:
-                    raise
-                # file not found
-                tf = None
-            except tarfile.ReadError:
-                # empty file?
-                tf = None
-            return tf
-
-        def open_out_archive():
-            for compression in ('gz', ):
-                # 'xz' needs py 3.3 and is expensive on the cpu
-                # 'bz2' also works on 3.2 and is expensive on the cpu
-                # 'gz' also works on 3.2 and is less expensive on the cpu
-                try:
-                    tf = tarfile.open(out_archive_path, 'w:'+compression, format=tarfile.PAX_FORMAT)
-                    break
-                except tarfile.CompressionError:
-                    continue
-            else:  # shouldn't happen
-                tf = None
-            return tf
-
-        def close_archive(tf):
-            if tf:
-                tf.close()
-
-        def delete_in_archive():
-            os.unlink(in_archive_path)
-
-        def rename_out_archive():
-            os.rename(out_archive_path, in_archive_path)
+        archive_path = os.path.join(self.path, 'chunks.archive.d')
 
         def add(chunk_idx, id, size, csize, incr=1):
             try:
@@ -261,16 +219,21 @@ class Cache:
             except KeyError:
                 chunk_idx[id] = incr, size, csize
 
-        def transfer_known_idx(archive_id, tf_in, tf_out):
-            archive_id_hex = hexlify(archive_id).decode('ascii')
-            tarinfo = tf_in.getmember(archive_id_hex)
-            archive_name = tarinfo.pax_headers['archive_name']
-            print('Already known archive:', archive_name)
-            f_in = tf_in.extractfile(archive_id_hex)
-            tf_out.addfile(tarinfo, f_in)
-            return archive_name
-
-        def fetch_and_build_idx(archive_id, repository, key, tmp_dir, tf_out):
+        def mkpath(id, suffix=''):
+            path = os.path.join(archive_path, id + suffix)
+            return path.encode('utf-8')
+
+        def list_archives():
+            fns = os.listdir(archive_path)
+            # only return filenames that are 64 hex digits (256bit)
+            return [fn for fn in fns if len(fn) == 64]
+
+        def cleanup_outdated(ids):
+            for id in ids:
+                id_hex = hexlify(id).decode('ascii')
+                os.unlink(mkpath(id_hex))
+
+        def fetch_and_build_idx(archive_id, repository, key):
             chunk_idx = ChunkIndex()
             cdata = repository.get(archive_id)
             data = key.decrypt(archive_id, cdata)
@@ -293,55 +256,71 @@ class Cache:
                         for chunk_id, size, csize in item[b'chunks']:
                             add(chunk_idx, chunk_id, size, csize)
             archive_id_hex = hexlify(archive_id).decode('ascii')
-            file_tmp = os.path.join(tmp_dir, archive_id_hex).encode('utf-8')
-            chunk_idx.write(file_tmp)
-            tarinfo = tf_out.gettarinfo(file_tmp, archive_id_hex)
-            tarinfo.pax_headers['archive_name'] = archive[b'name']
-            with open(file_tmp, 'rb') as f:
-                tf_out.addfile(tarinfo, f)
-            os.unlink(file_tmp)
-
-        def create_master_idx(chunk_idx, tf_in, tmp_dir):
+            fn = mkpath(archive_id_hex)
+            fn_tmp = mkpath(archive_id_hex, suffix='.tmp')
+            try:
+                chunk_idx.write(fn_tmp)
+            except Exception:
+                os.unlink(fn_tmp)
+            else:
+                os.rename(fn_tmp, fn)
+
+        def create_master_idx(chunk_idx):
+            # deallocates old hashindex, creates empty hashindex:
             chunk_idx.clear()
-            for tarinfo in tf_in:
-                archive_id_hex = tarinfo.name
-                archive_name = tarinfo.pax_headers['archive_name']
-                print("- extracting archive %s ..." % archive_name)
-                tf_in.extract(archive_id_hex, tmp_dir)
-                chunk_idx_path = os.path.join(tmp_dir, archive_id_hex).encode('utf-8')
-                print("- reading archive ...")
-                archive_chunk_idx = ChunkIndex.read(chunk_idx_path)
-                print("- merging archive ...")
-                chunk_idx.merge(archive_chunk_idx)
-                os.unlink(chunk_idx_path)
+            archives = list_archives()
+            if archives:
+                chunk_idx = None
+                for fn in archives:
+                    archive_id_hex = fn
+                    archive_id = unhexlify(archive_id_hex)
+                    for name, info in self.manifest.archives.items():
+                        if info[b'id'] == archive_id:
+                            archive_name = name
+                            break
+                    archive_chunk_idx_path = mkpath(archive_id_hex)
+                    print("- reading archive %s ..." % archive_name)
+                    archive_chunk_idx = ChunkIndex.read(archive_chunk_idx_path)
+                    print("- merging archive ...")
+                    if chunk_idx is None:
+                        # we just use the first archive's idx as starting point,
+                        # to avoid growing the hash table from 0 size and also
+                        # to save 1 merge call.
+                        chunk_idx = archive_chunk_idx
+                    else:
+                        chunk_idx.merge(archive_chunk_idx)
+            return chunk_idx
+
+        def legacy_support():
+            try:
+                # get rid of the compressed tar file, if present
+                os.unlink(os.path.join(self.path, 'chunks.archive'))
+            except:
+                pass
+            try:
+                # create the directory for the archive index files we use now
+                os.mkdir(archive_path)
+            except:
+                pass
+
 
         self.begin_txn()
         print('Synchronizing chunks cache...')
-        # XXX we have to do stuff on disk due to lacking ChunkIndex api
-        with tempfile.TemporaryDirectory(prefix='borg-tmp') as tmp_dir:
-            repository = cache_if_remote(self.repository)
-            out_archive = open_out_archive()
-            in_archive = open_in_archive()
-            if in_archive:
-                known_ids = set(unhexlify(hexid) for hexid in in_archive.getnames())
-            else:
-                known_ids = set()
-            archive_ids = set(info[b'id'] for info in self.manifest.archives.values())
-            print('Rebuilding archive collection. Known: %d Repo: %d Unknown: %d' % (
-                len(known_ids), len(archive_ids), len(archive_ids - known_ids), ))
-            for archive_id in archive_ids & known_ids:
-                transfer_known_idx(archive_id, in_archive, out_archive)
-            close_archive(in_archive)
-            delete_in_archive()  # free disk space
-            for archive_id in archive_ids - known_ids:
-                fetch_and_build_idx(archive_id, repository, self.key, tmp_dir, out_archive)
-            close_archive(out_archive)
-            rename_out_archive()
-            print('Merging collection into master chunks cache...')
-            in_archive = open_in_archive()
-            create_master_idx(self.chunks, in_archive, tmp_dir)
-            close_archive(in_archive)
-            print('Done.')
+        repository = cache_if_remote(self.repository)
+        legacy_support()
+        known_ids = set(unhexlify(hexid) for hexid in list_archives())
+        archive_ids = set(info[b'id'] for info in self.manifest.archives.values())
+        print('Rebuilding archive collection. Repo: %d Known: %d Outdated: %d Unknown: %d' % (
+            len(archive_ids), len(known_ids),
+            len(known_ids - archive_ids), len(archive_ids - known_ids), ))
+        cleanup_outdated(known_ids - archive_ids)
+        for archive_id in archive_ids - known_ids:
+            fetch_and_build_idx(archive_id, repository, self.key)
+        known_ids = set(unhexlify(hexid) for hexid in list_archives())
+        assert known_ids == archive_ids
+        print('Merging collection into master chunks cache...')
+        self.chunks = create_master_idx(self.chunks)
+        print('Done.')
 
     def add_chunk(self, id, data, stats):
         if not self.txn_active: