checks.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. import os
  2. import shutil
  3. from datetime import datetime, timezone, timedelta
  4. from unittest.mock import patch
  5. import pytest
  6. from ...cache import Cache, LocalCache
  7. from ...constants import * # NOQA
  8. from ...crypto.key import TAMRequiredError
  9. from ...helpers import Location, get_security_dir, bin_to_hex
  10. from ...helpers import EXIT_ERROR
  11. from ...helpers import msgpack
  12. from ...manifest import Manifest, MandatoryFeatureUnsupported
  13. from ...remote import RemoteRepository, PathNotAllowed
  14. from ...repository import Repository
  15. from .. import llfuse
  16. from .. import changedir, environment_variable
  17. from . import cmd, _extract_repository_id, open_repository, check_cache, create_test_files, create_src_archive
  18. from . import _set_repository_id, create_regular_file, assert_creates_file, generate_archiver_tests, RK_ENCRYPTION
  19. pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote") # NOQA
  20. def get_security_directory(repo_path):
  21. repository_id = bin_to_hex(_extract_repository_id(repo_path))
  22. return get_security_dir(repository_id)
  23. def add_unknown_feature(repo_path, operation):
  24. with Repository(repo_path, exclusive=True) as repository:
  25. manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
  26. manifest.config["feature_flags"] = {operation.value: {"mandatory": ["unknown-feature"]}}
  27. manifest.write()
  28. repository.commit(compact=False)
  29. def cmd_raises_unknown_feature(archiver, args):
  30. if archiver.FORK_DEFAULT:
  31. cmd(archiver, *args, exit_code=EXIT_ERROR)
  32. else:
  33. with pytest.raises(MandatoryFeatureUnsupported) as excinfo:
  34. cmd(archiver, *args)
  35. assert excinfo.value.args == (["unknown-feature"],)
  36. def test_repository_swap_detection(archivers, request):
  37. archiver = request.getfixturevalue(archivers)
  38. repo_location, repo_path, input_path = archiver.repository_location, archiver.repository_path, archiver.input_path
  39. create_test_files(input_path)
  40. os.environ["BORG_PASSPHRASE"] = "passphrase"
  41. cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
  42. repository_id = _extract_repository_id(repo_path)
  43. cmd(archiver, f"--repo={repo_location}", "create", "test", "input")
  44. shutil.rmtree(repo_path)
  45. cmd(archiver, f"--repo={repo_location}", "rcreate", "--encryption=none")
  46. _set_repository_id(repo_path, repository_id)
  47. assert repository_id == _extract_repository_id(repo_path)
  48. if archiver.FORK_DEFAULT:
  49. cmd(archiver, f"--repo={repo_location}", "create", "test.2", "input", exit_code=EXIT_ERROR)
  50. else:
  51. with pytest.raises(Cache.EncryptionMethodMismatch):
  52. cmd(archiver, f"--repo={repo_location}", "create", "test.2", "input")
  53. def test_repository_swap_detection2(archivers, request):
  54. archiver = request.getfixturevalue(archivers)
  55. repo_location, repo_path, input_path = archiver.repository_location, archiver.repository_path, archiver.input_path
  56. create_test_files(input_path)
  57. cmd(archiver, f"--repo={repo_location}_unencrypted", "rcreate", "--encryption=none")
  58. os.environ["BORG_PASSPHRASE"] = "passphrase"
  59. cmd(archiver, f"--repo={repo_location}_encrypted", "rcreate", RK_ENCRYPTION)
  60. cmd(archiver, f"--repo={repo_location}_encrypted", "create", "test", "input")
  61. shutil.rmtree(repo_path + "_encrypted")
  62. os.replace(repo_path + "_unencrypted", repo_path + "_encrypted")
  63. if archiver.FORK_DEFAULT:
  64. cmd(archiver, f"--repo={repo_location}_encrypted", "create", "test.2", "input", exit_code=EXIT_ERROR)
  65. else:
  66. with pytest.raises(Cache.RepositoryAccessAborted):
  67. cmd(archiver, f"--repo={repo_location}_encrypted", "create", "test.2", "input")
  68. def test_repository_swap_detection_no_cache(archivers, request):
  69. archiver = request.getfixturevalue(archivers)
  70. repo_location, repo_path, input_path = archiver.repository_location, archiver.repository_path, archiver.input_path
  71. create_test_files(input_path)
  72. os.environ["BORG_PASSPHRASE"] = "passphrase"
  73. cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
  74. repository_id = _extract_repository_id(repo_path)
  75. cmd(archiver, f"--repo={repo_location}", "create", "test", "input")
  76. shutil.rmtree(repo_path)
  77. cmd(archiver, f"--repo={repo_location}", "rcreate", "--encryption=none")
  78. _set_repository_id(repo_path, repository_id)
  79. assert repository_id == _extract_repository_id(repo_path)
  80. cmd(archiver, f"--repo={repo_location}", "rdelete", "--cache-only")
  81. if archiver.FORK_DEFAULT:
  82. cmd(archiver, f"--repo={repo_location}", "create", "test.2", "input", exit_code=EXIT_ERROR)
  83. else:
  84. with pytest.raises(Cache.EncryptionMethodMismatch):
  85. cmd(archiver, f"--repo={repo_location}", "create", "test.2", "input")
  86. def test_repository_swap_detection2_no_cache(archivers, request):
  87. archiver = request.getfixturevalue(archivers)
  88. repo_location, repo_path, input_path = archiver.repository_location, archiver.repository_path, archiver.input_path
  89. create_test_files(input_path)
  90. cmd(archiver, f"--repo={repo_location}_unencrypted", "rcreate", "--encryption=none")
  91. os.environ["BORG_PASSPHRASE"] = "passphrase"
  92. cmd(archiver, f"--repo={repo_location}_encrypted", "rcreate", RK_ENCRYPTION)
  93. cmd(archiver, f"--repo={repo_location}_encrypted", "create", "test", "input")
  94. cmd(archiver, f"--repo={repo_location}_unencrypted", "rdelete", "--cache-only")
  95. cmd(archiver, f"--repo={repo_location}_encrypted", "rdelete", "--cache-only")
  96. shutil.rmtree(repo_path + "_encrypted")
  97. os.replace(repo_path + "_unencrypted", repo_path + "_encrypted")
  98. if archiver.FORK_DEFAULT:
  99. cmd(archiver, f"--repo={repo_location}_encrypted", "create", "test.2", "input", exit_code=EXIT_ERROR)
  100. else:
  101. with pytest.raises(Cache.RepositoryAccessAborted):
  102. cmd(archiver, f"--repo={repo_location}_encrypted", "create", "test.2", "input")
  103. def test_repository_swap_detection_repokey_blank_passphrase(archivers, request):
  104. archiver = request.getfixturevalue(archivers)
  105. repo_location, repo_path, input_path = archiver.repository_location, archiver.repository_path, archiver.input_path
  106. # Check that a repokey repo with a blank passphrase is considered like a plaintext repo.
  107. create_test_files(input_path)
  108. # User initializes her repository with her passphrase
  109. cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
  110. cmd(archiver, f"--repo={repo_location}", "create", "test", "input")
  111. # Attacker replaces it with her own repository, which is encrypted but has no passphrase set
  112. shutil.rmtree(repo_path)
  113. with environment_variable(BORG_PASSPHRASE=""):
  114. cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
  115. # Delete cache & security database, AKA switch to user perspective
  116. cmd(archiver, f"--repo={repo_location}", "rdelete", "--cache-only")
  117. shutil.rmtree(get_security_directory(repo_path))
  118. with environment_variable(BORG_PASSPHRASE=None):
  119. # This is the part were the user would be tricked, e.g. she assumes that BORG_PASSPHRASE
  120. # is set, while it isn't. Previously this raised no warning,
  121. # since the repository is, technically, encrypted.
  122. if archiver.FORK_DEFAULT:
  123. cmd(archiver, f"--repo={repo_location}", "create", "test.2", "input", exit_code=EXIT_ERROR)
  124. else:
  125. with pytest.raises(Cache.CacheInitAbortedError):
  126. cmd(archiver, f"--repo={repo_location}", "create", "test.2", "input")
  127. def test_repository_move(archivers, request):
  128. archiver = request.getfixturevalue(archivers)
  129. repo_location, repo_path = archiver.repository_location, archiver.repository_path
  130. cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
  131. security_dir = get_security_directory(repo_path)
  132. os.replace(repo_path, repo_path + "_new")
  133. with environment_variable(BORG_RELOCATED_REPO_ACCESS_IS_OK="yes"):
  134. cmd(archiver, f"--repo={repo_location}_new", "rinfo")
  135. with open(os.path.join(security_dir, "location")) as fd:
  136. location = fd.read()
  137. assert location == Location(repo_location + "_new").canonical_path()
  138. # Needs no confirmation anymore
  139. cmd(archiver, f"--repo={repo_location}_new", "rinfo")
  140. shutil.rmtree(archiver.cache_path)
  141. cmd(archiver, f"--repo={repo_location}_new", "rinfo")
  142. shutil.rmtree(security_dir)
  143. cmd(archiver, f"--repo={repo_location}_new", "rinfo")
  144. for file in ("location", "key-type", "manifest-timestamp"):
  145. assert os.path.exists(os.path.join(security_dir, file))
  146. def test_security_dir_compat(archivers, request):
  147. archiver = request.getfixturevalue(archivers)
  148. repo_location, repo_path = archiver.repository_location, archiver.repository_path
  149. cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
  150. with open(os.path.join(get_security_directory(repo_path), "location"), "w") as fd:
  151. fd.write("something outdated")
  152. # This is fine, because the cache still has the correct information. security_dir and cache can disagree
  153. # if older versions are used to confirm a renamed repository.
  154. cmd(archiver, f"--repo={repo_location}", "rinfo")
  155. def test_unknown_unencrypted(archivers, request):
  156. archiver = request.getfixturevalue(archivers)
  157. repo_location, repo_path, cache_path = archiver.repository_location, archiver.repository_path, archiver.cache_path
  158. cmd(archiver, f"--repo={repo_location}", "rcreate", "--encryption=none")
  159. # Ok: repository is known
  160. cmd(archiver, f"--repo={repo_location}", "rinfo")
  161. # Ok: repository is still known (through security_dir)
  162. shutil.rmtree(cache_path)
  163. cmd(archiver, f"--repo={repo_location}", "rinfo")
  164. # Needs confirmation: cache and security dir both gone (e.g. another host or rm -rf ~)
  165. shutil.rmtree(get_security_directory(repo_path))
  166. if archiver.FORK_DEFAULT:
  167. cmd(archiver, f"--repo={repo_location}", "rinfo", exit_code=EXIT_ERROR)
  168. else:
  169. with pytest.raises(Cache.CacheInitAbortedError):
  170. cmd(archiver, f"--repo={repo_location}", "rinfo")
  171. with environment_variable(BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK="yes"):
  172. cmd(archiver, f"--repo={repo_location}", "rinfo")
  173. def test_unknown_feature_on_create(archivers, request):
  174. archiver = request.getfixturevalue(archivers)
  175. repo_location, repo_path = archiver.repository_location, archiver.repository_path
  176. print(cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION))
  177. add_unknown_feature(repo_path, Manifest.Operation.WRITE)
  178. cmd_raises_unknown_feature(archiver, [f"--repo={repo_location}", "create", "test", "input"])
  179. def test_unknown_feature_on_cache_sync(archivers, request):
  180. archiver = request.getfixturevalue(archivers)
  181. repo_location, repo_path = archiver.repository_location, archiver.repository_path
  182. cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
  183. cmd(archiver, f"--repo={repo_location}", "rdelete", "--cache-only")
  184. add_unknown_feature(repo_path, Manifest.Operation.READ)
  185. cmd_raises_unknown_feature(archiver, [f"--repo={repo_location}", "create", "test", "input"])
  186. def test_unknown_feature_on_change_passphrase(archivers, request):
  187. archiver = request.getfixturevalue(archivers)
  188. repo_location, repo_path = archiver.repository_location, archiver.repository_path
  189. print(cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION))
  190. add_unknown_feature(repo_path, Manifest.Operation.CHECK)
  191. cmd_raises_unknown_feature(archiver, [f"--repo={repo_location}", "key", "change-passphrase"])
  192. def test_unknown_feature_on_read(archivers, request):
  193. archiver = request.getfixturevalue(archivers)
  194. repo_location, repo_path = archiver.repository_location, archiver.repository_path
  195. print(cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION))
  196. cmd(archiver, f"--repo={repo_location}", "create", "test", "input")
  197. add_unknown_feature(repo_path, Manifest.Operation.READ)
  198. with changedir("output"):
  199. cmd_raises_unknown_feature(archiver, [f"--repo={repo_location}", "extract", "test"])
  200. cmd_raises_unknown_feature(archiver, [f"--repo={repo_location}", "rlist"])
  201. cmd_raises_unknown_feature(archiver, [f"--repo={repo_location}", "info", "-a", "test"])
  202. def test_unknown_feature_on_rename(archivers, request):
  203. archiver = request.getfixturevalue(archivers)
  204. repo_location, repo_path = archiver.repository_location, archiver.repository_path
  205. print(cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION))
  206. cmd(archiver, f"--repo={repo_location}", "create", "test", "input")
  207. add_unknown_feature(repo_path, Manifest.Operation.CHECK)
  208. cmd_raises_unknown_feature(archiver, [f"--repo={repo_location}", "rename", "test", "other"])
  209. def test_unknown_feature_on_delete(archivers, request):
  210. archiver = request.getfixturevalue(archivers)
  211. repo_location, repo_path = archiver.repository_location, archiver.repository_path
  212. print(cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION))
  213. cmd(archiver, f"--repo={repo_location}", "create", "test", "input")
  214. add_unknown_feature(repo_path, Manifest.Operation.DELETE)
  215. # delete of an archive raises
  216. cmd_raises_unknown_feature(archiver, [f"--repo={repo_location}", "delete", "-a", "test"])
  217. cmd_raises_unknown_feature(archiver, [f"--repo={repo_location}", "prune", "--keep-daily=3"])
  218. # delete of the whole repository ignores features
  219. cmd(archiver, f"--repo={repo_location}", "rdelete")
  220. @pytest.mark.skipif(not llfuse, reason="llfuse not installed")
  221. def test_unknown_feature_on_mount(archivers, request):
  222. archiver = request.getfixturevalue(archivers)
  223. repo_location, repo_path = archiver.repository_location, archiver.repository_path
  224. cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
  225. cmd(archiver, f"--repo={repo_location}", "create", "test", "input")
  226. add_unknown_feature(repo_path, Manifest.Operation.READ)
  227. mountpoint = os.path.join(archiver.tmpdir, "mountpoint")
  228. os.mkdir(mountpoint)
  229. # XXX this might hang if it doesn't raise an error
  230. cmd_raises_unknown_feature(archiver, [f"--repo={repo_location}::test", "mount", mountpoint])
  231. @pytest.mark.allow_cache_wipe
  232. def test_unknown_mandatory_feature_in_cache(archivers, request):
  233. archiver = request.getfixturevalue(archivers)
  234. repo_location, repo_path = archiver.repository_location, archiver.repository_path
  235. remote_repo = True if archiver.get_kind() == "remote" else False
  236. print(cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION))
  237. with Repository(repo_path, exclusive=True) as repository:
  238. if remote_repo:
  239. repository._location = Location(repo_location)
  240. manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
  241. with Cache(repository, manifest) as cache:
  242. cache.begin_txn()
  243. cache.cache_config.mandatory_features = {"unknown-feature"}
  244. cache.commit()
  245. if archiver.FORK_DEFAULT:
  246. cmd(archiver, f"--repo={repo_location}", "create", "test", "input")
  247. else:
  248. called = False
  249. wipe_cache_safe = LocalCache.wipe_cache
  250. def wipe_wrapper(*args):
  251. nonlocal called
  252. called = True
  253. wipe_cache_safe(*args)
  254. with patch.object(LocalCache, "wipe_cache", wipe_wrapper):
  255. cmd(archiver, f"--repo={repo_location}", "create", "test", "input")
  256. assert called
  257. with Repository(repo_path, exclusive=True) as repository:
  258. if remote_repo:
  259. repository._location = Location(repo_location)
  260. manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
  261. with Cache(repository, manifest) as cache:
  262. assert cache.cache_config.mandatory_features == set()
  263. def test_check_cache(archivers, request):
  264. archiver = request.getfixturevalue(archivers)
  265. repo_location = archiver.repository_location
  266. cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
  267. cmd(archiver, f"--repo={repo_location}", "create", "test", "input")
  268. with open_repository(archiver) as repository:
  269. manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
  270. with Cache(repository, manifest, sync=False) as cache:
  271. cache.begin_txn()
  272. cache.chunks.incref(list(cache.chunks.iteritems())[0][0])
  273. cache.commit()
  274. with pytest.raises(AssertionError):
  275. check_cache(archiver)
  276. # Begin manifest tests
  277. def spoof_manifest(repository):
  278. with repository:
  279. manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
  280. cdata = manifest.repo_objs.format(
  281. Manifest.MANIFEST_ID,
  282. {},
  283. msgpack.packb(
  284. {
  285. "version": 1,
  286. "archives": {},
  287. "config": {},
  288. "timestamp": (datetime.now(tz=timezone.utc) + timedelta(days=1)).isoformat(timespec="microseconds"),
  289. }
  290. ),
  291. )
  292. repository.put(Manifest.MANIFEST_ID, cdata)
  293. repository.commit(compact=False)
  294. def test_fresh_init_tam_required(archiver):
  295. repo_location, repo_path = archiver.repository_location, archiver.repository_path
  296. cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
  297. repository = Repository(repo_path, exclusive=True)
  298. with repository:
  299. manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
  300. cdata = manifest.repo_objs.format(
  301. Manifest.MANIFEST_ID,
  302. {},
  303. msgpack.packb(
  304. {
  305. "version": 1,
  306. "archives": {},
  307. "timestamp": (datetime.now(tz=timezone.utc) + timedelta(days=1)).isoformat(timespec="microseconds"),
  308. }
  309. ),
  310. )
  311. repository.put(Manifest.MANIFEST_ID, cdata)
  312. repository.commit(compact=False)
  313. with pytest.raises(TAMRequiredError):
  314. cmd(archiver, f"--repo={repo_location}", "rlist")
  315. def test_not_required(archiver):
  316. repo_location, repo_path = archiver.repository_location, archiver.repository_path
  317. cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
  318. create_src_archive(archiver, "archive1234")
  319. repository = Repository(repo_path, exclusive=True)
  320. # Manifest must be authenticated now
  321. output = cmd(archiver, f"--repo={repo_location}", "rlist", "--debug")
  322. assert "archive1234" in output
  323. assert "TAM-verified manifest" in output
  324. # Try to spoof / modify pre-1.0.9
  325. spoof_manifest(repository)
  326. # Fails
  327. with pytest.raises(TAMRequiredError):
  328. cmd(archiver, f"--repo={repo_location}", "rlist")
  329. # Begin Remote Tests
  330. def test_remote_repo_restrict_to_path(remote_archiver):
  331. repo_location, repo_path = remote_archiver.repository_location, remote_archiver.repository_path
  332. # restricted to repo directory itself:
  333. with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", repo_path]):
  334. cmd(remote_archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
  335. # restricted to repo directory itself, fail for other directories with same prefix:
  336. with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", repo_path]):
  337. with pytest.raises(PathNotAllowed):
  338. cmd(remote_archiver, f"--repo={repo_location}_0", "rcreate", RK_ENCRYPTION)
  339. # restricted to a completely different path:
  340. with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", "/foo"]):
  341. with pytest.raises(PathNotAllowed):
  342. cmd(remote_archiver, f"--repo={repo_location}_1", "rcreate", RK_ENCRYPTION)
  343. path_prefix = os.path.dirname(repo_path)
  344. # restrict to repo directory's parent directory:
  345. with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", path_prefix]):
  346. cmd(remote_archiver, f"--repo={repo_location}_2", "rcreate", RK_ENCRYPTION)
  347. # restrict to repo directory's parent directory and another directory:
  348. with patch.object(
  349. RemoteRepository, "extra_test_args", ["--restrict-to-path", "/foo", "--restrict-to-path", path_prefix]
  350. ):
  351. cmd(remote_archiver, f"--repo={repo_location}_3", "rcreate", RK_ENCRYPTION)
  352. def test_remote_repo_restrict_to_repository(remote_archiver):
  353. repo_location, repo_path = remote_archiver.repository_location, remote_archiver.repository_path
  354. # restricted to repo directory itself:
  355. with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-repository", repo_path]):
  356. cmd(remote_archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
  357. parent_path = os.path.join(repo_path, "..")
  358. with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-repository", parent_path]):
  359. with pytest.raises(PathNotAllowed):
  360. cmd(remote_archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
  361. def test_remote_repo_strip_components_doesnt_leak(remote_archiver):
  362. repo_location, input_path = remote_archiver.repository_location, remote_archiver.input_path
  363. cmd(remote_archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
  364. create_regular_file(input_path, "dir/file", contents=b"test file contents 1")
  365. create_regular_file(input_path, "dir/file2", contents=b"test file contents 2")
  366. create_regular_file(input_path, "skipped-file1", contents=b"test file contents 3")
  367. create_regular_file(input_path, "skipped-file2", contents=b"test file contents 4")
  368. create_regular_file(input_path, "skipped-file3", contents=b"test file contents 5")
  369. cmd(remote_archiver, f"--repo={repo_location}", "create", "test", "input")
  370. marker = "cached responses left in RemoteRepository"
  371. with changedir("output"):
  372. res = cmd(remote_archiver, f"--repo={repo_location}", "extract", "test", "--debug", "--strip-components", "3")
  373. assert marker not in res
  374. with assert_creates_file("file"):
  375. res = cmd(
  376. remote_archiver, f"--repo={repo_location}", "extract", "test", "--debug", "--strip-components", "2"
  377. )
  378. assert marker not in res
  379. with assert_creates_file("dir/file"):
  380. res = cmd(
  381. remote_archiver, f"--repo={repo_location}", "extract", "test", "--debug", "--strip-components", "1"
  382. )
  383. assert marker not in res
  384. with assert_creates_file("input/dir/file"):
  385. res = cmd(
  386. remote_archiver, f"--repo={repo_location}", "extract", "test", "--debug", "--strip-components", "0"
  387. )
  388. assert marker not in res