|
@@ -12,6 +12,7 @@ import pytest
|
|
|
|
|
|
from ... import platform
|
|
from ... import platform
|
|
from ...constants import * # NOQA
|
|
from ...constants import * # NOQA
|
|
|
|
+from ...constants import zeros
|
|
from ...manifest import Manifest
|
|
from ...manifest import Manifest
|
|
from ...platform import is_cygwin, is_win32, is_darwin
|
|
from ...platform import is_cygwin, is_win32, is_darwin
|
|
from ...repository import Repository
|
|
from ...repository import Repository
|
|
@@ -926,3 +927,127 @@ def test_common_options(archivers, request):
|
|
cmd(archiver, "repo-create", RK_ENCRYPTION)
|
|
cmd(archiver, "repo-create", RK_ENCRYPTION)
|
|
log = cmd(archiver, "--debug", "create", "test", "input")
|
|
log = cmd(archiver, "--debug", "create", "test", "input")
|
|
assert "security: read previous location" in log
|
|
assert "security: read previous location" in log
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def test_create_big_zeros_files(archivers, request):
|
|
|
|
+ """Test creating an archive from 10 files with 10MB zeros each."""
|
|
|
|
+ archiver = request.getfixturevalue(archivers)
|
|
|
|
+ # Create 10 files with 10,000,000 bytes of zeros each
|
|
|
|
+ count, size = 10, 10 * 1000 * 1000
|
|
|
|
+ assert size <= len(zeros)
|
|
|
|
+ for i in range(count):
|
|
|
|
+ create_regular_file(archiver.input_path, f"zeros_{i}", contents=memoryview(zeros)[:size])
|
|
|
|
+ # Create repository and archive
|
|
|
|
+ cmd(archiver, "repo-create", RK_ENCRYPTION)
|
|
|
|
+ cmd(archiver, "create", "test", "input")
|
|
|
|
+
|
|
|
|
+ # Extract the archive to verify contents
|
|
|
|
+ with tempfile.TemporaryDirectory() as extract_path:
|
|
|
|
+ with changedir(extract_path):
|
|
|
|
+ cmd(archiver, "extract", "test")
|
|
|
|
+
|
|
|
|
+ # Verify that the extracted files have the correct contents
|
|
|
|
+ for i in range(count):
|
|
|
|
+ extracted_file_path = os.path.join(extract_path, "input", f"zeros_{i}")
|
|
|
|
+ with open(extracted_file_path, "rb") as f:
|
|
|
|
+ extracted_data = f.read()
|
|
|
|
+ # Verify the file contains only zeros and has the correct size
|
|
|
|
+ assert extracted_data == bytes(size)
|
|
|
|
+ assert len(extracted_data) == size
|
|
|
|
+
|
|
|
|
+ # Also verify the directory structure matches
|
|
|
|
+ assert_dirs_equal(archiver.input_path, os.path.join(extract_path, "input"))
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def test_create_big_random_files(archivers, request):
|
|
|
|
+ """Test creating an archive from 10 files with 10MB random data each."""
|
|
|
|
+ archiver = request.getfixturevalue(archivers)
|
|
|
|
+ # Create 10 files with 10,000,000 bytes of random data each
|
|
|
|
+ count, size = 10, 10 * 1000 * 1000
|
|
|
|
+ random_data = {}
|
|
|
|
+ for i in range(count):
|
|
|
|
+ data = os.urandom(size)
|
|
|
|
+ random_data[i] = data
|
|
|
|
+ create_regular_file(archiver.input_path, f"random_{i}", contents=data)
|
|
|
|
+ # Create repository and archive
|
|
|
|
+ cmd(archiver, "repo-create", RK_ENCRYPTION)
|
|
|
|
+ cmd(archiver, "create", "test", "input")
|
|
|
|
+
|
|
|
|
+ # Extract the archive to verify contents
|
|
|
|
+ with tempfile.TemporaryDirectory() as extract_path:
|
|
|
|
+ with changedir(extract_path):
|
|
|
|
+ cmd(archiver, "extract", "test")
|
|
|
|
+
|
|
|
|
+ # Verify that the extracted files have the correct contents
|
|
|
|
+ for i in range(count):
|
|
|
|
+ extracted_file_path = os.path.join(extract_path, "input", f"random_{i}")
|
|
|
|
+ with open(extracted_file_path, "rb") as f:
|
|
|
|
+ extracted_data = f.read()
|
|
|
|
+ # Verify the file contains the original random data and has the correct size
|
|
|
|
+ assert extracted_data == random_data[i]
|
|
|
|
+ assert len(extracted_data) == size
|
|
|
|
+
|
|
|
|
+ # Also verify the directory structure matches
|
|
|
|
+ assert_dirs_equal(archiver.input_path, os.path.join(extract_path, "input"))
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def test_create_with_compression_algorithms(archivers, request):
|
|
|
|
+ """Test creating archives with different compression algorithms."""
|
|
|
|
+ archiver = request.getfixturevalue(archivers)
|
|
|
|
+
|
|
|
|
+ # Create test files: 5 files with zeros (highly compressible) and 5 with random data (incompressible)
|
|
|
|
+ count, size = 5, 1 * 1000 * 1000 # 1MB per file
|
|
|
|
+ random_data = {}
|
|
|
|
+
|
|
|
|
+ # Create zeros files
|
|
|
|
+ for i in range(count):
|
|
|
|
+ create_regular_file(archiver.input_path, f"zeros_{i}", contents=memoryview(zeros)[:size])
|
|
|
|
+
|
|
|
|
+ # Create random files
|
|
|
|
+ for i in range(count):
|
|
|
|
+ data = os.urandom(size)
|
|
|
|
+ random_data[i] = data
|
|
|
|
+ create_regular_file(archiver.input_path, f"random_{i}", contents=data)
|
|
|
|
+
|
|
|
|
+ # Create repository
|
|
|
|
+ cmd(archiver, "repo-create", RK_ENCRYPTION)
|
|
|
|
+
|
|
|
|
+ # Test different compression algorithms
|
|
|
|
+ algorithms = [
|
|
|
|
+ "none", # No compression
|
|
|
|
+ "lz4", # Fast compression
|
|
|
|
+ "zlib,6", # Medium compression
|
|
|
|
+ "zstd,3", # Good compression/speed balance
|
|
|
|
+ "lzma,6", # High compression
|
|
|
|
+ ]
|
|
|
|
+
|
|
|
|
+ for algo in algorithms:
|
|
|
|
+ # Create archive with specific compression algorithm
|
|
|
|
+ archive_name = f"test_{algo.replace(',', '_')}"
|
|
|
|
+ cmd(archiver, "create", "--compression", algo, archive_name, "input")
|
|
|
|
+
|
|
|
|
+ # Extract the archive to verify contents
|
|
|
|
+ with tempfile.TemporaryDirectory() as extract_path:
|
|
|
|
+ with changedir(extract_path):
|
|
|
|
+ cmd(archiver, "extract", archive_name)
|
|
|
|
+
|
|
|
|
+ # Verify zeros files
|
|
|
|
+ for i in range(count):
|
|
|
|
+ extracted_file_path = os.path.join(extract_path, "input", f"zeros_{i}")
|
|
|
|
+ with open(extracted_file_path, "rb") as f:
|
|
|
|
+ extracted_data = f.read()
|
|
|
|
+ # Verify the file contains only zeros and has the correct size
|
|
|
|
+ assert extracted_data == bytes(size)
|
|
|
|
+ assert len(extracted_data) == size
|
|
|
|
+
|
|
|
|
+ # Verify random files
|
|
|
|
+ for i in range(count):
|
|
|
|
+ extracted_file_path = os.path.join(extract_path, "input", f"random_{i}")
|
|
|
|
+ with open(extracted_file_path, "rb") as f:
|
|
|
|
+ extracted_data = f.read()
|
|
|
|
+ # Verify the file contains the original random data and has the correct size
|
|
|
|
+ assert extracted_data == random_data[i]
|
|
|
|
+ assert len(extracted_data) == size
|
|
|
|
+
|
|
|
|
+ # Also verify the directory structure matches
|
|
|
|
+ assert_dirs_equal(archiver.input_path, os.path.join(extract_path, "input"))
|