@@ -22,8 +22,10 @@ from .helpers import safe_ns
from .helpers import yes, hostname_is_unique
from .helpers import remove_surrogates
from .helpers import ProgressIndicatorPercent, ProgressIndicatorMessage
+from .helpers import set_ec, EXIT_WARNING
from .item import ArchiveItem, ChunkListEntry
from .crypto.key import PlaintextKey
+from .crypto.file_integrity import IntegrityCheckedFile, DetachedIntegrityCheckedFile, FileIntegrityError
from .locking import Lock
from .platform import SaveFile
from .remote import cache_if_remote
@@ -237,6 +239,8 @@ class CacheConfig:
config.set('cache', 'version', '1')
config.set('cache', 'repository', self.repository.id_str)
config.set('cache', 'manifest', '')
+ config.add_section('integrity')
+ config.set('integrity', 'manifest', '')
with SaveFile(self.config_path) as fd:
config.write(fd)
 
@@ -253,6 +257,20 @@ class CacheConfig:
self.manifest_id = unhexlify(self._config.get('cache', 'manifest'))
self.timestamp = self._config.get('cache', 'timestamp', fallback=None)
self.key_type = self._config.get('cache', 'key_type', fallback=None)
+ try:
+ self.integrity = dict(self._config.items('integrity'))
+ if self._config.get('cache', 'manifest') != self.integrity.pop('manifest'):
+                # The cache config file is updated in place: it is parsed with ConfigParser,
+                # the ConfigParser state is modified and then written out; it is not re-created.
+                # Thus, older versions leave our [integrity] section alone while changing the rest,
+                # which makes this section's data invalid (stale).
+                # Therefore, we also store the manifest ID in this section, and by comparing it to the
+                # manifest ID in the main [cache] section we can tell whether an older version interfered.
+ self.integrity = {}
+ logger.warning('Cache integrity data not available: old Borg version modified the cache.')
+ except configparser.NoSectionError:
+ logger.debug('Cache integrity: No integrity data found (files, chunks). Cache is from old version.')
+ self.integrity = {}
previous_location = self._config.get('cache', 'previous_location', fallback=None)
if previous_location:
self.previous_location = recanonicalize_relative_location(previous_location, self.repository)
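
Note: a minimal standalone sketch of the staleness check added above, assuming a configparser.ConfigParser that has already read the cache config; the helper name load_cache_integrity is invented for illustration.

    import configparser
    import logging

    logger = logging.getLogger(__name__)

    def load_cache_integrity(config):
        """Return the [integrity] data, or {} if the section is missing or stale."""
        try:
            integrity = dict(config.items('integrity'))
        except configparser.NoSectionError:
            # Cache written by a Borg version that predates the [integrity] section.
            return {}
        if config.get('cache', 'manifest') != integrity.pop('manifest', None):
            # An older version rewrote [cache] but left [integrity] untouched,
            # so the recorded integrity data no longer describes the current cache.
            logger.warning('Cache integrity data not available: old Borg version modified the cache.')
            return {}
        return integrity
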
@@ -263,6 +281,11 @@ class CacheConfig:
if manifest:
self._config.set('cache', 'manifest', manifest.id_str)
self._config.set('cache', 'timestamp', manifest.timestamp)
+ if not self._config.has_section('integrity'):
+ self._config.add_section('integrity')
+ for file, integrity_data in self.integrity.items():
+ self._config.set('integrity', file, integrity_data)
+ self._config.set('integrity', 'manifest', manifest.id_str)
if key:
self._config.set('cache', 'key_type', str(key.TYPE))
self._config.set('cache', 'previous_location', self.repository._location.canonical_path())
@@ -392,14 +415,16 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
with open(os.path.join(self.path, 'README'), 'w') as fd:
fd.write(CACHE_README)
self.cache_config.create()
- ChunkIndex().write(os.path.join(self.path, 'chunks').encode('utf-8'))
+ ChunkIndex().write(os.path.join(self.path, 'chunks'))
os.makedirs(os.path.join(self.path, 'chunks.archive.d'))
with SaveFile(os.path.join(self.path, 'files'), binary=True) as fd:
pass # empty file
 
def _do_open(self):
self.cache_config.load()
- self.chunks = ChunkIndex.read(os.path.join(self.path, 'chunks').encode('utf-8'))
+ with IntegrityCheckedFile(path=os.path.join(self.path, 'chunks'), write=False,
+ integrity_data=self.cache_config.integrity.get('chunks')) as fd:
+ self.chunks = ChunkIndex.read(fd)
self.files = None
 
def open(self):
@@ -417,7 +442,9 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
self.files = {}
self._newest_mtime = None
logger.debug('Reading files cache ...')
- with open(os.path.join(self.path, 'files'), 'rb') as fd:
+
+ with IntegrityCheckedFile(path=os.path.join(self.path, 'files'), write=False,
+ integrity_data=self.cache_config.integrity.get('files')) as fd:
u = msgpack.Unpacker(use_list=True)
while True:
data = fd.read(64 * 1024)
@@ -458,7 +485,7 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
self._newest_mtime = 2 ** 63 - 1 # nanoseconds, good until y2262
ttl = int(os.environ.get('BORG_FILES_CACHE_TTL', 20))
pi.output('Saving files cache')
- with SaveFile(os.path.join(self.path, 'files'), binary=True) as fd:
+ with IntegrityCheckedFile(path=os.path.join(self.path, 'files'), write=True) as fd:
for path_hash, item in self.files.items():
# Only keep files seen in this backup that are older than newest mtime seen in this backup -
# this is to avoid issues with filesystem snapshots and mtime granularity.
@@ -467,10 +494,13 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
if entry.age == 0 and bigint_to_int(entry.mtime) < self._newest_mtime or \
entry.age > 0 and entry.age < ttl:
msgpack.pack((path_hash, entry), fd)
+        self.cache_config.integrity['files'] = fd.integrity_data
+ pi.output('Saving chunks cache')
+ with IntegrityCheckedFile(path=os.path.join(self.path, 'chunks'), write=True) as fd:
+ self.chunks.write(fd)
+        self.cache_config.integrity['chunks'] = fd.integrity_data
pi.output('Saving cache config')
self.cache_config.save(self.manifest, self.key)
- pi.output('Saving chunks cache')
- self.chunks.write(os.path.join(self.path, 'chunks').encode('utf-8'))
os.rename(os.path.join(self.path, 'txn.active'),
os.path.join(self.path, 'txn.tmp'))
shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
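
Note: the write/read pairing used for the 'files' and 'chunks' caches above can be summarized with a hedged sketch. Paths and payload are invented; the assumptions are that write=True exposes fd.integrity_data for the caller to persist once the block is left, and that passing that value back via integrity_data= on a later read makes corruption surface as FileIntegrityError.

    from borg.crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError

    path = '/tmp/example-cache-file'   # invented stand-in for e.g. os.path.join(self.path, 'files')
    stored_integrity = {}              # invented stand-in for self.cache_config.integrity

    # Write: the file is hashed while being written; keep the resulting descriptor.
    with IntegrityCheckedFile(path=path, write=True) as fd:
        fd.write(b'example payload')
    stored_integrity['example'] = fd.integrity_data

    # Read: verify against the stored descriptor. Passing None (as happens for caches
    # written by older versions) presumably just skips verification.
    try:
        with IntegrityCheckedFile(path=path, write=False,
                                  integrity_data=stored_integrity.get('example')) as fd:
            data = fd.read()
    except FileIntegrityError as fie:
        print('cache file corrupted:', fie)
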
@@ -510,7 +540,7 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
def mkpath(id, suffix=''):
id_hex = bin_to_hex(id)
path = os.path.join(archive_path, id_hex + suffix)
- return path.encode('utf-8')
+ return path
 
def cached_archives():
if self.do_cache:
@@ -525,7 +555,14 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
 
def cleanup_outdated(ids):
for id in ids:
- os.unlink(mkpath(id))
+ cleanup_cached_archive(id)
+
+ def cleanup_cached_archive(id):
+ os.unlink(mkpath(id))
+ try:
+ os.unlink(mkpath(id) + '.integrity')
+ except FileNotFoundError:
+ pass
 
def fetch_and_build_idx(archive_id, repository, key, chunk_idx):
cdata = repository.get(archive_id)
@@ -542,14 +579,16 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
for item in unpacker:
if not isinstance(item, dict):
logger.error('Error: Did not get expected metadata dict - archive corrupted!')
- continue
+ continue # XXX: continue?!
for chunk_id, size, csize in item.get(b'chunks', []):
chunk_idx.add(chunk_id, 1, size, csize)
if self.do_cache:
fn = mkpath(archive_id)
fn_tmp = mkpath(archive_id, suffix='.tmp')
try:
- chunk_idx.write(fn_tmp)
+ with DetachedIntegrityCheckedFile(path=fn_tmp, write=True,
+ filename=bin_to_hex(archive_id)) as fd:
+ chunk_idx.write(fd)
except Exception:
os.unlink(fn_tmp)
else:
@@ -564,9 +603,9 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
logger.info('Synchronizing chunks cache...')
cached_ids = cached_archives()
archive_ids = repo_archives()
- logger.info('Archives: %d, w/ cached Idx: %d, w/ outdated Idx: %d, w/o cached Idx: %d.' % (
+ logger.info('Archives: %d, w/ cached Idx: %d, w/ outdated Idx: %d, w/o cached Idx: %d.',
len(archive_ids), len(cached_ids),
- len(cached_ids - archive_ids), len(archive_ids - cached_ids), ))
+ len(cached_ids - archive_ids), len(archive_ids - cached_ids))
# deallocates old hashindex, creates empty hashindex:
chunk_idx.clear()
cleanup_outdated(cached_ids - archive_ids)
@@ -583,10 +622,20 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
if self.do_cache:
if archive_id in cached_ids:
archive_chunk_idx_path = mkpath(archive_id)
- logger.info("Reading cached archive chunk index for %s ..." % archive_name)
- archive_chunk_idx = ChunkIndex.read(archive_chunk_idx_path)
- else:
- logger.info('Fetching and building archive index for %s ...' % archive_name)
+ logger.info("Reading cached archive chunk index for %s ...", archive_name)
+ try:
+ with DetachedIntegrityCheckedFile(path=archive_chunk_idx_path, write=False) as fd:
+ archive_chunk_idx = ChunkIndex.read(fd)
+ except FileIntegrityError as fie:
+ logger.error('Cached archive chunk index of %s is corrupted: %s', archive_name, fie)
+ # Delete it and fetch a new index
+ cleanup_cached_archive(archive_id)
+ cached_ids.remove(archive_id)
+ set_ec(EXIT_WARNING)
+ if archive_id not in cached_ids:
+ # Do not make this an else branch; the FileIntegrityError exception handler
+ # above can remove *archive_id* from *cached_ids*.
+ logger.info('Fetching and building archive index for %s ...', archive_name)
archive_chunk_idx = ChunkIndex()
fetch_and_build_idx(archive_id, repository, self.key, archive_chunk_idx)
logger.info("Merging into master chunks index ...")
@@ -599,7 +648,7 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
chunk_idx.merge(archive_chunk_idx)
else:
chunk_idx = chunk_idx or ChunkIndex()
- logger.info('Fetching archive index for %s ...' % archive_name)
+ logger.info('Fetching archive index for %s ...', archive_name)
fetch_and_build_idx(archive_id, repository, self.key, chunk_idx)
if self.progress:
pi.finish()