12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646 |
- import base64
- import errno
- import getpass
- import hashlib
- import os
- import shutil
- import sys
- from argparse import ArgumentTypeError
- from contextlib import contextmanager
- from datetime import datetime, timezone, timedelta
- from io import StringIO, BytesIO
- import pytest
- from ..archiver.prune_cmd import prune_within, prune_split
- from .. import platform
- from ..constants import * # NOQA
- from ..constants import CACHE_TAG_NAME, CACHE_TAG_CONTENTS
- from ..helpers.fs import dir_is_tagged
- from ..helpers import Location
- from ..helpers import Buffer
- from ..helpers import (
- partial_format,
- format_file_size,
- parse_file_size,
- format_timedelta,
- format_line,
- PlaceholderError,
- replace_placeholders,
- )
- from ..helpers import remove_dotdot_prefixes, make_path_safe, clean_lines
- from ..helpers import interval
- from ..helpers import get_base_dir, get_cache_dir, get_keys_dir, get_security_dir, get_config_dir, get_runtime_dir
- from ..helpers import is_slow_msgpack
- from ..helpers import msgpack
- from ..helpers import yes, TRUISH, FALSISH, DEFAULTISH
- from ..helpers import StableDict, bin_to_hex
- from ..helpers import parse_timestamp, ChunkIteratorFileWrapper, ChunkerParams
- from ..helpers import archivename_validator, text_validator
- from ..helpers import ProgressIndicatorPercent
- from ..helpers import swidth_slice
- from ..helpers import chunkit
- from ..helpers import safe_ns, safe_s, SUPPORT_32BIT_PLATFORMS
- from ..helpers import popen_with_error_handling
- from ..helpers import dash_open
- from ..helpers import iter_separated
- from ..helpers import eval_escapes
- from ..helpers import safe_unlink
- from ..helpers import text_to_json, binary_to_json
- from ..helpers import classify_ec, max_ec
- from ..helpers.passphrase import Passphrase, PasswordRetriesExceeded
- from ..platform import is_cygwin, is_win32, is_darwin
- from . import FakeInputs, are_hardlinks_supported
- from . import rejected_dotdot_paths
def test_bin_to_hex():
    """bin_to_hex maps a bytes value to its lowercase hex representation."""
    cases = [(b"", ""), (b"\x00\x01\xff", "0001ff")]
    for raw, hexed in cases:
        assert bin_to_hex(raw) == hexed
@pytest.mark.parametrize(
    "key,value",
    [("key", b"\x00\x01\x02\x03"), ("key", b"\x00\x01\x02"), ("key", b"\x00\x01"), ("key", b"\x00"), ("key", b"")],
)
def test_binary_to_json(key, value):
    """binary_to_json stores the value base64-encoded under '<key>_b64'."""
    result = binary_to_json(key, value)
    b64_key = key + "_b64"
    assert b64_key in result
    # decoding the stored base64 must round-trip to the original bytes
    assert base64.b64decode(result[b64_key]) == value
@pytest.mark.parametrize(
    "key,value,strict",
    [
        ("key", "abc", True),
        ("key", "äöü", True),
        ("key", "", True),
        ("key", b"\x00\xff".decode("utf-8", errors="surrogateescape"), False),
        ("key", "äöü".encode("latin1").decode("utf-8", errors="surrogateescape"), False),
    ],
)
def test_text_to_json(key, value, strict):
    """text_to_json emits pure text under <key>; when the text needs
    surrogate-escapes it additionally emits the exact bytes under <key>_b64."""
    b64_key = key + "_b64"
    result = text_to_json(key, value)
    raw = value.encode("utf-8", errors="surrogateescape")
    assert key in result
    if strict:
        # no surrogate-escapes, just unicode text: text key round-trips exactly,
        # so no base64 fallback is needed
        assert result[key] == raw.decode("utf-8", errors="strict")
        assert result[key].encode("utf-8", errors="strict") == raw
        assert b64_key not in result  # not needed. pure valid unicode.
    else:
        # requiring surrogate-escapes: the text key carries replacement chars,
        # the base64 key carries the exact original bytes
        replaced = value.encode("utf-8", errors="replace")
        assert result[key] == replaced.decode("utf-8", errors="strict")
        assert result[key].encode("utf-8", errors="strict") == replaced
        assert b64_key in result
        assert base64.b64decode(result[b64_key]) == raw
class TestLocationWithoutEnv:
    """Tests for Location: URL parsing, repr() and key-filename mapping for all
    supported repository schemes, with BORG_REPO removed from the environment.

    Fix: the exact-duplicate assertions previously present in test_file and
    test_with_colons have been removed (each checked the same repr twice).
    """

    @pytest.fixture
    def keys_dir(self, tmpdir, monkeypatch):
        """Point BORG_KEYS_DIR at a tmpdir and return it with a trailing
        separator, so expected key file names can be built by concatenation."""
        tmpdir = str(tmpdir)
        monkeypatch.setenv("BORG_KEYS_DIR", tmpdir)
        if not tmpdir.endswith(os.path.sep):
            tmpdir += os.path.sep
        return tmpdir

    def test_ssh(self, monkeypatch, keys_dir):
        """ssh:// URLs: absolute vs. relative paths, optional port, IPv6 hosts."""
        monkeypatch.delenv("BORG_REPO", raising=False)
        assert (
            repr(Location("ssh://user@host:1234//absolute/path"))
            == "Location(proto='ssh', user='user', host='host', port=1234, path='/absolute/path')"
        )
        assert Location("ssh://user@host:1234//absolute/path").to_key_filename() == keys_dir + "host___absolute_path"
        assert (
            repr(Location("ssh://user@host:1234/relative/path"))
            == "Location(proto='ssh', user='user', host='host', port=1234, path='relative/path')"
        )
        assert Location("ssh://user@host:1234/relative/path").to_key_filename() == keys_dir + "host__relative_path"
        assert (
            repr(Location("ssh://user@host/relative/path"))
            == "Location(proto='ssh', user='user', host='host', port=None, path='relative/path')"
        )
        # IPv6 hosts, bracketed, with and without port
        assert (
            repr(Location("ssh://user@[::]:1234/relative/path"))
            == "Location(proto='ssh', user='user', host='::', port=1234, path='relative/path')"
        )
        assert Location("ssh://user@[::]:1234/relative/path").to_key_filename() == keys_dir + "____relative_path"
        assert (
            repr(Location("ssh://user@[::]/relative/path"))
            == "Location(proto='ssh', user='user', host='::', port=None, path='relative/path')"
        )
        assert (
            repr(Location("ssh://user@[2001:db8::]:1234/relative/path"))
            == "Location(proto='ssh', user='user', host='2001:db8::', port=1234, path='relative/path')"
        )
        assert (
            Location("ssh://user@[2001:db8::]:1234/relative/path").to_key_filename()
            == keys_dir + "2001_db8____relative_path"
        )
        assert (
            repr(Location("ssh://user@[2001:db8::]/relative/path"))
            == "Location(proto='ssh', user='user', host='2001:db8::', port=None, path='relative/path')"
        )
        assert (
            repr(Location("ssh://user@[2001:db8::c0:ffee]:1234/relative/path"))
            == "Location(proto='ssh', user='user', host='2001:db8::c0:ffee', port=1234, path='relative/path')"
        )
        assert (
            repr(Location("ssh://user@[2001:db8::c0:ffee]/relative/path"))
            == "Location(proto='ssh', user='user', host='2001:db8::c0:ffee', port=None, path='relative/path')"
        )
        # IPv4-mapped IPv6 addresses
        assert (
            repr(Location("ssh://user@[2001:db8::192.0.2.1]:1234/relative/path"))
            == "Location(proto='ssh', user='user', host='2001:db8::192.0.2.1', port=1234, path='relative/path')"
        )
        assert (
            repr(Location("ssh://user@[2001:db8::192.0.2.1]/relative/path"))
            == "Location(proto='ssh', user='user', host='2001:db8::192.0.2.1', port=None, path='relative/path')"
        )
        assert (
            Location("ssh://user@[2001:db8::192.0.2.1]/relative/path").to_key_filename()
            == keys_dir + "2001_db8__192_0_2_1__relative_path"
        )
        # fully expanded IPv6 addresses
        assert (
            repr(Location("ssh://user@[2a02:0001:0002:0003:0004:0005:0006:0007]/relative/path"))
            == "Location(proto='ssh', user='user', "
            "host='2a02:0001:0002:0003:0004:0005:0006:0007', port=None, path='relative/path')"
        )
        assert (
            repr(Location("ssh://user@[2a02:0001:0002:0003:0004:0005:0006:0007]:1234/relative/path"))
            == "Location(proto='ssh', user='user', "
            "host='2a02:0001:0002:0003:0004:0005:0006:0007', port=1234, path='relative/path')"
        )

    def test_rclone(self, monkeypatch, keys_dir):
        """rclone: URLs keep the full 'remote:path' as the path component."""
        monkeypatch.delenv("BORG_REPO", raising=False)
        assert (
            repr(Location("rclone:remote:path"))
            == "Location(proto='rclone', user=None, host=None, port=None, path='remote:path')"
        )
        assert Location("rclone:remote:path").to_key_filename() == keys_dir + "remote_path"

    def test_sftp(self, monkeypatch, keys_dir):
        """sftp:// URLs: relative and absolute paths."""
        monkeypatch.delenv("BORG_REPO", raising=False)
        # relative path
        assert (
            repr(Location("sftp://user@host:1234/rel/path"))
            == "Location(proto='sftp', user='user', host='host', port=1234, path='rel/path')"
        )
        assert Location("sftp://user@host:1234/rel/path").to_key_filename() == keys_dir + "host__rel_path"
        # absolute path
        assert (
            repr(Location("sftp://user@host:1234//abs/path"))
            == "Location(proto='sftp', user='user', host='host', port=1234, path='/abs/path')"
        )
        assert Location("sftp://user@host:1234//abs/path").to_key_filename() == keys_dir + "host___abs_path"

    def test_socket(self, monkeypatch, keys_dir):
        """socket:// URLs map to a plain local path."""
        monkeypatch.delenv("BORG_REPO", raising=False)
        assert (
            repr(Location("socket:///repo/path"))
            == "Location(proto='socket', user=None, host=None, port=None, path='/repo/path')"
        )
        assert Location("socket:///some/path").to_key_filename() == keys_dir + "_some_path"

    def test_file(self, monkeypatch, keys_dir):
        """file:// URLs with an absolute local path."""
        monkeypatch.delenv("BORG_REPO", raising=False)
        assert (
            repr(Location("file:///some/path"))
            == "Location(proto='file', user=None, host=None, port=None, path='/some/path')"
        )
        assert Location("file:///some/path").to_key_filename() == keys_dir + "_some_path"

    def test_smb(self, monkeypatch, keys_dir):
        """file://// (UNC-style) paths keep the leading double slash."""
        monkeypatch.delenv("BORG_REPO", raising=False)
        assert (
            repr(Location("file:////server/share/path"))
            == "Location(proto='file', user=None, host=None, port=None, path='//server/share/path')"
        )
        assert Location("file:////server/share/path").to_key_filename() == keys_dir + "__server_share_path"

    def test_folder(self, monkeypatch, keys_dir):
        """A bare folder name is resolved to an absolute file:// Location."""
        monkeypatch.delenv("BORG_REPO", raising=False)
        rel_path = "path"
        abs_path = os.path.abspath(rel_path)
        assert repr(Location(rel_path)) == f"Location(proto='file', user=None, host=None, port=None, path='{abs_path}')"
        assert Location("path").to_key_filename().endswith(rel_path)

    def test_abspath(self, monkeypatch, keys_dir):
        """Absolute paths, plain and via ssh://."""
        monkeypatch.delenv("BORG_REPO", raising=False)
        assert (
            repr(Location("/absolute/path"))
            == "Location(proto='file', user=None, host=None, port=None, path='/absolute/path')"
        )
        assert Location("/absolute/path").to_key_filename() == keys_dir + "_absolute_path"
        assert (
            repr(Location("ssh://user@host//absolute/path"))
            == "Location(proto='ssh', user='user', host='host', port=None, path='/absolute/path')"
        )
        assert Location("ssh://user@host//absolute/path").to_key_filename() == keys_dir + "host___absolute_path"

    def test_relpath(self, monkeypatch, keys_dir):
        """Relative paths: local ones are absolutized, ssh ones stay relative."""
        monkeypatch.delenv("BORG_REPO", raising=False)
        # for a local path, borg creates a Location instance with an absolute path
        rel_path = "relative/path"
        abs_path = os.path.abspath(rel_path)
        assert repr(Location(rel_path)) == f"Location(proto='file', user=None, host=None, port=None, path='{abs_path}')"
        assert Location(rel_path).to_key_filename().endswith("relative_path")
        assert (
            repr(Location("ssh://user@host/relative/path"))
            == "Location(proto='ssh', user='user', host='host', port=None, path='relative/path')"
        )
        assert Location("ssh://user@host/relative/path").to_key_filename() == keys_dir + "host__relative_path"

    def test_with_colons(self, monkeypatch, keys_dir):
        """Colons inside a local absolute path are preserved, not parsed."""
        monkeypatch.delenv("BORG_REPO", raising=False)
        assert (
            repr(Location("/abs/path:w:cols"))
            == "Location(proto='file', user=None, host=None, port=None, path='/abs/path:w:cols')"
        )
        assert (
            repr(Location("/abs/path:with:colons"))
            == "Location(proto='file', user=None, host=None, port=None, path='/abs/path:with:colons')"
        )
        assert Location("/abs/path:with:colons").to_key_filename() == keys_dir + "_abs_path_with_colons"

    def test_canonical_path(self, monkeypatch):
        """canonical_path() must be idempotent for every supported scheme."""
        monkeypatch.delenv("BORG_REPO", raising=False)
        locations = [
            "relative/path",
            "/absolute/path",
            "file:///absolute/path",
            "socket:///absolute/path",
            "ssh://host/relative/path",
            "ssh://host//absolute/path",
            "ssh://user@host:1234/relative/path",
            "sftp://host/relative/path",
            "sftp://host//absolute/path",
            "sftp://user@host:1234/relative/path",
            "rclone:remote:path",
        ]
        for location in locations:
            assert (
                Location(location).canonical_path() == Location(Location(location).canonical_path()).canonical_path()
            ), ("failed: %s" % location)

    def test_bad_syntax(self):
        """A colon after the host without a port is a syntax error."""
        with pytest.raises(ValueError):
            # this is invalid due to the 2nd colon, correct: 'ssh://user@host/path'
            Location("ssh://user@host:/path")
@pytest.mark.parametrize(
    "name",
    [
        "foobar",
        # placeholders
        "foobar-{now}",
    ],
)
def test_archivename_ok(name):
    """Valid archive names (including placeholder syntax) pass validation."""
    archivename_validator(name)  # must not raise an exception
@pytest.mark.parametrize(
    "name",
    [
        "",  # too short
        "x" * 201,  # too long
        # invalid chars:
        "foo/bar",
        "foo\\bar",
        ">foo",
        "<foo",
        "|foo",
        'foo"bar',
        "foo?",
        "*bar",
        "foo\nbar",
        "foo\0bar",
        # leading/trailing blanks
        " foo",
        "bar ",
        # contains surrogate-escapes
        "foo\udc80bar",
        "foo\udcffbar",
    ],
)
def test_archivename_invalid(name):
    """Empty, overlong, specially-charactered or blank-padded names are rejected."""
    with pytest.raises(ArgumentTypeError):
        archivename_validator(name)
@pytest.mark.parametrize("text", ["", "single line", "multi\nline\ncomment"])
def test_text_ok(text):
    """text_validator accepts empty, single-line and multi-line text."""
    # must not raise an exception
    text_validator(max_length=100, name="name")(text)
@pytest.mark.parametrize(
    "text",
    [
        "x" * 101,  # too long
        # invalid chars:
        "foo\0bar",
        # contains surrogate-escapes
        "foo\udc80bar",
        "foo\udcffbar",
    ],
)
def test_text_invalid(text):
    """Overlong text, NUL bytes and surrogate-escapes are rejected."""
    tv = text_validator(max_length=100, name="name")
    with pytest.raises(ArgumentTypeError):
        tv(text)
def test_format_timedelta():
    """A delta of 2 hours and 1.1 seconds renders as '2 hours 1.100 seconds'."""
    start = datetime(2001, 1, 1, 10, 20, 3, 0)
    end = datetime(2001, 1, 1, 12, 20, 4, 100000)
    assert format_timedelta(end - start) == "2 hours 1.100 seconds"
@pytest.mark.parametrize(
    "chunker_params, expected_return",
    [
        # "default" expands to the standard buzhash parameters
        ("default", ("buzhash", 19, 23, 21, 4095)),
        # a bare 4-tuple implies the buzhash algorithm
        ("19,23,21,4095", ("buzhash", 19, 23, 21, 4095)),
        ("buzhash,19,23,21,4095", ("buzhash", 19, 23, 21, 4095)),
        ("10,23,16,4095", ("buzhash", 10, 23, 16, 4095)),
        # fixed: block size with optional header size (defaults to 0)
        ("fixed,4096", ("fixed", 4096, 0)),
        ("fixed,4096,200", ("fixed", 4096, 200)),
    ],
)
def test_valid_chunkerparams(chunker_params, expected_return):
    """ChunkerParams parses valid buzhash/fixed specs (incl. 'default')."""
    assert ChunkerParams(chunker_params) == expected_return
@pytest.mark.parametrize(
    "invalid_chunker_params",
    [
        "crap,1,2,3,4",  # invalid algo
        "buzhash,5,7,6,4095",  # too small min. size
        "buzhash,19,24,21,4095",  # too big max. size
        "buzhash,23,19,21,4095",  # violates min <= mask <= max
        "fixed,63",  # too small block size
        "fixed,%d,%d" % (MAX_DATA_SIZE + 1, 4096),  # too big block size
        "fixed,%d,%d" % (4096, MAX_DATA_SIZE + 1),  # too big header size
    ],
)
def test_invalid_chunkerparams(invalid_chunker_params):
    """Out-of-range or malformed chunker specs raise ArgumentTypeError."""
    with pytest.raises(ArgumentTypeError):
        ChunkerParams(invalid_chunker_params)
@pytest.mark.parametrize(
    "original_path, expected_path",
    [
        (".", "."),
        ("..", "."),
        ("/", "."),
        ("//", "."),
        ("foo", "foo"),
        ("foo/bar", "foo/bar"),
        ("/foo/bar", "foo/bar"),
        ("../foo/bar", "foo/bar"),
    ],
)
def test_remove_dotdot_prefixes(original_path, expected_path):
    """Leading '/', './' and '../' components are stripped to a safe relative path."""
    assert remove_dotdot_prefixes(original_path) == expected_path
@pytest.mark.parametrize(
    "original_path, expected_path",
    [
        (".", "."),
        ("./", "."),
        ("/foo", "foo"),
        ("//foo", "foo"),
        (".//foo//bar//", "foo/bar"),
        ("/foo/bar", "foo/bar"),
        ("//foo/bar", "foo/bar"),
        ("//foo/./bar", "foo/bar"),
        # '..' embedded inside a name (not a path element) must survive
        (".test", ".test"),
        (".test.", ".test."),
        ("..test..", "..test.."),
        ("/te..st/foo/bar", "te..st/foo/bar"),
        ("/..test../abc//", "..test../abc"),
    ],
)
def test_valid_make_path_safe(original_path, expected_path):
    """make_path_safe normalizes slashes/dots into a clean relative path."""
    assert make_path_safe(original_path) == expected_path
@pytest.mark.parametrize("path", rejected_dotdot_paths)
def test_invalid_make_path_safe(path):
    """Paths containing real '..' elements must be rejected outright."""
    expected_message = "unexpected '..' element in path"
    with pytest.raises(ValueError, match=expected_message):
        make_path_safe(path)
class MockArchive:
    """Minimal archive stand-in for the prune tests.

    Carries just a timestamp (``ts``) and an identifier (``id``), which is all
    prune_split / prune_within look at.
    """

    def __init__(self, ts, id):
        # timestamp of the (mock) archive
        self.ts = ts
        # numeric id used by the tests to identify expected survivors
        self.id = id

    def __repr__(self):
        return "{}: {}".format(self.id, self.ts.isoformat())
# This is the local timezone of the system running the tests.
# We need this e.g. to construct archive timestamps for the prune tests,
# because borg prune operates in the local timezone (it first converts the
# archive timestamp to the local timezone). So, if we want the y/m/d/h/m/s
# values which prune uses to be exactly the ones we give [and NOT shift them
# by tzoffset], we need to give the timestamps in the same local timezone.
# Please note that the timestamps in a real borg archive or manifest are
# stored in UTC timezone.
# NOTE(review): derived once at import time; tests assume the tz does not
# change while the test session runs.
local_tz = datetime.now(tz=timezone.utc).astimezone(tz=None).tzinfo
@pytest.mark.parametrize(
    "rule,num_to_keep,expected_ids",
    [
        ("yearly", 3, (13, 2, 1)),
        ("monthly", 3, (13, 8, 4)),
        ("weekly", 2, (13, 8)),
        ("daily", 3, (13, 8, 7)),
        ("hourly", 3, (13, 10, 8)),
        ("minutely", 3, (13, 10, 9)),
        ("secondly", 4, (13, 12, 11, 10)),
        ("daily", 0, []),
    ],
)
def test_prune_split(rule, num_to_keep, expected_ids):
    """For each retention rule, prune_split keeps the newest archive of each
    period, up to num_to_keep, and records the rule in kept_because."""

    def subset(lst, ids):
        # archives from lst whose id is in ids
        return {i for i in lst if i.id in ids}

    # fixture: 13 archives spread over years/months/days/minutes/seconds,
    # given in local_tz so prune's local-time bucketing matches exactly
    archives = [
        # years apart
        MockArchive(datetime(2015, 1, 1, 10, 0, 0, tzinfo=local_tz), 1),
        MockArchive(datetime(2016, 1, 1, 10, 0, 0, tzinfo=local_tz), 2),
        MockArchive(datetime(2017, 1, 1, 10, 0, 0, tzinfo=local_tz), 3),
        # months apart
        MockArchive(datetime(2017, 2, 1, 10, 0, 0, tzinfo=local_tz), 4),
        MockArchive(datetime(2017, 3, 1, 10, 0, 0, tzinfo=local_tz), 5),
        # days apart
        MockArchive(datetime(2017, 3, 2, 10, 0, 0, tzinfo=local_tz), 6),
        MockArchive(datetime(2017, 3, 3, 10, 0, 0, tzinfo=local_tz), 7),
        MockArchive(datetime(2017, 3, 4, 10, 0, 0, tzinfo=local_tz), 8),
        # minutes apart
        MockArchive(datetime(2017, 10, 1, 9, 45, 0, tzinfo=local_tz), 9),
        MockArchive(datetime(2017, 10, 1, 9, 55, 0, tzinfo=local_tz), 10),
        # seconds apart
        MockArchive(datetime(2017, 10, 1, 10, 0, 1, tzinfo=local_tz), 11),
        MockArchive(datetime(2017, 10, 1, 10, 0, 3, tzinfo=local_tz), 12),
        MockArchive(datetime(2017, 10, 1, 10, 0, 5, tzinfo=local_tz), 13),
    ]
    kept_because = {}
    keep = prune_split(archives, rule, num_to_keep, kept_because)
    assert set(keep) == subset(archives, expected_ids)
    # every kept archive must record the rule that justified keeping it
    for item in keep:
        assert kept_because[item.id][0] == rule
def test_prune_split_keep_oldest():
    """When the yearly retention target cannot otherwise be met, the oldest
    archive is retained and tagged 'yearly[oldest]'; otherwise it is pruned."""

    def by_ids(lst, ids):
        return {a for a in lst if a.id in ids}

    archives = [
        # oldest backup, but not last in its year
        MockArchive(datetime(2018, 1, 1, 10, 0, 0, tzinfo=local_tz), 1),
        # an interim backup
        MockArchive(datetime(2018, 12, 30, 10, 0, 0, tzinfo=local_tz), 2),
        # year-end backups
        MockArchive(datetime(2018, 12, 31, 10, 0, 0, tzinfo=local_tz), 3),
        MockArchive(datetime(2019, 12, 31, 10, 0, 0, tzinfo=local_tz), 4),
    ]

    # Keep oldest when retention target can't otherwise be met
    kept_because = {}
    keep = prune_split(archives, "yearly", 3, kept_because)
    assert set(keep) == by_ids(archives, [1, 3, 4])
    assert kept_because[1][0] == "yearly[oldest]"
    for year_end_id in (3, 4):
        assert kept_because[year_end_id][0] == "yearly"

    # Otherwise, prune it
    kept_because = {}
    keep = prune_split(archives, "yearly", 2, kept_because)
    assert set(keep) == by_ids(archives, [3, 4])
    for year_end_id in (3, 4):
        assert kept_because[year_end_id][0] == "yearly"
def test_prune_split_no_archives():
    """prune_split on an empty archive list keeps nothing and records nothing."""
    kept_because = {}
    assert prune_split([], "yearly", 3, kept_because) == []
    assert kept_because == {}
@pytest.mark.parametrize(
    "timeframe, num_secs",
    [
        # units: S=seconds, M=minutes, H=hours, d=days, w=weeks, m=months(31d), y=years(365d)
        ("5S", 5),
        ("2M", 2 * 60),
        ("1H", 60 * 60),
        ("1d", 24 * 60 * 60),
        ("1w", 7 * 24 * 60 * 60),
        ("1m", 31 * 24 * 60 * 60),
        ("1y", 365 * 24 * 60 * 60),
    ],
)
def test_interval(timeframe, num_secs):
    """interval() converts '<count><unit>' strings to seconds."""
    assert interval(timeframe) == num_secs
@pytest.mark.parametrize(
    "invalid_interval, error_tuple",
    [
        ("H", ('Invalid number "": expected positive integer',)),
        ("-1d", ('Invalid number "-1": expected positive integer',)),
        ("food", ('Invalid number "foo": expected positive integer',)),
    ],
)
def test_interval_time_unit(invalid_interval, error_tuple):
    """Missing, negative or non-numeric counts raise with the exact message."""
    with pytest.raises(ArgumentTypeError) as exc:
        interval(invalid_interval)
    assert exc.value.args == error_tuple
def test_interval_number():
    """A bare number without a time unit is rejected with the exact message."""
    with pytest.raises(ArgumentTypeError) as exc:
        interval("5")
    expected_args = ('Unexpected time unit "5": choose from y, m, w, d, H, M, S',)
    assert exc.value.args == expected_args
def test_prune_within():
    """prune_within keeps exactly the archives younger than the given
    timeframe, independent of input ordering, tagging them with 'within'."""

    def subset(lst, indices):
        return {lst[i] for i in indices}

    def check(test_archives, within, indices):
        # run with both orderings to prove the result is order-independent
        for ta in test_archives, reversed(test_archives):
            kept_because = {}
            keep = prune_within(ta, interval(within), kept_because)
            assert set(keep) == subset(test_archives, indices)
            assert all("within" == kept_because[a.id][0] for a in keep)

    # 1 minute, 1.5 hours, 2.5 hours, 3.5 hours, 25 hours, 49 hours
    test_offsets = [60, 90 * 60, 150 * 60, 210 * 60, 25 * 60 * 60, 49 * 60 * 60]
    now = datetime.now(timezone.utc)
    test_dates = [now - timedelta(seconds=s) for s in test_offsets]
    test_archives = [MockArchive(date, i) for i, date in enumerate(test_dates)]

    # (timeframe, indices of archives expected to survive)
    cases = [
        ("15S", []),
        ("2M", [0]),
        ("1H", [0]),
        ("2H", [0, 1]),
        ("3H", [0, 1, 2]),
        ("24H", [0, 1, 2, 3]),
        ("26H", [0, 1, 2, 3, 4]),
        ("2d", [0, 1, 2, 3, 4]),
        ("50H", [0, 1, 2, 3, 4, 5]),
        ("3d", [0, 1, 2, 3, 4, 5]),
        ("1w", [0, 1, 2, 3, 4, 5]),
        ("1m", [0, 1, 2, 3, 4, 5]),
        ("1y", [0, 1, 2, 3, 4, 5]),
    ]
    for within, indices in cases:
        check(test_archives, within, indices)
def test_stable_dict():
    """StableDict iterates its keys sorted, yielding a stable msgpack digest."""
    d = StableDict(foo=1, bar=2, boo=3, baz=4)
    expected_order = [("bar", 2), ("baz", 4), ("boo", 3), ("foo", 1)]
    assert list(d.items()) == expected_order
    digest = hashlib.md5(msgpack.packb(d)).hexdigest()
    assert digest == "fc78df42cd60691b3ac3dd2a2b39903f"
def test_parse_timestamp():
    """ISO timestamps parse to UTC datetimes, with or without microseconds."""
    expected_with_us = datetime(2015, 4, 19, 20, 25, 0, 226410, timezone.utc)
    assert parse_timestamp("2015-04-19T20:25:00.226410") == expected_with_us
    expected_no_us = datetime(2015, 4, 19, 20, 25, 0, 0, timezone.utc)
    assert parse_timestamp("2015-04-19T20:25:00") == expected_no_us
def test_get_base_dir(monkeypatch):
    """test that get_base_dir respects environment"""
    # start from a clean slate: no BORG_BASE_DIR, HOME or USER set
    monkeypatch.delenv("BORG_BASE_DIR", raising=False)
    monkeypatch.delenv("HOME", raising=False)
    monkeypatch.delenv("USER", raising=False)
    # legacy precedence, checked by setting one more env var per step:
    # fallback ~  ->  USER  ->  HOME  ->  BORG_BASE_DIR (strongest)
    assert get_base_dir(legacy=True) == os.path.expanduser("~")
    monkeypatch.setenv("USER", "root")
    assert get_base_dir(legacy=True) == os.path.expanduser("~root")
    monkeypatch.setenv("HOME", "/var/tmp/home")
    assert get_base_dir(legacy=True) == "/var/tmp/home"
    monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base")
    assert get_base_dir(legacy=True) == "/var/tmp/base"
    # non-legacy is much easier: only BORG_BASE_DIR matters, else None
    monkeypatch.delenv("BORG_BASE_DIR", raising=False)
    assert get_base_dir(legacy=False) is None
    monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base")
    assert get_base_dir(legacy=False) == "/var/tmp/base"
def test_get_base_dir_compat(monkeypatch):
    """test that it works the same for legacy and for non-legacy implementation"""
    monkeypatch.delenv("BORG_BASE_DIR", raising=False)
    # old way: if BORG_BASE_DIR is not set, make something up with HOME/USER/~
    # new way: if BORG_BASE_DIR is not set, return None and let caller deal with it.
    assert get_base_dir(legacy=False) is None
    # new and old way: BORG_BASE_DIR overrides all other "base path determination".
    monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base")
    assert get_base_dir(legacy=False) == get_base_dir(legacy=True)
def test_get_config_dir(monkeypatch):
    """test that get_config_dir respects environment"""
    monkeypatch.delenv("BORG_BASE_DIR", raising=False)
    home_dir = os.path.expanduser("~")
    if is_win32:
        # platform default first, then BORG_CONFIG_DIR override
        monkeypatch.delenv("BORG_CONFIG_DIR", raising=False)
        assert get_config_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "borg", "borg")
        monkeypatch.setenv("BORG_CONFIG_DIR", home_dir)
        assert get_config_dir(create=False) == home_dir
    elif is_darwin:
        monkeypatch.delenv("BORG_CONFIG_DIR", raising=False)
        assert get_config_dir(create=False) == os.path.join(home_dir, "Library", "Application Support", "borg")
        monkeypatch.setenv("BORG_CONFIG_DIR", "/var/tmp")
        assert get_config_dir(create=False) == "/var/tmp"
    else:
        # Linux/BSD: XDG default, then XDG_CONFIG_HOME, then BORG_CONFIG_DIR (strongest)
        monkeypatch.delenv("XDG_CONFIG_HOME", raising=False)
        monkeypatch.delenv("BORG_CONFIG_DIR", raising=False)
        assert get_config_dir(create=False) == os.path.join(home_dir, ".config", "borg")
        monkeypatch.setenv("XDG_CONFIG_HOME", "/var/tmp/.config")
        assert get_config_dir(create=False) == os.path.join("/var/tmp/.config", "borg")
        monkeypatch.setenv("BORG_CONFIG_DIR", "/var/tmp")
        assert get_config_dir(create=False) == "/var/tmp"
def test_get_config_dir_compat(monkeypatch):
    """test that it works the same for legacy and for non-legacy implementation"""
    monkeypatch.delenv("BORG_CONFIG_DIR", raising=False)
    monkeypatch.delenv("BORG_BASE_DIR", raising=False)
    monkeypatch.delenv("XDG_CONFIG_HOME", raising=False)
    if not is_darwin and not is_win32:
        # fails on macOS: assert '/Users/tw/Library/Application Support/borg' == '/Users/tw/.config/borg'
        # fails on win32 MSYS2 (but we do not need legacy compat there).
        assert get_config_dir(legacy=False, create=False) == get_config_dir(legacy=True, create=False)
        monkeypatch.setenv("XDG_CONFIG_HOME", "/var/tmp/xdg.config.d")
        # fails on macOS: assert '/Users/tw/Library/Application Support/borg' == '/var/tmp/xdg.config.d'
        # fails on win32 MSYS2 (but we do not need legacy compat there).
        assert get_config_dir(legacy=False, create=False) == get_config_dir(legacy=True, create=False)
    # BORG_BASE_DIR / BORG_CONFIG_DIR overrides apply identically on all platforms
    monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base")
    assert get_config_dir(legacy=False, create=False) == get_config_dir(legacy=True, create=False)
    monkeypatch.setenv("BORG_CONFIG_DIR", "/var/tmp/borg.config.d")
    assert get_config_dir(legacy=False, create=False) == get_config_dir(legacy=True, create=False)
def test_get_cache_dir(monkeypatch):
    """test that get_cache_dir respects environment"""
    monkeypatch.delenv("BORG_BASE_DIR", raising=False)
    home_dir = os.path.expanduser("~")
    if is_win32:
        # platform default first, then BORG_CACHE_DIR override
        monkeypatch.delenv("BORG_CACHE_DIR", raising=False)
        assert get_cache_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "borg", "borg", "Cache")
        monkeypatch.setenv("BORG_CACHE_DIR", home_dir)
        assert get_cache_dir(create=False) == home_dir
    elif is_darwin:
        monkeypatch.delenv("BORG_CACHE_DIR", raising=False)
        assert get_cache_dir(create=False) == os.path.join(home_dir, "Library", "Caches", "borg")
        monkeypatch.setenv("BORG_CACHE_DIR", "/var/tmp")
        assert get_cache_dir(create=False) == "/var/tmp"
    else:
        # Linux/BSD: XDG default, then XDG_CACHE_HOME, then BORG_CACHE_DIR (strongest)
        monkeypatch.delenv("XDG_CACHE_HOME", raising=False)
        monkeypatch.delenv("BORG_CACHE_DIR", raising=False)
        assert get_cache_dir(create=False) == os.path.join(home_dir, ".cache", "borg")
        monkeypatch.setenv("XDG_CACHE_HOME", "/var/tmp/.cache")
        assert get_cache_dir(create=False) == os.path.join("/var/tmp/.cache", "borg")
        monkeypatch.setenv("BORG_CACHE_DIR", "/var/tmp")
        assert get_cache_dir(create=False) == "/var/tmp"
def test_get_cache_dir_compat(monkeypatch):
    """test that it works the same for legacy and for non-legacy implementation"""
    # start from a clean environment; env vars are then layered on cumulatively
    monkeypatch.delenv("BORG_CACHE_DIR", raising=False)
    monkeypatch.delenv("BORG_BASE_DIR", raising=False)
    monkeypatch.delenv("XDG_CACHE_HOME", raising=False)
    if not is_darwin and not is_win32:
        # fails on macOS: assert '/Users/tw/Library/Caches/borg' == '/Users/tw/.cache/borg'
        # fails on win32 MSYS2 (but we do not need legacy compat there).
        assert get_cache_dir(legacy=False, create=False) == get_cache_dir(legacy=True, create=False)
        # fails on macOS: assert '/Users/tw/Library/Caches/borg' == '/var/tmp/xdg.cache.d'
        # fails on win32 MSYS2 (but we do not need legacy compat there).
        monkeypatch.setenv("XDG_CACHE_HOME", "/var/tmp/xdg.cache.d")
        assert get_cache_dir(legacy=False, create=False) == get_cache_dir(legacy=True, create=False)
    # BORG_BASE_DIR / BORG_CACHE_DIR overrides must agree on all platforms
    monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base")
    assert get_cache_dir(legacy=False, create=False) == get_cache_dir(legacy=True, create=False)
    monkeypatch.setenv("BORG_CACHE_DIR", "/var/tmp/borg.cache.d")
    assert get_cache_dir(legacy=False, create=False) == get_cache_dir(legacy=True, create=False)
def test_get_keys_dir(monkeypatch):
    """test that get_keys_dir respects environment"""
    # remove BORG_BASE_DIR first so the platform defaults apply
    monkeypatch.delenv("BORG_BASE_DIR", raising=False)
    home_dir = os.path.expanduser("~")
    if is_win32:
        monkeypatch.delenv("BORG_KEYS_DIR", raising=False)
        assert get_keys_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "borg", "borg", "keys")
        monkeypatch.setenv("BORG_KEYS_DIR", home_dir)
        assert get_keys_dir(create=False) == home_dir
    elif is_darwin:
        monkeypatch.delenv("BORG_KEYS_DIR", raising=False)
        assert get_keys_dir(create=False) == os.path.join(home_dir, "Library", "Application Support", "borg", "keys")
        monkeypatch.setenv("BORG_KEYS_DIR", "/var/tmp")
        assert get_keys_dir(create=False) == "/var/tmp"
    else:
        # XDG platforms: default, then XDG_CONFIG_HOME, then BORG_KEYS_DIR (strongest)
        monkeypatch.delenv("XDG_CONFIG_HOME", raising=False)
        monkeypatch.delenv("BORG_KEYS_DIR", raising=False)
        assert get_keys_dir(create=False) == os.path.join(home_dir, ".config", "borg", "keys")
        monkeypatch.setenv("XDG_CONFIG_HOME", "/var/tmp/.config")
        assert get_keys_dir(create=False) == os.path.join("/var/tmp/.config", "borg", "keys")
        monkeypatch.setenv("BORG_KEYS_DIR", "/var/tmp")
        assert get_keys_dir(create=False) == "/var/tmp"
def test_get_security_dir(monkeypatch):
    """test that get_security_dir respects environment"""
    # remove BORG_BASE_DIR first so the platform defaults apply
    monkeypatch.delenv("BORG_BASE_DIR", raising=False)
    home_dir = os.path.expanduser("~")
    if is_win32:
        monkeypatch.delenv("BORG_SECURITY_DIR", raising=False)
        assert get_security_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "borg", "borg", "security")
        # a repository_id is appended as a subdirectory
        assert get_security_dir(repository_id="1234", create=False) == os.path.join(
            home_dir, "AppData", "Local", "borg", "borg", "security", "1234"
        )
        monkeypatch.setenv("BORG_SECURITY_DIR", home_dir)
        assert get_security_dir(create=False) == home_dir
    elif is_darwin:
        monkeypatch.delenv("BORG_SECURITY_DIR", raising=False)
        assert get_security_dir(create=False) == os.path.join(
            home_dir, "Library", "Application Support", "borg", "security"
        )
        assert get_security_dir(repository_id="1234", create=False) == os.path.join(
            home_dir, "Library", "Application Support", "borg", "security", "1234"
        )
        monkeypatch.setenv("BORG_SECURITY_DIR", "/var/tmp")
        assert get_security_dir(create=False) == "/var/tmp"
    else:
        # XDG platforms: default, then XDG_DATA_HOME, then BORG_SECURITY_DIR (strongest)
        monkeypatch.delenv("XDG_DATA_HOME", raising=False)
        monkeypatch.delenv("BORG_SECURITY_DIR", raising=False)
        assert get_security_dir(create=False) == os.path.join(home_dir, ".local", "share", "borg", "security")
        assert get_security_dir(repository_id="1234", create=False) == os.path.join(
            home_dir, ".local", "share", "borg", "security", "1234"
        )
        monkeypatch.setenv("XDG_DATA_HOME", "/var/tmp/.config")
        assert get_security_dir(create=False) == os.path.join("/var/tmp/.config", "borg", "security")
        monkeypatch.setenv("BORG_SECURITY_DIR", "/var/tmp")
        assert get_security_dir(create=False) == "/var/tmp"
def test_get_runtime_dir(monkeypatch):
    """test that get_runtime_dir respects environment"""
    # remove BORG_BASE_DIR first so the platform defaults apply
    monkeypatch.delenv("BORG_BASE_DIR", raising=False)
    home_dir = os.path.expanduser("~")
    if is_win32:
        monkeypatch.delenv("BORG_RUNTIME_DIR", raising=False)
        assert get_runtime_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "Temp", "borg", "borg")
        monkeypatch.setenv("BORG_RUNTIME_DIR", home_dir)
        assert get_runtime_dir(create=False) == home_dir
    elif is_darwin:
        monkeypatch.delenv("BORG_RUNTIME_DIR", raising=False)
        assert get_runtime_dir(create=False) == os.path.join(home_dir, "Library", "Caches", "TemporaryItems", "borg")
        monkeypatch.setenv("BORG_RUNTIME_DIR", "/var/tmp")
        assert get_runtime_dir(create=False) == "/var/tmp"
    else:
        monkeypatch.delenv("XDG_RUNTIME_DIR", raising=False)
        monkeypatch.delenv("BORG_RUNTIME_DIR", raising=False)
        uid = str(os.getuid())
        # the default runtime base differs between distributions, accept the known variants
        assert get_runtime_dir(create=False) in [
            os.path.join("/run/user", uid, "borg"),
            os.path.join("/var/run/user", uid, "borg"),
            os.path.join(f"/tmp/runtime-{uid}", "borg"),
        ]
        monkeypatch.setenv("XDG_RUNTIME_DIR", "/var/tmp/.cache")
        assert get_runtime_dir(create=False) == os.path.join("/var/tmp/.cache", "borg")
        monkeypatch.setenv("BORG_RUNTIME_DIR", "/var/tmp")
        assert get_runtime_dir(create=False) == "/var/tmp"
@pytest.mark.parametrize(
    "size, fmt",
    [
        (0, "0 B"),  # no rounding necessary for those
        (1, "1 B"),
        (142, "142 B"),
        (999, "999 B"),
        (1000, "1.00 kB"),  # rounding starts here
        (1001, "1.00 kB"),  # should be rounded away
        (1234, "1.23 kB"),  # should be rounded down
        (1235, "1.24 kB"),  # should be rounded up
        (1010, "1.01 kB"),  # rounded down as well
        (999990000, "999.99 MB"),  # rounded down
        (999990001, "999.99 MB"),  # rounded down
        (999995000, "1.00 GB"),  # rounded up to next unit
        (10**6, "1.00 MB"),  # and all the remaining units, megabytes
        (10**9, "1.00 GB"),  # gigabytes
        (10**12, "1.00 TB"),  # terabytes
        (10**15, "1.00 PB"),  # petabytes
        (10**18, "1.00 EB"),  # exabytes
        (10**21, "1.00 ZB"),  # zettabytes
        (10**24, "1.00 YB"),  # yottabytes
        (-1, "-1 B"),  # negative value
        (-1010, "-1.01 kB"),  # negative value with rounding
    ],
)
def test_file_size(size, fmt):
    """test the size formatting routines"""
    assert format_file_size(size) == fmt
@pytest.mark.parametrize(
    "size, fmt",
    [
        # IEC mode: powers of two, binary unit prefixes (KiB, MiB, ...)
        (0, "0 B"),
        (2**0, "1 B"),
        (2**10, "1.00 KiB"),
        (2**20, "1.00 MiB"),
        (2**30, "1.00 GiB"),
        (2**40, "1.00 TiB"),
        (2**50, "1.00 PiB"),
        (2**60, "1.00 EiB"),
        (2**70, "1.00 ZiB"),
        (2**80, "1.00 YiB"),
        (-(2**0), "-1 B"),
        (-(2**10), "-1.00 KiB"),
        (-(2**20), "-1.00 MiB"),
    ],
)
def test_file_size_iec(size, fmt):
    """test the size formatting routines"""
    assert format_file_size(size, iec=True) == fmt
@pytest.mark.parametrize(
    "original_size, formatted_size",
    [
        (1234, "1.2 kB"),  # rounded down
        (1254, "1.3 kB"),  # rounded up
        (999990000, "1.0 GB"),  # and not 999.9 MB or 1000.0 MB
    ],
)
def test_file_size_precision(original_size, formatted_size):
    """test that the precision parameter controls the number of decimal places"""
    assert format_file_size(original_size, precision=1) == formatted_size
@pytest.mark.parametrize("size, fmt", [(0, "0 B"), (1, "+1 B"), (1234, "+1.23 kB"), (-1, "-1 B"), (-1234, "-1.23 kB")])
def test_file_size_sign(size, fmt):
    """test that sign=True prefixes positive sizes with '+' (zero stays unsigned)"""
    assert format_file_size(size, sign=True) == fmt
@pytest.mark.parametrize(
    "string, value", [("1", 1), ("20", 20), ("5K", 5000), ("1.75M", 1750000), ("1e+9", 1e9), ("-1T", -1e12)]
)
def test_parse_file_size(string, value):
    """test parsing of size strings with decimal suffixes and scientific notation"""
    assert parse_file_size(string) == int(value)
@pytest.mark.parametrize("string", ("", "5 Äpfel", "4E", "2229 bit", "1B"))
def test_parse_file_size_invalid(string):
    """strings that are not valid size specifications must raise ValueError"""
    with pytest.raises(ValueError):
        parse_file_size(string)
def expected_py_mp_slow_combination():
    """do we expect msgpack to be slow in this environment?"""
    # we need to import upstream msgpack package here, not helpers.msgpack:
    import msgpack

    # msgpack is slow on cygwin
    if is_cygwin:
        return True
    # msgpack < 1.0.6 did not have py312 wheels
    if sys.version_info[:2] == (3, 12) and msgpack.version < (1, 0, 6):
        return True
    # otherwise we expect msgpack to be fast!
    return False
@pytest.mark.skipif(expected_py_mp_slow_combination(), reason="ignore expected slow msgpack")
def test_is_slow_msgpack():
    """is_slow_msgpack() must detect the pure-python fallback Packer and not flag the fast one"""
    # we need to import upstream msgpack package here, not helpers.msgpack:
    import msgpack
    import msgpack.fallback

    saved_packer = msgpack.Packer
    try:
        # temporarily force the pure-python fallback implementation
        msgpack.Packer = msgpack.fallback.Packer
        assert is_slow_msgpack()
    finally:
        # always restore the original Packer, even if the assert fails
        msgpack.Packer = saved_packer
    # this tests that we have fast msgpack on test platform:
    assert not is_slow_msgpack()
class TestBuffer:
    """tests for the Buffer helper: allocation type, length, resizing, size limit"""

    def test_type(self):
        # the allocator callable determines the type of the underlying buffer
        buffer = Buffer(bytearray)
        assert isinstance(buffer.get(), bytearray)
        buffer = Buffer(bytes)  # don't do that in practice
        assert isinstance(buffer.get(), bytes)

    def test_len(self):
        buffer = Buffer(bytearray, size=0)
        b = buffer.get()
        assert len(buffer) == len(b) == 0
        buffer = Buffer(bytearray, size=1234)
        b = buffer.get()
        assert len(buffer) == len(b) == 1234

    def test_resize(self):
        buffer = Buffer(bytearray, size=100)
        assert len(buffer) == 100
        b1 = buffer.get()
        buffer.resize(200)
        assert len(buffer) == 200
        b2 = buffer.get()
        assert b2 is not b1  # new, bigger buffer
        buffer.resize(100)
        # shrinking is a no-op by default, the bigger buffer is kept
        assert len(buffer) >= 100
        b3 = buffer.get()
        assert b3 is b2  # still same buffer (200)
        buffer.resize(100, init=True)
        assert len(buffer) == 100  # except on init
        b4 = buffer.get()
        assert b4 is not b3  # new, smaller buffer

    def test_limit(self):
        buffer = Buffer(bytearray, size=100, limit=200)
        buffer.resize(200)
        assert len(buffer) == 200
        with pytest.raises(Buffer.MemoryLimitExceeded):
            buffer.resize(201)
        # the failed resize must not have changed the buffer
        assert len(buffer) == 200

    def test_get(self):
        buffer = Buffer(bytearray, size=100, limit=200)
        b1 = buffer.get(50)
        assert len(b1) >= 50  # == 100
        b2 = buffer.get(100)
        assert len(b2) >= 100  # == 100
        assert b2 is b1  # did not need resizing yet
        b3 = buffer.get(200)
        assert len(b3) == 200
        assert b3 is not b2  # new, resized buffer
        with pytest.raises(Buffer.MemoryLimitExceeded):
            buffer.get(201)  # beyond limit
        assert len(buffer) == 200
def test_yes_input():
    """every truish answer yields True, every falsish answer yields False"""
    truish_answers = list(TRUISH)
    fake = FakeInputs(truish_answers)
    for _ in truish_answers:
        assert yes(input=fake)
    falsish_answers = list(FALSISH)
    fake = FakeInputs(falsish_answers)
    for _ in falsish_answers:
        assert not yes(input=fake)
def test_yes_input_defaults():
    """defaultish answers fall back to the given default, whatever it is"""
    default_answers = list(DEFAULTISH)
    fake = FakeInputs(default_answers)
    for _ in default_answers:
        assert yes(default=True, input=fake)
    fake = FakeInputs(default_answers)
    for _ in default_answers:
        assert not yes(default=False, input=fake)
def test_yes_input_custom():
    """custom truish/falsish vocabularies are honored"""
    fake = FakeInputs(["YES", "SURE", "NOPE"])
    assert yes(truish=("YES",), input=fake)
    assert yes(truish=("SURE",), input=fake)
    assert not yes(falsish=("NOPE",), input=fake)
def test_yes_env(monkeypatch):
    """an env var override answers the question without prompting"""
    for truish_value in TRUISH:
        monkeypatch.setenv("OVERRIDE_THIS", truish_value)
        assert yes(env_var_override="OVERRIDE_THIS")
    for falsish_value in FALSISH:
        monkeypatch.setenv("OVERRIDE_THIS", falsish_value)
        assert not yes(env_var_override="OVERRIDE_THIS")
def test_yes_env_default(monkeypatch):
    """a defaultish env var override falls back to the given default"""
    for defaultish_value in DEFAULTISH:
        monkeypatch.setenv("OVERRIDE_THIS", defaultish_value)
        assert yes(env_var_override="OVERRIDE_THIS", default=True)
        assert not yes(env_var_override="OVERRIDE_THIS", default=False)
def test_yes_defaults():
    """invalid, empty and blank answers fall back to the default; default must be a bool"""
    fake = FakeInputs(["invalid", "", " "])
    for _ in range(3):
        assert not yes(input=fake)  # default=False
    fake = FakeInputs(["invalid", "", " "])
    for _ in range(3):
        assert yes(default=True, input=fake)
    # no input available at all also yields the default
    fake = FakeInputs([])
    assert yes(default=True, input=fake)
    assert not yes(default=False, input=fake)
    with pytest.raises(ValueError):
        yes(default=None)
def test_yes_retry():
    """with a retry_msg, invalid answers are re-asked until a valid one arrives"""
    fake = FakeInputs(["foo", "bar", TRUISH[0]])
    assert yes(retry_msg="Retry: ", input=fake)
    fake = FakeInputs(["foo", "bar", FALSISH[0]])
    assert not yes(retry_msg="Retry: ", input=fake)
def test_yes_no_retry():
    """with retry disabled, the first invalid answer resolves to the default"""
    fake = FakeInputs(["foo", "bar", TRUISH[0]])
    assert not yes(retry=False, default=False, input=fake)
    fake = FakeInputs(["foo", "bar", FALSISH[0]])
    assert yes(retry=False, default=True, input=fake)
def test_yes_output(capfd):
    """all yes() messages go to stderr, never stdout; retry/true/false msgs shown as appropriate"""
    input = FakeInputs(["invalid", "y", "n"])
    # first answer is invalid -> retry-msg, then "y" -> true-msg
    assert yes(msg="intro-msg", false_msg="false-msg", true_msg="true-msg", retry_msg="retry-msg", input=input)
    out, err = capfd.readouterr()
    assert out == ""
    assert "intro-msg" in err
    assert "retry-msg" in err
    assert "true-msg" in err
    # "n" answers directly -> no retry-msg, false-msg shown
    assert not yes(msg="intro-msg", false_msg="false-msg", true_msg="true-msg", retry_msg="retry-msg", input=input)
    out, err = capfd.readouterr()
    assert out == ""
    assert "intro-msg" in err
    assert "retry-msg" not in err
    assert "false-msg" in err
def test_yes_env_output(capfd, monkeypatch):
    """using an env var override mentions the var name and its value on stderr"""
    env_var = "OVERRIDE_SOMETHING"
    monkeypatch.setenv(env_var, "yes")
    assert yes(env_var_override=env_var)
    stdout, stderr = capfd.readouterr()
    assert stdout == ""
    assert env_var in stderr
    assert "yes" in stderr
def test_progress_percentage(capfd):
    """progress output only appears when the value crosses a step boundary; finish() ends the line"""
    pi = ProgressIndicatorPercent(1000, step=5, start=0, msg="%3.0f%%")
    pi.logger.setLevel("INFO")
    pi.show(0)
    out, err = capfd.readouterr()
    assert err == " 0%\n"
    pi.show(420)
    pi.show(680)
    out, err = capfd.readouterr()
    assert err == " 42%\n 68%\n"
    pi.show(1000)
    out, err = capfd.readouterr()
    assert err == "100%\n"
    pi.finish()
    out, err = capfd.readouterr()
    assert err == "\n"
def test_progress_percentage_step(capfd):
    """without an explicit value, show() advances by one; output only on multiples of step"""
    pi = ProgressIndicatorPercent(100, step=2, start=0, msg="%3.0f%%")
    pi.logger.setLevel("INFO")
    pi.show()
    out, err = capfd.readouterr()
    assert err == " 0%\n"
    pi.show()
    out, err = capfd.readouterr()
    assert err == ""  # no output at 1% as we have step == 2
    pi.show()
    out, err = capfd.readouterr()
    assert err == " 2%\n"
def test_progress_percentage_quiet(capfd):
    """at WARN log level, the progress indicator is completely silent"""
    pi = ProgressIndicatorPercent(1000, step=5, start=0, msg="%3.0f%%")
    pi.logger.setLevel("WARN")
    pi.show(0)
    out, err = capfd.readouterr()
    assert err == ""
    pi.show(1000)
    out, err = capfd.readouterr()
    assert err == ""
    pi.finish()
    out, err = capfd.readouterr()
    assert err == ""
@pytest.mark.parametrize(
    "fmt, items_map, expected_result",
    [
        ("{space:10}", {"space": " "}, " " * 10),
        ("{foobar}", {"bar": "wrong", "foobar": "correct"}, "correct"),
        ("{unknown_key}", {}, "{unknown_key}"),
        ("{key}{{escaped_key}}", {}, "{key}{{escaped_key}}"),
        ("{{escaped_key}}", {"escaped_key": 1234}, "{{escaped_key}}"),
    ],
)
def test_partial_format(fmt, items_map, expected_result):
    """known keys are substituted, unknown and escaped placeholders are left untouched"""
    formatted = partial_format(fmt, items_map)
    assert formatted == expected_result
def test_chunk_file_wrapper():
    """reads may span chunk boundaries; reading past the end sets .exhausted"""
    wrapper = ChunkIteratorFileWrapper(iter([b"abc", b"def"]))
    assert wrapper.read(2) == b"ab"
    assert wrapper.read(50) == b"cdef"  # crosses the chunk boundary
    assert wrapper.exhausted
    # an empty chunk iterator yields empty reads and is immediately exhausted
    wrapper = ChunkIteratorFileWrapper(iter([]))
    assert wrapper.read(2) == b""
    assert wrapper.exhausted
def test_chunkit():
    """chunkit yields lists of up to n items; the tail may be short; empty input yields nothing"""
    chunks = chunkit("abcdefg", 3)
    assert next(chunks) == ["a", "b", "c"]
    assert next(chunks) == ["d", "e", "f"]
    assert next(chunks) == ["g"]  # short tail chunk
    # an exhausted generator keeps raising StopIteration
    for _ in range(2):
        with pytest.raises(StopIteration):
            next(chunks)
    assert list(chunkit("ab", 3)) == [["a", "b"]]
    assert list(chunkit("", 3)) == []
def test_clean_lines():
    """clean_lines strips whitespace and drops empty / #-comment lines; each step is toggleable"""
    # fixture: a comment line, an inline '#' (kept), an empty line, an indented line
    conf = """\
#comment
data1 #data1
data2

 data3
""".splitlines(
        keepends=True
    )
    assert list(clean_lines(conf)) == ["data1 #data1", "data2", "data3"]
    assert list(clean_lines(conf, lstrip=False)) == ["data1 #data1", "data2", " data3"]
    assert list(clean_lines(conf, rstrip=False)) == ["data1 #data1\n", "data2\n", "data3\n"]
    assert list(clean_lines(conf, remove_empty=False)) == ["data1 #data1", "data2", "", "data3"]
    assert list(clean_lines(conf, remove_comments=False)) == ["#comment", "data1 #data1", "data2", "data3"]
def test_format_line():
    """format_line substitutes {placeholders} from the data dict"""
    data = {"foo": "bar baz"}
    assert format_line("", data) == ""
    assert format_line("{foo}", data) == "bar baz"
    assert format_line("foo{foo}foo", data) == "foobar bazfoo"
def test_format_line_erroneous():
    """unknown keys, empty fields, conversions and attribute traversal all raise PlaceholderError"""
    data = {}
    erroneous_lines = ("{invalid}", "{}", "{now!r}", "{now.__class__.__module__.__builtins__}")
    for line in erroneous_lines:
        with pytest.raises(PlaceholderError):
            format_line(line, data)
def test_replace_placeholders():
    """{now} expands to a space-free timestamp; {now:%Y} yields the current year"""
    replace_placeholders.reset()  # avoid overrides being spoiled by previous tests
    current = datetime.now()
    assert " " not in replace_placeholders("{now}")
    assert int(replace_placeholders("{now:%Y}")) == current.year
def test_override_placeholders():
    """explicit overrides win over the built-in placeholder values"""
    overridden = replace_placeholders("{uuid4}", overrides={"uuid4": "overridden"})
    assert overridden == "overridden"
def working_swidth():
    """return True if platform.swidth() correctly reports wide characters as width 2"""
    return 2 == platform.swidth("선")
@pytest.mark.skipif(not working_swidth(), reason="swidth() is not supported / active")
def test_swidth_slice():
    """slice by screen width; negative widths slice from the right; partial chars are dropped"""
    string = "나윤선나윤선나윤선나윤선나윤선"
    cases = ((1, ""), (-1, ""), (4, "나윤"), (-4, "윤선"))
    for width, expected in cases:
        assert swidth_slice(string, width) == expected
@pytest.mark.skipif(not working_swidth(), reason="swidth() is not supported / active")
def test_swidth_slice_mixed_characters():
    """a width that would cut a wide character in half yields the same slice as the smaller width"""
    string = "나윤a선나윤선나윤선나윤선나윤선"
    for width in (5, 6):
        assert swidth_slice(string, width) == "나윤a"
def utcfromtimestamp(timestamp):
    """Returns a naive datetime instance representing the timestamp in the UTC timezone"""
    aware = datetime.fromtimestamp(timestamp, timezone.utc)
    return aware.replace(tzinfo=None)
def test_safe_timestamps():
    """safe_s/safe_ns must clamp timestamps into ranges that fit the platform's integer types"""
    if SUPPORT_32BIT_PLATFORMS:
        # ns fit into int64
        assert safe_ns(2**64) <= 2**63 - 1
        assert safe_ns(-1) == 0
        # s fit into int32
        assert safe_s(2**64) <= 2**31 - 1
        assert safe_s(-1) == 0
        # datetime won't fall over its y10k problem
        beyond_y10k = 2**100
        with pytest.raises(OverflowError):
            utcfromtimestamp(beyond_y10k)
        # clamped values still land beyond the 32-bit y2038 horizon
        assert utcfromtimestamp(safe_s(beyond_y10k)) > datetime(2038, 1, 1)
        assert utcfromtimestamp(safe_ns(beyond_y10k) / 1000000000) > datetime(2038, 1, 1)
    else:
        # ns fit into int64
        assert safe_ns(2**64) <= 2**63 - 1
        assert safe_ns(-1) == 0
        # s are so that their ns conversion fits into int64
        assert safe_s(2**64) * 1000000000 <= 2**63 - 1
        assert safe_s(-1) == 0
        # datetime won't fall over its y10k problem
        beyond_y10k = 2**100
        with pytest.raises(OverflowError):
            utcfromtimestamp(beyond_y10k)
        # clamped values still land beyond the int64-ns horizon (year 2262)
        assert utcfromtimestamp(safe_s(beyond_y10k)) > datetime(2262, 1, 1)
        assert utcfromtimestamp(safe_ns(beyond_y10k) / 1000000000) > datetime(2262, 1, 1)
class TestPopenWithErrorHandling:
    """popen_with_error_handling returns None (instead of raising) on bad commands"""

    @pytest.mark.skipif(not shutil.which("test"), reason='"test" binary is needed')
    def test_simple(self):
        # a valid command yields a normal Popen object
        proc = popen_with_error_handling("test 1")
        assert proc.wait() == 0

    @pytest.mark.skipif(
        shutil.which("borg-foobar-test-notexist"), reason='"borg-foobar-test-notexist" binary exists (somehow?)'
    )
    def test_not_found(self):
        # a missing binary yields None, not FileNotFoundError
        proc = popen_with_error_handling("borg-foobar-test-notexist 1234")
        assert proc is None

    @pytest.mark.parametrize("cmd", ('mismatched "quote', 'foo --bar="baz', ""))
    def test_bad_syntax(self, cmd):
        # unparseable / empty command lines yield None as well
        proc = popen_with_error_handling(cmd)
        assert proc is None

    def test_shell(self):
        # shell=True is explicitly not supported
        with pytest.raises(AssertionError):
            popen_with_error_handling("", shell=True)
def test_dash_open():
    """'-' opens the standard streams (the .buffer variants for binary modes)"""
    expected_streams = (
        ("r", sys.stdin),
        ("w", sys.stdout),
        ("rb", sys.stdin.buffer),
        ("wb", sys.stdout.buffer),
    )
    for mode, stream in expected_streams:
        assert dash_open("-", mode) is stream
def test_iter_separated():
    """iter_separated splits a stream on a separator: str/bytes, multi-char, trailing sep"""
    # newline and utf-8
    expected = ["foo", "bar/baz", "αáčő"]
    stream = StringIO("\n".join(expected))
    assert list(iter_separated(stream)) == expected
    # null separator and a bogus trailing separator (must not yield an empty item)
    stream = StringIO("\0".join(["foo/bar", "baz", "spam"]) + "\0")
    assert list(iter_separated(stream, sep="\0")) == ["foo/bar", "baz", "spam"]
    # multichar separator
    expected = ["foo/bar", "baz", "spam"]
    stream = StringIO("SEP".join(expected))
    assert list(iter_separated(stream, sep="SEP")) == expected
    # bytes input with bytes separator
    expected = [b"foo", b"blop\t", b"gr\xe4ezi"]
    stream = BytesIO(b"\n".join(expected))
    assert list(iter_separated(stream)) == expected
def test_eval_escapes():
    """backslash escape sequences are evaluated, other characters pass through unchanged"""
    escaped = eval_escapes("\\n\\0\\x23")
    assert escaped == "\n\0#"
    escaped = eval_escapes("äç\\n")
    assert escaped == "äç\n"
@pytest.mark.skipif(not are_hardlinks_supported(), reason="hardlinks not supported")
def test_safe_unlink_is_safe(tmpdir):
    """safe_unlink must only remove the given link, not the data shared via a hardlink"""
    contents = b"Hello, world\n"
    victim = tmpdir / "victim"
    victim.write_binary(contents)
    hard_link = tmpdir / "hardlink"
    os.link(str(victim), str(hard_link))  # hard_link.mklinkto is not implemented on win32
    safe_unlink(hard_link)
    # the other link to the same inode keeps its data
    assert victim.read_binary() == contents
@pytest.mark.skipif(not are_hardlinks_supported(), reason="hardlinks not supported")
def test_safe_unlink_is_safe_ENOSPC(tmpdir, monkeypatch):
    """even when unlinking fails with ENOSPC, the hardlinked data must survive"""
    contents = b"Hello, world\n"
    victim = tmpdir / "victim"
    victim.write_binary(contents)
    hard_link = tmpdir / "hardlink"
    os.link(str(victim), str(hard_link))  # hard_link.mklinkto is not implemented on win32

    def os_unlink(_):
        # simulate a full filesystem during unlink
        raise OSError(errno.ENOSPC, "Pretend that we ran out of space")

    monkeypatch.setattr(os, "unlink", os_unlink)
    with pytest.raises(OSError):
        safe_unlink(hard_link)
    # the error is propagated, but no data was lost
    assert victim.read_binary() == contents
class TestPassphrase:
    """tests for Passphrase prompting, verification display, retries and debug output"""

    def test_passphrase_new_verification(self, capsys, monkeypatch):
        # the passphrase must never appear on stdout; its hex form goes to stderr
        # only while BORG_DISPLAY_PASSPHRASE is enabled.
        monkeypatch.setattr(getpass, "getpass", lambda prompt: "1234aöäü")
        monkeypatch.setenv("BORG_DISPLAY_PASSPHRASE", "no")
        Passphrase.new()
        out, err = capsys.readouterr()
        assert "1234" not in out
        assert "1234" not in err
        monkeypatch.setenv("BORG_DISPLAY_PASSPHRASE", "yes")
        passphrase = Passphrase.new()
        out, err = capsys.readouterr()
        # "3132333461c3b6c3a4c3bc" is the utf-8 hex encoding of "1234aöäü"
        assert "3132333461c3b6c3a4c3bc" not in out
        assert "3132333461c3b6c3a4c3bc" in err
        assert passphrase == "1234aöäü"
        monkeypatch.setattr(getpass, "getpass", lambda prompt: "1234/@=")
        Passphrase.new()
        out, err = capsys.readouterr()
        assert "1234/@=" not in out
        assert "1234/@=" in err

    def test_passphrase_new_empty(self, capsys, monkeypatch):
        monkeypatch.delenv("BORG_PASSPHRASE", False)
        monkeypatch.setattr(getpass, "getpass", lambda prompt: "")
        # empty passphrases are rejected until the retry limit is hit
        with pytest.raises(PasswordRetriesExceeded):
            Passphrase.new(allow_empty=False)
        out, err = capsys.readouterr()
        assert "must not be blank" in err

    def test_passphrase_new_retries(self, monkeypatch):
        monkeypatch.delenv("BORG_PASSPHRASE", False)
        # each prompt returns a different value, so verification never succeeds
        ascending_numbers = iter(range(20))
        monkeypatch.setattr(getpass, "getpass", lambda prompt: str(next(ascending_numbers)))
        with pytest.raises(PasswordRetriesExceeded):
            Passphrase.new()

    def test_passphrase_repr(self):
        # repr must not leak the secret
        assert "secret" not in repr(Passphrase("secret"))

    def test_passphrase_wrong_debug(self, capsys, monkeypatch):
        """debug info (passphrase + env config) is only shown while BORG_DEBUG_PASSPHRASE is set"""
        passphrase = "wrong_passphrase"
        monkeypatch.setenv("BORG_DEBUG_PASSPHRASE", "YES")
        monkeypatch.setenv("BORG_PASSPHRASE", "env_passphrase")
        monkeypatch.setenv("BORG_PASSCOMMAND", "command")
        monkeypatch.setenv("BORG_PASSPHRASE_FD", "fd_value")
        Passphrase.display_debug_info(passphrase)
        out, err = capsys.readouterr()
        assert "Incorrect passphrase!" in err
        assert passphrase in err
        assert bin_to_hex(passphrase.encode("utf-8")) in err
        assert 'BORG_PASSPHRASE = "env_passphrase"' in err
        assert 'BORG_PASSCOMMAND = "command"' in err
        assert 'BORG_PASSPHRASE_FD = "fd_value"' in err
        monkeypatch.delenv("BORG_DEBUG_PASSPHRASE", raising=False)
        Passphrase.display_debug_info(passphrase)
        out, err = capsys.readouterr()
        assert "Incorrect passphrase!" not in err
        assert passphrase not in err

    def test_verification(self, capsys, monkeypatch):
        """the passphrase (and its hex form) is echoed to stderr only if BORG_DISPLAY_PASSPHRASE is set"""
        passphrase = "test_passphrase"
        hex_value = passphrase.encode("utf-8").hex()
        monkeypatch.setenv("BORG_DISPLAY_PASSPHRASE", "no")
        Passphrase.verification(passphrase)
        out, err = capsys.readouterr()
        assert passphrase not in err
        monkeypatch.setenv("BORG_DISPLAY_PASSPHRASE", "yes")
        Passphrase.verification(passphrase)
        out, err = capsys.readouterr()
        assert passphrase in err
        assert hex_value in err
@pytest.mark.parametrize(
    "ec_range,ec_class",
    (
        # inclusive range start, exclusive range end
        ((0, 1), "success"),
        ((1, 2), "warning"),
        ((2, 3), "error"),
        ((EXIT_ERROR_BASE, EXIT_WARNING_BASE), "error"),
        ((EXIT_WARNING_BASE, EXIT_SIGNAL_BASE), "warning"),
        ((EXIT_SIGNAL_BASE, 256), "signal"),
    ),
)
def test_classify_ec(ec_range, ec_class):
    """every exit code in ec_range must be classified as ec_class"""
    for ec in range(*ec_range):
        # bug fix: the comparison result used to be discarded (bare expression,
        # no assert), so this test could never fail.
        assert classify_ec(ec) == ec_class
def test_ec_invalid():
    """out-of-range exit codes raise ValueError; a non-int raises TypeError"""
    for bad_ec in (666, -1):
        with pytest.raises(ValueError):
            classify_ec(bad_ec)
    with pytest.raises(TypeError):
        classify_ec(None)
@pytest.mark.parametrize(
    "ec1,ec2,ec_max",
    (
        # same for modern / legacy
        (EXIT_SUCCESS, EXIT_SUCCESS, EXIT_SUCCESS),
        (EXIT_SUCCESS, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE),
        # legacy exit codes
        (EXIT_SUCCESS, EXIT_WARNING, EXIT_WARNING),
        (EXIT_SUCCESS, EXIT_ERROR, EXIT_ERROR),
        (EXIT_WARNING, EXIT_SUCCESS, EXIT_WARNING),
        (EXIT_WARNING, EXIT_WARNING, EXIT_WARNING),
        (EXIT_WARNING, EXIT_ERROR, EXIT_ERROR),
        (EXIT_WARNING, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE),
        (EXIT_ERROR, EXIT_SUCCESS, EXIT_ERROR),
        (EXIT_ERROR, EXIT_WARNING, EXIT_ERROR),
        (EXIT_ERROR, EXIT_ERROR, EXIT_ERROR),
        (EXIT_ERROR, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE),
        # some modern codes
        (EXIT_SUCCESS, EXIT_WARNING_BASE, EXIT_WARNING_BASE),
        (EXIT_SUCCESS, EXIT_ERROR_BASE, EXIT_ERROR_BASE),
        (EXIT_WARNING_BASE, EXIT_SUCCESS, EXIT_WARNING_BASE),
        (EXIT_WARNING_BASE + 1, EXIT_WARNING_BASE + 2, EXIT_WARNING_BASE + 1),
        (EXIT_WARNING_BASE, EXIT_ERROR_BASE, EXIT_ERROR_BASE),
        (EXIT_WARNING_BASE, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE),
        (EXIT_ERROR_BASE, EXIT_SUCCESS, EXIT_ERROR_BASE),
        (EXIT_ERROR_BASE, EXIT_WARNING_BASE, EXIT_ERROR_BASE),
        (EXIT_ERROR_BASE + 1, EXIT_ERROR_BASE + 2, EXIT_ERROR_BASE + 1),
        (EXIT_ERROR_BASE, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE),
    ),
)
def test_max_ec(ec1, ec2, ec_max):
    """max_ec picks the more severe of two exit codes (by class, not numeric value)"""
    merged = max_ec(ec1, ec2)
    assert merged == ec_max
def test_dir_is_tagged(tmpdir):
    """Test dir_is_tagged with both path-based and file descriptor-based operations."""

    @contextmanager
    def open_dir(path):
        # yields an O_RDONLY fd for path, closing it afterwards
        fd = os.open(path, os.O_RDONLY)
        try:
            yield fd
        finally:
            os.close(fd)

    # Create directories for testing exclude_caches
    cache_dir = tmpdir.mkdir("cache_dir")
    cache_tag_path = cache_dir.join(CACHE_TAG_NAME)
    cache_tag_path.write_binary(CACHE_TAG_CONTENTS)
    # a CACHEDIR.TAG with a wrong signature must NOT mark the dir as tagged
    invalid_cache_dir = tmpdir.mkdir("invalid_cache_dir")
    invalid_cache_tag_path = invalid_cache_dir.join(CACHE_TAG_NAME)
    invalid_cache_tag_path.write_binary(b"invalid signature")
    # Create directories for testing exclude_if_present
    tagged_dir = tmpdir.mkdir("tagged_dir")
    tag_file = tagged_dir.join(".NOBACKUP")
    tag_file.write("test")
    other_tagged_dir = tmpdir.mkdir("other_tagged_dir")
    other_tag_file = other_tagged_dir.join(".DONOTBACKUP")
    other_tag_file.write("test")
    # Create a directory with both a CACHEDIR.TAG and a custom tag file
    both_dir = tmpdir.mkdir("both_dir")
    cache_tag_path = both_dir.join(CACHE_TAG_NAME)
    cache_tag_path.write_binary(CACHE_TAG_CONTENTS)
    custom_tag_path = both_dir.join(".NOBACKUP")
    custom_tag_path.write("test")
    # Create a directory without any tag files
    normal_dir = tmpdir.mkdir("normal_dir")
    # Test edge cases: disabled / empty criteria yield an empty result
    test_dir = tmpdir.mkdir("test_dir")
    assert dir_is_tagged(path=str(test_dir), exclude_caches=None, exclude_if_present=None) == []
    assert dir_is_tagged(path=str(test_dir), exclude_if_present=[]) == []
    # Test with non-existent directory (should not raise an exception)
    non_existent_dir = str(tmpdir.join("non_existent"))
    result = dir_is_tagged(path=non_existent_dir, exclude_caches=True, exclude_if_present=[".NOBACKUP"])
    assert result == []
    # Test 1: exclude_caches with path-based operations
    assert dir_is_tagged(path=str(cache_dir), exclude_caches=True) == [CACHE_TAG_NAME]
    assert dir_is_tagged(path=str(invalid_cache_dir), exclude_caches=True) == []
    assert dir_is_tagged(path=str(normal_dir), exclude_caches=True) == []
    assert dir_is_tagged(path=str(cache_dir), exclude_caches=False) == []
    assert dir_is_tagged(path=str(invalid_cache_dir), exclude_caches=False) == []
    assert dir_is_tagged(path=str(normal_dir), exclude_caches=False) == []
    # Test 2: exclude_caches with file-descriptor-based operations
    with open_dir(str(cache_dir)) as fd:
        assert dir_is_tagged(dir_fd=fd, exclude_caches=True) == [CACHE_TAG_NAME]
    with open_dir(str(invalid_cache_dir)) as fd:
        assert dir_is_tagged(dir_fd=fd, exclude_caches=True) == []
    with open_dir(str(normal_dir)) as fd:
        assert dir_is_tagged(dir_fd=fd, exclude_caches=True) == []
    with open_dir(str(cache_dir)) as fd:
        assert dir_is_tagged(dir_fd=fd, exclude_caches=False) == []
    with open_dir(str(invalid_cache_dir)) as fd:
        assert dir_is_tagged(dir_fd=fd, exclude_caches=False) == []
    with open_dir(str(normal_dir)) as fd:
        assert dir_is_tagged(dir_fd=fd, exclude_caches=False) == []
    # Test 3: exclude_if_present with path-based operations
    tags = [".NOBACKUP"]
    assert dir_is_tagged(path=str(tagged_dir), exclude_if_present=tags) == [".NOBACKUP"]
    assert dir_is_tagged(path=str(other_tagged_dir), exclude_if_present=tags) == []
    assert dir_is_tagged(path=str(normal_dir), exclude_if_present=tags) == []
    tags = [".NOBACKUP", ".DONOTBACKUP"]
    assert dir_is_tagged(path=str(tagged_dir), exclude_if_present=tags) == [".NOBACKUP"]
    assert dir_is_tagged(path=str(other_tagged_dir), exclude_if_present=tags) == [".DONOTBACKUP"]
    assert dir_is_tagged(path=str(normal_dir), exclude_if_present=tags) == []
    # Test 4: exclude_if_present with file descriptor-based operations
    tags = [".NOBACKUP"]
    with open_dir(str(tagged_dir)) as fd:
        assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [".NOBACKUP"]
    with open_dir(str(other_tagged_dir)) as fd:
        assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == []
    with open_dir(str(normal_dir)) as fd:
        assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == []
    tags = [".NOBACKUP", ".DONOTBACKUP"]
    with open_dir(str(tagged_dir)) as fd:
        assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [".NOBACKUP"]
    with open_dir(str(other_tagged_dir)) as fd:
        assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [".DONOTBACKUP"]
    with open_dir(str(normal_dir)) as fd:
        assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == []
    # Test 5: both exclude types with path-based operations
    assert sorted(dir_is_tagged(path=str(both_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"])) == [
        ".NOBACKUP",
        CACHE_TAG_NAME,
    ]
    assert dir_is_tagged(path=str(cache_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [CACHE_TAG_NAME]
    assert dir_is_tagged(path=str(tagged_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [".NOBACKUP"]
    assert dir_is_tagged(path=str(normal_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == []
    # Test 6: both exclude types with file descriptor-based operations
    with open_dir(str(both_dir)) as fd:
        result = dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"])
        assert sorted(result) == [".NOBACKUP", CACHE_TAG_NAME]
    with open_dir(str(cache_dir)) as fd:
        assert dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [CACHE_TAG_NAME]
    with open_dir(str(tagged_dir)) as fd:
        assert dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [".NOBACKUP"]
    with open_dir(str(normal_dir)) as fd:
        assert dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == []
|