2
0

test.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. import filecmp
  2. import os
  3. import time
  4. from StringIO import StringIO
  5. import sys
  6. import shutil
  7. import tempfile
  8. import unittest
  9. from xattr import xattr, XATTR_NOFOLLOW
  10. from . import store
  11. from .archiver import Archiver
  12. class Test(unittest.TestCase):
  13. def setUp(self):
  14. self.archiver = Archiver()
  15. self.tmpdir = tempfile.mkdtemp()
  16. self.store_path = os.path.join(self.tmpdir, 'store')
  17. self.input_path = os.path.join(self.tmpdir, 'input')
  18. self.output_path = os.path.join(self.tmpdir, 'output')
  19. os.mkdir(self.input_path)
  20. os.mkdir(self.output_path)
  21. os.chdir(self.tmpdir)
  22. self.keychain = '/tmp/_test_dedupstore.keychain'
  23. if not os.path.exists(self.keychain):
  24. self.darc('keychain', 'generate')
  25. self.darc('init', self.store_path)
  26. def tearDown(self):
  27. shutil.rmtree(self.tmpdir)
  28. def darc(self, *args, **kwargs):
  29. exit_code = kwargs.get('exit_code', 0)
  30. args = ['--keychain', self.keychain] + list(args)
  31. try:
  32. stdout, stderr = sys.stdout, sys.stderr
  33. output = StringIO()
  34. sys.stdout = sys.stderr = output
  35. ret = self.archiver.run(args)
  36. sys.stdout, sys.stderr = stdout, stderr
  37. if ret != exit_code:
  38. print output.getvalue()
  39. self.assertEqual(exit_code, ret)
  40. return output.getvalue()
  41. finally:
  42. sys.stdout, sys.stderr = stdout, stderr
  43. def create_src_archive(self, name):
  44. src_dir = os.path.join(os.getcwd(), os.path.dirname(__file__))
  45. self.darc('create', self.store_path + '::' + name, src_dir)
  46. def create_regual_file(self, name, size=0):
  47. filename = os.path.join(self.input_path, name)
  48. if not os.path.exists(os.path.dirname(filename)):
  49. os.makedirs(os.path.dirname(filename))
  50. with open(filename, 'wb') as fd:
  51. fd.write('X' * size)
  52. def get_xattrs(self, path):
  53. try:
  54. return dict(xattr(path, XATTR_NOFOLLOW))
  55. except IOError:
  56. return {}
  57. def diff_dirs(self, dir1, dir2):
  58. diff = filecmp.dircmp(dir1, dir2)
  59. self.assertEqual(diff.left_only, [])
  60. self.assertEqual(diff.right_only, [])
  61. self.assertEqual(diff.diff_files, [])
  62. for filename in diff.common:
  63. path1 = os.path.join(dir1, filename)
  64. path2 = os.path.join(dir2, filename)
  65. s1 = os.lstat(path1)
  66. s2 = os.lstat(path2)
  67. attrs = ['st_mode', 'st_uid', 'st_gid']
  68. # We can't restore symlink atime/mtime right now
  69. if not os.path.islink(path1):
  70. attrs.append('st_mtime')
  71. d1 = [filename] + [getattr(s1, a) for a in attrs]
  72. d2 = [filename] + [getattr(s2, a) for a in attrs]
  73. d1.append(self.get_xattrs(path1))
  74. d2.append(self.get_xattrs(path2))
  75. self.assertEqual(d1, d2)
  76. def test_basic_functionality(self):
  77. self.create_regual_file('file1', size=1024*80)
  78. self.create_regual_file('dir2/file2', size=1024*80)
  79. x = xattr(os.path.join(self.input_path, 'file1'))
  80. x.set('user:foo', 'bar')
  81. os.symlink('somewhere', os.path.join(self.input_path, 'link1'))
  82. os.mkfifo(os.path.join(self.input_path, 'fifo1'))
  83. self.darc('create', self.store_path + '::test', 'input')
  84. time.sleep(1)
  85. self.darc('extract', self.store_path + '::test', 'output')
  86. self.diff_dirs('input', 'output/input')
  87. def test_corrupted_store(self):
  88. self.create_src_archive('test')
  89. self.darc('verify', self.store_path + '::test')
  90. fd = open(os.path.join(self.tmpdir, 'store', 'bands', '0', '0'), 'r+')
  91. fd.seek(100)
  92. fd.write('X')
  93. fd.close()
  94. self.darc('verify', self.store_path + '::test', exit_code=1)
  95. def suite():
  96. suite = unittest.TestSuite()
  97. suite.addTest(unittest.TestLoader().loadTestsFromTestCase(Test))
  98. suite.addTest(store.suite())
  99. return suite
  100. if __name__ == '__main__':
  101. unittest.TextTestRunner(verbosity=2).run(suite())