# test.py -- functional tests driving the archiver command line
  1. import filecmp
  2. import os
  3. from StringIO import StringIO
  4. import sys
  5. import shutil
  6. import tempfile
  7. import unittest
  8. from xattr import xattr, XATTR_NOFOLLOW
  9. from . import store
  10. from .archiver import Archiver
  11. class Test(unittest.TestCase):
  12. def setUp(self):
  13. self.archiver = Archiver()
  14. self.tmpdir = tempfile.mkdtemp()
  15. self.store_path = os.path.join(self.tmpdir, 'store')
  16. self.input_path = os.path.join(self.tmpdir, 'input')
  17. self.output_path = os.path.join(self.tmpdir, 'output')
  18. os.mkdir(self.input_path)
  19. os.mkdir(self.output_path)
  20. os.chdir(self.tmpdir)
  21. self.keychain = '/tmp/_test_dedupstore.keychain'
  22. if not os.path.exists(self.keychain):
  23. self.darc('init-keychain')
  24. self.darc('init', self.store_path)
  25. def tearDown(self):
  26. shutil.rmtree(self.tmpdir)
  27. def darc(self, *args, **kwargs):
  28. exit_code = kwargs.get('exit_code', 0)
  29. args = ['--keychain', self.keychain] + list(args)
  30. try:
  31. stdout, stderr = sys.stdout, sys.stderr
  32. output = StringIO()
  33. sys.stdout = sys.stderr = output
  34. ret = self.archiver.run(args)
  35. sys.stdout, sys.stderr = stdout, stderr
  36. if ret != exit_code:
  37. print output.getvalue()
  38. self.assertEqual(exit_code, ret)
  39. return output.getvalue()
  40. finally:
  41. sys.stdout, sys.stderr = stdout, stderr
  42. def create_src_archive(self, name):
  43. src_dir = os.path.join(os.getcwd(), os.path.dirname(__file__))
  44. self.darc('create', self.store_path + '::' + name, src_dir)
  45. def create_regual_file(self, name, size=0):
  46. filename = os.path.join(self.input_path, name)
  47. if not os.path.exists(os.path.dirname(filename)):
  48. os.makedirs(os.path.dirname(filename))
  49. with open(filename, 'wb') as fd:
  50. fd.write('X' * size)
  51. def get_xattrs(self, path):
  52. try:
  53. return dict(xattr(path, XATTR_NOFOLLOW))
  54. except IOError:
  55. return {}
  56. def diff_dirs(self, dir1, dir2):
  57. diff = filecmp.dircmp(dir1, dir2)
  58. self.assertEqual(diff.left_only, [])
  59. self.assertEqual(diff.right_only, [])
  60. self.assertEqual(diff.diff_files, [])
  61. for filename in diff.common:
  62. path1 = os.path.join(dir1, filename)
  63. path2 = os.path.join(dir2, filename)
  64. s1 = os.lstat(path1)
  65. s2 = os.lstat(path2)
  66. attrs = ['st_mode', 'st_uid', 'st_gid']
  67. # We can't restore symlink atime/mtime right now
  68. if not os.path.islink(path1):
  69. attrs.append('st_mtime')
  70. d1 = [filename] + [getattr(s1, a) for a in attrs]
  71. d2 = [filename] + [getattr(s2, a) for a in attrs]
  72. d1.append(self.get_xattrs(path1))
  73. d2.append(self.get_xattrs(path2))
  74. self.assertEqual(d1, d2)
  75. def test_basic_functionality(self):
  76. self.create_regual_file('file1', size=1024*80)
  77. self.create_regual_file('dir2/file2', size=1024*80)
  78. x = xattr(os.path.join(self.input_path, 'file1'))
  79. x.set('user:foo', 'bar')
  80. os.symlink('somewhere', os.path.join(self.input_path, 'link1'))
  81. os.mkfifo(os.path.join(self.input_path, 'fifo1'))
  82. self.darc('create', self.store_path + '::test', 'input')
  83. self.darc('extract', self.store_path + '::test', 'output')
  84. self.diff_dirs('input', 'output/input')
  85. def test_corrupted_store(self):
  86. self.create_src_archive('test')
  87. self.darc('verify', self.store_path + '::test')
  88. fd = open(os.path.join(self.tmpdir, 'store', 'bands', '0', '0'), 'r+')
  89. fd.seek(100)
  90. fd.write('X')
  91. fd.close()
  92. self.darc('verify', self.store_path + '::test', exit_code=1)
  93. def test_keychain(self):
  94. keychain = os.path.join(self.tmpdir, 'keychain')
  95. self.darc('-k', keychain, 'init-keychain')
  96. def suite():
  97. suite = unittest.TestSuite()
  98. suite.addTest(unittest.TestLoader().loadTestsFromTestCase(Test))
  99. suite.addTest(store.suite())
  100. return suite
  101. if __name__ == '__main__':
  102. unittest.TextTestRunner(verbosity=2).run(suite())