|
@@ -6,6 +6,7 @@ import zlib
|
|
|
|
|
|
from ..hashindex import NSIndex, ChunkIndex
|
|
|
from .. import hashindex
|
|
|
+from ..crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError
|
|
|
from . import BaseTestCase
|
|
|
|
|
|
# Note: these tests are part of the self test, do not use or import py.test functionality here.
|
|
@@ -319,6 +320,27 @@ class HashIndexDataTestCase(BaseTestCase):
|
|
|
assert idx1[H(3)] == (ChunkIndex.MAX_VALUE, 6, 7)
|
|
|
|
|
|
|
|
|
+class HashIndexIntegrityTestCase(HashIndexDataTestCase):
|
|
|
+ def write_integrity_checked_index(self, tempdir):
|
|
|
+ idx = self._deserialize_hashindex(self.HASHINDEX)
|
|
|
+ file = os.path.join(tempdir, 'idx')
|
|
|
+ with IntegrityCheckedFile(path=file, write=True) as fd:
|
|
|
+ idx.write(fd)
|
|
|
+ integrity_data = fd.integrity_data
|
|
|
+ assert 'final' in integrity_data
|
|
|
+ assert 'HashHeader' in integrity_data
|
|
|
+ return file, integrity_data
|
|
|
+
|
|
|
+ def test_integrity_checked_file(self):
|
|
|
+ with tempfile.TemporaryDirectory() as tempdir:
|
|
|
+ file, integrity_data = self.write_integrity_checked_index(tempdir)
|
|
|
+ with open(file, 'r+b') as fd:
|
|
|
+ fd.write(b'Foo')
|
|
|
+ with self.assert_raises(FileIntegrityError):
|
|
|
+ with IntegrityCheckedFile(path=file, write=False, integrity_data=integrity_data) as fd:
|
|
|
+ ChunkIndex.read(fd)
|
|
|
+
|
|
|
+
|
|
|
class NSIndexTestCase(BaseTestCase):
|
|
|
def test_nsindex_segment_limit(self):
|
|
|
idx = NSIndex()
|