|
@@ -35,7 +35,6 @@ class RepoObj:
|
|
) -> bytes:
|
|
) -> bytes:
|
|
assert isinstance(id, bytes)
|
|
assert isinstance(id, bytes)
|
|
assert isinstance(meta, dict)
|
|
assert isinstance(meta, dict)
|
|
- meta = dict(meta) # make a copy, so call arg is not modified
|
|
|
|
assert isinstance(data, (bytes, memoryview))
|
|
assert isinstance(data, (bytes, memoryview))
|
|
assert compress or size is not None and ctype is not None and clevel is not None
|
|
assert compress or size is not None and ctype is not None and clevel is not None
|
|
if compress:
|
|
if compress:
|
|
@@ -115,14 +114,22 @@ class RepoObj1: # legacy
|
|
def id_hash(self, data: bytes) -> bytes:
|
|
def id_hash(self, data: bytes) -> bytes:
|
|
return self.key.id_hash(data)
|
|
return self.key.id_hash(data)
|
|
|
|
|
|
- def format(self, id: bytes, meta: dict, data: bytes, compress: bool = True, size: int = None) -> bytes:
|
|
|
|
|
|
+ def format(
|
|
|
|
+ self,
|
|
|
|
+ id: bytes,
|
|
|
|
+ meta: dict,
|
|
|
|
+ data: bytes,
|
|
|
|
+ compress: bool = True,
|
|
|
|
+ size: int = None,
|
|
|
|
+ ctype: int = None,
|
|
|
|
+ clevel: int = None,
|
|
|
|
+ ) -> bytes:
|
|
assert isinstance(id, bytes)
|
|
assert isinstance(id, bytes)
|
|
assert meta == {}
|
|
assert meta == {}
|
|
assert isinstance(data, (bytes, memoryview))
|
|
assert isinstance(data, (bytes, memoryview))
|
|
- assert compress or size is not None
|
|
|
|
- assert compress or size is not None
|
|
|
|
|
|
+ assert compress or size is not None and ctype is not None and clevel is not None
|
|
if compress:
|
|
if compress:
|
|
- assert size is None
|
|
|
|
|
|
+ assert size is None or size == len(data)
|
|
meta, data_compressed = self.compressor.compress(meta, data)
|
|
meta, data_compressed = self.compressor.compress(meta, data)
|
|
else:
|
|
else:
|
|
assert isinstance(size, int)
|
|
assert isinstance(size, int)
|
|
@@ -130,6 +137,9 @@ class RepoObj1: # legacy
|
|
data_encrypted = self.key.encrypt(id, data_compressed)
|
|
data_encrypted = self.key.encrypt(id, data_compressed)
|
|
return data_encrypted
|
|
return data_encrypted
|
|
|
|
|
|
|
|
+ def parse_meta(self, id: bytes, cdata: bytes) -> dict:
|
|
|
|
+ raise NotImplementedError("parse_meta is not available for RepoObj1")
|
|
|
|
+
|
|
def parse(self, id: bytes, cdata: bytes, decompress: bool = True) -> tuple[dict, bytes]:
|
|
def parse(self, id: bytes, cdata: bytes, decompress: bool = True) -> tuple[dict, bytes]:
|
|
assert isinstance(id, bytes)
|
|
assert isinstance(id, bytes)
|
|
assert isinstance(cdata, bytes)
|
|
assert isinstance(cdata, bytes)
|