Browse Source

Merge pull request #6511 from ThomasWaldmann/repo-cleanup

repo code cleanup
TW 3 years ago
parent
commit
7abc62b308
1 changed files with 33 additions and 38 deletions
  1. 33 38
      src/borg/repository.py

+ 33 - 38
src/borg/repository.py

@@ -807,7 +807,7 @@ class Repository:
                         # do not remove entry with empty shadowed_segments list here,
                         # it is needed for shadowed_put_exists code (see below)!
                         pass
-                    self.storage_quota_use -= len(data) + self.io.put_header_fmt.size
+                    self.storage_quota_use -= len(data) + self.io.HEADER_ID_SIZE
                 elif tag == TAG_DELETE and not in_index:
                     # If the shadow index doesn't contain this key, then we can't say if there's a shadowed older tag,
                     # therefore we do not drop the delete, but write it to a current segment.
@@ -1208,7 +1208,7 @@ class Repository:
             # be in the repo index (and we won't need it in the shadow_index).
             self._delete(id, segment, offset, update_shadow_index=False)
         segment, offset = self.io.write_put(id, data)
-        self.storage_quota_use += len(data) + self.io.put_header_fmt.size
+        self.storage_quota_use += len(data) + self.io.HEADER_ID_SIZE
         self.segments.setdefault(segment, 0)
         self.segments[segment] += 1
         self.index[id] = segment, offset
@@ -1269,8 +1269,6 @@ class LoggedIO:
 
     header_fmt = struct.Struct('<IIB')
     assert header_fmt.size == 9
-    put_header_fmt = struct.Struct('<IIB32s')
-    assert put_header_fmt.size == 41
     header_no_crc_fmt = struct.Struct('<IB')
     assert header_no_crc_fmt.size == 5
     crc_fmt = struct.Struct('<I')
@@ -1279,6 +1277,8 @@ class LoggedIO:
     _commit = header_no_crc_fmt.pack(9, TAG_COMMIT)
     COMMIT = crc_fmt.pack(crc32(_commit)) + _commit
 
+    HEADER_ID_SIZE = header_fmt.size + 32
+
     def __init__(self, path, limit, segments_per_dir, capacity=90):
         self.path = path
         self.fds = LRUCache(capacity, dispose=self._close_fd)
@@ -1490,7 +1490,7 @@ class LoggedIO:
             offset = MAGIC_LEN
         header = fd.read(self.header_fmt.size)
         while header:
-            size, tag, key, data = self._read(fd, self.header_fmt, header, segment, offset,
+            size, tag, key, data = self._read(fd, header, segment, offset,
                                               (TAG_PUT, TAG_DELETE, TAG_COMMIT),
                                               read_data=read_data)
             if include_data:
@@ -1549,31 +1549,25 @@ class LoggedIO:
             self._write_fd.sync()
         fd = self.get_fd(segment)
         fd.seek(offset)
-        header = fd.read(self.put_header_fmt.size)
-        size, tag, key, data = self._read(fd, self.put_header_fmt, header, segment, offset, (TAG_PUT, ), read_data)
+        header = fd.read(self.header_fmt.size)
+        size, tag, key, data = self._read(fd, header, segment, offset, (TAG_PUT,), read_data)
         if id != key:
             raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format(
                 segment, offset))
         return data if read_data else size
 
-    def _read(self, fd, fmt, header, segment, offset, acceptable_tags, read_data=True):
+    def _read(self, fd, header, segment, offset, acceptable_tags, read_data=True):
         # some code shared by read() and iter_objects()
-
         # See comment on MAX_TAG_ID for details
         assert max(acceptable_tags) <= MAX_TAG_ID, 'Exceeding MAX_TAG_ID will break backwards compatibility'
-
+        fmt = self.header_fmt
         try:
             hdr_tuple = fmt.unpack(header)
         except struct.error as err:
             raise IntegrityError('Invalid segment entry header [segment {}, offset {}]: {}'.format(
                 segment, offset, err)) from None
-        if fmt is self.put_header_fmt:
-            crc, size, tag, key = hdr_tuple
-        elif fmt is self.header_fmt:
-            crc, size, tag = hdr_tuple
-            key = None
-        else:
-            raise TypeError("_read called with unsupported format")
+        crc, size, tag = hdr_tuple
+        length = size - fmt.size  # we already read the header
         if size > MAX_OBJECT_SIZE:
             # if you get this on an archive made with borg < 1.0.7 and millions of files and
             # you need to restore it, you can disable this check by using "if False:" above.
@@ -1582,30 +1576,31 @@ class LoggedIO:
         if size < fmt.size:
             raise IntegrityError('Invalid segment entry size {} - too small [segment {}, offset {}]'.format(
                 size, segment, offset))
-        length = size - fmt.size
-        if read_data:
+        if tag in (TAG_PUT, TAG_DELETE):
+            key = fd.read(32)
+            length -= 32
+            if len(key) != 32:
+                raise IntegrityError(
+                    'Segment entry key short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
+                        segment, offset, 32, len(key)))
+        else:
+            key = None
+        if read_data and tag == TAG_PUT:
             data = fd.read(length)
             if len(data) != length:
                 raise IntegrityError('Segment entry data short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
                     segment, offset, length, len(data)))
-            if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc:
+            if crc32(data, crc32(key, crc32(memoryview(header)[4:]))) & 0xffffffff != crc:
                 raise IntegrityError('Segment entry checksum mismatch [segment {}, offset {}]'.format(
                     segment, offset))
-            if key is None and tag in (TAG_PUT, TAG_DELETE):
-                key, data = data[:32], data[32:]
         else:
-            if key is None and tag in (TAG_PUT, TAG_DELETE):
-                key = fd.read(32)
-                length -= 32
-                if len(key) != 32:
-                    raise IntegrityError('Segment entry key short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
-                        segment, offset, 32, len(key)))
-            oldpos = fd.tell()
-            seeked = fd.seek(length, os.SEEK_CUR) - oldpos
             data = None
-            if seeked != length:
-                raise IntegrityError('Segment entry data short seek [segment {}, offset {}]: expected {}, got {} bytes'.format(
-                        segment, offset, length, seeked))
+            if length > 0:
+                oldpos = fd.tell()
+                seeked = fd.seek(length, os.SEEK_CUR) - oldpos
+                if seeked != length:
+                    raise IntegrityError('Segment entry data short seek [segment {}, offset {}]: expected {}, got {} bytes'.format(
+                            segment, offset, length, seeked))
         if tag not in acceptable_tags:
             raise IntegrityError('Invalid segment entry header, did not get acceptable tag [segment {}, offset {}]'.format(
                 segment, offset))
@@ -1617,7 +1612,7 @@ class LoggedIO:
             # this would push the segment entry size beyond MAX_OBJECT_SIZE.
             raise IntegrityError(f'More than allowed put data [{data_size} > {MAX_DATA_SIZE}]')
         fd = self.get_write_fd(want_new=(id == Manifest.MANIFEST_ID), raise_full=raise_full)
-        size = data_size + self.put_header_fmt.size
+        size = data_size + self.HEADER_ID_SIZE
         offset = self.offset
         header = self.header_no_crc_fmt.pack(size, TAG_PUT)
         crc = self.crc_fmt.pack(crc32(data, crc32(id, crc32(header))) & 0xffffffff)
@@ -1627,11 +1622,11 @@ class LoggedIO:
 
     def write_delete(self, id, raise_full=False):
         fd = self.get_write_fd(want_new=(id == Manifest.MANIFEST_ID), raise_full=raise_full)
-        header = self.header_no_crc_fmt.pack(self.put_header_fmt.size, TAG_DELETE)
+        header = self.header_no_crc_fmt.pack(self.HEADER_ID_SIZE, TAG_DELETE)
         crc = self.crc_fmt.pack(crc32(id, crc32(header)) & 0xffffffff)
         fd.write(b''.join((crc, header, id)))
-        self.offset += self.put_header_fmt.size
-        return self.segment, self.put_header_fmt.size
+        self.offset += self.HEADER_ID_SIZE
+        return self.segment, self.HEADER_ID_SIZE
 
     def write_commit(self, intermediate=False):
         # Intermediate commits go directly into the current segment - this makes checking their validity more
@@ -1646,4 +1641,4 @@ class LoggedIO:
         return self.segment - 1  # close_segment() increments it
 
 
-assert LoggedIO.put_header_fmt.size == 41  # see constants.MAX_OBJECT_SIZE
+assert LoggedIO.HEADER_ID_SIZE == 41  # see constants.MAX_OBJECT_SIZE