@@ -12,26 +12,22 @@ logger = create_logger()
files_cache_logger = create_logger("borg.debug.files_cache")

from .constants import CACHE_README, FILES_CACHE_MODE_DISABLED, ROBJ_FILE_STREAM
-from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer
+from .hashindex import ChunkIndex, ChunkIndexEntry
from .helpers import Error
from .helpers import get_cache_dir, get_security_dir
-from .helpers import bin_to_hex, hex_to_bin, parse_stringified_list
+from .helpers import hex_to_bin, parse_stringified_list
from .helpers import format_file_size
from .helpers import safe_ns
from .helpers import yes
-from .helpers import remove_surrogates
-from .helpers import ProgressIndicatorPercent, ProgressIndicatorMessage
-from .helpers import set_ec, EXIT_WARNING
-from .helpers import safe_unlink
+from .helpers import ProgressIndicatorMessage
from .helpers import msgpack
from .helpers.msgpack import int_to_timestamp, timestamp_to_int
-from .item import ArchiveItem, ChunkListEntry
+from .item import ChunkListEntry
from .crypto.key import PlaintextKey
-from .crypto.file_integrity import IntegrityCheckedFile, DetachedIntegrityCheckedFile, FileIntegrityError
+from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError
from .locking import Lock
from .manifest import Manifest
from .platform import SaveFile
-from .remote import cache_if_remote
from .remote3 import RemoteRepository3
from .repository3 import LIST_SCAN_LIMIT, Repository3
@@ -355,24 +351,10 @@ class Cache:
        warn_if_unencrypted=True,
        progress=False,
        lock_wait=None,
-        no_cache_sync_permitted=False,
-        no_cache_sync_forced=False,
        prefer_adhoc_cache=False,
        cache_mode=FILES_CACHE_MODE_DISABLED,
        iec=False,
    ):
-        def local():
-            return LocalCache(
-                manifest=manifest,
-                path=path,
-                sync=sync,
-                warn_if_unencrypted=warn_if_unencrypted,
-                progress=progress,
-                iec=iec,
-                lock_wait=lock_wait,
-                cache_mode=cache_mode,
-            )
-
        def adhocwithfiles():
            return AdHocWithFilesCache(
                manifest=manifest,
@@ -389,38 +371,14 @@ class Cache:

        impl = get_cache_impl()
        if impl != "cli":
-            methods = dict(local=local, adhocwithfiles=adhocwithfiles, adhoc=adhoc)
+            methods = dict(adhocwithfiles=adhocwithfiles, adhoc=adhoc)
            try:
                method = methods[impl]
            except KeyError:
                raise RuntimeError("Unknown BORG_CACHE_IMPL value: %s" % impl)
            return method()

-        if no_cache_sync_forced:
-            return adhoc() if prefer_adhoc_cache else adhocwithfiles()
-
-        if not no_cache_sync_permitted:
-            return local()
-
-        # no cache sync may be permitted, but if the local cache is in sync it'd be stupid to invalidate
-        # it by needlessly using the AdHocCache or the AdHocWithFilesCache.
-        # Check if the local cache exists and is in sync.
-
-        cache_config = CacheConfig(repository, path, lock_wait)
-        if cache_config.exists():
-            with cache_config:
-                cache_in_sync = cache_config.manifest_id == manifest.id
-            # Don't nest cache locks
-            if cache_in_sync:
-                # Local cache is in sync, use it
-                logger.debug("Cache: choosing local cache (in sync)")
-                return local()
-        if prefer_adhoc_cache:  # adhoc cache, without files cache
-            logger.debug("Cache: choosing AdHocCache (local cache does not exist or is not in sync)")
-            return adhoc()
-        else:
-            logger.debug("Cache: choosing AdHocWithFilesCache (local cache does not exist or is not in sync)")
-            return adhocwithfiles()
+        return adhoc() if prefer_adhoc_cache else adhocwithfiles()


class CacheStatsMixin:
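
With LocalCache removed, the selection above collapses to a two-way choice between AdHocCache and AdHocWithFilesCache, with BORG_CACHE_IMPL (read via get_cache_impl()) as an override. A minimal sketch of the resulting behavior, not the actual borg code, with class names as plain strings:

    # hedged sketch of the simplified Cache.new() dispatch
    def choose_impl(impl, prefer_adhoc_cache):
        if impl != "cli":
            methods = {"adhocwithfiles": "AdHocWithFilesCache", "adhoc": "AdHocCache"}
            try:
                return methods[impl]
            except KeyError:
                raise RuntimeError("Unknown BORG_CACHE_IMPL value: %s" % impl)
        return "AdHocCache" if prefer_adhoc_cache else "AdHocWithFilesCache"

    assert choose_impl("cli", prefer_adhoc_cache=False) == "AdHocWithFilesCache"
    assert choose_impl("adhoc", prefer_adhoc_cache=True) == "AdHocCache"
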
@@ -671,15 +629,7 @@ class ChunksMixin:
        entry = self.chunks.get(id, ChunkIndexEntry(0, None))
        if entry.refcount and size is not None:
            assert isinstance(entry.size, int)
-            if entry.size:
-                # LocalCache: has existing size information and uses *size* to make an effort at detecting collisions.
-                if size != entry.size:
-                    # we already have a chunk with that id, but different size.
-                    # this is either a hash collision (unlikely) or corruption or a bug.
-                    raise Exception(
-                        "chunk has same id [%r], but different size (stored: %d new: %d)!" % (id, entry.size, size)
-                    )
-            else:
+            if not entry.size:
                # AdHocWithFilesCache / AdHocCache:
                # Here *size* is used to update the chunk's size information, which will be zero for existing chunks.
                self.chunks[id] = entry._replace(size=size)
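
The branch above works because ChunkIndexEntry behaves like a namedtuple, so _replace() yields a copy with only the size field updated while the refcount is kept. A self-contained sketch, using a plain namedtuple as a stand-in for the real hashindex entry type:

    from collections import namedtuple

    ChunkIndexEntry = namedtuple("ChunkIndexEntry", "refcount size")

    chunks = {b"id0": ChunkIndexEntry(refcount=3, size=0)}  # size not yet known

    entry = chunks[b"id0"]
    if entry.refcount and not entry.size:
        # fill in the real size once it is known, keeping the refcount
        chunks[b"id0"] = entry._replace(size=4096)

    assert chunks[b"id0"] == ChunkIndexEntry(refcount=3, size=4096)
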
@@ -737,7 +687,7 @@ class ChunksMixin:
            for id_ in result:
                num_chunks += 1
                chunks[id_] = init_entry
-        # LocalCache does not contain the manifest, either.
+        # Cache does not contain the manifest.
        if not isinstance(self.repository, (Repository3, RemoteRepository3)):
            del chunks[self.manifest.MANIFEST_ID]
        duration = perf_counter() - t0 or 0.01
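
For context, *result* in the loop above comes from paging over all chunk IDs in the repository, with LIST_SCAN_LIMIT (see the imports hunk) bounding each page. A hedged sketch of such a pager, assuming a repository.list(limit=..., marker=...) API; the exact signature may differ:

    # illustrative pager, not the verbatim borg loop
    def iter_chunk_ids(repository, limit):
        marker = None
        while True:
            result = repository.list(limit=limit, marker=marker)
            if not result:
                return
            marker = result[-1]
            yield from result
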
@@ -753,413 +703,6 @@ class ChunksMixin:
        return chunks


-class LocalCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin):
-    """
-    Persistent, local (client-side) cache.
-    """
-
-    def __init__(
-        self,
-        manifest,
-        path=None,
-        sync=True,
-        warn_if_unencrypted=True,
-        progress=False,
-        lock_wait=None,
-        cache_mode=FILES_CACHE_MODE_DISABLED,
-        iec=False,
-    ):
-        """
-        :param warn_if_unencrypted: print warning if accessing unknown unencrypted repository
-        :param lock_wait: timeout for lock acquisition (int [s] or None [wait forever])
-        :param sync: do :meth:`.sync`
-        :param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison
-        """
-        CacheStatsMixin.__init__(self, iec=iec)
-        FilesCacheMixin.__init__(self, cache_mode)
-        assert isinstance(manifest, Manifest)
-        self.manifest = manifest
-        self.repository = manifest.repository
-        self.key = manifest.key
-        self.repo_objs = manifest.repo_objs
-        self.progress = progress
-        self._txn_active = False
-        self.do_cache = os.environ.get("BORG_USE_CHUNKS_ARCHIVE", "yes").lower() in ["yes", "1", "true"]
-
-        self.path = cache_dir(self.repository, path)
-        self.security_manager = SecurityManager(self.repository)
-        self.cache_config = CacheConfig(self.repository, self.path, lock_wait)
-
-        # Warn user before sending data to a never seen before unencrypted repository
-        if not os.path.exists(self.path):
-            self.security_manager.assert_access_unknown(warn_if_unencrypted, manifest, self.key)
-            self.create()
-
-        try:
-            self.open()
-        except (FileNotFoundError, FileIntegrityError):
-            self.wipe_cache()
-            self.open()
-
-        try:
-            self.security_manager.assert_secure(manifest, self.key)
-
-            if not self.check_cache_compatibility():
-                self.wipe_cache()
-
-            self.update_compatibility()
-
-            if sync and self.manifest.id != self.cache_config.manifest_id:
-                self.sync()
-                self.commit()
-        except:  # noqa
-            self.close()
-            raise
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        self.close()
-
-    def create(self):
-        """Create a new empty cache at `self.path`"""
-        os.makedirs(self.path)
-        with open(os.path.join(self.path, "README"), "w") as fd:
-            fd.write(CACHE_README)
-        self.cache_config.create()
-        ChunkIndex().write(os.path.join(self.path, "chunks"))
-        os.makedirs(os.path.join(self.path, "chunks.archive.d"))
-        self._create_empty_files_cache(self.path)
-
-    def _do_open(self):
-        self.cache_config.load()
-        with IntegrityCheckedFile(
-            path=os.path.join(self.path, "chunks"),
-            write=False,
-            integrity_data=self.cache_config.integrity.get("chunks"),
-        ) as fd:
-            self.chunks = ChunkIndex.read(fd)
-        self._read_files_cache()
-
-    def open(self):
-        if not os.path.isdir(self.path):
-            raise Exception("%s Does not look like a Borg cache" % self.path)
-        self.cache_config.open()
-        self.rollback()
-
-    def close(self):
-        if self.cache_config is not None:
-            self.cache_config.close()
-            self.cache_config = None
-
-    def begin_txn(self):
-        # Initialize transaction snapshot
-        pi = ProgressIndicatorMessage(msgid="cache.begin_transaction")
-        txn_dir = os.path.join(self.path, "txn.tmp")
-        os.mkdir(txn_dir)
-        pi.output("Initializing cache transaction: Reading config")
-        shutil.copy(os.path.join(self.path, "config"), txn_dir)
-        pi.output("Initializing cache transaction: Reading chunks")
-        shutil.copy(os.path.join(self.path, "chunks"), txn_dir)
-        pi.output("Initializing cache transaction: Reading files")
-        try:
-            shutil.copy(os.path.join(self.path, self.files_cache_name()), txn_dir)
-        except FileNotFoundError:
-            self._create_empty_files_cache(txn_dir)
-        os.replace(txn_dir, os.path.join(self.path, "txn.active"))
-        self._txn_active = True
-        pi.finish()
-
-    def commit(self):
-        """Commit transaction"""
-        if not self._txn_active:
-            return
-        self.security_manager.save(self.manifest, self.key)
-        pi = ProgressIndicatorMessage(msgid="cache.commit")
-        if self.files is not None:
-            pi.output("Saving files cache")
-            integrity_data = self._write_files_cache()
-            self.cache_config.integrity[self.files_cache_name()] = integrity_data
-        pi.output("Saving chunks cache")
-        with IntegrityCheckedFile(path=os.path.join(self.path, "chunks"), write=True) as fd:
-            self.chunks.write(fd)
-        self.cache_config.integrity["chunks"] = fd.integrity_data
-        pi.output("Saving cache config")
-        self.cache_config.save(self.manifest)
-        os.replace(os.path.join(self.path, "txn.active"), os.path.join(self.path, "txn.tmp"))
-        shutil.rmtree(os.path.join(self.path, "txn.tmp"))
-        self._txn_active = False
-        pi.finish()
-
-    def rollback(self):
-        """Roll back partial and aborted transactions"""
-        # Remove partial transaction
-        if os.path.exists(os.path.join(self.path, "txn.tmp")):
-            shutil.rmtree(os.path.join(self.path, "txn.tmp"))
-        # Roll back active transaction
-        txn_dir = os.path.join(self.path, "txn.active")
-        if os.path.exists(txn_dir):
-            shutil.copy(os.path.join(txn_dir, "config"), self.path)
-            shutil.copy(os.path.join(txn_dir, "chunks"), self.path)
-            shutil.copy(os.path.join(txn_dir, self.discover_files_cache_name(txn_dir)), self.path)
-            txn_tmp = os.path.join(self.path, "txn.tmp")
-            os.replace(txn_dir, txn_tmp)
-            if os.path.exists(txn_tmp):
-                shutil.rmtree(txn_tmp)
-        self._txn_active = False
-        self._do_open()
-
-    def sync(self):
-        """Re-synchronize chunks cache with repository.
-
-        Maintains a directory with known backup archive indexes, so it only
-        needs to fetch infos from repo and build a chunk index once per backup
-        archive.
-        If out of sync, missing archive indexes get added, outdated indexes
-        get removed and a new master chunks index is built by merging all
-        archive indexes.
-        """
-        archive_path = os.path.join(self.path, "chunks.archive.d")
-        # Instrumentation
-        processed_item_metadata_bytes = 0
-        processed_item_metadata_chunks = 0
-        compact_chunks_archive_saved_space = 0
-
-        def mkpath(id, suffix=""):
-            id_hex = bin_to_hex(id)
-            path = os.path.join(archive_path, id_hex + suffix)
-            return path
-
-        def cached_archives():
-            if self.do_cache:
-                fns = os.listdir(archive_path)
-                # filenames with 64 hex digits == 256bit,
-                # or compact indices which are 64 hex digits + ".compact"
-                return {hex_to_bin(fn) for fn in fns if len(fn) == 64} | {
-                    hex_to_bin(fn[:64]) for fn in fns if len(fn) == 72 and fn.endswith(".compact")
-                }
-            else:
-                return set()
-
-        def repo_archives():
-            return {info.id for info in self.manifest.archives.list()}
-
-        def cleanup_outdated(ids):
-            for id in ids:
-                cleanup_cached_archive(id)
-
-        def cleanup_cached_archive(id, cleanup_compact=True):
-            try:
-                os.unlink(mkpath(id))
-                os.unlink(mkpath(id) + ".integrity")
-            except FileNotFoundError:
-                pass
-            if not cleanup_compact:
-                return
-            try:
-                os.unlink(mkpath(id, suffix=".compact"))
-                os.unlink(mkpath(id, suffix=".compact") + ".integrity")
-            except FileNotFoundError:
-                pass
-
-        def fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx):
-            nonlocal processed_item_metadata_bytes
-            nonlocal processed_item_metadata_chunks
-            csize, data = decrypted_repository.get(archive_id)
-            chunk_idx.add(archive_id, 1, len(data))
-            archive = self.key.unpack_archive(data)
-            archive = ArchiveItem(internal_dict=archive)
-            if archive.version not in (1, 2):  # legacy
-                raise Exception("Unknown archive metadata version")
-            if archive.version == 1:
-                items = archive.items
-            elif archive.version == 2:
-                items = []
-                for chunk_id, (csize, data) in zip(archive.item_ptrs, decrypted_repository.get_many(archive.item_ptrs)):
-                    chunk_idx.add(chunk_id, 1, len(data))
-                    ids = msgpack.unpackb(data)
-                    items.extend(ids)
-            sync = CacheSynchronizer(chunk_idx)
-            for item_id, (csize, data) in zip(items, decrypted_repository.get_many(items)):
-                chunk_idx.add(item_id, 1, len(data))
-                processed_item_metadata_bytes += len(data)
-                processed_item_metadata_chunks += 1
-                sync.feed(data)
-            if self.do_cache:
-                write_archive_index(archive_id, chunk_idx)
-
-        def write_archive_index(archive_id, chunk_idx):
-            nonlocal compact_chunks_archive_saved_space
-            compact_chunks_archive_saved_space += chunk_idx.compact()
-            fn = mkpath(archive_id, suffix=".compact")
-            fn_tmp = mkpath(archive_id, suffix=".tmp")
-            try:
-                with DetachedIntegrityCheckedFile(
-                    path=fn_tmp, write=True, filename=bin_to_hex(archive_id) + ".compact"
-                ) as fd:
-                    chunk_idx.write(fd)
-            except Exception:
-                safe_unlink(fn_tmp)
-            else:
-                os.replace(fn_tmp, fn)
-
-        def read_archive_index(archive_id, archive_name):
-            archive_chunk_idx_path = mkpath(archive_id)
-            logger.info("Reading cached archive chunk index for %s", archive_name)
-            try:
-                try:
-                    # Attempt to load compact index first
-                    with DetachedIntegrityCheckedFile(path=archive_chunk_idx_path + ".compact", write=False) as fd:
-                        archive_chunk_idx = ChunkIndex.read(fd, permit_compact=True)
-                    # In case a non-compact index exists, delete it.
-                    cleanup_cached_archive(archive_id, cleanup_compact=False)
-                    # Compact index read - return index, no conversion necessary (below).
-                    return archive_chunk_idx
-                except FileNotFoundError:
-                    # No compact index found, load non-compact index, and convert below.
-                    with DetachedIntegrityCheckedFile(path=archive_chunk_idx_path, write=False) as fd:
-                        archive_chunk_idx = ChunkIndex.read(fd)
-            except FileIntegrityError as fie:
-                logger.error("Cached archive chunk index of %s is corrupted: %s", archive_name, fie)
-                # Delete corrupted index, set warning. A new index must be build.
-                cleanup_cached_archive(archive_id)
-                set_ec(EXIT_WARNING)
-                return None
-
-            # Convert to compact index. Delete the existing index first.
-            logger.debug("Found non-compact index for %s, converting to compact.", archive_name)
-            cleanup_cached_archive(archive_id)
-            write_archive_index(archive_id, archive_chunk_idx)
-            return archive_chunk_idx
-
-        def get_archive_ids_to_names(archive_ids):
-            # Pass once over all archives and build a mapping from ids to names.
-            # The easier approach, doing a similar loop for each archive, has
-            # square complexity and does about a dozen million functions calls
-            # with 1100 archives (which takes 30s CPU seconds _alone_).
-            archive_names = {}
-            for info in self.manifest.archives.list():
-                if info.id in archive_ids:
-                    archive_names[info.id] = info.name
-            assert len(archive_names) == len(archive_ids)
-            return archive_names
-
-        def create_master_idx(chunk_idx):
-            logger.debug("Synchronizing chunks index...")
-            cached_ids = cached_archives()
-            archive_ids = repo_archives()
-            logger.info(
-                "Cached archive chunk indexes: %d fresh, %d stale, %d need fetching.",
-                len(archive_ids & cached_ids),
-                len(cached_ids - archive_ids),
-                len(archive_ids - cached_ids),
-            )
-            # deallocates old hashindex, creates empty hashindex:
-            chunk_idx.clear()
-            cleanup_outdated(cached_ids - archive_ids)
-            # Explicitly set the usable initial hash table capacity to avoid performance issues
-            # due to hash table "resonance".
-            master_index_capacity = len(self.repository)
-            if archive_ids:
-                chunk_idx = None if not self.do_cache else ChunkIndex(usable=master_index_capacity)
-                pi = ProgressIndicatorPercent(
-                    total=len(archive_ids),
-                    step=0.1,
-                    msg="%3.0f%% Syncing chunks index. Processing archive %s.",
-                    msgid="cache.sync",
-                )
-                archive_ids_to_names = get_archive_ids_to_names(archive_ids)
-                for archive_id, archive_name in archive_ids_to_names.items():
-                    pi.show(info=[remove_surrogates(archive_name)])  # legacy. borg2 always has pure unicode arch names.
-                    if self.do_cache:
-                        if archive_id in cached_ids:
-                            archive_chunk_idx = read_archive_index(archive_id, archive_name)
-                            if archive_chunk_idx is None:
-                                cached_ids.remove(archive_id)
-                        if archive_id not in cached_ids:
-                            # Do not make this an else branch; the FileIntegrityError exception handler
-                            # above can remove *archive_id* from *cached_ids*.
-                            logger.info("Fetching and building archive index for %s.", archive_name)
-                            archive_chunk_idx = ChunkIndex()
-                            fetch_and_build_idx(archive_id, decrypted_repository, archive_chunk_idx)
-                        logger.debug("Merging into master chunks index.")
-                        chunk_idx.merge(archive_chunk_idx)
-                    else:
-                        chunk_idx = chunk_idx or ChunkIndex(usable=master_index_capacity)
-                        logger.info("Fetching archive index for %s.", archive_name)
-                        fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx)
-                pi.finish()
-            logger.debug(
-                "Chunks index sync: processed %s (%d chunks) of metadata.",
-                format_file_size(processed_item_metadata_bytes),
-                processed_item_metadata_chunks,
-            )
-            logger.debug(
-                "Chunks index sync: compact chunks.archive.d storage saved %s bytes.",
-                format_file_size(compact_chunks_archive_saved_space),
-            )
-            logger.debug("Chunks index sync done.")
-            return chunk_idx
-
-        # The cache can be used by a command that e.g. only checks against Manifest.Operation.WRITE,
-        # which does not have to include all flags from Manifest.Operation.READ.
-        # Since the sync will attempt to read archives, check compatibility with Manifest.Operation.READ.
-        self.manifest.check_repository_compatibility((Manifest.Operation.READ,))
-
-        self.begin_txn()
-        with cache_if_remote(self.repository, decrypted_cache=self.repo_objs) as decrypted_repository:
-            self.chunks = create_master_idx(self.chunks)
-
-    def check_cache_compatibility(self):
-        my_features = Manifest.SUPPORTED_REPO_FEATURES
-        if self.cache_config.ignored_features & my_features:
-            # The cache might not contain references of chunks that need a feature that is mandatory for some operation
-            # and which this version supports. To avoid corruption while executing that operation force rebuild.
-            return False
-        if not self.cache_config.mandatory_features <= my_features:
-            # The cache was build with consideration to at least one feature that this version does not understand.
-            # This client might misinterpret the cache. Thus force a rebuild.
-            return False
-        return True
-
-    def wipe_cache(self):
-        logger.warning("Discarding incompatible or corrupted cache and forcing a cache rebuild")
-        archive_path = os.path.join(self.path, "chunks.archive.d")
-        if os.path.isdir(archive_path):
-            shutil.rmtree(os.path.join(self.path, "chunks.archive.d"))
-            os.makedirs(os.path.join(self.path, "chunks.archive.d"))
-        self.chunks = ChunkIndex()
-        with IntegrityCheckedFile(path=os.path.join(self.path, "chunks"), write=True) as fd:
-            self.chunks.write(fd)
-        self.cache_config.integrity["chunks"] = fd.integrity_data
-        integrity_data = self._create_empty_files_cache(self.path)
-        self.cache_config.integrity[self.files_cache_name()] = integrity_data
-        self.cache_config.manifest_id = ""
-        self.cache_config._config.set("cache", "manifest", "")
-        if not self.cache_config._config.has_section("integrity"):
-            self.cache_config._config.add_section("integrity")
-        for file, integrity_data in self.cache_config.integrity.items():
-            self.cache_config._config.set("integrity", file, integrity_data)
-        # This is needed to pass the integrity check later on inside CacheConfig.load()
-        self.cache_config._config.set("integrity", "manifest", "")
-
-        self.cache_config.ignored_features = set()
-        self.cache_config.mandatory_features = set()
-        with SaveFile(self.cache_config.config_path) as fd:
-            self.cache_config._config.write(fd)
-
-    def update_compatibility(self):
-        operation_to_features_map = self.manifest.get_all_mandatory_features()
-        my_features = Manifest.SUPPORTED_REPO_FEATURES
-        repo_features = set()
-        for operation, features in operation_to_features_map.items():
-            repo_features.update(features)
-
-        self.cache_config.ignored_features.update(repo_features - my_features)
-        self.cache_config.mandatory_features.update(repo_features & my_features)
-
-
class AdHocWithFilesCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin):
    """
    Like AdHocCache, but with a files cache.
@@ -1326,10 +869,10 @@ class AdHocCache(CacheStatsMixin, ChunksMixin):
    """
    Ad-hoc, non-persistent cache.

-    Compared to the standard LocalCache the AdHocCache does not maintain accurate reference count,
-    nor does it provide a files cache (which would require persistence). Chunks that were not added
-    during the current AdHocCache lifetime won't have correct size set (0 bytes) and will
-    have an infinite reference count (MAX_VALUE).
+    The AdHocCache does not maintain accurate reference counts, nor does it provide a files cache
+    (which would require persistence).
+    Chunks that were not added during the current AdHocCache lifetime won't have a correct size set
+    (0 bytes) and will have an infinite reference count (MAX_VALUE).
    """

    str_format = """\
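
On the "infinite reference count" mentioned in the docstring: pre-existing chunks are loaded with the sentinel MAX_VALUE, which saturates instead of counting, so ad-hoc bookkeeping can never drive their refcount to zero and provoke a bogus delete. A small sketch of the saturating arithmetic; the concrete sentinel value is an assumption here, not taken from this diff:

    MAX_VALUE = 2**32 - 1025  # assumed sentinel for "infinite"

    def incref(refcount):
        # a saturated ("infinite") refcount stays put
        return refcount if refcount >= MAX_VALUE else refcount + 1

    def decref(refcount):
        return refcount if refcount >= MAX_VALUE else refcount - 1
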
|