@@ -18,7 +18,7 @@ from io import BytesIO
 from . import xattr
 from .compress import Compressor, COMPR_BUFFER
 from .constants import *  # NOQA
-from .helpers import Error, uid2user, user2uid, gid2group, group2gid, \
+from .helpers import Chunk, Error, uid2user, user2uid, gid2group, group2gid, \
     parse_timestamp, to_localtime, format_time, format_timedelta, \
     Manifest, Statistics, decode_dict, make_path_safe, StableDict, int_to_bigint, bigint_to_int, \
     ProgressIndicatorPercent, ChunkIteratorFileWrapper, remove_surrogates, log_multi, \
@@ -45,7 +45,7 @@ class DownloadPipeline:

     def unpack_many(self, ids, filter=None, preload=False):
         unpacker = msgpack.Unpacker(use_list=False)
-        for data in self.fetch_many(ids):
+        for _, data in self.fetch_many(ids):
             unpacker.feed(data)
             items = [decode_dict(item, ITEM_TEXT_KEYS) for item in unpacker]
             if filter:
@@ -87,7 +87,7 @@ class ChunkBuffer:
         if self.buffer.tell() == 0:
             return
         self.buffer.seek(0)
-        chunks = list(bytes(s) for s in self.chunker.chunkify(self.buffer))
+        chunks = list(Chunk(bytes(s)) for s in self.chunker.chunkify(self.buffer))
         self.buffer.seek(0)
         self.buffer.truncate(0)
         # Leave the last partial chunk in the buffer unless flush is True
@@ -109,7 +109,7 @@ class CacheChunkBuffer(ChunkBuffer):
         self.stats = stats

     def write_chunk(self, chunk):
-        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
+        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk.data), chunk, self.stats)
         return id_


@@ -166,7 +166,7 @@ class Archive:
         self.zeros = b'\0' * (1 << chunker_params[1])

     def _load_meta(self, id):
-        data = self.key.decrypt(id, self.repository.get(id))
+        _, data = self.key.decrypt(id, self.repository.get(id))
         metadata = msgpack.unpackb(data)
         if metadata[b'version'] != 1:
             raise Exception('Unknown archive metadata version')
@@ -263,7 +263,7 @@ Number of files: {0.stats.nfiles}'''.format(
         metadata.update(additional_metadata or {})
         data = msgpack.packb(StableDict(metadata), unicode_errors='surrogateescape')
         self.id = self.key.id_hash(data)
-        self.cache.add_chunk(self.id, data, self.stats)
+        self.cache.add_chunk(self.id, Chunk(data), self.stats)
         self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
         self.manifest.write()
         self.repository.commit()
@@ -287,7 +287,8 @@ Number of files: {0.stats.nfiles}'''.format(
         add(self.id)
         for id, chunk in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
             add(id)
-            unpacker.feed(self.key.decrypt(id, chunk))
+            _, data = self.key.decrypt(id, chunk)
+            unpacker.feed(data)
             for item in unpacker:
                 if b'chunks' in item:
                     stats.nfiles += 1
@@ -354,7 +355,7 @@ Number of files: {0.stats.nfiles}'''.format(
                 # Extract chunks, since the item which had the chunks was not extracted
             with open(path, 'wb') as fd:
                 ids = [c.id for c in item[b'chunks']]
-                for data in self.pipeline.fetch_many(ids, is_preloaded=True):
+                for _, data in self.pipeline.fetch_many(ids, is_preloaded=True):
                     if sparse and self.zeros.startswith(data):
                         # all-zero chunk: create a hole in a sparse file
                         fd.seek(len(data), 1)
@@ -449,7 +450,7 @@ Number of files: {0.stats.nfiles}'''.format(
         metadata[key] = value
         data = msgpack.packb(metadata, unicode_errors='surrogateescape')
         new_id = self.key.id_hash(data)
-        self.cache.add_chunk(new_id, data, self.stats)
+        self.cache.add_chunk(new_id, Chunk(data), self.stats)
         self.manifest.archives[self.name] = {'id': new_id, 'time': metadata[b'time']}
         self.cache.chunk_decref(self.id, self.stats)
         self.id = new_id
@@ -469,7 +470,8 @@ Number of files: {0.stats.nfiles}'''.format(
         for (i, (items_id, data)) in enumerate(zip(items_ids, self.repository.get_many(items_ids))):
             if progress:
                 pi.show(i)
-            unpacker.feed(self.key.decrypt(items_id, data))
+            _, data = self.key.decrypt(items_id, data)
+            unpacker.feed(data)
             self.cache.chunk_decref(items_id, stats)
             for item in unpacker:
                 if b'chunks' in item:
@@ -531,8 +533,8 @@ Number of files: {0.stats.nfiles}'''.format(
         uid, gid = 0, 0
         fd = sys.stdin.buffer  # binary
         chunks = []
-        for chunk in self.chunker.chunkify(fd):
-            chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
+        for data in self.chunker.chunkify(fd):
+            chunks.append(cache.add_chunk(self.key.id_hash(data), Chunk(data), self.stats))
         self.stats.nfiles += 1
         t = int_to_bigint(int(time.time()) * 1000000000)
         item = {
@@ -588,8 +590,8 @@ Number of files: {0.stats.nfiles}'''.format(
             fh = Archive._open_rb(path)
             with os.fdopen(fh, 'rb') as fd:
                 chunks = []
-                for chunk in self.chunker.chunkify(fd, fh):
-                    chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
+                for data in self.chunker.chunkify(fd, fh):
+                    chunks.append(cache.add_chunk(self.key.id_hash(data), Chunk(data), self.stats))
                     if self.show_progress:
                         self.stats.show_progress(item=item, dt=0.2)
             cache.memorize_file(path_hash, st, [c.id for c in chunks])
@@ -735,7 +737,7 @@ class ArchiveChecker:
         manifest = Manifest(self.key, self.repository)
         for chunk_id, _ in self.chunks.iteritems():
             cdata = self.repository.get(chunk_id)
-            data = self.key.decrypt(chunk_id, cdata)
+            _, data = self.key.decrypt(chunk_id, cdata)
             # Some basic sanity checks of the payload before feeding it into msgpack
             if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                 continue
@@ -766,9 +768,9 @@ class ArchiveChecker:
             self.possibly_superseded.add(id_)

         def add_callback(chunk):
-            id_ = self.key.id_hash(chunk)
+            id_ = self.key.id_hash(chunk.data)
             cdata = self.key.encrypt(chunk)
-            add_reference(id_, len(chunk), len(cdata), cdata)
+            add_reference(id_, len(chunk.data), len(cdata), cdata)
             return id_

         def add_reference(id_, size, csize, cdata=None):
@@ -795,7 +797,7 @@ class ArchiveChecker:
                     self.error_found = True
                     data = bytes(size)
                     chunk_id = self.key.id_hash(data)
-                    cdata = self.key.encrypt(data)
+                    cdata = self.key.encrypt(Chunk(data))
                     csize = len(cdata)
                     add_reference(chunk_id, size, csize, cdata)
                 else:
@@ -835,7 +837,8 @@ class ArchiveChecker:
                 if state > 0:
                     unpacker.resync()
                 for chunk_id, cdata in zip(items, repository.get_many(items)):
-                    unpacker.feed(self.key.decrypt(chunk_id, cdata))
+                    _, data = self.key.decrypt(chunk_id, cdata)
+                    unpacker.feed(data)
                     try:
                         for item in unpacker:
                             if isinstance(item, dict):
@@ -872,7 +875,7 @@ class ArchiveChecker:
                 continue
             mark_as_possibly_superseded(archive_id)
             cdata = self.repository.get(archive_id)
-            data = self.key.decrypt(archive_id, cdata)
+            _, data = self.key.decrypt(archive_id, cdata)
             archive = StableDict(msgpack.unpackb(data))
             if archive[b'version'] != 1:
                 raise Exception('Unknown archive metadata version')
@@ -890,7 +893,7 @@ class ArchiveChecker:
             archive[b'items'] = items_buffer.chunks
             data = msgpack.packb(archive, unicode_errors='surrogateescape')
             new_archive_id = self.key.id_hash(data)
-            cdata = self.key.encrypt(data)
+            cdata = self.key.encrypt(Chunk(data))
             add_reference(new_archive_id, len(data), len(cdata), cdata)
             info[b'id'] = new_archive_id

@@ -1045,7 +1048,7 @@ class ArchiveRecreater:
         chunk_iterator = self.create_chunk_iterator(archive, target, item)
         consume(chunk_iterator, len(new_chunks))
         for chunk in chunk_iterator:
-            chunk_id = self.key.id_hash(chunk)
+            chunk_id = self.key.id_hash(chunk.data)
             if chunk_id in self.seen_chunks:
                 new_chunks.append(self.cache.chunk_incref(chunk_id, target.stats))
             else:
@@ -1076,7 +1079,12 @@ class ArchiveRecreater:
             # The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk
             # (does not load the entire file into memory)
             file = ChunkIteratorFileWrapper(chunk_iterator)
-            chunk_iterator = target.chunker.chunkify(file)
+
+            def _chunk_iterator():
+                for data in target.chunker.chunkify(file):
+                    yield Chunk(data)
+
+            chunk_iterator = _chunk_iterator()
         return chunk_iterator

     def process_partial_chunks(self, target):
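
Note on the Chunk wrapper used above: the hunks only show its call sites (Chunk(data) construction, chunk.data access, and meta/data tuple unpacking from key.decrypt()). The snippet below is a minimal sketch of a wrapper that would satisfy those call sites; it is reconstructed from usage only, not copied from borg's helpers.py, so the field order and factory form are assumptions.

# Illustrative sketch only -- reconstructed from how the call sites above use
# Chunk; the authoritative definition lives in .helpers and may differ.
from collections import namedtuple

_Chunk = namedtuple('_Chunk', 'meta data')


def Chunk(data, **meta):
    # Wrap raw chunker/decrypt output; meta is an (often empty) dict of kwargs.
    return _Chunk(meta, data)


# Usage patterns matching the hunks above:
chunk = Chunk(b'raw chunk bytes')
assert chunk.data == b'raw chunk bytes'  # chunk.data access
meta, data = chunk                       # hence `_, data = key.decrypt(...)`
assert meta == {} and data == b'raw chunk bytes'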