
Merge pull request #6514 from ThomasWaldmann/repo-v2

repository v2
TW 3 years ago
commit 1b95950613

+ 19 - 10
docs/internals/data-structures.rst

@@ -59,7 +59,7 @@ Each repository has a ``config`` file which is a ``INI``-style file
 and looks like this::

     [repository]
-    version = 1
+    version = 2
     segments_per_dir = 1000
     max_segment_size = 524288000
     id = 57d6c1d52ce76a836b532b0e42e677dec6af9fca3673db511279358828a21ed6
@@ -94,18 +94,27 @@ this value in a non-empty repository, you may also need to relocate the segment
 files manually.

 A segment starts with a magic number (``BORG_SEG`` as an eight byte ASCII string),
-followed by a number of log entries. Each log entry consists of: (in this order)
-
-* First, unsigned 32-bit number, the CRC32 of the entire entry (for a PUT including the DATA) excluding the CRC32 field
-* Second, unsigned 32-bit size of the entry (including the whole header)
-* Third, unsigned 8-bit entry tag: PUT(1), DELETE(2) or COMMIT(3)
-* Fourth, on PUT or DELETE, 32 byte key
-* Fifth, PUT only, (size - 41) bytes of data (length = size - sizeof(CRC32) - sizeof(size) - sizeof(entry tag) - sizeof(key))
+followed by a number of log entries. Each log entry consists of (in this order):
+
+* crc32 checksum (uint32):
+  - for PUT2: CRC32(size + tag + key + digest)
+  - for PUT: CRC32(size + tag + key + data)
+  - for DELETE: CRC32(size + tag + key)
+  - for COMMIT: CRC32(size + tag)
+* size (uint32) of the entry (including the whole header)
+* tag (uint8): PUT(0), DELETE(1), COMMIT(2) or PUT2(3)
+* key (256 bit) - only for PUT/PUT2/DELETE
+* data (size - 41 bytes) - only for PUT
+* xxh64 digest (64 bit) = XXH64(size + tag + key + data) - only for PUT2
+* data (size - 41 - 8 bytes) - only for PUT2
+
+PUT2 is new since repository version 2 and is used for all new log entries.
+PUT is still supported for reading version 1 repositories, but is no longer
+generated. When we talk about ``PUT`` in general, it usually means PUT2 for
+repository version 2+.

 Those files are strictly append-only and modified only once.

-Tag is either ``PUT``, ``DELETE``, or ``COMMIT``.
-
 When an object is written to the repository a ``PUT`` entry is written
 to the file containing the object id and data. If an object is deleted
 a ``DELETE`` entry is appended with the object id.
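
To make the PUT2 entry layout documented above concrete, here is a minimal parsing sketch (illustrative only, not borg's implementation; it assumes the header is the 9-byte little-endian crc32/size/tag struct plus a 32 byte key described above, and uses the third-party ``xxhash`` package for the XXH64 digest, whose byte order may differ in detail from borg's internal representation)::

    import struct
    import zlib
    import xxhash  # third-party package, assumed here for the XXH64 digest

    HEADER_FMT = struct.Struct('<IIB')   # crc32 (uint32), size (uint32), tag (uint8) = 9 bytes
    KEY_SIZE = 32                        # 256 bit object id
    DIGEST_SIZE = 8                      # XXH64 digest
    TAG_PUT2 = 3

    def parse_put2(entry: bytes):
        """Parse and verify one complete PUT2 log entry (sketch, not borg's code)."""
        crc, size, tag = HEADER_FMT.unpack_from(entry)
        if tag != TAG_PUT2 or size != len(entry):
            raise ValueError('not a complete PUT2 entry')
        key = entry[9:9 + KEY_SIZE]
        digest = entry[41:41 + DIGEST_SIZE]
        data = entry[49:size]
        # crc32 covers size + tag + key + digest (everything except the crc field and the data)
        if zlib.crc32(entry[4:49]) & 0xffffffff != crc:
            raise ValueError('header checksum mismatch')
        # the XXH64 digest covers size + tag + key + data
        h = xxhash.xxh64()
        for part in (entry[4:9], key, data):
            h.update(part)
        if h.digest() != digest:
            raise ValueError('entry digest mismatch')
        return key, data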

+ 7 - 6
src/borg/constants.py

@@ -33,14 +33,15 @@ CACHE_TAG_CONTENTS = b'Signature: 8a477f597d28d172789f06886806bc55'
 # bytes. That's why it's 500 MiB instead of 512 MiB.
 DEFAULT_MAX_SEGMENT_SIZE = 500 * 1024 * 1024

-# 20 MiB minus 41 bytes for a Repository header (because the "size" field in the Repository includes
-# the header, and the total size was set to 20 MiB).
+# in borg < 1.3, this has been defined like this:
+# 20 MiB minus 41 bytes for a PUT header (because the "size" field in the Repository includes
+# the header, and the total size was set to precisely 20 MiB for borg < 1.3).
 MAX_DATA_SIZE = 20971479

-# MAX_OBJECT_SIZE = <20 MiB (MAX_DATA_SIZE) + 41 bytes for a Repository PUT header, which consists of
-# a 1 byte tag ID, 4 byte CRC, 4 byte size and 32 bytes for the ID.
-MAX_OBJECT_SIZE = MAX_DATA_SIZE + 41  # see LoggedIO.put_header_fmt.size assertion in repository module
-assert MAX_OBJECT_SIZE == 20 * 1024 * 1024
+# MAX_OBJECT_SIZE = MAX_DATA_SIZE + len(PUT2 header)
+# note: for borg >= 1.3, this makes the MAX_OBJECT_SIZE grow slightly over the precise 20MiB used by
+# borg < 1.3, but this is not expected to cause any issues.
+MAX_OBJECT_SIZE = MAX_DATA_SIZE + 41 + 8  # see assertion at end of repository module

 # repo config max_segment_size value must be below this limit to stay within uint32 offsets:
 MAX_SEGMENT_SIZE_LIMIT = 2 ** 32 - MAX_OBJECT_SIZE
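
As a quick sanity check on the header arithmetic above (a hedged aside, not part of the diff): the PUT header is 9 bytes of crc32/size/tag plus a 32 byte key, and PUT2 adds an 8 byte XXH64 digest, so MAX_OBJECT_SIZE grows from exactly 20 MiB to 20 MiB + 8 bytes::

    MAX_DATA_SIZE = 20971479                 # unchanged, kept compatible with borg < 1.3
    PUT_HEADER = 9 + 32                      # crc32/size/tag header + 256 bit key = 41 bytes
    PUT2_HEADER = PUT_HEADER + 8             # plus 8 byte XXH64 entry digest = 49 bytes

    assert MAX_DATA_SIZE + PUT_HEADER == 20 * 1024 * 1024         # old limit: exactly 20 MiB
    assert MAX_DATA_SIZE + PUT2_HEADER == 20 * 1024 * 1024 + 8    # new MAX_OBJECT_SIZE
    assert 2 ** 32 - (MAX_DATA_SIZE + PUT2_HEADER) == 4273995768  # MAX_SEGMENT_SIZE_LIMIT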

+ 126 - 55
src/borg/repository.py

@@ -25,7 +25,7 @@ from .locking import Lock, LockError, LockErrorT
 from .logger import create_logger
 from .lrucache import LRUCache
 from .platform import SaveFile, SyncFile, sync_dir, safe_fadvise
-from .checksums import crc32
+from .checksums import crc32, StreamingXXH64
 from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError

 logger = create_logger(__name__)
@@ -34,9 +34,11 @@ MAGIC = b'BORG_SEG'
 MAGIC_LEN = len(MAGIC)
 ATTIC_MAGIC = b'ATTICSEG'
 assert len(ATTIC_MAGIC) == MAGIC_LEN
+
 TAG_PUT = 0
 TAG_DELETE = 1
 TAG_COMMIT = 2
+TAG_PUT2 = 3

 # Highest ID usable as TAG_* value
 #
@@ -163,6 +165,7 @@ class Repository:
                  make_parent_dirs=False):
         self.path = os.path.abspath(path)
         self._location = Location('file://%s' % self.path)
+        self.version = None
         self.io = None  # type: LoggedIO
         self.lock = None
         self.index = None
@@ -284,7 +287,8 @@ class Repository:
         os.mkdir(os.path.join(path, 'data'))
         config = ConfigParser(interpolation=None)
         config.add_section('repository')
-        config.set('repository', 'version', '1')
+        self.version = 2
+        config.set('repository', 'version', str(self.version))
         config.set('repository', 'segments_per_dir', str(DEFAULT_SEGMENTS_PER_DIR))
         config.set('repository', 'max_segment_size', str(DEFAULT_MAX_SEGMENT_SIZE))
         config.set('repository', 'append_only', str(int(self.append_only)))
@@ -440,7 +444,11 @@ class Repository:
         except FileNotFoundError:
             self.close()
             raise self.InvalidRepository(self.path)
-        if 'repository' not in self.config.sections() or self.config.getint('repository', 'version') != 1:
+        if 'repository' not in self.config.sections():
+            self.close()
+            raise self.InvalidRepository(path)
+        self.version = self.config.getint('repository', 'version')
+        if self.version not in (2, ):  # for now, only work on new repos
             self.close()
             raise self.InvalidRepository(path)
         self.max_segment_size = parse_file_size(self.config.get('repository', 'max_segment_size'))
@@ -788,7 +796,7 @@ class Repository:
                     continue
                 in_index = self.index.get(key)
                 is_index_object = in_index == (segment, offset)
-                if tag == TAG_PUT and is_index_object:
+                if tag in (TAG_PUT2, TAG_PUT) and is_index_object:
                     try:
                         new_segment, offset = self.io.write_put(key, data, raise_full=True)
                     except LoggedIO.SegmentFull:
@@ -798,7 +806,10 @@ class Repository:
                     segments.setdefault(new_segment, 0)
                     segments[new_segment] += 1
                     segments[segment] -= 1
-                elif tag == TAG_PUT and not is_index_object:
+                    if tag == TAG_PUT:
+                        # old tag is PUT, but new will be PUT2 and use a bit more storage
+                        self.storage_quota_use += self.io.ENTRY_HASH_SIZE
+                elif tag in (TAG_PUT2, TAG_PUT) and not is_index_object:
                     # If this is a PUT shadowed by a later tag, then it will be gone when this segment is deleted after
                     # this loop. Therefore it is removed from the shadow index.
                     try:
@@ -807,7 +818,10 @@ class Repository:
                         # do not remove entry with empty shadowed_segments list here,
                         # it is needed for shadowed_put_exists code (see below)!
                         pass
-                    self.storage_quota_use -= len(data) + self.io.HEADER_ID_SIZE
+                    if tag == TAG_PUT2:
+                        self.storage_quota_use -= len(data) + self.io.HEADER_ID_SIZE + self.io.ENTRY_HASH_SIZE
+                    elif tag == TAG_PUT:
+                        self.storage_quota_use -= len(data) + self.io.HEADER_ID_SIZE
                 elif tag == TAG_DELETE and not in_index:
                     # If the shadow index doesn't contain this key, then we can't say if there's a shadowed older tag,
                     # therefore we do not drop the delete, but write it to a current segment.
@@ -830,7 +844,7 @@ class Repository:
                         # Consider the following series of operations if we would not do this, ie. this entire if:
                         # would be removed.
                         # Columns are segments, lines are different keys (line 1 = some key, line 2 = some other key)
-                        # Legend: P=TAG_PUT, D=TAG_DELETE, c=commit, i=index is written for latest commit
+                        # Legend: P=TAG_PUT/TAG_PUT2, D=TAG_DELETE, c=commit, i=index is written for latest commit
                         #
                         # Segment | 1     | 2   | 3
                         # --------+-------+-----+------
@@ -899,7 +913,7 @@ class Repository:
         """some code shared between replay_segments and check"""
         self.segments[segment] = 0
         for tag, key, offset, size in objects:
-            if tag == TAG_PUT:
+            if tag in (TAG_PUT2, TAG_PUT):
                 try:
                     # If this PUT supersedes an older PUT, mark the old segment for compaction and count the free space
                     s, _ = self.index[key]
@@ -950,7 +964,7 @@ class Repository:

         self.compact[segment] = 0
         for tag, key, offset, size in self.io.iter_objects(segment, read_data=False):
-            if tag == TAG_PUT:
+            if tag in (TAG_PUT2, TAG_PUT):
                 if self.index.get(key, (-1, -1)) != (segment, offset):
                     # This PUT is superseded later
                     self.compact[segment] += size
@@ -1169,7 +1183,7 @@ class Repository:
                     # also, for the next segment, we need to start at offset 0.
                     start_offset = 0
                     continue
-                if tag == TAG_PUT and (segment, offset) == self.index.get(id):
+                if tag in (TAG_PUT2, TAG_PUT) and (segment, offset) == self.index.get(id):
                     # we have found an existing and current object
                     result.append(id)
                     if len(result) == limit:
@@ -1208,7 +1222,7 @@ class Repository:
             # be in the repo index (and we won't need it in the shadow_index).
             self._delete(id, segment, offset, update_shadow_index=False)
         segment, offset = self.io.write_put(id, data)
-        self.storage_quota_use += len(data) + self.io.HEADER_ID_SIZE
+        self.storage_quota_use += len(data) + self.io.HEADER_ID_SIZE + self.io.ENTRY_HASH_SIZE
         self.segments.setdefault(segment, 0)
         self.segments[segment] += 1
         self.index[id] = segment, offset
@@ -1278,6 +1292,7 @@ class LoggedIO:
     COMMIT = crc_fmt.pack(crc32(_commit)) + _commit

     HEADER_ID_SIZE = header_fmt.size + 32
+    ENTRY_HASH_SIZE = 8

     def __init__(self, path, limit, segments_per_dir, capacity=90):
         self.path = path
@@ -1475,7 +1490,8 @@ class LoggedIO:
         Return object iterator for *segment*.

         If read_data is False then include_data must be False as well.
-        Integrity checks are skipped: all data obtained from the iterator must be considered informational.
+
+        See the _read() docstring about confidence in the returned data.

         The iterator returns four-tuples of (tag, key, offset, data|size).
         """
@@ -1491,7 +1507,7 @@ class LoggedIO:
         header = fd.read(self.header_fmt.size)
         while header:
             size, tag, key, data = self._read(fd, header, segment, offset,
-                                              (TAG_PUT, TAG_DELETE, TAG_COMMIT),
+                                              (TAG_PUT2, TAG_DELETE, TAG_COMMIT, TAG_PUT),
                                               read_data=read_data)
             if include_data:
                 yield tag, key, offset, data
@@ -1528,8 +1544,25 @@ class LoggedIO:
                         dst_fd.write(MAGIC)
                         while len(d) >= self.header_fmt.size:
                             crc, size, tag = self.header_fmt.unpack(d[:self.header_fmt.size])
-                            if size > MAX_OBJECT_SIZE or tag > MAX_TAG_ID or size < self.header_fmt.size \
-                               or size > len(d) or crc32(d[4:size]) & 0xffffffff != crc:
+                            size_invalid = size > MAX_OBJECT_SIZE or size < self.header_fmt.size or size > len(d)
+                            if size_invalid or tag > MAX_TAG_ID:
+                                d = d[1:]
+                                continue
+                            if tag == TAG_PUT2:
+                                c_offset = self.HEADER_ID_SIZE + self.ENTRY_HASH_SIZE
+                                # skip if header is invalid
+                                if crc32(d[4:c_offset]) & 0xffffffff != crc:
+                                    d = d[1:]
+                                    continue
+                                # skip if content is invalid
+                                if self.entry_hash(d[4:self.HEADER_ID_SIZE], d[c_offset:size]) != d[self.HEADER_ID_SIZE:c_offset]:
+                                    d = d[1:]
+                                    continue
+                            elif tag in (TAG_DELETE, TAG_COMMIT, TAG_PUT):
+                                if crc32(d[4:size]) & 0xffffffff != crc:
+                                    d = d[1:]
+                                    continue
+                            else:  # tag unknown
                                 d = d[1:]
                                 continue
                             dst_fd.write(d[:size])
@@ -1538,72 +1571,108 @@ class LoggedIO:
                         del d
                         data.release()

+    def entry_hash(self, *data):
+        h = StreamingXXH64()
+        for d in data:
+            h.update(d)
+        return h.digest()
+
     def read(self, segment, offset, id, read_data=True):
         """
         Read entry from *segment* at *offset* with *id*.
+        If read_data is False the size of the entry is returned instead.

-        If read_data is False the size of the entry is returned instead and integrity checks are skipped.
-        The return value should thus be considered informational.
+        See the _read() docstring about confidence in the returned data.
         """
         if segment == self.segment and self._write_fd:
             self._write_fd.sync()
         fd = self.get_fd(segment)
         fd.seek(offset)
         header = fd.read(self.header_fmt.size)
-        size, tag, key, data = self._read(fd, header, segment, offset, (TAG_PUT,), read_data)
+        size, tag, key, data = self._read(fd, header, segment, offset, (TAG_PUT2, TAG_PUT), read_data)
         if id != key:
             raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format(
                 segment, offset))
         return data if read_data else size

     def _read(self, fd, header, segment, offset, acceptable_tags, read_data=True):
-        # some code shared by read() and iter_objects()
+        """
+        Code shared by read() and iter_objects().
+
+        Confidence in returned data:
+        PUT2 tags, read_data == True: crc32 check (header) plus digest check (header+data)
+        PUT2 tags, read_data == False: crc32 check (header)
+        PUT tags, read_data == True: crc32 check (header+data)
+        PUT tags, read_data == False: crc32 check can not be done, all data obtained must be considered informational
+        """
+        def check_crc32(wanted, header, *data):
+            result = crc32(memoryview(header)[4:])  # skip first 32 bits of the header, they contain the crc.
+            for d in data:
+                result = crc32(d, result)
+            if result & 0xffffffff != wanted:
+                raise IntegrityError(f'Segment entry header checksum mismatch [segment {segment}, offset {offset}]')
+
         # See comment on MAX_TAG_ID for details
         assert max(acceptable_tags) <= MAX_TAG_ID, 'Exceeding MAX_TAG_ID will break backwards compatibility'
+        key = data = None
         fmt = self.header_fmt
         try:
             hdr_tuple = fmt.unpack(header)
         except struct.error as err:
-            raise IntegrityError('Invalid segment entry header [segment {}, offset {}]: {}'.format(
-                segment, offset, err)) from None
+            raise IntegrityError(f'Invalid segment entry header [segment {segment}, offset {offset}]: {err}') from None
         crc, size, tag = hdr_tuple
         length = size - fmt.size  # we already read the header
         if size > MAX_OBJECT_SIZE:
             # if you get this on an archive made with borg < 1.0.7 and millions of files and
             # you need to restore it, you can disable this check by using "if False:" above.
-            raise IntegrityError('Invalid segment entry size {} - too big [segment {}, offset {}]'.format(
-                size, segment, offset))
+            raise IntegrityError(f'Invalid segment entry size {size} - too big [segment {segment}, offset {offset}]')
         if size < fmt.size:
-            raise IntegrityError('Invalid segment entry size {} - too small [segment {}, offset {}]'.format(
-                size, segment, offset))
-        if tag in (TAG_PUT, TAG_DELETE):
+            raise IntegrityError(f'Invalid segment entry size {size} - too small [segment {segment}, offset {offset}]')
+        if tag not in (TAG_PUT2, TAG_DELETE, TAG_COMMIT, TAG_PUT):
+            raise IntegrityError(f'Invalid segment entry header, did not get a known tag '
+                                 f'[segment {segment}, offset {offset}]')
+        if tag not in acceptable_tags:
+            raise IntegrityError(f'Invalid segment entry header, did not get acceptable tag '
+                                 f'[segment {segment}, offset {offset}]')
+        if tag == TAG_COMMIT:
+            check_crc32(crc, header)
+            # that's all for COMMITs.
+        else:
+            # all other tags (TAG_PUT2, TAG_DELETE, TAG_PUT) have a key
             key = fd.read(32)
             length -= 32
             if len(key) != 32:
-                raise IntegrityError(
-                    'Segment entry key short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
-                        segment, offset, 32, len(key)))
-        else:
-            key = None
-        if read_data and tag == TAG_PUT:
-            data = fd.read(length)
-            if len(data) != length:
-                raise IntegrityError('Segment entry data short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
-                    segment, offset, length, len(data)))
-            if crc32(data, crc32(key, crc32(memoryview(header)[4:]))) & 0xffffffff != crc:
-                raise IntegrityError('Segment entry checksum mismatch [segment {}, offset {}]'.format(
-                    segment, offset))
-        else:
-            data = None
-            if length > 0:
-                oldpos = fd.tell()
-                seeked = fd.seek(length, os.SEEK_CUR) - oldpos
-                if seeked != length:
-                    raise IntegrityError('Segment entry data short seek [segment {}, offset {}]: expected {}, got {} bytes'.format(
-                            segment, offset, length, seeked))
-        if tag not in acceptable_tags:
-            raise IntegrityError('Invalid segment entry header, did not get acceptable tag [segment {}, offset {}]'.format(
-                segment, offset))
+                raise IntegrityError(f'Segment entry key short read [segment {segment}, offset {offset}]: '
+                                     f'expected {32}, got {len(key)} bytes')
+            if tag == TAG_DELETE:
+                check_crc32(crc, header, key)
+                # that's all for DELETEs.
+            else:
+                # TAG_PUT: we can not do a crc32 header check here, because the crc32 is computed over header+data!
+                #          for the check, see code below when read_data is True.
+                if tag == TAG_PUT2:
+                    entry_hash = fd.read(self.ENTRY_HASH_SIZE)
+                    length -= self.ENTRY_HASH_SIZE
+                    if len(entry_hash) != self.ENTRY_HASH_SIZE:
+                        raise IntegrityError(f'Segment entry hash short read [segment {segment}, offset {offset}]: '
+                                             f'expected {self.ENTRY_HASH_SIZE}, got {len(entry_hash)} bytes')
+                    check_crc32(crc, header, key, entry_hash)
+                if not read_data:  # seek over data
+                    oldpos = fd.tell()
+                    seeked = fd.seek(length, os.SEEK_CUR) - oldpos
+                    if seeked != length:
+                        raise IntegrityError(f'Segment entry data short seek [segment {segment}, offset {offset}]: '
+                                             f'expected {length}, got {seeked} bytes')
+                else:  # read data!
+                    data = fd.read(length)
+                    if len(data) != length:
+                        raise IntegrityError(f'Segment entry data short read [segment {segment}, offset {offset}]: '
+                                             f'expected {length}, got {len(data)} bytes')
+                    if tag == TAG_PUT2:
+                        if self.entry_hash(memoryview(header)[4:], key, data) != entry_hash:
+                            raise IntegrityError(f'Segment entry hash mismatch [segment {segment}, offset {offset}]')
+                    elif tag == TAG_PUT:
+                        check_crc32(crc, header, key, data)
         return size, tag, key, data

     def write_put(self, id, data, raise_full=False):
@@ -1612,11 +1681,13 @@ class LoggedIO:
             # this would push the segment entry size beyond MAX_OBJECT_SIZE.
             raise IntegrityError(f'More than allowed put data [{data_size} > {MAX_DATA_SIZE}]')
         fd = self.get_write_fd(want_new=(id == Manifest.MANIFEST_ID), raise_full=raise_full)
-        size = data_size + self.HEADER_ID_SIZE
+        size = data_size + self.HEADER_ID_SIZE + self.ENTRY_HASH_SIZE
         offset = self.offset
-        header = self.header_no_crc_fmt.pack(size, TAG_PUT)
-        crc = self.crc_fmt.pack(crc32(data, crc32(id, crc32(header))) & 0xffffffff)
-        fd.write(b''.join((crc, header, id, data)))
+        header = self.header_no_crc_fmt.pack(size, TAG_PUT2)
+        entry_hash = self.entry_hash(header, id, data)
+        crc = self.crc_fmt.pack(crc32(entry_hash, crc32(id, crc32(header))) & 0xffffffff)
+        fd.write(b''.join((crc, header, id, entry_hash)))
+        fd.write(data)
         self.offset += size
         return self.segment, offset

@@ -1641,4 +1712,4 @@ class LoggedIO:
         return self.segment - 1  # close_segment() increments it


-assert LoggedIO.HEADER_ID_SIZE == 41  # see constants.MAX_OBJECT_SIZE
+assert LoggedIO.HEADER_ID_SIZE + LoggedIO.ENTRY_HASH_SIZE == 41 + 8  # see constants.MAX_OBJECT_SIZE
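
A complementary write-side sketch (illustrative only, not borg's ``write_put()``; it assumes the third-party ``xxhash`` package and the 9-byte header struct, and mirrors the layering shown in the diff: the XXH64 digest is computed over size+tag+key+data, then the crc32 over size+tag+key+digest)::

    import struct
    import zlib
    import xxhash  # third-party package, assumed

    TAG_PUT2 = 3
    HEADER_ID_SIZE = 9 + 32    # crc32/size/tag header plus 256 bit key
    ENTRY_HASH_SIZE = 8        # XXH64 digest

    def build_put2(key: bytes, data: bytes) -> bytes:
        """Serialize one PUT2 entry following the documented layout (sketch only)."""
        assert len(key) == 32
        size = len(data) + HEADER_ID_SIZE + ENTRY_HASH_SIZE
        header = struct.pack('<IB', size, TAG_PUT2)     # size + tag; the crc is prepended last
        h = xxhash.xxh64()
        for part in (header, key, data):                # digest over size + tag + key + data
            h.update(part)
        entry_hash = h.digest()
        crc = zlib.crc32(entry_hash, zlib.crc32(key, zlib.crc32(header))) & 0xffffffff
        return struct.pack('<I', crc) + header + key + entry_hash + data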

+ 19 - 19
src/borg/testsuite/repository.py

@@ -14,7 +14,7 @@ from ..helpers import IntegrityError
 from ..helpers import msgpack
 from ..locking import Lock, LockFailed
 from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed, ConnectionClosedWithHint, handle_remote_line
-from ..repository import Repository, LoggedIO, MAGIC, MAX_DATA_SIZE, TAG_DELETE, TAG_PUT, TAG_COMMIT
+from ..repository import Repository, LoggedIO, MAGIC, MAX_DATA_SIZE, TAG_DELETE, TAG_PUT2, TAG_PUT, TAG_COMMIT
 from . import BaseTestCase
 from .hashindex import H

@@ -58,7 +58,7 @@ class RepositoryTestCaseBase(BaseTestCase):
         label = label + ': ' if label is not None else ''
         H_trans = {H(i): i for i in range(10)}
         H_trans[None] = -1  # key == None appears in commits
-        tag_trans = {TAG_PUT: 'put', TAG_DELETE: 'del', TAG_COMMIT: 'comm'}
+        tag_trans = {TAG_PUT2: 'put2', TAG_PUT: 'put', TAG_DELETE: 'del', TAG_COMMIT: 'comm'}
         for segment, fn in self.repository.io.segment_iterator():
             for tag, key, offset, size in self.repository.io.iter_objects(segment):
                 print("%s%s H(%d) -> %s[%d..+%d]" % (label, tag_trans[tag], H_trans[key], fn, offset, size))
@@ -185,13 +185,13 @@ class LocalRepositoryTestCase(RepositoryTestCaseBase):

     def _assert_sparse(self):
         # The superseded 123456... PUT
-        assert self.repository.compact[0] == 41 + 9
+        assert self.repository.compact[0] == 41 + 8 + 9
         # a COMMIT
         assert self.repository.compact[1] == 9
         # The DELETE issued by the superseding PUT (or issued directly)
         assert self.repository.compact[2] == 41
         self.repository._rebuild_sparse(0)
-        assert self.repository.compact[0] == 41 + 9
+        assert self.repository.compact[0] == 41 + 8 + 9

     def test_sparse1(self):
         self.repository.put(H(0), b'foo')
@@ -213,10 +213,10 @@ class LocalRepositoryTestCase(RepositoryTestCaseBase):
         self.repository.io._write_fd.sync()

         # The on-line tracking works on a per-object basis...
-        assert self.repository.compact[0] == 41 + 41 + 4
+        assert self.repository.compact[0] == 41 + 8 + 41 + 4
         self.repository._rebuild_sparse(0)
         # ...while _rebuild_sparse can mark whole segments as completely sparse (which then includes the segment magic)
-        assert self.repository.compact[0] == 41 + 41 + 4 + len(MAGIC)
+        assert self.repository.compact[0] == 41 + 8 + 41 + 4 + len(MAGIC)

         self.repository.commit(compact=True)
         assert 0 not in [segment for segment, _ in self.repository.io.segment_iterator()]
@@ -459,42 +459,42 @@ class QuotaTestCase(RepositoryTestCaseBase):
     def test_tracking(self):
         assert self.repository.storage_quota_use == 0
         self.repository.put(H(1), bytes(1234))
-        assert self.repository.storage_quota_use == 1234 + 41
+        assert self.repository.storage_quota_use == 1234 + 41 + 8
         self.repository.put(H(2), bytes(5678))
-        assert self.repository.storage_quota_use == 1234 + 5678 + 2 * 41
+        assert self.repository.storage_quota_use == 1234 + 5678 + 2 * (41 + 8)
         self.repository.delete(H(1))
-        assert self.repository.storage_quota_use == 1234 + 5678 + 2 * 41  # we have not compacted yet
+        assert self.repository.storage_quota_use == 1234 + 5678 + 2 * (41 + 8)  # we have not compacted yet
         self.repository.commit(compact=False)
-        assert self.repository.storage_quota_use == 1234 + 5678 + 2 * 41  # we have not compacted yet
+        assert self.repository.storage_quota_use == 1234 + 5678 + 2 * (41 + 8)  # we have not compacted yet
         self.reopen()
         with self.repository:
             # Open new transaction; hints and thus quota data is not loaded unless needed.
             self.repository.put(H(3), b'')
             self.repository.delete(H(3))
-            assert self.repository.storage_quota_use == 1234 + 5678 + 3 * 41  # we have not compacted yet
+            assert self.repository.storage_quota_use == 1234 + 5678 + 3 * (41 + 8)  # we have not compacted yet
             self.repository.commit(compact=True)
-            assert self.repository.storage_quota_use == 5678 + 41
+            assert self.repository.storage_quota_use == 5678 + 41 + 8

     def test_exceed_quota(self):
         assert self.repository.storage_quota_use == 0
-        self.repository.storage_quota = 50
+        self.repository.storage_quota = 80
         self.repository.put(H(1), b'')
-        assert self.repository.storage_quota_use == 41
+        assert self.repository.storage_quota_use == 41 + 8
         self.repository.commit(compact=False)
         with pytest.raises(Repository.StorageQuotaExceeded):
             self.repository.put(H(2), b'')
-        assert self.repository.storage_quota_use == 82
+        assert self.repository.storage_quota_use == (41 + 8) * 2
         with pytest.raises(Repository.StorageQuotaExceeded):
             self.repository.commit(compact=False)
-        assert self.repository.storage_quota_use == 82
+        assert self.repository.storage_quota_use == (41 + 8) * 2
         self.reopen()
         with self.repository:
-            self.repository.storage_quota = 100
+            self.repository.storage_quota = 150
             # Open new transaction; hints and thus quota data is not loaded unless needed.
             self.repository.put(H(1), b'')
-            assert self.repository.storage_quota_use == 82  # we have 2 puts for H(1) here and not yet compacted.
+            assert self.repository.storage_quota_use == (41 + 8) * 2  # we have 2 puts for H(1) here and not yet compacted.
             self.repository.commit(compact=True)
-            assert self.repository.storage_quota_use == 41  # now we have compacted.
+            assert self.repository.storage_quota_use == 41 + 8  # now we have compacted.


 class NonceReservation(RepositoryTestCaseBase):
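
The updated numbers in the quota tests above follow directly from the new per-entry overhead: each PUT2 is charged ``len(data) + 41 + 8`` bytes against the storage quota. A small worked check (a hedged aside, using the same values as test_tracking)::

    PUT2_OVERHEAD = 41 + 8   # header + key (41 bytes) plus XXH64 digest (8 bytes)

    assert 1234 + PUT2_OVERHEAD == 1283                 # after put(H(1), bytes(1234))
    assert 1234 + 5678 + 2 * PUT2_OVERHEAD == 7010      # after put(H(2), bytes(5678))
    assert 5678 + PUT2_OVERHEAD == 5727                 # only H(2) remains after compaction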