@@ -35,7 +35,7 @@ from .helpers import ProgressIndicatorPercent, log_multi
 from .helpers import PathPrefixPattern, FnmatchPattern
 from .helpers import consume
 from .helpers import CompressionDecider1, CompressionDecider2, CompressionSpec
-from .item import Item
+from .item import Item, ArchiveItem
 from .key import key_factory
 from .platform import acl_get, acl_set, set_flags, get_flags, swidth
 from .remote import cache_if_remote
@@ -277,29 +277,28 @@ class Archive:
 
     def _load_meta(self, id):
         _, data = self.key.decrypt(id, self.repository.get(id))
-        metadata = msgpack.unpackb(data)
-        if metadata[b'version'] != 1:
+        metadata = ArchiveItem(internal_dict=msgpack.unpackb(data))
+        if metadata.version != 1:
             raise Exception('Unknown archive metadata version')
         return metadata
 
     def load(self, id):
         self.id = id
         self.metadata = self._load_meta(self.id)
-        decode_dict(self.metadata, ARCHIVE_TEXT_KEYS)
-        self.metadata[b'cmdline'] = [safe_decode(arg) for arg in self.metadata[b'cmdline']]
-        self.name = self.metadata[b'name']
+        self.metadata.cmdline = [safe_decode(arg) for arg in self.metadata.cmdline]
+        self.name = self.metadata.name
 
     @property
     def ts(self):
         """Timestamp of archive creation (start) in UTC"""
-        ts = self.metadata[b'time']
+        ts = self.metadata.time
         return parse_timestamp(ts)
 
     @property
     def ts_end(self):
         """Timestamp of archive creation (end) in UTC"""
         # fall back to time if there is no time_end present in metadata
-        ts = self.metadata.get(b'time_end') or self.metadata[b'time']
+        ts = self.metadata.get('time_end') or self.metadata.time
         return parse_timestamp(ts)
 
     @property
@@ -336,7 +335,7 @@ Number of files: {0.stats.nfiles}'''.format(
         return filter(item) if filter else True
 
     def iter_items(self, filter=None, preload=False):
-        for item in self.pipeline.unpack_many(self.metadata[b'items'], preload=preload,
+        for item in self.pipeline.unpack_many(self.metadata.items, preload=preload,
                                               filter=lambda item: self.item_filter(item, filter)):
             yield item
 
@@ -366,7 +365,7 @@ Number of files: {0.stats.nfiles}'''.format(
         metadata = {
             'version': 1,
             'name': name,
-            'comment': comment,
+            'comment': comment or '',
             'items': self.items_buffer.chunks,
             'cmdline': sys.argv,
             'hostname': socket.gethostname(),
@@ -376,10 +375,11 @@ Number of files: {0.stats.nfiles}'''.format(
             'chunker_params': self.chunker_params,
         }
         metadata.update(additional_metadata or {})
-        data = msgpack.packb(StableDict(metadata), unicode_errors='surrogateescape')
+        metadata = ArchiveItem(metadata)
+        data = msgpack.packb(metadata.as_dict(), unicode_errors='surrogateescape')
         self.id = self.key.id_hash(data)
         self.cache.add_chunk(self.id, Chunk(data), self.stats)
-        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
+        self.manifest.archives[name] = {'id': self.id, 'time': metadata.time}
         self.manifest.write()
         self.repository.commit()
         self.cache.commit()
@@ -400,7 +400,7 @@ Number of files: {0.stats.nfiles}'''.format(
         cache.begin_txn()
         stats = Statistics()
         add(self.id)
-        for id, chunk in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
+        for id, chunk in zip(self.metadata.items, self.repository.get_many(self.metadata.items)):
             add(id)
             _, data = self.key.decrypt(id, chunk)
             unpacker.feed(data)
@@ -588,12 +588,12 @@ Number of files: {0.stats.nfiles}'''.format(
                     raise
 
     def set_meta(self, key, value):
-        metadata = StableDict(self._load_meta(self.id))
-        metadata[key] = value
-        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
+        metadata = self._load_meta(self.id)
+        setattr(metadata, key, value)
+        data = msgpack.packb(metadata.as_dict(), unicode_errors='surrogateescape')
         new_id = self.key.id_hash(data)
         self.cache.add_chunk(new_id, Chunk(data), self.stats)
-        self.manifest.archives[self.name] = {'id': new_id, 'time': metadata[b'time']}
+        self.manifest.archives[self.name] = {'id': new_id, 'time': metadata.time}
         self.cache.chunk_decref(self.id, self.stats)
         self.id = new_id
 
@@ -602,7 +602,7 @@ Number of files: {0.stats.nfiles}'''.format(
             raise self.AlreadyExists(name)
         oldname = self.name
         self.name = name
-        self.set_meta(b'name', name)
+        self.set_meta('name', name)
         del self.manifest.archives[oldname]
 
     def delete(self, stats, progress=False, forced=False):
@@ -625,7 +625,7 @@ Number of files: {0.stats.nfiles}'''.format(
         error = False
         try:
             unpacker = msgpack.Unpacker(use_list=False)
-            items_ids = self.metadata[b'items']
+            items_ids = self.metadata.items
             pi = ProgressIndicatorPercent(total=len(items_ids), msg="Decrementing references %3.0f%%", same_line=True)
             for (i, (items_id, data)) in enumerate(zip(items_ids, self.repository.get_many(items_ids))):
                 if progress:
@@ -1075,8 +1075,9 @@ class ArchiveChecker:
             except (TypeError, ValueError, StopIteration):
                 continue
             if valid_archive(archive):
-                logger.info('Found archive %s', archive[b'name'].decode('utf-8'))
-                manifest.archives[archive[b'name'].decode('utf-8')] = {b'id': chunk_id, b'time': archive[b'time']}
+                archive = ArchiveItem(internal_dict=archive)
+                logger.info('Found archive %s', archive.name)
+                manifest.archives[archive.name] = {b'id': chunk_id, b'time': archive.time}
         logger.info('Manifest rebuild complete.')
         return manifest
 
@@ -1187,7 +1188,7 @@ class ArchiveChecker:
                 return required_item_keys.issubset(keys) and keys.issubset(item_keys)
 
             i = 0
-            for state, items in groupby(archive[b'items'], missing_chunk_detector):
+            for state, items in groupby(archive.items, missing_chunk_detector):
                 items = list(items)
                 if state % 2:
                     for chunk_id in items:
@@ -1241,11 +1242,10 @@ class ArchiveChecker:
             mark_as_possibly_superseded(archive_id)
             cdata = self.repository.get(archive_id)
             _, data = self.key.decrypt(archive_id, cdata)
-            archive = StableDict(msgpack.unpackb(data))
-            if archive[b'version'] != 1:
+            archive = ArchiveItem(internal_dict=msgpack.unpackb(data))
+            if archive.version != 1:
                 raise Exception('Unknown archive metadata version')
-            decode_dict(archive, ARCHIVE_TEXT_KEYS)
-            archive[b'cmdline'] = [safe_decode(arg) for arg in archive[b'cmdline']]
+            archive.cmdline = [safe_decode(arg) for arg in archive.cmdline]
             items_buffer = ChunkBuffer(self.key)
             items_buffer.write_chunk = add_callback
             for item in robust_iterator(archive):
@@ -1253,10 +1253,10 @@ class ArchiveChecker:
                     verify_file_chunks(item)
                 items_buffer.add(item)
             items_buffer.flush(flush=True)
-            for previous_item_id in archive[b'items']:
+            for previous_item_id in archive.items:
                 mark_as_possibly_superseded(previous_item_id)
-            archive[b'items'] = items_buffer.chunks
-            data = msgpack.packb(archive, unicode_errors='surrogateescape')
+            archive.items = items_buffer.chunks
+            data = msgpack.packb(archive.as_dict(), unicode_errors='surrogateescape')
             new_archive_id = self.key.id_hash(data)
             cdata = self.key.encrypt(Chunk(data))
             add_reference(new_archive_id, len(data), len(cdata), cdata)
@@ -1483,9 +1483,9 @@ class ArchiveRecreater:
         if completed:
             timestamp = archive.ts.replace(tzinfo=None)
             if comment is None:
-                comment = archive.metadata.get(b'comment', '')
+                comment = archive.metadata.get('comment', '')
             target.save(timestamp=timestamp, comment=comment, additional_metadata={
-                'cmdline': archive.metadata[b'cmdline'],
+                'cmdline': archive.metadata.cmdline,
                 'recreate_cmdline': sys.argv,
             })
             if replace_original:
@@ -1554,7 +1554,7 @@ class ArchiveRecreater:
         if not target:
             target = self.create_target_archive(target_name)
         # If the archives use the same chunker params, then don't rechunkify
-        target.recreate_rechunkify = tuple(archive.metadata.get(b'chunker_params')) != self.chunker_params
+        target.recreate_rechunkify = tuple(archive.metadata.get('chunker_params')) != self.chunker_params
         return target, resume_from
 
     def try_resume(self, archive, target_name):
@@ -1573,7 +1573,7 @@ class ArchiveRecreater:
         return target, resume_from
 
     def incref_partial_chunks(self, source_archive, target_archive):
-        target_archive.recreate_partial_chunks = source_archive.metadata.get(b'recreate_partial_chunks', [])
+        target_archive.recreate_partial_chunks = source_archive.metadata.get('recreate_partial_chunks', [])
         for chunk_id, size, csize in target_archive.recreate_partial_chunks:
             if not self.cache.seen_chunk(chunk_id):
                 try:
@@ -1606,8 +1606,8 @@ class ArchiveRecreater:
         return item
 
     def can_resume(self, archive, old_target, target_name):
-        resume_id = old_target.metadata[b'recreate_source_id']
-        resume_args = [safe_decode(arg) for arg in old_target.metadata[b'recreate_args']]
+        resume_id = old_target.metadata.recreate_source_id
+        resume_args = [safe_decode(arg) for arg in old_target.metadata.recreate_args]
         if resume_id != archive.id:
             logger.warning('Source archive changed, will discard %s and start over', target_name)
             logger.warning('Saved fingerprint: %s', bin_to_hex(resume_id))