| 
					
				 | 
			
			
				@@ -0,0 +1,105 @@ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import json 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import os 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import shutil 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import subprocess 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import pytest 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+from .. import changedir 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+from . import cmd, create_regular_file, RK_ENCRYPTION, assert_dirs_equal 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+SFTP_URL = os.environ.get("BORG_TEST_SFTP_REPO") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+S3_URL = os.environ.get("BORG_TEST_S3_REPO") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def have_rclone(): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    rclone_path = shutil.which("rclone") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    if not rclone_path: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        return False  # not installed 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    try: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        # rclone returns JSON for core/version, e.g. {"decomposed": [1,59,2], "version": "v1.59.2"} 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        out = subprocess.check_output([rclone_path, "rc", "--loopback", "core/version"]) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        info = json.loads(out.decode("utf-8")) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    except Exception: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        return False 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    try: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if info.get("decomposed", []) < [1, 57, 0]: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return False  # too old 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    except Exception: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        return False 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    return True  # looks good 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+@pytest.mark.skipif(not have_rclone(), reason="rclone must be installed for this test.") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def test_rclone_repo_basics(archiver, tmp_path): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    create_regular_file(archiver.input_path, "file1", size=100 * 1024) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    create_regular_file(archiver.input_path, "file2", size=10 * 1024) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    rclone_repo_dir = tmp_path / "rclone-repo" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    os.makedirs(rclone_repo_dir, exist_ok=True) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    archiver.repository_location = f"rclone:{os.fspath(rclone_repo_dir)}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    archive_name = "test-archive" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    cmd(archiver, "repo-create", RK_ENCRYPTION) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    cmd(archiver, "create", archive_name, "input") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    list_output = cmd(archiver, "repo-list") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    assert archive_name in list_output 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    archive_list_output = cmd(archiver, "list", archive_name) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    assert "input/file1" in archive_list_output 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    assert "input/file2" in archive_list_output 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    with changedir("output"): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        cmd(archiver, "extract", archive_name) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    assert_dirs_equal( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        archiver.input_path, os.path.join(archiver.output_path, "input"), ignore_flags=True, ignore_xattrs=True 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    ) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    cmd(archiver, "delete", "-a", archive_name) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    list_output = cmd(archiver, "repo-list") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    assert archive_name not in list_output 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    cmd(archiver, "repo-delete") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+@pytest.mark.skipif(not SFTP_URL, reason="BORG_TEST_SFTP_REPO not set.") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def test_sftp_repo_basics(archiver): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    create_regular_file(archiver.input_path, "file1", size=100 * 1024) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    create_regular_file(archiver.input_path, "file2", size=10 * 1024) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    archiver.repository_location = SFTP_URL 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    archive_name = "test-archive" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    cmd(archiver, "repo-create", RK_ENCRYPTION) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    cmd(archiver, "create", archive_name, "input") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    list_output = cmd(archiver, "repo-list") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    assert archive_name in list_output 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    archive_list_output = cmd(archiver, "list", archive_name) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    assert "input/file1" in archive_list_output 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    assert "input/file2" in archive_list_output 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    with changedir("output"): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        cmd(archiver, "extract", archive_name) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    assert_dirs_equal( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        archiver.input_path, os.path.join(archiver.output_path, "input"), ignore_flags=True, ignore_xattrs=True 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    ) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    cmd(archiver, "delete", "-a", archive_name) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    list_output = cmd(archiver, "repo-list") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    assert archive_name not in list_output 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    cmd(archiver, "repo-delete") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+@pytest.mark.skipif(not S3_URL, reason="BORG_TEST_S3_REPO not set.") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def test_s3_repo_basics(archiver): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    create_regular_file(archiver.input_path, "file1", size=100 * 1024) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    create_regular_file(archiver.input_path, "file2", size=10 * 1024) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    archiver.repository_location = S3_URL 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    archive_name = "test-archive" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    cmd(archiver, "repo-create", RK_ENCRYPTION) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    cmd(archiver, "create", archive_name, "input") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    list_output = cmd(archiver, "repo-list") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    assert archive_name in list_output 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    archive_list_output = cmd(archiver, "list", archive_name) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    assert "input/file1" in archive_list_output 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    assert "input/file2" in archive_list_output 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    with changedir("output"): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        cmd(archiver, "extract", archive_name) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    assert_dirs_equal( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        archiver.input_path, os.path.join(archiver.output_path, "input"), ignore_flags=True, ignore_xattrs=True 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    ) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    cmd(archiver, "delete", "-a", archive_name) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    list_output = cmd(archiver, "repo-list") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    assert archive_name not in list_output 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    cmd(archiver, "repo-delete") 
			 |