|
@@ -34,23 +34,23 @@ def test_stats_basic(stats):
|
|
|
assert stats.usize == 20
|
|
|
|
|
|
|
|
|
-def tests_stats_progress(stats, monkeypatch, columns=80):
|
|
|
+@pytest.mark.parametrize(
|
|
|
+ "item_path, update_size, expected_output",
|
|
|
+ [
|
|
|
+ ("", 0, "20 B O 20 B U 1 N "), # test unchanged 'stats' fixture
|
|
|
+ ("foo", 10**3, "1.02 kB O 20 B U 1 N foo"), # test updated original size and set item path
|
|
|
+ # test long item path which exceeds 80 characters
|
|
|
+ ("foo" * 40, 10**3, "1.02 kB O 20 B U 1 N foofoofoofoofoofoofoofoofo...foofoofoofoofoofoofoofoofoofoo"),
|
|
|
+ ],
|
|
|
+)
|
|
|
+def test_stats_progress(item_path, update_size, expected_output, stats, monkeypatch, columns=80):
|
|
|
monkeypatch.setenv("COLUMNS", str(columns))
|
|
|
out = StringIO()
|
|
|
- stats.show_progress(stream=out)
|
|
|
- s = "20 B O 20 B U 1 N "
|
|
|
- buf = " " * (columns - len(s))
|
|
|
- assert out.getvalue() == s + buf + "\r"
|
|
|
+ item = Item(path=item_path) if item_path else None
|
|
|
+ s = expected_output
|
|
|
|
|
|
- out = StringIO()
|
|
|
- stats.update(10**3, unique=False)
|
|
|
- stats.show_progress(item=Item(path="foo"), final=False, stream=out)
|
|
|
- s = "1.02 kB O 20 B U 1 N foo"
|
|
|
- buf = " " * (columns - len(s))
|
|
|
- assert out.getvalue() == s + buf + "\r"
|
|
|
- out = StringIO()
|
|
|
- stats.show_progress(item=Item(path="foo" * 40), final=False, stream=out)
|
|
|
- s = "1.02 kB O 20 B U 1 N foofoofoofoofoofoofoofoofo...foofoofoofoofoofoofoofoofoofoo"
|
|
|
+ stats.update(update_size, unique=False)
|
|
|
+ stats.show_progress(item=item, stream=out)
|
|
|
buf = " " * (columns - len(s))
|
|
|
assert out.getvalue() == s + buf + "\r"
|
|
|
|
|
@@ -103,6 +103,22 @@ def test_stats_progress_json(stats):
|
|
|
assert "nfiles" not in result
|
|
|
|
|
|
|
|
|
+@pytest.mark.parametrize(
|
|
|
+ "isoformat, expected",
|
|
|
+ [
|
|
|
+ ("1970-01-01T00:00:01.000001", datetime(1970, 1, 1, 0, 0, 1, 1, timezone.utc)), # test with microseconds
|
|
|
+ ("1970-01-01T00:00:01", datetime(1970, 1, 1, 0, 0, 1, 0, timezone.utc)), # test without microseconds
|
|
|
+ ],
|
|
|
+)
|
|
|
+def test_timestamp_parsing(monkeypatch, isoformat, expected):
|
|
|
+ repository = Mock()
|
|
|
+ key = PlaintextKey(repository)
|
|
|
+ manifest = Manifest(key, repository)
|
|
|
+ a = Archive(manifest, "test", create=True)
|
|
|
+ a.metadata = ArchiveItem(time=isoformat)
|
|
|
+ assert a.ts == expected
|
|
|
+
|
|
|
+
|
|
|
class MockCache:
|
|
|
class MockRepo:
|
|
|
def async_response(self, wait=True):
|
|
@@ -117,24 +133,8 @@ class MockCache:
|
|
|
return id, len(data)
|
|
|
|
|
|
|
|
|
-class ArchiveTimestampTestCase(BaseTestCase):
|
|
|
- def _test_timestamp_parsing(self, isoformat, expected):
|
|
|
- repository = Mock()
|
|
|
- key = PlaintextKey(repository)
|
|
|
- manifest = Manifest(key, repository)
|
|
|
- a = Archive(manifest, "test", create=True)
|
|
|
- a.metadata = ArchiveItem(time=isoformat)
|
|
|
- self.assert_equal(a.ts, expected)
|
|
|
-
|
|
|
- def test_with_microseconds(self):
|
|
|
- self._test_timestamp_parsing("1970-01-01T00:00:01.000001", datetime(1970, 1, 1, 0, 0, 1, 1, timezone.utc))
|
|
|
-
|
|
|
- def test_without_microseconds(self):
|
|
|
- self._test_timestamp_parsing("1970-01-01T00:00:01", datetime(1970, 1, 1, 0, 0, 1, 0, timezone.utc))
|
|
|
-
|
|
|
-
|
|
|
class ChunkBufferTestCase(BaseTestCase):
|
|
|
- def test(self):
|
|
|
+ def test_cache_chunk_buffer(self):
|
|
|
data = [Item(path="p1"), Item(path="p2")]
|
|
|
cache = MockCache()
|
|
|
key = PlaintextKey(None)
|
|
@@ -149,7 +149,7 @@ class ChunkBufferTestCase(BaseTestCase):
|
|
|
unpacker.feed(cache.objects[id])
|
|
|
self.assert_equal(data, [Item(internal_dict=d) for d in unpacker])
|
|
|
|
|
|
- def test_partial(self):
|
|
|
+ def test_partial_cache_chunk_buffer(self):
|
|
|
big = "0123456789abcdefghijklmnopqrstuvwxyz" * 25000
|
|
|
data = [Item(path="full", target=big), Item(path="partial", target=big)]
|
|
|
cache = MockCache()
|