|
@@ -123,12 +123,13 @@ def cmd(request):
|
|
|
|
|
|
|
|
|
class ArchiverTestCaseBase(BaseTestCase):
|
|
|
-
|
|
|
+ EXE = None # python source based
|
|
|
+ FORK_DEFAULT = False
|
|
|
prefix = ''
|
|
|
|
|
|
def setUp(self):
|
|
|
os.environ['BORG_CHECK_I_KNOW_WHAT_I_AM_DOING'] = '1'
|
|
|
- self.archiver = Archiver()
|
|
|
+ self.archiver = not self.FORK_DEFAULT and Archiver() or None
|
|
|
self.tmpdir = tempfile.mkdtemp()
|
|
|
self.repository_path = os.path.join(self.tmpdir, 'repository')
|
|
|
self.repository_location = self.prefix + self.repository_path
|
|
@@ -154,7 +155,10 @@ class ArchiverTestCaseBase(BaseTestCase):
|
|
|
|
|
|
def cmd(self, *args, **kw):
|
|
|
exit_code = kw.pop('exit_code', 0)
|
|
|
- ret, output = exec_cmd(*args, archiver=self.archiver, **kw)
|
|
|
+ fork = kw.pop('fork', None)
|
|
|
+ if fork is None:
|
|
|
+ fork = self.FORK_DEFAULT
|
|
|
+ ret, output = exec_cmd(*args, fork=fork, exe=self.EXE, archiver=self.archiver, **kw)
|
|
|
if ret != exit_code:
|
|
|
print(output)
|
|
|
self.assert_equal(ret, exit_code)
|
|
@@ -333,7 +337,10 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
|
|
self.cmd('init', '--encryption=none', self.repository_location)
|
|
|
self._set_repository_id(self.repository_path, repository_id)
|
|
|
self.assert_equal(repository_id, self._extract_repository_id(self.repository_path))
|
|
|
- self.assert_raises(Cache.EncryptionMethodMismatch, lambda: self.cmd('create', self.repository_location + '::test.2', 'input'))
|
|
|
+ if self.FORK_DEFAULT:
|
|
|
+ self.cmd('create', self.repository_location + '::test.2', 'input', exit_code=1) # fails
|
|
|
+ else:
|
|
|
+ self.assert_raises(Cache.EncryptionMethodMismatch, lambda: self.cmd('create', self.repository_location + '::test.2', 'input'))
|
|
|
|
|
|
def test_repository_swap_detection2(self):
|
|
|
self.create_test_files()
|
|
@@ -343,7 +350,10 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
|
|
self.cmd('create', self.repository_location + '_encrypted::test', 'input')
|
|
|
shutil.rmtree(self.repository_path + '_encrypted')
|
|
|
os.rename(self.repository_path + '_unencrypted', self.repository_path + '_encrypted')
|
|
|
- self.assert_raises(Cache.RepositoryAccessAborted, lambda: self.cmd('create', self.repository_location + '_encrypted::test.2', 'input'))
|
|
|
+ if self.FORK_DEFAULT:
|
|
|
+ self.cmd('create', self.repository_location + '_encrypted::test.2', 'input', exit_code=1) # fails
|
|
|
+ else:
|
|
|
+ self.assert_raises(Cache.RepositoryAccessAborted, lambda: self.cmd('create', self.repository_location + '_encrypted::test.2', 'input'))
|
|
|
|
|
|
def test_strip_components(self):
|
|
|
self.cmd('init', self.repository_location)
|
|
@@ -568,8 +578,12 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
|
|
self.assert_in('bar-2015-08-12-20:00', output)
|
|
|
|
|
|
def test_usage(self):
|
|
|
- self.assert_raises(SystemExit, lambda: self.cmd())
|
|
|
- self.assert_raises(SystemExit, lambda: self.cmd('-h'))
|
|
|
+ if self.FORK_DEFAULT:
|
|
|
+ self.cmd(exit_code=0)
|
|
|
+ self.cmd('-h', exit_code=0)
|
|
|
+ else:
|
|
|
+ self.assert_raises(SystemExit, lambda: self.cmd())
|
|
|
+ self.assert_raises(SystemExit, lambda: self.cmd('-h'))
|
|
|
|
|
|
def test_help(self):
|
|
|
assert 'Borg' in self.cmd('help')
|
|
@@ -656,6 +670,12 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
|
|
self.verify_aes_counter_uniqueness('passphrase')
|
|
|
|
|
|
|
|
|
+@unittest.skipUnless('binary' in BORG_EXES, 'no borg.exe available')
|
|
|
+class ArchiverTestCaseBinary(ArchiverTestCase):
|
|
|
+ EXE = 'borg.exe'
|
|
|
+ FORK_DEFAULT = True
|
|
|
+
|
|
|
+
|
|
|
class ArchiverCheckTestCase(ArchiverTestCaseBase):
|
|
|
|
|
|
def setUp(self):
|