test.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. from __future__ import with_statement
  2. import doctest
  3. import filecmp
  4. import os
  5. from StringIO import StringIO
  6. import sys
  7. import shutil
  8. import tempfile
  9. import unittest
  10. from xattr import xattr, XATTR_NOFOLLOW
  11. import getpass
  12. getpass.getpass = lambda m: 'abc123'
  13. from . import store, helpers, lrucache
  14. from .archiver import Archiver
  15. class Test(unittest.TestCase):
  16. def setUp(self):
  17. self.archiver = Archiver()
  18. self.tmpdir = tempfile.mkdtemp()
  19. self.store_path = os.path.join(self.tmpdir, 'store')
  20. self.input_path = os.path.join(self.tmpdir, 'input')
  21. self.output_path = os.path.join(self.tmpdir, 'output')
  22. os.mkdir(self.input_path)
  23. os.mkdir(self.output_path)
  24. os.chdir(self.tmpdir)
  25. self.keychain = '/tmp/_test_dedupstore.keychain'
  26. if not os.path.exists(self.keychain):
  27. self.darc('init-keychain')
  28. self.darc('init', self.store_path)
  29. def tearDown(self):
  30. shutil.rmtree(self.tmpdir)
  31. def darc(self, *args, **kwargs):
  32. exit_code = kwargs.get('exit_code', 0)
  33. args = ['--keychain', self.keychain] + list(args)
  34. try:
  35. stdout, stderr = sys.stdout, sys.stderr
  36. output = StringIO()
  37. sys.stdout = sys.stderr = output
  38. ret = self.archiver.run(args)
  39. sys.stdout, sys.stderr = stdout, stderr
  40. if ret != exit_code:
  41. print output.getvalue()
  42. self.assertEqual(exit_code, ret)
  43. return output.getvalue()
  44. finally:
  45. sys.stdout, sys.stderr = stdout, stderr
  46. def create_src_archive(self, name):
  47. src_dir = os.path.join(os.getcwd(), os.path.dirname(__file__))
  48. self.darc('create', self.store_path + '::' + name, src_dir)
  49. def create_regual_file(self, name, size=0):
  50. filename = os.path.join(self.input_path, name)
  51. if not os.path.exists(os.path.dirname(filename)):
  52. os.makedirs(os.path.dirname(filename))
  53. with open(filename, 'wb') as fd:
  54. fd.write('X' * size)
  55. def get_xattrs(self, path):
  56. try:
  57. return dict(xattr(path, XATTR_NOFOLLOW))
  58. except IOError:
  59. return {}
  60. def diff_dirs(self, dir1, dir2):
  61. diff = filecmp.dircmp(dir1, dir2)
  62. self.assertEqual(diff.left_only, [])
  63. self.assertEqual(diff.right_only, [])
  64. self.assertEqual(diff.diff_files, [])
  65. for filename in diff.common:
  66. path1 = os.path.join(dir1, filename)
  67. path2 = os.path.join(dir2, filename)
  68. s1 = os.lstat(path1)
  69. s2 = os.lstat(path2)
  70. attrs = ['st_mode', 'st_uid', 'st_gid']
  71. # We can't restore symlink atime/mtime right now
  72. if not os.path.islink(path1):
  73. attrs.append('st_mtime')
  74. d1 = [filename] + [getattr(s1, a) for a in attrs]
  75. d2 = [filename] + [getattr(s2, a) for a in attrs]
  76. d1.append(self.get_xattrs(path1))
  77. d2.append(self.get_xattrs(path2))
  78. self.assertEqual(d1, d2)
  79. def test_basic_functionality(self):
  80. self.create_regual_file('file1', size=1024*80)
  81. self.create_regual_file('dir2/file2', size=1024*80)
  82. x = xattr(os.path.join(self.input_path, 'file1'))
  83. x.set('user.foo', 'bar')
  84. os.link(os.path.join(self.input_path, 'file1'),
  85. os.path.join(self.input_path, 'hardlink'))
  86. os.symlink('somewhere', os.path.join(self.input_path, 'link1'))
  87. os.mkfifo(os.path.join(self.input_path, 'fifo1'))
  88. self.darc('create', self.store_path + '::test', 'input')
  89. self.darc('create', self.store_path + '::test.2', 'input')
  90. self.darc('extract', self.store_path + '::test', 'output')
  91. self.diff_dirs('input', 'output/input')
  92. def test_corrupted_store(self):
  93. self.create_src_archive('test')
  94. self.darc('verify', self.store_path + '::test')
  95. fd = open(os.path.join(self.tmpdir, 'store', 'bands', '0', '0'), 'r+')
  96. fd.seek(100)
  97. fd.write('X')
  98. fd.close()
  99. self.darc('verify', self.store_path + '::test', exit_code=1)
  100. def test_keychain(self):
  101. keychain = os.path.join(self.tmpdir, 'keychain')
  102. keychain2 = os.path.join(self.tmpdir, 'keychain2')
  103. self.darc('-k', keychain, 'init-keychain')
  104. self.darc('-k', keychain, 'change-password')
  105. self.darc('-k', keychain, 'export-restricted', keychain2)
  106. def suite():
  107. suite = unittest.TestSuite()
  108. suite.addTest(unittest.TestLoader().loadTestsFromTestCase(Test))
  109. suite.addTest(store.suite())
  110. suite.addTest(doctest.DocTestSuite(helpers))
  111. suite.addTest(lrucache.suite())
  112. return suite
  113. if __name__ == '__main__':
  114. unittest.TextTestRunner(verbosity=2).run(suite())