|
@@ -1,5 +1,6 @@
|
|
|
from binascii import hexlify
|
|
|
from configparser import RawConfigParser
|
|
|
+import errno
|
|
|
import os
|
|
|
from io import StringIO
|
|
|
import stat
|
|
@@ -70,11 +71,15 @@ class environment_variable:
|
|
|
else:
|
|
|
os.environ[k] = v
|
|
|
|
|
|
-
|
|
|
-def exec_cmd(*args, archiver=None, fork=False, **kw):
|
|
|
+def exec_cmd(*args, archiver=None, fork=False, exe=None, **kw):
|
|
|
if fork:
|
|
|
try:
|
|
|
- borg = (sys.executable, '-m', 'borg.archiver')
|
|
|
+ if exe is None:
|
|
|
+ borg = (sys.executable, '-m', 'borg.archiver')
|
|
|
+ elif isinstance(exe, str):
|
|
|
+ borg = (exe, )
|
|
|
+ elif not isinstance(exe, tuple):
|
|
|
+ raise ValueError('exe must be None, a tuple or a str')
|
|
|
output = subprocess.check_output(borg + args)
|
|
|
ret = 0
|
|
|
except subprocess.CalledProcessError as e:
|
|
@@ -94,13 +99,37 @@ def exec_cmd(*args, archiver=None, fork=False, **kw):
|
|
|
sys.stdin, sys.stdout, sys.stderr = stdin, stdout, stderr
|
|
|
|
|
|
|
|
|
-class ArchiverTestCaseBase(BaseTestCase):
|
|
|
+# check if the binary "borg.exe" is available
|
|
|
+try:
|
|
|
+ exec_cmd('help', exe='borg.exe', fork=True)
|
|
|
+ BORG_EXES = ['python', 'binary', ]
|
|
|
+except (IOError, OSError) as err:
|
|
|
+ if err.errno != errno.ENOENT:
|
|
|
+ raise
|
|
|
+ BORG_EXES = ['python', ]
|
|
|
+
|
|
|
+
|
|
|
+@pytest.fixture(params=BORG_EXES)
|
|
|
+def cmd(request):
|
|
|
+ if request.param == 'python':
|
|
|
+ exe = None
|
|
|
+ elif request.param == 'binary':
|
|
|
+ exe = 'borg.exe'
|
|
|
+ else:
|
|
|
+ raise ValueError("param must be 'python' or 'binary'")
|
|
|
+ def exec_fn(*args, **kw):
|
|
|
+ return exec_cmd(*args, exe=exe, fork=True, **kw)
|
|
|
+ return exec_fn
|
|
|
|
|
|
+
|
|
|
+class ArchiverTestCaseBase(BaseTestCase):
|
|
|
+ EXE = None # python source based
|
|
|
+ FORK_DEFAULT = False
|
|
|
prefix = ''
|
|
|
|
|
|
def setUp(self):
|
|
|
os.environ['BORG_CHECK_I_KNOW_WHAT_I_AM_DOING'] = '1'
|
|
|
- self.archiver = Archiver()
|
|
|
+ self.archiver = not self.FORK_DEFAULT and Archiver() or None
|
|
|
self.tmpdir = tempfile.mkdtemp()
|
|
|
self.repository_path = os.path.join(self.tmpdir, 'repository')
|
|
|
self.repository_location = self.prefix + self.repository_path
|
|
@@ -126,7 +155,10 @@ class ArchiverTestCaseBase(BaseTestCase):
|
|
|
|
|
|
def cmd(self, *args, **kw):
|
|
|
exit_code = kw.pop('exit_code', 0)
|
|
|
- ret, output = exec_cmd(*args, archiver=self.archiver, **kw)
|
|
|
+ fork = kw.pop('fork', None)
|
|
|
+ if fork is None:
|
|
|
+ fork = self.FORK_DEFAULT
|
|
|
+ ret, output = exec_cmd(*args, fork=fork, exe=self.EXE, archiver=self.archiver, **kw)
|
|
|
if ret != exit_code:
|
|
|
print(output)
|
|
|
self.assert_equal(ret, exit_code)
|
|
@@ -305,7 +337,10 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
|
|
self.cmd('init', '--encryption=none', self.repository_location)
|
|
|
self._set_repository_id(self.repository_path, repository_id)
|
|
|
self.assert_equal(repository_id, self._extract_repository_id(self.repository_path))
|
|
|
- self.assert_raises(Cache.EncryptionMethodMismatch, lambda: self.cmd('create', self.repository_location + '::test.2', 'input'))
|
|
|
+ if self.FORK_DEFAULT:
|
|
|
+ self.cmd('create', self.repository_location + '::test.2', 'input', exit_code=1) # fails
|
|
|
+ else:
|
|
|
+ self.assert_raises(Cache.EncryptionMethodMismatch, lambda: self.cmd('create', self.repository_location + '::test.2', 'input'))
|
|
|
|
|
|
def test_repository_swap_detection2(self):
|
|
|
self.create_test_files()
|
|
@@ -315,7 +350,10 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
|
|
self.cmd('create', self.repository_location + '_encrypted::test', 'input')
|
|
|
shutil.rmtree(self.repository_path + '_encrypted')
|
|
|
os.rename(self.repository_path + '_unencrypted', self.repository_path + '_encrypted')
|
|
|
- self.assert_raises(Cache.RepositoryAccessAborted, lambda: self.cmd('create', self.repository_location + '_encrypted::test.2', 'input'))
|
|
|
+ if self.FORK_DEFAULT:
|
|
|
+ self.cmd('create', self.repository_location + '_encrypted::test.2', 'input', exit_code=1) # fails
|
|
|
+ else:
|
|
|
+ self.assert_raises(Cache.RepositoryAccessAborted, lambda: self.cmd('create', self.repository_location + '_encrypted::test.2', 'input'))
|
|
|
|
|
|
def test_strip_components(self):
|
|
|
self.cmd('init', self.repository_location)
|
|
@@ -540,8 +578,12 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
|
|
self.assert_in('bar-2015-08-12-20:00', output)
|
|
|
|
|
|
def test_usage(self):
|
|
|
- self.assert_raises(SystemExit, lambda: self.cmd())
|
|
|
- self.assert_raises(SystemExit, lambda: self.cmd('-h'))
|
|
|
+ if self.FORK_DEFAULT:
|
|
|
+ self.cmd(exit_code=0)
|
|
|
+ self.cmd('-h', exit_code=0)
|
|
|
+ else:
|
|
|
+ self.assert_raises(SystemExit, lambda: self.cmd())
|
|
|
+ self.assert_raises(SystemExit, lambda: self.cmd('-h'))
|
|
|
|
|
|
def test_help(self):
|
|
|
assert 'Borg' in self.cmd('help')
|
|
@@ -628,6 +670,12 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
|
|
self.verify_aes_counter_uniqueness('passphrase')
|
|
|
|
|
|
|
|
|
+@unittest.skipUnless('binary' in BORG_EXES, 'no borg.exe available')
|
|
|
+class ArchiverTestCaseBinary(ArchiverTestCase):
|
|
|
+ EXE = 'borg.exe'
|
|
|
+ FORK_DEFAULT = True
|
|
|
+
|
|
|
+
|
|
|
class ArchiverCheckTestCase(ArchiverTestCaseBase):
|
|
|
|
|
|
def setUp(self):
|