test.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. import doctest
  2. import filecmp
  3. import os
  4. from StringIO import StringIO
  5. import sys
  6. import shutil
  7. import tempfile
  8. import unittest
  9. from xattr import xattr, XATTR_NOFOLLOW
  10. import getpass
  11. getpass.getpass = lambda m: 'abc123'
  12. from . import store, helpers, lrucache
  13. from .archiver import Archiver
  14. class Test(unittest.TestCase):
  15. def setUp(self):
  16. self.archiver = Archiver()
  17. self.tmpdir = tempfile.mkdtemp()
  18. self.store_path = os.path.join(self.tmpdir, 'store')
  19. self.input_path = os.path.join(self.tmpdir, 'input')
  20. self.output_path = os.path.join(self.tmpdir, 'output')
  21. os.mkdir(self.input_path)
  22. os.mkdir(self.output_path)
  23. os.chdir(self.tmpdir)
  24. self.keychain = '/tmp/_test_dedupstore.keychain'
  25. if not os.path.exists(self.keychain):
  26. self.darc('init-keychain')
  27. self.darc('init', self.store_path)
  28. def tearDown(self):
  29. shutil.rmtree(self.tmpdir)
  30. def darc(self, *args, **kwargs):
  31. exit_code = kwargs.get('exit_code', 0)
  32. args = ['--keychain', self.keychain] + list(args)
  33. try:
  34. stdout, stderr = sys.stdout, sys.stderr
  35. output = StringIO()
  36. sys.stdout = sys.stderr = output
  37. ret = self.archiver.run(args)
  38. sys.stdout, sys.stderr = stdout, stderr
  39. if ret != exit_code:
  40. print output.getvalue()
  41. self.assertEqual(exit_code, ret)
  42. return output.getvalue()
  43. finally:
  44. sys.stdout, sys.stderr = stdout, stderr
  45. def create_src_archive(self, name):
  46. src_dir = os.path.join(os.getcwd(), os.path.dirname(__file__))
  47. self.darc('create', self.store_path + '::' + name, src_dir)
  48. def create_regual_file(self, name, size=0):
  49. filename = os.path.join(self.input_path, name)
  50. if not os.path.exists(os.path.dirname(filename)):
  51. os.makedirs(os.path.dirname(filename))
  52. with open(filename, 'wb') as fd:
  53. fd.write('X' * size)
  54. def get_xattrs(self, path):
  55. try:
  56. return dict(xattr(path, XATTR_NOFOLLOW))
  57. except IOError:
  58. return {}
  59. def diff_dirs(self, dir1, dir2):
  60. diff = filecmp.dircmp(dir1, dir2)
  61. self.assertEqual(diff.left_only, [])
  62. self.assertEqual(diff.right_only, [])
  63. self.assertEqual(diff.diff_files, [])
  64. for filename in diff.common:
  65. path1 = os.path.join(dir1, filename)
  66. path2 = os.path.join(dir2, filename)
  67. s1 = os.lstat(path1)
  68. s2 = os.lstat(path2)
  69. attrs = ['st_mode', 'st_uid', 'st_gid']
  70. # We can't restore symlink atime/mtime right now
  71. if not os.path.islink(path1):
  72. attrs.append('st_mtime')
  73. d1 = [filename] + [getattr(s1, a) for a in attrs]
  74. d2 = [filename] + [getattr(s2, a) for a in attrs]
  75. d1.append(self.get_xattrs(path1))
  76. d2.append(self.get_xattrs(path2))
  77. self.assertEqual(d1, d2)
  78. def test_basic_functionality(self):
  79. self.create_regual_file('file1', size=1024*80)
  80. self.create_regual_file('dir2/file2', size=1024*80)
  81. x = xattr(os.path.join(self.input_path, 'file1'))
  82. x.set('user.foo', 'bar')
  83. os.link(os.path.join(self.input_path, 'file1'),
  84. os.path.join(self.input_path, 'hardlink'))
  85. os.symlink('somewhere', os.path.join(self.input_path, 'link1'))
  86. os.mkfifo(os.path.join(self.input_path, 'fifo1'))
  87. self.darc('create', self.store_path + '::test', 'input')
  88. self.darc('create', self.store_path + '::test.2', 'input')
  89. self.darc('extract', self.store_path + '::test', 'output')
  90. self.diff_dirs('input', 'output/input')
  91. def test_corrupted_store(self):
  92. self.create_src_archive('test')
  93. self.darc('verify', self.store_path + '::test')
  94. fd = open(os.path.join(self.tmpdir, 'store', 'bands', '0', '0'), 'r+')
  95. fd.seek(100)
  96. fd.write('X')
  97. fd.close()
  98. self.darc('verify', self.store_path + '::test', exit_code=1)
  99. def test_keychain(self):
  100. keychain = os.path.join(self.tmpdir, 'keychain')
  101. keychain2 = os.path.join(self.tmpdir, 'keychain2')
  102. self.darc('-k', keychain, 'init-keychain')
  103. self.darc('-k', keychain, 'change-password')
  104. self.darc('-k', keychain, 'export-restricted', keychain2)
  105. def suite():
  106. suite = unittest.TestSuite()
  107. suite.addTest(unittest.TestLoader().loadTestsFromTestCase(Test))
  108. suite.addTest(store.suite())
  109. suite.addTest(doctest.DocTestSuite(helpers))
  110. suite.addTest(lrucache.suite())
  111. return suite
  112. if __name__ == '__main__':
  113. unittest.TextTestRunner(verbosity=2).run(suite())