- import argparse
- import binascii
- import errno
- import io
- import json
- import logging
- import os
- import pstats
- import random
- import re
- import shutil
- import socket
- import stat
- import subprocess
- import sys
- import tempfile
- import time
- import unittest
- from configparser import ConfigParser
- from datetime import datetime
- from datetime import timezone
- from datetime import timedelta
- from hashlib import sha256
- from io import BytesIO, StringIO
- from unittest.mock import patch
- from pathlib import Path
- import pytest
- import borg
- import borg.helpers.errors
- from .. import xattr, helpers, platform
- from ..archive import Archive, ChunkBuffer
- from ..archiver import Archiver, parse_storage_quota, PURE_PYTHON_MSGPACK_WARNING
- from ..cache import Cache, LocalCache
- from ..chunker import has_seek_hole
- from ..constants import * # NOQA
- from ..crypto.low_level import bytes_to_long, num_cipher_blocks
- from ..crypto.key import KeyfileKeyBase, RepoKey, KeyfileKey, Passphrase, TAMRequiredError, ArchiveTAMRequiredError
- from ..crypto.keymanager import RepoIdMismatch, NotABorgKeyFile
- from ..crypto.file_integrity import FileIntegrityError
- from ..hashindex import ChunkIndex
- from ..helpers import Location, get_security_dir
- from ..helpers import Manifest, MandatoryFeatureUnsupported, ArchiveInfo
- from ..helpers import init_ec_warnings
- from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR, Error, CancelledByUser, RTError, CommandError
- from ..helpers import bin_to_hex, hex_to_bin
- from ..helpers import MAX_S
- from ..helpers import msgpack
- from ..helpers import flags_noatime, flags_normal
- from ..helpers import utcnow
- from ..nanorst import RstToTextLazy, rst_to_terminal
- from ..patterns import IECommand, PatternMatcher, parse_pattern
- from ..item import Item, ItemDiff, chunks_contents_equal
- from ..locking import LockFailed
- from ..logger import setup_logging
- from ..remote import RemoteRepository, PathNotAllowed
- from ..repository import Repository
- from . import has_lchflags, llfuse
- from . import BaseTestCase, changedir, environment_variable, no_selinux, same_ts_ns
- from . import are_symlinks_supported, are_hardlinks_supported, are_fifos_supported, is_utime_fully_supported, is_birthtime_fully_supported
- from .platform import fakeroot_detected, is_darwin, is_win32
- from .upgrader import make_attic_repo
- from . import key
- src_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
- def exec_cmd(*args, archiver=None, fork=False, exe=None, input=b'', binary_output=False, **kw):
- if fork:
- try:
- if exe is None:
- borg = (sys.executable, '-m', 'borg.archiver')
- elif isinstance(exe, str):
- borg = (exe, )
- elif isinstance(exe, tuple):
- borg = exe
- else:
- raise ValueError('exe must be None, a tuple or a str')
- output = subprocess.check_output(borg + args, stderr=subprocess.STDOUT, input=input)
- ret = 0
- except subprocess.CalledProcessError as e:
- output = e.output
- ret = e.returncode
- except SystemExit as e: # possibly raised by argparse
- output = b''
- ret = e.code
- if binary_output:
- return ret, output
- else:
- return ret, os.fsdecode(output)
- else:
- stdin, stdout, stderr = sys.stdin, sys.stdout, sys.stderr
- try:
- sys.stdin = StringIO(input.decode())
- sys.stdin.buffer = BytesIO(input)
- output = BytesIO()
- # Always use utf-8 here, to simplify .decode() below
- output_text = sys.stdout = sys.stderr = io.TextIOWrapper(output, encoding='utf-8')
- if archiver is None:
- archiver = Archiver()
- archiver.prerun_checks = lambda *args: None
- init_ec_warnings()
- try:
- args = archiver.parse_args(list(args))
- # argparse parsing may raise SystemExit when the command line is bad or
- # actions that abort early (e.g. --help) were requested. Catch this and return
- # the error code as if we had invoked a Borg binary.
- except SystemExit as e:
- output_text.flush()
- return e.code, output.getvalue() if binary_output else output.getvalue().decode()
- ret = archiver.run(args)
- output_text.flush()
- return ret, output.getvalue() if binary_output else output.getvalue().decode()
- finally:
- sys.stdin, sys.stdout, sys.stderr = stdin, stdout, stderr
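- # minimal usage sketch for exec_cmd (the repo path is illustrative, not part of the suite):
- # rc, out = exec_cmd('init', '--encryption=none', '/tmp/testrepo')
- # assert rc == EXIT_SUCCESS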
- def have_gnutar():
- if not shutil.which('tar'):
- return False
- popen = subprocess.Popen(['tar', '--version'], stdout=subprocess.PIPE)
- stdout, stderr = popen.communicate()
- return b'GNU tar' in stdout
- # check if the binary "borg.exe" is available (for local testing a symlink to virtualenv/bin/borg should do)
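- # e.g. (assuming a virtualenv at ./virtualenv): ln -s virtualenv/bin/borg borg.exe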
- try:
- exec_cmd('help', exe='borg.exe', fork=True)
- BORG_EXES = ['python', 'binary', ]
- except FileNotFoundError:
- BORG_EXES = ['python', ]
- @pytest.fixture(params=BORG_EXES)
- def cmd(request):
- if request.param == 'python':
- exe = None
- elif request.param == 'binary':
- exe = 'borg.exe'
- else:
- raise ValueError("param must be 'python' or 'binary'")
- def exec_fn(*args, **kw):
- return exec_cmd(*args, exe=exe, fork=True, **kw)
- return exec_fn
- def test_return_codes(cmd, tmpdir):
- repo = tmpdir.mkdir('repo')
- input = tmpdir.mkdir('input')
- output = tmpdir.mkdir('output')
- input.join('test_file').write('content')
- rc, out = cmd('init', '--encryption=none', '%s' % str(repo))
- assert rc == EXIT_SUCCESS
- rc, out = cmd('create', '%s::archive' % repo, str(input))
- assert rc == EXIT_SUCCESS
- with changedir(str(output)):
- rc, out = cmd('extract', '%s::archive' % repo)
- assert rc == EXIT_SUCCESS
- rc, out = cmd('extract', '%s::archive' % repo, 'does/not/match')
- assert rc == EXIT_WARNING # pattern did not match
- rc, out = cmd('create', '%s::archive' % repo, str(input))
- assert rc == EXIT_ERROR # duplicate archive name
- """
- test_disk_full is very slow and not recommended to be included in daily testing.
- for this test, an empty, writable 16MB filesystem mounted on DF_MOUNT is required.
- for speed and other reasons, it is recommended that the underlying block device is
- in RAM, not a magnetic or flash disk.
- assuming /tmp is a tmpfs (in memory filesystem), one can use this:
- dd if=/dev/zero of=/tmp/borg-disk bs=16M count=1
- mkfs.ext4 /tmp/borg-disk
- mkdir /tmp/borg-mount
- sudo mount /tmp/borg-disk /tmp/borg-mount
- if the directory does not exist, the test will be skipped.
- """
- DF_MOUNT = '/tmp/borg-mount'
- @pytest.mark.skipif(not os.path.exists(DF_MOUNT), reason="needs a 16MB fs mounted on %s" % DF_MOUNT)
- def test_disk_full(cmd):
- def make_files(dir, count, size, rnd=True):
- shutil.rmtree(dir, ignore_errors=True)
- os.mkdir(dir)
- if rnd:
- count = random.randint(1, count)
- if size > 1:
- size = random.randint(1, size)
- for i in range(count):
- fn = os.path.join(dir, "file%03d" % i)
- with open(fn, 'wb') as f:
- data = os.urandom(size)
- f.write(data)
- with environment_variable(BORG_CHECK_I_KNOW_WHAT_I_AM_DOING='YES'):
- mount = DF_MOUNT
- assert os.path.exists(mount)
- repo = os.path.join(mount, 'repo')
- input = os.path.join(mount, 'input')
- reserve = os.path.join(mount, 'reserve')
- for j in range(100):
- shutil.rmtree(repo, ignore_errors=True)
- shutil.rmtree(input, ignore_errors=True)
- # keep some space and some inodes in reserve that we can free up later:
- make_files(reserve, 80, 100000, rnd=False)
- rc, out = cmd('init', repo)
- if rc != EXIT_SUCCESS:
- print('init', rc, out)
- assert rc == EXIT_SUCCESS
- try:
- success, i = True, 0
- while success:
- i += 1
- try:
- make_files(input, 20, 200000)
- except OSError as err:
- if err.errno == errno.ENOSPC:
- # already out of space
- break
- raise
- try:
- rc, out = cmd('create', '%s::test%03d' % (repo, i), input)
- success = rc == EXIT_SUCCESS
- if not success:
- print('create', rc, out)
- finally:
- # make sure repo is not locked
- shutil.rmtree(os.path.join(repo, 'lock.exclusive'), ignore_errors=True)
- os.remove(os.path.join(repo, 'lock.roster'))
- finally:
- # now some error happened, likely we are out of disk space.
- # free some space so we can expect borg to be able to work normally:
- shutil.rmtree(reserve, ignore_errors=True)
- rc, out = cmd('list', repo)
- if rc != EXIT_SUCCESS:
- print('list', rc, out)
- rc, out = cmd('check', '--repair', repo)
- if rc != EXIT_SUCCESS:
- print('check', rc, out)
- assert rc == EXIT_SUCCESS
- class ArchiverTestCaseBase(BaseTestCase):
- EXE = None # python source based
- FORK_DEFAULT = False
- prefix = ''
- def setUp(self):
- os.environ['BORG_CHECK_I_KNOW_WHAT_I_AM_DOING'] = 'YES'
- os.environ['BORG_DELETE_I_KNOW_WHAT_I_AM_DOING'] = 'YES'
- os.environ['BORG_PASSPHRASE'] = 'waytooeasyonlyfortests'
- os.environ['BORG_SELFTEST'] = 'disabled'
- self.archiver = Archiver() if not self.FORK_DEFAULT else None
- self.tmpdir = tempfile.mkdtemp()
- self.repository_path = os.path.join(self.tmpdir, 'repository')
- self.repository_location = self.prefix + self.repository_path
- self.input_path = os.path.join(self.tmpdir, 'input')
- self.output_path = os.path.join(self.tmpdir, 'output')
- self.keys_path = os.path.join(self.tmpdir, 'keys')
- self.cache_path = os.path.join(self.tmpdir, 'cache')
- self.exclude_file_path = os.path.join(self.tmpdir, 'excludes')
- self.patterns_file_path = os.path.join(self.tmpdir, 'patterns')
- os.environ['BORG_KEYS_DIR'] = self.keys_path
- os.environ['BORG_CACHE_DIR'] = self.cache_path
- os.mkdir(self.input_path)
- os.chmod(self.input_path, 0o777) # avoid troubles with fakeroot / FUSE
- os.mkdir(self.output_path)
- os.mkdir(self.keys_path)
- os.mkdir(self.cache_path)
- with open(self.exclude_file_path, 'wb') as fd:
- fd.write(b'input/file2\n# A comment line, then a blank line\n\n')
- with open(self.patterns_file_path, 'wb') as fd:
- fd.write(b'+input/file_important\n- input/file*\n# A comment line, then a blank line\n\n')
- self._old_wd = os.getcwd()
- os.chdir(self.tmpdir)
- def tearDown(self):
- os.chdir(self._old_wd)
- # note: ignore_errors=True as workaround for issue #862
- shutil.rmtree(self.tmpdir, ignore_errors=True)
- setup_logging()
- def cmd(self, *args, **kw):
- exit_code = kw.pop('exit_code', 0)
- fork = kw.pop('fork', None)
- binary_output = kw.get('binary_output', False)
- if fork is None:
- fork = self.FORK_DEFAULT
- ret, output = exec_cmd(*args, fork=fork, exe=self.EXE, archiver=self.archiver, **kw)
- if ret != exit_code:
- print(output)
- self.assert_equal(ret, exit_code)
- # if tests are run with the pure-python msgpack, there will be warnings about
- # this in the output, which would make a lot of tests fail.
- pp_msg = PURE_PYTHON_MSGPACK_WARNING.encode() if binary_output else PURE_PYTHON_MSGPACK_WARNING
- empty = b'' if binary_output else ''
- output = empty.join(line for line in output.splitlines(keepends=True)
- if pp_msg not in line)
- return output
- def create_src_archive(self, name):
- self.cmd('create', '--compression=lz4', self.repository_location + '::' + name, src_dir)
- def open_archive(self, name):
- repository = Repository(self.repository_path, exclusive=True)
- with repository:
- manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
- archive = Archive(repository, key, manifest, name)
- return archive, repository
- def open_repository(self):
- return Repository(self.repository_path, exclusive=True)
- def create_regular_file(self, name, size=0, contents=None):
- assert not (size != 0 and contents and len(contents) != size), 'size and contents do not match'
- filename = os.path.join(self.input_path, name)
- if not os.path.exists(os.path.dirname(filename)):
- os.makedirs(os.path.dirname(filename))
- with open(filename, 'wb') as fd:
- if contents is None:
- contents = b'X' * size
- fd.write(contents)
- def create_test_files(self, create_hardlinks=True):
- """Create a minimal test case including all supported file types
- Args:
- create_hardlinks: whether to create a sample hardlink. When set to
- False, the hardlink file will not be created at all.
- """
- # File
- self.create_regular_file('file1', size=1024 * 80)
- self.create_regular_file('flagfile', size=1024)
- # Directory
- self.create_regular_file('dir2/file2', size=1024 * 80)
- # File mode
- os.chmod('input/file1', 0o4755)
- # Hard link
- if create_hardlinks and are_hardlinks_supported():
- os.link(os.path.join(self.input_path, 'file1'),
- os.path.join(self.input_path, 'hardlink'))
- # Symlink
- if are_symlinks_supported():
- os.symlink('somewhere', os.path.join(self.input_path, 'link1'))
- self.create_regular_file('fusexattr', size=1)
- if not xattr.XATTR_FAKEROOT and xattr.is_enabled(self.input_path):
- fn = os.fsencode(os.path.join(self.input_path, 'fusexattr'))
- # ironically, due to the way fakeroot works, comparing FUSE file xattrs to the original file's xattrs
- # will FAIL if fakeroot supports xattrs, thus we only set the xattr if XATTR_FAKEROOT is False.
- # This is because fakeroot with xattr support does not propagate xattrs of the underlying file
- # into "fakeroot space". Because the xattrs exposed by borgfs are those of the underlying file
- # (from fakeroot's point of view), they are invisible to the test process inside the fakeroot.
- xattr.setxattr(fn, b'user.foo', b'bar')
- xattr.setxattr(fn, b'user.empty', b'')
- # XXX this always fails for me
- # ubuntu 14.04, on a TMP dir filesystem with user_xattr, using fakeroot
- # same for newer ubuntu and centos.
- # if this is supported only on specific platforms, the platform should be checked first,
- # so that the test setup for all tests using it does not always fail here for the others.
- # xattr.setxattr(os.path.join(self.input_path, 'link1'), b'user.foo_symlink', b'bar_symlink', follow_symlinks=False)
- # FIFO node
- if are_fifos_supported():
- os.mkfifo(os.path.join(self.input_path, 'fifo1'))
- if has_lchflags:
- platform.set_flags(os.path.join(self.input_path, 'flagfile'), stat.UF_NODUMP)
- try:
- # Block device
- os.mknod('input/bdev', 0o600 | stat.S_IFBLK, os.makedev(10, 20))
- # Char device
- os.mknod('input/cdev', 0o600 | stat.S_IFCHR, os.makedev(30, 40))
- # File mode
- os.chmod('input/dir2', 0o555) # if we take away write perms, we need root to remove contents
- # File owner
- os.chown('input/file1', 100, 200) # raises OSError invalid argument on cygwin
- have_root = True # we have (fake)root
- except PermissionError:
- have_root = False
- except OSError as e:
- # Note: ENOSYS "Function not implemented" happens as non-root on Win 10 Linux Subsystem.
- if e.errno not in (errno.EINVAL, errno.ENOSYS):
- raise
- have_root = False
- time.sleep(1) # "empty" must have newer timestamp than other files
- self.create_regular_file('empty', size=0)
- return have_root
- class ArchiverTestCase(ArchiverTestCaseBase):
- requires_hardlinks = pytest.mark.skipif(not are_hardlinks_supported(), reason='hardlinks not supported')
- def get_security_dir(self):
- repository_id = bin_to_hex(self._extract_repository_id(self.repository_path))
- return get_security_dir(repository_id)
- def test_basic_functionality(self):
- have_root = self.create_test_files()
- # fork required to test show-rc output
- output = self.cmd('init', '--encryption=repokey', '--show-version', '--show-rc', self.repository_location, fork=True)
- self.assert_in('borgbackup version', output)
- self.assert_in('terminating with success status, rc 0', output)
- self.cmd('create', '--exclude-nodump', self.repository_location + '::test', 'input')
- output = self.cmd('create', '--exclude-nodump', '--stats', self.repository_location + '::test.2', 'input')
- self.assert_in('Archive name: test.2', output)
- self.assert_in('This archive: ', output)
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test')
- list_output = self.cmd('list', '--short', self.repository_location)
- self.assert_in('test', list_output)
- self.assert_in('test.2', list_output)
- expected = [
- 'input',
- 'input/bdev',
- 'input/cdev',
- 'input/dir2',
- 'input/dir2/file2',
- 'input/empty',
- 'input/file1',
- 'input/flagfile',
- ]
- if are_fifos_supported():
- expected.append('input/fifo1')
- if are_symlinks_supported():
- expected.append('input/link1')
- if are_hardlinks_supported():
- expected.append('input/hardlink')
- if not have_root:
- # we could not create these device files without (fake)root
- expected.remove('input/bdev')
- expected.remove('input/cdev')
- if has_lchflags:
- # remove the file we did not backup, so input and output become equal
- expected.remove('input/flagfile') # this file is UF_NODUMP
- os.remove(os.path.join('input', 'flagfile'))
- list_output = self.cmd('list', '--short', self.repository_location + '::test')
- for name in expected:
- self.assert_in(name, list_output)
- self.assert_dirs_equal('input', 'output/input')
- info_output = self.cmd('info', self.repository_location + '::test')
- item_count = 4 if has_lchflags else 5 # one file is UF_NODUMP
- self.assert_in('Number of files: %d' % item_count, info_output)
- shutil.rmtree(self.cache_path)
- info_output2 = self.cmd('info', self.repository_location + '::test')
- def filter(output):
- # filter for interesting "info" output, ignore cache rebuilding related stuff
- prefixes = ['Name:', 'Fingerprint:', 'Number of files:', 'This archive:',
- 'All archives:', 'Chunk index:', ]
- result = []
- for line in output.splitlines():
- for prefix in prefixes:
- if line.startswith(prefix):
- result.append(line)
- return '\n'.join(result)
- # the interesting parts of info_output2 and info_output should be the same
- self.assert_equal(filter(info_output), filter(info_output2))
- @requires_hardlinks
- def test_create_duplicate_root(self):
- # setup for #5603
- path_a = os.path.join(self.input_path, 'a')
- path_b = os.path.join(self.input_path, 'b')
- os.mkdir(path_a)
- os.mkdir(path_b)
- hl_a = os.path.join(path_a, 'hardlink')
- hl_b = os.path.join(path_b, 'hardlink')
- self.create_regular_file(hl_a, contents=b'123456')
- os.link(hl_a, hl_b)
- self.cmd('init', '--encryption=none', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input', 'input') # give input twice!
- # test if created archive has 'input' contents twice:
- archive_list = self.cmd('list', '--json-lines', self.repository_location + '::test')
- paths = [json.loads(line)['path'] for line in archive_list.split('\n') if line]
- # we have all fs items exactly once!
- assert sorted(paths) == ['input', 'input/a', 'input/a/hardlink', 'input/b', 'input/b/hardlink']
- def test_init_parent_dirs(self):
- parent_path = os.path.join(self.tmpdir, 'parent1', 'parent2')
- repository_path = os.path.join(parent_path, 'repository')
- repository_location = self.prefix + repository_path
- with pytest.raises(Repository.ParentPathDoesNotExist):
- # normal borg init does NOT create missing parent dirs
- self.cmd('init', '--encryption=none', repository_location)
- # but if told so, it does:
- self.cmd('init', '--encryption=none', '--make-parent-dirs', repository_location)
- assert os.path.exists(parent_path)
- def test_create_unreadable_parent(self):
- parent_dir = os.path.join(self.input_path, 'parent')
- root_dir = os.path.join(self.input_path, 'parent', 'root')
- os.mkdir(parent_dir)
- os.mkdir(root_dir)
- os.chmod(parent_dir, 0o111) # --x--x--x == parent dir traversable, but not readable
- self.cmd('init', '--encryption=none', self.repository_location)
- # issue #7746: we *can* read root_dir and we *can* traverse parent_dir, so this should work:
- self.cmd('create', self.repository_location + '::test', root_dir)
- def test_unix_socket(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- try:
- sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- sock.bind(os.path.join(self.input_path, 'unix-socket'))
- except PermissionError as err:
- if err.errno == errno.EPERM:
- pytest.skip('unix sockets disabled or not supported')
- elif err.errno == errno.EACCES:
- pytest.skip('permission denied to create unix sockets')
- self.cmd('create', self.repository_location + '::test', 'input')
- sock.close()
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test')
- assert not os.path.exists('input/unix-socket')
- @pytest.mark.skipif(not are_symlinks_supported(), reason='symlinks not supported')
- def test_symlink_extract(self):
- self.create_test_files()
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test')
- assert os.readlink('input/link1') == 'somewhere'
- @pytest.mark.skipif(not is_utime_fully_supported(), reason='cannot properly setup and execute test without utime')
- def test_directory_timestamps1(self):
- self.create_test_files()
- self.cmd('init', '--encryption=repokey', self.repository_location)
- # default file archiving order (internal recursion)
- self.cmd('create', self.repository_location + '::test', 'input')
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test')
- # extracting a file inside a directory touches the directory mtime
- assert os.path.exists('output/input/dir2/file2')
- # make sure borg fixes the directory mtime after touching it
- sti = os.stat('input/dir2')
- sto = os.stat('output/input/dir2')
- assert same_ts_ns(sti.st_mtime_ns, sto.st_mtime_ns)
- @pytest.mark.skipif(not is_utime_fully_supported(),
- reason='cannot properly setup and execute test without utime')
- def test_directory_timestamps2(self):
- self.create_test_files()
- self.cmd('init', '--encryption=repokey', self.repository_location)
- # given order, dir first, file second
- flist_dir_first = b"input/dir2\ninput/dir2/file2\n"
- self.cmd('create', '--paths-from-stdin', self.repository_location + '::test',
- input=flist_dir_first)
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test')
- # extracting a file inside a directory touches the directory mtime
- assert os.path.exists('output/input/dir2/file2')
- # make sure borg fixes the directory mtime after touching it
- sti = os.stat('input/dir2')
- sto = os.stat('output/input/dir2')
- assert same_ts_ns(sti.st_mtime_ns, sto.st_mtime_ns)
- @pytest.mark.skipif(not is_utime_fully_supported(),
- reason='cannot properly setup and execute test without utime')
- def test_directory_timestamps3(self):
- self.create_test_files()
- self.cmd('init', '--encryption=repokey', self.repository_location)
- # given order, file first, dir second
- flist_file_first = b"input/dir2/file2\ninput/dir2\n"
- self.cmd('create', '--paths-from-stdin', self.repository_location + '::test',
- input=flist_file_first)
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test')
- # extracting a file inside a directory touches the directory mtime
- assert os.path.exists('output/input/dir2/file2')
- # make sure borg fixes the directory mtime after touching it
- sti = os.stat('input/dir2')
- sto = os.stat('output/input/dir2')
- assert same_ts_ns(sti.st_mtime_ns, sto.st_mtime_ns)
- @pytest.mark.skipif(not is_utime_fully_supported(), reason='cannot properly setup and execute test without utime')
- def test_atime(self):
- def has_noatime(some_file):
- atime_before = os.stat(some_file).st_atime_ns
- try:
- with open(os.open(some_file, flags_noatime)) as file:
- file.read()
- except PermissionError:
- return False
- else:
- atime_after = os.stat(some_file).st_atime_ns
- noatime_used = flags_noatime != flags_normal
- return noatime_used and same_ts_ns(atime_before, atime_after)
- self.create_test_files()
- atime, mtime = 123456780, 234567890
- have_noatime = has_noatime('input/file1')
- os.utime('input/file1', (atime, mtime))
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', '--atime', self.repository_location + '::test', 'input')
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test')
- sti = os.stat('input/file1')
- sto = os.stat('output/input/file1')
- assert same_ts_ns(sti.st_mtime_ns, sto.st_mtime_ns)
- assert same_ts_ns(sto.st_mtime_ns, mtime * 1e9)
- if have_noatime:
- assert same_ts_ns(sti.st_atime_ns, sto.st_atime_ns)
- assert same_ts_ns(sto.st_atime_ns, atime * 1e9)
- else:
- # it touched the input file's atime while backing it up
- assert same_ts_ns(sto.st_atime_ns, atime * 1e9)
- @pytest.mark.skipif(not is_utime_fully_supported(), reason='cannot properly setup and execute test without utime')
- @pytest.mark.skipif(not is_birthtime_fully_supported(), reason='cannot properly setup and execute test without birthtime')
- def test_birthtime(self):
- self.create_test_files()
- birthtime, mtime, atime = 946598400, 946684800, 946771200
- os.utime('input/file1', (atime, birthtime))
- os.utime('input/file1', (atime, mtime))
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test')
- sti = os.stat('input/file1')
- sto = os.stat('output/input/file1')
- assert same_ts_ns(sti.st_birthtime * 1e9, sto.st_birthtime * 1e9)
- assert same_ts_ns(sto.st_birthtime * 1e9, birthtime * 1e9)
- assert same_ts_ns(sti.st_mtime_ns, sto.st_mtime_ns)
- assert same_ts_ns(sto.st_mtime_ns, mtime * 1e9)
- @pytest.mark.skipif(not is_utime_fully_supported(), reason='cannot properly setup and execute test without utime')
- @pytest.mark.skipif(not is_birthtime_fully_supported(), reason='cannot properly setup and execute test without birthtime')
- def test_nobirthtime(self):
- self.create_test_files()
- birthtime, mtime, atime = 946598400, 946684800, 946771200
- os.utime('input/file1', (atime, birthtime))
- os.utime('input/file1', (atime, mtime))
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', '--nobirthtime', self.repository_location + '::test', 'input')
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test')
- sti = os.stat('input/file1')
- sto = os.stat('output/input/file1')
- assert same_ts_ns(sti.st_birthtime * 1e9, birthtime * 1e9)
- assert same_ts_ns(sto.st_birthtime * 1e9, mtime * 1e9)
- assert same_ts_ns(sti.st_mtime_ns, sto.st_mtime_ns)
- assert same_ts_ns(sto.st_mtime_ns, mtime * 1e9)
- def _extract_repository_id(self, path):
- with Repository(path) as repository:
- return repository.id
- def _set_repository_id(self, path, id):
- config = ConfigParser(interpolation=None)
- config.read(os.path.join(path, 'config'))
- config.set('repository', 'id', bin_to_hex(id))
- with open(os.path.join(path, 'config'), 'w') as fd:
- config.write(fd)
- def test_sparse_file(self):
- def is_sparse(fn, total_size, hole_size):
- st = os.stat(fn)
- assert st.st_size == total_size
- sparse = True
- if sparse and hasattr(st, 'st_blocks') and st.st_blocks * 512 >= st.st_size:
- sparse = False
- if sparse and has_seek_hole:
- with open(fn, 'rb') as fd:
- # only check if the first hole is as expected, because the 2nd hole check
- # is problematic on xfs due to its "dynamic speculative EOF preallocation"
- try:
- if fd.seek(0, os.SEEK_HOLE) != 0:
- sparse = False
- if fd.seek(0, os.SEEK_DATA) != hole_size:
- sparse = False
- except OSError:
- # OS/FS does not really support SEEK_HOLE/SEEK_DATA
- sparse = False
- return sparse
- filename_in = os.path.join(self.input_path, 'sparse')
- content = b'foobar'
- hole_size = 5 * (1 << CHUNK_MAX_EXP) # 5 full chunker buffers
- total_size = hole_size + len(content) + hole_size
- with open(filename_in, 'wb') as fd:
- # create a file that has a hole at the beginning and end (if the
- # OS and filesystem supports sparse files)
- fd.seek(hole_size, 1)
- fd.write(content)
- fd.seek(hole_size, 1)
- pos = fd.tell()
- fd.truncate(pos)
- # we first check if we could create a sparse input file:
- sparse_support = is_sparse(filename_in, total_size, hole_size)
- if sparse_support:
- # we could create a sparse input file, so creating a backup of it and
- # extracting it again (as sparse) should also work:
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- with changedir(self.output_path):
- self.cmd('extract', '--sparse', self.repository_location + '::test')
- self.assert_dirs_equal('input', 'output/input')
- filename_out = os.path.join(self.output_path, 'input', 'sparse')
- with open(filename_out, 'rb') as fd:
- # check if file contents are as expected
- self.assert_equal(fd.read(hole_size), b'\0' * hole_size)
- self.assert_equal(fd.read(len(content)), content)
- self.assert_equal(fd.read(hole_size), b'\0' * hole_size)
- assert is_sparse(filename_out, total_size, hole_size)
- os.unlink(filename_out) # save space on TMPDIR
- os.unlink(filename_in) # save space on TMPDIR
- def test_unusual_filenames(self):
- filenames = ['normal', 'with some blanks', '(with_parens)', ]
- for filename in filenames:
- filename = os.path.join(self.input_path, filename)
- with open(filename, 'wb'):
- pass
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- for filename in filenames:
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test', os.path.join('input', filename))
- assert os.path.exists(os.path.join('output', 'input', filename))
- def test_repository_swap_detection(self):
- self.create_test_files()
- os.environ['BORG_PASSPHRASE'] = 'passphrase'
- self.cmd('init', '--encryption=repokey', self.repository_location)
- repository_id = self._extract_repository_id(self.repository_path)
- self.cmd('create', self.repository_location + '::test', 'input')
- shutil.rmtree(self.repository_path)
- self.cmd('init', '--encryption=none', self.repository_location)
- self._set_repository_id(self.repository_path, repository_id)
- self.assert_equal(repository_id, self._extract_repository_id(self.repository_path))
- if self.FORK_DEFAULT:
- self.cmd('create', self.repository_location + '::test.2', 'input', exit_code=EXIT_ERROR)
- else:
- with pytest.raises(Cache.EncryptionMethodMismatch):
- self.cmd('create', self.repository_location + '::test.2', 'input')
- def test_repository_swap_detection2(self):
- self.create_test_files()
- self.cmd('init', '--encryption=none', self.repository_location + '_unencrypted')
- os.environ['BORG_PASSPHRASE'] = 'passphrase'
- self.cmd('init', '--encryption=repokey', self.repository_location + '_encrypted')
- self.cmd('create', self.repository_location + '_encrypted::test', 'input')
- shutil.rmtree(self.repository_path + '_encrypted')
- os.rename(self.repository_path + '_unencrypted', self.repository_path + '_encrypted')
- if self.FORK_DEFAULT:
- self.cmd('create', self.repository_location + '_encrypted::test.2', 'input', exit_code=EXIT_ERROR)
- else:
- with pytest.raises(Cache.RepositoryAccessAborted):
- self.cmd('create', self.repository_location + '_encrypted::test.2', 'input')
- def test_repository_swap_detection_no_cache(self):
- self.create_test_files()
- os.environ['BORG_PASSPHRASE'] = 'passphrase'
- self.cmd('init', '--encryption=repokey', self.repository_location)
- repository_id = self._extract_repository_id(self.repository_path)
- self.cmd('create', self.repository_location + '::test', 'input')
- shutil.rmtree(self.repository_path)
- self.cmd('init', '--encryption=none', self.repository_location)
- self._set_repository_id(self.repository_path, repository_id)
- self.assert_equal(repository_id, self._extract_repository_id(self.repository_path))
- self.cmd('delete', '--cache-only', self.repository_location)
- if self.FORK_DEFAULT:
- self.cmd('create', self.repository_location + '::test.2', 'input', exit_code=EXIT_ERROR)
- else:
- with pytest.raises(Cache.EncryptionMethodMismatch):
- self.cmd('create', self.repository_location + '::test.2', 'input')
- def test_repository_swap_detection2_no_cache(self):
- self.create_test_files()
- self.cmd('init', '--encryption=none', self.repository_location + '_unencrypted')
- os.environ['BORG_PASSPHRASE'] = 'passphrase'
- self.cmd('init', '--encryption=repokey', self.repository_location + '_encrypted')
- self.cmd('create', self.repository_location + '_encrypted::test', 'input')
- self.cmd('delete', '--cache-only', self.repository_location + '_unencrypted')
- self.cmd('delete', '--cache-only', self.repository_location + '_encrypted')
- shutil.rmtree(self.repository_path + '_encrypted')
- os.rename(self.repository_path + '_unencrypted', self.repository_path + '_encrypted')
- if self.FORK_DEFAULT:
- self.cmd('create', self.repository_location + '_encrypted::test.2', 'input', exit_code=EXIT_ERROR)
- else:
- with pytest.raises(Cache.RepositoryAccessAborted):
- self.cmd('create', self.repository_location + '_encrypted::test.2', 'input')
- def test_repository_swap_detection_repokey_blank_passphrase(self):
- # Check that a repokey repo with a blank passphrase is considered like a plaintext repo.
- self.create_test_files()
- # User initializes her repository with her passphrase
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- # Attacker replaces it with her own repository, which is encrypted but has no passphrase set
- shutil.rmtree(self.repository_path)
- with environment_variable(BORG_PASSPHRASE=''):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- # Delete cache & security database, AKA switch to user perspective
- self.cmd('delete', '--cache-only', self.repository_location)
- shutil.rmtree(self.get_security_dir())
- with environment_variable(BORG_PASSPHRASE=None):
- # This is the part where the user would be tricked, e.g. she assumes that BORG_PASSPHRASE
- # is set, while it isn't. Previously this raised no warning,
- # since the repository is, technically, encrypted.
- if self.FORK_DEFAULT:
- self.cmd('create', self.repository_location + '::test.2', 'input', exit_code=EXIT_ERROR)
- else:
- with pytest.raises(Cache.CacheInitAbortedError):
- self.cmd('create', self.repository_location + '::test.2', 'input')
- def test_repository_move(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- security_dir = self.get_security_dir()
- os.rename(self.repository_path, self.repository_path + '_new')
- with environment_variable(BORG_RELOCATED_REPO_ACCESS_IS_OK='yes'):
- self.cmd('info', self.repository_location + '_new')
- with open(os.path.join(security_dir, 'location')) as fd:
- location = fd.read()
- assert location == Location(self.repository_location + '_new').canonical_path()
- # Needs no confirmation anymore
- self.cmd('info', self.repository_location + '_new')
- shutil.rmtree(self.cache_path)
- self.cmd('info', self.repository_location + '_new')
- shutil.rmtree(security_dir)
- self.cmd('info', self.repository_location + '_new')
- for file in ('location', 'key-type', 'manifest-timestamp'):
- assert os.path.exists(os.path.join(security_dir, file))
- def test_security_dir_compat(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- with open(os.path.join(self.get_security_dir(), 'location'), 'w') as fd:
- fd.write('something outdated')
- # This is fine, because the cache still has the correct information. security_dir and cache can disagree
- # if older versions are used to confirm a renamed repository.
- self.cmd('info', self.repository_location)
- def test_unknown_unencrypted(self):
- self.cmd('init', '--encryption=none', self.repository_location)
- # Ok: repository is known
- self.cmd('info', self.repository_location)
- # Ok: repository is still known (through security_dir)
- shutil.rmtree(self.cache_path)
- self.cmd('info', self.repository_location)
- # Needs confirmation: cache and security dir both gone (e.g., another host or rm -rf ~)
- shutil.rmtree(self.cache_path)
- shutil.rmtree(self.get_security_dir())
- if self.FORK_DEFAULT:
- self.cmd('info', self.repository_location, exit_code=EXIT_ERROR)
- else:
- with pytest.raises(Cache.CacheInitAbortedError):
- self.cmd('info', self.repository_location)
- with environment_variable(BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK='yes'):
- self.cmd('info', self.repository_location)
- def test_strip_components(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_regular_file('dir/file')
- self.cmd('create', self.repository_location + '::test', 'input')
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test', '--strip-components', '3')
- assert not os.path.exists('file')
- with self.assert_creates_file('file'):
- self.cmd('extract', self.repository_location + '::test', '--strip-components', '2')
- with self.assert_creates_file('dir/file'):
- self.cmd('extract', self.repository_location + '::test', '--strip-components', '1')
- with self.assert_creates_file('input/dir/file'):
- self.cmd('extract', self.repository_location + '::test', '--strip-components', '0')
- def _extract_hardlinks_setup(self):
- os.mkdir(os.path.join(self.input_path, 'dir1'))
- os.mkdir(os.path.join(self.input_path, 'dir1/subdir'))
- self.create_regular_file('source', contents=b'123456')
- os.link(os.path.join(self.input_path, 'source'),
- os.path.join(self.input_path, 'abba'))
- os.link(os.path.join(self.input_path, 'source'),
- os.path.join(self.input_path, 'dir1/hardlink'))
- os.link(os.path.join(self.input_path, 'source'),
- os.path.join(self.input_path, 'dir1/subdir/hardlink'))
- self.create_regular_file('dir1/source2')
- os.link(os.path.join(self.input_path, 'dir1/source2'),
- os.path.join(self.input_path, 'dir1/aaaa'))
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
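- # Link counts produced by this fixture: 'source', 'abba', 'dir1/hardlink' and
- # 'dir1/subdir/hardlink' share one inode (st_nlink == 4), while 'dir1/source2'
- # and 'dir1/aaaa' share another (st_nlink == 2); these are the numbers the
- # tests below assert.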
- @requires_hardlinks
- @unittest.skipUnless(llfuse, 'llfuse not installed')
- def test_fuse_mount_hardlinks(self):
- self._extract_hardlinks_setup()
- mountpoint = os.path.join(self.tmpdir, 'mountpoint')
- # we need to get rid of permissions checking because fakeroot causes issues with it.
- # On all platforms, borg defaults to "default_permissions", which we disable via "ignore_permissions".
- # On macOS (darwin), we additionally need "defer_permissions" to switch off the checks in osxfuse.
- if sys.platform == 'darwin':
- ignore_perms = ['-o', 'ignore_permissions,defer_permissions']
- else:
- ignore_perms = ['-o', 'ignore_permissions']
- with self.fuse_mount(self.repository_location + '::test', mountpoint, '--strip-components=2', *ignore_perms), \
- changedir(mountpoint):
- assert os.stat('hardlink').st_nlink == 2
- assert os.stat('subdir/hardlink').st_nlink == 2
- assert open('subdir/hardlink', 'rb').read() == b'123456'
- assert os.stat('aaaa').st_nlink == 2
- assert os.stat('source2').st_nlink == 2
- with self.fuse_mount(self.repository_location + '::test', mountpoint, 'input/dir1', *ignore_perms), \
- changedir(mountpoint):
- assert os.stat('input/dir1/hardlink').st_nlink == 2
- assert os.stat('input/dir1/subdir/hardlink').st_nlink == 2
- assert open('input/dir1/subdir/hardlink', 'rb').read() == b'123456'
- assert os.stat('input/dir1/aaaa').st_nlink == 2
- assert os.stat('input/dir1/source2').st_nlink == 2
- with self.fuse_mount(self.repository_location + '::test', mountpoint, *ignore_perms), \
- changedir(mountpoint):
- assert os.stat('input/source').st_nlink == 4
- assert os.stat('input/abba').st_nlink == 4
- assert os.stat('input/dir1/hardlink').st_nlink == 4
- assert os.stat('input/dir1/subdir/hardlink').st_nlink == 4
- assert open('input/dir1/subdir/hardlink', 'rb').read() == b'123456'
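- # Note: when only a subtree is mounted or components are stripped, st_nlink
- # reflects just the hardlinks visible inside the mounted subset (2 here),
- # not all four links recorded in the archive.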
- @requires_hardlinks
- def test_extract_hardlinks1(self):
- self._extract_hardlinks_setup()
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test')
- assert os.stat('input/source').st_nlink == 4
- assert os.stat('input/abba').st_nlink == 4
- assert os.stat('input/dir1/hardlink').st_nlink == 4
- assert os.stat('input/dir1/subdir/hardlink').st_nlink == 4
- assert open('input/dir1/subdir/hardlink', 'rb').read() == b'123456'
- @requires_hardlinks
- def test_extract_hardlinks2(self):
- self._extract_hardlinks_setup()
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test', '--strip-components', '2')
- assert os.stat('hardlink').st_nlink == 2
- assert os.stat('subdir/hardlink').st_nlink == 2
- assert open('subdir/hardlink', 'rb').read() == b'123456'
- assert os.stat('aaaa').st_nlink == 2
- assert os.stat('source2').st_nlink == 2
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test', 'input/dir1')
- assert os.stat('input/dir1/hardlink').st_nlink == 2
- assert os.stat('input/dir1/subdir/hardlink').st_nlink == 2
- assert open('input/dir1/subdir/hardlink', 'rb').read() == b'123456'
- assert os.stat('input/dir1/aaaa').st_nlink == 2
- assert os.stat('input/dir1/source2').st_nlink == 2
- @requires_hardlinks
- def test_extract_hardlinks_twice(self):
- # setup for #5603
- path_a = os.path.join(self.input_path, 'a')
- path_b = os.path.join(self.input_path, 'b')
- os.mkdir(path_a)
- os.mkdir(path_b)
- hl_a = os.path.join(path_a, 'hardlink')
- hl_b = os.path.join(path_b, 'hardlink')
- self.create_regular_file(hl_a, contents=b'123456')
- os.link(hl_a, hl_b)
- self.cmd('init', '--encryption=none', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input', 'input') # give input twice!
- # now test extraction
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test')
- # if issue #5603 happens, extraction gives rc == 1 (triggering AssertionError) and warnings like:
- # input/a/hardlink: link: [Errno 2] No such file or directory: 'input/a/hardlink' -> 'input/a/hardlink'
- # input/b/hardlink: link: [Errno 2] No such file or directory: 'input/a/hardlink' -> 'input/b/hardlink'
- # otherwise, when fixed, the hardlinks should be there and have a link count of 2
- assert os.stat('input/a/hardlink').st_nlink == 2
- assert os.stat('input/b/hardlink').st_nlink == 2
- def test_extract_include_exclude(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_regular_file('file1', size=1024 * 80)
- self.create_regular_file('file2', size=1024 * 80)
- self.create_regular_file('file3', size=1024 * 80)
- self.create_regular_file('file4', size=1024 * 80)
- self.cmd('create', '--exclude=input/file4', self.repository_location + '::test', 'input')
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test', 'input/file1')
- self.assert_equal(sorted(os.listdir('output/input')), ['file1'])
- with changedir('output'):
- self.cmd('extract', '--exclude=input/file2', self.repository_location + '::test')
- self.assert_equal(sorted(os.listdir('output/input')), ['file1', 'file3'])
- with changedir('output'):
- self.cmd('extract', '--exclude-from=' + self.exclude_file_path, self.repository_location + '::test')
- self.assert_equal(sorted(os.listdir('output/input')), ['file1', 'file3'])
- def test_extract_include_exclude_regex(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_regular_file('file1', size=1024 * 80)
- self.create_regular_file('file2', size=1024 * 80)
- self.create_regular_file('file3', size=1024 * 80)
- self.create_regular_file('file4', size=1024 * 80)
- self.create_regular_file('file333', size=1024 * 80)
- # Create with regular expression exclusion for file4
- self.cmd('create', '--exclude=re:input/file4$', self.repository_location + '::test', 'input')
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test')
- self.assert_equal(sorted(os.listdir('output/input')), ['file1', 'file2', 'file3', 'file333'])
- shutil.rmtree('output/input')
- # Extract with regular expression exclusion
- with changedir('output'):
- self.cmd('extract', '--exclude=re:file3+', self.repository_location + '::test')
- self.assert_equal(sorted(os.listdir('output/input')), ['file1', 'file2'])
- shutil.rmtree('output/input')
- # Combine --exclude with fnmatch and regular expression
- with changedir('output'):
- self.cmd('extract', '--exclude=input/file2', '--exclude=re:file[01]', self.repository_location + '::test')
- self.assert_equal(sorted(os.listdir('output/input')), ['file3', 'file333'])
- shutil.rmtree('output/input')
- # Combine --exclude-from and regular expression exclusion
- with changedir('output'):
- self.cmd('extract', '--exclude-from=' + self.exclude_file_path, '--exclude=re:file1',
- '--exclude=re:file(\\d)\\1\\1$', self.repository_location + '::test')
- self.assert_equal(sorted(os.listdir('output/input')), ['file3'])
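- # A quick worked example of the backreference pattern above (standard 're'
- # semantics); it is what excludes 'file333' while 'file3' survives:
- # re.search(r'file(\d)\1\1$', 'input/file333') -> match (a digit followed by two copies of itself)
- # re.search(r'file(\d)\1\1$', 'input/file3') -> no match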
- def test_extract_include_exclude_regex_from_file(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_regular_file('file1', size=1024 * 80)
- self.create_regular_file('file2', size=1024 * 80)
- self.create_regular_file('file3', size=1024 * 80)
- self.create_regular_file('file4', size=1024 * 80)
- self.create_regular_file('file333', size=1024 * 80)
- self.create_regular_file('aa:something', size=1024 * 80)
- # Create while excluding using mixed pattern styles
- with open(self.exclude_file_path, 'wb') as fd:
- fd.write(b're:input/file4$\n')
- fd.write(b'fm:*aa:*thing\n')
- self.cmd('create', '--exclude-from=' + self.exclude_file_path, self.repository_location + '::test', 'input')
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test')
- self.assert_equal(sorted(os.listdir('output/input')), ['file1', 'file2', 'file3', 'file333'])
- shutil.rmtree('output/input')
- # Exclude using regular expression
- with open(self.exclude_file_path, 'wb') as fd:
- fd.write(b're:file3+\n')
- with changedir('output'):
- self.cmd('extract', '--exclude-from=' + self.exclude_file_path, self.repository_location + '::test')
- self.assert_equal(sorted(os.listdir('output/input')), ['file1', 'file2'])
- shutil.rmtree('output/input')
- # Mixed exclude pattern styles
- with open(self.exclude_file_path, 'wb') as fd:
- fd.write(b're:file(\\d)\\1\\1$\n')
- fd.write(b'fm:nothingwillmatchthis\n')
- fd.write(b'*/file1\n')
- fd.write(b're:file2$\n')
- with changedir('output'):
- self.cmd('extract', '--exclude-from=' + self.exclude_file_path, self.repository_location + '::test')
- self.assert_equal(sorted(os.listdir('output/input')), ['file3'])
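- # The exclude file mixes pattern styles: 're:' selects regular expressions,
- # 'fm:' selects fnmatch, and a prefix-less line such as '*/file1' falls back
- # to the default fnmatch-style matching, as the expected listings confirm.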
- def test_extract_with_pattern(self):
- self.cmd("init", '--encryption=repokey', self.repository_location)
- self.create_regular_file("file1", size=1024 * 80)
- self.create_regular_file("file2", size=1024 * 80)
- self.create_regular_file("file3", size=1024 * 80)
- self.create_regular_file("file4", size=1024 * 80)
- self.create_regular_file("file333", size=1024 * 80)
- self.cmd("create", self.repository_location + "::test", "input")
- # Extract everything with regular expression
- with changedir("output"):
- self.cmd("extract", self.repository_location + "::test", "re:.*")
- self.assert_equal(sorted(os.listdir("output/input")), ["file1", "file2", "file3", "file333", "file4"])
- shutil.rmtree("output/input")
- # Extract with pattern while also excluding files
- with changedir("output"):
- self.cmd("extract", "--exclude=re:file[34]$", self.repository_location + "::test", r"re:file\d$")
- self.assert_equal(sorted(os.listdir("output/input")), ["file1", "file2"])
- shutil.rmtree("output/input")
- # Combine --exclude with pattern for extraction
- with changedir("output"):
- self.cmd("extract", "--exclude=input/file1", self.repository_location + "::test", "re:file[12]$")
- self.assert_equal(sorted(os.listdir("output/input")), ["file2"])
- shutil.rmtree("output/input")
- # Multiple patterns
- with changedir("output"):
- self.cmd("extract", self.repository_location + "::test", "fm:input/file1", "fm:*file33*", "input/file2")
- self.assert_equal(sorted(os.listdir("output/input")), ["file1", "file2", "file333"])
- def test_extract_list_output(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_regular_file('file', size=1024 * 80)
- self.cmd('create', self.repository_location + '::test', 'input')
- with changedir('output'):
- output = self.cmd('extract', self.repository_location + '::test')
- self.assert_not_in("input/file", output)
- shutil.rmtree('output/input')
- with changedir('output'):
- output = self.cmd('extract', '--info', self.repository_location + '::test')
- self.assert_not_in("input/file", output)
- shutil.rmtree('output/input')
- with changedir('output'):
- output = self.cmd('extract', '--list', self.repository_location + '::test')
- self.assert_in("input/file", output)
- shutil.rmtree('output/input')
- with changedir('output'):
- output = self.cmd('extract', '--list', '--info', self.repository_location + '::test')
- self.assert_in("input/file", output)
- def test_extract_progress(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_regular_file('file', size=1024 * 80)
- self.cmd('create', self.repository_location + '::test', 'input')
- with changedir('output'):
- output = self.cmd('extract', self.repository_location + '::test', '--progress')
- assert 'Extracting:' in output
- def _create_test_caches(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_regular_file('file1', size=1024 * 80)
- self.create_regular_file('cache1/%s' % CACHE_TAG_NAME,
- contents=CACHE_TAG_CONTENTS + b' extra stuff')
- self.create_regular_file('cache2/%s' % CACHE_TAG_NAME,
- contents=b'invalid signature')
- os.mkdir('input/cache3')
- if are_hardlinks_supported():
- os.link('input/cache1/%s' % CACHE_TAG_NAME, 'input/cache3/%s' % CACHE_TAG_NAME)
- else:
- self.create_regular_file('cache3/%s' % CACHE_TAG_NAME,
- contents=CACHE_TAG_CONTENTS + b' extra stuff')
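- # CACHEDIR.TAG detection only requires the file to start with the standard
- # signature bytes, hence a tag with b' extra stuff' appended still counts as
- # valid; cache2's b'invalid signature' makes it an ordinary directory, which
- # is why it survives in _assert_test_caches() below.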
- def test_create_stdin(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- input_data = b'\x00foo\n\nbar\n \n'
- self.cmd('create', self.repository_location + '::test', '-', input=input_data)
- item = json.loads(self.cmd('list', '--json-lines', self.repository_location + '::test'))
- assert item['uid'] == 0
- assert item['gid'] == 0
- assert item['size'] == len(input_data)
- assert item['path'] == 'stdin'
- extracted_data = self.cmd('extract', '--stdout', self.repository_location + '::test', binary_output=True)
- assert extracted_data == input_data
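- # Shell equivalent of what this test drives (illustrative):
- #   some-producer | borg create repo::test -
- # stdin becomes a single archive item, named 'stdin' by default and owned by
- # uid/gid 0, which is exactly what the asserts above check.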
- def test_create_content_from_command(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- input_data = 'some test content'
- name = 'a/b/c'
- self.cmd('create', '--stdin-name', name, '--content-from-command',
- self.repository_location + '::test', '--', 'echo', input_data)
- item = json.loads(self.cmd('list', '--json-lines', self.repository_location + '::test'))
- assert item['uid'] == 0
- assert item['gid'] == 0
- assert item['size'] == len(input_data) + 1 # `echo` adds newline
- assert item['path'] == name
- extracted_data = self.cmd('extract', '--stdout', self.repository_location + '::test')
- assert extracted_data == input_data + '\n'
- def test_create_content_from_command_with_failed_command(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- if self.FORK_DEFAULT:
- output = self.cmd('create', '--content-from-command', self.repository_location + '::test',
- '--', 'sh', '-c', 'exit 73;', exit_code=2)
- assert output.endswith("Command 'sh' exited with status 73\n")
- else:
- with pytest.raises(CommandError):
- self.cmd('create', '--content-from-command', self.repository_location + '::test',
- '--', 'sh', '-c', 'exit 73;')
- archive_list = json.loads(self.cmd('list', '--json', self.repository_location))
- assert archive_list['archives'] == []
- def test_create_content_from_command_missing_command(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- output = self.cmd('create', '--content-from-command', self.repository_location + '::test', exit_code=2)
- assert output.endswith('No command given.\n')
- def test_create_paths_from_stdin(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_regular_file("file1", size=1024 * 80)
- self.create_regular_file("dir1/file2", size=1024 * 80)
- self.create_regular_file("dir1/file3", size=1024 * 80)
- self.create_regular_file("file4", size=1024 * 80)
- input_data = b'input/file1\0input/dir1\0input/file4'
- self.cmd('create', '--paths-from-stdin', '--paths-delimiter', '\\0',
- self.repository_location + '::test', input=input_data)
- archive_list = self.cmd('list', '--json-lines', self.repository_location + '::test')
- paths = [json.loads(line)['path'] for line in archive_list.split('\n') if line]
- assert paths == ['input/file1', 'input/dir1', 'input/file4']
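- # Note the expected list: 'input/dir1' is stored as given, but its contents
- # are not, i.e. --paths-from-stdin does not recurse into directories; that is
- # why it pairs naturally with tools like 'find ... -print0' that produce
- # complete, NUL-delimited file lists.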
- def test_create_paths_from_command(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_regular_file("file1", size=1024 * 80)
- self.create_regular_file("file2", size=1024 * 80)
- self.create_regular_file("file3", size=1024 * 80)
- self.create_regular_file("file4", size=1024 * 80)
- input_data = 'input/file1\ninput/file2\ninput/file3'
- self.cmd('create', '--paths-from-command',
- self.repository_location + '::test', '--', 'echo', input_data)
- archive_list = self.cmd('list', '--json-lines', self.repository_location + '::test')
- paths = [json.loads(line)['path'] for line in archive_list.split('\n') if line]
- assert paths == ['input/file1', 'input/file2', 'input/file3']
- def test_create_paths_from_command_with_failed_command(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- if self.FORK_DEFAULT:
- output = self.cmd('create', '--paths-from-command', self.repository_location + '::test',
- '--', 'sh', '-c', 'exit 73;', exit_code=2)
- assert output.endswith("Command 'sh' exited with status 73\n")
- else:
- with pytest.raises(CommandError):
- self.cmd('create', '--paths-from-command', self.repository_location + '::test',
- '--', 'sh', '-c', 'exit 73;')
- archive_list = json.loads(self.cmd('list', '--json', self.repository_location))
- assert archive_list['archives'] == []
- def test_create_paths_from_command_missing_command(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- output = self.cmd('create', '--paths-from-command', self.repository_location + '::test', exit_code=2)
- assert output.endswith('No command given.\n')
- def test_create_without_root(self):
- """test create without a root"""
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', exit_code=2)
- def test_create_pattern_root(self):
- """test create with only a root pattern"""
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_regular_file('file1', size=1024 * 80)
- self.create_regular_file('file2', size=1024 * 80)
- output = self.cmd('create', '-v', '--list', '--pattern=R input', self.repository_location + '::test')
- self.assert_in("A input/file1", output)
- self.assert_in("A input/file2", output)
- def test_create_pattern(self):
- """test file patterns during create"""
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_regular_file('file1', size=1024 * 80)
- self.create_regular_file('file2', size=1024 * 80)
- self.create_regular_file('file_important', size=1024 * 80)
- output = self.cmd('create', '-v', '--list',
- '--pattern=+input/file_important', '--pattern=-input/file*',
- self.repository_location + '::test', 'input')
- self.assert_in("A input/file_important", output)
- self.assert_in('x input/file1', output)
- self.assert_in('x input/file2', output)
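- # --pattern include/exclude rules apply first-match-wins, so the specific
- # '+input/file_important' has to come before the broad '-input/file*' for the
- # important file to be archived ('A') while its siblings are excluded ('x').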
- def test_create_pattern_file(self):
- """test file patterns during create"""
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_regular_file('file1', size=1024 * 80)
- self.create_regular_file('file2', size=1024 * 80)
- self.create_regular_file('otherfile', size=1024 * 80)
- self.create_regular_file('file_important', size=1024 * 80)
- output = self.cmd('create', '-v', '--list',
- '--pattern=-input/otherfile', '--patterns-from=' + self.patterns_file_path,
- self.repository_location + '::test', 'input')
- self.assert_in("A input/file_important", output)
- self.assert_in('x input/file1', output)
- self.assert_in('x input/file2', output)
- self.assert_in('x input/otherfile', output)
- def test_create_pattern_exclude_folder_but_recurse(self):
- """test when patterns exclude a parent folder, but include a child"""
- self.patterns_file_path2 = os.path.join(self.tmpdir, 'patterns2')
- with open(self.patterns_file_path2, 'wb') as fd:
- fd.write(b'+ input/x/b\n- input/x*\n')
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_regular_file('x/a/foo_a', size=1024 * 80)
- self.create_regular_file('x/b/foo_b', size=1024 * 80)
- self.create_regular_file('y/foo_y', size=1024 * 80)
- output = self.cmd('create', '-v', '--list',
- '--patterns-from=' + self.patterns_file_path2,
- self.repository_location + '::test', 'input')
- self.assert_in('x input/x/a/foo_a', output)
- self.assert_in("A input/x/b/foo_b", output)
- self.assert_in('A input/y/foo_y', output)
- def test_create_pattern_exclude_folder_no_recurse(self):
- """test when patterns exclude a parent folder and, but include a child"""
- self.patterns_file_path2 = os.path.join(self.tmpdir, 'patterns2')
- with open(self.patterns_file_path2, 'wb') as fd:
- fd.write(b'+ input/x/b\n! input/x*\n')
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_regular_file('x/a/foo_a', size=1024 * 80)
- self.create_regular_file('x/b/foo_b', size=1024 * 80)
- self.create_regular_file('y/foo_y', size=1024 * 80)
- output = self.cmd('create', '-v', '--list',
- '--patterns-from=' + self.patterns_file_path2,
- self.repository_location + '::test', 'input')
- self.assert_not_in('input/x/a/foo_a', output)
- self.assert_not_in('input/x/a', output)
- self.assert_in('A input/y/foo_y', output)
- def test_create_pattern_intermediate_folders_first(self):
- """test that intermediate folders appear first when patterns exclude a parent folder but include a child"""
- self.patterns_file_path2 = os.path.join(self.tmpdir, 'patterns2')
- with open(self.patterns_file_path2, 'wb') as fd:
- fd.write(b'+ input/x/a\n+ input/x/b\n- input/x*\n')
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_regular_file('x/a/foo_a', size=1024 * 80)
- self.create_regular_file('x/b/foo_b', size=1024 * 80)
- with changedir('input'):
- self.cmd('create', '--patterns-from=' + self.patterns_file_path2,
- self.repository_location + '::test', '.')
- # list the archive and verify that the "intermediate" folders appear before
- # their contents
- out = self.cmd('list', '--format', '{type} {path}{NL}', self.repository_location + '::test')
- out_list = out.splitlines()
- self.assert_in('d x/a', out_list)
- self.assert_in('d x/b', out_list)
- assert out_list.index('d x/a') < out_list.index('- x/a/foo_a')
- assert out_list.index('d x/b') < out_list.index('- x/b/foo_b')
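- # In the '{type}' column of borg list, 'd' marks a directory item and '-' a
- # regular file; the index comparisons verify that each intermediate directory
- # was archived and ordered before its contents.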
- def test_create_no_cache_sync(self):
- self.create_test_files()
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('delete', '--cache-only', self.repository_location)
- create_json = json.loads(self.cmd('create', '--no-cache-sync', self.repository_location + '::test', 'input',
- '--json', '--error')) # ignore experimental warning
- info_json = json.loads(self.cmd('info', self.repository_location + '::test', '--json'))
- create_stats = create_json['cache']['stats']
- info_stats = info_json['cache']['stats']
- assert create_stats == info_stats
- self.cmd('delete', '--cache-only', self.repository_location)
- self.cmd('create', '--no-cache-sync', self.repository_location + '::test2', 'input')
- self.cmd('info', self.repository_location)
- self.cmd('check', self.repository_location)
- def test_extract_pattern_opt(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_regular_file('file1', size=1024 * 80)
- self.create_regular_file('file2', size=1024 * 80)
- self.create_regular_file('file_important', size=1024 * 80)
- self.cmd('create', self.repository_location + '::test', 'input')
- with changedir('output'):
- self.cmd('extract',
- '--pattern=+input/file_important', '--pattern=-input/file*',
- self.repository_location + '::test')
- self.assert_equal(sorted(os.listdir('output/input')), ['file_important'])
- def _assert_test_caches(self):
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test')
- self.assert_equal(sorted(os.listdir('output/input')), ['cache2', 'file1'])
- self.assert_equal(sorted(os.listdir('output/input/cache2')), [CACHE_TAG_NAME])
- def test_exclude_caches(self):
- self._create_test_caches()
- self.cmd('create', '--exclude-caches', self.repository_location + '::test', 'input')
- self._assert_test_caches()
- def test_recreate_exclude_caches(self):
- self._create_test_caches()
- self.cmd('create', self.repository_location + '::test', 'input')
- self.cmd('recreate', '--exclude-caches', self.repository_location + '::test')
- self._assert_test_caches()
- def _create_test_tagged(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_regular_file('file1', size=1024 * 80)
- self.create_regular_file('tagged1/.NOBACKUP')
- self.create_regular_file('tagged2/00-NOBACKUP')
- self.create_regular_file('tagged3/.NOBACKUP/file2', size=1024)
- def _assert_test_tagged(self):
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test')
- self.assert_equal(sorted(os.listdir('output/input')), ['file1'])
- def test_exclude_tagged(self):
- self._create_test_tagged()
- self.cmd('create', '--exclude-if-present', '.NOBACKUP', '--exclude-if-present', '00-NOBACKUP', self.repository_location + '::test', 'input')
- self._assert_test_tagged()
- def test_recreate_exclude_tagged(self):
- self._create_test_tagged()
- self.cmd('create', self.repository_location + '::test', 'input')
- self.cmd('recreate', '--exclude-if-present', '.NOBACKUP', '--exclude-if-present', '00-NOBACKUP',
- self.repository_location + '::test')
- self._assert_test_tagged()
- def _create_test_keep_tagged(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_regular_file('file0', size=1024)
- self.create_regular_file('tagged1/.NOBACKUP1')
- self.create_regular_file('tagged1/file1', size=1024)
- self.create_regular_file('tagged2/.NOBACKUP2/subfile1', size=1024)
- self.create_regular_file('tagged2/file2', size=1024)
- self.create_regular_file('tagged3/%s' % CACHE_TAG_NAME,
- contents=CACHE_TAG_CONTENTS + b' extra stuff')
- self.create_regular_file('tagged3/file3', size=1024)
- self.create_regular_file('taggedall/.NOBACKUP1')
- self.create_regular_file('taggedall/.NOBACKUP2/subfile1', size=1024)
- self.create_regular_file('taggedall/%s' % CACHE_TAG_NAME,
- contents=CACHE_TAG_CONTENTS + b' extra stuff')
- self.create_regular_file('taggedall/file4', size=1024)
- def _assert_test_keep_tagged(self):
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test')
- self.assert_equal(sorted(os.listdir('output/input')), ['file0', 'tagged1', 'tagged2', 'tagged3', 'taggedall'])
- self.assert_equal(os.listdir('output/input/tagged1'), ['.NOBACKUP1'])
- self.assert_equal(os.listdir('output/input/tagged2'), ['.NOBACKUP2'])
- self.assert_equal(os.listdir('output/input/tagged3'), [CACHE_TAG_NAME])
- self.assert_equal(sorted(os.listdir('output/input/taggedall')),
- ['.NOBACKUP1', '.NOBACKUP2', CACHE_TAG_NAME, ])
- def test_exclude_keep_tagged(self):
- self._create_test_keep_tagged()
- self.cmd('create', '--exclude-if-present', '.NOBACKUP1', '--exclude-if-present', '.NOBACKUP2',
- '--exclude-caches', '--keep-exclude-tags', self.repository_location + '::test', 'input')
- self._assert_test_keep_tagged()
- def test_recreate_exclude_keep_tagged(self):
- self._create_test_keep_tagged()
- self.cmd('create', self.repository_location + '::test', 'input')
- self.cmd('recreate', '--exclude-if-present', '.NOBACKUP1', '--exclude-if-present', '.NOBACKUP2',
- '--exclude-caches', '--keep-exclude-tags', self.repository_location + '::test')
- self._assert_test_keep_tagged()
- @pytest.mark.skipif(not are_hardlinks_supported(), reason='hardlinks not supported')
- def test_recreate_hardlinked_tags(self): # test for issue #4911
- self.cmd('init', '--encryption=none', self.repository_location)
- self.create_regular_file('file1', contents=CACHE_TAG_CONTENTS) # "wrong" filename, but correct tag contents
- os.mkdir(os.path.join(self.input_path, 'subdir')) # to make sure the tag is encountered *after* file1
- os.link(os.path.join(self.input_path, 'file1'),
- os.path.join(self.input_path, 'subdir', CACHE_TAG_NAME)) # correct tag name, hardlink to file1
- self.cmd('create', self.repository_location + '::test', 'input')
- # in the "test" archive, we now have, in this order:
- # - a regular file item for "file1"
- # - a hardlink item for "CACHEDIR.TAG" referring back to file1 for its contents
- self.cmd('recreate', '--exclude-caches', '--keep-exclude-tags', self.repository_location + '::test')
- # if issue #4911 is present, the recreate will crash with a KeyError for "input/file1"
- @pytest.mark.skipif(not xattr.XATTR_FAKEROOT, reason='Linux capabilities test, requires fakeroot >= 1.20.2')
- def test_extract_capabilities(self):
- fchown = os.fchown
- # We need to manually patch fchown to get the behaviour Linux has, since fakeroot does not
- # accurately model the interaction of chown(2) and Linux capabilities, i.e. it does not remove them.
- def patched_fchown(fd, uid, gid):
- xattr.setxattr(fd, b'security.capability', b'', follow_symlinks=False)
- fchown(fd, uid, gid)
- # The capability descriptor used here is valid; it was taken from a real /usr/bin/ping
- capabilities = b'\x01\x00\x00\x02\x00 \x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- self.create_regular_file('file')
- xattr.setxattr(b'input/file', b'security.capability', capabilities)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- with changedir('output'):
- with patch.object(os, 'fchown', patched_fchown):
- self.cmd('extract', self.repository_location + '::test')
- assert xattr.getxattr(b'input/file', b'security.capability') == capabilities
- @pytest.mark.skipif(not xattr.XATTR_FAKEROOT, reason='xattr not supported on this system or on this version of '
- 'fakeroot')
- def test_extract_xattrs_errors(self):
- def patched_setxattr_E2BIG(*args, **kwargs):
- raise OSError(errno.E2BIG, 'E2BIG')
- def patched_setxattr_ENOSPC(*args, **kwargs):
- raise OSError(errno.ENOSPC, 'ENOSPC')
- def patched_setxattr_EACCES(*args, **kwargs):
- raise OSError(errno.EACCES, 'EACCES')
- self.create_regular_file('file')
- xattr.setxattr(b'input/file', b'user.attribute', b'value')
- self.cmd('init', self.repository_location, '-e', 'none')
- self.cmd('create', self.repository_location + '::test', 'input')
- with changedir('output'):
- input_abspath = os.path.abspath('input/file')
- with patch.object(xattr, 'setxattr', patched_setxattr_E2BIG):
- out = self.cmd('extract', self.repository_location + '::test', exit_code=EXIT_WARNING)
- assert 'when setting extended attribute user.attribute: too big for this filesystem' in out
- os.remove(input_abspath)
- with patch.object(xattr, 'setxattr', patched_setxattr_ENOSPC):
- out = self.cmd('extract', self.repository_location + '::test', exit_code=EXIT_WARNING)
- assert 'when setting extended attribute user.attribute: fs full or xattr too big?' in out
- os.remove(input_abspath)
- with patch.object(xattr, 'setxattr', patched_setxattr_EACCES):
- out = self.cmd('extract', self.repository_location + '::test', exit_code=EXIT_WARNING)
- assert 'when setting extended attribute user.attribute:' in out
- assert os.path.isfile(input_abspath)
- @pytest.mark.skipif(not is_darwin, reason='only for macOS')
- def test_extract_xattrs_resourcefork(self):
- self.create_regular_file('file')
- self.cmd('init', self.repository_location, '-e', 'none')
- input_path = os.path.abspath('input/file')
- xa_key, xa_value = b'com.apple.ResourceFork', b'whatshouldbehere' # issue #7234
- xattr.setxattr(input_path.encode(), xa_key, xa_value)
- birthtime_expected = platform.get_birthtime_ns(os.stat(input_path), input_path)
- mtime_expected = os.stat(input_path).st_mtime_ns
- # atime_expected = os.stat(input_path).st_atime_ns
- self.cmd('create', self.repository_location + '::test', 'input')
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test')
- extracted_path = os.path.abspath('input/file')
- birthtime_extracted = platform.get_birthtime_ns(os.stat(extracted_path), extracted_path)
- mtime_extracted = os.stat(extracted_path).st_mtime_ns
- # atime_extracted = os.stat(extracted_path).st_atime_ns
- xa_value_extracted = xattr.getxattr(extracted_path.encode(), xa_key)
- assert xa_value_extracted == xa_value
- assert same_ts_ns(birthtime_extracted, birthtime_expected)
- assert same_ts_ns(mtime_extracted, mtime_expected)
- # assert same_ts_ns(atime_extracted, atime_expected) # still broken, but not really important.
- def test_path_normalization(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_regular_file('dir1/dir2/file', size=1024 * 80)
- with changedir('input/dir1/dir2'):
- self.cmd('create', self.repository_location + '::test', '../../../input/dir1/../dir1/dir2/..')
- output = self.cmd('list', self.repository_location + '::test')
- self.assert_not_in('..', output)
- self.assert_in(' input/dir1/dir2/file', output)
- def test_exclude_normalization(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_regular_file('file1', size=1024 * 80)
- self.create_regular_file('file2', size=1024 * 80)
- with changedir('input'):
- self.cmd('create', '--exclude=file1', self.repository_location + '::test1', '.')
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test1')
- self.assert_equal(sorted(os.listdir('output')), ['file2'])
- with changedir('input'):
- self.cmd('create', '--exclude=./file1', self.repository_location + '::test2', '.')
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test2')
- self.assert_equal(sorted(os.listdir('output')), ['file2'])
- self.cmd('create', '--exclude=input/./file1', self.repository_location + '::test3', 'input')
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test3')
- self.assert_equal(sorted(os.listdir('output/input')), ['file2'])
- def test_repeated_files(self):
- self.create_regular_file('file1', size=1024 * 80)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input', 'input')
- def test_overwrite(self):
- self.create_regular_file('file1', size=1024 * 80)
- self.create_regular_file('dir2/file2', size=1024 * 80)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- # Overwriting regular files and directories should be supported
- os.mkdir('output/input')
- os.mkdir('output/input/file1')
- os.mkdir('output/input/dir2')
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test')
- self.assert_dirs_equal('input', 'output/input')
- # But non-empty dirs should fail
- os.unlink('output/input/file1')
- os.mkdir('output/input/file1')
- os.mkdir('output/input/file1/dir')
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test', exit_code=1)
- def test_rename(self):
- self.create_regular_file('file1', size=1024 * 80)
- self.create_regular_file('dir2/file2', size=1024 * 80)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- self.cmd('create', self.repository_location + '::test.2', 'input')
- self.cmd('extract', '--dry-run', self.repository_location + '::test')
- self.cmd('extract', '--dry-run', self.repository_location + '::test.2')
- self.cmd('rename', self.repository_location + '::test', 'test.3')
- self.cmd('extract', '--dry-run', self.repository_location + '::test.2')
- self.cmd('rename', self.repository_location + '::test.2', 'test.4')
- self.cmd('extract', '--dry-run', self.repository_location + '::test.3')
- self.cmd('extract', '--dry-run', self.repository_location + '::test.4')
- # Make sure both archives have been renamed
- with Repository(self.repository_path) as repository:
- manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
- self.assert_equal(len(manifest.archives), 2)
- self.assert_in('test.3', manifest.archives)
- self.assert_in('test.4', manifest.archives)
- def test_info(self):
- self.create_regular_file('file1', size=1024 * 80)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- info_repo = self.cmd('info', self.repository_location)
- assert 'All archives:' in info_repo
- info_archive = self.cmd('info', self.repository_location + '::test')
- assert 'Archive name: test\n' in info_archive
- info_archive = self.cmd('info', '--first', '1', self.repository_location)
- assert 'Archive name: test\n' in info_archive
- def test_info_and_create_stats(self):
- # "This archive" deduplicated size should match between `borg info` and `borg create --stats`.
- # "All archives" deduplicated size should match between `borg info` and `borg create --stats`.
- # However, even if there is only one archive in the repo, the deduplicated-size stats for
- # "All archives" do not match those for "This archive" because metadata is accounted for at the
- # repository level ("All archives"), but not for an individual archive ("This archive").
- data = b'Z' * (1024 * 80)
- self.create_regular_file('file1', contents=data)
- self.create_regular_file('file2', contents=data)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- create_json = json.loads(self.cmd('create', '--json', self.repository_location + '::only', 'input'))
- # From create --json
- dedup_this_create = create_json['archive']['stats']['deduplicated_size']
- dedup_all_create = create_json['cache']['stats']['unique_size']
- # From info --json (archive and repository views)
- info_archive_json = json.loads(self.cmd('info', '--json', self.repository_location + '::only'))
- info_repo_json = json.loads(self.cmd('info', '--json', self.repository_location))
- assert len(info_archive_json['archives']) == 1
- dedup_this_info = info_archive_json['archives'][0]['stats']['deduplicated_size']
- dedup_all_info = info_repo_json['cache']['stats']['unique_size']
- # create and info shall give the same numbers
- assert dedup_this_create == dedup_this_info
- assert dedup_all_create == dedup_all_info
- # accounting for "all archives" includes metadata chunks, for "this archive" it does not,
- # thus a mismatch is expected.
- assert dedup_this_create < dedup_all_create
- assert dedup_this_info < dedup_all_info
- def test_info_json(self):
- self.create_regular_file('file1', size=1024 * 80)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- info_repo = json.loads(self.cmd('info', '--json', self.repository_location))
- repository = info_repo['repository']
- assert len(repository['id']) == 64
- assert 'last_modified' in repository
- assert datetime.strptime(repository['last_modified'], ISO_FORMAT) # must not raise
- assert info_repo['encryption']['mode'] == 'repokey'
- assert 'keyfile' not in info_repo['encryption']
- cache = info_repo['cache']
- stats = cache['stats']
- assert all(isinstance(o, int) for o in stats.values())
- assert all(key in stats for key in ('total_chunks', 'total_csize', 'total_size', 'total_unique_chunks', 'unique_csize', 'unique_size'))
- info_archive = json.loads(self.cmd('info', '--json', self.repository_location + '::test'))
- assert info_repo['repository'] == info_archive['repository']
- assert info_repo['cache'] == info_archive['cache']
- archives = info_archive['archives']
- assert len(archives) == 1
- archive = archives[0]
- assert archive['name'] == 'test'
- assert isinstance(archive['command_line'], list)
- assert isinstance(archive['duration'], float)
- assert len(archive['id']) == 64
- assert 'stats' in archive
- assert datetime.strptime(archive['start'], ISO_FORMAT)
- assert datetime.strptime(archive['end'], ISO_FORMAT)
- def test_info_json_of_empty_archive(self):
- """See https://github.com/borgbackup/borg/issues/6120"""
- self.cmd('init', '--encryption=repokey', self.repository_location)
- info_repo = json.loads(self.cmd('info', '--json', '--first=1', self.repository_location))
- assert info_repo["archives"] == []
- info_repo = json.loads(self.cmd('info', '--json', '--last=1', self.repository_location))
- assert info_repo["archives"] == []
- def test_comment(self):
- self.create_regular_file('file1', size=1024 * 80)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test1', 'input')
- self.cmd('create', '--comment', 'this is the comment', self.repository_location + '::test2', 'input')
- self.cmd('create', '--comment', '"deleted" comment', self.repository_location + '::test3', 'input')
- self.cmd('create', '--comment', 'preserved comment', self.repository_location + '::test4', 'input')
- assert 'Comment: \n' in self.cmd('info', self.repository_location + '::test1')
- assert 'Comment: this is the comment' in self.cmd('info', self.repository_location + '::test2')
- self.cmd('recreate', self.repository_location + '::test1', '--comment', 'added comment')
- self.cmd('recreate', self.repository_location + '::test2', '--comment', 'modified comment')
- self.cmd('recreate', self.repository_location + '::test3', '--comment', '')
- self.cmd('recreate', self.repository_location + '::test4', '12345')
- assert 'Comment: added comment' in self.cmd('info', self.repository_location + '::test1')
- assert 'Comment: modified comment' in self.cmd('info', self.repository_location + '::test2')
- assert 'Comment: \n' in self.cmd('info', self.repository_location + '::test3')
- assert 'Comment: preserved comment' in self.cmd('info', self.repository_location + '::test4')
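- # In the test4 recreate above, '12345' is a PATH argument, not a comment;
- # since no --comment option is passed, the original 'preserved comment'
- # stays in place, as the final assert verifies.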
- def test_delete(self):
- self.create_regular_file('file1', size=1024 * 80)
- self.create_regular_file('dir2/file2', size=1024 * 80)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- self.cmd('create', self.repository_location + '::test.2', 'input')
- self.cmd('create', self.repository_location + '::test.3', 'input')
- self.cmd('create', self.repository_location + '::another_test.1', 'input')
- self.cmd('create', self.repository_location + '::another_test.2', 'input')
- self.cmd('extract', '--dry-run', self.repository_location + '::test')
- self.cmd('extract', '--dry-run', self.repository_location + '::test.2')
- self.cmd('delete', '--glob-archives', 'another_*', self.repository_location)
- self.cmd('delete', '--last', '1', self.repository_location)
- self.cmd('delete', self.repository_location + '::test')
- self.cmd('extract', '--dry-run', self.repository_location + '::test.2')
- output = self.cmd('delete', '--stats', self.repository_location + '::test.2')
- self.assert_in('Deleted data:', output)
- # Make sure all data except the manifest has been deleted
- with Repository(self.repository_path) as repository:
- self.assert_equal(len(repository), 1)
- def test_delete_multiple(self):
- self.create_regular_file('file1', size=1024 * 80)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test1', 'input')
- self.cmd('create', self.repository_location + '::test2', 'input')
- self.cmd('create', self.repository_location + '::test3', 'input')
- self.cmd('delete', self.repository_location + '::test1', 'test2')
- self.cmd('extract', '--dry-run', self.repository_location + '::test3')
- self.cmd('delete', self.repository_location, 'test3')
- assert not self.cmd('list', self.repository_location)
- def test_delete_repo(self):
- self.create_regular_file('file1', size=1024 * 80)
- self.create_regular_file('dir2/file2', size=1024 * 80)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- self.cmd('create', self.repository_location + '::test.2', 'input')
- os.environ['BORG_DELETE_I_KNOW_WHAT_I_AM_DOING'] = 'no'
- if self.FORK_DEFAULT:
- self.cmd('delete', self.repository_location, exit_code=2)
- else:
- with pytest.raises(CancelledByUser):
- self.cmd('delete', self.repository_location)
- assert os.path.exists(self.repository_path)
- os.environ['BORG_DELETE_I_KNOW_WHAT_I_AM_DOING'] = 'YES'
- self.cmd('delete', self.repository_location)
- # Make sure the repo is gone
- self.assertFalse(os.path.exists(self.repository_path))
- def test_delete_force(self):
- self.cmd('init', '--encryption=none', self.repository_location)
- self.create_src_archive('test')
- with Repository(self.repository_path, exclusive=True) as repository:
- manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
- archive = Archive(repository, key, manifest, 'test')
- for item in archive.iter_items():
- if item.path.endswith('testsuite/archiver.py'):
- repository.delete(item.chunks[-1].id)
- break
- else:
- assert False # missed the file
- repository.commit(compact=False)
- output = self.cmd('delete', '--force', self.repository_location + '::test')
- self.assert_in('deleted archive was corrupted', output)
- self.cmd('check', '--repair', self.repository_location)
- output = self.cmd('list', self.repository_location)
- self.assert_not_in('test', output)
- def test_delete_double_force(self):
- self.cmd('init', '--encryption=none', self.repository_location)
- self.create_src_archive('test')
- with Repository(self.repository_path, exclusive=True) as repository:
- manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
- archive = Archive(repository, key, manifest, 'test')
- id = archive.metadata.items[0]
- repository.put(id, b'corrupted items metadata stream chunk')
- repository.commit(compact=False)
- self.cmd('delete', '--force', '--force', self.repository_location + '::test')
- self.cmd('check', '--repair', self.repository_location)
- output = self.cmd('list', self.repository_location)
- self.assert_not_in('test', output)
- def test_corrupted_repository(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_src_archive('test')
- self.cmd('extract', '--dry-run', self.repository_location + '::test')
- output = self.cmd('check', '--show-version', self.repository_location)
- self.assert_in('borgbackup version', output) # implied output even without --info given
- self.assert_not_in('Starting repository check', output) # --info not given for root logger
- name = sorted(os.listdir(os.path.join(self.tmpdir, 'repository', 'data', '0')), reverse=True)[1]
- with open(os.path.join(self.tmpdir, 'repository', 'data', '0', name), 'r+b') as fd:
- fd.seek(100)
- fd.write(b'XXXX')
- output = self.cmd('check', '--info', self.repository_location, exit_code=1)
- self.assert_in('Starting repository check', output) # --info given for root logger
- def test_readonly_check(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_src_archive('test')
- with self.read_only(self.repository_path):
- # verify that command normally doesn't work with read-only repo
- if self.FORK_DEFAULT:
- self.cmd('check', '--verify-data', self.repository_location, exit_code=EXIT_ERROR)
- else:
- with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo:
- self.cmd('check', '--verify-data', self.repository_location)
- if isinstance(excinfo.value, RemoteRepository.RPCError):
- assert excinfo.value.exception_class == 'LockFailed'
- # verify that command works with read-only repo when using --bypass-lock
- self.cmd('check', '--verify-data', self.repository_location, '--bypass-lock')
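- # The same pattern repeats in the read-only tests below: without
- # --bypass-lock, opening the repository tries to create a lock and fails
- # (LockFailed locally, surfacing as RemoteRepository.RPCError for remote
- # repos); with --bypass-lock no lock is taken, so read-only access works.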
- def test_readonly_diff(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_src_archive('a')
- self.create_src_archive('b')
- with self.read_only(self.repository_path):
- # verify that command normally doesn't work with read-only repo
- if self.FORK_DEFAULT:
- self.cmd('diff', '%s::a' % self.repository_location, 'b', exit_code=EXIT_ERROR)
- else:
- with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo:
- self.cmd('diff', '%s::a' % self.repository_location, 'b')
- if isinstance(excinfo.value, RemoteRepository.RPCError):
- assert excinfo.value.exception_class == 'LockFailed'
- # verify that command works with read-only repo when using --bypass-lock
- self.cmd('diff', '%s::a' % self.repository_location, 'b', '--bypass-lock')
- def test_readonly_export_tar(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_src_archive('test')
- with self.read_only(self.repository_path):
- # verify that command normally doesn't work with read-only repo
- if self.FORK_DEFAULT:
- self.cmd('export-tar', '%s::test' % self.repository_location, 'test.tar', exit_code=EXIT_ERROR)
- else:
- with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo:
- self.cmd('export-tar', '%s::test' % self.repository_location, 'test.tar')
- if isinstance(excinfo.value, RemoteRepository.RPCError):
- assert excinfo.value.exception_class == 'LockFailed'
- # verify that command works with read-only repo when using --bypass-lock
- self.cmd('export-tar', '%s::test' % self.repository_location, 'test.tar', '--bypass-lock')
- def test_readonly_extract(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_src_archive('test')
- with self.read_only(self.repository_path):
- # verify that command normally doesn't work with read-only repo
- if self.FORK_DEFAULT:
- self.cmd('extract', '%s::test' % self.repository_location, exit_code=EXIT_ERROR)
- else:
- with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo:
- self.cmd('extract', '%s::test' % self.repository_location)
- if isinstance(excinfo.value, RemoteRepository.RPCError):
- assert excinfo.value.exception_class == 'LockFailed'
- # verify that command works with read-only repo when using --bypass-lock
- self.cmd('extract', '%s::test' % self.repository_location, '--bypass-lock')
- def test_readonly_info(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_src_archive('test')
- with self.read_only(self.repository_path):
- # verify that command normally doesn't work with read-only repo
- if self.FORK_DEFAULT:
- self.cmd('info', self.repository_location, exit_code=EXIT_ERROR)
- else:
- with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo:
- self.cmd('info', self.repository_location)
- if isinstance(excinfo.value, RemoteRepository.RPCError):
- assert excinfo.value.exception_class == 'LockFailed'
- # verify that command works with read-only repo when using --bypass-lock
- self.cmd('info', self.repository_location, '--bypass-lock')
- def test_readonly_list(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_src_archive('test')
- with self.read_only(self.repository_path):
- # verify that command normally doesn't work with read-only repo
- if self.FORK_DEFAULT:
- self.cmd('list', self.repository_location, exit_code=EXIT_ERROR)
- else:
- with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo:
- self.cmd('list', self.repository_location)
- if isinstance(excinfo.value, RemoteRepository.RPCError):
- assert excinfo.value.exception_class == 'LockFailed'
- # verify that command works with read-only repo when using --bypass-lock
- self.cmd('list', self.repository_location, '--bypass-lock')
- @unittest.skipUnless(llfuse, 'llfuse not installed')
- def test_readonly_mount(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_src_archive('test')
- with self.read_only(self.repository_path):
- # verify that command normally doesn't work with read-only repo
- if self.FORK_DEFAULT:
- with self.fuse_mount(self.repository_location, exit_code=EXIT_ERROR):
- pass
- else:
- with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo:
- # self.fuse_mount always assumes fork=True, so for this test we have to manually set fork=False
- with self.fuse_mount(self.repository_location, fork=False):
- pass
- if isinstance(excinfo.value, RemoteRepository.RPCError):
- assert excinfo.value.exception_class == 'LockFailed'
- # verify that command works with read-only repo when using --bypass-lock
- with self.fuse_mount(self.repository_location, None, '--bypass-lock'):
- pass
- @pytest.mark.skipif('BORG_TESTS_IGNORE_MODES' in os.environ, reason='modes unreliable')
- def test_umask(self):
- self.create_regular_file('file1', size=1024 * 80)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- mode = os.stat(self.repository_path).st_mode
- self.assertEqual(stat.S_IMODE(mode), 0o700)
- def test_create_dry_run(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', '--dry-run', self.repository_location + '::test', 'input')
- # Make sure no archive has been created
- with Repository(self.repository_path) as repository:
- manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
- self.assert_equal(len(manifest.archives), 0)
- def add_unknown_feature(self, operation):
- with Repository(self.repository_path, exclusive=True) as repository:
- manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
- manifest.config[b'feature_flags'] = {operation.value.encode(): {b'mandatory': [b'unknown-feature']}}
- manifest.write()
- repository.commit(compact=False)
- def cmd_raises_unknown_feature(self, args):
- if self.FORK_DEFAULT:
- self.cmd(*args, exit_code=EXIT_ERROR)
- else:
- with pytest.raises(MandatoryFeatureUnsupported) as excinfo:
- self.cmd(*args)
- assert excinfo.value.args == (['unknown-feature'],)
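- # add_unknown_feature() plants a mandatory feature flag for one operation
- # class in the manifest; a client that does not recognize the flag must
- # refuse that operation, which is what cmd_raises_unknown_feature() asserts
- # in the tests below.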
- def test_unknown_feature_on_create(self):
- print(self.cmd('init', '--encryption=repokey', self.repository_location))
- self.add_unknown_feature(Manifest.Operation.WRITE)
- self.cmd_raises_unknown_feature(['create', self.repository_location + '::test', 'input'])
- def test_unknown_feature_on_cache_sync(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('delete', '--cache-only', self.repository_location)
- self.add_unknown_feature(Manifest.Operation.READ)
- self.cmd_raises_unknown_feature(['create', self.repository_location + '::test', 'input'])
- def test_unknown_feature_on_change_passphrase(self):
- print(self.cmd('init', '--encryption=repokey', self.repository_location))
- self.add_unknown_feature(Manifest.Operation.CHECK)
- self.cmd_raises_unknown_feature(['key', 'change-passphrase', self.repository_location])
- def test_unknown_feature_on_read(self):
- print(self.cmd('init', '--encryption=repokey', self.repository_location))
- self.cmd('create', self.repository_location + '::test', 'input')
- self.add_unknown_feature(Manifest.Operation.READ)
- with changedir('output'):
- self.cmd_raises_unknown_feature(['extract', self.repository_location + '::test'])
- self.cmd_raises_unknown_feature(['list', self.repository_location])
- self.cmd_raises_unknown_feature(['info', self.repository_location + '::test'])
- def test_unknown_feature_on_rename(self):
- print(self.cmd('init', '--encryption=repokey', self.repository_location))
- self.cmd('create', self.repository_location + '::test', 'input')
- self.add_unknown_feature(Manifest.Operation.CHECK)
- self.cmd_raises_unknown_feature(['rename', self.repository_location + '::test', 'other'])
- def test_unknown_feature_on_delete(self):
- print(self.cmd('init', '--encryption=repokey', self.repository_location))
- self.cmd('create', self.repository_location + '::test', 'input')
- self.add_unknown_feature(Manifest.Operation.DELETE)
- # delete of an archive raises
- self.cmd_raises_unknown_feature(['delete', self.repository_location + '::test'])
- self.cmd_raises_unknown_feature(['prune', '--keep-daily=3', self.repository_location])
- # delete of the whole repository ignores features
- self.cmd('delete', self.repository_location)
- @unittest.skipUnless(llfuse, 'llfuse not installed')
- def test_unknown_feature_on_mount(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- self.add_unknown_feature(Manifest.Operation.READ)
- mountpoint = os.path.join(self.tmpdir, 'mountpoint')
- os.mkdir(mountpoint)
- # XXX this might hang if it doesn't raise an error
- self.cmd_raises_unknown_feature(['mount', self.repository_location + '::test', mountpoint])
- @pytest.mark.allow_cache_wipe
- def test_unknown_mandatory_feature_in_cache(self):
- remote_repo = bool(self.prefix)
- print(self.cmd('init', '--encryption=repokey', self.repository_location))
- with Repository(self.repository_path, exclusive=True) as repository:
- if remote_repo:
- repository._location = Location(self.repository_location)
- manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
- with Cache(repository, key, manifest) as cache:
- cache.begin_txn()
- cache.cache_config.mandatory_features = {'unknown-feature'}
- cache.commit()
- if self.FORK_DEFAULT:
- self.cmd('create', self.repository_location + '::test', 'input')
- else:
- called = False
- wipe_cache_safe = LocalCache.wipe_cache
- def wipe_wrapper(*args):
- nonlocal called
- called = True
- wipe_cache_safe(*args)
- with patch.object(LocalCache, 'wipe_cache', wipe_wrapper):
- self.cmd('create', self.repository_location + '::test', 'input')
- assert called
- with Repository(self.repository_path, exclusive=True) as repository:
- if remote_repo:
- repository._location = Location(self.repository_location)
- manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
- with Cache(repository, key, manifest) as cache:
- assert cache.cache_config.mandatory_features == set()
- def test_progress_on(self):
- self.create_regular_file('file1', size=1024 * 80)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- output = self.cmd('create', '--progress', self.repository_location + '::test4', 'input')
- self.assert_in("\r", output)
- def test_progress_off(self):
- self.create_regular_file('file1', size=1024 * 80)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- output = self.cmd('create', self.repository_location + '::test5', 'input')
- self.assert_not_in("\r", output)
- def test_file_status(self):
- """test that various file status show expected results
- clearly incomplete: only tests for the weird "unchanged" status for now"""
- self.create_regular_file('file1', size=1024 * 80)
- time.sleep(1) # file2 must have newer timestamps than file1
- self.create_regular_file('file2', size=1024 * 80)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- output = self.cmd('create', '--list', self.repository_location + '::test', 'input')
- self.assert_in("A input/file1", output)
- self.assert_in("A input/file2", output)
- # should find first file as unmodified
- output = self.cmd('create', '--list', self.repository_location + '::test1', 'input')
- self.assert_in("U input/file1", output)
- # this is expected, although surprising; for the reason, see:
- # https://borgbackup.readthedocs.org/en/latest/faq.html#i-am-seeing-a-added-status-for-a-unchanged-file
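- # (short version: file2's timestamps are too recent at the time of the first backup for
- # borg to safely memorize the file in the files cache, so the next run reads it again
- # and reports it as 'A')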
- self.assert_in("A input/file2", output)
- def test_file_status_cs_cache_mode(self):
- """test that a changed file with faked "previous" mtime still gets backed up in ctime,size cache_mode"""
- self.create_regular_file('file1', contents=b'123')
- time.sleep(1) # file2 must have newer timestamps than file1
- self.create_regular_file('file2', size=10)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- output = self.cmd('create', '--list', '--files-cache=ctime,size', self.repository_location + '::test1', 'input')
- # modify file1, but cheat with the mtime (and atime) and also keep same size:
- st = os.stat('input/file1')
- self.create_regular_file('file1', contents=b'321')
- os.utime('input/file1', ns=(st.st_atime_ns, st.st_mtime_ns))
- # this mode uses ctime for change detection, so it should find file1 as modified
- output = self.cmd('create', '--list', '--files-cache=ctime,size', self.repository_location + '::test2', 'input')
- self.assert_in("M input/file1", output)
- def test_file_status_ms_cache_mode(self):
- """test that a chmod'ed file with no content changes does not get chunked again in mtime,size cache_mode"""
- self.create_regular_file('file1', size=10)
- time.sleep(1) # file2 must have newer timestamps than file1
- self.create_regular_file('file2', size=10)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- output = self.cmd('create', '--list', '--files-cache=mtime,size', self.repository_location + '::test1', 'input')
- # change mode of file1, no content change:
- st = os.stat('input/file1')
- os.chmod('input/file1', st.st_mode ^ stat.S_IRWXO) # this triggers a ctime change, but mtime is unchanged
- # this mode uses mtime for change detection, so it should find file1 as unmodified
- output = self.cmd('create', '--list', '--files-cache=mtime,size', self.repository_location + '::test2', 'input')
- self.assert_in("U input/file1", output)
- def test_file_status_rc_cache_mode(self):
- """test that files get rechunked unconditionally in rechunk,ctime cache mode"""
- self.create_regular_file('file1', size=10)
- time.sleep(1) # file2 must have newer timestamps than file1
- self.create_regular_file('file2', size=10)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- output = self.cmd('create', '--list', '--files-cache=rechunk,ctime', self.repository_location + '::test1', 'input')
- # no changes here, but this mode rechunks unconditionally
- output = self.cmd('create', '--list', '--files-cache=rechunk,ctime', self.repository_location + '::test2', 'input')
- self.assert_in("A input/file1", output)
- def test_file_status_excluded(self):
- """test that excluded paths are listed"""
- self.create_regular_file('file1', size=1024 * 80)
- time.sleep(1) # file2 must have newer timestamps than file1
- self.create_regular_file('file2', size=1024 * 80)
- if has_lchflags:
- self.create_regular_file('file3', size=1024 * 80)
- platform.set_flags(os.path.join(self.input_path, 'file3'), stat.UF_NODUMP)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- output = self.cmd('create', '--list', '--exclude-nodump', self.repository_location + '::test', 'input')
- self.assert_in("A input/file1", output)
- self.assert_in("A input/file2", output)
- if has_lchflags:
- self.assert_in("x input/file3", output)
- # should find second file as excluded
- output = self.cmd('create', '--list', '--exclude-nodump', self.repository_location + '::test1', 'input', '--exclude', '*/file2')
- self.assert_in("U input/file1", output)
- self.assert_in("x input/file2", output)
- if has_lchflags:
- self.assert_in("x input/file3", output)
- def test_create_json(self):
- self.create_regular_file('file1', size=1024 * 80)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- create_info = json.loads(self.cmd('create', '--json', self.repository_location + '::test', 'input'))
- # The usual keys
- assert 'encryption' in create_info
- assert 'repository' in create_info
- assert 'cache' in create_info
- assert 'last_modified' in create_info['repository']
- archive = create_info['archive']
- assert archive['name'] == 'test'
- assert isinstance(archive['command_line'], list)
- assert isinstance(archive['duration'], float)
- assert len(archive['id']) == 64
- assert 'stats' in archive
- def test_create_topical(self):
- self.create_regular_file('file1', size=1024 * 80)
- time.sleep(1) # file2 must have newer timestamps than file1
- self.create_regular_file('file2', size=1024 * 80)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- # no listing by default
- output = self.cmd('create', self.repository_location + '::test', 'input')
- self.assert_not_in('file1', output)
- # shouldn't be listed even if unchanged
- output = self.cmd('create', self.repository_location + '::test0', 'input')
- self.assert_not_in('file1', output)
- # should list the file as unchanged
- output = self.cmd('create', '--list', '--filter=U', self.repository_location + '::test1', 'input')
- self.assert_in('file1', output)
- # should *not* list the file as changed
- output = self.cmd('create', '--list', '--filter=AM', self.repository_location + '::test2', 'input')
- self.assert_not_in('file1', output)
- # change the file
- self.create_regular_file('file1', size=1024 * 100)
- # should list the file as changed
- output = self.cmd('create', '--list', '--filter=AM', self.repository_location + '::test3', 'input')
- self.assert_in('file1', output)
- @pytest.mark.skipif(not are_fifos_supported(), reason='FIFOs not supported')
- def test_create_read_special_symlink(self):
- from threading import Thread
- def fifo_feeder(fifo_fn, data):
- fd = os.open(fifo_fn, os.O_WRONLY)
- try:
- os.write(fd, data)
- finally:
- os.close(fd)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- archive = self.repository_location + '::test'
- data = b'foobar' * 1000
- fifo_fn = os.path.join(self.input_path, 'fifo')
- link_fn = os.path.join(self.input_path, 'link_fifo')
- os.mkfifo(fifo_fn)
- os.symlink(fifo_fn, link_fn)
- t = Thread(target=fifo_feeder, args=(fifo_fn, data))
- t.start()
- try:
- self.cmd('create', '--read-special', archive, 'input/link_fifo')
- finally:
- t.join()
- with changedir('output'):
- self.cmd('extract', archive)
- fifo_fn = 'input/link_fifo'
- with open(fifo_fn, 'rb') as f:
- extracted_data = f.read()
- assert extracted_data == data
- def test_create_read_special_broken_symlink(self):
- os.symlink('somewhere does not exist', os.path.join(self.input_path, 'link'))
- self.cmd('init', '--encryption=repokey', self.repository_location)
- archive = self.repository_location + '::test'
- self.cmd('create', '--read-special', archive, 'input')
- output = self.cmd('list', archive)
- assert 'input/link -> somewhere does not exist' in output
- def test_create_dotslash_hack(self):
- os.makedirs(os.path.join(self.input_path, 'first', 'secondA', 'thirdA'))
- os.makedirs(os.path.join(self.input_path, 'first', 'secondB', 'thirdB'))
- self.cmd('init', '--encryption=none', self.repository_location)
- archive = self.repository_location + '::test'
- self.cmd('create', archive, 'input/first/./') # hack!
- output = self.cmd('list', archive)
- # dir levels left of slashdot (= input, first) not in archive:
- assert 'input' not in output
- assert 'input/first' not in output
- assert 'input/first/secondA' not in output
- assert 'input/first/secondA/thirdA' not in output
- assert 'input/first/secondB' not in output
- assert 'input/first/secondB/thirdB' not in output
- assert 'first' not in output
- assert 'first/secondA' not in output
- assert 'first/secondA/thirdA' not in output
- assert 'first/secondB' not in output
- assert 'first/secondB/thirdB' not in output
- # dir levels right of slashdot are in archive:
- assert 'secondA' in output
- assert 'secondA/thirdA' in output
- assert 'secondB' in output
- assert 'secondB/thirdB' in output
- # def test_cmdline_compatibility(self):
- # self.create_regular_file('file1', size=1024 * 80)
- # self.cmd('init', '--encryption=repokey', self.repository_location)
- # self.cmd('create', self.repository_location + '::test', 'input')
- # output = self.cmd('foo', self.repository_location, '--old')
- # self.assert_in('"--old" has been deprecated. Use "--new" instead', output)
- def test_prune_repository(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test1', src_dir)
- self.cmd('create', self.repository_location + '::test2', src_dir)
- # these are not really checkpoints, but they look like some:
- self.cmd('create', self.repository_location + '::test3.checkpoint', src_dir)
- self.cmd('create', self.repository_location + '::test3.checkpoint.1', src_dir)
- self.cmd('create', self.repository_location + '::test4.checkpoint', src_dir)
- output = self.cmd('prune', '--list', '--dry-run', self.repository_location, '--keep-daily=1')
- assert re.search(r'Would prune:\s+test1', output)
- # must keep the latest non-checkpoint archive:
- assert re.search(r'Keeping archive \(rule: daily #1\):\s+test2', output)
- # must keep the latest checkpoint archive:
- assert re.search(r'Keeping checkpoint archive:\s+test4.checkpoint', output)
- output = self.cmd('list', '--consider-checkpoints', self.repository_location)
- self.assert_in('test1', output)
- self.assert_in('test2', output)
- self.assert_in('test3.checkpoint', output)
- self.assert_in('test3.checkpoint.1', output)
- self.assert_in('test4.checkpoint', output)
- self.cmd('prune', self.repository_location, '--keep-daily=1')
- output = self.cmd('list', '--consider-checkpoints', self.repository_location)
- self.assert_not_in('test1', output)
- # the latest non-checkpoint archive must still be there:
- self.assert_in('test2', output)
- # only the latest checkpoint archive must still be there:
- self.assert_not_in('test3.checkpoint', output)
- self.assert_not_in('test3.checkpoint.1', output)
- self.assert_in('test4.checkpoint', output)
- # now we supersede the latest checkpoint with a successful backup:
- self.cmd('create', self.repository_location + '::test5', src_dir)
- self.cmd('prune', self.repository_location, '--keep-daily=2')
- output = self.cmd('list', '--consider-checkpoints', self.repository_location)
- # all checkpoints should be gone now:
- self.assert_not_in('checkpoint', output)
- # the latest archive must still be there
- self.assert_in('test5', output)
- # Given a date and time in the local tz, create a UTC timestamp string suitable
- # for the create --timestamp command line option
- def _to_utc_timestamp(self, year, month, day, hour, minute, second):
- dtime = datetime(year, month, day, hour, minute, second, 0).astimezone() # local time with local timezone
- return dtime.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
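- # e.g. with the local tz at UTC+1 (CET), _to_utc_timestamp(2015, 1, 1, 0, 0, 0)
- # returns '2014-12-31T23:00:00'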
- def _create_archive_ts(self, name, y, m, d, H=0, M=0, S=0):
- loc = self.repository_location + '::' + name
- self.cmd('create', '--timestamp', self._to_utc_timestamp(y, m, d, H, M, S), loc, src_dir)
- # This test must match docs/misc/prune-example.txt
- def test_prune_repository_example(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- # Archives that will be kept, per the example
- # Oldest archive
- self._create_archive_ts('test01', 2015, 1, 1)
- # 6 monthly archives
- self._create_archive_ts('test02', 2015, 6, 30)
- self._create_archive_ts('test03', 2015, 7, 31)
- self._create_archive_ts('test04', 2015, 8, 31)
- self._create_archive_ts('test05', 2015, 9, 30)
- self._create_archive_ts('test06', 2015, 10, 31)
- self._create_archive_ts('test07', 2015, 11, 30)
- # 14 daily archives
- self._create_archive_ts('test08', 2015, 12, 17)
- self._create_archive_ts('test09', 2015, 12, 18)
- self._create_archive_ts('test10', 2015, 12, 20)
- self._create_archive_ts('test11', 2015, 12, 21)
- self._create_archive_ts('test12', 2015, 12, 22)
- self._create_archive_ts('test13', 2015, 12, 23)
- self._create_archive_ts('test14', 2015, 12, 24)
- self._create_archive_ts('test15', 2015, 12, 25)
- self._create_archive_ts('test16', 2015, 12, 26)
- self._create_archive_ts('test17', 2015, 12, 27)
- self._create_archive_ts('test18', 2015, 12, 28)
- self._create_archive_ts('test19', 2015, 12, 29)
- self._create_archive_ts('test20', 2015, 12, 30)
- self._create_archive_ts('test21', 2015, 12, 31)
- # Additional archives that would be pruned
- # The second backup of the year
- self._create_archive_ts('test22', 2015, 1, 2)
- # The next older monthly backup
- self._create_archive_ts('test23', 2015, 5, 31)
- # The next older daily backup
- self._create_archive_ts('test24', 2015, 12, 16)
- output = self.cmd('prune', '--list', '--dry-run', self.repository_location, '--keep-daily=14', '--keep-monthly=6', '--keep-yearly=1')
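- # prune applies the rules from shorter to longer periods (daily, then monthly,
- # then yearly); an archive already kept by one rule is not counted again by a later rule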
- # Prune second backup of the year
- assert re.search(r'Would prune:\s+test22', output)
- # Prune next older monthly and daily backups
- assert re.search(r'Would prune:\s+test23', output)
- assert re.search(r'Would prune:\s+test24', output)
- # Must keep the other 21 backups
- # Yearly is kept as oldest archive
- assert re.search(r'Keeping archive \(rule: yearly\[oldest\] #1\):\s+test01', output)
- for i in range(1, 7):
- assert re.search(r'Keeping archive \(rule: monthly #' + str(i) + r'\):\s+test' + ("%02d" % (8-i)), output)
- for i in range(1, 15):
- assert re.search(r'Keeping archive \(rule: daily #' + str(i) + r'\):\s+test' + ("%02d" % (22-i)), output)
- output = self.cmd('list', self.repository_location)
- # Nothing pruned after dry run
- for i in range(1, 25):
- self.assert_in('test%02d' % i, output)
- self.cmd('prune', self.repository_location, '--keep-daily=14', '--keep-monthly=6', '--keep-yearly=1')
- output = self.cmd('list', self.repository_location)
- # All matching backups plus oldest kept
- for i in range(1, 22):
- self.assert_in('test%02d' % i, output)
- # Other backups have been pruned
- for i in range(22, 25):
- self.assert_not_in('test%02d' % i, output)
- def test_prune_quarterly(self):
- # Example worked through by hand when developing quarterly
- # strategy, based upon existing backups where quarterly strategy
- # is desired. Weekly/monthly backups that don't affect results were
- # trimmed to speed up the test.
- #
- # Week number is shown in comment for every row in the below list.
- # Year is also shown when it doesn't match the year given in the
- # date tuple.
- test_dates = [
- (2020, 12, 6), (2021, 1, 3), # 49, 2020-53
- (2021, 3, 28), (2021, 4, 25), # 12, 16
- (2021, 6, 27), (2021, 7, 4), # 25, 26
- (2021, 9, 26), (2021, 10, 3), # 38, 39
- (2021, 12, 26), (2022, 1, 2) # 51, 2021-52
- ]
- def mk_name(tup):
- (y, m, d) = tup
- suff = datetime(y, m, d).strftime("%Y-%m-%d")
- return f"test-{suff}"
- # The kept archives below were worked out by hand from the example;
- # archives made on the following dates should be kept:
- EXPECTED_KEPT = {
- "13weekly": [
- (2020, 12, 6), (2021, 1, 3), (2021, 3, 28), (2021, 7, 4),
- (2021, 10, 3), (2022, 1, 2)
- ],
- "3monthly": [
- (2020, 12, 6), (2021, 3, 28), (2021, 6, 27), (2021, 9, 26),
- (2021, 12, 26), (2022, 1, 2)
- ]
- }
- for (strat, to_keep) in EXPECTED_KEPT.items():
- # Initialize our repo.
- self.cmd('init', '--encryption=repokey', self.repository_location)
- for a, (y, m, d) in zip(map(mk_name, test_dates), test_dates):
- self._create_archive_ts(a, y, m, d)
- to_prune = list(set(test_dates) - set(to_keep))
- # Use 99 instead of -1 to test that the oldest backup is kept.
- output = self.cmd('prune', '--list', '--dry-run', self.repository_location, f"--keep-{strat}=99")
- for a in map(mk_name, to_prune):
- assert re.search(fr"Would prune:\s+{a}", output)
- oldest = r"\[oldest\]" if strat in ("13weekly") else ""
- assert re.search(fr"Keeping archive \(rule: quarterly_{strat}{oldest} #\d+\):\s+test-2020-12-06", output)
- for a in map(mk_name, to_keep[1:]):
- assert re.search(fr"Keeping archive \(rule: quarterly_{strat} #\d+\):\s+{a}", output)
- output = self.cmd('list', self.repository_location)
- # Nothing pruned after dry run
- for a in map(mk_name, test_dates):
- self.assert_in(a, output)
- self.cmd('prune', self.repository_location, f"--keep-{strat}=99")
- output = self.cmd('list', self.repository_location)
- # All matching backups plus oldest kept
- for a in map(mk_name, to_keep):
- self.assert_in(a, output)
- # Other backups have been pruned
- for a in map(mk_name, to_prune):
- self.assert_not_in(a, output)
- # Delete repo and begin anew
- self.cmd('delete', self.repository_location)
- # With an initial backup plus daily backups, prune daily until the oldest is replaced by a monthly backup
- def test_prune_retain_and_expire_oldest(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- # Initial backup
- self._create_archive_ts('original_archive', 2020, 9, 1, 11, 15)
- # Archive and prune daily for 30 days
- for i in range(1, 31):
- self._create_archive_ts('september%02d' % i, 2020, 9, i, 12)
- self.cmd('prune', self.repository_location, '--keep-daily=7', '--keep-monthly=1')
- # Archive and prune 6 days into the next month
- for i in range(1, 7):
- self._create_archive_ts('october%02d' % i, 2020, 10, i, 12)
- self.cmd('prune', self.repository_location, '--keep-daily=7', '--keep-monthly=1')
- # Oldest backup is still retained
- output = self.cmd('prune', '--list', '--dry-run', self.repository_location, '--keep-daily=7', '--keep-monthly=1')
- assert re.search(r'Keeping archive \(rule: monthly\[oldest\] #1\):\s+original_archive', output)
- # Archive one more day and prune.
- self._create_archive_ts('october07', 2020, 10, 7, 12)
- self.cmd('prune', self.repository_location, '--keep-daily=7', '--keep-monthly=1')
- # Last day of previous month is retained as monthly, and oldest is expired.
- output = self.cmd('prune', '--list', '--dry-run', self.repository_location, '--keep-daily=7', '--keep-monthly=1')
- assert re.search(r'Keeping archive \(rule: monthly #1\):\s+september30', output)
- self.assert_not_in('original_archive', output)
- def test_prune_repository_save_space(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test1', src_dir)
- self.cmd('create', self.repository_location + '::test2', src_dir)
- output = self.cmd('prune', '--list', '--dry-run', self.repository_location, '--keep-daily=1')
- assert re.search(r'Keeping archive \(rule: daily #1\):\s+test2', output)
- assert re.search(r'Would prune:\s+test1', output)
- output = self.cmd('list', self.repository_location)
- self.assert_in('test1', output)
- self.assert_in('test2', output)
- self.cmd('prune', '--save-space', self.repository_location, '--keep-daily=1')
- output = self.cmd('list', self.repository_location)
- self.assert_not_in('test1', output)
- self.assert_in('test2', output)
- def test_prune_repository_prefix(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::foo-2015-08-12-10:00', src_dir)
- self.cmd('create', self.repository_location + '::foo-2015-08-12-20:00', src_dir)
- self.cmd('create', self.repository_location + '::bar-2015-08-12-10:00', src_dir)
- self.cmd('create', self.repository_location + '::bar-2015-08-12-20:00', src_dir)
- output = self.cmd('prune', '--list', '--dry-run', self.repository_location, '--keep-daily=1', '--prefix=foo-')
- assert re.search(r'Keeping archive \(rule: daily #1\):\s+foo-2015-08-12-20:00', output)
- assert re.search(r'Would prune:\s+foo-2015-08-12-10:00', output)
- output = self.cmd('list', self.repository_location)
- self.assert_in('foo-2015-08-12-10:00', output)
- self.assert_in('foo-2015-08-12-20:00', output)
- self.assert_in('bar-2015-08-12-10:00', output)
- self.assert_in('bar-2015-08-12-20:00', output)
- self.cmd('prune', self.repository_location, '--keep-daily=1', '--prefix=foo-')
- output = self.cmd('list', self.repository_location)
- self.assert_not_in('foo-2015-08-12-10:00', output)
- self.assert_in('foo-2015-08-12-20:00', output)
- self.assert_in('bar-2015-08-12-10:00', output)
- self.assert_in('bar-2015-08-12-20:00', output)
- def test_prune_repository_glob(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::2015-08-12-10:00-foo', src_dir)
- self.cmd('create', self.repository_location + '::2015-08-12-20:00-foo', src_dir)
- self.cmd('create', self.repository_location + '::2015-08-12-10:00-bar', src_dir)
- self.cmd('create', self.repository_location + '::2015-08-12-20:00-bar', src_dir)
- output = self.cmd('prune', '--list', '--dry-run', self.repository_location, '--keep-daily=1', '--glob-archives=2015-*-foo')
- assert re.search(r'Keeping archive \(rule: daily #1\):\s+2015-08-12-20:00-foo', output)
- assert re.search(r'Would prune:\s+2015-08-12-10:00-foo', output)
- output = self.cmd('list', self.repository_location)
- self.assert_in('2015-08-12-10:00-foo', output)
- self.assert_in('2015-08-12-20:00-foo', output)
- self.assert_in('2015-08-12-10:00-bar', output)
- self.assert_in('2015-08-12-20:00-bar', output)
- self.cmd('prune', self.repository_location, '--keep-daily=1', '--glob-archives=2015-*-foo')
- output = self.cmd('list', self.repository_location)
- self.assert_not_in('2015-08-12-10:00-foo', output)
- self.assert_in('2015-08-12-20:00-foo', output)
- self.assert_in('2015-08-12-10:00-bar', output)
- self.assert_in('2015-08-12-20:00-bar', output)
- def test_list_glob(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test-1', src_dir)
- self.cmd('create', self.repository_location + '::something-else-than-test-1', src_dir)
- self.cmd('create', self.repository_location + '::test-2', src_dir)
- output = self.cmd('list', '--glob-archives=test-*', self.repository_location)
- self.assert_in('test-1', output)
- self.assert_in('test-2', output)
- self.assert_not_in('something-else', output)
- def test_list_format(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- test_archive = self.repository_location + '::test'
- self.cmd('create', test_archive, src_dir)
- output_1 = self.cmd('list', test_archive)
- output_2 = self.cmd('list', '--format', '{mode} {user:6} {group:6} {size:8d} {mtime} {path}{extra}{NEWLINE}', test_archive)
- output_3 = self.cmd('list', '--format', '{mtime:%s} {path}{NL}', test_archive)
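- # the --format string used for output_2 spells out the default item-listing
- # format, so it must reproduce output_1 exactly; output_3 uses a different one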
- self.assertEqual(output_1, output_2)
- self.assertNotEqual(output_1, output_3)
- def test_list_repository_format(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', '--comment', 'comment 1', self.repository_location + '::test-1', src_dir)
- self.cmd('create', '--comment', 'comment 2', self.repository_location + '::test-2', src_dir)
- output_1 = self.cmd('list', self.repository_location)
- output_2 = self.cmd('list', '--format', '{archive:<36} {time} [{id}]{NL}', self.repository_location)
- self.assertEqual(output_1, output_2)
- output_1 = self.cmd('list', '--short', self.repository_location)
- self.assertEqual(output_1, 'test-1\ntest-2\n')
- output_1 = self.cmd('list', '--format', '{barchive}/', self.repository_location)
- self.assertEqual(output_1, 'test-1/test-2/')
- output_3 = self.cmd('list', '--format', '{name} {comment}{NL}', self.repository_location)
- self.assert_in('test-1 comment 1\n', output_3)
- self.assert_in('test-2 comment 2\n', output_3)
- def test_list_hash(self):
- self.create_regular_file('empty_file', size=0)
- self.create_regular_file('amb', contents=b'a' * 1000000)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- test_archive = self.repository_location + '::test'
- self.cmd('create', test_archive, 'input')
- output = self.cmd('list', '--format', '{sha256} {path}{NL}', test_archive)
- assert "cdc76e5c9914fb9281a1c7e284d73e67f1809a48a497200e046d39ccc7112cd0 input/amb" in output
- assert "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 input/empty_file" in output
- def test_list_consider_checkpoints(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test1', src_dir)
- # these are not really checkpoints, but they look like some:
- self.cmd('create', self.repository_location + '::test2.checkpoint', src_dir)
- self.cmd('create', self.repository_location + '::test3.checkpoint.1', src_dir)
- output = self.cmd('list', self.repository_location)
- assert "test1" in output
- assert "test2.checkpoint" not in output
- assert "test3.checkpoint.1" not in output
- output = self.cmd('list', '--consider-checkpoints', self.repository_location)
- assert "test1" in output
- assert "test2.checkpoint" in output
- assert "test3.checkpoint.1" in output
- def test_list_chunk_counts(self):
- self.create_regular_file('empty_file', size=0)
- self.create_regular_file('two_chunks')
- filename = os.path.join(self.input_path, 'two_chunks')
- with open(filename, 'wb') as fd:
- fd.write(b'abba' * 2000000)
- fd.write(b'baab' * 2000000)
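- # ~8 MB of 'abba' followed by ~8 MB of 'baab': each homogeneous half ends up as a
- # single chunk with the default chunker, which is what the '2 2' assertion below relies on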
- self.cmd('init', '--encryption=repokey', self.repository_location)
- test_archive = self.repository_location + '::test'
- self.cmd('create', test_archive, 'input')
- os.unlink(filename) # save space on TMPDIR
- output = self.cmd('list', '--format', '{num_chunks} {unique_chunks} {path}{NL}', test_archive)
- assert "0 0 input/empty_file" in output
- assert "2 2 input/two_chunks" in output
- def test_list_size(self):
- self.create_regular_file('compressible_file', size=10000)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- test_archive = self.repository_location + '::test'
- self.cmd('create', '-C', 'lz4', test_archive, 'input')
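- # format keys: {size} original, {csize} compressed, {dsize} deduplicated,
- # {dcsize} deduplicated compressed size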
- output = self.cmd('list', '--format', '{size} {csize} {dsize} {dcsize} {path}{NL}', test_archive)
- size, csize, dsize, dcsize, path = output.split("\n")[1].split(" ")
- assert int(csize) < int(size)
- assert int(dcsize) < int(dsize)
- assert int(dsize) <= int(size)
- assert int(dcsize) <= int(csize)
- def test_list_json(self):
- self.create_regular_file('file1', size=1024 * 80)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- list_repo = json.loads(self.cmd('list', '--json', self.repository_location))
- repository = list_repo['repository']
- assert len(repository['id']) == 64
- assert datetime.strptime(repository['last_modified'], ISO_FORMAT) # must not raise
- assert list_repo['encryption']['mode'] == 'repokey'
- assert 'keyfile' not in list_repo['encryption']
- archive0 = list_repo['archives'][0]
- assert datetime.strptime(archive0['time'], ISO_FORMAT) # must not raise
- list_archive = self.cmd('list', '--json-lines', self.repository_location + '::test')
- items = [json.loads(s) for s in list_archive.splitlines()]
- assert len(items) == 2
- file1 = items[1]
- assert file1['path'] == 'input/file1'
- assert file1['size'] == 81920
- assert datetime.strptime(file1['mtime'], ISO_FORMAT) # must not raise
- list_archive = self.cmd('list', '--json-lines', '--format={sha256}', self.repository_location + '::test')
- items = [json.loads(s) for s in list_archive.splitlines()]
- assert len(items) == 2
- file1 = items[1]
- assert file1['path'] == 'input/file1'
- assert file1['sha256'] == 'b2915eb69f260d8d3c25249195f2c8f4f716ea82ec760ae929732c0262442b2b'
- def test_list_json_args(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- if self.FORK_DEFAULT:
- self.cmd('list', '--json-lines', self.repository_location, exit_code=2)
- else:
- with pytest.raises(CommandError):
- self.cmd('list', '--json-lines', self.repository_location)
- if self.FORK_DEFAULT:
- self.cmd('list', '--json', self.repository_location + '::archive', exit_code=2)
- else:
- with pytest.raises(CommandError):
- self.cmd('list', '--json', self.repository_location + '::archive')
- def test_log_json(self):
- self.create_test_files()
- self.cmd('init', '--encryption=repokey', self.repository_location)
- log = self.cmd('create', '--log-json', self.repository_location + '::test', 'input', '--list', '--debug')
- messages = {} # type -> message, one of each kind
- for line in log.splitlines():
- msg = json.loads(line)
- messages[msg['type']] = msg
- file_status = messages['file_status']
- assert 'status' in file_status
- assert file_status['path'].startswith('input')
- log_message = messages['log_message']
- assert isinstance(log_message['time'], float)
- assert log_message['levelname'] == 'DEBUG' # there should only be DEBUG messages
- assert isinstance(log_message['message'], str)
- def test_debug_profile(self):
- self.create_test_files()
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input', '--debug-profile=create.prof')
- self.cmd('debug', 'convert-profile', 'create.prof', 'create.pyprof')
- stats = pstats.Stats('create.pyprof')
- stats.strip_dirs()
- stats.sort_stats('cumtime')
- self.cmd('create', self.repository_location + '::test2', 'input', '--debug-profile=create.pyprof')
- stats = pstats.Stats('create.pyprof') # Only do this on trusted data!
- stats.strip_dirs()
- stats.sort_stats('cumtime')
- def test_common_options(self):
- self.create_test_files()
- self.cmd('init', '--encryption=repokey', self.repository_location)
- log = self.cmd('--debug', 'create', self.repository_location + '::test', 'input')
- assert 'security: read previous location' in log
- def _get_sizes(self, compression, compressible, size=10000):
- if compressible:
- contents = b'X' * size
- else:
- contents = os.urandom(size)
- self.create_regular_file('file', contents=contents)
- self.cmd('init', '--encryption=none', self.repository_location)
- archive = self.repository_location + '::test'
- self.cmd('create', '-C', compression, archive, 'input')
- output = self.cmd('list', '--format', '{size} {csize} {path}{NL}', archive)
- size, csize, path = output.split("\n")[1].split(" ")
- return int(size), int(csize)
- def test_compression_none_compressible(self):
- size, csize = self._get_sizes('none', compressible=True)
- assert csize == size + 3
- def test_compression_none_uncompressible(self):
- size, csize = self._get_sizes('none', compressible=False)
- assert csize == size + 3
- def test_compression_zlib_compressible(self):
- size, csize = self._get_sizes('zlib', compressible=True)
- assert csize < size * 0.1
- def test_compression_zlib_uncompressible(self):
- size, csize = self._get_sizes('zlib', compressible=False)
- assert csize >= size
- def test_compression_lz4_compressible(self):
- size, csize = self._get_sizes('lz4', compressible=True)
- assert csize < size * 0.1
- def test_compression_lz4_uncompressible(self):
- size, csize = self._get_sizes('lz4', compressible=False)
- assert csize == size + 3 # same as compression 'none'
- def test_compression_lzma_compressible(self):
- size, csize = self._get_sizes('lzma', compressible=True)
- assert csize < size * 0.1
- def test_compression_lzma_uncompressible(self):
- size, csize = self._get_sizes('lzma', compressible=False)
- assert csize == size + 3 # same as compression 'none'
- def test_compression_zstd_compressible(self):
- size, csize = self._get_sizes('zstd', compressible=True)
- assert csize < size * 0.1
- def test_compression_zstd_uncompressible(self):
- size, csize = self._get_sizes('zstd', compressible=False)
- assert csize == size + 3 # same as compression 'none'
- def test_compression_auto_compressible(self):
- # this is testing whether the "auto" meta-compressor behaves as expected:
- # - it checks whether the data is compressible (detector is the lz4 compressor)
- # - as the data is compressible, it runs the "expensive" zlib compression on it
- # - it returns whatever is shortest, either the lz4 compressed data or the zlib compressed data.
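- # A rough sketch of that decision (illustrative only, not borg's actual code;
- # NONE_HEADER stands in for the 3-byte 'none' compression header):
- # lz4_data = lz4.compress(data)
- # if len(lz4_data) >= len(data):  # incompressible -> store as 'none'
- # return NONE_HEADER + data
- # zlib_data = zlib.compress(data)
- # return min(lz4_data, zlib_data, key=len)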
- auto_size, auto_csize = self._get_sizes('auto,zlib', compressible=True)
- self.cmd('delete', self.repository_location)
- zlib_size, zlib_csize = self._get_sizes('zlib', compressible=True)
- self.cmd('delete', self.repository_location)
- lz4_size, lz4_csize = self._get_sizes('lz4', compressible=True)
- assert auto_size == zlib_size == lz4_size
- assert auto_csize < auto_size * 0.1 # it did compress!
- smallest_csize = min(zlib_csize, lz4_csize)
- assert auto_csize == smallest_csize
- def test_compression_auto_uncompressible(self):
- # this is testing whether the "auto" meta-compressor chooses the "none" compression (storing the
- # data "as is" with just the 3 bytes header) if all else would result in something bigger.
- size, csize = self._get_sizes('auto,zlib', compressible=False)
- assert csize >= size
- assert csize == size + 3 # same as compression 'none'
- def test_change_passphrase(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- os.environ['BORG_NEW_PASSPHRASE'] = 'newpassphrase'
- # here we have both BORG_PASSPHRASE and BORG_NEW_PASSPHRASE set:
- self.cmd('key', 'change-passphrase', self.repository_location)
- os.environ['BORG_PASSPHRASE'] = 'newpassphrase'
- self.cmd('list', self.repository_location)
- def test_break_lock(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('break-lock', self.repository_location)
- def test_usage(self):
- self.cmd()
- self.cmd('-h')
- def test_help(self):
- assert 'Borg' in self.cmd('help')
- assert 'patterns' in self.cmd('help', 'patterns')
- assert 'Initialize' in self.cmd('help', 'init')
- assert 'positional arguments' not in self.cmd('help', 'init', '--epilog-only')
- assert 'This command initializes' not in self.cmd('help', 'init', '--usage-only')
- @unittest.skipUnless(llfuse, 'llfuse not installed')
- def test_fuse(self):
- def has_noatime(some_file):
- atime_before = os.stat(some_file).st_atime_ns
- try:
- os.close(os.open(some_file, flags_noatime))
- except PermissionError:
- return False
- else:
- atime_after = os.stat(some_file).st_atime_ns
- noatime_used = flags_noatime != flags_normal
- return noatime_used and same_ts_ns(atime_before, atime_after)
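- # flags_noatime / flags_normal come from borg's platform helpers; they presumably
- # differ only where O_NOATIME (Linux) is available, and opening another user's file
- # with O_NOATIME may raise PermissionError - hence this probe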
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_test_files()
- have_noatime = has_noatime('input/file1')
- self.cmd('create', '--exclude-nodump', '--atime', self.repository_location + '::archive', 'input')
- self.cmd('create', '--exclude-nodump', '--atime', self.repository_location + '::archive2', 'input')
- if has_lchflags:
- # remove the file we did not back up, so input and output become equal
- os.remove(os.path.join('input', 'flagfile'))
- mountpoint = os.path.join(self.tmpdir, 'mountpoint')
- # mount the whole repository, archive contents shall show up in archivename subdirs of mountpoint:
- with self.fuse_mount(self.repository_location, mountpoint):
- # flags are not supported by the FUSE mount
- # we also ignore xattrs here, they are tested separately
- self.assert_dirs_equal(self.input_path, os.path.join(mountpoint, 'archive', 'input'),
- ignore_flags=True, ignore_xattrs=True)
- self.assert_dirs_equal(self.input_path, os.path.join(mountpoint, 'archive2', 'input'),
- ignore_flags=True, ignore_xattrs=True)
- # mount only 1 archive, its contents shall show up directly in mountpoint:
- with self.fuse_mount(self.repository_location + '::archive', mountpoint):
- self.assert_dirs_equal(self.input_path, os.path.join(mountpoint, 'input'),
- ignore_flags=True, ignore_xattrs=True)
- # regular file
- in_fn = 'input/file1'
- out_fn = os.path.join(mountpoint, 'input', 'file1')
- # stat
- sti1 = os.stat(in_fn)
- sto1 = os.stat(out_fn)
- assert sti1.st_mode == sto1.st_mode
- assert sti1.st_uid == sto1.st_uid
- assert sti1.st_gid == sto1.st_gid
- assert sti1.st_size == sto1.st_size
- if have_noatime:
- assert same_ts_ns(sti1.st_atime * 1e9, sto1.st_atime * 1e9)
- assert same_ts_ns(sti1.st_ctime * 1e9, sto1.st_ctime * 1e9)
- assert same_ts_ns(sti1.st_mtime * 1e9, sto1.st_mtime * 1e9)
- if are_hardlinks_supported():
- # note: there is another hardlink to this, see below
- assert sti1.st_nlink == sto1.st_nlink == 2
- # read
- with open(in_fn, 'rb') as in_f, open(out_fn, 'rb') as out_f:
- assert in_f.read() == out_f.read()
- # hardlink (to 'input/file1')
- if are_hardlinks_supported():
- in_fn = 'input/hardlink'
- out_fn = os.path.join(mountpoint, 'input', 'hardlink')
- sti2 = os.stat(in_fn)
- sto2 = os.stat(out_fn)
- assert sti2.st_nlink == sto2.st_nlink == 2
- assert sto1.st_ino == sto2.st_ino
- # symlink
- if are_symlinks_supported():
- in_fn = 'input/link1'
- out_fn = os.path.join(mountpoint, 'input', 'link1')
- sti = os.stat(in_fn, follow_symlinks=False)
- sto = os.stat(out_fn, follow_symlinks=False)
- assert sti.st_size == len('somewhere')
- assert sto.st_size == len('somewhere')
- assert stat.S_ISLNK(sti.st_mode)
- assert stat.S_ISLNK(sto.st_mode)
- assert os.readlink(in_fn) == os.readlink(out_fn)
- # FIFO
- if are_fifos_supported():
- out_fn = os.path.join(mountpoint, 'input', 'fifo1')
- sto = os.stat(out_fn)
- assert stat.S_ISFIFO(sto.st_mode)
- # list/read xattrs
- try:
- in_fn = 'input/fusexattr'
- out_fn = os.fsencode(os.path.join(mountpoint, 'input', 'fusexattr'))
- if not xattr.XATTR_FAKEROOT and xattr.is_enabled(self.input_path):
- assert sorted(no_selinux(xattr.listxattr(out_fn))) == [b'user.empty', b'user.foo', ]
- assert xattr.getxattr(out_fn, b'user.foo') == b'bar'
- assert xattr.getxattr(out_fn, b'user.empty') == b''
- else:
- assert no_selinux(xattr.listxattr(out_fn)) == []
- try:
- xattr.getxattr(out_fn, b'user.foo')
- except OSError as e:
- assert e.errno == llfuse.ENOATTR
- else:
- assert False, "expected OSError(ENOATTR), but no error was raised"
- except OSError as err:
- if sys.platform.startswith(('nothing_here_now', )) and err.errno == errno.ENOTSUP:
- # some systems have no xattr support on FUSE
- pass
- else:
- raise
- @unittest.skipUnless(llfuse, 'llfuse not installed')
- def test_fuse_versions_view(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_regular_file('test', contents=b'first')
- if are_hardlinks_supported():
- self.create_regular_file('hardlink1', contents=b'123456')
- os.link('input/hardlink1', 'input/hardlink2')
- os.link('input/hardlink1', 'input/hardlink3')
- self.cmd('create', self.repository_location + '::archive1', 'input')
- self.create_regular_file('test', contents=b'second')
- self.cmd('create', self.repository_location + '::archive2', 'input')
- mountpoint = os.path.join(self.tmpdir, 'mountpoint')
- # mount the whole repository, archive contents shall show up in versioned view:
- with self.fuse_mount(self.repository_location, mountpoint, '-o', 'versions'):
- path = os.path.join(mountpoint, 'input', 'test') # filename shows up as directory ...
- files = os.listdir(path)
- assert all(f.startswith('test.') for f in files) # ... with files test.xxxxx in there
- assert {b'first', b'second'} == {open(os.path.join(path, f), 'rb').read() for f in files}
- if are_hardlinks_supported():
- hl1 = os.path.join(mountpoint, 'input', 'hardlink1', 'hardlink1.00001')
- hl2 = os.path.join(mountpoint, 'input', 'hardlink2', 'hardlink2.00001')
- hl3 = os.path.join(mountpoint, 'input', 'hardlink3', 'hardlink3.00001')
- assert os.stat(hl1).st_ino == os.stat(hl2).st_ino == os.stat(hl3).st_ino
- assert open(hl3, 'rb').read() == b'123456'
- # similar again, but exclude the hardlink master:
- with self.fuse_mount(self.repository_location, mountpoint, '-o', 'versions', '-e', 'input/hardlink1'):
- if are_hardlinks_supported():
- hl2 = os.path.join(mountpoint, 'input', 'hardlink2', 'hardlink2.00001')
- hl3 = os.path.join(mountpoint, 'input', 'hardlink3', 'hardlink3.00001')
- assert os.stat(hl2).st_ino == os.stat(hl3).st_ino
- assert open(hl3, 'rb').read() == b'123456'
- @unittest.skipUnless(llfuse, 'llfuse not installed')
- def test_fuse_allow_damaged_files(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_src_archive('archive')
- # Get rid of a chunk and repair it
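- # (in borg 1.x, 'check --repair' replaces a lost chunk with an all-zero replacement
- # chunk and remembers the original chunk list in item.chunks_healthy; the FUSE layer
- # treats such items as damaged: EIO on open, unless mounted with -o allow_damaged_files)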
- archive, repository = self.open_archive('archive')
- with repository:
- for item in archive.iter_items():
- if item.path.endswith('testsuite/archiver.py'):
- repository.delete(item.chunks[-1].id)
- path = item.path # store full path for later
- break
- else:
- assert False # missed the file
- repository.commit(compact=False)
- self.cmd('check', '--repair', self.repository_location, exit_code=0)
- mountpoint = os.path.join(self.tmpdir, 'mountpoint')
- with self.fuse_mount(self.repository_location + '::archive', mountpoint):
- with pytest.raises(OSError) as excinfo:
- open(os.path.join(mountpoint, path))
- assert excinfo.value.errno == errno.EIO
- with self.fuse_mount(self.repository_location + '::archive', mountpoint, '-o', 'allow_damaged_files'):
- open(os.path.join(mountpoint, path)).close()
- @unittest.skipUnless(llfuse, 'llfuse not installed')
- def test_fuse_mount_options(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_src_archive('arch11')
- self.create_src_archive('arch12')
- self.create_src_archive('arch21')
- self.create_src_archive('arch22')
- mountpoint = os.path.join(self.tmpdir, 'mountpoint')
- with self.fuse_mount(self.repository_location, mountpoint, '--first=2', '--sort=name'):
- assert sorted(os.listdir(os.path.join(mountpoint))) == ['arch11', 'arch12']
- with self.fuse_mount(self.repository_location, mountpoint, '--last=2', '--sort=name'):
- assert sorted(os.listdir(os.path.join(mountpoint))) == ['arch21', 'arch22']
- with self.fuse_mount(self.repository_location, mountpoint, '--glob-archives=arch1*'):
- assert sorted(os.listdir(os.path.join(mountpoint))) == ['arch11', 'arch12']
- with self.fuse_mount(self.repository_location, mountpoint, '--glob-archives=arch2*'):
- assert sorted(os.listdir(os.path.join(mountpoint))) == ['arch21', 'arch22']
- with self.fuse_mount(self.repository_location, mountpoint, '--glob-archives=arch*'):
- assert sorted(os.listdir(os.path.join(mountpoint))) == ['arch11', 'arch12', 'arch21', 'arch22']
- with self.fuse_mount(self.repository_location, mountpoint, '--glob-archives=nope*'):
- assert sorted(os.listdir(os.path.join(mountpoint))) == []
- @unittest.skipUnless(llfuse, 'llfuse not installed')
- def test_migrate_lock_alive(self):
- """Both old_id and new_id must not be stale during lock migration / daemonization."""
- from functools import wraps
- import pickle
- import traceback
- # Check results are communicated from the borg mount background process
- # to the pytest process by means of a serialized dict object stored in this file.
- assert_data_file = os.path.join(self.tmpdir, 'migrate_lock_assert_data.pickle')
- # Decorates Lock.migrate_lock() with process_alive() checks before and after.
- # (We don't want to mix testing code into runtime.)
- def write_assert_data(migrate_lock):
- @wraps(migrate_lock)
- def wrapper(self, old_id, new_id):
- wrapper.num_calls += 1
- assert_data = {
- 'num_calls': wrapper.num_calls,
- 'old_id': old_id,
- 'new_id': new_id,
- 'before': {
- 'old_id_alive': platform.process_alive(*old_id),
- 'new_id_alive': platform.process_alive(*new_id)},
- 'exception': None,
- 'exception.extr_tb': None,
- 'after': {
- 'old_id_alive': None,
- 'new_id_alive': None}}
- try:
- with open(assert_data_file, 'wb') as _out:
- pickle.dump(assert_data, _out)
- except:
- pass
- try:
- return migrate_lock(self, old_id, new_id)
- except BaseException as e:
- assert_data['exception'] = e
- assert_data['exception.extr_tb'] = traceback.extract_tb(e.__traceback__)
- finally:
- assert_data['after'].update({
- 'old_id_alive': platform.process_alive(*old_id),
- 'new_id_alive': platform.process_alive(*new_id)})
- try:
- with open(assert_data_file, 'wb') as _out:
- pickle.dump(assert_data, _out)
- except:
- pass
- wrapper.num_calls = 0
- return wrapper
- # Decorate
- borg.locking.Lock.migrate_lock = write_assert_data(borg.locking.Lock.migrate_lock)
- try:
- self.cmd('init', '--encryption=none', self.repository_location)
- self.create_src_archive('arch')
- mountpoint = os.path.join(self.tmpdir, 'mountpoint')
- # So that the decoration is kept for the borg mount process, we must not spawn, but actually fork;
- # this is not to be confused with the forking in borg.helpers.daemonize(), which happens as well.
- with self.fuse_mount(self.repository_location, mountpoint, os_fork=True):
- pass
- with open(assert_data_file, 'rb') as _in:
- assert_data = pickle.load(_in)
- print(f'\nLock.migrate_lock(): assert_data = {assert_data!r}.', file=sys.stderr, flush=True)
- exception = assert_data['exception']
- if exception is not None:
- extracted_tb = assert_data['exception.extr_tb']
- print(
- 'Lock.migrate_lock() raised an exception:\n',
- 'Traceback (most recent call last):\n',
- *traceback.format_list(extracted_tb),
- *traceback.format_exception(exception.__class__, exception, None),
- sep='', end='', file=sys.stderr, flush=True)
- assert assert_data['num_calls'] == 1, "Lock.migrate_lock() must be called exactly once."
- assert exception is None, "Lock.migrate_lock() may not raise an exception."
- assert_data_before = assert_data['before']
- assert assert_data_before['old_id_alive'], "old_id must be alive (=must not be stale) when calling Lock.migrate_lock()."
- assert assert_data_before['new_id_alive'], "new_id must be alive (=must not be stale) when calling Lock.migrate_lock()."
- assert_data_after = assert_data['after']
- assert assert_data_after['old_id_alive'], "old_id must be alive (=must not be stale) when Lock.migrate_lock() has returned."
- assert assert_data_after['new_id_alive'], "new_id must be alive (=must not be stale) when Lock.migrate_lock() has returned."
- finally:
- # Undecorate
- borg.locking.Lock.migrate_lock = borg.locking.Lock.migrate_lock.__wrapped__
- def verify_aes_counter_uniqueness(self, method):
- seen = set() # Chunks already seen
- used = set() # counter values already used
- def verify_uniqueness():
- with Repository(self.repository_path) as repository:
- for id, _ in repository.open_index(repository.get_transaction_id()).iteritems():
- data = repository.get(id)
- hash = sha256(data).digest()
- if hash not in seen:
- seen.add(hash)
- num_blocks = num_cipher_blocks(len(data) - 41)
- nonce = bytes_to_long(data[33:41])
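- # object layout assumed by the offsets above: 1 type byte + 32 byte MAC +
- # 8 byte big-endian counter value (bytes 33..41), then the ciphertext
- # (hence the len(data) - 41 payload bytes)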
- for counter in range(nonce, nonce + num_blocks):
- self.assert_not_in(counter, used)
- used.add(counter)
- self.create_test_files()
- os.environ['BORG_PASSPHRASE'] = 'passphrase'
- self.cmd('init', '--encryption=' + method, self.repository_location)
- verify_uniqueness()
- self.cmd('create', self.repository_location + '::test', 'input')
- verify_uniqueness()
- self.cmd('create', self.repository_location + '::test.2', 'input')
- verify_uniqueness()
- self.cmd('delete', self.repository_location + '::test.2')
- verify_uniqueness()
- def test_aes_counter_uniqueness_keyfile(self):
- self.verify_aes_counter_uniqueness('keyfile')
- def test_aes_counter_uniqueness_passphrase(self):
- self.verify_aes_counter_uniqueness('repokey')
- def test_debug_dump_archive_items(self):
- self.create_test_files()
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- with changedir('output'):
- output = self.cmd('debug', 'dump-archive-items', self.repository_location + '::test')
- output_dir = sorted(os.listdir('output'))
- assert len(output_dir) > 0 and output_dir[0].startswith('000000_')
- assert 'Done.' in output
- def test_debug_dump_repo_objs(self):
- self.create_test_files()
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- with changedir('output'):
- output = self.cmd('debug', 'dump-repo-objs', self.repository_location)
- output_dir = sorted(os.listdir('output'))
- assert len(output_dir) > 0 and output_dir[0].startswith('00000000_')
- assert 'Done.' in output
- def test_debug_put_get_delete_obj(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- data = b'some data'
- self.create_regular_file('file', contents=data)
- output = self.cmd('debug', 'id-hash', self.repository_location, 'input/file')
- id_hash = output.strip()
- output = self.cmd('debug', 'put-obj', self.repository_location, id_hash, 'input/file')
- assert id_hash in output
- output = self.cmd('debug', 'get-obj', self.repository_location, id_hash, 'output/file')
- assert id_hash in output
- with open('output/file', 'rb') as f:
- data_read = f.read()
- assert data == data_read
- output = self.cmd('debug', 'delete-obj', self.repository_location, id_hash)
- assert "deleted" in output
- output = self.cmd('debug', 'delete-obj', self.repository_location, id_hash)
- assert "not found" in output
- output = self.cmd('debug', 'delete-obj', self.repository_location, 'invalid')
- assert "is invalid" in output
- def test_init_interrupt(self):
- def raise_eof(*args):
- raise EOFError
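- # EOFError is what the interactive passphrase prompt raises when stdin is closed
- # (e.g. the user hits ^D), so this simulates aborting 'borg init' at the prompt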
- with patch.object(KeyfileKeyBase, 'create', raise_eof):
- if self.FORK_DEFAULT:
- self.cmd('init', '--encryption=repokey', self.repository_location, exit_code=2)
- else:
- with pytest.raises(CancelledByUser):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- assert not os.path.exists(self.repository_location)
- def test_init_requires_encryption_option(self):
- self.cmd('init', self.repository_location, exit_code=2)
- def test_init_nested_repositories(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- if self.FORK_DEFAULT:
- self.cmd('init', '--encryption=repokey', self.repository_location + '/nested', exit_code=2)
- else:
- with pytest.raises(Repository.AlreadyExists):
- self.cmd('init', '--encryption=repokey', self.repository_location + '/nested')
- def test_init_refuse_to_overwrite_keyfile(self):
- """BORG_KEY_FILE=something borg init should quit if "something" already exists.
- See https://github.com/borgbackup/borg/pull/6046"""
- keyfile = os.path.join(self.tmpdir, 'keyfile')
- with environment_variable(BORG_KEY_FILE=keyfile):
- self.cmd('init', '--encryption=keyfile', self.repository_location + '0')
- with open(keyfile) as file:
- before = file.read()
- arg = ('init', '--encryption=keyfile', self.repository_location + '1')
- if self.FORK_DEFAULT:
- self.cmd(*arg, exit_code=2)
- else:
- with pytest.raises(borg.helpers.errors.Error):
- self.cmd(*arg)
- with open(keyfile) as file:
- after = file.read()
- assert before == after
- def test_init_keyfile_same_path_creates_new_keys(self):
- """Regression test for GH issue #6230.
- When creating a new keyfile-encrypted repository at the same filesystem path
- multiple times (e.g., after moving/unmounting the previous one), Borg must not
- overwrite or reuse the existing key file. Instead, it should create a new key
- file in the keys directory, appending a numeric suffix like .2, .3, ...
- """
- # First init at path A
- self.cmd('init', '--encryption=keyfile', self.repository_location)
- keys = sorted(os.listdir(self.keys_path))
- assert len(keys) == 1
- base_key = keys[0]
- base_path = os.path.join(self.keys_path, base_key)
- with open(base_path, 'rb') as f:
- base_contents = f.read()
- # Simulate the repo having been moved/unmounted: remove it so that we can
- # re-init at the same path
- shutil.rmtree(self.repository_path)
- self.cmd('init', '--encryption=keyfile', self.repository_location)
- keys = sorted(os.listdir(self.keys_path))
- assert len(keys) == 2
- assert base_key in keys
- # The new file should be base_key suffixed with .2
- assert any(k == base_key + '.2' for k in keys)
- second_path = os.path.join(self.keys_path, base_key + '.2')
- with open(second_path, 'rb') as f:
- second_contents = f.read()
- assert second_contents != base_contents
- # Remove repo again and init a third time at same path
- shutil.rmtree(self.repository_path)
- self.cmd('init', '--encryption=keyfile', self.repository_location)
- keys = sorted(os.listdir(self.keys_path))
- assert len(keys) == 3
- assert any(k == base_key + '.3' for k in keys)
- third_path = os.path.join(self.keys_path, base_key + '.3')
- with open(third_path, 'rb') as f:
- third_contents = f.read()
- # Ensure all keys are distinct
- assert third_contents != base_contents
- assert third_contents != second_contents
- def check_cache(self):
- # First run a regular borg check
- self.cmd('check', self.repository_location)
- # Then check that the cache on disk matches exactly what's in the repo.
- with self.open_repository() as repository:
- manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
- with Cache(repository, key, manifest, sync=False) as cache:
- original_chunks = cache.chunks
- Cache.destroy(repository)
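- # destroying the cache forces the next Cache() instantiation to rebuild the
- # chunks index from the repository, giving a known-correct reference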
- with Cache(repository, key, manifest) as cache:
- correct_chunks = cache.chunks
- assert original_chunks is not correct_chunks
- seen = set()
- for id, (refcount, size, csize) in correct_chunks.iteritems():
- o_refcount, o_size, o_csize = original_chunks[id]
- assert refcount == o_refcount
- assert size == o_size
- assert csize == o_csize
- seen.add(id)
- for id, (refcount, size, csize) in original_chunks.iteritems():
- assert id in seen
- def test_check_cache(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- with self.open_repository() as repository:
- manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
- with Cache(repository, key, manifest, sync=False) as cache:
- cache.begin_txn()
- cache.chunks.incref(list(cache.chunks.iteritems())[0][0])
- cache.commit()
- with pytest.raises(AssertionError):
- self.check_cache()
- def test_env_use_chunks_archive(self):
- self.create_test_files()
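- # BORG_USE_CHUNKS_ARCHIVE controls whether cache sync keeps a compact per-archive
- # chunk index in chunks.archive.d ('yes', the usual default) or leaves that
- # directory empty to save disk space at the cost of slower resyncs ('no')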
- with environment_variable(BORG_USE_CHUNKS_ARCHIVE="no"):
- self.cmd("init", "--encryption=repokey", self.repository_location)
- self.cmd("create", self.repository_location + "::test", "input")
- repository_id = bin_to_hex(self._extract_repository_id(self.repository_path))
- cache_path = os.path.join(self.cache_path, repository_id)
- assert os.path.exists(cache_path)
- assert os.path.exists(os.path.join(cache_path, "chunks.archive.d"))
- assert len(os.listdir(os.path.join(cache_path, "chunks.archive.d"))) == 0
- self.cmd("delete", self.repository_location, "--cache-only")
- with environment_variable(BORG_USE_CHUNKS_ARCHIVE="yes"):
- self.cmd("create", self.repository_location + "::test2", "input")
- assert len(os.listdir(os.path.join(cache_path, "chunks.archive.d"))) > 0
- def test_recreate_target_rc(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- if self.FORK_DEFAULT:
- output = self.cmd('recreate', self.repository_location, '--target=asdf', exit_code=2)
- assert 'Need to specify single archive' in output
- else:
- with pytest.raises(CommandError):
- self.cmd('recreate', self.repository_location, '--target=asdf')
- def test_recreate_target(self):
- self.create_test_files()
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.check_cache()
- archive = self.repository_location + '::test0'
- self.cmd('create', archive, 'input')
- self.check_cache()
- original_archive = self.cmd('list', self.repository_location)
- self.cmd('recreate', archive, 'input/dir2', '-e', 'input/dir2/file3', '--target=new-archive')
- self.check_cache()
- archives = self.cmd('list', self.repository_location)
- assert original_archive in archives
- assert 'new-archive' in archives
- archive = self.repository_location + '::new-archive'
- listing = self.cmd('list', '--short', archive)
- assert 'file1' not in listing
- assert 'dir2/file2' in listing
- assert 'dir2/file3' not in listing
- def test_recreate_basic(self):
- self.create_test_files()
- self.create_regular_file('dir2/file3', size=1024 * 80)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- archive = self.repository_location + '::test0'
- self.cmd('create', archive, 'input')
- self.cmd('recreate', archive, 'input/dir2', '-e', 'input/dir2/file3')
- self.check_cache()
- listing = self.cmd('list', '--short', archive)
- assert 'file1' not in listing
- assert 'dir2/file2' in listing
- assert 'dir2/file3' not in listing
- @pytest.mark.skipif(not are_hardlinks_supported(), reason='hardlinks not supported')
- def test_recreate_subtree_hardlinks(self):
- # This is essentially the same problem set as in test_extract_hardlinks
- self._extract_hardlinks_setup()
- self.cmd('create', self.repository_location + '::test2', 'input')
- self.cmd('recreate', self.repository_location + '::test', 'input/dir1')
- self.check_cache()
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test')
- assert os.stat('input/dir1/hardlink').st_nlink == 2
- assert os.stat('input/dir1/subdir/hardlink').st_nlink == 2
- assert os.stat('input/dir1/aaaa').st_nlink == 2
- assert os.stat('input/dir1/source2').st_nlink == 2
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test2')
- assert os.stat('input/dir1/hardlink').st_nlink == 4
- def test_recreate_rechunkify(self):
- with open(os.path.join(self.input_path, 'large_file'), 'wb') as fd:
- fd.write(b'a' * 280)
- fd.write(b'b' * 280)
- self.cmd('init', '--encryption=repokey', self.repository_location)
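- # chunker params are MIN_EXP,MAX_EXP,MASK_BITS,WINDOW_SIZE; 7,9,8,127 forces
- # tiny chunks (128..512 bytes), so large_file splits into many chunks.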
- self.cmd('create', '--chunker-params', '7,9,8,127', self.repository_location + '::test1', 'input')
- self.cmd('create', self.repository_location + '::test2', 'input', '--files-cache=disabled')
- output = self.cmd('list', self.repository_location + '::test1', 'input/large_file',
- '--format', '{num_chunks} {unique_chunks}')
- num_chunks, unique_chunks = map(int, output.split(' '))
- # test1 and test2 do not deduplicate
- assert num_chunks == unique_chunks
- self.cmd('recreate', self.repository_location, '--chunker-params', 'default')
- self.check_cache()
- # test1 and test2 do deduplicate after recreate
- assert int(self.cmd('list', self.repository_location + '::test1', 'input/large_file', '--format={size}'))
- assert not int(self.cmd('list', self.repository_location + '::test1', 'input/large_file',
- '--format', '{unique_chunks}'))
- def test_recreate_fixed_rechunkify(self):
- with open(os.path.join(self.input_path, 'file'), 'wb') as fd:
- fd.write(b'a' * 8192)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', '--chunker-params', '7,9,8,127', self.repository_location + '::test', 'input')
- output = self.cmd('list', self.repository_location + '::test', 'input/file',
- '--format', '{num_chunks}')
- num_chunks = int(output)
- assert num_chunks > 2
- self.cmd('recreate', self.repository_location, '--chunker-params', 'fixed,4096')
- output = self.cmd('list', self.repository_location + '::test', 'input/file',
- '--format', '{num_chunks}')
- num_chunks = int(output)
- assert num_chunks == 2
- def test_recreate_no_rechunkify(self):
- with open(os.path.join(self.input_path, 'file'), 'wb') as fd:
- fd.write(b'a' * 8192)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- # first create an archive with non-default chunker params:
- self.cmd('create', '--chunker-params', '7,9,8,127', self.repository_location + '::test', 'input')
- output = self.cmd('list', self.repository_location + '::test', 'input/file',
- '--format', '{num_chunks}')
- num_chunks = int(output)
- # now recreate the archive and do NOT specify chunker params:
- output = self.cmd('recreate', '--debug', '--exclude', 'filename_never_matches', self.repository_location + '::test')
- assert 'Rechunking' not in output # we did not give --chunker-params, so it must not rechunk!
- output = self.cmd('list', self.repository_location + '::test', 'input/file',
- '--format', '{num_chunks}')
- num_chunks_after_recreate = int(output)
- assert num_chunks == num_chunks_after_recreate
- def test_recreate_recompress(self):
- self.create_regular_file('compressible', size=10000)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input', '-C', 'none')
- file_list = self.cmd('list', self.repository_location + '::test', 'input/compressible',
- '--format', '{size} {csize} {sha256}')
- size, csize, sha256_before = file_list.split(' ')
- assert int(csize) >= int(size) # >= due to metadata overhead
- self.cmd('recreate', self.repository_location, '-C', 'lz4', '--recompress')
- self.check_cache()
- file_list = self.cmd('list', self.repository_location + '::test', 'input/compressible',
- '--format', '{size} {csize} {sha256}')
- size, csize, sha256_after = file_list.split(' ')
- assert int(csize) < int(size)
- assert sha256_before == sha256_after
- def test_recreate_timestamp(self):
- local_timezone = datetime.now(timezone(timedelta(0))).astimezone().tzinfo
- self.create_test_files()
- self.cmd('init', '--encryption=repokey', self.repository_location)
- archive = self.repository_location + '::test0'
- self.cmd('create', archive, 'input')
- self.cmd('recreate', '--timestamp', "1970-01-02T00:00:00", '--comment',
- 'test', archive)
- info = self.cmd('info', archive).splitlines()
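- # --timestamp is interpreted as UTC, while "borg info" prints local time,
- # so shift the expected date by the local UTC offset.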
- dtime = datetime(1970, 1, 2) + local_timezone.utcoffset(None)
- s_time = dtime.strftime("%Y-%m-%d")
- assert any([re.search(r'Time \(start\).+ %s' % s_time, item) for item in info])
- assert any([re.search(r'Time \(end\).+ %s' % s_time, item) for item in info])
- def test_recreate_dry_run(self):
- self.create_regular_file('compressible', size=10000)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- archives_before = self.cmd('list', self.repository_location + '::test')
- self.cmd('recreate', self.repository_location, '-n', '-e', 'input/compressible')
- self.check_cache()
- archives_after = self.cmd('list', self.repository_location + '::test')
- assert archives_after == archives_before
- def test_recreate_skips_nothing_to_do(self):
- self.create_regular_file('file1', size=1024 * 80)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- info_before = self.cmd('info', self.repository_location + '::test')
- self.cmd('recreate', self.repository_location, '--chunker-params', 'default')
- self.check_cache()
- info_after = self.cmd('info', self.repository_location + '::test')
- assert info_before == info_after # includes archive ID
- def test_with_lock(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- lock_path = os.path.join(self.repository_path, 'lock.exclusive')
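- # the child process exits with 42 iff the exclusive lock file exists,
- # proving that with-lock really holds the lock while the command runs.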
- cmd = 'python3', '-c', 'import os, sys; sys.exit(42 if os.path.exists("%s") else 23)' % lock_path
- self.cmd('with-lock', self.repository_location, *cmd, fork=True, exit_code=42)
- def test_with_lock_non_existent_command(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- cmd = ['non_existent_command', ]
- self.cmd('with-lock', self.repository_location, *cmd, fork=True, exit_code=EXIT_ERROR)
- def test_recreate_list_output(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_regular_file('file1', size=0)
- self.create_regular_file('file2', size=0)
- self.create_regular_file('file3', size=0)
- self.create_regular_file('file4', size=0)
- self.create_regular_file('file5', size=0)
- self.cmd('create', self.repository_location + '::test', 'input')
- output = self.cmd('recreate', '--list', '--info', self.repository_location + '::test', '-e', 'input/file2')
- self.check_cache()
- self.assert_in("input/file1", output)
- self.assert_in("x input/file2", output)
- output = self.cmd('recreate', '--list', self.repository_location + '::test', '-e', 'input/file3')
- self.check_cache()
- self.assert_in("input/file1", output)
- self.assert_in("x input/file3", output)
- output = self.cmd('recreate', self.repository_location + '::test', '-e', 'input/file4')
- self.check_cache()
- self.assert_not_in("input/file1", output)
- self.assert_not_in("x input/file4", output)
- output = self.cmd('recreate', '--info', self.repository_location + '::test', '-e', 'input/file5')
- self.check_cache()
- self.assert_not_in("input/file1", output)
- self.assert_not_in("x input/file5", output)
- def test_bad_filters(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- self.cmd('delete', '--first', '1', '--last', '1', self.repository_location, fork=True, exit_code=2)
- def test_key_export_keyfile(self):
- export_file = self.output_path + '/exported'
- self.cmd('init', self.repository_location, '--encryption', 'keyfile')
- repo_id = self._extract_repository_id(self.repository_path)
- self.cmd('key', 'export', self.repository_location, export_file)
- with open(export_file) as fd:
- export_contents = fd.read()
- assert export_contents.startswith('BORG_KEY ' + bin_to_hex(repo_id) + '\n')
- key_file = self.keys_path + '/' + os.listdir(self.keys_path)[0]
- with open(key_file) as fd:
- key_contents = fd.read()
- assert key_contents == export_contents
- os.unlink(key_file)
- self.cmd('key', 'import', self.repository_location, export_file)
- with open(key_file) as fd:
- key_contents2 = fd.read()
- assert key_contents2 == key_contents
- def test_key_import_keyfile_with_borg_key_file(self):
- self.cmd('init', self.repository_location, '--encryption', 'keyfile')
- exported_key_file = os.path.join(self.output_path, 'exported')
- self.cmd('key', 'export', self.repository_location, exported_key_file)
- key_file = os.path.join(self.keys_path, os.listdir(self.keys_path)[0])
- with open(key_file) as fd:
- key_contents = fd.read()
- os.unlink(key_file)
- imported_key_file = os.path.join(self.output_path, 'imported')
- with environment_variable(BORG_KEY_FILE=imported_key_file):
- self.cmd('key', 'import', self.repository_location, exported_key_file)
- assert not os.path.isfile(key_file), '"borg key import" should respect BORG_KEY_FILE'
- with open(imported_key_file) as fd:
- imported_key_contents = fd.read()
- assert imported_key_contents == key_contents
- def test_key_export_repokey(self):
- export_file = self.output_path + '/exported'
- self.cmd('init', self.repository_location, '--encryption', 'repokey')
- repo_id = self._extract_repository_id(self.repository_path)
- self.cmd('key', 'export', self.repository_location, export_file)
- with open(export_file) as fd:
- export_contents = fd.read()
- assert export_contents.startswith('BORG_KEY ' + bin_to_hex(repo_id) + '\n')
- with Repository(self.repository_path) as repository:
- repo_key = RepoKey(repository)
- repo_key.load(None, Passphrase.env_passphrase())
- backup_key = KeyfileKey(key.TestKey.MockRepository())
- backup_key.load(export_file, Passphrase.env_passphrase())
- assert repo_key.enc_key == backup_key.enc_key
- with Repository(self.repository_path) as repository:
- repository.save_key(b'')
- self.cmd('key', 'import', self.repository_location, export_file)
- with Repository(self.repository_path) as repository:
- repo_key2 = RepoKey(repository)
- repo_key2.load(None, Passphrase.env_passphrase())
- assert repo_key2.enc_key == repo_key.enc_key
- def test_key_export_qr(self):
- export_file = self.output_path + '/exported.html'
- self.cmd('init', self.repository_location, '--encryption', 'repokey')
- repo_id = self._extract_repository_id(self.repository_path)
- self.cmd('key', 'export', '--qr-html', self.repository_location, export_file)
- with open(export_file, encoding='utf-8') as fd:
- export_contents = fd.read()
- assert bin_to_hex(repo_id) in export_contents
- assert export_contents.startswith('<!doctype html>')
- assert export_contents.endswith('</html>\n')
- def test_key_export_directory(self):
- export_directory = self.output_path + '/exported'
- os.mkdir(export_directory)
- self.cmd('init', self.repository_location, '--encryption', 'repokey')
- if self.FORK_DEFAULT:
- self.cmd('key', 'export', self.repository_location, export_directory, exit_code=EXIT_ERROR)
- else:
- with pytest.raises(CommandError):
- self.cmd('key', 'export', self.repository_location, export_directory)
- def test_key_import_errors(self):
- export_file = self.output_path + '/exported'
- self.cmd('init', self.repository_location, '--encryption', 'keyfile')
- if self.FORK_DEFAULT:
- self.cmd('key', 'import', self.repository_location, export_file, exit_code=EXIT_ERROR)
- else:
- with pytest.raises(CommandError):
- self.cmd('key', 'import', self.repository_location, export_file)
- with open(export_file, 'w') as fd:
- fd.write('something not a key\n')
- if self.FORK_DEFAULT:
- self.cmd('key', 'import', self.repository_location, export_file, exit_code=2)
- else:
- with pytest.raises(NotABorgKeyFile):
- self.cmd('key', 'import', self.repository_location, export_file)
- with open(export_file, 'w') as fd:
- fd.write('BORG_KEY a0a0a0\n')
- if self.FORK_DEFAULT:
- self.cmd('key', 'import', self.repository_location, export_file, exit_code=2)
- else:
- with pytest.raises(RepoIdMismatch):
- self.cmd('key', 'import', self.repository_location, export_file)
- def test_key_export_paperkey(self):
- repo_id = 'e294423506da4e1ea76e8dcdf1a3919624ae3ae496fddf905610c351d3f09239'
- export_file = self.output_path + '/exported'
- self.cmd('init', self.repository_location, '--encryption', 'keyfile')
- self._set_repository_id(self.repository_path, hex_to_bin(repo_id))
- key_file = self.keys_path + '/' + os.listdir(self.keys_path)[0]
- with open(key_file, 'w') as fd:
- fd.write(KeyfileKey.FILE_ID + ' ' + repo_id + '\n')
- fd.write(binascii.b2a_base64(b'abcdefghijklmnopqrstu').decode())
- self.cmd('key', 'export', '--paper', self.repository_location, export_file)
- with open(export_file) as fd:
- export_contents = fd.read()
- assert export_contents == """To restore key use borg key import --paper /path/to/repo
- BORG PAPER KEY v1
- id: 2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02
- 1: 616263 646566 676869 6a6b6c 6d6e6f 707172 - 6d
- 2: 737475 - 88
- """
- def test_key_import_paperkey(self):
- repo_id = 'e294423506da4e1ea76e8dcdf1a3919624ae3ae496fddf905610c351d3f09239'
- self.cmd('init', self.repository_location, '--encryption', 'keyfile')
- self._set_repository_id(self.repository_path, hex_to_bin(repo_id))
- key_file = self.keys_path + '/' + os.listdir(self.keys_path)[0]
- with open(key_file, 'w') as fd:
- fd.write(KeyfileKey.FILE_ID + ' ' + repo_id + '\n')
- fd.write(binascii.b2a_base64(b'abcdefghijklmnopqrstu').decode())
- typed_input = (
- b'2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 02\n' # Forgot to type "-"
- b'2 / e29442 3506da 4e1ea7 25f62a 5a3d41 - 02\n' # Forgot to type second "/"
- b'2 / e29442 3506da 4e1ea7 / 25f62a 5a3d42 - 02\n' # Typo (..42 not ..41)
- b'2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02\n' # Correct! Congratulations
- b'616263 646566 676869 6a6b6c 6d6e6f 707172 - 6d\n'
- b'\n\n' # Abort [yN] => N
- b'737475 88\n' # missing "-"
- b'73747i - 88\n' # typo
- b'73747 - 88\n' # missing nibble
- b'73 74 75 - 89\n' # line checksum mismatch
- b'00a1 - 88\n' # line hash collision - overall hash mismatch, have to start over
- b'2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02\n'
- b'616263 646566 676869 6a6b6c 6d6e6f 707172 - 6d\n'
- b'73 74 75 - 88\n'
- )
- # In case that this has to change, here is a quick way to find a colliding line hash:
- #
- # from hashlib import sha256
- # hash_fn = lambda x: sha256(b'\x00\x02' + x).hexdigest()[:2]
- # for i in range(1000):
- # if hash_fn(i.to_bytes(2, byteorder='big')) == '88': # 88 = line hash
- # print(i.to_bytes(2, 'big'))
- # break
- self.cmd('key', 'import', '--paper', self.repository_location, input=typed_input)
- # Test abort paths
- typed_input = b'\ny\n'
- self.cmd('key', 'import', '--paper', self.repository_location, input=typed_input)
- typed_input = b'2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02\n\ny\n'
- self.cmd('key', 'import', '--paper', self.repository_location, input=typed_input)
- def test_debug_dump_manifest(self):
- self.create_regular_file('file1', size=1024 * 80)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- dump_file = self.output_path + '/dump'
- output = self.cmd('debug', 'dump-manifest', self.repository_location, dump_file)
- assert output == ""
- with open(dump_file) as f:
- result = json.load(f)
- assert 'archives' in result
- assert 'config' in result
- assert 'item_keys' in result
- assert 'timestamp' in result
- assert 'version' in result
- def test_debug_dump_archive(self):
- self.create_regular_file('file1', size=1024 * 80)
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- dump_file = self.output_path + '/dump'
- output = self.cmd('debug', 'dump-archive', self.repository_location + "::test", dump_file)
- assert output == ""
- with open(dump_file) as f:
- result = json.load(f)
- assert '_name' in result
- assert '_manifest_entry' in result
- assert '_meta' in result
- assert '_items' in result
- def test_debug_refcount_obj(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- output = self.cmd('debug', 'refcount-obj', self.repository_location, '0' * 64).strip()
- assert output == 'object 0000000000000000000000000000000000000000000000000000000000000000 not found [info from chunks cache].'
- create_json = json.loads(self.cmd('create', '--json', self.repository_location + '::test', 'input'))
- archive_id = create_json['archive']['id']
- output = self.cmd('debug', 'refcount-obj', self.repository_location, archive_id).strip()
- assert output == 'object ' + archive_id + ' has 1 referrers [info from chunks cache].'
- # Invalid IDs do not abort or return an error
- output = self.cmd('debug', 'refcount-obj', self.repository_location, '124', 'xyza').strip()
- assert output == 'object id 124 is invalid.\nobject id xyza is invalid.'
- def test_debug_info(self):
- output = self.cmd('debug', 'info')
- assert 'CRC implementation' in output
- assert 'Python' in output
- def test_benchmark_crud(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- with environment_variable(_BORG_BENCHMARK_CRUD_TEST='YES'):
- self.cmd('benchmark', 'crud', self.repository_location, self.input_path)
- def test_config(self):
- self.create_test_files()
- os.unlink('input/flagfile')
- self.cmd('init', '--encryption=repokey', self.repository_location)
- output = self.cmd('config', '--list', self.repository_location)
- self.assert_in('[repository]', output)
- self.assert_in('version', output)
- self.assert_in('segments_per_dir', output)
- self.assert_in('storage_quota', output)
- self.assert_in('append_only', output)
- self.assert_in('additional_free_space', output)
- self.assert_in('id', output)
- self.assert_not_in('last_segment_checked', output)
- if self.FORK_DEFAULT:
- output = self.cmd('config', self.repository_location, 'last_segment_checked', exit_code=2)
- self.assert_in('No option ', output)
- else:
- with pytest.raises(Error):
- self.cmd('config', self.repository_location, 'last_segment_checked')
- self.cmd('config', self.repository_location, 'last_segment_checked', '123')
- output = self.cmd('config', self.repository_location, 'last_segment_checked')
- assert output == '123' + '\n'
- output = self.cmd('config', '--list', self.repository_location)
- self.assert_in('last_segment_checked', output)
- self.cmd('config', '--delete', self.repository_location, 'last_segment_checked')
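- # exercise both a bare option name and a section-qualified one; both
- # default to 0, and reading back a deleted option must fail.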
- for cfg_key, cfg_value in [
- ('additional_free_space', '2G'),
- ('repository.append_only', '1'),
- ]:
- output = self.cmd('config', self.repository_location, cfg_key)
- assert output == '0' + '\n'
- self.cmd('config', self.repository_location, cfg_key, cfg_value)
- output = self.cmd('config', self.repository_location, cfg_key)
- assert output == cfg_value + '\n'
- self.cmd('config', '--delete', self.repository_location, cfg_key)
- if self.FORK_DEFAULT:
- self.cmd('config', self.repository_location, cfg_key, exit_code=2)
- else:
- with pytest.raises(Error):
- self.cmd('config', self.repository_location, cfg_key)
- self.cmd('config', '--list', '--delete', self.repository_location, exit_code=2)
- if self.FORK_DEFAULT:
- self.cmd('config', self.repository_location, exit_code=2)
- else:
- with pytest.raises(CommandError):
- self.cmd('config', self.repository_location)
- if self.FORK_DEFAULT:
- self.cmd('config', self.repository_location, 'invalid-option', exit_code=2)
- else:
- with pytest.raises(Error):
- self.cmd('config', self.repository_location, 'invalid-option')
- requires_gnutar = pytest.mark.skipif(not have_gnutar(), reason='GNU tar must be installed for this test.')
- requires_gzip = pytest.mark.skipif(not shutil.which('gzip'), reason='gzip must be installed for this test.')
- @requires_gnutar
- def test_export_tar(self):
- self.create_test_files()
- os.unlink('input/flagfile')
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- self.cmd('export-tar', self.repository_location + '::test', 'simple.tar', '--progress')
- with changedir('output'):
- # This probably assumes GNU tar. Note -p switch to extract permissions regardless of umask.
- subprocess.check_call(['tar', 'xpf', '../simple.tar', '--warning=no-timestamp'])
- self.assert_dirs_equal('input', 'output/input', ignore_flags=True, ignore_xattrs=True, ignore_ns=True)
- @requires_gnutar
- @requires_gzip
- def test_export_tar_gz(self):
- self.create_test_files()
- os.unlink('input/flagfile')
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- output = self.cmd('export-tar', self.repository_location + '::test', 'simple.tar.gz', '--list')
- assert 'input/file1\n' in output
- assert 'input/dir2\n' in output
- with changedir('output'):
- subprocess.check_call(['tar', 'xpf', '../simple.tar.gz', '--warning=no-timestamp'])
- self.assert_dirs_equal('input', 'output/input', ignore_flags=True, ignore_xattrs=True, ignore_ns=True)
- @requires_gnutar
- def test_export_tar_strip_components(self):
- self.create_test_files()
- os.unlink('input/flagfile')
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- output = self.cmd('export-tar', self.repository_location + '::test', 'simple.tar', '--strip-components=1', '--list')
- # --list prints paths as stored in the archive, i.e. before --strip-components is applied
- assert 'input/file1\n' in output
- assert 'input/dir2\n' in output
- with changedir('output'):
- subprocess.check_call(['tar', 'xpf', '../simple.tar', '--warning=no-timestamp'])
- self.assert_dirs_equal('input', 'output/', ignore_flags=True, ignore_xattrs=True, ignore_ns=True)
- @requires_hardlinks
- @requires_gnutar
- def test_export_tar_strip_components_links(self):
- self._extract_hardlinks_setup()
- self.cmd('export-tar', self.repository_location + '::test', 'output.tar', '--strip-components=2')
- with changedir('output'):
- subprocess.check_call(['tar', 'xpf', '../output.tar', '--warning=no-timestamp'])
- assert os.stat('hardlink').st_nlink == 2
- assert os.stat('subdir/hardlink').st_nlink == 2
- assert os.stat('aaaa').st_nlink == 2
- assert os.stat('source2').st_nlink == 2
- @requires_hardlinks
- @requires_gnutar
- def test_extract_hardlinks_tar(self):
- self._extract_hardlinks_setup()
- self.cmd('export-tar', self.repository_location + '::test', 'output.tar', 'input/dir1')
- with changedir('output'):
- subprocess.check_call(['tar', 'xpf', '../output.tar', '--warning=no-timestamp'])
- assert os.stat('input/dir1/hardlink').st_nlink == 2
- assert os.stat('input/dir1/subdir/hardlink').st_nlink == 2
- assert os.stat('input/dir1/aaaa').st_nlink == 2
- assert os.stat('input/dir1/source2').st_nlink == 2
- def test_import_tar(self):
- self.create_test_files()
- os.unlink('input/flagfile')
- self.cmd('init', '--encryption=none', self.repository_location)
- self.cmd('create', self.repository_location + '::src', 'input')
- self.cmd('export-tar', self.repository_location + '::src', 'simple.tar')
- self.cmd('import-tar', self.repository_location + '::dst', 'simple.tar')
- with changedir(self.output_path):
- self.cmd('extract', self.repository_location + '::dst')
- self.assert_dirs_equal('input', 'output/input', ignore_ns=True, ignore_xattrs=True)
- @requires_gzip
- def test_import_tar_gz(self):
- self.create_test_files()
- os.unlink('input/flagfile')
- self.cmd('init', '--encryption=none', self.repository_location)
- self.cmd('create', self.repository_location + '::src', 'input')
- self.cmd('export-tar', self.repository_location + '::src', 'simple.tgz')
- self.cmd('import-tar', self.repository_location + '::dst', 'simple.tgz')
- with changedir(self.output_path):
- self.cmd('extract', self.repository_location + '::dst')
- self.assert_dirs_equal('input', 'output/input', ignore_ns=True, ignore_xattrs=True)
- @requires_gnutar
- def test_import_concatenated_tar_with_ignore_zeros(self):
- # file1 has a hardlink reference to it, but we put it in a separate
- # tarball, breaking the link during import-tar. It could be any other
- # file though, so we won't take chances and just avoid hardlinks.
- self.create_test_files(create_hardlinks=False)
- os.unlink('input/flagfile')
- with changedir('input'):
- subprocess.check_call(['tar', 'cf', 'file1.tar', 'file1'])
- subprocess.check_call(['tar', 'cf', 'the_rest.tar', '--exclude', 'file1*', '.'])
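- # tar terminates an archive with two zero-filled blocks; concatenating the
- # files leaves that end-of-archive marker in the middle, which import-tar
- # only reads past when --ignore-zeros is given.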
- with open('concatenated.tar', 'wb') as concatenated:
- with open('file1.tar', 'rb') as file1:
- concatenated.write(file1.read())
- # Clean up for assert_dirs_equal.
- os.unlink('file1.tar')
- with open('the_rest.tar', 'rb') as the_rest:
- concatenated.write(the_rest.read())
- # Clean up for assert_dirs_equal.
- os.unlink('the_rest.tar')
- self.cmd('init', '--encryption=none', self.repository_location)
- self.cmd('import-tar', '--ignore-zeros', self.repository_location + '::dst', 'input/concatenated.tar')
- os.unlink('input/concatenated.tar')
- with changedir(self.output_path):
- self.cmd('extract', self.repository_location + '::dst')
- self.assert_dirs_equal('input', 'output', ignore_ns=True, ignore_xattrs=True)
- @requires_gnutar
- def test_import_concatenated_tar_without_ignore_zeros(self):
- self.create_test_files()
- os.unlink('input/flagfile')
- with changedir('input'):
- subprocess.check_call(['tar', 'cf', 'file1.tar', 'file1'])
- subprocess.check_call(['tar', 'cf', 'the_rest.tar', '--exclude', 'file1*', '.'])
- with open('concatenated.tar', 'wb') as concatenated:
- with open('file1.tar', 'rb') as file1:
- concatenated.write(file1.read())
- with open('the_rest.tar', 'rb') as the_rest:
- concatenated.write(the_rest.read())
- self.cmd('init', '--encryption=none', self.repository_location)
- self.cmd('import-tar', self.repository_location + '::dst', 'input/concatenated.tar')
- with changedir(self.output_path):
- self.cmd('extract', self.repository_location + '::dst')
- # Negative test -- assert that only file1 has been extracted, and the_rest has been ignored
- # due to zero-filled block marker.
- self.assert_equal(os.listdir('output'), ['file1'])
- @requires_gnutar
- def test_import_tar_with_dotslash_paths(self):
- """Test that paths starting with './' are normalized during import-tar."""
- # Create a simple directory structure
- os.makedirs('input/dir', exist_ok=True)
- self.create_regular_file('dir/file')
- # Create a tar file with paths starting with './'
- with changedir('input'):
- # Directly use a path that starts with './'
- subprocess.check_call(['tar', 'cf', 'dotslash.tar', './dir'])
- # Verify the tar file contains paths with './' prefix
- tar_content = subprocess.check_output(['tar', 'tf', 'dotslash.tar']).decode()
- assert './dir' in tar_content
- assert './dir/file' in tar_content
- # Import the tar file into a Borg repository
- self.cmd('init', '--encryption=none', self.repository_location)
- self.cmd('import-tar', self.repository_location + '::dotslash', 'input/dotslash.tar')
- # List the archive contents and verify no paths start with './'
- output = self.cmd('list', '--format={path}{NL}', self.repository_location + '::dotslash')
- assert './dir' not in output
- assert 'dir' in output
- assert 'dir/file' in output
- def test_detect_attic_repo(self):
- path = make_attic_repo(self.repository_path)
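- # every command that opens the repository must detect the legacy Attic
- # layout and fail with rc 2 instead of misinterpreting the data.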
- cmds = [
- ['create', path + '::test', self.tmpdir],
- ['extract', path + '::test'],
- ['check', path],
- ['rename', path + '::test', 'newname'],
- ['list', path],
- ['delete', path],
- ['prune', path],
- ['info', path + '::test'],
- ['key', 'export', path, 'exported'],
- ['key', 'import', path, 'import'],
- ['key', 'change-passphrase', path],
- ['break-lock', path],
- ]
- for args in cmds:
- output = self.cmd(*args, fork=True, exit_code=2)
- assert 'Attic repository detected.' in output
- # derived from test_extract_xattrs_errors()
- @pytest.mark.skipif(not xattr.XATTR_FAKEROOT, reason='xattr not supported on this system or on this version of '
- 'fakeroot')
- def test_do_not_fail_when_percent_is_in_xattr_name(self):
- """https://github.com/borgbackup/borg/issues/6063"""
- def patched_setxattr_EACCES(*args, **kwargs):
- raise OSError(errno.EACCES, 'EACCES')
- self.create_regular_file('file')
- xattr.setxattr(b'input/file', b'user.attribute%p', b'value')
- self.cmd('init', self.repository_location, '-e', 'none')
- self.cmd('create', self.repository_location + '::test', 'input')
- with changedir('output'):
- with patch.object(xattr, 'setxattr', patched_setxattr_EACCES):
- self.cmd('extract', self.repository_location + '::test', exit_code=EXIT_WARNING)
- # derived from test_extract_xattrs_errors()
- @pytest.mark.skipif(not xattr.XATTR_FAKEROOT, reason='xattr not supported on this system or on this version of '
- 'fakeroot')
- def test_do_not_fail_when_percent_is_in_file_name(self):
- """https://github.com/borgbackup/borg/issues/6063"""
- def patched_setxattr_EACCES(*args, **kwargs):
- raise OSError(errno.EACCES, 'EACCES')
- os.makedirs(os.path.join(self.input_path, 'dir%p'))
- xattr.setxattr(b'input/dir%p', b'user.attribute', b'value')
- self.cmd('init', self.repository_location, '-e', 'none')
- self.cmd('create', self.repository_location + '::test', 'input')
- with changedir('output'):
- with patch.object(xattr, 'setxattr', patched_setxattr_EACCES):
- self.cmd('extract', self.repository_location + '::test', exit_code=EXIT_WARNING)
- def test_do_not_mention_archive_if_you_can_not_find_repo(self):
- """https://github.com/borgbackup/borg/issues/6014"""
- archive = self.repository_location + '-this-repository-does-not-exist' + '::test'
- output = self.cmd('info', archive, exit_code=2, fork=True)
- self.assert_in('this-repository-does-not-exist', output)
- self.assert_not_in('this-repository-does-not-exist::test', output)
- def test_can_read_repo_even_if_nonce_is_deleted(self):
- """Nonce is only used for encrypting new data.
- It should be possible to retrieve the data from an archive even if
- both the client and the server forget the nonce."""
- self.create_regular_file('file1', contents=b'Hello, borg')
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- # Oops! We have removed the repo-side memory of the nonce!
- # See https://github.com/borgbackup/borg/issues/5858
- os.remove(os.path.join(self.repository_path, 'nonce'))
- # Oops! The client has lost the nonce too!
- os.remove(os.path.join(self.get_security_dir(), 'nonce'))
- # The repo should still be readable
- repo_info = self.cmd('info', self.repository_location)
- assert 'All archives:' in repo_info
- repo_list = self.cmd('list', self.repository_location)
- assert 'test' in repo_list
- # The archive should still be readable
- archive_info = self.cmd('info', self.repository_location + '::test')
- assert 'Archive name: test\n' in archive_info
- archive_list = self.cmd('list', self.repository_location + '::test')
- assert 'file1' in archive_list
- # Extracting the archive should work
- with changedir('output'):
- self.cmd('extract', self.repository_location + '::test')
- self.assert_dirs_equal('input', 'output/input')
- def test_recovery_from_deleted_repo_nonce(self):
- """We should be able to recover if path/to/repo/nonce is deleted.
- The nonce is stored in two places: in the repo and in $HOME.
- The nonce in the repo is only needed when multiple clients use the same
- repo. Otherwise we can just use our own copy of the nonce.
- """
- self.create_regular_file('file1', contents=b'Hello, borg')
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.cmd('create', self.repository_location + '::test', 'input')
- # Oops! We have removed the repo-side memory of the nonce!
- # See https://github.com/borgbackup/borg/issues/5858
- nonce = os.path.join(self.repository_path, 'nonce')
- os.remove(nonce)
- self.cmd('create', self.repository_location + '::test2', 'input')
- assert os.path.exists(nonce)
- def test_exit_codes(self):
- # we create the repo path, but do NOT initialize the borg repo,
- # so the borg create commands are expected to fail with InvalidRepository.
- os.makedirs(self.repository_path, exist_ok=True)
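- # classic mode maps all errors to the generic EXIT_ERROR (2), while modern
- # mode returns the error-specific code (here: InvalidRepository's mcode).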
- with environment_variable(BORG_EXIT_CODES='classic'):
- self.cmd('create', self.repository_location + '::archive', 'input', fork=True,
- exit_code=EXIT_ERROR)
- with environment_variable(BORG_EXIT_CODES='modern'):
- self.cmd('create', self.repository_location + '::archive', 'input', fork=True,
- exit_code=Repository.InvalidRepository.exit_mcode)
- def test_original_size_stable_across_recreate(self):
- # Test that changes in archive metadata (like number of chunks) do not influence the original size.
- self.cmd('init', '--encryption=repokey', self.repository_location)
- def original_size(archive_name):
- info = json.loads(self.cmd('info', '--json', f"{self.repository_location}::{archive_name}"))
- return info['archives'][0]['stats']['original_size']
- sizes = [12345, 67890]
- self.create_regular_file('file1', size=sizes[0])
- self.create_regular_file('file2', size=sizes[1])
- self.cmd('create', '--compression=none', self.repository_location + '::archive', 'input')
- assert original_size('archive') == sum(sizes)
- # Recreate with different chunker params to try to reproduce #8898.
- self.cmd('recreate', '--chunker-params=10,12,11,63', self.repository_location + '::archive')
- assert original_size('archive') == sum(sizes)
- @unittest.skipUnless('binary' in BORG_EXES, 'no borg.exe available')
- class ArchiverTestCaseBinary(ArchiverTestCase):
- EXE = 'borg.exe'
- FORK_DEFAULT = True
- @unittest.skip('does not raise Exception, but sets rc==2')
- def test_init_parent_dirs(self):
- pass
- @unittest.skip('patches objects')
- def test_init_interrupt(self):
- pass
- @unittest.skip('patches objects')
- def test_extract_capabilities(self):
- pass
- @unittest.skip('patches objects')
- def test_extract_xattrs_errors(self):
- pass
- @unittest.skip('test_basic_functionality seems incompatible with fakeroot and/or the binary.')
- def test_basic_functionality(self):
- pass
- @unittest.skip('test_overwrite seems incompatible with fakeroot and/or the binary.')
- def test_overwrite(self):
- pass
- def test_fuse(self):
- if fakeroot_detected():
- self.skipTest('test_fuse with the binary is not compatible with fakeroot')
- else:
- super().test_fuse()
- @unittest.skip('patches objects')
- def test_do_not_fail_when_percent_is_in_xattr_name(self):
- pass
- @unittest.skip('patches objects')
- def test_do_not_fail_when_percent_is_in_file_name(self):
- pass
- class ArchiverCheckTestCase(ArchiverTestCaseBase):
- def setUp(self):
- super().setUp()
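- # shrink ChunkBuffer so the archive item metadata stream is flushed into
- # many small chunks, giving check more structures to verify.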
- with patch.object(ChunkBuffer, 'BUFFER_SIZE', 10):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_src_archive('archive1')
- self.create_src_archive('archive2')
- def test_check_usage(self):
- output = self.cmd('check', '-v', '--progress', self.repository_location, exit_code=0)
- self.assert_in('Starting repository check', output)
- self.assert_in('Starting archive consistency check', output)
- self.assert_in('Checking segments', output)
- # reset logging to new process default to avoid need for fork=True on next check
- logging.getLogger('borg.output.progress').setLevel(logging.NOTSET)
- output = self.cmd('check', '-v', '--repository-only', self.repository_location, exit_code=0)
- self.assert_in('Starting repository check', output)
- self.assert_not_in('Starting archive consistency check', output)
- self.assert_not_in('Checking segments', output)
- output = self.cmd('check', '-v', '--archives-only', self.repository_location, exit_code=0)
- self.assert_not_in('Starting repository check', output)
- self.assert_in('Starting archive consistency check', output)
- output = self.cmd('check', '-v', '--archives-only', '--glob-archives=archive2', self.repository_location, exit_code=0)
- self.assert_not_in('archive1', output)
- output = self.cmd('check', '-v', '--archives-only', '--first=1', self.repository_location, exit_code=0)
- self.assert_in('archive1', output)
- self.assert_not_in('archive2', output)
- output = self.cmd('check', '-v', '--archives-only', '--last=1', self.repository_location, exit_code=0)
- self.assert_not_in('archive1', output)
- self.assert_in('archive2', output)
- def test_missing_file_chunk(self):
- archive, repository = self.open_archive('archive1')
- with repository:
- for item in archive.iter_items():
- if item.path.endswith('testsuite/archiver.py'):
- valid_chunks = item.chunks
- killed_chunk = valid_chunks[-1]
- repository.delete(killed_chunk.id)
- break
- else:
- self.fail('should not happen')
- repository.commit(compact=False)
- self.cmd('check', self.repository_location, exit_code=1)
- output = self.cmd('check', '--repair', self.repository_location, exit_code=0)
- self.assert_in('New missing file chunk detected', output)
- self.cmd('check', self.repository_location, exit_code=0)
- output = self.cmd('list', '--format={health}#{path}{LF}', self.repository_location + '::archive1', exit_code=0)
- self.assert_in('broken#', output)
- # check that the file in the old archives has now a different chunk list without the killed chunk.
- # also check that the correct original chunks list is preserved in item.chunks_healthy.
- for archive_name in ('archive1', 'archive2'):
- archive, repository = self.open_archive(archive_name)
- with repository:
- for item in archive.iter_items():
- if item.path.endswith('testsuite/archiver.py'):
- self.assert_equal(len(valid_chunks), len(item.chunks))
- self.assert_not_in(killed_chunk, item.chunks)
- self.assert_not_equal(valid_chunks, item.chunks)
- self.assert_in('chunks_healthy', item)
- self.assert_equal(len(valid_chunks), len(item.chunks_healthy))
- self.assert_in(killed_chunk, item.chunks_healthy)
- self.assert_equal(valid_chunks, item.chunks_healthy)
- break
- else:
- self.fail('should not happen')
- # do a fresh backup (that will include the killed chunk)
- with patch.object(ChunkBuffer, 'BUFFER_SIZE', 10):
- self.create_src_archive('archive3')
- # check should be able to heal the file now:
- output = self.cmd('check', '-v', '--repair', self.repository_location, exit_code=0)
- self.assert_in('Healed previously missing file chunk', output)
- self.assert_in('testsuite/archiver.py: Completely healed previously damaged file!', output)
- # check that the file in the old archives has the correct chunks again.
- # also check that chunks_healthy list is removed as it is not needed any more.
- for archive_name in ('archive1', 'archive2'):
- archive, repository = self.open_archive(archive_name)
- with repository:
- for item in archive.iter_items():
- if item.path.endswith('testsuite/archiver.py'):
- self.assert_equal(valid_chunks, item.chunks)
- self.assert_not_in('chunks_healthy', item)
- break
- else:
- self.fail('should not happen')
- # list is also all-healthy again
- output = self.cmd('list', '--format={health}#{path}{LF}', self.repository_location + '::archive1', exit_code=0)
- self.assert_not_in('broken#', output)
- # check should be fine now (and not show it has healed anything).
- output = self.cmd('check', '-v', '--repair', self.repository_location, exit_code=0)
- self.assert_not_in('Healed previously missing file chunk', output)
- self.assert_not_in('testsuite/archiver.py: Completely healed previously damaged file!', output)
- def test_missing_archive_item_chunk(self):
- archive, repository = self.open_archive('archive1')
- with repository:
- repository.delete(archive.metadata.items[0])
- repository.commit(compact=False)
- self.cmd('check', self.repository_location, exit_code=1)
- self.cmd('check', '--repair', self.repository_location, exit_code=0)
- self.cmd('check', self.repository_location, exit_code=0)
- def test_missing_archive_metadata(self):
- archive, repository = self.open_archive('archive1')
- with repository:
- repository.delete(archive.id)
- repository.commit(compact=False)
- self.cmd('check', self.repository_location, exit_code=1)
- self.cmd('check', '--repair', self.repository_location, exit_code=0)
- self.cmd('check', self.repository_location, exit_code=0)
- def test_missing_manifest(self):
- archive, repository = self.open_archive('archive1')
- with repository:
- repository.delete(Manifest.MANIFEST_ID)
- repository.commit(compact=False)
- self.cmd('check', self.repository_location, exit_code=1)
- output = self.cmd('check', '-v', '--repair', self.repository_location, exit_code=0)
- self.assert_in('archive1', output)
- self.assert_in('archive2', output)
- self.cmd('check', self.repository_location, exit_code=0)
- def test_corrupted_manifest(self):
- archive, repository = self.open_archive('archive1')
- with repository:
- manifest = repository.get(Manifest.MANIFEST_ID)
- corrupted_manifest = manifest + b'corrupted!'
- repository.put(Manifest.MANIFEST_ID, corrupted_manifest)
- repository.commit(compact=False)
- self.cmd('check', self.repository_location, exit_code=1)
- output = self.cmd('check', '-v', '--repair', self.repository_location, exit_code=0)
- self.assert_in('archive1', output)
- self.assert_in('archive2', output)
- self.cmd('check', self.repository_location, exit_code=0)
- def test_manifest_rebuild_corrupted_chunk(self):
- archive, repository = self.open_archive('archive1')
- with repository:
- manifest = repository.get(Manifest.MANIFEST_ID)
- corrupted_manifest = manifest + b'corrupted!'
- repository.put(Manifest.MANIFEST_ID, corrupted_manifest)
- chunk = repository.get(archive.id)
- corrupted_chunk = chunk + b'corrupted!'
- repository.put(archive.id, corrupted_chunk)
- repository.commit(compact=False)
- self.cmd('check', self.repository_location, exit_code=1)
- output = self.cmd('check', '-v', '--repair', self.repository_location, exit_code=0)
- self.assert_in('archive2', output)
- self.cmd('check', self.repository_location, exit_code=0)
- def test_manifest_rebuild_duplicate_archive(self):
- archive, repository = self.open_archive('archive1')
- key = archive.key
- with repository:
- manifest = repository.get(Manifest.MANIFEST_ID)
- corrupted_manifest = manifest + b'corrupted!'
- repository.put(Manifest.MANIFEST_ID, corrupted_manifest)
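- # inject a second, hand-crafted archive metadata object that reuses the
- # name 'archive1'; the manifest rebuild must keep both and rename the
- # duplicate to 'archive1.1'.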
- archive_dict = {
- 'cmdline': [],
- 'items': [],
- 'hostname': 'foo',
- 'username': 'bar',
- 'name': 'archive1',
- 'time': '2016-12-15T18:49:51.849711',
- 'version': 1,
- }
- archive = key.pack_and_authenticate_metadata(archive_dict, context=b'archive')
- archive_id = key.id_hash(archive)
- repository.put(archive_id, key.encrypt(archive))
- repository.commit(compact=False)
- self.cmd('check', self.repository_location, exit_code=1)
- self.cmd('check', '--repair', self.repository_location, exit_code=0)
- output = self.cmd('list', self.repository_location)
- self.assert_in('archive1', output)
- self.assert_in('archive1.1', output)
- self.assert_in('archive2', output)
- def test_extra_chunks(self):
- self.cmd('check', self.repository_location, exit_code=0)
- with Repository(self.repository_location, exclusive=True) as repository:
- repository.put(b'01234567890123456789012345678901', b'xxxx')
- repository.commit(compact=False)
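- # a plain check flags the orphan chunk but must not remove it, so a second
- # plain check still fails; only --repair cleans it up.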
- self.cmd('check', self.repository_location, exit_code=1)
- self.cmd('check', self.repository_location, exit_code=1)
- self.cmd('check', '--repair', self.repository_location, exit_code=0)
- self.cmd('check', self.repository_location, exit_code=0)
- self.cmd('extract', '--dry-run', self.repository_location + '::archive1', exit_code=0)
- def _test_verify_data(self, *init_args):
- shutil.rmtree(self.repository_path)
- self.cmd('init', self.repository_location, *init_args)
- self.create_src_archive('archive1')
- archive, repository = self.open_archive('archive1')
- with repository:
- for item in archive.iter_items():
- if item.path.endswith('testsuite/archiver.py'):
- chunk = item.chunks[-1]
- data = repository.get(chunk.id) + b'1234'
- repository.put(chunk.id, data)
- break
- repository.commit(compact=False)
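- # a plain check still passes: the re-put chunk has a valid segment CRC and
- # is present; only --verify-data decrypts/authenticates the chunk contents.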
- self.cmd('check', self.repository_location, exit_code=0)
- output = self.cmd('check', '--verify-data', self.repository_location, exit_code=1)
- assert bin_to_hex(chunk.id) + ', integrity error' in output
- # repair (heal is tested in another test)
- output = self.cmd('check', '--repair', '--verify-data', self.repository_location, exit_code=0)
- assert bin_to_hex(chunk.id) + ', integrity error' in output
- assert 'testsuite/archiver.py: New missing file chunk detected' in output
- def test_verify_data(self):
- self._test_verify_data('--encryption', 'repokey')
- def test_verify_data_unencrypted(self):
- self._test_verify_data('--encryption', 'none')
- def test_empty_repository(self):
- with Repository(self.repository_location, exclusive=True) as repository:
- for id_ in repository.list():
- repository.delete(id_)
- repository.commit(compact=False)
- self.cmd('check', self.repository_location, exit_code=1)
- def test_attic013_acl_bug(self):
- # Attic up to release 0.13 contained a bug where every item unintentionally received
- # a b'acl'=None key-value pair.
- # This bug can still live on in Borg repositories (through borg upgrade).
- class Attic013Item:
- def as_dict(self):
- return {
- # These are required
- b'path': '1234',
- b'mtime': 0,
- b'mode': 0,
- b'user': b'0',
- b'group': b'0',
- b'uid': 0,
- b'gid': 0,
- # acl is the offending key.
- b'acl': None,
- }
- archive, repository = self.open_archive('archive1')
- with repository:
- manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
- with Cache(repository, key, manifest) as cache:
- archive = Archive(repository, key, manifest, '0.13', cache=cache, create=True)
- archive.items_buffer.add(Attic013Item())
- archive.save()
- self.cmd('check', self.repository_location, exit_code=0)
- self.cmd('list', self.repository_location + '::0.13', exit_code=0)
- class ManifestAuthenticationTest(ArchiverTestCaseBase):
- def spoof_manifest(self, repository):
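- # write a syntactically valid manifest that carries no TAM; its timestamp
- # lies one day in the future, so it would supersede the real manifest if
- # it were accepted.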
- with repository:
- _, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
- repository.put(Manifest.MANIFEST_ID, key.encrypt(msgpack.packb({
- 'version': 1,
- 'archives': {},
- 'config': {},
- 'timestamp': (utcnow() + timedelta(days=1)).strftime(ISO_FORMAT),
- })))
- repository.commit(compact=False)
- def test_fresh_init_tam_required(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- repository = Repository(self.repository_path, exclusive=True)
- with repository:
- manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
- repository.put(Manifest.MANIFEST_ID, key.encrypt(msgpack.packb({
- 'version': 1,
- 'archives': {},
- 'timestamp': (utcnow() + timedelta(days=1)).strftime(ISO_FORMAT),
- })))
- repository.commit(compact=False)
- with pytest.raises(TAMRequiredError):
- self.cmd('list', self.repository_location)
- def test_not_required(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_src_archive('archive1234')
- repository = Repository(self.repository_path, exclusive=True)
- with repository:
- shutil.rmtree(get_security_dir(bin_to_hex(repository.id)))
- _, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
- key.tam_required = False
- key.change_passphrase(key._passphrase)
- manifest = msgpack.unpackb(key.decrypt(None, repository.get(Manifest.MANIFEST_ID)))
- del manifest[b'tam']
- repository.put(Manifest.MANIFEST_ID, key.encrypt(msgpack.packb(manifest)))
- repository.commit(compact=False)
- output = self.cmd('list', '--debug', self.repository_location)
- assert 'archive1234' in output
- assert 'Manifest TAM not found and not required' in output
- # Run upgrade
- self.cmd('upgrade', '--tam', self.repository_location)
- # Manifest must be authenticated now
- output = self.cmd('list', '--debug', self.repository_location)
- assert 'archive1234' in output
- assert 'TAM-verified manifest' in output
- # Try to spoof / modify pre-1.0.9
- self.spoof_manifest(repository)
- # Fails
- with pytest.raises(TAMRequiredError):
- self.cmd('list', self.repository_location)
- # Force upgrade
- self.cmd('upgrade', '--tam', '--force', self.repository_location)
- self.cmd('list', self.repository_location)
- def test_disable(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_src_archive('archive1234')
- self.cmd('upgrade', '--disable-tam', self.repository_location)
- repository = Repository(self.repository_path, exclusive=True)
- self.spoof_manifest(repository)
- assert not self.cmd('list', self.repository_location)
- def test_disable2(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_src_archive('archive1234')
- repository = Repository(self.repository_path, exclusive=True)
- self.spoof_manifest(repository)
- self.cmd('upgrade', '--disable-tam', self.repository_location)
- assert not self.cmd('list', self.repository_location)
- class ArchiveAuthenticationTest(ArchiverTestCaseBase):
- def write_archive_without_tam(self, repository, archive_name):
- manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
- archive_data = msgpack.packb({
- 'version': 1,
- 'name': archive_name,
- 'items': [],
- 'cmdline': '',
- 'hostname': '',
- 'username': '',
- 'time': utcnow().strftime(ISO_FORMAT),
- })
- archive_id = key.id_hash(archive_data)
- repository.put(archive_id, key.encrypt(archive_data))
- manifest.archives[archive_name] = (archive_id, datetime.now())
- manifest.write()
- repository.commit(compact=False)
- def test_upgrade_archives_tam(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_src_archive('archive_tam')
- repository = Repository(self.repository_path, exclusive=True)
- with repository:
- self.write_archive_without_tam(repository, "archive_no_tam")
- output = self.cmd('list', '--format="{name} tam:{tam}{NL}"', self.repository_location)
- assert 'archive_tam tam:verified' in output # good
- assert 'archive_no_tam tam:none' in output # could be borg < 1.0.9 archive or fake
- self.cmd('upgrade', '--archives-tam', self.repository_location)
- output = self.cmd('list', '--format="{name} tam:{tam}{NL}"', self.repository_location)
- assert 'archive_tam tam:verified' in output # still good
- assert 'archive_no_tam tam:verified' in output # previously TAM-less archives got a TAM now
- def test_check_rebuild_manifest(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_src_archive('archive_tam')
- repository = Repository(self.repository_path, exclusive=True)
- with repository:
- self.write_archive_without_tam(repository, "archive_no_tam")
- repository.delete(Manifest.MANIFEST_ID) # kill manifest, so check has to rebuild it
- repository.commit(compact=False)
- self.cmd('check', '--repair', self.repository_location)
- output = self.cmd('list', '--format="{name} tam:{tam}{NL}"', self.repository_location)
- assert 'archive_tam tam:verified' in output # TAM-verified archive is in rebuilt manifest
- assert 'archive_no_tam' not in output # check got rid of untrusted not TAM-verified archive
- def test_check_rebuild_refcounts(self):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- self.create_src_archive('archive_tam')
- archive_id_pre_check = self.cmd('list', '--format="{name} {id}{NL}"', self.repository_location)
- repository = Repository(self.repository_path, exclusive=True)
- with repository:
- self.write_archive_without_tam(repository, "archive_no_tam")
- output = self.cmd('list', '--format="{name} tam:{tam}{NL}"', self.repository_location)
- assert 'archive_tam tam:verified' in output # good
- assert 'archive_no_tam tam:none' in output # could be borg < 1.0.9 archive or fake
- self.cmd('check', '--repair', self.repository_location)
- output = self.cmd('list', '--format="{name} tam:{tam}{NL}"', self.repository_location)
- assert 'archive_tam tam:verified' in output # TAM-verified archive still there
- assert 'archive_no_tam' not in output # check got rid of untrusted not TAM-verified archive
- archive_id_post_check = self.cmd('list', '--format="{name} {id}{NL}"', self.repository_location)
- assert archive_id_post_check == archive_id_pre_check # rebuild_refcounts didn't change archive_tam archive id
- class RemoteArchiverTestCase(ArchiverTestCase):
- prefix = '__testsuite__:'
- def open_repository(self):
- return RemoteRepository(Location(self.repository_location))
- def test_remote_repo_restrict_to_path(self):
- # restricted to repo directory itself:
- with patch.object(RemoteRepository, 'extra_test_args', ['--restrict-to-path', self.repository_path]):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- # restricted to repo directory itself, fail for other directories with same prefix:
- with patch.object(RemoteRepository, 'extra_test_args', ['--restrict-to-path', self.repository_path]):
- with pytest.raises(PathNotAllowed):
- self.cmd('init', '--encryption=repokey', self.repository_location + '_0')
- # restricted to a completely different path:
- with patch.object(RemoteRepository, 'extra_test_args', ['--restrict-to-path', '/foo']):
- with pytest.raises(PathNotAllowed):
- self.cmd('init', '--encryption=repokey', self.repository_location + '_1')
- path_prefix = os.path.dirname(self.repository_path)
- # restrict to repo directory's parent directory:
- with patch.object(RemoteRepository, 'extra_test_args', ['--restrict-to-path', path_prefix]):
- self.cmd('init', '--encryption=repokey', self.repository_location + '_2')
- # restrict to repo directory's parent directory and another directory:
- with patch.object(RemoteRepository, 'extra_test_args', ['--restrict-to-path', '/foo', '--restrict-to-path', path_prefix]):
- self.cmd('init', '--encryption=repokey', self.repository_location + '_3')
- def test_remote_repo_restrict_to_repository(self):
- # restricted to repo directory itself:
- with patch.object(RemoteRepository, 'extra_test_args', ['--restrict-to-repository', self.repository_path]):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- parent_path = os.path.join(self.repository_path, '..')
- with patch.object(RemoteRepository, 'extra_test_args', ['--restrict-to-repository', parent_path]):
- with pytest.raises(PathNotAllowed):
- self.cmd('init', '--encryption=repokey', self.repository_location)
- @unittest.skip('only works locally')
- def test_debug_put_get_delete_obj(self):
- pass
- @unittest.skip('only works locally')
- def test_config(self):
- pass
- @unittest.skip('only works locally')
- def test_migrate_lock_alive(self):
- pass

    def test_remote_repo_strip_components_doesnt_leak(self):
        self.cmd('init', '--encryption=repokey', self.repository_location)
        self.create_regular_file('dir/file', contents=b"test file contents 1")
        self.create_regular_file('dir/file2', contents=b"test file contents 2")
        self.create_regular_file('skipped-file1', contents=b"test file contents 3")
        self.create_regular_file('skipped-file2', contents=b"test file contents 4")
        self.create_regular_file('skipped-file3', contents=b"test file contents 5")
        self.cmd('create', self.repository_location + '::test', 'input')
        marker = 'cached responses left in RemoteRepository'
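        # The marker is part of a debug message borg logs when preloaded (cached)
        # responses are still queued in the RemoteRepository when it is closed.
        # If --strip-components skips files, their preloaded chunks must not be
        # left behind; otherwise the marker shows up in the --debug output below.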
        with changedir('output'):
            res = self.cmd('extract', "--debug", self.repository_location + '::test', '--strip-components', '3')
            assert marker not in res
            with self.assert_creates_file('file'):
                res = self.cmd('extract', "--debug", self.repository_location + '::test', '--strip-components', '2')
                assert marker not in res
            with self.assert_creates_file('dir/file'):
                res = self.cmd('extract', "--debug", self.repository_location + '::test', '--strip-components', '1')
                assert marker not in res
            with self.assert_creates_file('input/dir/file'):
                res = self.cmd('extract', "--debug", self.repository_location + '::test', '--strip-components', '0')
                assert marker not in res


class ArchiverCorruptionTestCase(ArchiverTestCaseBase):
    def setUp(self):
        super().setUp()
        self.create_test_files()
        self.cmd('init', '--encryption=repokey', self.repository_location)
        self.cache_path = json.loads(self.cmd('info', self.repository_location, '--json'))['cache']['path']
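
    # Helper: invert the last `amount` bytes of a file in place (each byte b
    # becomes 255 - b). The file keeps its size, but checksums/integrity data
    # covering the tail no longer match.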
    def corrupt(self, file, amount=1):
        with open(file, 'r+b') as fd:
            fd.seek(-amount, io.SEEK_END)
            corrupted = bytes(255 - c for c in fd.read(amount))
            fd.seek(-amount, io.SEEK_END)
            fd.write(corrupted)

    @pytest.mark.allow_cache_wipe
    def test_cache_chunks(self):
        self.create_src_archive("test")
        chunks_path = os.path.join(self.cache_path, 'chunks')
        chunks_before_corruption = set(ChunkIndex(path=chunks_path).iteritems())
        self.corrupt(chunks_path)

        assert not self.FORK_DEFAULT  # test does not support forking
        chunks_in_memory = None
        sync_chunks = LocalCache.sync
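        # Wrap LocalCache.sync so we can capture the chunk index as it looks in
        # memory right after the (forced) rebuild, before it is persisted.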
        def sync_wrapper(cache):
            nonlocal chunks_in_memory
            sync_chunks(cache)
            chunks_in_memory = set(cache.chunks.iteritems())

        with patch.object(LocalCache, "sync", sync_wrapper):
            out = self.cmd("info", self.repository_location)

        assert chunks_in_memory == chunks_before_corruption
        assert "forcing a cache rebuild" in out
        chunks_after_repair = set(ChunkIndex(path=chunks_path).iteritems())
        assert chunks_after_repair == chunks_before_corruption

    def test_cache_files(self):
        self.cmd('create', self.repository_location + '::test', 'input')
        self.corrupt(os.path.join(self.cache_path, 'files'))
        out = self.cmd('create', self.repository_location + '::test1', 'input')
        # borg warns about the corrupt files cache, but then continues without the files cache.
        assert 'files cache is corrupted' in out

    def test_chunks_archive(self):
        self.cmd('create', self.repository_location + '::test1', 'input')
        # Find ID of test1, so we can corrupt it later :)
        target_id = self.cmd('list', self.repository_location, '--format={id}{LF}').strip()
        self.cmd('create', self.repository_location + '::test2', 'input')

        # Force cache sync, creating archive chunks of test1 and test2 in chunks.archive.d
        self.cmd('delete', '--cache-only', self.repository_location)
        self.cmd('info', self.repository_location, '--json')

        chunks_archive = os.path.join(self.cache_path, 'chunks.archive.d')
        assert len(os.listdir(chunks_archive)) == 4  # two archives, one chunks cache and one .integrity file each

        self.corrupt(os.path.join(chunks_archive, target_id + '.compact'))

        # Trigger a cache sync by changing the manifest ID in the cache config
        config_path = os.path.join(self.cache_path, 'config')
        config = ConfigParser(interpolation=None)
        config.read(config_path)
        config.set('cache', 'manifest', bin_to_hex(bytes(32)))
        with open(config_path, 'w') as fd:
            config.write(fd)
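        # The all-zero manifest ID written above cannot match the repository's
        # real manifest, so the next command has to re-sync the cache and thus
        # reads the (now corrupted) cached archive chunk index.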

        # Cache sync notices corrupted archive chunks, but automatically recovers.
        out = self.cmd('create', '-v', self.repository_location + '::test3', 'input', exit_code=1)
        assert 'Reading cached archive chunk index for test1' in out
        assert 'Cached archive chunk index of test1 is corrupted' in out
        assert 'Fetching and building archive index for test1' in out

    def test_old_version_interfered(self):
        # Modify the main manifest ID without touching the manifest ID in the integrity section.
        # This happens if a version without integrity checking modifies the cache.
        config_path = os.path.join(self.cache_path, 'config')
        config = ConfigParser(interpolation=None)
        config.read(config_path)
        config.set('cache', 'manifest', bin_to_hex(bytes(32)))
        with open(config_path, 'w') as fd:
            config.write(fd)

        out = self.cmd('info', self.repository_location)
        assert 'Cache integrity data not available: old Borg version modified the cache.' in out


class DiffArchiverTestCase(ArchiverTestCaseBase):
    requires_hardlinks = pytest.mark.skipif(not are_hardlinks_supported(), reason='hardlinks not supported')

    def test_basic_functionality(self):
        # Setup files for the first snapshot
        self.create_regular_file('empty', size=0)
        self.create_regular_file('file_unchanged', size=128)
        self.create_regular_file('file_removed', size=256)
        self.create_regular_file('file_removed2', size=512)
        self.create_regular_file('file_replaced', size=1024)
        self.create_regular_file('file_touched', size=128)
        os.mkdir('input/dir_replaced_with_file')
        os.chmod('input/dir_replaced_with_file', stat.S_IFDIR | 0o755)
        os.mkdir('input/dir_removed')
        if are_symlinks_supported():
            os.mkdir('input/dir_replaced_with_link')
            os.symlink('input/dir_replaced_with_file', 'input/link_changed')
            os.symlink('input/file_unchanged', 'input/link_removed')
            os.symlink('input/file_removed2', 'input/link_target_removed')
            os.symlink('input/empty', 'input/link_target_contents_changed')
            os.symlink('input/empty', 'input/link_replaced_by_file')
        if are_hardlinks_supported():
            os.link('input/file_replaced', 'input/hardlink_target_replaced')
            os.link('input/empty', 'input/hardlink_contents_changed')
            os.link('input/file_removed', 'input/hardlink_removed')
            os.link('input/file_removed2', 'input/hardlink_target_removed')

        self.cmd('init', '--encryption=repokey', self.repository_location)

        # Create the first snapshot
        self.cmd('create', self.repository_location + '::test0', 'input')

        # Setup files for the second snapshot
        self.create_regular_file('file_added', size=2048)
        self.create_regular_file('file_empty_added', size=0)
        os.unlink('input/file_replaced')
        self.create_regular_file('file_replaced', contents=b'0' * 4096)
        os.unlink('input/file_removed')
        os.unlink('input/file_removed2')
        time.sleep(1)  # macOS HFS+ has a 1s timestamp granularity
        Path('input/file_touched').touch()
        os.rmdir('input/dir_replaced_with_file')
        self.create_regular_file('dir_replaced_with_file', size=8192)
        os.chmod('input/dir_replaced_with_file', stat.S_IFREG | 0o755)
        os.mkdir('input/dir_added')
        os.rmdir('input/dir_removed')
        if are_symlinks_supported():
            os.rmdir('input/dir_replaced_with_link')
            os.symlink('input/dir_added', 'input/dir_replaced_with_link')
            os.unlink('input/link_changed')
            os.symlink('input/dir_added', 'input/link_changed')
            os.symlink('input/dir_added', 'input/link_added')
            os.unlink('input/link_replaced_by_file')
            self.create_regular_file('link_replaced_by_file', size=16384)
            os.unlink('input/link_removed')
        if are_hardlinks_supported():
            os.unlink('input/hardlink_removed')
            os.link('input/file_added', 'input/hardlink_added')

        with open('input/empty', 'ab') as fd:
            fd.write(b'appended_data')

        # Create the second snapshot
        self.cmd('create', self.repository_location + '::test1a', 'input')
        self.cmd('create', '--chunker-params', '16,18,17,4095', self.repository_location + '::test1b', 'input')
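        # test1a shares chunker params with test0, so borg can compare chunk ids
        # and report exact added/removed byte counts; test1b uses different
        # chunker params, so contents cannot be compared via chunk ids
        # (can_compare_ids=False below) and changed files are only reported as
        # "modified".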

        def do_asserts(output, can_compare_ids, content_only=False):
            # File contents changed (deleted and replaced with a new file)
            change = 'B' if can_compare_ids else '{:<19}'.format('modified')
            lines = output.splitlines()
            assert 'file_replaced' in output  # added to debug #3494
            self.assert_line_exists(lines, f"{change}.*input/file_replaced")
            # File unchanged
            assert 'input/file_unchanged' not in output
            # Directory replaced with a regular file
            if "BORG_TESTS_IGNORE_MODES" not in os.environ and not is_win32 and not content_only:
                self.assert_line_exists(lines, "drwxr-xr-x -> -rwxr-xr-x.*input/dir_replaced_with_file")
            # Basic directory cases
            assert 'added directory input/dir_added' in output
            assert 'removed directory input/dir_removed' in output
            if are_symlinks_supported():
                # Basic symlink cases
                self.assert_line_exists(lines, "changed link.*input/link_changed")
                self.assert_line_exists(lines, "added link.*input/link_added")
                self.assert_line_exists(lines, "removed link.*input/link_removed")
                # Symlink replacing or being replaced
                assert 'input/dir_replaced_with_link' in output
                assert 'input/link_replaced_by_file' in output
                # Symlink target removed. Should not affect the symlink at all.
                assert 'input/link_target_removed' not in output
            # The inode has two links and the file contents changed. Borg
            # should notice the changes in both links. However, the symlink
            # pointing to the file is not changed.
            change = '0 B' if can_compare_ids else '{:<19}'.format('modified')
            self.assert_line_exists(lines, f"{change}.*input/empty")
            # Do not show a 0 byte change for a file whose contents weren't modified.
            self.assert_line_not_exists(lines, '0 B.*input/file_touched')
            if not content_only:
                self.assert_line_exists(lines, "[cm]time:.*input/file_touched")
            else:
                # And if we're doing content-only, don't show the file at all.
                assert "input/file_touched" not in output
            if are_hardlinks_supported():
                self.assert_line_exists(lines, f"{change}.*input/hardlink_contents_changed")
            if are_symlinks_supported():
                assert 'input/link_target_contents_changed' not in output
            # Added a new file and a hard link to it. Both links to the same
            # inode should appear as separate files.
            assert 'added 2.05 kB input/file_added' in output
            if are_hardlinks_supported():
                assert 'added 2.05 kB input/hardlink_added' in output
            # check if a diff between a non-existent and an empty new file is found
            assert 'added 0 B input/file_empty_added' in output
            # The inode has two links and both of them are deleted. They should
            # appear as two deleted files.
            assert 'removed 256 B input/file_removed' in output
            if are_hardlinks_supported():
                assert 'removed 256 B input/hardlink_removed' in output
            if are_hardlinks_supported() and content_only:
                # Another link (marked previously as the source in borg) to the
                # same inode was removed. This should only change the ctime, since
                # removing the link decrements the inode's hard-link count.
                assert "input/hardlink_target_removed" not in output
                # Another link (marked previously as the source in borg) to the
                # same inode was replaced with a new regular file. This should only
                # change its ctime and is not reflected in the output if content-only is set.
                assert "input/hardlink_target_replaced" not in output

        def do_json_asserts(output, can_compare_ids, content_only=False):
            def get_changes(filename, data):
                chgsets = [j['changes'] for j in data if j['path'] == filename]
                assert len(chgsets) < 2
                # return a flattened list of changes for the given filename
                return [chg for chgset in chgsets for chg in chgset]

            # convert output to a list of dicts
            joutput = [json.loads(line) for line in output.split('\n') if line]

            # File contents changed (deleted and replaced with a new file)
            expected = {'type': 'modified', 'added': 4096, 'removed': 1024} if can_compare_ids else {'type': 'modified'}
            assert expected in get_changes('input/file_replaced', joutput)

            # File unchanged
            assert not any(get_changes('input/file_unchanged', joutput))

            # Do not show a 0 byte change for a file whose contents weren't modified.
            unexpected = {'type': 'modified', 'added': 0, 'removed': 0}
            assert unexpected not in get_changes('input/file_touched', joutput)
            if not content_only:
                assert {"ctime", "mtime"}.issubset({c["type"] for c in get_changes('input/file_touched', joutput)})
            else:
                # And if we're doing content-only, don't show the file at all.
                assert not any(get_changes('input/file_touched', joutput))

            # Directory replaced with a regular file
            if 'BORG_TESTS_IGNORE_MODES' not in os.environ and not content_only:
                assert {'type': 'mode', 'old_mode': 'drwxr-xr-x', 'new_mode': '-rwxr-xr-x'} in \
                    get_changes('input/dir_replaced_with_file', joutput)

            # Basic directory cases
            assert {'type': 'added directory'} in get_changes('input/dir_added', joutput)
            assert {'type': 'removed directory'} in get_changes('input/dir_removed', joutput)

            if are_symlinks_supported():
                # Basic symlink cases
                assert {'type': 'changed link'} in get_changes('input/link_changed', joutput)
                assert {'type': 'added link'} in get_changes('input/link_added', joutput)
                assert {'type': 'removed link'} in get_changes('input/link_removed', joutput)

                # Symlink replacing or being replaced
                if not content_only:
                    assert any(
                        chg["type"] == "mode" and chg["new_mode"].startswith("l")
                        for chg in get_changes("input/dir_replaced_with_link", joutput)
                    ), get_changes("input/dir_replaced_with_link", joutput)
                    assert any(
                        chg["type"] == "mode" and chg["old_mode"].startswith("l")
                        for chg in get_changes("input/link_replaced_by_file", joutput)
                    ), get_changes("input/link_replaced_by_file", joutput)

                # Symlink target removed. Should not affect the symlink at all.
                assert not any(get_changes('input/link_target_removed', joutput))

            # The inode has two links and the file contents changed. Borg
            # should notice the changes in both links. However, the symlink
            # pointing to the file is not changed.
            expected = {'type': 'modified', 'added': 13, 'removed': 0} if can_compare_ids else {'type': 'modified'}
            assert expected in get_changes('input/empty', joutput)
            if are_hardlinks_supported():
                assert expected in get_changes('input/hardlink_contents_changed', joutput)
            if are_symlinks_supported():
                assert not any(get_changes('input/link_target_contents_changed', joutput))

            # Added a new file and a hard link to it. Both links to the same
            # inode should appear as separate files.
            assert {'type': 'added', 'size': 2048} in get_changes('input/file_added', joutput)
            if are_hardlinks_supported():
                assert {'type': 'added', 'size': 2048} in get_changes('input/hardlink_added', joutput)

            # check if a diff between a non-existent and an empty new file is found
            assert {'type': 'added', 'size': 0} in get_changes('input/file_empty_added', joutput)

            # The inode has two links and both of them are deleted. They should
            # appear as two deleted files.
            assert {'type': 'removed', 'size': 256} in get_changes('input/file_removed', joutput)
            if are_hardlinks_supported():
                assert {'type': 'removed', 'size': 256} in get_changes('input/hardlink_removed', joutput)

            if are_hardlinks_supported() and content_only:
                # Another link (marked previously as the source in borg) to the
                # same inode was removed. This should only change the ctime, since
                # removing the link decrements the inode's hard-link count.
                assert not any(get_changes("input/hardlink_target_removed", joutput))
                # Another link (marked previously as the source in borg) to the
                # same inode was replaced with a new regular file. This should only
                # change its ctime and is not reflected in the output if content-only is set.
                assert not any(get_changes("input/hardlink_target_replaced", joutput))
- output = self.cmd("diff", self.repository_location + "::test0", "test1a")
- do_asserts(output, True)
- output = self.cmd("diff", self.repository_location + "::test0", "test1b", "--content-only")
- do_asserts(output, False, content_only=True)
- output = self.cmd("diff", self.repository_location + "::test0", "test1a", "--json-lines")
- do_json_asserts(output, True)
- output = self.cmd("diff", self.repository_location + "::test0", "test1a", "--json-lines", "--content-only")
- do_json_asserts(output, True, content_only=True)

    def test_sort_option(self):
        self.cmd('init', '--encryption=repokey', self.repository_location)

        self.create_regular_file('a_file_removed', size=8)
        self.create_regular_file('f_file_removed', size=16)
        self.create_regular_file('c_file_changed', size=32)
        self.create_regular_file('e_file_changed', size=64)
        self.cmd('create', self.repository_location + '::test0', 'input')

        os.unlink('input/a_file_removed')
        os.unlink('input/f_file_removed')
        os.unlink('input/c_file_changed')
        os.unlink('input/e_file_changed')
        self.create_regular_file('c_file_changed', size=512)
        self.create_regular_file('e_file_changed', size=1024)
        self.create_regular_file('b_file_added', size=128)
        self.create_regular_file('d_file_added', size=256)
        self.cmd('create', self.repository_location + '::test1', 'input')

        # default sorting: the legacy --sort option sorts by path, ascending
        output = self.cmd('diff', self.repository_location + '::test0', 'test1', '--content-only', '--sort')
        expected = [
            'Warning: "--sort" is deprecated',  # the deprecation warning is the first output line
            'a_file_removed',
            'b_file_added',
            'c_file_changed',
            'd_file_added',
            'e_file_changed',
            'f_file_removed',
        ]
        assert all(x in line for x, line in zip(expected, output.splitlines()))

        # single-field sort: by size_added, descending (the newer --sort-by option)
        output = self.cmd('diff', self.repository_location + '::test0', 'test1', '--content-only', '--sort-by=>size_added')
        # size_added per entry: e_file_changed (1024), c_file_changed (512), d_file_added (256),
        # b_file_added (128); a_file_removed and f_file_removed add nothing (0)
        expected = [
            'e_file_changed',
            'c_file_changed',
            'd_file_added',
            'b_file_added',
        ]
        names_in_output = [line.split()[-1].split('/')[-1] for line in output.splitlines()]
        subset = [n for n in names_in_output if n in expected]
        assert subset == expected
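        # (filtering the output names down to the expected set and comparing the
        # filtered list checks the relative order of those entries without
        # constraining where the zero-size removed entries are placed)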

        # multi-key sort: primary key size_added descending, secondary key path
        # ascending to break ties (the removed files both have size_added == 0)
        output = self.cmd('diff', self.repository_location + '::test0', 'test1', '--content-only', '--sort-by=>size_added,path')
        expected = [
            'e_file_changed',
            'c_file_changed',
            'd_file_added',
            'b_file_added',
            'a_file_removed',
            'f_file_removed',
        ]
        assert all(x in line for x, line in zip(expected, output.splitlines()))

        # sort by size_diff, descending (net content change)
        output = self.cmd('diff', self.repository_location + '::test0', 'test1', '--content-only', '--sort-by=>size_diff')
        expected = [
            'e_file_changed',
            'c_file_changed',
            'd_file_added',
            'b_file_added',
        ]
        names_in_output = [line.split()[-1].split('/')[-1] for line in output.splitlines()]
        subset = [n for n in names_in_output if n in expected]
        assert subset == expected

        # sort by size (file size in the second archive), descending;
        # removed files have size 0 and come last
        output = self.cmd('diff', self.repository_location + '::test0', 'test1', '--content-only', '--sort-by=>size')
        expected = [
            'e_file_changed',  # 1024 in the second archive
            'c_file_changed',  # 512
            'd_file_added',    # 256
            'b_file_added',    # 128
        ]
        names_in_output = [line.split()[-1].split('/')[-1] for line in output.splitlines()]
        subset = [n for n in names_in_output if n in expected]
        assert subset == expected

    def test_sort_validation(self):
        # an invalid sort field should be rejected by argparse
        self.cmd('init', '--encryption=repokey', self.repository_location)
        self.create_regular_file('x', size=1)
        self.cmd('create', self.repository_location + '::a', 'input')
        out = self.cmd('diff', self.repository_location + '::a', 'a', '--sort-by=invalid_field', fork=True, exit_code=2)
        assert 'unsupported' in out or 'invalid choice' in out

    def test_time_diffs(self):
        self.cmd('init', '--encryption=repokey', self.repository_location)
        self.create_regular_file("test_file", size=10)
        self.cmd('create', self.repository_location + '::archive1', 'input')
        time.sleep(0.1)
        os.unlink("input/test_file")
        if is_win32:
            # Sleeping for 15s, because Windows doesn't refresh ctime if the file is
            # deleted and recreated within 15 seconds.
            time.sleep(15)
        elif is_darwin:
            time.sleep(1)  # HFS+ has a 1s timestamp granularity
        self.create_regular_file("test_file", size=15)
        self.cmd('create', self.repository_location + '::archive2', 'input')
        output = self.cmd("diff", self.repository_location + "::archive1", "archive2")
        self.assert_in("mtime", output)
        self.assert_in("ctime", output)  # should show up on Windows as well, since it is a new file

        if is_darwin:
            time.sleep(1)  # HFS+ has a 1s timestamp granularity
        os.chmod("input/test_file", 0o777)
        self.cmd('create', self.repository_location + '::archive3', 'input')
        output = self.cmd("diff", self.repository_location + "::archive2", "archive3")
        self.assert_not_in("mtime", output)
        # Check the platform: ctime should not be shown on Windows, since the file wasn't recreated.
        if not is_win32:
            self.assert_in("ctime", output)
        else:
            self.assert_not_in("ctime", output)

    @requires_hardlinks
    def test_multiple_link_exclusion(self):
        path_a = os.path.join(self.input_path, 'a')
        path_b = os.path.join(self.input_path, 'b')
        os.mkdir(path_a)
        os.mkdir(path_b)
        hl_a = os.path.join(path_a, 'hardlink')
        hl_b = os.path.join(path_b, 'hardlink')
        self.create_regular_file(hl_a, contents=b'123456')
        os.link(hl_a, hl_b)
        self.cmd('init', '--encryption=repokey', self.repository_location)
        self.cmd('create', self.repository_location + '::test0', 'input')
        os.unlink(hl_a)  # don't duplicate the warning message - one is enough
        self.cmd('create', self.repository_location + '::test1', 'input')
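        # The patterns restrict the compare set to input/b, so the hardlink
        # source under input/a is excluded; borg then cannot resolve the
        # hardlink and should warn and skip the content compare.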
        output = self.cmd('diff', '--pattern=+ fm:input/b', '--pattern=! **/', self.repository_location + '::test0', 'test1', exit_code=EXIT_WARNING)
        lines = output.splitlines()
        self.assert_line_exists(lines, 'cannot find hardlink source for.*skipping compare.')


def test_get_args():
    archiver = Archiver()
    # everything normal:
    # first param is argv as produced by the ssh forced command,
    # second param is like from the SSH_ORIGINAL_COMMAND env variable
    args = archiver.get_args(['borg', 'serve', '--umask=0027', '--restrict-to-path=/p1', '--restrict-to-path=/p2', ],
                             'borg serve --info')
    assert args.func == archiver.do_serve
    assert args.restrict_to_paths == ['/p1', '/p2']
    assert args.umask == 0o027
    assert args.log_level == 'info'
    # similar, but with --restrict-to-repository
    args = archiver.get_args(['borg', 'serve', '--restrict-to-repository=/r1', '--restrict-to-repository=/r2', ],
                             'borg serve --info --umask=0027')
    assert args.restrict_to_repositories == ['/r1', '/r2']
    # trying to cheat - break out of the path restriction
    args = archiver.get_args(['borg', 'serve', '--restrict-to-path=/p1', '--restrict-to-path=/p2', ],
                             'borg serve --restrict-to-path=/')
    assert args.restrict_to_paths == ['/p1', '/p2']
    # trying to cheat - break out of the repository restriction
    args = archiver.get_args(['borg', 'serve', '--restrict-to-repository=/r1', '--restrict-to-repository=/r2', ],
                             'borg serve --restrict-to-repository=/')
    assert args.restrict_to_repositories == ['/r1', '/r2']
    # trying to cheat - break below the repository restriction
    args = archiver.get_args(['borg', 'serve', '--restrict-to-repository=/r1', '--restrict-to-repository=/r2', ],
                             'borg serve --restrict-to-repository=/r1/below')
    assert args.restrict_to_repositories == ['/r1', '/r2']
    # trying to cheat - try to execute a different subcommand
    args = archiver.get_args(['borg', 'serve', '--restrict-to-path=/p1', '--restrict-to-path=/p2', ],
                             'borg init --encryption=repokey /')
    assert args.func == archiver.do_serve
    # Check that environment variables in the forced command don't cause issues. If the command
    # were not forced, environment variables would be interpreted by the shell, but this does not
    # happen for forced commands - we get the verbatim command line and need to deal with env vars.
    args = archiver.get_args(['borg', 'serve', ],
                             'BORG_FOO=bar borg serve --info')
    assert args.func == archiver.do_serve


def test_chunk_content_equal():
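    # ccc compares the two chunk streams in both argument orders: the result of
    # chunks_contents_equal() must not depend on which side is exhausted first,
    # and only the concatenated bytes matter, not the chunk boundaries.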
    def ccc(a, b):
        chunks_a = [data for data in a]
        chunks_b = [data for data in b]
        compare1 = chunks_contents_equal(iter(chunks_a), iter(chunks_b))
        compare2 = chunks_contents_equal(iter(chunks_b), iter(chunks_a))
        assert compare1 == compare2
        return compare1

    # same content, different chunk boundaries
    assert ccc([
        b'1234', b'567A', b'bC'
    ], [
        b'1', b'23', b'4567A', b'b', b'C'
    ])
    # one iterator exhausted before the other
    assert not ccc([
        b'12345',
    ], [
        b'1234', b'56'
    ])
    # content mismatch
    assert not ccc([
        b'1234', b'65'
    ], [
        b'1234', b'56'
    ])
    # first is a prefix of the second
    assert not ccc([
        b'1234', b'56'
    ], [
        b'1234', b'565'
    ])


class TestBuildFilter:
    @staticmethod
    def peek_and_store_hardlink_masters(item, matched):
        pass

    def test_basic(self):
        matcher = PatternMatcher()
        matcher.add([parse_pattern('included')], IECommand.Include)
        filter = Archiver.build_filter(matcher, self.peek_and_store_hardlink_masters, 0)
        assert filter(Item(path='included'))
        assert filter(Item(path='included/file'))
        assert not filter(Item(path='something else'))

    def test_empty(self):
        matcher = PatternMatcher(fallback=True)
        filter = Archiver.build_filter(matcher, self.peek_and_store_hardlink_masters, 0)
        assert filter(Item(path='anything'))

    def test_strip_components(self):
        matcher = PatternMatcher(fallback=True)
        filter = Archiver.build_filter(matcher, self.peek_and_store_hardlink_masters, strip_components=1)
        assert not filter(Item(path='shallow'))
        assert not filter(Item(path='shallow/'))  # can this even happen? paths are normalized...
        assert filter(Item(path='deep enough/file'))
        assert filter(Item(path='something/dir/file'))


class TestCommonOptions:
    @staticmethod
    def define_common_options(add_common_option):
        add_common_option('-h', '--help', action='help', help='show this help message and exit')
        add_common_option('--critical', dest='log_level', help='foo',
                          action='store_const', const='critical', default='warning')
        add_common_option('--error', dest='log_level', help='foo',
                          action='store_const', const='error', default='warning')
        add_common_option('--append', dest='append', help='foo',
                          action='append', metavar='TOPIC', default=[])
        add_common_option('-p', '--progress', dest='progress', action='store_true', help='foo')
        add_common_option('--lock-wait', dest='lock_wait', type=int, metavar='N', default=1,
                          help='(default: %(default)d).')
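
    # CommonOptions registers the same set of options on the main parser
    # (suffix '_level0') and on each subparser (suffix '_level1'); resolve()
    # later merges the per-level values, with the more specific level taking
    # precedence. That is what makes these flags work both before and after
    # the subcommand (see test_flag_position_independence below).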
    @pytest.fixture
    def basic_parser(self):
        parser = argparse.ArgumentParser(prog='test', description='test parser', add_help=False)
        parser.common_options = Archiver.CommonOptions(self.define_common_options,
                                                       suffix_precedence=('_level0', '_level1'))
        return parser

    @pytest.fixture
    def subparsers(self, basic_parser):
        return basic_parser.add_subparsers(title='required arguments', metavar='<command>')

    @pytest.fixture
    def parser(self, basic_parser):
        basic_parser.common_options.add_common_group(basic_parser, '_level0', provide_defaults=True)
        return basic_parser

    @pytest.fixture
    def common_parser(self, parser):
        common_parser = argparse.ArgumentParser(add_help=False, prog='test')
        parser.common_options.add_common_group(common_parser, '_level1')
        return common_parser

    @pytest.fixture
    def parse_vars_from_line(self, parser, subparsers, common_parser):
        subparser = subparsers.add_parser('subcommand', parents=[common_parser], add_help=False,
                                          description='foo', epilog='bar', help='baz',
                                          formatter_class=argparse.RawDescriptionHelpFormatter)
        subparser.set_defaults(func=1234)
        subparser.add_argument('--append-only', dest='append_only', action='store_true')

        def parse_vars_from_line(*line):
            print(line)
            args = parser.parse_args(line)
            parser.common_options.resolve(args)
            return vars(args)

        return parse_vars_from_line

    def test_simple(self, parse_vars_from_line):
        assert parse_vars_from_line('--error') == {
            'append': [],
            'lock_wait': 1,
            'log_level': 'error',
            'progress': False,
        }

        assert parse_vars_from_line('--error', 'subcommand', '--critical') == {
            'append': [],
            'lock_wait': 1,
            'log_level': 'critical',
            'progress': False,
            'append_only': False,
            'func': 1234,
        }

        with pytest.raises(SystemExit):
            parse_vars_from_line('--append-only', 'subcommand')

        assert parse_vars_from_line('--append=foo', '--append', 'bar', 'subcommand', '--append', 'baz') == {
            'append': ['foo', 'bar', 'baz'],
            'lock_wait': 1,
            'log_level': 'warning',
            'progress': False,
            'append_only': False,
            'func': 1234,
        }

    @pytest.mark.parametrize('position', ('before', 'after', 'both'))
    @pytest.mark.parametrize('flag,args_key,args_value', (
        ('-p', 'progress', True),
        ('--lock-wait=3', 'lock_wait', 3),
    ))
    def test_flag_position_independence(self, parse_vars_from_line, position, flag, args_key, args_value):
        line = []
        if position in ('before', 'both'):
            line.append(flag)
        line.append('subcommand')
        if position in ('after', 'both'):
            line.append(flag)

        result = {
            'append': [],
            'lock_wait': 1,
            'log_level': 'warning',
            'progress': False,
            'append_only': False,
            'func': 1234,
        }
        result[args_key] = args_value

        assert parse_vars_from_line(*line) == result


def test_parse_storage_quota():
    assert parse_storage_quota('50M') == 50 * 1000**2
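    # borg enforces a minimum storage quota (10M), so a smaller value is rejected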
    with pytest.raises(argparse.ArgumentTypeError):
        parse_storage_quota('5M')


def get_all_parsers():
    """
    Return dict mapping command to parser.
    """
    parser = Archiver(prog='borg').build_parser()
    borgfs_parser = Archiver(prog='borgfs').build_parser()
    parsers = {}
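
    # discover_level recursively walks the subparser actions, building full
    # command names (e.g. "key export") from the prefix plus subcommand name.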
    def discover_level(prefix, parser, Archiver, extra_choices=None):
        choices = {}
        for action in parser._actions:
            if action.choices is not None and 'SubParsersAction' in str(action.__class__):
                for cmd, parser in action.choices.items():
                    choices[prefix + cmd] = parser
        if extra_choices is not None:
            choices.update(extra_choices)
        if prefix and not choices:
            return

        for command, parser in sorted(choices.items()):
            discover_level(command + " ", parser, Archiver)
            parsers[command] = parser

    discover_level("", parser, Archiver, {'borgfs': borgfs_parser})
    return parsers


@pytest.mark.parametrize('command, parser', list(get_all_parsers().items()))
def test_help_formatting(command, parser):
    if isinstance(parser.epilog, RstToTextLazy):
        assert parser.epilog.rst


@pytest.mark.parametrize('topic', list(Archiver.helptext.keys()))
def test_help_formatting_helptexts(topic):
    helptext = Archiver.helptext[topic]
    assert str(rst_to_terminal(helptext))
|