|
@@ -6,11 +6,10 @@ import os
|
|
|
import socket
|
|
|
import stat
|
|
|
import sys
|
|
|
-from itertools import izip
|
|
|
+from os.path import dirname
|
|
|
from xattr import xattr, XATTR_NOFOLLOW
|
|
|
|
|
|
-from . import NS_ARCHIVE_METADATA, NS_ARCHIVE_ITEMS, NS_ARCHIVE_CHUNKS, NS_CHUNK, \
|
|
|
- PACKET_ARCHIVE_METADATA, PACKET_ARCHIVE_ITEMS, PACKET_ARCHIVE_CHUNKS, PACKET_CHUNK
|
|
|
+from . import NS_ARCHIVE_METADATA, NS_CHUNK
|
|
|
from ._speedups import chunkify
|
|
|
from .helpers import uid2user, user2uid, gid2group, group2gid, IntegrityError
|
|
|
|
|
@@ -26,22 +25,24 @@ class Archive(object):
|
|
|
class DoesNotExist(Exception):
|
|
|
pass
|
|
|
|
|
|
- def __init__(self, store, keychain, name=None):
|
|
|
- self.keychain = keychain
|
|
|
+ def __init__(self, store, key, name=None, cache=None):
|
|
|
+ self.key = key
|
|
|
self.store = store
|
|
|
- self.items = []
|
|
|
+ self.cache = cache
|
|
|
+ self.items = ''
|
|
|
+ self.items_refs = []
|
|
|
+ self.items_prefix = ''
|
|
|
self.items_ids = []
|
|
|
self.hard_links = {}
|
|
|
if name:
|
|
|
- self.load(self.keychain.id_hash(name))
|
|
|
+ self.load(self.key.archive_hash(name))
|
|
|
|
|
|
def load(self, id):
|
|
|
self.id = id
|
|
|
try:
|
|
|
- kind, data, self.hash = self.keychain.decrypt(self.store.get(NS_ARCHIVE_METADATA, self.id))
|
|
|
+ data, self.hash = self.key.decrypt(self.store.get(NS_ARCHIVE_METADATA, self.id))
|
|
|
except self.store.DoesNotExist:
|
|
|
raise self.DoesNotExist
|
|
|
- assert kind == PACKET_ARCHIVE_METADATA
|
|
|
self.metadata = msgpack.unpackb(data)
|
|
|
assert self.metadata['version'] == 1
|
|
|
|
|
@@ -51,80 +52,90 @@ class Archive(object):
|
|
|
t, f = self.metadata['time'].split('.', 1)
|
|
|
return datetime.strptime(t, '%Y-%m-%dT%H:%M:%S') + timedelta(seconds=float('.' + f))
|
|
|
|
|
|
- def get_chunks(self):
|
|
|
- for id in self.metadata['chunks_ids']:
|
|
|
- magic, data, hash = self.keychain.decrypt(self.store.get(NS_ARCHIVE_CHUNKS, id))
|
|
|
- assert magic == PACKET_ARCHIVE_CHUNKS
|
|
|
- assert hash == id
|
|
|
- chunks = msgpack.unpackb(data)
|
|
|
- for chunk in chunks:
|
|
|
- yield chunk
|
|
|
-
|
|
|
def get_items(self):
|
|
|
- for id in self.metadata['items_ids']:
|
|
|
- magic, data, items_hash = self.keychain.decrypt(self.store.get(NS_ARCHIVE_ITEMS, id))
|
|
|
- assert magic == PACKET_ARCHIVE_ITEMS
|
|
|
- assert items_hash == id
|
|
|
- items = msgpack.unpackb(data)
|
|
|
- for item in items:
|
|
|
+ unpacker = msgpack.Unpacker()
|
|
|
+ for id, size, csize in self.metadata['items']:
|
|
|
+ data, items_hash = self.key.decrypt(self.store.get(NS_CHUNK, id))
|
|
|
+ assert self.key.id_hash(data) == id
|
|
|
+ unpacker.feed(data)
|
|
|
+ for item in unpacker:
|
|
|
yield item
|
|
|
|
|
|
- def add_item(self, item):
|
|
|
- self.items.append(item)
|
|
|
- if len(self.items) > 100000:
|
|
|
+ def add_item(self, item, refs=None):
|
|
|
+ data = msgpack.packb(item)
|
|
|
+ prefix = dirname(item['path'])
|
|
|
+ if self.items_prefix and self.items_prefix != prefix:
|
|
|
self.flush_items()
|
|
|
+ if refs:
|
|
|
+ self.items_refs += refs
|
|
|
+ self.items += data
|
|
|
+ self.items_prefix = prefix
|
|
|
|
|
|
def flush_items(self):
|
|
|
- data, hash = self.keychain.encrypt(PACKET_ARCHIVE_ITEMS, msgpack.packb(self.items))
|
|
|
- self.store.put(NS_ARCHIVE_ITEMS, hash, data)
|
|
|
- self.items_ids.append(hash)
|
|
|
- self.items = []
|
|
|
-
|
|
|
- def save_chunks(self, cache):
|
|
|
- chunks = []
|
|
|
- ids = []
|
|
|
- def flush(chunks):
|
|
|
- data, hash = self.keychain.encrypt(PACKET_ARCHIVE_CHUNKS, msgpack.packb(chunks))
|
|
|
- self.store.put(NS_ARCHIVE_CHUNKS, hash, data)
|
|
|
- ids.append(hash)
|
|
|
- for id, (count, size) in cache.chunks.iteritems():
|
|
|
- if count > 1000000:
|
|
|
- chunks.append((id, size))
|
|
|
- if len(chunks) > 100000:
|
|
|
- flush(chunks)
|
|
|
- chunks = []
|
|
|
- flush(chunks)
|
|
|
- return ids
|
|
|
+ if not self.items:
|
|
|
+ return
|
|
|
+ id = self.key.id_hash(self.items)
|
|
|
+ if self.cache.seen_chunk(id):
|
|
|
+ self.items_ids.append(self.cache.chunk_incref(id))
|
|
|
+ for id in self.items_refs:
|
|
|
+ self.cache.chunk_decref(id)
|
|
|
+ else:
|
|
|
+ self.items_ids.append(self.cache.add_chunk(id, self.items))
|
|
|
+ self.items = ''
|
|
|
+ self.items_refs = []
|
|
|
+ self.items_prefix = ''
|
|
|
|
|
|
def save(self, name, cache):
|
|
|
- self.id = self.keychain.id_hash(name)
|
|
|
- chunks_ids = self.save_chunks(cache)
|
|
|
+ self.id = self.key.archive_hash(name)
|
|
|
self.flush_items()
|
|
|
metadata = {
|
|
|
'version': 1,
|
|
|
'name': name,
|
|
|
- 'chunks_ids': chunks_ids,
|
|
|
- 'items_ids': self.items_ids,
|
|
|
+ 'items': self.items_ids,
|
|
|
'cmdline': sys.argv,
|
|
|
'hostname': socket.gethostname(),
|
|
|
'username': getuser(),
|
|
|
'time': datetime.utcnow().isoformat(),
|
|
|
}
|
|
|
- data, self.hash = self.keychain.encrypt(PACKET_ARCHIVE_METADATA, msgpack.packb(metadata))
|
|
|
+ data, self.hash = self.key.encrypt(msgpack.packb(metadata))
|
|
|
self.store.put(NS_ARCHIVE_METADATA, self.id, data)
|
|
|
self.store.commit()
|
|
|
cache.commit()
|
|
|
|
|
|
- def stats(self, cache):
|
|
|
- osize = csize = usize = 0
|
|
|
+ def get_chunks(self):
|
|
|
for item in self.get_items():
|
|
|
- if stat.S_ISREG(item['mode']) and not 'source' in item:
|
|
|
- osize += item['size']
|
|
|
- for id, size in self.get_chunks():
|
|
|
- csize += size
|
|
|
- if cache.seen_chunk(id) == 1:
|
|
|
- usize += size
|
|
|
- return osize, csize, usize
|
|
|
+ try:
|
|
|
+ for chunk in item['chunks']:
|
|
|
+ yield chunk
|
|
|
+ except KeyError:
|
|
|
+ pass
|
|
|
+
|
|
|
+ def stats(self, cache):
|
|
|
+ # This function is a bit evil since it abuses the cache to calculate
|
|
|
+ # the stats. The cache transaction must be rolled back afterwards
|
|
|
+ unpacker = msgpack.Unpacker()
|
|
|
+ cache.begin_txn()
|
|
|
+ osize = zsize = usize = 0
|
|
|
+ for id, size, csize in self.metadata['items']:
|
|
|
+ osize += size
|
|
|
+ zsize += csize
|
|
|
+ unique = self.cache.seen_chunk(id) == 1
|
|
|
+ if unique:
|
|
|
+ usize += csize
|
|
|
+ data, items_hash = self.key.decrypt(self.store.get(NS_CHUNK, id))
|
|
|
+ assert self.key.id_hash(data) == id
|
|
|
+ unpacker.feed(data)
|
|
|
+ for item in unpacker:
|
|
|
+ try:
|
|
|
+ for id, size, csize in item['chunks']:
|
|
|
+ osize += size
|
|
|
+ zsize += csize
|
|
|
+ if unique and self.cache.seen_chunk(id) == 1:
|
|
|
+ usize += csize
|
|
|
+ except KeyError:
|
|
|
+ pass
|
|
|
+ cache.rollback()
|
|
|
+ return osize, zsize, usize
|
|
|
|
|
|
def extract_item(self, item, dest=None, start_cb=None):
|
|
|
dest = dest or os.getcwdu()
|
|
@@ -163,14 +174,13 @@ class Archive(object):
|
|
|
if i==0:
|
|
|
start_cb(item)
|
|
|
assert not error
|
|
|
- magic, data, hash = self.keychain.decrypt(chunk)
|
|
|
- assert magic == PACKET_CHUNK
|
|
|
- if self.keychain.id_hash(data) != id:
|
|
|
+ data, hash = self.key.decrypt(chunk)
|
|
|
+ if self.key.id_hash(data) != id:
|
|
|
raise IntegrityError('chunk hash did not match')
|
|
|
fd.write(data)
|
|
|
if last:
|
|
|
- self.restore_attrs(path, item)
|
|
|
fd.close()
|
|
|
+ self.restore_attrs(path, item)
|
|
|
|
|
|
fd = open(path, 'wb')
|
|
|
n = len(item['chunks'])
|
|
@@ -179,7 +189,7 @@ class Archive(object):
|
|
|
self.restore_attrs(path, item)
|
|
|
fd.close()
|
|
|
else:
|
|
|
- for i, id in enumerate(item['chunks']):
|
|
|
+ for i, (id, size, csize) in enumerate(item['chunks']):
|
|
|
self.store.get(NS_CHUNK, id, callback=extract_cb, callback_data=(id, i, i==n-1))
|
|
|
|
|
|
else:
|
|
@@ -206,16 +216,15 @@ class Archive(object):
|
|
|
pass
|
|
|
if not symlink:
|
|
|
# FIXME: We should really call futimes here (c extension required)
|
|
|
- os.utime(path, (item['atime'], item['mtime']))
|
|
|
+ os.utime(path, (item['mtime'], item['mtime']))
|
|
|
|
|
|
def verify_file(self, item, start, result):
|
|
|
def verify_chunk(chunk, error, (id, i, last)):
|
|
|
if i == 0:
|
|
|
start(item)
|
|
|
assert not error
|
|
|
- magic, data, hash = self.keychain.decrypt(chunk)
|
|
|
- assert magic == PACKET_CHUNK
|
|
|
- if self.keychain.id_hash(data) != id:
|
|
|
+ data, hash = self.key.decrypt(chunk)
|
|
|
+ if self.key.id_hash(data) != id:
|
|
|
result(item, False)
|
|
|
elif last:
|
|
|
result(item, True)
|
|
@@ -224,17 +233,24 @@ class Archive(object):
|
|
|
start(item)
|
|
|
result(item, True)
|
|
|
else:
|
|
|
- for i, id in enumerate(item['chunks']):
|
|
|
+ for i, (id, size, csize) in enumerate(item['chunks']):
|
|
|
self.store.get(NS_CHUNK, id, callback=verify_chunk, callback_data=(id, i, i==n-1))
|
|
|
|
|
|
def delete(self, cache):
|
|
|
- for id, size in self.get_chunks():
|
|
|
- cache.chunk_decref(id)
|
|
|
+ unpacker = msgpack.Unpacker()
|
|
|
+ for id, size, csize in self.metadata['items']:
|
|
|
+ if self.cache.seen_chunk(id) == 1:
|
|
|
+ data, items_hash = self.key.decrypt(self.store.get(NS_CHUNK, id))
|
|
|
+ assert self.key.id_hash(data) == id
|
|
|
+ unpacker.feed(data)
|
|
|
+ for item in unpacker:
|
|
|
+ try:
|
|
|
+ for chunk_id, size, csize in item['chunks']:
|
|
|
+ self.cache.chunk_decref(chunk_id)
|
|
|
+ except KeyError:
|
|
|
+ pass
|
|
|
+ self.cache.chunk_decref(id)
|
|
|
self.store.delete(NS_ARCHIVE_METADATA, self.id)
|
|
|
- for id in self.metadata['chunks_ids']:
|
|
|
- self.store.delete(NS_ARCHIVE_CHUNKS, id)
|
|
|
- for id in self.metadata['items_ids']:
|
|
|
- self.store.delete(NS_ARCHIVE_ITEMS, id)
|
|
|
self.store.commit()
|
|
|
cache.commit()
|
|
|
|
|
@@ -243,7 +259,7 @@ class Archive(object):
|
|
|
'mode': st.st_mode,
|
|
|
'uid': st.st_uid, 'user': uid2user(st.st_uid),
|
|
|
'gid': st.st_gid, 'group': gid2group(st.st_gid),
|
|
|
- 'atime': st.st_atime, 'mtime': st.st_mtime,
|
|
|
+ 'mtime': st.st_mtime,
|
|
|
}
|
|
|
try:
|
|
|
xa = xattr(path, XATTR_NOFOLLOW)
|
|
@@ -287,34 +303,33 @@ class Archive(object):
|
|
|
return
|
|
|
else:
|
|
|
self.hard_links[st.st_ino, st.st_dev] = safe_path
|
|
|
- path_hash = self.keychain.id_hash(path.encode('utf-8'))
|
|
|
- ids, size = cache.file_known_and_unchanged(path_hash, st)
|
|
|
+ path_hash = self.key.id_hash(path.encode('utf-8'))
|
|
|
+ ids = cache.file_known_and_unchanged(path_hash, st)
|
|
|
+ chunks = None
|
|
|
if ids is not None:
|
|
|
# Make sure all ids are available
|
|
|
for id in ids:
|
|
|
if not cache.seen_chunk(id):
|
|
|
- ids = None
|
|
|
break
|
|
|
else:
|
|
|
- for id in ids:
|
|
|
- cache.chunk_incref(id)
|
|
|
+ chunks = [cache.chunk_incref(id) for id in ids]
|
|
|
# Only chunkify the file if needed
|
|
|
- if ids is None:
|
|
|
+ if chunks is None:
|
|
|
with open(path, 'rb') as fd:
|
|
|
- size = 0
|
|
|
- ids = []
|
|
|
+ chunks = []
|
|
|
for chunk in chunkify(fd, CHUNK_SIZE, WINDOW_SIZE,
|
|
|
- self.keychain.get_chunkify_seed()):
|
|
|
- ids.append(cache.add_chunk(self.keychain.id_hash(chunk), chunk))
|
|
|
- size += len(chunk)
|
|
|
+ self.key.chunk_seed):
|
|
|
+ chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk))
|
|
|
+ ids = [id for id, _, _ in chunks]
|
|
|
cache.memorize_file(path_hash, st, ids)
|
|
|
- item = {'path': safe_path, 'chunks': ids, 'size': size}
|
|
|
+ item = {'path': safe_path, 'chunks': chunks}
|
|
|
item.update(self.stat_attrs(st, path))
|
|
|
- self.add_item(item)
|
|
|
+ self.add_item(item, ids)
|
|
|
|
|
|
@staticmethod
|
|
|
- def list_archives(store, keychain):
|
|
|
+ def list_archives(store, key):
|
|
|
for id in list(store.list(NS_ARCHIVE_METADATA)):
|
|
|
- archive = Archive(store, keychain)
|
|
|
+ archive = Archive(store, key)
|
|
|
archive.load(id)
|
|
|
yield archive
|
|
|
+
|