|
@@ -1,14 +1,8 @@
|
|
|
import os
|
|
|
+import tarfile
|
|
|
|
|
|
import pytest
|
|
|
|
|
|
-try:
|
|
|
- import attic.repository
|
|
|
- import attic.key
|
|
|
- import attic.helpers
|
|
|
-except ImportError:
|
|
|
- attic = None
|
|
|
-
|
|
|
from ..constants import * # NOQA
|
|
|
from ..crypto.key import KeyfileKey
|
|
|
from ..upgrader import AtticRepositoryUpgrader, AtticKeyfileKey
|
|
@@ -17,6 +11,28 @@ from ..repository import Repository
|
|
|
from . import are_hardlinks_supported
|
|
|
|
|
|
|
|
|
+# tar with a repo and repo keyfile from attic
|
|
|
+ATTIC_TAR = os.path.join(os.path.dirname(__file__), 'attic.tar.gz')
|
|
|
+
|
|
|
+
|
|
|
+def untar(tarfname, path, what):
|
|
|
+ """
|
|
|
+ extract <tarfname> tar archive to <path>, all stuff starting with <what>.
|
|
|
+
|
|
|
+ return path to <what>.
|
|
|
+ """
|
|
|
+
|
|
|
+ def files(members):
|
|
|
+ for tarinfo in members:
|
|
|
+ if tarinfo.name.startswith(what):
|
|
|
+ yield tarinfo
|
|
|
+
|
|
|
+ with tarfile.open(tarfname, 'r') as tf:
|
|
|
+ tf.extractall(path, members=files(tf))
|
|
|
+
|
|
|
+ return os.path.join(path, what)
|
|
|
+
|
|
|
+
|
|
|
def repo_valid(path):
|
|
|
"""
|
|
|
utility function to check if borg can open a repository
|
|
@@ -48,15 +64,10 @@ def attic_repo(tmpdir):
|
|
|
create an attic repo with some stuff in it
|
|
|
|
|
|
:param tmpdir: path to the repository to be created
|
|
|
- :returns: a attic.repository.Repository object
|
|
|
+ :returns: path to attic repository
|
|
|
"""
|
|
|
- attic_repo = attic.repository.Repository(str(tmpdir), create=True)
|
|
|
- # throw some stuff in that repo, copied from `RepositoryTestCase.test1`
|
|
|
- for x in range(100):
|
|
|
- attic_repo.put(('%-32d' % x).encode('ascii'), b'SOMEDATA')
|
|
|
- attic_repo.commit()
|
|
|
- attic_repo.close()
|
|
|
- return attic_repo
|
|
|
+ # there is some stuff in that repo, copied from `RepositoryTestCase.test1`
|
|
|
+ return untar(ATTIC_TAR, str(tmpdir), 'repo')
|
|
|
|
|
|
|
|
|
@pytest.fixture(params=[True, False])
|
|
@@ -64,52 +75,36 @@ def inplace(request):
|
|
|
return request.param
|
|
|
|
|
|
|
|
|
-@pytest.mark.skipif(attic is None, reason='cannot find an attic install')
|
|
|
-def test_convert_segments(tmpdir, attic_repo, inplace):
|
|
|
+def test_convert_segments(attic_repo, inplace):
|
|
|
"""test segment conversion
|
|
|
|
|
|
this will load the given attic repository, list all the segments
|
|
|
then convert them one at a time. we need to close the repo before
|
|
|
conversion otherwise we have errors from borg
|
|
|
|
|
|
- :param tmpdir: a temporary directory to run the test in (builtin
|
|
|
- fixture)
|
|
|
:param attic_repo: a populated attic repository (fixture)
|
|
|
"""
|
|
|
+ repo_path = attic_repo
|
|
|
# check should fail because of magic number
|
|
|
- assert not repo_valid(tmpdir)
|
|
|
- repository = AtticRepositoryUpgrader(str(tmpdir), create=False)
|
|
|
+ assert not repo_valid(repo_path)
|
|
|
+ repository = AtticRepositoryUpgrader(repo_path, create=False)
|
|
|
with repository:
|
|
|
segments = [filename for i, filename in repository.io.segment_iterator()]
|
|
|
repository.convert_segments(segments, dryrun=False, inplace=inplace)
|
|
|
repository.convert_cache(dryrun=False)
|
|
|
- assert repo_valid(tmpdir)
|
|
|
-
|
|
|
-
|
|
|
-class MockArgs:
|
|
|
- """
|
|
|
- mock attic location
|
|
|
-
|
|
|
- this is used to simulate a key location with a properly loaded
|
|
|
- repository object to create a key file
|
|
|
- """
|
|
|
- def __init__(self, path):
|
|
|
- self.repository = attic.helpers.Location(path)
|
|
|
+ assert repo_valid(repo_path)
|
|
|
|
|
|
|
|
|
@pytest.fixture()
|
|
|
-def attic_key_file(attic_repo, tmpdir, monkeypatch):
|
|
|
+def attic_key_file(tmpdir, monkeypatch):
|
|
|
"""
|
|
|
create an attic key file from the given repo, in the keys
|
|
|
subdirectory of the given tmpdir
|
|
|
|
|
|
- :param attic_repo: an attic.repository.Repository object (fixture
|
|
|
- define above)
|
|
|
:param tmpdir: a temporary directory (a builtin fixture)
|
|
|
- :returns: the KeyfileKey object as returned by
|
|
|
- attic.key.KeyfileKey.create()
|
|
|
+ :returns: path to key file
|
|
|
"""
|
|
|
- keys_dir = str(tmpdir.mkdir('keys'))
|
|
|
+ keys_dir = untar(ATTIC_TAR, str(tmpdir), 'keys')
|
|
|
|
|
|
# we use the repo dir for the created keyfile, because we do
|
|
|
# not want to clutter existing keyfiles
|
|
@@ -120,44 +115,42 @@ def attic_key_file(attic_repo, tmpdir, monkeypatch):
|
|
|
# about anyways. in real runs, the original key will be retained.
|
|
|
monkeypatch.setenv('BORG_KEYS_DIR', keys_dir)
|
|
|
monkeypatch.setenv('ATTIC_PASSPHRASE', 'test')
|
|
|
- return attic.key.KeyfileKey.create(attic_repo,
|
|
|
- MockArgs(keys_dir))
|
|
|
+
|
|
|
+ return os.path.join(keys_dir, 'repo')
|
|
|
|
|
|
|
|
|
-@pytest.mark.skipif(attic is None, reason='cannot find an attic install')
|
|
|
-def test_keys(tmpdir, attic_repo, attic_key_file):
|
|
|
+def test_keys(attic_repo, attic_key_file):
|
|
|
"""test key conversion
|
|
|
|
|
|
test that we can convert the given key to a properly formatted
|
|
|
borg key. assumes that the ATTIC_KEYS_DIR and BORG_KEYS_DIR have
|
|
|
been properly populated by the attic_key_file fixture.
|
|
|
|
|
|
- :param tmpdir: a temporary directory (a builtin fixture)
|
|
|
- :param attic_repo: an attic.repository.Repository object (fixture
|
|
|
- define above)
|
|
|
- :param attic_key_file: an attic.key.KeyfileKey (fixture created above)
|
|
|
+ :param attic_repo: path to an attic repository (fixture defined above)
|
|
|
+ :param attic_key_file: path to an attic key file (fixture defined above)
|
|
|
"""
|
|
|
- with AtticRepositoryUpgrader(str(tmpdir), create=False) as repository:
|
|
|
+ keyfile_path = attic_key_file
|
|
|
+ assert not key_valid(keyfile_path) # not upgraded yet
|
|
|
+ with AtticRepositoryUpgrader(attic_repo, create=False) as repository:
|
|
|
keyfile = AtticKeyfileKey.find_key_file(repository)
|
|
|
AtticRepositoryUpgrader.convert_keyfiles(keyfile, dryrun=False)
|
|
|
- assert key_valid(attic_key_file.path)
|
|
|
+ assert key_valid(keyfile_path)
|
|
|
|
|
|
|
|
|
-@pytest.mark.skipif(attic is None, reason='cannot find an attic install')
|
|
|
-def test_convert_all(tmpdir, attic_repo, attic_key_file, inplace):
|
|
|
+def test_convert_all(attic_repo, attic_key_file, inplace):
|
|
|
"""test all conversion steps
|
|
|
|
|
|
this runs everything. mostly redundant test, since everything is
|
|
|
done above. yet we expect a NotImplementedError because we do not
|
|
|
convert caches yet.
|
|
|
|
|
|
- :param tmpdir: a temporary directory (a builtin fixture)
|
|
|
- :param attic_repo: an attic.repository.Repository object (fixture
|
|
|
- define above)
|
|
|
- :param attic_key_file: an attic.key.KeyfileKey (fixture created above)
|
|
|
+ :param attic_repo: path to an attic repository (fixture defined above)
|
|
|
+ :param attic_key_file: path to an attic key file (fixture defined above)
|
|
|
"""
|
|
|
+ repo_path = attic_repo
|
|
|
+
|
|
|
# check should fail because of magic number
|
|
|
- assert not repo_valid(tmpdir)
|
|
|
+ assert not repo_valid(repo_path)
|
|
|
|
|
|
def stat_segment(path):
|
|
|
return os.stat(os.path.join(path, 'data', '0', '0'))
|
|
@@ -165,8 +158,8 @@ def test_convert_all(tmpdir, attic_repo, attic_key_file, inplace):
|
|
|
def first_inode(path):
|
|
|
return stat_segment(path).st_ino
|
|
|
|
|
|
- orig_inode = first_inode(attic_repo.path)
|
|
|
- with AtticRepositoryUpgrader(str(tmpdir), create=False) as repository:
|
|
|
+ orig_inode = first_inode(repo_path)
|
|
|
+ with AtticRepositoryUpgrader(repo_path, create=False) as repository:
|
|
|
# replicate command dispatch, partly
|
|
|
os.umask(UMASK_DEFAULT)
|
|
|
backup = repository.upgrade(dryrun=False, inplace=inplace)
|
|
@@ -181,8 +174,8 @@ def test_convert_all(tmpdir, attic_repo, attic_key_file, inplace):
|
|
|
if 'BORG_TESTS_IGNORE_MODES' not in os.environ:
|
|
|
assert stat_segment(backup).st_mode & UMASK_DEFAULT == 0
|
|
|
|
|
|
- assert key_valid(attic_key_file.path)
|
|
|
- assert repo_valid(tmpdir)
|
|
|
+ assert key_valid(attic_key_file)
|
|
|
+ assert repo_valid(repo_path)
|
|
|
|
|
|
|
|
|
@pytest.mark.skipif(not are_hardlinks_supported(), reason='hardlinks not supported')
|