Browse Source

removed TestCaseBase from testsuite/archive.py

bigtedde 2 năm trước cách đây
mục cha
commit
d2f32986f3
1 tập tin đã thay đổi với 92 bổ sung92 xóa
  1. 92 92
      src/borg/testsuite/archive.py

+ 92 - 92
src/borg/testsuite/archive.py

@@ -7,7 +7,6 @@ from unittest.mock import Mock
 
 import pytest
 
-from . import BaseTestCase
 from . import rejected_dotdot_paths
 from ..crypto.key import PlaintextKey
 from ..archive import Archive, CacheChunkBuffer, RobustUnpacker, valid_msgpacked_dict, ITEM_KEYS, Statistics
@@ -133,97 +132,98 @@ class MockCache:
         return id, len(data)
 
 
-class ChunkBufferTestCase(BaseTestCase):
-    def test_cache_chunk_buffer(self):
-        data = [Item(path="p1"), Item(path="p2")]
-        cache = MockCache()
-        key = PlaintextKey(None)
-        chunks = CacheChunkBuffer(cache, key, None)
-        for d in data:
-            chunks.add(d)
-            chunks.flush()
-        chunks.flush(flush=True)
-        self.assert_equal(len(chunks.chunks), 2)
-        unpacker = msgpack.Unpacker()
-        for id in chunks.chunks:
-            unpacker.feed(cache.objects[id])
-        self.assert_equal(data, [Item(internal_dict=d) for d in unpacker])
-
-    def test_partial_cache_chunk_buffer(self):
-        big = "0123456789abcdefghijklmnopqrstuvwxyz" * 25000
-        data = [Item(path="full", target=big), Item(path="partial", target=big)]
-        cache = MockCache()
-        key = PlaintextKey(None)
-        chunks = CacheChunkBuffer(cache, key, None)
-        for d in data:
-            chunks.add(d)
-        chunks.flush(flush=False)
-        # the code is expected to leave the last partial chunk in the buffer
-        self.assert_equal(len(chunks.chunks), 3)
-        assert chunks.buffer.tell() > 0
-        # now really flush
-        chunks.flush(flush=True)
-        self.assert_equal(len(chunks.chunks), 4)
-        assert chunks.buffer.tell() == 0
-        unpacker = msgpack.Unpacker()
-        for id in chunks.chunks:
-            unpacker.feed(cache.objects[id])
-        self.assert_equal(data, [Item(internal_dict=d) for d in unpacker])
-
-
-class RobustUnpackerTestCase(BaseTestCase):
-    def make_chunks(self, items):
-        return b"".join(msgpack.packb({"path": item}) for item in items)
-
-    def _validator(self, value):
-        return isinstance(value, dict) and value.get("path") in ("foo", "bar", "boo", "baz")
-
-    def process(self, input):
-        unpacker = RobustUnpacker(validator=self._validator, item_keys=ITEM_KEYS)
-        result = []
-        for should_sync, chunks in input:
-            if should_sync:
-                unpacker.resync()
-            for data in chunks:
-                unpacker.feed(data)
-                for item in unpacker:
-                    result.append(item)
-        return result
-
-    def test_extra_garbage_no_sync(self):
-        chunks = [
-            (False, [self.make_chunks(["foo", "bar"])]),
-            (False, [b"garbage"] + [self.make_chunks(["boo", "baz"])]),
-        ]
-        result = self.process(chunks)
-        self.assert_equal(
-            result, [{"path": "foo"}, {"path": "bar"}, 103, 97, 114, 98, 97, 103, 101, {"path": "boo"}, {"path": "baz"}]
-        )
-
-    def split(self, left, length):
-        parts = []
-        while left:
-            parts.append(left[:length])
-            left = left[length:]
-        return parts
-
-    def test_correct_stream(self):
-        chunks = self.split(self.make_chunks(["foo", "bar", "boo", "baz"]), 2)
-        input = [(False, chunks)]
-        result = self.process(input)
-        self.assert_equal(result, [{"path": "foo"}, {"path": "bar"}, {"path": "boo"}, {"path": "baz"}])
-
-    def test_missing_chunk(self):
-        chunks = self.split(self.make_chunks(["foo", "bar", "boo", "baz"]), 4)
-        input = [(False, chunks[:3]), (True, chunks[4:])]
-        result = self.process(input)
-        self.assert_equal(result, [{"path": "foo"}, {"path": "boo"}, {"path": "baz"}])
-
-    def test_corrupt_chunk(self):
-        chunks = self.split(self.make_chunks(["foo", "bar", "boo", "baz"]), 4)
-        input = [(False, chunks[:3]), (True, [b"gar", b"bage"] + chunks[3:])]
-        result = self.process(input)
-        self.assert_equal(result, [{"path": "foo"}, {"path": "boo"}, {"path": "baz"}])
+def test_cache_chunk_buffer():
+    data = [Item(path="p1"), Item(path="p2")]
+    cache = MockCache()
+    key = PlaintextKey(None)
+    chunks = CacheChunkBuffer(cache, key, None)
+    for d in data:
+        chunks.add(d)
+        chunks.flush()
+    chunks.flush(flush=True)
+    assert len(chunks.chunks) == 2
+    unpacker = msgpack.Unpacker()
+    for id in chunks.chunks:
+        unpacker.feed(cache.objects[id])
+    assert data == [Item(internal_dict=d) for d in unpacker]
+
+
+def test_partial_cache_chunk_buffer():
+    big = "0123456789abcdefghijklmnopqrstuvwxyz" * 25000
+    data = [Item(path="full", target=big), Item(path="partial", target=big)]
+    cache = MockCache()
+    key = PlaintextKey(None)
+    chunks = CacheChunkBuffer(cache, key, None)
+    for d in data:
+        chunks.add(d)
+    chunks.flush(flush=False)
+    # the code is expected to leave the last partial chunk in the buffer
+    assert len(chunks.chunks) == 3
+    assert chunks.buffer.tell() > 0
+    # now really flush
+    chunks.flush(flush=True)
+    assert len(chunks.chunks) == 4
+    assert chunks.buffer.tell() == 0
+    unpacker = msgpack.Unpacker()
+    for id in chunks.chunks:
+        unpacker.feed(cache.objects[id])
+    assert data == [Item(internal_dict=d) for d in unpacker]
+
+
+def make_chunks(items):
+    return b"".join(msgpack.packb({"path": item}) for item in items)
+
+
+def _validator(value):
+    return isinstance(value, dict) and value.get("path") in ("foo", "bar", "boo", "baz")
+
+
+def process(input):
+    unpacker = RobustUnpacker(validator=_validator, item_keys=ITEM_KEYS)
+    result = []
+    for should_sync, chunks in input:
+        if should_sync:
+            unpacker.resync()
+        for data in chunks:
+            unpacker.feed(data)
+            for item in unpacker:
+                result.append(item)
+    return result
+
+
+def test_extra_garbage_no_sync():
+    chunks = [(False, [make_chunks(["foo", "bar"])]), (False, [b"garbage"] + [make_chunks(["boo", "baz"])])]
+    res = process(chunks)
+    assert res == [{"path": "foo"}, {"path": "bar"}, 103, 97, 114, 98, 97, 103, 101, {"path": "boo"}, {"path": "baz"}]
+
+
+def split(left, length):
+    parts = []
+    while left:
+        parts.append(left[:length])
+        left = left[length:]
+    return parts
+
+
+def test_correct_stream():
+    chunks = split(make_chunks(["foo", "bar", "boo", "baz"]), 2)
+    input = [(False, chunks)]
+    result = process(input)
+    assert result == [{"path": "foo"}, {"path": "bar"}, {"path": "boo"}, {"path": "baz"}]
+
+
+def test_missing_chunk():
+    chunks = split(make_chunks(["foo", "bar", "boo", "baz"]), 4)
+    input = [(False, chunks[:3]), (True, chunks[4:])]
+    result = process(input)
+    assert result == [{"path": "foo"}, {"path": "boo"}, {"path": "baz"}]
+
+
+def test_corrupt_chunk():
+    chunks = split(make_chunks(["foo", "bar", "boo", "baz"]), 4)
+    input = [(False, chunks[:3]), (True, [b"gar", b"bage"] + chunks[3:])]
+    result = process(input)
+    assert result == [{"path": "foo"}, {"path": "boo"}, {"path": "baz"}]
 
 
 @pytest.fixture