
new read_data param for repository.get() and .get_many()

True (default): return the full chunk (the client can decrypt meta and data)

False: return only enough so that the client can decrypt the meta
Thomas Waldmann 2 years ago
parent
commit
106abbe4d9
3 changed files with 195 additions and 121 deletions
  1. src/borg/remote.py  +13 -11
  2. src/borg/repository.py  +36 -7
  3. src/borg/testsuite/repository.py  +146 -103
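
Before the per-file diffs, a minimal sketch of what the new parameter means for a caller. It assumes the raw object layout that the fchunk()/pchunk() test helpers further down also use (length header, then meta, then data); the 32-bit little-endian header below is only a stand-in for the real RepoObj.meta_len_hdr, whose exact format is not shown in this diff.

    import struct

    # Assumed stand-in for RepoObj.meta_len_hdr (illustration only).
    meta_len_hdr = struct.Struct("<I")

    def make_chunk(data: bytes, meta: bytes = b"") -> bytes:
        # raw chunk layout: <meta length header> + <meta> + <data>
        return meta_len_hdr.pack(len(meta)) + meta + data

    def split_chunk(chunk: bytes) -> tuple[bytes, bytes]:
        # parse (data, meta) back out of a raw chunk
        (meta_len,) = meta_len_hdr.unpack_from(chunk)
        meta = chunk[meta_len_hdr.size : meta_len_hdr.size + meta_len]
        data = chunk[meta_len_hdr.size + meta_len :]
        return data, meta

    full = make_chunk(b"data", meta=b"meta")
    # read_data=True (default): repository.get() hands back the full chunk.
    assert split_chunk(full) == (b"data", b"meta")
    # read_data=False: only the length header + meta come back, data is omitted.
    meta_only = full[: meta_len_hdr.size + len(b"meta")]
    assert split_chunk(meta_only) == (b"", b"meta")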

+ 13 - 11
src/borg/remote.py

@@ -1001,12 +1001,12 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
     def flags_many(self, ids, mask=0xFFFFFFFF, value=None):
         """actual remoting is done via self.call in the @api decorator"""
 
-    def get(self, id):
-        for resp in self.get_many([id]):
+    def get(self, id, read_data=True):
+        for resp in self.get_many([id], read_data=read_data):
             return resp
 
-    def get_many(self, ids, is_preloaded=False):
-        yield from self.call_many("get", [{"id": id} for id in ids], is_preloaded=is_preloaded)
+    def get_many(self, ids, read_data=True, is_preloaded=False):
+        yield from self.call_many("get", [{"id": id, "read_data": read_data} for id in ids], is_preloaded=is_preloaded)
 
     @api(since=parse_version("1.0.0"))
     def put(self, id, data, wait=True):
@@ -1148,11 +1148,11 @@ class RepositoryNoCache:
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.close()
 
-    def get(self, key):
-        return next(self.get_many([key], cache=False))
+    def get(self, key, read_data=True):
+        return next(self.get_many([key], read_data=read_data, cache=False))
 
-    def get_many(self, keys, cache=True):
-        for key, data in zip(keys, self.repository.get_many(keys)):
+    def get_many(self, keys, read_data=True, cache=True):
+        for key, data in zip(keys, self.repository.get_many(keys, read_data=read_data)):
             yield self.transform(key, data)
 
     def log_instrumentation(self):
@@ -1250,9 +1250,11 @@ class RepositoryCache(RepositoryNoCache):
         self.cache.clear()
         shutil.rmtree(self.basedir)
 
-    def get_many(self, keys, cache=True):
+    def get_many(self, keys, read_data=True, cache=True):
+        # TODO: this currently always requests the full chunk from self.repository (read_data=True).
+        # It could use different cache keys depending on read_data and cache full vs. meta-only chunks.
         unknown_keys = [key for key in keys if key not in self.cache]
-        repository_iterator = zip(unknown_keys, self.repository.get_many(unknown_keys))
+        repository_iterator = zip(unknown_keys, self.repository.get_many(unknown_keys, read_data=True))
         for key in keys:
             if key in self.cache:
                 file = self.key_filename(key)
@@ -1269,7 +1271,7 @@ class RepositoryCache(RepositoryNoCache):
                 else:
                     # slow path: eviction during this get_many removed this key from the cache
                     t0 = time.perf_counter()
-                    data = self.repository.get(key)
+                    data = self.repository.get(key, read_data=True)
                     self.slow_lat += time.perf_counter() - t0
                     transformed = self.add_entry(key, data, cache)
                     self.slow_misses += 1

+ 36 - 7
src/borg/repository.py

@@ -25,6 +25,7 @@ from .locking import Lock, LockError, LockErrorT
 from .logger import create_logger
 from .manifest import Manifest
 from .platform import SaveFile, SyncFile, sync_dir, safe_fadvise
+from .repoobj import RepoObj
 from .checksums import crc32, StreamingXXH64
 from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError
 
@@ -1268,18 +1269,18 @@ class Repository:
     def flags_many(self, ids, mask=0xFFFFFFFF, value=None):
         return [self.flags(id_, mask, value) for id_ in ids]
 
-    def get(self, id):
+    def get(self, id, read_data=True):
         if not self.index:
             self.index = self.open_index(self.get_transaction_id())
         try:
             in_index = NSIndexEntry(*((self.index[id] + (None,))[:3]))  # legacy: index entries have no size element
-            return self.io.read(in_index.segment, in_index.offset, id, expected_size=in_index.size)
+            return self.io.read(in_index.segment, in_index.offset, id, expected_size=in_index.size, read_data=read_data)
         except KeyError:
             raise self.ObjectNotFound(id, self.path) from None
 
-    def get_many(self, ids, is_preloaded=False):
+    def get_many(self, ids, read_data=True, is_preloaded=False):
         for id_ in ids:
-            yield self.get(id_)
+            yield self.get(id_, read_data=read_data)
 
     def put(self, id, data, wait=True):
         """put a repo object
@@ -1659,13 +1660,12 @@ class LoggedIO:
 
         See the _read() docstring about confidence in the returned data.
         """
-        assert read_data is True  # False is not used (yet)
         if segment == self.segment and self._write_fd:
             self._write_fd.sync()
         fd = self.get_fd(segment)
         fd.seek(offset)
         header = fd.read(self.header_fmt.size)
-        size, tag, key, data = self._read(fd, header, segment, offset, (TAG_PUT2, TAG_PUT), read_data)
+        size, tag, key, data = self._read(fd, header, segment, offset, (TAG_PUT2, TAG_PUT), read_data=read_data)
         if id != key:
             raise IntegrityError(
                 "Invalid segment entry header, is not for wanted id [segment {}, offset {}]".format(segment, offset)
@@ -1686,6 +1686,11 @@ class LoggedIO:
         PUT2 tags, read_data == False: crc32 check (header)
         PUT tags, read_data == True: crc32 check (header+data)
         PUT tags, read_data == False: crc32 check can not be done, all data obtained must be considered informational
+
+        read_data == False behaviour:
+        PUT2 tags: return enough of the chunk so that the client is able to decrypt the metadata,
+                   do not read, but just seek over the data.
+        PUT tags:  return None and just seek over the data.
         """
 
         def check_crc32(wanted, header, *data):
@@ -1746,7 +1751,31 @@ class LoggedIO:
                             f"expected {self.ENTRY_HASH_SIZE}, got {len(entry_hash)} bytes"
                         )
                     check_crc32(crc, header, key, entry_hash)
-                if not read_data:  # seek over data
+                if not read_data:
+                    if tag == TAG_PUT2:
+                        # PUT2 is only used in new repos and they also have different RepoObj layout,
+                        # supporting separately encrypted metadata and data.
+                        # In this case, we return enough bytes so the client can decrypt the metadata
+                        # and seek over the rest (over the encrypted data).
+                        meta_len_size = RepoObj.meta_len_hdr.size
+                        meta_len = fd.read(meta_len_size)
+                        length -= meta_len_size
+                        if len(meta_len) != meta_len_size:
+                            raise IntegrityError(
+                                f"Segment entry meta length short read [segment {segment}, offset {offset}]: "
+                                f"expected {meta_len_size}, got {len(meta_len)} bytes"
+                            )
+                        ml = RepoObj.meta_len_hdr.unpack(meta_len)[0]
+                        meta = fd.read(ml)
+                        length -= ml
+                        if len(meta) != ml:
+                            raise IntegrityError(
+                                f"Segment entry meta short read [segment {segment}, offset {offset}]: "
+                                f"expected {ml}, got {len(meta)} bytes"
+                            )
+                        data = meta_len + meta  # shortened chunk - enough so the client can decrypt the metadata
+                        # we do not have a checksum for this data, but the client's AEAD crypto will check it.
+                    # in any case, we seek over the remainder of the chunk
                     oldpos = fd.tell()
                     seeked = fd.seek(length, os.SEEK_CUR) - oldpos
                     if seeked != length:
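
The new PUT2 branch in LoggedIO._read reads just the metadata length header and the metadata, then seeks over the (still encrypted) data. Below is a small self-contained sketch of that read-then-seek pattern against an in-memory payload; as above, the 32-bit length header is an assumed stand-in for RepoObj.meta_len_hdr.

    import io
    import os
    import struct

    # Assumed stand-in for RepoObj.meta_len_hdr (illustration only).
    meta_len_hdr = struct.Struct("<I")

    def read_meta_only(fd, length):
        # fd is positioned at the start of a PUT2 entry's payload and length is
        # the total payload length, mirroring the variables in LoggedIO._read.
        # Returns the shortened chunk (header + meta), like the new code path.
        meta_len = fd.read(meta_len_hdr.size)
        length -= meta_len_hdr.size
        ml = meta_len_hdr.unpack(meta_len)[0]
        meta = fd.read(ml)
        length -= ml
        # seek over the (unread) data portion instead of reading it
        oldpos = fd.tell()
        seeked = fd.seek(length, os.SEEK_CUR) - oldpos
        assert seeked == length, "segment entry data short seek"
        return meta_len + meta

    payload = meta_len_hdr.pack(4) + b"meta" + b"lots-of-encrypted-data"
    fd = io.BytesIO(payload)
    assert read_meta_only(fd, len(payload)) == meta_len_hdr.pack(4) + b"meta"
    assert fd.tell() == len(payload)  # the data was skipped, not read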

+ 146 - 103
src/borg/testsuite/repository.py

@@ -15,6 +15,7 @@ from ..helpers import msgpack
 from ..locking import Lock, LockFailed
 from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed, handle_remote_line
 from ..repository import Repository, LoggedIO, MAGIC, MAX_DATA_SIZE, TAG_DELETE, TAG_PUT2, TAG_PUT, TAG_COMMIT
+from ..repoobj import RepoObj
 from . import BaseTestCase
 from .hashindex import H
 
@@ -22,6 +23,29 @@ from .hashindex import H
 UNSPECIFIED = object()  # for default values where we can't use None
 
 
+def fchunk(data, meta=b""):
+    # create a raw chunk that has valid RepoObj layout, but does not use encryption or compression.
+    meta_len = RepoObj.meta_len_hdr.pack(len(meta))
+    assert isinstance(data, bytes)
+    chunk = meta_len + meta + data
+    return chunk
+
+
+def pchunk(chunk):
+    # parse data and meta from a raw chunk made by fchunk
+    meta_len_size = RepoObj.meta_len_hdr.size
+    meta_len = chunk[:meta_len_size]
+    meta_len = RepoObj.meta_len_hdr.unpack(meta_len)[0]
+    meta = chunk[meta_len_size : meta_len_size + meta_len]
+    data = chunk[meta_len_size + meta_len :]
+    return data, meta
+
+
+def pdchunk(chunk):
+    # parse only data from a raw chunk made by fchunk
+    return pchunk(chunk)[0]
+
+
 class RepositoryTestCaseBase(BaseTestCase):
     key_size = 32
     exclusive = True
@@ -46,12 +70,12 @@ class RepositoryTestCaseBase(BaseTestCase):
         self.repository = self.open(exclusive=exclusive)
 
     def add_keys(self):
-        self.repository.put(H(0), b"foo")
-        self.repository.put(H(1), b"bar")
-        self.repository.put(H(3), b"bar")
+        self.repository.put(H(0), fchunk(b"foo"))
+        self.repository.put(H(1), fchunk(b"bar"))
+        self.repository.put(H(3), fchunk(b"bar"))
         self.repository.commit(compact=False)
-        self.repository.put(H(1), b"bar2")
-        self.repository.put(H(2), b"boo")
+        self.repository.put(H(1), fchunk(b"bar2"))
+        self.repository.put(H(2), fchunk(b"boo"))
         self.repository.delete(H(3))
 
     def repo_dump(self, label=None):
@@ -68,9 +92,9 @@ class RepositoryTestCaseBase(BaseTestCase):
 class RepositoryTestCase(RepositoryTestCaseBase):
     def test1(self):
         for x in range(100):
-            self.repository.put(H(x), b"SOMEDATA")
+            self.repository.put(H(x), fchunk(b"SOMEDATA"))
         key50 = H(50)
-        self.assert_equal(self.repository.get(key50), b"SOMEDATA")
+        self.assert_equal(pdchunk(self.repository.get(key50)), b"SOMEDATA")
         self.repository.delete(key50)
         self.assert_raises(Repository.ObjectNotFound, lambda: self.repository.get(key50))
         self.repository.commit(compact=False)
@@ -80,55 +104,66 @@ class RepositoryTestCase(RepositoryTestCaseBase):
             for x in range(100):
                 if x == 50:
                     continue
-                self.assert_equal(repository2.get(H(x)), b"SOMEDATA")
+                self.assert_equal(pdchunk(repository2.get(H(x))), b"SOMEDATA")
 
     def test2(self):
         """Test multiple sequential transactions"""
-        self.repository.put(H(0), b"foo")
-        self.repository.put(H(1), b"foo")
+        self.repository.put(H(0), fchunk(b"foo"))
+        self.repository.put(H(1), fchunk(b"foo"))
         self.repository.commit(compact=False)
         self.repository.delete(H(0))
-        self.repository.put(H(1), b"bar")
+        self.repository.put(H(1), fchunk(b"bar"))
+        self.repository.commit(compact=False)
+        self.assert_equal(pdchunk(self.repository.get(H(1))), b"bar")
+
+    def test_read_data(self):
+        meta, data = b"meta", b"data"
+        meta_len = RepoObj.meta_len_hdr.pack(len(meta))
+        chunk_complete = meta_len + meta + data
+        chunk_short = meta_len + meta
+        self.repository.put(H(0), chunk_complete)
         self.repository.commit(compact=False)
-        self.assert_equal(self.repository.get(H(1)), b"bar")
+        self.assert_equal(self.repository.get(H(0)), chunk_complete)
+        self.assert_equal(self.repository.get(H(0), read_data=True), chunk_complete)
+        self.assert_equal(self.repository.get(H(0), read_data=False), chunk_short)
 
     def test_consistency(self):
         """Test cache consistency"""
-        self.repository.put(H(0), b"foo")
-        self.assert_equal(self.repository.get(H(0)), b"foo")
-        self.repository.put(H(0), b"foo2")
-        self.assert_equal(self.repository.get(H(0)), b"foo2")
-        self.repository.put(H(0), b"bar")
-        self.assert_equal(self.repository.get(H(0)), b"bar")
+        self.repository.put(H(0), fchunk(b"foo"))
+        self.assert_equal(pdchunk(self.repository.get(H(0))), b"foo")
+        self.repository.put(H(0), fchunk(b"foo2"))
+        self.assert_equal(pdchunk(self.repository.get(H(0))), b"foo2")
+        self.repository.put(H(0), fchunk(b"bar"))
+        self.assert_equal(pdchunk(self.repository.get(H(0))), b"bar")
         self.repository.delete(H(0))
         self.assert_raises(Repository.ObjectNotFound, lambda: self.repository.get(H(0)))
 
     def test_consistency2(self):
         """Test cache consistency2"""
-        self.repository.put(H(0), b"foo")
-        self.assert_equal(self.repository.get(H(0)), b"foo")
+        self.repository.put(H(0), fchunk(b"foo"))
+        self.assert_equal(pdchunk(self.repository.get(H(0))), b"foo")
         self.repository.commit(compact=False)
-        self.repository.put(H(0), b"foo2")
-        self.assert_equal(self.repository.get(H(0)), b"foo2")
+        self.repository.put(H(0), fchunk(b"foo2"))
+        self.assert_equal(pdchunk(self.repository.get(H(0))), b"foo2")
         self.repository.rollback()
-        self.assert_equal(self.repository.get(H(0)), b"foo")
+        self.assert_equal(pdchunk(self.repository.get(H(0))), b"foo")
 
     def test_overwrite_in_same_transaction(self):
         """Test cache consistency2"""
-        self.repository.put(H(0), b"foo")
-        self.repository.put(H(0), b"foo2")
+        self.repository.put(H(0), fchunk(b"foo"))
+        self.repository.put(H(0), fchunk(b"foo2"))
         self.repository.commit(compact=False)
-        self.assert_equal(self.repository.get(H(0)), b"foo2")
+        self.assert_equal(pdchunk(self.repository.get(H(0))), b"foo2")
 
     def test_single_kind_transactions(self):
         # put
-        self.repository.put(H(0), b"foo")
+        self.repository.put(H(0), fchunk(b"foo"))
         self.repository.commit(compact=False)
         self.repository.close()
         # replace
         self.repository = self.open()
         with self.repository:
-            self.repository.put(H(0), b"bar")
+            self.repository.put(H(0), fchunk(b"bar"))
             self.repository.commit(compact=False)
         # delete
         self.repository = self.open()
@@ -138,7 +173,7 @@ class RepositoryTestCase(RepositoryTestCaseBase):
 
     def test_list(self):
         for x in range(100):
-            self.repository.put(H(x), b"SOMEDATA")
+            self.repository.put(H(x), fchunk(b"SOMEDATA"))
         self.repository.commit(compact=False)
         all = self.repository.list()
         self.assert_equal(len(all), 100)
@@ -152,7 +187,7 @@ class RepositoryTestCase(RepositoryTestCaseBase):
 
     def test_scan(self):
         for x in range(100):
-            self.repository.put(H(x), b"SOMEDATA")
+            self.repository.put(H(x), fchunk(b"SOMEDATA"))
         self.repository.commit(compact=False)
         all = self.repository.scan()
         assert len(all) == 100
@@ -168,14 +203,14 @@ class RepositoryTestCase(RepositoryTestCaseBase):
             assert all[x] == H(x)
 
     def test_max_data_size(self):
-        max_data = b"x" * MAX_DATA_SIZE
-        self.repository.put(H(0), max_data)
-        self.assert_equal(self.repository.get(H(0)), max_data)
-        self.assert_raises(IntegrityError, lambda: self.repository.put(H(1), max_data + b"x"))
+        max_data = b"x" * (MAX_DATA_SIZE - RepoObj.meta_len_hdr.size)
+        self.repository.put(H(0), fchunk(max_data))
+        self.assert_equal(pdchunk(self.repository.get(H(0))), max_data)
+        self.assert_raises(IntegrityError, lambda: self.repository.put(H(1), fchunk(max_data + b"x")))
 
     def test_set_flags(self):
         id = H(0)
-        self.repository.put(id, b"")
+        self.repository.put(id, fchunk(b""))
         self.assert_equal(self.repository.flags(id), 0x00000000)  # init == all zero
         self.repository.flags(id, mask=0x00000001, value=0x00000001)
         self.assert_equal(self.repository.flags(id), 0x00000001)
@@ -188,7 +223,7 @@ class RepositoryTestCase(RepositoryTestCaseBase):
 
     def test_get_flags(self):
         id = H(0)
-        self.repository.put(id, b"")
+        self.repository.put(id, fchunk(b""))
         self.assert_equal(self.repository.flags(id), 0x00000000)  # init == all zero
         self.repository.flags(id, mask=0xC0000003, value=0x80000001)
         self.assert_equal(self.repository.flags(id, mask=0x00000001), 0x00000001)
@@ -199,7 +234,7 @@ class RepositoryTestCase(RepositoryTestCaseBase):
     def test_flags_many(self):
         ids_flagged = [H(0), H(1)]
         ids_default_flags = [H(2), H(3)]
-        [self.repository.put(id, b"") for id in ids_flagged + ids_default_flags]
+        [self.repository.put(id, fchunk(b"")) for id in ids_flagged + ids_default_flags]
         self.repository.flags_many(ids_flagged, mask=0xFFFFFFFF, value=0xDEADBEEF)
         self.assert_equal(list(self.repository.flags_many(ids_default_flags)), [0x00000000, 0x00000000])
         self.assert_equal(list(self.repository.flags_many(ids_flagged)), [0xDEADBEEF, 0xDEADBEEF])
@@ -207,8 +242,8 @@ class RepositoryTestCase(RepositoryTestCaseBase):
         self.assert_equal(list(self.repository.flags_many(ids_flagged, mask=0x0000FFFF)), [0x0000BEEF, 0x0000BEEF])
 
     def test_flags_persistence(self):
-        self.repository.put(H(0), b"default")
-        self.repository.put(H(1), b"one one zero")
+        self.repository.put(H(0), fchunk(b"default"))
+        self.repository.put(H(1), fchunk(b"one one zero"))
         # we do not set flags for H(0), so we can later check their default state.
         self.repository.flags(H(1), mask=0x00000007, value=0x00000006)
         self.repository.commit(compact=False)
@@ -227,38 +262,39 @@ class LocalRepositoryTestCase(RepositoryTestCaseBase):
 
     def _assert_sparse(self):
         # The superseded 123456... PUT
-        assert self.repository.compact[0] == 41 + 8 + 9
+        assert self.repository.compact[0] == 41 + 8 + len(fchunk(b"123456789"))
         # a COMMIT
         assert self.repository.compact[1] == 9
         # The DELETE issued by the superseding PUT (or issued directly)
         assert self.repository.compact[2] == 41
         self.repository._rebuild_sparse(0)
-        assert self.repository.compact[0] == 41 + 8 + 9
+        assert self.repository.compact[0] == 41 + 8 + len(fchunk(b"123456789"))  # 9 is chunk or commit?
 
     def test_sparse1(self):
-        self.repository.put(H(0), b"foo")
-        self.repository.put(H(1), b"123456789")
+        self.repository.put(H(0), fchunk(b"foo"))
+        self.repository.put(H(1), fchunk(b"123456789"))
         self.repository.commit(compact=False)
-        self.repository.put(H(1), b"bar")
+        self.repository.put(H(1), fchunk(b"bar"))
         self._assert_sparse()
 
     def test_sparse2(self):
-        self.repository.put(H(0), b"foo")
-        self.repository.put(H(1), b"123456789")
+        self.repository.put(H(0), fchunk(b"foo"))
+        self.repository.put(H(1), fchunk(b"123456789"))
         self.repository.commit(compact=False)
         self.repository.delete(H(1))
         self._assert_sparse()
 
     def test_sparse_delete(self):
-        self.repository.put(H(0), b"1245")
+        ch0 = fchunk(b"1245")
+        self.repository.put(H(0), ch0)
         self.repository.delete(H(0))
         self.repository.io._write_fd.sync()
 
         # The on-line tracking works on a per-object basis...
-        assert self.repository.compact[0] == 41 + 8 + 41 + 4
+        assert self.repository.compact[0] == 41 + 8 + 41 + len(ch0)
         self.repository._rebuild_sparse(0)
         # ...while _rebuild_sparse can mark whole segments as completely sparse (which then includes the segment magic)
-        assert self.repository.compact[0] == 41 + 8 + 41 + 4 + len(MAGIC)
+        assert self.repository.compact[0] == 41 + 8 + 41 + len(ch0) + len(MAGIC)
 
         self.repository.commit(compact=True)
         assert 0 not in [segment for segment, _ in self.repository.io.segment_iterator()]
@@ -266,7 +302,7 @@ class LocalRepositoryTestCase(RepositoryTestCaseBase):
     def test_uncommitted_garbage(self):
         # uncommitted garbage should be no problem, it is cleaned up automatically.
         # we just have to be careful with invalidation of cached FDs in LoggedIO.
-        self.repository.put(H(0), b"foo")
+        self.repository.put(H(0), fchunk(b"foo"))
         self.repository.commit(compact=False)
         # write some crap to a uncommitted segment file
         last_segment = self.repository.io.get_latest_segment()
@@ -276,7 +312,7 @@ class LocalRepositoryTestCase(RepositoryTestCaseBase):
         # usually, opening the repo and starting a transaction should trigger a cleanup.
         self.repository = self.open()
         with self.repository:
-            self.repository.put(H(0), b"bar")  # this may trigger compact_segments()
+            self.repository.put(H(0), fchunk(b"bar"))  # this may trigger compact_segments()
             self.repository.commit(compact=True)
         # the point here is that nothing blows up with an exception.
 
@@ -363,8 +399,8 @@ class RepositoryCommitTestCase(RepositoryTestCaseBase):
             assert not io.is_committed_segment(io.get_latest_segment())
 
     def test_moved_deletes_are_tracked(self):
-        self.repository.put(H(1), b"1")
-        self.repository.put(H(2), b"2")
+        self.repository.put(H(1), fchunk(b"1"))
+        self.repository.put(H(2), fchunk(b"2"))
         self.repository.commit(compact=False)
         self.repo_dump("p1 p2 c")
         self.repository.delete(H(1))
@@ -378,7 +414,7 @@ class RepositoryCommitTestCase(RepositoryTestCaseBase):
                 num_deletes += 1
         assert num_deletes == 1
         assert last_segment in self.repository.compact
-        self.repository.put(H(3), b"3")
+        self.repository.put(H(3), fchunk(b"3"))
         self.repository.commit(compact=True)
         self.repo_dump("p3 cc")
         assert last_segment not in self.repository.compact
@@ -393,7 +429,7 @@ class RepositoryCommitTestCase(RepositoryTestCaseBase):
 
     def test_shadowed_entries_are_preserved(self):
         get_latest_segment = self.repository.io.get_latest_segment
-        self.repository.put(H(1), b"1")
+        self.repository.put(H(1), fchunk(b"1"))
         # This is the segment with our original PUT of interest
         put_segment = get_latest_segment()
         self.repository.commit(compact=False)
@@ -401,7 +437,7 @@ class RepositoryCommitTestCase(RepositoryTestCaseBase):
         # We now delete H(1), and force this segment to not be compacted, which can happen
         # if it's not sparse enough (symbolized by H(2) here).
         self.repository.delete(H(1))
-        self.repository.put(H(2), b"1")
+        self.repository.put(H(2), fchunk(b"1"))
         delete_segment = get_latest_segment()
 
         # We pretend these are mostly dense (not sparse) and won't be compacted
@@ -426,7 +462,7 @@ class RepositoryCommitTestCase(RepositoryTestCaseBase):
         assert H(1) not in self.repository
 
     def test_shadow_index_rollback(self):
-        self.repository.put(H(1), b"1")
+        self.repository.put(H(1), fchunk(b"1"))
         self.repository.delete(H(1))
         assert self.repository.shadow_index[H(1)] == [0]
         self.repository.commit(compact=True)
@@ -440,7 +476,7 @@ class RepositoryCommitTestCase(RepositoryTestCaseBase):
         assert self.repository.shadow_index[H(1)] == [4]
         self.repository.rollback()
         self.repo_dump("r")
-        self.repository.put(H(2), b"1")
+        self.repository.put(H(2), fchunk(b"1"))
         # After the rollback segment 4 shouldn't be considered anymore
         assert self.repository.shadow_index[H(1)] == []  # because the delete is considered unstable
 
@@ -459,19 +495,19 @@ class RepositoryAppendOnlyTestCase(RepositoryTestCaseBase):
         def segments_in_repository():
             return len(list(self.repository.io.segment_iterator()))
 
-        self.repository.put(H(0), b"foo")
+        self.repository.put(H(0), fchunk(b"foo"))
         self.repository.commit(compact=False)
 
         self.repository.append_only = False
         assert segments_in_repository() == 2
-        self.repository.put(H(0), b"foo")
+        self.repository.put(H(0), fchunk(b"foo"))
         self.repository.commit(compact=True)
         # normal: compact squashes the data together, only one segment
         assert segments_in_repository() == 2
 
         self.repository.append_only = True
         assert segments_in_repository() == 2
-        self.repository.put(H(0), b"foo")
+        self.repository.put(H(0), fchunk(b"foo"))
         self.repository.commit(compact=False)
         # append only: does not compact, only new segments written
         assert segments_in_repository() == 4
@@ -485,7 +521,7 @@ class RepositoryFreeSpaceTestCase(RepositoryTestCaseBase):
         self.reopen()
 
         with self.repository:
-            self.repository.put(H(0), b"foobar")
+            self.repository.put(H(0), fchunk(b"foobar"))
             with pytest.raises(Repository.InsufficientFreeSpaceError):
                 self.repository.commit(compact=False)
         assert os.path.exists(self.repository.path)
@@ -500,45 +536,52 @@ class RepositoryFreeSpaceTestCase(RepositoryTestCaseBase):
 class QuotaTestCase(RepositoryTestCaseBase):
     def test_tracking(self):
         assert self.repository.storage_quota_use == 0
-        self.repository.put(H(1), bytes(1234))
-        assert self.repository.storage_quota_use == 1234 + 41 + 8
-        self.repository.put(H(2), bytes(5678))
-        assert self.repository.storage_quota_use == 1234 + 5678 + 2 * (41 + 8)
+        ch1 = fchunk(bytes(1234))
+        self.repository.put(H(1), ch1)
+        assert self.repository.storage_quota_use == len(ch1) + 41 + 8
+        ch2 = fchunk(bytes(5678))
+        self.repository.put(H(2), ch2)
+        assert self.repository.storage_quota_use == len(ch1) + len(ch2) + 2 * (41 + 8)
         self.repository.delete(H(1))
-        assert self.repository.storage_quota_use == 1234 + 5678 + 2 * (41 + 8)  # we have not compacted yet
+        assert self.repository.storage_quota_use == len(ch1) + len(ch2) + 2 * (41 + 8)  # we have not compacted yet
         self.repository.commit(compact=False)
-        assert self.repository.storage_quota_use == 1234 + 5678 + 2 * (41 + 8)  # we have not compacted yet
+        assert self.repository.storage_quota_use == len(ch1) + len(ch2) + 2 * (41 + 8)  # we have not compacted yet
         self.reopen()
         with self.repository:
             # Open new transaction; hints and thus quota data is not loaded unless needed.
-            self.repository.put(H(3), b"")
+            ch3 = fchunk(b"")
+            self.repository.put(H(3), ch3)
             self.repository.delete(H(3))
-            assert self.repository.storage_quota_use == 1234 + 5678 + 3 * (41 + 8)  # we have not compacted yet
+            assert self.repository.storage_quota_use == len(ch1) + len(ch2) + len(ch3) + 3 * (
+                41 + 8
+            )  # we have not compacted yet
             self.repository.commit(compact=True)
-            assert self.repository.storage_quota_use == 5678 + 41 + 8
+            assert self.repository.storage_quota_use == len(ch2) + 41 + 8
 
     def test_exceed_quota(self):
         assert self.repository.storage_quota_use == 0
         self.repository.storage_quota = 80
-        self.repository.put(H(1), b"")
-        assert self.repository.storage_quota_use == 41 + 8
+        ch1 = fchunk(b"x" * 7)
+        self.repository.put(H(1), ch1)
+        assert self.repository.storage_quota_use == len(ch1) + 41 + 8
         self.repository.commit(compact=False)
         with pytest.raises(Repository.StorageQuotaExceeded):
-            self.repository.put(H(2), b"")
-        assert self.repository.storage_quota_use == (41 + 8) * 2
+            ch2 = fchunk(b"y" * 13)
+            self.repository.put(H(2), ch2)
+        assert self.repository.storage_quota_use == len(ch1) + len(ch2) + (41 + 8) * 2  # check ch2!?
         with pytest.raises(Repository.StorageQuotaExceeded):
             self.repository.commit(compact=False)
-        assert self.repository.storage_quota_use == (41 + 8) * 2
+        assert self.repository.storage_quota_use == len(ch1) + len(ch2) + (41 + 8) * 2  # check ch2!?
         self.reopen()
         with self.repository:
             self.repository.storage_quota = 150
             # Open new transaction; hints and thus quota data is not loaded unless needed.
-            self.repository.put(H(1), b"")
+            self.repository.put(H(1), ch1)
             assert (
-                self.repository.storage_quota_use == (41 + 8) * 2
+                self.repository.storage_quota_use == len(ch1) * 2 + (41 + 8) * 2
             )  # we have 2 puts for H(1) here and not yet compacted.
             self.repository.commit(compact=True)
-            assert self.repository.storage_quota_use == 41 + 8  # now we have compacted.
+            assert self.repository.storage_quota_use == len(ch1) + 41 + 8  # now we have compacted.
 
 
 class NonceReservation(RepositoryTestCaseBase):
@@ -586,13 +629,13 @@ class NonceReservation(RepositoryTestCaseBase):
 class RepositoryAuxiliaryCorruptionTestCase(RepositoryTestCaseBase):
     def setUp(self):
         super().setUp()
-        self.repository.put(H(0), b"foo")
+        self.repository.put(H(0), fchunk(b"foo"))
         self.repository.commit(compact=False)
         self.repository.close()
 
     def do_commit(self):
         with self.repository:
-            self.repository.put(H(0), b"fox")
+            self.repository.put(H(0), fchunk(b"fox"))
             self.repository.commit(compact=False)
 
     def test_corrupted_hints(self):
@@ -648,7 +691,7 @@ class RepositoryAuxiliaryCorruptionTestCase(RepositoryTestCaseBase):
             # Data corruption is detected due to mismatching checksums
             # and fixed by rebuilding the index.
             assert len(self.repository) == 1
-            assert self.repository.get(H(0)) == b"foo"
+            assert pdchunk(self.repository.get(H(0))) == b"foo"
 
     def test_index_corrupted_without_integrity(self):
         self._corrupt_index()
@@ -684,17 +727,17 @@ class RepositoryAuxiliaryCorruptionTestCase(RepositoryTestCaseBase):
         with self.repository:
             # No issues accessing the repository
             assert len(self.repository) == 1
-            assert self.repository.get(H(0)) == b"foo"
+            assert pdchunk(self.repository.get(H(0))) == b"foo"
 
     def _subtly_corrupted_hints_setup(self):
         with self.repository:
             self.repository.append_only = True
             assert len(self.repository) == 1
-            assert self.repository.get(H(0)) == b"foo"
-            self.repository.put(H(1), b"bar")
-            self.repository.put(H(2), b"baz")
+            assert pdchunk(self.repository.get(H(0))) == b"foo"
+            self.repository.put(H(1), fchunk(b"bar"))
+            self.repository.put(H(2), fchunk(b"baz"))
             self.repository.commit(compact=False)
-            self.repository.put(H(2), b"bazz")
+            self.repository.put(H(2), fchunk(b"bazz"))
             self.repository.commit(compact=False)
 
         hints_path = os.path.join(self.repository.path, "hints.5")
@@ -711,14 +754,14 @@ class RepositoryAuxiliaryCorruptionTestCase(RepositoryTestCaseBase):
         self._subtly_corrupted_hints_setup()
         with self.repository:
             self.repository.append_only = False
-            self.repository.put(H(3), b"1234")
+            self.repository.put(H(3), fchunk(b"1234"))
             # Do a compaction run. Succeeds, since the failed checksum prompted a rebuild of the index+hints.
             self.repository.commit(compact=True)
 
             assert len(self.repository) == 4
-            assert self.repository.get(H(0)) == b"foo"
-            assert self.repository.get(H(1)) == b"bar"
-            assert self.repository.get(H(2)) == b"bazz"
+            assert pdchunk(self.repository.get(H(0))) == b"foo"
+            assert pdchunk(self.repository.get(H(1))) == b"bar"
+            assert pdchunk(self.repository.get(H(2))) == b"bazz"
 
     def test_subtly_corrupted_hints_without_integrity(self):
         self._subtly_corrupted_hints_setup()
@@ -726,7 +769,7 @@ class RepositoryAuxiliaryCorruptionTestCase(RepositoryTestCaseBase):
         os.unlink(integrity_path)
         with self.repository:
             self.repository.append_only = False
-            self.repository.put(H(3), b"1234")
+            self.repository.put(H(3), fchunk(b"1234"))
             # Do a compaction run. Fails, since the corrupted refcount was not detected and leads to an assertion failure.
             with pytest.raises(AssertionError) as exc_info:
                 self.repository.commit(compact=True)
@@ -748,12 +791,12 @@ class RepositoryCheckTestCase(RepositoryTestCaseBase):
 
     def get_objects(self, *ids):
         for id_ in ids:
-            self.repository.get(H(id_))
+            pdchunk(self.repository.get(H(id_)))
 
     def add_objects(self, segments):
         for ids in segments:
             for id_ in ids:
-                self.repository.put(H(id_), b"data")
+                self.repository.put(H(id_), fchunk(b"data"))
             self.repository.commit(compact=False)
 
     def get_head(self):
@@ -859,8 +902,8 @@ class RepositoryCheckTestCase(RepositoryTestCaseBase):
         self.assert_equal({1, 2, 3, 4, 5, 6}, self.list_objects())
 
     def test_crash_before_compact(self):
-        self.repository.put(H(0), b"data")
-        self.repository.put(H(0), b"data2")
+        self.repository.put(H(0), fchunk(b"data"))
+        self.repository.put(H(0), fchunk(b"data2"))
         # Simulate a crash before compact
         with patch.object(Repository, "compact_segments") as compact:
             self.repository.commit(compact=True)
@@ -868,12 +911,12 @@ class RepositoryCheckTestCase(RepositoryTestCaseBase):
         self.reopen()
         with self.repository:
             self.check(repair=True)
-            self.assert_equal(self.repository.get(H(0)), b"data2")
+            self.assert_equal(pdchunk(self.repository.get(H(0))), b"data2")
 
 
 class RepositoryHintsTestCase(RepositoryTestCaseBase):
     def test_hints_persistence(self):
-        self.repository.put(H(0), b"data")
+        self.repository.put(H(0), fchunk(b"data"))
         self.repository.delete(H(0))
         self.repository.commit(compact=False)
         shadow_index_expected = self.repository.shadow_index
@@ -884,7 +927,7 @@ class RepositoryHintsTestCase(RepositoryTestCaseBase):
         self.reopen()
         with self.repository:
             # see also do_compact()
-            self.repository.put(H(42), b"foobar")  # this will call prepare_txn() and load the hints data
+            self.repository.put(H(42), fchunk(b"foobar"))  # this will call prepare_txn() and load the hints data
             # check if hints persistence worked:
             self.assert_equal(shadow_index_expected, self.repository.shadow_index)
             self.assert_equal(compact_expected, self.repository.compact)
@@ -892,7 +935,7 @@ class RepositoryHintsTestCase(RepositoryTestCaseBase):
             self.assert_equal(segments_expected, self.repository.segments)
 
     def test_hints_behaviour(self):
-        self.repository.put(H(0), b"data")
+        self.repository.put(H(0), fchunk(b"data"))
         self.assert_equal(self.repository.shadow_index, {})
         assert len(self.repository.compact) == 0
         self.repository.delete(H(0))
@@ -901,7 +944,7 @@ class RepositoryHintsTestCase(RepositoryTestCaseBase):
         self.assert_in(H(0), self.repository.shadow_index)
         self.assert_equal(len(self.repository.shadow_index[H(0)]), 1)
         self.assert_in(0, self.repository.compact)  # segment 0 can be compacted
-        self.repository.put(H(42), b"foobar")  # see also do_compact()
+        self.repository.put(H(42), fchunk(b"foobar"))  # see also do_compact()
         self.repository.commit(compact=True, threshold=0.0)  # compact completely!
         # nothing to compact any more! no info left about stuff that does not exist any more:
         self.assert_not_in(H(0), self.repository.shadow_index)
@@ -1041,13 +1084,13 @@ class RemoteLegacyFree(RepositoryTestCaseBase):
 
     def test_legacy_free(self):
         # put
-        self.repository.put(H(0), b"foo")
+        self.repository.put(H(0), fchunk(b"foo"))
         self.repository.commit(compact=False)
         self.repository.close()
         # replace
         self.repository = self.open()
         with self.repository:
-            self.repository.put(H(0), b"bar")
+            self.repository.put(H(0), fchunk(b"bar"))
             self.repository.commit(compact=False)
         # delete
         self.repository = self.open()