helpers_test.py 63 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646
  1. import base64
  2. import errno
  3. import getpass
  4. import hashlib
  5. import os
  6. import shutil
  7. import sys
  8. from argparse import ArgumentTypeError
  9. from contextlib import contextmanager
  10. from datetime import datetime, timezone, timedelta
  11. from io import StringIO, BytesIO
  12. import pytest
  13. from ..archiver.prune_cmd import prune_within, prune_split
  14. from .. import platform
  15. from ..constants import * # NOQA
  16. from ..constants import CACHE_TAG_NAME, CACHE_TAG_CONTENTS
  17. from ..helpers.fs import dir_is_tagged
  18. from ..helpers import Location
  19. from ..helpers import Buffer
  20. from ..helpers import (
  21. partial_format,
  22. format_file_size,
  23. parse_file_size,
  24. format_timedelta,
  25. format_line,
  26. PlaceholderError,
  27. replace_placeholders,
  28. )
  29. from ..helpers import remove_dotdot_prefixes, make_path_safe, clean_lines
  30. from ..helpers import interval
  31. from ..helpers import get_base_dir, get_cache_dir, get_keys_dir, get_security_dir, get_config_dir, get_runtime_dir
  32. from ..helpers import is_slow_msgpack
  33. from ..helpers import msgpack
  34. from ..helpers import yes, TRUISH, FALSISH, DEFAULTISH
  35. from ..helpers import StableDict, bin_to_hex
  36. from ..helpers import parse_timestamp, ChunkIteratorFileWrapper, ChunkerParams
  37. from ..helpers import archivename_validator, text_validator
  38. from ..helpers import ProgressIndicatorPercent
  39. from ..helpers import swidth_slice
  40. from ..helpers import chunkit
  41. from ..helpers import safe_ns, safe_s, SUPPORT_32BIT_PLATFORMS
  42. from ..helpers import popen_with_error_handling
  43. from ..helpers import dash_open
  44. from ..helpers import iter_separated
  45. from ..helpers import eval_escapes
  46. from ..helpers import safe_unlink
  47. from ..helpers import text_to_json, binary_to_json
  48. from ..helpers import classify_ec, max_ec
  49. from ..helpers.passphrase import Passphrase, PasswordRetriesExceeded
  50. from ..platform import is_cygwin, is_win32, is_darwin
  51. from . import FakeInputs, are_hardlinks_supported
  52. from . import rejected_dotdot_paths
  53. def test_bin_to_hex():
  54. assert bin_to_hex(b"") == ""
  55. assert bin_to_hex(b"\x00\x01\xff") == "0001ff"
  56. @pytest.mark.parametrize(
  57. "key,value",
  58. [("key", b"\x00\x01\x02\x03"), ("key", b"\x00\x01\x02"), ("key", b"\x00\x01"), ("key", b"\x00"), ("key", b"")],
  59. )
  60. def test_binary_to_json(key, value):
  61. key_b64 = key + "_b64"
  62. d = binary_to_json(key, value)
  63. assert key_b64 in d
  64. assert base64.b64decode(d[key_b64]) == value
  65. @pytest.mark.parametrize(
  66. "key,value,strict",
  67. [
  68. ("key", "abc", True),
  69. ("key", "äöü", True),
  70. ("key", "", True),
  71. ("key", b"\x00\xff".decode("utf-8", errors="surrogateescape"), False),
  72. ("key", "äöü".encode("latin1").decode("utf-8", errors="surrogateescape"), False),
  73. ],
  74. )
  75. def test_text_to_json(key, value, strict):
  76. key_b64 = key + "_b64"
  77. d = text_to_json(key, value)
  78. value_b = value.encode("utf-8", errors="surrogateescape")
  79. if strict:
  80. # no surrogate-escapes, just unicode text
  81. assert key in d
  82. assert d[key] == value_b.decode("utf-8", errors="strict")
  83. assert d[key].encode("utf-8", errors="strict") == value_b
  84. assert key_b64 not in d # not needed. pure valid unicode.
  85. else:
  86. # requiring surrogate-escapes. text has replacement chars, base64 representation is present.
  87. assert key in d
  88. assert d[key] == value.encode("utf-8", errors="replace").decode("utf-8", errors="strict")
  89. assert d[key].encode("utf-8", errors="strict") == value.encode("utf-8", errors="replace")
  90. assert key_b64 in d
  91. assert base64.b64decode(d[key_b64]) == value_b
  92. class TestLocationWithoutEnv:
  93. @pytest.fixture
  94. def keys_dir(self, tmpdir, monkeypatch):
  95. tmpdir = str(tmpdir)
  96. monkeypatch.setenv("BORG_KEYS_DIR", tmpdir)
  97. if not tmpdir.endswith(os.path.sep):
  98. tmpdir += os.path.sep
  99. return tmpdir
  100. def test_ssh(self, monkeypatch, keys_dir):
  101. monkeypatch.delenv("BORG_REPO", raising=False)
  102. assert (
  103. repr(Location("ssh://user@host:1234//absolute/path"))
  104. == "Location(proto='ssh', user='user', host='host', port=1234, path='/absolute/path')"
  105. )
  106. assert Location("ssh://user@host:1234//absolute/path").to_key_filename() == keys_dir + "host___absolute_path"
  107. assert (
  108. repr(Location("ssh://user@host:1234/relative/path"))
  109. == "Location(proto='ssh', user='user', host='host', port=1234, path='relative/path')"
  110. )
  111. assert Location("ssh://user@host:1234/relative/path").to_key_filename() == keys_dir + "host__relative_path"
  112. assert (
  113. repr(Location("ssh://user@host/relative/path"))
  114. == "Location(proto='ssh', user='user', host='host', port=None, path='relative/path')"
  115. )
  116. assert (
  117. repr(Location("ssh://user@[::]:1234/relative/path"))
  118. == "Location(proto='ssh', user='user', host='::', port=1234, path='relative/path')"
  119. )
  120. assert Location("ssh://user@[::]:1234/relative/path").to_key_filename() == keys_dir + "____relative_path"
  121. assert (
  122. repr(Location("ssh://user@[::]/relative/path"))
  123. == "Location(proto='ssh', user='user', host='::', port=None, path='relative/path')"
  124. )
  125. assert (
  126. repr(Location("ssh://user@[2001:db8::]:1234/relative/path"))
  127. == "Location(proto='ssh', user='user', host='2001:db8::', port=1234, path='relative/path')"
  128. )
  129. assert (
  130. Location("ssh://user@[2001:db8::]:1234/relative/path").to_key_filename()
  131. == keys_dir + "2001_db8____relative_path"
  132. )
  133. assert (
  134. repr(Location("ssh://user@[2001:db8::]/relative/path"))
  135. == "Location(proto='ssh', user='user', host='2001:db8::', port=None, path='relative/path')"
  136. )
  137. assert (
  138. repr(Location("ssh://user@[2001:db8::c0:ffee]:1234/relative/path"))
  139. == "Location(proto='ssh', user='user', host='2001:db8::c0:ffee', port=1234, path='relative/path')"
  140. )
  141. assert (
  142. repr(Location("ssh://user@[2001:db8::c0:ffee]/relative/path"))
  143. == "Location(proto='ssh', user='user', host='2001:db8::c0:ffee', port=None, path='relative/path')"
  144. )
  145. assert (
  146. repr(Location("ssh://user@[2001:db8::192.0.2.1]:1234/relative/path"))
  147. == "Location(proto='ssh', user='user', host='2001:db8::192.0.2.1', port=1234, path='relative/path')"
  148. )
  149. assert (
  150. repr(Location("ssh://user@[2001:db8::192.0.2.1]/relative/path"))
  151. == "Location(proto='ssh', user='user', host='2001:db8::192.0.2.1', port=None, path='relative/path')"
  152. )
  153. assert (
  154. Location("ssh://user@[2001:db8::192.0.2.1]/relative/path").to_key_filename()
  155. == keys_dir + "2001_db8__192_0_2_1__relative_path"
  156. )
  157. assert (
  158. repr(Location("ssh://user@[2a02:0001:0002:0003:0004:0005:0006:0007]/relative/path"))
  159. == "Location(proto='ssh', user='user', "
  160. "host='2a02:0001:0002:0003:0004:0005:0006:0007', port=None, path='relative/path')"
  161. )
  162. assert (
  163. repr(Location("ssh://user@[2a02:0001:0002:0003:0004:0005:0006:0007]:1234/relative/path"))
  164. == "Location(proto='ssh', user='user', "
  165. "host='2a02:0001:0002:0003:0004:0005:0006:0007', port=1234, path='relative/path')"
  166. )
  167. def test_rclone(self, monkeypatch, keys_dir):
  168. monkeypatch.delenv("BORG_REPO", raising=False)
  169. assert (
  170. repr(Location("rclone:remote:path"))
  171. == "Location(proto='rclone', user=None, host=None, port=None, path='remote:path')"
  172. )
  173. assert Location("rclone:remote:path").to_key_filename() == keys_dir + "remote_path"
  174. def test_sftp(self, monkeypatch, keys_dir):
  175. monkeypatch.delenv("BORG_REPO", raising=False)
  176. # relative path
  177. assert (
  178. repr(Location("sftp://user@host:1234/rel/path"))
  179. == "Location(proto='sftp', user='user', host='host', port=1234, path='rel/path')"
  180. )
  181. assert Location("sftp://user@host:1234/rel/path").to_key_filename() == keys_dir + "host__rel_path"
  182. # absolute path
  183. assert (
  184. repr(Location("sftp://user@host:1234//abs/path"))
  185. == "Location(proto='sftp', user='user', host='host', port=1234, path='/abs/path')"
  186. )
  187. assert Location("sftp://user@host:1234//abs/path").to_key_filename() == keys_dir + "host___abs_path"
  188. def test_socket(self, monkeypatch, keys_dir):
  189. monkeypatch.delenv("BORG_REPO", raising=False)
  190. assert (
  191. repr(Location("socket:///repo/path"))
  192. == "Location(proto='socket', user=None, host=None, port=None, path='/repo/path')"
  193. )
  194. assert Location("socket:///some/path").to_key_filename() == keys_dir + "_some_path"
  195. def test_file(self, monkeypatch, keys_dir):
  196. monkeypatch.delenv("BORG_REPO", raising=False)
  197. assert (
  198. repr(Location("file:///some/path"))
  199. == "Location(proto='file', user=None, host=None, port=None, path='/some/path')"
  200. )
  201. assert (
  202. repr(Location("file:///some/path"))
  203. == "Location(proto='file', user=None, host=None, port=None, path='/some/path')"
  204. )
  205. assert Location("file:///some/path").to_key_filename() == keys_dir + "_some_path"
  206. def test_smb(self, monkeypatch, keys_dir):
  207. monkeypatch.delenv("BORG_REPO", raising=False)
  208. assert (
  209. repr(Location("file:////server/share/path"))
  210. == "Location(proto='file', user=None, host=None, port=None, path='//server/share/path')"
  211. )
  212. assert Location("file:////server/share/path").to_key_filename() == keys_dir + "__server_share_path"
  213. def test_folder(self, monkeypatch, keys_dir):
  214. monkeypatch.delenv("BORG_REPO", raising=False)
  215. rel_path = "path"
  216. abs_path = os.path.abspath(rel_path)
  217. assert repr(Location(rel_path)) == f"Location(proto='file', user=None, host=None, port=None, path='{abs_path}')"
  218. assert Location("path").to_key_filename().endswith(rel_path)
  219. def test_abspath(self, monkeypatch, keys_dir):
  220. monkeypatch.delenv("BORG_REPO", raising=False)
  221. assert (
  222. repr(Location("/absolute/path"))
  223. == "Location(proto='file', user=None, host=None, port=None, path='/absolute/path')"
  224. )
  225. assert Location("/absolute/path").to_key_filename() == keys_dir + "_absolute_path"
  226. assert (
  227. repr(Location("ssh://user@host//absolute/path"))
  228. == "Location(proto='ssh', user='user', host='host', port=None, path='/absolute/path')"
  229. )
  230. assert Location("ssh://user@host//absolute/path").to_key_filename() == keys_dir + "host___absolute_path"
  231. def test_relpath(self, monkeypatch, keys_dir):
  232. monkeypatch.delenv("BORG_REPO", raising=False)
  233. # for a local path, borg creates a Location instance with an absolute path
  234. rel_path = "relative/path"
  235. abs_path = os.path.abspath(rel_path)
  236. assert repr(Location(rel_path)) == f"Location(proto='file', user=None, host=None, port=None, path='{abs_path}')"
  237. assert Location(rel_path).to_key_filename().endswith("relative_path")
  238. assert (
  239. repr(Location("ssh://user@host/relative/path"))
  240. == "Location(proto='ssh', user='user', host='host', port=None, path='relative/path')"
  241. )
  242. assert Location("ssh://user@host/relative/path").to_key_filename() == keys_dir + "host__relative_path"
  243. def test_with_colons(self, monkeypatch, keys_dir):
  244. monkeypatch.delenv("BORG_REPO", raising=False)
  245. assert (
  246. repr(Location("/abs/path:w:cols"))
  247. == "Location(proto='file', user=None, host=None, port=None, path='/abs/path:w:cols')"
  248. )
  249. assert (
  250. repr(Location("/abs/path:with:colons"))
  251. == "Location(proto='file', user=None, host=None, port=None, path='/abs/path:with:colons')"
  252. )
  253. assert (
  254. repr(Location("/abs/path:with:colons"))
  255. == "Location(proto='file', user=None, host=None, port=None, path='/abs/path:with:colons')"
  256. )
  257. assert Location("/abs/path:with:colons").to_key_filename() == keys_dir + "_abs_path_with_colons"
  258. def test_canonical_path(self, monkeypatch):
  259. monkeypatch.delenv("BORG_REPO", raising=False)
  260. locations = [
  261. "relative/path",
  262. "/absolute/path",
  263. "file:///absolute/path",
  264. "socket:///absolute/path",
  265. "ssh://host/relative/path",
  266. "ssh://host//absolute/path",
  267. "ssh://user@host:1234/relative/path",
  268. "sftp://host/relative/path",
  269. "sftp://host//absolute/path",
  270. "sftp://user@host:1234/relative/path",
  271. "rclone:remote:path",
  272. ]
  273. for location in locations:
  274. assert (
  275. Location(location).canonical_path() == Location(Location(location).canonical_path()).canonical_path()
  276. ), ("failed: %s" % location)
  277. def test_bad_syntax(self):
  278. with pytest.raises(ValueError):
  279. # this is invalid due to the 2nd colon, correct: 'ssh://user@host/path'
  280. Location("ssh://user@host:/path")
  281. @pytest.mark.parametrize(
  282. "name",
  283. [
  284. "foobar",
  285. # placeholders
  286. "foobar-{now}",
  287. ],
  288. )
  289. def test_archivename_ok(name):
  290. archivename_validator(name) # must not raise an exception
  291. @pytest.mark.parametrize(
  292. "name",
  293. [
  294. "", # too short
  295. "x" * 201, # too long
  296. # invalid chars:
  297. "foo/bar",
  298. "foo\\bar",
  299. ">foo",
  300. "<foo",
  301. "|foo",
  302. 'foo"bar',
  303. "foo?",
  304. "*bar",
  305. "foo\nbar",
  306. "foo\0bar",
  307. # leading/trailing blanks
  308. " foo",
  309. "bar ",
  310. # contains surrogate-escapes
  311. "foo\udc80bar",
  312. "foo\udcffbar",
  313. ],
  314. )
  315. def test_archivename_invalid(name):
  316. with pytest.raises(ArgumentTypeError):
  317. archivename_validator(name)
  318. @pytest.mark.parametrize("text", ["", "single line", "multi\nline\ncomment"])
  319. def test_text_ok(text):
  320. tv = text_validator(max_length=100, name="name")
  321. tv(text) # must not raise an exception
  322. @pytest.mark.parametrize(
  323. "text",
  324. [
  325. "x" * 101, # too long
  326. # invalid chars:
  327. "foo\0bar",
  328. # contains surrogate-escapes
  329. "foo\udc80bar",
  330. "foo\udcffbar",
  331. ],
  332. )
  333. def test_text_invalid(text):
  334. tv = text_validator(max_length=100, name="name")
  335. with pytest.raises(ArgumentTypeError):
  336. tv(text)
  337. def test_format_timedelta():
  338. t0 = datetime(2001, 1, 1, 10, 20, 3, 0)
  339. t1 = datetime(2001, 1, 1, 12, 20, 4, 100000)
  340. assert format_timedelta(t1 - t0) == "2 hours 1.100 seconds"
  341. @pytest.mark.parametrize(
  342. "chunker_params, expected_return",
  343. [
  344. ("default", ("buzhash", 19, 23, 21, 4095)),
  345. ("19,23,21,4095", ("buzhash", 19, 23, 21, 4095)),
  346. ("buzhash,19,23,21,4095", ("buzhash", 19, 23, 21, 4095)),
  347. ("10,23,16,4095", ("buzhash", 10, 23, 16, 4095)),
  348. ("fixed,4096", ("fixed", 4096, 0)),
  349. ("fixed,4096,200", ("fixed", 4096, 200)),
  350. ],
  351. )
  352. def test_valid_chunkerparams(chunker_params, expected_return):
  353. assert ChunkerParams(chunker_params) == expected_return
  354. @pytest.mark.parametrize(
  355. "invalid_chunker_params",
  356. [
  357. "crap,1,2,3,4", # invalid algo
  358. "buzhash,5,7,6,4095", # too small min. size
  359. "buzhash,19,24,21,4095", # too big max. size
  360. "buzhash,23,19,21,4095", # violates min <= mask <= max
  361. "fixed,63", # too small block size
  362. "fixed,%d,%d" % (MAX_DATA_SIZE + 1, 4096), # too big block size
  363. "fixed,%d,%d" % (4096, MAX_DATA_SIZE + 1), # too big header size
  364. ],
  365. )
  366. def test_invalid_chunkerparams(invalid_chunker_params):
  367. with pytest.raises(ArgumentTypeError):
  368. ChunkerParams(invalid_chunker_params)
  369. @pytest.mark.parametrize(
  370. "original_path, expected_path",
  371. [
  372. (".", "."),
  373. ("..", "."),
  374. ("/", "."),
  375. ("//", "."),
  376. ("foo", "foo"),
  377. ("foo/bar", "foo/bar"),
  378. ("/foo/bar", "foo/bar"),
  379. ("../foo/bar", "foo/bar"),
  380. ],
  381. )
  382. def test_remove_dotdot_prefixes(original_path, expected_path):
  383. assert remove_dotdot_prefixes(original_path) == expected_path
  384. @pytest.mark.parametrize(
  385. "original_path, expected_path",
  386. [
  387. (".", "."),
  388. ("./", "."),
  389. ("/foo", "foo"),
  390. ("//foo", "foo"),
  391. (".//foo//bar//", "foo/bar"),
  392. ("/foo/bar", "foo/bar"),
  393. ("//foo/bar", "foo/bar"),
  394. ("//foo/./bar", "foo/bar"),
  395. (".test", ".test"),
  396. (".test.", ".test."),
  397. ("..test..", "..test.."),
  398. ("/te..st/foo/bar", "te..st/foo/bar"),
  399. ("/..test../abc//", "..test../abc"),
  400. ],
  401. )
  402. def test_valid_make_path_safe(original_path, expected_path):
  403. assert make_path_safe(original_path) == expected_path
  404. @pytest.mark.parametrize("path", rejected_dotdot_paths)
  405. def test_invalid_make_path_safe(path):
  406. with pytest.raises(ValueError, match="unexpected '..' element in path"):
  407. make_path_safe(path)
  408. class MockArchive:
  409. def __init__(self, ts, id):
  410. self.ts = ts
  411. self.id = id
  412. def __repr__(self):
  413. return f"{self.id}: {self.ts.isoformat()}"
  414. # This is the local timezone of the system running the tests.
  415. # We need this e.g. to construct archive timestamps for the prune tests,
  416. # because borg prune operates in the local timezone (it first converts the
  417. # archive timestamp to the local timezone). So, if we want the y/m/d/h/m/s
  418. # values which prune uses to be exactly the ones we give [and NOT shift them
  419. # by tzoffset], we need to give the timestamps in the same local timezone.
  420. # Please note that the timestamps in a real borg archive or manifest are
  421. # stored in UTC timezone.
  422. local_tz = datetime.now(tz=timezone.utc).astimezone(tz=None).tzinfo
  423. @pytest.mark.parametrize(
  424. "rule,num_to_keep,expected_ids",
  425. [
  426. ("yearly", 3, (13, 2, 1)),
  427. ("monthly", 3, (13, 8, 4)),
  428. ("weekly", 2, (13, 8)),
  429. ("daily", 3, (13, 8, 7)),
  430. ("hourly", 3, (13, 10, 8)),
  431. ("minutely", 3, (13, 10, 9)),
  432. ("secondly", 4, (13, 12, 11, 10)),
  433. ("daily", 0, []),
  434. ],
  435. )
  436. def test_prune_split(rule, num_to_keep, expected_ids):
  437. def subset(lst, ids):
  438. return {i for i in lst if i.id in ids}
  439. archives = [
  440. # years apart
  441. MockArchive(datetime(2015, 1, 1, 10, 0, 0, tzinfo=local_tz), 1),
  442. MockArchive(datetime(2016, 1, 1, 10, 0, 0, tzinfo=local_tz), 2),
  443. MockArchive(datetime(2017, 1, 1, 10, 0, 0, tzinfo=local_tz), 3),
  444. # months apart
  445. MockArchive(datetime(2017, 2, 1, 10, 0, 0, tzinfo=local_tz), 4),
  446. MockArchive(datetime(2017, 3, 1, 10, 0, 0, tzinfo=local_tz), 5),
  447. # days apart
  448. MockArchive(datetime(2017, 3, 2, 10, 0, 0, tzinfo=local_tz), 6),
  449. MockArchive(datetime(2017, 3, 3, 10, 0, 0, tzinfo=local_tz), 7),
  450. MockArchive(datetime(2017, 3, 4, 10, 0, 0, tzinfo=local_tz), 8),
  451. # minutes apart
  452. MockArchive(datetime(2017, 10, 1, 9, 45, 0, tzinfo=local_tz), 9),
  453. MockArchive(datetime(2017, 10, 1, 9, 55, 0, tzinfo=local_tz), 10),
  454. # seconds apart
  455. MockArchive(datetime(2017, 10, 1, 10, 0, 1, tzinfo=local_tz), 11),
  456. MockArchive(datetime(2017, 10, 1, 10, 0, 3, tzinfo=local_tz), 12),
  457. MockArchive(datetime(2017, 10, 1, 10, 0, 5, tzinfo=local_tz), 13),
  458. ]
  459. kept_because = {}
  460. keep = prune_split(archives, rule, num_to_keep, kept_because)
  461. assert set(keep) == subset(archives, expected_ids)
  462. for item in keep:
  463. assert kept_because[item.id][0] == rule
  464. def test_prune_split_keep_oldest():
  465. def subset(lst, ids):
  466. return {i for i in lst if i.id in ids}
  467. archives = [
  468. # oldest backup, but not last in its year
  469. MockArchive(datetime(2018, 1, 1, 10, 0, 0, tzinfo=local_tz), 1),
  470. # an interim backup
  471. MockArchive(datetime(2018, 12, 30, 10, 0, 0, tzinfo=local_tz), 2),
  472. # year-end backups
  473. MockArchive(datetime(2018, 12, 31, 10, 0, 0, tzinfo=local_tz), 3),
  474. MockArchive(datetime(2019, 12, 31, 10, 0, 0, tzinfo=local_tz), 4),
  475. ]
  476. # Keep oldest when retention target can't otherwise be met
  477. kept_because = {}
  478. keep = prune_split(archives, "yearly", 3, kept_because)
  479. assert set(keep) == subset(archives, [1, 3, 4])
  480. assert kept_because[1][0] == "yearly[oldest]"
  481. assert kept_because[3][0] == "yearly"
  482. assert kept_because[4][0] == "yearly"
  483. # Otherwise, prune it
  484. kept_because = {}
  485. keep = prune_split(archives, "yearly", 2, kept_because)
  486. assert set(keep) == subset(archives, [3, 4])
  487. assert kept_because[3][0] == "yearly"
  488. assert kept_because[4][0] == "yearly"
  489. def test_prune_split_no_archives():
  490. archives = []
  491. kept_because = {}
  492. keep = prune_split(archives, "yearly", 3, kept_because)
  493. assert keep == []
  494. assert kept_because == {}
  495. @pytest.mark.parametrize(
  496. "timeframe, num_secs",
  497. [
  498. ("5S", 5),
  499. ("2M", 2 * 60),
  500. ("1H", 60 * 60),
  501. ("1d", 24 * 60 * 60),
  502. ("1w", 7 * 24 * 60 * 60),
  503. ("1m", 31 * 24 * 60 * 60),
  504. ("1y", 365 * 24 * 60 * 60),
  505. ],
  506. )
  507. def test_interval(timeframe, num_secs):
  508. assert interval(timeframe) == num_secs
  509. @pytest.mark.parametrize(
  510. "invalid_interval, error_tuple",
  511. [
  512. ("H", ('Invalid number "": expected positive integer',)),
  513. ("-1d", ('Invalid number "-1": expected positive integer',)),
  514. ("food", ('Invalid number "foo": expected positive integer',)),
  515. ],
  516. )
  517. def test_interval_time_unit(invalid_interval, error_tuple):
  518. with pytest.raises(ArgumentTypeError) as exc:
  519. interval(invalid_interval)
  520. assert exc.value.args == error_tuple
  521. def test_interval_number():
  522. with pytest.raises(ArgumentTypeError) as exc:
  523. interval("5")
  524. assert exc.value.args == ('Unexpected time unit "5": choose from y, m, w, d, H, M, S',)
  525. def test_prune_within():
  526. def subset(lst, indices):
  527. return {lst[i] for i in indices}
  528. def dotest(test_archives, within, indices):
  529. for ta in test_archives, reversed(test_archives):
  530. kept_because = {}
  531. keep = prune_within(ta, interval(within), kept_because)
  532. assert set(keep) == subset(test_archives, indices)
  533. assert all("within" == kept_because[a.id][0] for a in keep)
  534. # 1 minute, 1.5 hours, 2.5 hours, 3.5 hours, 25 hours, 49 hours
  535. test_offsets = [60, 90 * 60, 150 * 60, 210 * 60, 25 * 60 * 60, 49 * 60 * 60]
  536. now = datetime.now(timezone.utc)
  537. test_dates = [now - timedelta(seconds=s) for s in test_offsets]
  538. test_archives = [MockArchive(date, i) for i, date in enumerate(test_dates)]
  539. dotest(test_archives, "15S", [])
  540. dotest(test_archives, "2M", [0])
  541. dotest(test_archives, "1H", [0])
  542. dotest(test_archives, "2H", [0, 1])
  543. dotest(test_archives, "3H", [0, 1, 2])
  544. dotest(test_archives, "24H", [0, 1, 2, 3])
  545. dotest(test_archives, "26H", [0, 1, 2, 3, 4])
  546. dotest(test_archives, "2d", [0, 1, 2, 3, 4])
  547. dotest(test_archives, "50H", [0, 1, 2, 3, 4, 5])
  548. dotest(test_archives, "3d", [0, 1, 2, 3, 4, 5])
  549. dotest(test_archives, "1w", [0, 1, 2, 3, 4, 5])
  550. dotest(test_archives, "1m", [0, 1, 2, 3, 4, 5])
  551. dotest(test_archives, "1y", [0, 1, 2, 3, 4, 5])
  552. def test_stable_dict():
  553. d = StableDict(foo=1, bar=2, boo=3, baz=4)
  554. assert list(d.items()) == [("bar", 2), ("baz", 4), ("boo", 3), ("foo", 1)]
  555. assert hashlib.md5(msgpack.packb(d)).hexdigest() == "fc78df42cd60691b3ac3dd2a2b39903f"
  556. def test_parse_timestamp():
  557. assert parse_timestamp("2015-04-19T20:25:00.226410") == datetime(2015, 4, 19, 20, 25, 0, 226410, timezone.utc)
  558. assert parse_timestamp("2015-04-19T20:25:00") == datetime(2015, 4, 19, 20, 25, 0, 0, timezone.utc)
  559. def test_get_base_dir(monkeypatch):
  560. """test that get_base_dir respects environment"""
  561. monkeypatch.delenv("BORG_BASE_DIR", raising=False)
  562. monkeypatch.delenv("HOME", raising=False)
  563. monkeypatch.delenv("USER", raising=False)
  564. assert get_base_dir(legacy=True) == os.path.expanduser("~")
  565. monkeypatch.setenv("USER", "root")
  566. assert get_base_dir(legacy=True) == os.path.expanduser("~root")
  567. monkeypatch.setenv("HOME", "/var/tmp/home")
  568. assert get_base_dir(legacy=True) == "/var/tmp/home"
  569. monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base")
  570. assert get_base_dir(legacy=True) == "/var/tmp/base"
  571. # non-legacy is much easier:
  572. monkeypatch.delenv("BORG_BASE_DIR", raising=False)
  573. assert get_base_dir(legacy=False) is None
  574. monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base")
  575. assert get_base_dir(legacy=False) == "/var/tmp/base"
  576. def test_get_base_dir_compat(monkeypatch):
  577. """test that it works the same for legacy and for non-legacy implementation"""
  578. monkeypatch.delenv("BORG_BASE_DIR", raising=False)
  579. # old way: if BORG_BASE_DIR is not set, make something up with HOME/USER/~
  580. # new way: if BORG_BASE_DIR is not set, return None and let caller deal with it.
  581. assert get_base_dir(legacy=False) is None
  582. # new and old way: BORG_BASE_DIR overrides all other "base path determination".
  583. monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base")
  584. assert get_base_dir(legacy=False) == get_base_dir(legacy=True)
  585. def test_get_config_dir(monkeypatch):
  586. """test that get_config_dir respects environment"""
  587. monkeypatch.delenv("BORG_BASE_DIR", raising=False)
  588. home_dir = os.path.expanduser("~")
  589. if is_win32:
  590. monkeypatch.delenv("BORG_CONFIG_DIR", raising=False)
  591. assert get_config_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "borg", "borg")
  592. monkeypatch.setenv("BORG_CONFIG_DIR", home_dir)
  593. assert get_config_dir(create=False) == home_dir
  594. elif is_darwin:
  595. monkeypatch.delenv("BORG_CONFIG_DIR", raising=False)
  596. assert get_config_dir(create=False) == os.path.join(home_dir, "Library", "Application Support", "borg")
  597. monkeypatch.setenv("BORG_CONFIG_DIR", "/var/tmp")
  598. assert get_config_dir(create=False) == "/var/tmp"
  599. else:
  600. monkeypatch.delenv("XDG_CONFIG_HOME", raising=False)
  601. monkeypatch.delenv("BORG_CONFIG_DIR", raising=False)
  602. assert get_config_dir(create=False) == os.path.join(home_dir, ".config", "borg")
  603. monkeypatch.setenv("XDG_CONFIG_HOME", "/var/tmp/.config")
  604. assert get_config_dir(create=False) == os.path.join("/var/tmp/.config", "borg")
  605. monkeypatch.setenv("BORG_CONFIG_DIR", "/var/tmp")
  606. assert get_config_dir(create=False) == "/var/tmp"
  607. def test_get_config_dir_compat(monkeypatch):
  608. """test that it works the same for legacy and for non-legacy implementation"""
  609. monkeypatch.delenv("BORG_CONFIG_DIR", raising=False)
  610. monkeypatch.delenv("BORG_BASE_DIR", raising=False)
  611. monkeypatch.delenv("XDG_CONFIG_HOME", raising=False)
  612. if not is_darwin and not is_win32:
  613. # fails on macOS: assert '/Users/tw/Library/Application Support/borg' == '/Users/tw/.config/borg'
  614. # fails on win32 MSYS2 (but we do not need legacy compat there).
  615. assert get_config_dir(legacy=False, create=False) == get_config_dir(legacy=True, create=False)
  616. monkeypatch.setenv("XDG_CONFIG_HOME", "/var/tmp/xdg.config.d")
  617. # fails on macOS: assert '/Users/tw/Library/Application Support/borg' == '/var/tmp/xdg.config.d'
  618. # fails on win32 MSYS2 (but we do not need legacy compat there).
  619. assert get_config_dir(legacy=False, create=False) == get_config_dir(legacy=True, create=False)
  620. monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base")
  621. assert get_config_dir(legacy=False, create=False) == get_config_dir(legacy=True, create=False)
  622. monkeypatch.setenv("BORG_CONFIG_DIR", "/var/tmp/borg.config.d")
  623. assert get_config_dir(legacy=False, create=False) == get_config_dir(legacy=True, create=False)
  624. def test_get_cache_dir(monkeypatch):
  625. """test that get_cache_dir respects environment"""
  626. monkeypatch.delenv("BORG_BASE_DIR", raising=False)
  627. home_dir = os.path.expanduser("~")
  628. if is_win32:
  629. monkeypatch.delenv("BORG_CACHE_DIR", raising=False)
  630. assert get_cache_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "borg", "borg", "Cache")
  631. monkeypatch.setenv("BORG_CACHE_DIR", home_dir)
  632. assert get_cache_dir(create=False) == home_dir
  633. elif is_darwin:
  634. monkeypatch.delenv("BORG_CACHE_DIR", raising=False)
  635. assert get_cache_dir(create=False) == os.path.join(home_dir, "Library", "Caches", "borg")
  636. monkeypatch.setenv("BORG_CACHE_DIR", "/var/tmp")
  637. assert get_cache_dir(create=False) == "/var/tmp"
  638. else:
  639. monkeypatch.delenv("XDG_CACHE_HOME", raising=False)
  640. monkeypatch.delenv("BORG_CACHE_DIR", raising=False)
  641. assert get_cache_dir(create=False) == os.path.join(home_dir, ".cache", "borg")
  642. monkeypatch.setenv("XDG_CACHE_HOME", "/var/tmp/.cache")
  643. assert get_cache_dir(create=False) == os.path.join("/var/tmp/.cache", "borg")
  644. monkeypatch.setenv("BORG_CACHE_DIR", "/var/tmp")
  645. assert get_cache_dir(create=False) == "/var/tmp"
  646. def test_get_cache_dir_compat(monkeypatch):
  647. """test that it works the same for legacy and for non-legacy implementation"""
  648. monkeypatch.delenv("BORG_CACHE_DIR", raising=False)
  649. monkeypatch.delenv("BORG_BASE_DIR", raising=False)
  650. monkeypatch.delenv("XDG_CACHE_HOME", raising=False)
  651. if not is_darwin and not is_win32:
  652. # fails on macOS: assert '/Users/tw/Library/Caches/borg' == '/Users/tw/.cache/borg'
  653. # fails on win32 MSYS2 (but we do not need legacy compat there).
  654. assert get_cache_dir(legacy=False, create=False) == get_cache_dir(legacy=True, create=False)
  655. # fails on macOS: assert '/Users/tw/Library/Caches/borg' == '/var/tmp/xdg.cache.d'
  656. # fails on win32 MSYS2 (but we do not need legacy compat there).
  657. monkeypatch.setenv("XDG_CACHE_HOME", "/var/tmp/xdg.cache.d")
  658. assert get_cache_dir(legacy=False, create=False) == get_cache_dir(legacy=True, create=False)
  659. monkeypatch.setenv("BORG_BASE_DIR", "/var/tmp/base")
  660. assert get_cache_dir(legacy=False, create=False) == get_cache_dir(legacy=True, create=False)
  661. monkeypatch.setenv("BORG_CACHE_DIR", "/var/tmp/borg.cache.d")
  662. assert get_cache_dir(legacy=False, create=False) == get_cache_dir(legacy=True, create=False)
  663. def test_get_keys_dir(monkeypatch):
  664. """test that get_keys_dir respects environment"""
  665. monkeypatch.delenv("BORG_BASE_DIR", raising=False)
  666. home_dir = os.path.expanduser("~")
  667. if is_win32:
  668. monkeypatch.delenv("BORG_KEYS_DIR", raising=False)
  669. assert get_keys_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "borg", "borg", "keys")
  670. monkeypatch.setenv("BORG_KEYS_DIR", home_dir)
  671. assert get_keys_dir(create=False) == home_dir
  672. elif is_darwin:
  673. monkeypatch.delenv("BORG_KEYS_DIR", raising=False)
  674. assert get_keys_dir(create=False) == os.path.join(home_dir, "Library", "Application Support", "borg", "keys")
  675. monkeypatch.setenv("BORG_KEYS_DIR", "/var/tmp")
  676. assert get_keys_dir(create=False) == "/var/tmp"
  677. else:
  678. monkeypatch.delenv("XDG_CONFIG_HOME", raising=False)
  679. monkeypatch.delenv("BORG_KEYS_DIR", raising=False)
  680. assert get_keys_dir(create=False) == os.path.join(home_dir, ".config", "borg", "keys")
  681. monkeypatch.setenv("XDG_CONFIG_HOME", "/var/tmp/.config")
  682. assert get_keys_dir(create=False) == os.path.join("/var/tmp/.config", "borg", "keys")
  683. monkeypatch.setenv("BORG_KEYS_DIR", "/var/tmp")
  684. assert get_keys_dir(create=False) == "/var/tmp"
  685. def test_get_security_dir(monkeypatch):
  686. """test that get_security_dir respects environment"""
  687. monkeypatch.delenv("BORG_BASE_DIR", raising=False)
  688. home_dir = os.path.expanduser("~")
  689. if is_win32:
  690. monkeypatch.delenv("BORG_SECURITY_DIR", raising=False)
  691. assert get_security_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "borg", "borg", "security")
  692. assert get_security_dir(repository_id="1234", create=False) == os.path.join(
  693. home_dir, "AppData", "Local", "borg", "borg", "security", "1234"
  694. )
  695. monkeypatch.setenv("BORG_SECURITY_DIR", home_dir)
  696. assert get_security_dir(create=False) == home_dir
  697. elif is_darwin:
  698. monkeypatch.delenv("BORG_SECURITY_DIR", raising=False)
  699. assert get_security_dir(create=False) == os.path.join(
  700. home_dir, "Library", "Application Support", "borg", "security"
  701. )
  702. assert get_security_dir(repository_id="1234", create=False) == os.path.join(
  703. home_dir, "Library", "Application Support", "borg", "security", "1234"
  704. )
  705. monkeypatch.setenv("BORG_SECURITY_DIR", "/var/tmp")
  706. assert get_security_dir(create=False) == "/var/tmp"
  707. else:
  708. monkeypatch.delenv("XDG_DATA_HOME", raising=False)
  709. monkeypatch.delenv("BORG_SECURITY_DIR", raising=False)
  710. assert get_security_dir(create=False) == os.path.join(home_dir, ".local", "share", "borg", "security")
  711. assert get_security_dir(repository_id="1234", create=False) == os.path.join(
  712. home_dir, ".local", "share", "borg", "security", "1234"
  713. )
  714. monkeypatch.setenv("XDG_DATA_HOME", "/var/tmp/.config")
  715. assert get_security_dir(create=False) == os.path.join("/var/tmp/.config", "borg", "security")
  716. monkeypatch.setenv("BORG_SECURITY_DIR", "/var/tmp")
  717. assert get_security_dir(create=False) == "/var/tmp"
  718. def test_get_runtime_dir(monkeypatch):
  719. """test that get_runtime_dir respects environment"""
  720. monkeypatch.delenv("BORG_BASE_DIR", raising=False)
  721. home_dir = os.path.expanduser("~")
  722. if is_win32:
  723. monkeypatch.delenv("BORG_RUNTIME_DIR", raising=False)
  724. assert get_runtime_dir(create=False) == os.path.join(home_dir, "AppData", "Local", "Temp", "borg", "borg")
  725. monkeypatch.setenv("BORG_RUNTIME_DIR", home_dir)
  726. assert get_runtime_dir(create=False) == home_dir
  727. elif is_darwin:
  728. monkeypatch.delenv("BORG_RUNTIME_DIR", raising=False)
  729. assert get_runtime_dir(create=False) == os.path.join(home_dir, "Library", "Caches", "TemporaryItems", "borg")
  730. monkeypatch.setenv("BORG_RUNTIME_DIR", "/var/tmp")
  731. assert get_runtime_dir(create=False) == "/var/tmp"
  732. else:
  733. monkeypatch.delenv("XDG_RUNTIME_DIR", raising=False)
  734. monkeypatch.delenv("BORG_RUNTIME_DIR", raising=False)
  735. uid = str(os.getuid())
  736. assert get_runtime_dir(create=False) in [
  737. os.path.join("/run/user", uid, "borg"),
  738. os.path.join("/var/run/user", uid, "borg"),
  739. os.path.join(f"/tmp/runtime-{uid}", "borg"),
  740. ]
  741. monkeypatch.setenv("XDG_RUNTIME_DIR", "/var/tmp/.cache")
  742. assert get_runtime_dir(create=False) == os.path.join("/var/tmp/.cache", "borg")
  743. monkeypatch.setenv("BORG_RUNTIME_DIR", "/var/tmp")
  744. assert get_runtime_dir(create=False) == "/var/tmp"
  745. @pytest.mark.parametrize(
  746. "size, fmt",
  747. [
  748. (0, "0 B"), # no rounding necessary for those
  749. (1, "1 B"),
  750. (142, "142 B"),
  751. (999, "999 B"),
  752. (1000, "1.00 kB"), # rounding starts here
  753. (1001, "1.00 kB"), # should be rounded away
  754. (1234, "1.23 kB"), # should be rounded down
  755. (1235, "1.24 kB"), # should be rounded up
  756. (1010, "1.01 kB"), # rounded down as well
  757. (999990000, "999.99 MB"), # rounded down
  758. (999990001, "999.99 MB"), # rounded down
  759. (999995000, "1.00 GB"), # rounded up to next unit
  760. (10**6, "1.00 MB"), # and all the remaining units, megabytes
  761. (10**9, "1.00 GB"), # gigabytes
  762. (10**12, "1.00 TB"), # terabytes
  763. (10**15, "1.00 PB"), # petabytes
  764. (10**18, "1.00 EB"), # exabytes
  765. (10**21, "1.00 ZB"), # zottabytes
  766. (10**24, "1.00 YB"), # yottabytes
  767. (-1, "-1 B"), # negative value
  768. (-1010, "-1.01 kB"), # negative value with rounding
  769. ],
  770. )
  771. def test_file_size(size, fmt):
  772. """test the size formatting routines"""
  773. assert format_file_size(size) == fmt
  774. @pytest.mark.parametrize(
  775. "size, fmt",
  776. [
  777. (0, "0 B"),
  778. (2**0, "1 B"),
  779. (2**10, "1.00 KiB"),
  780. (2**20, "1.00 MiB"),
  781. (2**30, "1.00 GiB"),
  782. (2**40, "1.00 TiB"),
  783. (2**50, "1.00 PiB"),
  784. (2**60, "1.00 EiB"),
  785. (2**70, "1.00 ZiB"),
  786. (2**80, "1.00 YiB"),
  787. (-(2**0), "-1 B"),
  788. (-(2**10), "-1.00 KiB"),
  789. (-(2**20), "-1.00 MiB"),
  790. ],
  791. )
  792. def test_file_size_iec(size, fmt):
  793. """test the size formatting routines"""
  794. assert format_file_size(size, iec=True) == fmt
  795. @pytest.mark.parametrize(
  796. "original_size, formatted_size",
  797. [
  798. (1234, "1.2 kB"), # rounded down
  799. (1254, "1.3 kB"), # rounded up
  800. (999990000, "1.0 GB"), # and not 999.9 MB or 1000.0 MB
  801. ],
  802. )
  803. def test_file_size_precision(original_size, formatted_size):
  804. assert format_file_size(original_size, precision=1) == formatted_size
  805. @pytest.mark.parametrize("size, fmt", [(0, "0 B"), (1, "+1 B"), (1234, "+1.23 kB"), (-1, "-1 B"), (-1234, "-1.23 kB")])
  806. def test_file_size_sign(size, fmt):
  807. assert format_file_size(size, sign=True) == fmt
  808. @pytest.mark.parametrize(
  809. "string, value", [("1", 1), ("20", 20), ("5K", 5000), ("1.75M", 1750000), ("1e+9", 1e9), ("-1T", -1e12)]
  810. )
  811. def test_parse_file_size(string, value):
  812. assert parse_file_size(string) == int(value)
  813. @pytest.mark.parametrize("string", ("", "5 Äpfel", "4E", "2229 bit", "1B"))
  814. def test_parse_file_size_invalid(string):
  815. with pytest.raises(ValueError):
  816. parse_file_size(string)
  817. def expected_py_mp_slow_combination():
  818. """do we expect msgpack to be slow in this environment?"""
  819. # we need to import upstream msgpack package here, not helpers.msgpack:
  820. import msgpack
  821. # msgpack is slow on cygwin
  822. if is_cygwin:
  823. return True
  824. # msgpack < 1.0.6 did not have py312 wheels
  825. if sys.version_info[:2] == (3, 12) and msgpack.version < (1, 0, 6):
  826. return True
  827. # otherwise we expect msgpack to be fast!
  828. return False
  829. @pytest.mark.skipif(expected_py_mp_slow_combination(), reason="ignore expected slow msgpack")
  830. def test_is_slow_msgpack():
  831. # we need to import upstream msgpack package here, not helpers.msgpack:
  832. import msgpack
  833. import msgpack.fallback
  834. saved_packer = msgpack.Packer
  835. try:
  836. msgpack.Packer = msgpack.fallback.Packer
  837. assert is_slow_msgpack()
  838. finally:
  839. msgpack.Packer = saved_packer
  840. # this tests that we have fast msgpack on test platform:
  841. assert not is_slow_msgpack()
  842. class TestBuffer:
  843. def test_type(self):
  844. buffer = Buffer(bytearray)
  845. assert isinstance(buffer.get(), bytearray)
  846. buffer = Buffer(bytes) # don't do that in practice
  847. assert isinstance(buffer.get(), bytes)
  848. def test_len(self):
  849. buffer = Buffer(bytearray, size=0)
  850. b = buffer.get()
  851. assert len(buffer) == len(b) == 0
  852. buffer = Buffer(bytearray, size=1234)
  853. b = buffer.get()
  854. assert len(buffer) == len(b) == 1234
  855. def test_resize(self):
  856. buffer = Buffer(bytearray, size=100)
  857. assert len(buffer) == 100
  858. b1 = buffer.get()
  859. buffer.resize(200)
  860. assert len(buffer) == 200
  861. b2 = buffer.get()
  862. assert b2 is not b1 # new, bigger buffer
  863. buffer.resize(100)
  864. assert len(buffer) >= 100
  865. b3 = buffer.get()
  866. assert b3 is b2 # still same buffer (200)
  867. buffer.resize(100, init=True)
  868. assert len(buffer) == 100 # except on init
  869. b4 = buffer.get()
  870. assert b4 is not b3 # new, smaller buffer
  871. def test_limit(self):
  872. buffer = Buffer(bytearray, size=100, limit=200)
  873. buffer.resize(200)
  874. assert len(buffer) == 200
  875. with pytest.raises(Buffer.MemoryLimitExceeded):
  876. buffer.resize(201)
  877. assert len(buffer) == 200
  878. def test_get(self):
  879. buffer = Buffer(bytearray, size=100, limit=200)
  880. b1 = buffer.get(50)
  881. assert len(b1) >= 50 # == 100
  882. b2 = buffer.get(100)
  883. assert len(b2) >= 100 # == 100
  884. assert b2 is b1 # did not need resizing yet
  885. b3 = buffer.get(200)
  886. assert len(b3) == 200
  887. assert b3 is not b2 # new, resized buffer
  888. with pytest.raises(Buffer.MemoryLimitExceeded):
  889. buffer.get(201) # beyond limit
  890. assert len(buffer) == 200
  891. def test_yes_input():
  892. inputs = list(TRUISH)
  893. input = FakeInputs(inputs)
  894. for i in inputs:
  895. assert yes(input=input)
  896. inputs = list(FALSISH)
  897. input = FakeInputs(inputs)
  898. for i in inputs:
  899. assert not yes(input=input)
  900. def test_yes_input_defaults():
  901. inputs = list(DEFAULTISH)
  902. input = FakeInputs(inputs)
  903. for i in inputs:
  904. assert yes(default=True, input=input)
  905. input = FakeInputs(inputs)
  906. for i in inputs:
  907. assert not yes(default=False, input=input)
  908. def test_yes_input_custom():
  909. input = FakeInputs(["YES", "SURE", "NOPE"])
  910. assert yes(truish=("YES",), input=input)
  911. assert yes(truish=("SURE",), input=input)
  912. assert not yes(falsish=("NOPE",), input=input)
  913. def test_yes_env(monkeypatch):
  914. for value in TRUISH:
  915. monkeypatch.setenv("OVERRIDE_THIS", value)
  916. assert yes(env_var_override="OVERRIDE_THIS")
  917. for value in FALSISH:
  918. monkeypatch.setenv("OVERRIDE_THIS", value)
  919. assert not yes(env_var_override="OVERRIDE_THIS")
  920. def test_yes_env_default(monkeypatch):
  921. for value in DEFAULTISH:
  922. monkeypatch.setenv("OVERRIDE_THIS", value)
  923. assert yes(env_var_override="OVERRIDE_THIS", default=True)
  924. assert not yes(env_var_override="OVERRIDE_THIS", default=False)
  925. def test_yes_defaults():
  926. input = FakeInputs(["invalid", "", " "])
  927. assert not yes(input=input) # default=False
  928. assert not yes(input=input)
  929. assert not yes(input=input)
  930. input = FakeInputs(["invalid", "", " "])
  931. assert yes(default=True, input=input)
  932. assert yes(default=True, input=input)
  933. assert yes(default=True, input=input)
  934. input = FakeInputs([])
  935. assert yes(default=True, input=input)
  936. assert not yes(default=False, input=input)
  937. with pytest.raises(ValueError):
  938. yes(default=None)
  939. def test_yes_retry():
  940. input = FakeInputs(["foo", "bar", TRUISH[0]])
  941. assert yes(retry_msg="Retry: ", input=input)
  942. input = FakeInputs(["foo", "bar", FALSISH[0]])
  943. assert not yes(retry_msg="Retry: ", input=input)
  944. def test_yes_no_retry():
  945. input = FakeInputs(["foo", "bar", TRUISH[0]])
  946. assert not yes(retry=False, default=False, input=input)
  947. input = FakeInputs(["foo", "bar", FALSISH[0]])
  948. assert yes(retry=False, default=True, input=input)
  949. def test_yes_output(capfd):
  950. input = FakeInputs(["invalid", "y", "n"])
  951. assert yes(msg="intro-msg", false_msg="false-msg", true_msg="true-msg", retry_msg="retry-msg", input=input)
  952. out, err = capfd.readouterr()
  953. assert out == ""
  954. assert "intro-msg" in err
  955. assert "retry-msg" in err
  956. assert "true-msg" in err
  957. assert not yes(msg="intro-msg", false_msg="false-msg", true_msg="true-msg", retry_msg="retry-msg", input=input)
  958. out, err = capfd.readouterr()
  959. assert out == ""
  960. assert "intro-msg" in err
  961. assert "retry-msg" not in err
  962. assert "false-msg" in err
  963. def test_yes_env_output(capfd, monkeypatch):
  964. env_var = "OVERRIDE_SOMETHING"
  965. monkeypatch.setenv(env_var, "yes")
  966. assert yes(env_var_override=env_var)
  967. out, err = capfd.readouterr()
  968. assert out == ""
  969. assert env_var in err
  970. assert "yes" in err
  971. def test_progress_percentage(capfd):
  972. pi = ProgressIndicatorPercent(1000, step=5, start=0, msg="%3.0f%%")
  973. pi.logger.setLevel("INFO")
  974. pi.show(0)
  975. out, err = capfd.readouterr()
  976. assert err == " 0%\n"
  977. pi.show(420)
  978. pi.show(680)
  979. out, err = capfd.readouterr()
  980. assert err == " 42%\n 68%\n"
  981. pi.show(1000)
  982. out, err = capfd.readouterr()
  983. assert err == "100%\n"
  984. pi.finish()
  985. out, err = capfd.readouterr()
  986. assert err == "\n"
  987. def test_progress_percentage_step(capfd):
  988. pi = ProgressIndicatorPercent(100, step=2, start=0, msg="%3.0f%%")
  989. pi.logger.setLevel("INFO")
  990. pi.show()
  991. out, err = capfd.readouterr()
  992. assert err == " 0%\n"
  993. pi.show()
  994. out, err = capfd.readouterr()
  995. assert err == "" # no output at 1% as we have step == 2
  996. pi.show()
  997. out, err = capfd.readouterr()
  998. assert err == " 2%\n"
  999. def test_progress_percentage_quiet(capfd):
  1000. pi = ProgressIndicatorPercent(1000, step=5, start=0, msg="%3.0f%%")
  1001. pi.logger.setLevel("WARN")
  1002. pi.show(0)
  1003. out, err = capfd.readouterr()
  1004. assert err == ""
  1005. pi.show(1000)
  1006. out, err = capfd.readouterr()
  1007. assert err == ""
  1008. pi.finish()
  1009. out, err = capfd.readouterr()
  1010. assert err == ""
  1011. @pytest.mark.parametrize(
  1012. "fmt, items_map, expected_result",
  1013. [
  1014. ("{space:10}", {"space": " "}, " " * 10),
  1015. ("{foobar}", {"bar": "wrong", "foobar": "correct"}, "correct"),
  1016. ("{unknown_key}", {}, "{unknown_key}"),
  1017. ("{key}{{escaped_key}}", {}, "{key}{{escaped_key}}"),
  1018. ("{{escaped_key}}", {"escaped_key": 1234}, "{{escaped_key}}"),
  1019. ],
  1020. )
  1021. def test_partial_format(fmt, items_map, expected_result):
  1022. assert partial_format(fmt, items_map) == expected_result
  1023. def test_chunk_file_wrapper():
  1024. cfw = ChunkIteratorFileWrapper(iter([b"abc", b"def"]))
  1025. assert cfw.read(2) == b"ab"
  1026. assert cfw.read(50) == b"cdef"
  1027. assert cfw.exhausted
  1028. cfw = ChunkIteratorFileWrapper(iter([]))
  1029. assert cfw.read(2) == b""
  1030. assert cfw.exhausted
  1031. def test_chunkit():
  1032. it = chunkit("abcdefg", 3)
  1033. assert next(it) == ["a", "b", "c"]
  1034. assert next(it) == ["d", "e", "f"]
  1035. assert next(it) == ["g"]
  1036. with pytest.raises(StopIteration):
  1037. next(it)
  1038. with pytest.raises(StopIteration):
  1039. next(it)
  1040. it = chunkit("ab", 3)
  1041. assert list(it) == [["a", "b"]]
  1042. it = chunkit("", 3)
  1043. assert list(it) == []
  1044. def test_clean_lines():
  1045. conf = """\
  1046. #comment
  1047. data1 #data1
  1048. data2
  1049. data3
  1050. """.splitlines(
  1051. keepends=True
  1052. )
  1053. assert list(clean_lines(conf)) == ["data1 #data1", "data2", "data3"]
  1054. assert list(clean_lines(conf, lstrip=False)) == ["data1 #data1", "data2", " data3"]
  1055. assert list(clean_lines(conf, rstrip=False)) == ["data1 #data1\n", "data2\n", "data3\n"]
  1056. assert list(clean_lines(conf, remove_empty=False)) == ["data1 #data1", "data2", "", "data3"]
  1057. assert list(clean_lines(conf, remove_comments=False)) == ["#comment", "data1 #data1", "data2", "data3"]
  1058. def test_format_line():
  1059. data = dict(foo="bar baz")
  1060. assert format_line("", data) == ""
  1061. assert format_line("{foo}", data) == "bar baz"
  1062. assert format_line("foo{foo}foo", data) == "foobar bazfoo"
  1063. def test_format_line_erroneous():
  1064. data = dict()
  1065. with pytest.raises(PlaceholderError):
  1066. assert format_line("{invalid}", data)
  1067. with pytest.raises(PlaceholderError):
  1068. assert format_line("{}", data)
  1069. with pytest.raises(PlaceholderError):
  1070. assert format_line("{now!r}", data)
  1071. with pytest.raises(PlaceholderError):
  1072. assert format_line("{now.__class__.__module__.__builtins__}", data)
  1073. def test_replace_placeholders():
  1074. replace_placeholders.reset() # avoid overrides are spoiled by previous tests
  1075. now = datetime.now()
  1076. assert " " not in replace_placeholders("{now}")
  1077. assert int(replace_placeholders("{now:%Y}")) == now.year
  1078. def test_override_placeholders():
  1079. assert replace_placeholders("{uuid4}", overrides={"uuid4": "overridden"}) == "overridden"
  1080. def working_swidth():
  1081. return platform.swidth("선") == 2
  1082. @pytest.mark.skipif(not working_swidth(), reason="swidth() is not supported / active")
  1083. def test_swidth_slice():
  1084. string = "나윤선나윤선나윤선나윤선나윤선"
  1085. assert swidth_slice(string, 1) == ""
  1086. assert swidth_slice(string, -1) == ""
  1087. assert swidth_slice(string, 4) == "나윤"
  1088. assert swidth_slice(string, -4) == "윤선"
  1089. @pytest.mark.skipif(not working_swidth(), reason="swidth() is not supported / active")
  1090. def test_swidth_slice_mixed_characters():
  1091. string = "나윤a선나윤선나윤선나윤선나윤선"
  1092. assert swidth_slice(string, 5) == "나윤a"
  1093. assert swidth_slice(string, 6) == "나윤a"
  1094. def utcfromtimestamp(timestamp):
  1095. """Returns a naive datetime instance representing the timestamp in the UTC timezone"""
  1096. return datetime.fromtimestamp(timestamp, timezone.utc).replace(tzinfo=None)
  1097. def test_safe_timestamps():
  1098. if SUPPORT_32BIT_PLATFORMS:
  1099. # ns fit into int64
  1100. assert safe_ns(2**64) <= 2**63 - 1
  1101. assert safe_ns(-1) == 0
  1102. # s fit into int32
  1103. assert safe_s(2**64) <= 2**31 - 1
  1104. assert safe_s(-1) == 0
  1105. # datetime won't fall over its y10k problem
  1106. beyond_y10k = 2**100
  1107. with pytest.raises(OverflowError):
  1108. utcfromtimestamp(beyond_y10k)
  1109. assert utcfromtimestamp(safe_s(beyond_y10k)) > datetime(2038, 1, 1)
  1110. assert utcfromtimestamp(safe_ns(beyond_y10k) / 1000000000) > datetime(2038, 1, 1)
  1111. else:
  1112. # ns fit into int64
  1113. assert safe_ns(2**64) <= 2**63 - 1
  1114. assert safe_ns(-1) == 0
  1115. # s are so that their ns conversion fits into int64
  1116. assert safe_s(2**64) * 1000000000 <= 2**63 - 1
  1117. assert safe_s(-1) == 0
  1118. # datetime won't fall over its y10k problem
  1119. beyond_y10k = 2**100
  1120. with pytest.raises(OverflowError):
  1121. utcfromtimestamp(beyond_y10k)
  1122. assert utcfromtimestamp(safe_s(beyond_y10k)) > datetime(2262, 1, 1)
  1123. assert utcfromtimestamp(safe_ns(beyond_y10k) / 1000000000) > datetime(2262, 1, 1)
  1124. class TestPopenWithErrorHandling:
  1125. @pytest.mark.skipif(not shutil.which("test"), reason='"test" binary is needed')
  1126. def test_simple(self):
  1127. proc = popen_with_error_handling("test 1")
  1128. assert proc.wait() == 0
  1129. @pytest.mark.skipif(
  1130. shutil.which("borg-foobar-test-notexist"), reason='"borg-foobar-test-notexist" binary exists (somehow?)'
  1131. )
  1132. def test_not_found(self):
  1133. proc = popen_with_error_handling("borg-foobar-test-notexist 1234")
  1134. assert proc is None
  1135. @pytest.mark.parametrize("cmd", ('mismatched "quote', 'foo --bar="baz', ""))
  1136. def test_bad_syntax(self, cmd):
  1137. proc = popen_with_error_handling(cmd)
  1138. assert proc is None
  1139. def test_shell(self):
  1140. with pytest.raises(AssertionError):
  1141. popen_with_error_handling("", shell=True)
  1142. def test_dash_open():
  1143. assert dash_open("-", "r") is sys.stdin
  1144. assert dash_open("-", "w") is sys.stdout
  1145. assert dash_open("-", "rb") is sys.stdin.buffer
  1146. assert dash_open("-", "wb") is sys.stdout.buffer
  1147. def test_iter_separated():
  1148. # newline and utf-8
  1149. sep, items = "\n", ["foo", "bar/baz", "αáčő"]
  1150. fd = StringIO(sep.join(items))
  1151. assert list(iter_separated(fd)) == items
  1152. # null and bogus ending
  1153. sep, items = "\0", ["foo/bar", "baz", "spam"]
  1154. fd = StringIO(sep.join(items) + "\0")
  1155. assert list(iter_separated(fd, sep=sep)) == ["foo/bar", "baz", "spam"]
  1156. # multichar
  1157. sep, items = "SEP", ["foo/bar", "baz", "spam"]
  1158. fd = StringIO(sep.join(items))
  1159. assert list(iter_separated(fd, sep=sep)) == items
  1160. # bytes
  1161. sep, items = b"\n", [b"foo", b"blop\t", b"gr\xe4ezi"]
  1162. fd = BytesIO(sep.join(items))
  1163. assert list(iter_separated(fd)) == items
  1164. def test_eval_escapes():
  1165. assert eval_escapes("\\n\\0\\x23") == "\n\0#"
  1166. assert eval_escapes("äç\\n") == "äç\n"
  1167. @pytest.mark.skipif(not are_hardlinks_supported(), reason="hardlinks not supported")
  1168. def test_safe_unlink_is_safe(tmpdir):
  1169. contents = b"Hello, world\n"
  1170. victim = tmpdir / "victim"
  1171. victim.write_binary(contents)
  1172. hard_link = tmpdir / "hardlink"
  1173. os.link(str(victim), str(hard_link)) # hard_link.mklinkto is not implemented on win32
  1174. safe_unlink(hard_link)
  1175. assert victim.read_binary() == contents
  1176. @pytest.mark.skipif(not are_hardlinks_supported(), reason="hardlinks not supported")
  1177. def test_safe_unlink_is_safe_ENOSPC(tmpdir, monkeypatch):
  1178. contents = b"Hello, world\n"
  1179. victim = tmpdir / "victim"
  1180. victim.write_binary(contents)
  1181. hard_link = tmpdir / "hardlink"
  1182. os.link(str(victim), str(hard_link)) # hard_link.mklinkto is not implemented on win32
  1183. def os_unlink(_):
  1184. raise OSError(errno.ENOSPC, "Pretend that we ran out of space")
  1185. monkeypatch.setattr(os, "unlink", os_unlink)
  1186. with pytest.raises(OSError):
  1187. safe_unlink(hard_link)
  1188. assert victim.read_binary() == contents
  1189. class TestPassphrase:
  1190. def test_passphrase_new_verification(self, capsys, monkeypatch):
  1191. monkeypatch.setattr(getpass, "getpass", lambda prompt: "1234aöäü")
  1192. monkeypatch.setenv("BORG_DISPLAY_PASSPHRASE", "no")
  1193. Passphrase.new()
  1194. out, err = capsys.readouterr()
  1195. assert "1234" not in out
  1196. assert "1234" not in err
  1197. monkeypatch.setenv("BORG_DISPLAY_PASSPHRASE", "yes")
  1198. passphrase = Passphrase.new()
  1199. out, err = capsys.readouterr()
  1200. assert "3132333461c3b6c3a4c3bc" not in out
  1201. assert "3132333461c3b6c3a4c3bc" in err
  1202. assert passphrase == "1234aöäü"
  1203. monkeypatch.setattr(getpass, "getpass", lambda prompt: "1234/@=")
  1204. Passphrase.new()
  1205. out, err = capsys.readouterr()
  1206. assert "1234/@=" not in out
  1207. assert "1234/@=" in err
  1208. def test_passphrase_new_empty(self, capsys, monkeypatch):
  1209. monkeypatch.delenv("BORG_PASSPHRASE", False)
  1210. monkeypatch.setattr(getpass, "getpass", lambda prompt: "")
  1211. with pytest.raises(PasswordRetriesExceeded):
  1212. Passphrase.new(allow_empty=False)
  1213. out, err = capsys.readouterr()
  1214. assert "must not be blank" in err
  1215. def test_passphrase_new_retries(self, monkeypatch):
  1216. monkeypatch.delenv("BORG_PASSPHRASE", False)
  1217. ascending_numbers = iter(range(20))
  1218. monkeypatch.setattr(getpass, "getpass", lambda prompt: str(next(ascending_numbers)))
  1219. with pytest.raises(PasswordRetriesExceeded):
  1220. Passphrase.new()
  1221. def test_passphrase_repr(self):
  1222. assert "secret" not in repr(Passphrase("secret"))
  1223. def test_passphrase_wrong_debug(self, capsys, monkeypatch):
  1224. passphrase = "wrong_passphrase"
  1225. monkeypatch.setenv("BORG_DEBUG_PASSPHRASE", "YES")
  1226. monkeypatch.setenv("BORG_PASSPHRASE", "env_passphrase")
  1227. monkeypatch.setenv("BORG_PASSCOMMAND", "command")
  1228. monkeypatch.setenv("BORG_PASSPHRASE_FD", "fd_value")
  1229. Passphrase.display_debug_info(passphrase)
  1230. out, err = capsys.readouterr()
  1231. assert "Incorrect passphrase!" in err
  1232. assert passphrase in err
  1233. assert bin_to_hex(passphrase.encode("utf-8")) in err
  1234. assert 'BORG_PASSPHRASE = "env_passphrase"' in err
  1235. assert 'BORG_PASSCOMMAND = "command"' in err
  1236. assert 'BORG_PASSPHRASE_FD = "fd_value"' in err
  1237. monkeypatch.delenv("BORG_DEBUG_PASSPHRASE", raising=False)
  1238. Passphrase.display_debug_info(passphrase)
  1239. out, err = capsys.readouterr()
  1240. assert "Incorrect passphrase!" not in err
  1241. assert passphrase not in err
  1242. def test_verification(self, capsys, monkeypatch):
  1243. passphrase = "test_passphrase"
  1244. hex_value = passphrase.encode("utf-8").hex()
  1245. monkeypatch.setenv("BORG_DISPLAY_PASSPHRASE", "no")
  1246. Passphrase.verification(passphrase)
  1247. out, err = capsys.readouterr()
  1248. assert passphrase not in err
  1249. monkeypatch.setenv("BORG_DISPLAY_PASSPHRASE", "yes")
  1250. Passphrase.verification(passphrase)
  1251. out, err = capsys.readouterr()
  1252. assert passphrase in err
  1253. assert hex_value in err
  1254. @pytest.mark.parametrize(
  1255. "ec_range,ec_class",
  1256. (
  1257. # inclusive range start, exclusive range end
  1258. ((0, 1), "success"),
  1259. ((1, 2), "warning"),
  1260. ((2, 3), "error"),
  1261. ((EXIT_ERROR_BASE, EXIT_WARNING_BASE), "error"),
  1262. ((EXIT_WARNING_BASE, EXIT_SIGNAL_BASE), "warning"),
  1263. ((EXIT_SIGNAL_BASE, 256), "signal"),
  1264. ),
  1265. )
  1266. def test_classify_ec(ec_range, ec_class):
  1267. for ec in range(*ec_range):
  1268. classify_ec(ec) == ec_class
  1269. def test_ec_invalid():
  1270. with pytest.raises(ValueError):
  1271. classify_ec(666)
  1272. with pytest.raises(ValueError):
  1273. classify_ec(-1)
  1274. with pytest.raises(TypeError):
  1275. classify_ec(None)
  1276. @pytest.mark.parametrize(
  1277. "ec1,ec2,ec_max",
  1278. (
  1279. # same for modern / legacy
  1280. (EXIT_SUCCESS, EXIT_SUCCESS, EXIT_SUCCESS),
  1281. (EXIT_SUCCESS, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE),
  1282. # legacy exit codes
  1283. (EXIT_SUCCESS, EXIT_WARNING, EXIT_WARNING),
  1284. (EXIT_SUCCESS, EXIT_ERROR, EXIT_ERROR),
  1285. (EXIT_WARNING, EXIT_SUCCESS, EXIT_WARNING),
  1286. (EXIT_WARNING, EXIT_WARNING, EXIT_WARNING),
  1287. (EXIT_WARNING, EXIT_ERROR, EXIT_ERROR),
  1288. (EXIT_WARNING, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE),
  1289. (EXIT_ERROR, EXIT_SUCCESS, EXIT_ERROR),
  1290. (EXIT_ERROR, EXIT_WARNING, EXIT_ERROR),
  1291. (EXIT_ERROR, EXIT_ERROR, EXIT_ERROR),
  1292. (EXIT_ERROR, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE),
  1293. # some modern codes
  1294. (EXIT_SUCCESS, EXIT_WARNING_BASE, EXIT_WARNING_BASE),
  1295. (EXIT_SUCCESS, EXIT_ERROR_BASE, EXIT_ERROR_BASE),
  1296. (EXIT_WARNING_BASE, EXIT_SUCCESS, EXIT_WARNING_BASE),
  1297. (EXIT_WARNING_BASE + 1, EXIT_WARNING_BASE + 2, EXIT_WARNING_BASE + 1),
  1298. (EXIT_WARNING_BASE, EXIT_ERROR_BASE, EXIT_ERROR_BASE),
  1299. (EXIT_WARNING_BASE, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE),
  1300. (EXIT_ERROR_BASE, EXIT_SUCCESS, EXIT_ERROR_BASE),
  1301. (EXIT_ERROR_BASE, EXIT_WARNING_BASE, EXIT_ERROR_BASE),
  1302. (EXIT_ERROR_BASE + 1, EXIT_ERROR_BASE + 2, EXIT_ERROR_BASE + 1),
  1303. (EXIT_ERROR_BASE, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE),
  1304. ),
  1305. )
  1306. def test_max_ec(ec1, ec2, ec_max):
  1307. assert max_ec(ec1, ec2) == ec_max
  1308. def test_dir_is_tagged(tmpdir):
  1309. """Test dir_is_tagged with both path-based and file descriptor-based operations."""
  1310. @contextmanager
  1311. def open_dir(path):
  1312. fd = os.open(path, os.O_RDONLY)
  1313. try:
  1314. yield fd
  1315. finally:
  1316. os.close(fd)
  1317. # Create directories for testing exclude_caches
  1318. cache_dir = tmpdir.mkdir("cache_dir")
  1319. cache_tag_path = cache_dir.join(CACHE_TAG_NAME)
  1320. cache_tag_path.write_binary(CACHE_TAG_CONTENTS)
  1321. invalid_cache_dir = tmpdir.mkdir("invalid_cache_dir")
  1322. invalid_cache_tag_path = invalid_cache_dir.join(CACHE_TAG_NAME)
  1323. invalid_cache_tag_path.write_binary(b"invalid signature")
  1324. # Create directories for testing exclude_if_present
  1325. tagged_dir = tmpdir.mkdir("tagged_dir")
  1326. tag_file = tagged_dir.join(".NOBACKUP")
  1327. tag_file.write("test")
  1328. other_tagged_dir = tmpdir.mkdir("other_tagged_dir")
  1329. other_tag_file = other_tagged_dir.join(".DONOTBACKUP")
  1330. other_tag_file.write("test")
  1331. # Create a directory with both a CACHEDIR.TAG and a custom tag file
  1332. both_dir = tmpdir.mkdir("both_dir")
  1333. cache_tag_path = both_dir.join(CACHE_TAG_NAME)
  1334. cache_tag_path.write_binary(CACHE_TAG_CONTENTS)
  1335. custom_tag_path = both_dir.join(".NOBACKUP")
  1336. custom_tag_path.write("test")
  1337. # Create a directory without any tag files
  1338. normal_dir = tmpdir.mkdir("normal_dir")
  1339. # Test edge cases
  1340. test_dir = tmpdir.mkdir("test_dir")
  1341. assert dir_is_tagged(path=str(test_dir), exclude_caches=None, exclude_if_present=None) == []
  1342. assert dir_is_tagged(path=str(test_dir), exclude_if_present=[]) == []
  1343. # Test with non-existent directory (should not raise an exception)
  1344. non_existent_dir = str(tmpdir.join("non_existent"))
  1345. result = dir_is_tagged(path=non_existent_dir, exclude_caches=True, exclude_if_present=[".NOBACKUP"])
  1346. assert result == []
  1347. # Test 1: exclude_caches with path-based operations
  1348. assert dir_is_tagged(path=str(cache_dir), exclude_caches=True) == [CACHE_TAG_NAME]
  1349. assert dir_is_tagged(path=str(invalid_cache_dir), exclude_caches=True) == []
  1350. assert dir_is_tagged(path=str(normal_dir), exclude_caches=True) == []
  1351. assert dir_is_tagged(path=str(cache_dir), exclude_caches=False) == []
  1352. assert dir_is_tagged(path=str(invalid_cache_dir), exclude_caches=False) == []
  1353. assert dir_is_tagged(path=str(normal_dir), exclude_caches=False) == []
  1354. # Test 2: exclude_caches with file-descriptor-based operations
  1355. with open_dir(str(cache_dir)) as fd:
  1356. assert dir_is_tagged(dir_fd=fd, exclude_caches=True) == [CACHE_TAG_NAME]
  1357. with open_dir(str(invalid_cache_dir)) as fd:
  1358. assert dir_is_tagged(dir_fd=fd, exclude_caches=True) == []
  1359. with open_dir(str(normal_dir)) as fd:
  1360. assert dir_is_tagged(dir_fd=fd, exclude_caches=True) == []
  1361. with open_dir(str(cache_dir)) as fd:
  1362. assert dir_is_tagged(dir_fd=fd, exclude_caches=False) == []
  1363. with open_dir(str(invalid_cache_dir)) as fd:
  1364. assert dir_is_tagged(dir_fd=fd, exclude_caches=False) == []
  1365. with open_dir(str(normal_dir)) as fd:
  1366. assert dir_is_tagged(dir_fd=fd, exclude_caches=False) == []
  1367. # Test 3: exclude_if_present with path-based operations
  1368. tags = [".NOBACKUP"]
  1369. assert dir_is_tagged(path=str(tagged_dir), exclude_if_present=tags) == [".NOBACKUP"]
  1370. assert dir_is_tagged(path=str(other_tagged_dir), exclude_if_present=tags) == []
  1371. assert dir_is_tagged(path=str(normal_dir), exclude_if_present=tags) == []
  1372. tags = [".NOBACKUP", ".DONOTBACKUP"]
  1373. assert dir_is_tagged(path=str(tagged_dir), exclude_if_present=tags) == [".NOBACKUP"]
  1374. assert dir_is_tagged(path=str(other_tagged_dir), exclude_if_present=tags) == [".DONOTBACKUP"]
  1375. assert dir_is_tagged(path=str(normal_dir), exclude_if_present=tags) == []
  1376. # Test 4: exclude_if_present with file descriptor-based operations
  1377. tags = [".NOBACKUP"]
  1378. with open_dir(str(tagged_dir)) as fd:
  1379. assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [".NOBACKUP"]
  1380. with open_dir(str(other_tagged_dir)) as fd:
  1381. assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == []
  1382. with open_dir(str(normal_dir)) as fd:
  1383. assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == []
  1384. tags = [".NOBACKUP", ".DONOTBACKUP"]
  1385. with open_dir(str(tagged_dir)) as fd:
  1386. assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [".NOBACKUP"]
  1387. with open_dir(str(other_tagged_dir)) as fd:
  1388. assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == [".DONOTBACKUP"]
  1389. with open_dir(str(normal_dir)) as fd:
  1390. assert dir_is_tagged(dir_fd=fd, exclude_if_present=tags) == []
  1391. # Test 5: both exclude types with path-based operations
  1392. assert sorted(dir_is_tagged(path=str(both_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"])) == [
  1393. ".NOBACKUP",
  1394. CACHE_TAG_NAME,
  1395. ]
  1396. assert dir_is_tagged(path=str(cache_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [CACHE_TAG_NAME]
  1397. assert dir_is_tagged(path=str(tagged_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [".NOBACKUP"]
  1398. assert dir_is_tagged(path=str(normal_dir), exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == []
  1399. # Test 6: both exclude types with file descriptor-based operations
  1400. with open_dir(str(both_dir)) as fd:
  1401. result = dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"])
  1402. assert sorted(result) == [".NOBACKUP", CACHE_TAG_NAME]
  1403. with open_dir(str(cache_dir)) as fd:
  1404. assert dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [CACHE_TAG_NAME]
  1405. with open_dir(str(tagged_dir)) as fd:
  1406. assert dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == [".NOBACKUP"]
  1407. with open_dir(str(normal_dir)) as fd:
  1408. assert dir_is_tagged(dir_fd=fd, exclude_caches=True, exclude_if_present=[".NOBACKUP"]) == []