|
@@ -6,6 +6,7 @@ import os
|
|
|
import shutil
|
|
|
import sys
|
|
|
from argparse import ArgumentTypeError
|
|
|
+from contextlib import contextmanager
|
|
|
from datetime import datetime, timezone, timedelta
|
|
|
from io import StringIO, BytesIO
|
|
|
|
|
@@ -14,6 +15,8 @@ import pytest
|
|
|
from ..archiver.prune_cmd import prune_within, prune_split
|
|
|
from .. import platform
|
|
|
from ..constants import * # NOQA
|
|
|
+from ..constants import CACHE_TAG_NAME, CACHE_TAG_CONTENTS
|
|
|
+from ..helpers.fs import dir_is_tagged
|
|
|
from ..helpers import Location
|
|
|
from ..helpers import Buffer
|
|
|
from ..helpers import (
|
|
@@ -1519,3 +1522,125 @@ def test_ec_invalid():
|
|
|
)
|
|
|
def test_max_ec(ec1, ec2, ec_max):
|
|
|
assert max_ec(ec1, ec2) == ec_max
|
|
|
+
|
|
|
+
|
|
|
+def test_dir_is_tagged(tmpdir):
|
|
|
+ """Test dir_is_tagged with both path-based and file descriptor-based operations."""
|
|
|
+
|
|
|
+ @contextmanager
|
|
|
+ def open_dir(path):
|
|
|
+ fd = os.open(path, os.O_RDONLY)
|
|
|
+ try:
|
|
|
+ yield fd
|
|
|
+ finally:
|
|
|
+ os.close(fd)
|
|
|
+
|
|
|
+ # Create directories for testing exclude_caches
|
|
|
+ cache_dir = tmpdir.mkdir("cache_dir")
|
|
|
+ cache_tag_path = cache_dir.join(CACHE_TAG_NAME)
|
|
|
+ cache_tag_path.write_binary(CACHE_TAG_CONTENTS)
|
|
|
+
|
|
|
+ invalid_cache_dir = tmpdir.mkdir("invalid_cache_dir")
|
|
|
+ invalid_cache_tag_path = invalid_cache_dir.join(CACHE_TAG_NAME)
|
|
|
+ invalid_cache_tag_path.write_binary(b"invalid signature")
|
|
|
+
|
|
|
+ # Create directories for testing exclude_if_present
|
|
|
+ tagged_dir = tmpdir.mkdir("tagged_dir")
|
|
|
+ tag_file = tagged_dir.join(".NOBACKUP")
|
|
|
+ tag_file.write("test")
|
|
|
+
|
|
|
+ other_tagged_dir = tmpdir.mkdir("other_tagged_dir")
|
|
|
+ other_tag_file = other_tagged_dir.join(".DONOTBACKUP")
|
|
|
+ other_tag_file.write("test")
|
|
|
+
|
|
|
+ # Create a directory with both a CACHEDIR.TAG and a custom tag file
|
|
|
+ both_dir = tmpdir.mkdir("both_dir")
|
|
|
+ cache_tag_path = both_dir.join(CACHE_TAG_NAME)
|
|
|
+ cache_tag_path.write_binary(CACHE_TAG_CONTENTS)
|
|
|
+ custom_tag_path = both_dir.join(".NOBACKUP")
|
|
|
+ custom_tag_path.write("test")
|
|
|
+
|
|
|
+ # Create a directory without any tag files
|
|
|
+ normal_dir = tmpdir.mkdir("normal_dir")
|
|
|
+
|
|
|
+ # Test edge cases
|
|
|
+ test_dir = tmpdir.mkdir("test_dir")
|
|
|
+ assert dir_is_tagged(path=str(test_dir), exclude_caches=None, exclude_if_present=None) == []
|
|
|
+ assert dir_is_tagged(path=str(test_dir), exclude_if_present=[]) == []
|
|
|
+
|
|
|
+ # Test with non-existent directory (should not raise an exception)
|
|
|
+ non_existent_dir = str(tmpdir.join("non_existent"))
|
|
|
+ result = dir_is_tagged(path=non_existent_dir, exclude_caches=True, exclude_if_present=[".NOBACKUP"])
|
|
|
+ assert result == []
|
|
|
+
|
|
|
+ # Test 1: exclude_caches with path-based operations
|
|
|
+ assert dir_is_tagged(path=str(cache_dir), exclude_caches=True) == [CACHE_TAG_NAME]
|
|
|
+ assert dir_is_tagged(path=str(invalid_cache_dir), exclude_caches=True) == []
|
|
|
+ assert dir_is_tagged(path=str(normal_dir), exclude_caches=True) == []
|
|
|
+
|
|
|
+ assert dir_is_tagged(path=str(cache_dir), exclude_caches=False) == []
|
|
|
+ assert dir_is_tagged(path=str(invalid_cache_dir), exclude_caches=False) == []
|
|
|
+ assert dir_is_tagged(path=str(normal_dir), exclude_caches=False) == []
|
|
|
+
|
|
|
+ # Test 2: exclude_caches with file-descriptor-based operations
|
|
|
+ with open_dir(str(cache_dir)) as fd:
|
|
|
+ assert dir_is_tagged(dir_fd=fd, exclude_caches=True) == [CACHE_TAG_NAME]
|
|
|
+ with open_dir(str(invalid_cache_dir)) as fd:
|
|
|
+ assert dir_is_tagged(dir_fd=fd, exclude_caches=True) == []
|
|
|
+ with open_dir(str(normal_dir)) as fd:
|
|
|
+ assert dir_is_tagged(dir_fd=fd, exclude_caches=True) == []
|
|
|
+
|
|
|
+ with open_dir(str(cache_dir)) as fd:
|
|
|
+ assert dir_is_tagged(dir_fd=fd, exclude_caches=False) == []
|
|
|
+ with open_dir(str(invalid_cache_dir)) as fd:
|
|
|
+ assert dir_is_tagged(dir_fd=fd, exclude_caches=False) == []
|
|
|
+ with open_dir(str(normal_dir)) as fd:
|
|
|
+ assert dir_is_tagged(dir_fd=fd, exclude_caches=False) == []
|
|
|
+
|
|
|
+ # Test 3: exclude_if_present with path-based operations
|
|
|
+ tags = [".NOBACKUP"]
|
|
|
+ assert dir_is_tagged(path=str(tagged_dir), exclude_if_present=tags) == [".NOBACKUP"]
|
|
|
+ assert dir_is_tagged(path=str(other_tagged_dir), exclude_if_present=tags) == []
|
|
|
+ assert dir_is_tagged(path=str(normal_dir), exclude_if_present=tags) == []
|
|
|
+
|
|
|
+ tags = [".NOBACKUP", ".DONOTBACKUP"]
|
|
|
+ assert dir_is_tagged(path=str(tagged_dir), exclude_if_present=tags) == [".NOBACKUP"]
|
|
|
+ assert dir_is_tagged(path=str(other_tagged_dir), exclude_if_present=tags) == [".DONOTBACKUP"]
|
|
|
+ assert dir_is_tagged(path=str(normal_dir), exclude_if_present=tags) == []
|
|
|
+
|
|
|
+ # Test 4: exclude_if_present with file descriptor-based operations
|
|
|
+ tags = [".NOBACKUP"]
|
|
|
+ with open_dir(str(tagged_dir)) as fd:
|
|
|
+ assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [".NOBACKUP"]
|
|
|
+ with open_dir(str(other_tagged_dir)) as fd:
|
|
|
+ assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == []
|
|
|
+ with open_dir(str(normal_dir)) as fd:
|
|
|
+ assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == []
|
|
|
+
|
|
|
+ tags = [".NOBACKUP", ".DONOTBACKUP"]
|
|
|
+ with open_dir(str(tagged_dir)) as fd:
|
|
|
+ assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [".NOBACKUP"]
|
|
|
+ with open_dir(str(other_tagged_dir)) as fd:
|
|
|
+ assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [".DONOTBACKUP"]
|
|
|
+ with open_dir(str(normal_dir)) as fd:
|
|
|
+ assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == []
|
|
|
+
|
|
|
+ # Test 5: both exclude types with path-based operations
|
|
|
+ assert sorted(dir_is_tagged(path=str(both_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"])) == [
|
|
|
+ ".NOBACKUP",
|
|
|
+ CACHE_TAG_NAME,
|
|
|
+ ]
|
|
|
+ assert dir_is_tagged(path=str(cache_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [CACHE_TAG_NAME]
|
|
|
+ assert dir_is_tagged(path=str(tagged_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [".NOBACKUP"]
|
|
|
+ assert dir_is_tagged(path=str(normal_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == []
|
|
|
+
|
|
|
+ # Test 6: both exclude types with file descriptor-based operations
|
|
|
+ with open_dir(str(both_dir)) as fd:
|
|
|
+ result = dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"])
|
|
|
+ assert sorted(result) == [".NOBACKUP", CACHE_TAG_NAME]
|
|
|
+ with open_dir(str(cache_dir)) as fd:
|
|
|
+ assert dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [CACHE_TAG_NAME]
|
|
|
+ with open_dir(str(tagged_dir)) as fd:
|
|
|
+ assert dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [".NOBACKUP"]
|
|
|
+ with open_dir(str(normal_dir)) as fd:
|
|
|
+ assert dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == []
|