test.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. from __future__ import with_statement
  2. import doctest
  3. import filecmp
  4. import os
  5. from StringIO import StringIO
  6. import sys
  7. import shutil
  8. import tempfile
  9. import unittest
  10. from xattr import xattr, XATTR_NOFOLLOW
  11. import getpass
  12. getpass.getpass = lambda m: 'abc123'
  13. from . import store, helpers, lrucache
  14. from .archiver import Archiver
  15. class Test(unittest.TestCase):
  16. def setUp(self):
  17. self.archiver = Archiver()
  18. self.tmpdir = tempfile.mkdtemp()
  19. self.store_path = os.path.join(self.tmpdir, 'store')
  20. self.input_path = os.path.join(self.tmpdir, 'input')
  21. self.output_path = os.path.join(self.tmpdir, 'output')
  22. self.keys_path = os.path.join(self.tmpdir, 'keys')
  23. self.cache_path = os.path.join(self.tmpdir, 'cache')
  24. os.environ['DARC_KEYS_DIR'] = self.keys_path
  25. os.environ['DARC_CACHE_DIR'] = self.cache_path
  26. os.mkdir(self.input_path)
  27. os.mkdir(self.output_path)
  28. os.mkdir(self.keys_path)
  29. os.mkdir(self.cache_path)
  30. os.chdir(self.tmpdir)
  31. def tearDown(self):
  32. shutil.rmtree(self.tmpdir)
  33. def darc(self, *args, **kwargs):
  34. exit_code = kwargs.get('exit_code', 0)
  35. args = list(args)
  36. try:
  37. stdout, stderr = sys.stdout, sys.stderr
  38. output = StringIO()
  39. sys.stdout = sys.stderr = output
  40. ret = self.archiver.run(args)
  41. sys.stdout, sys.stderr = stdout, stderr
  42. if ret != exit_code:
  43. print output.getvalue()
  44. self.assertEqual(exit_code, ret)
  45. return output.getvalue()
  46. finally:
  47. sys.stdout, sys.stderr = stdout, stderr
  48. def create_src_archive(self, name):
  49. src_dir = os.path.join(os.getcwd(), os.path.dirname(__file__))
  50. self.darc('init', '--password', '', self.store_path)
  51. self.darc('create', self.store_path + '::' + name, src_dir)
  52. def create_regual_file(self, name, size=0):
  53. filename = os.path.join(self.input_path, name)
  54. if not os.path.exists(os.path.dirname(filename)):
  55. os.makedirs(os.path.dirname(filename))
  56. with open(filename, 'wb') as fd:
  57. fd.write('X' * size)
  58. def get_xattrs(self, path):
  59. try:
  60. return dict(xattr(path, XATTR_NOFOLLOW))
  61. except IOError:
  62. return {}
  63. def diff_dirs(self, dir1, dir2):
  64. diff = filecmp.dircmp(dir1, dir2)
  65. self.assertEqual(diff.left_only, [])
  66. self.assertEqual(diff.right_only, [])
  67. self.assertEqual(diff.diff_files, [])
  68. for filename in diff.common:
  69. path1 = os.path.join(dir1, filename)
  70. path2 = os.path.join(dir2, filename)
  71. s1 = os.lstat(path1)
  72. s2 = os.lstat(path2)
  73. attrs = ['st_mode', 'st_uid', 'st_gid']
  74. # We can't restore symlink atime/mtime right now
  75. if not os.path.islink(path1):
  76. attrs.append('st_mtime')
  77. d1 = [filename] + [getattr(s1, a) for a in attrs]
  78. d2 = [filename] + [getattr(s2, a) for a in attrs]
  79. d1.append(self.get_xattrs(path1))
  80. d2.append(self.get_xattrs(path2))
  81. self.assertEqual(d1, d2)
  82. def test_basic_functionality(self):
  83. self.create_regual_file('file1', size=1024*80)
  84. self.create_regual_file('dir2/file2', size=1024*80)
  85. x = xattr(os.path.join(self.input_path, 'file1'))
  86. x.set('user.foo', 'bar')
  87. os.link(os.path.join(self.input_path, 'file1'),
  88. os.path.join(self.input_path, 'hardlink'))
  89. os.symlink('somewhere', os.path.join(self.input_path, 'link1'))
  90. os.mkfifo(os.path.join(self.input_path, 'fifo1'))
  91. self.darc('init', '-p', '', self.store_path)
  92. self.darc('create', self.store_path + '::test', 'input')
  93. self.darc('create', self.store_path + '::test.2', 'input')
  94. self.darc('extract', self.store_path + '::test', 'output')
  95. self.diff_dirs('input', 'output/input')
  96. info_output = self.darc('info', self.store_path + '::test')
  97. shutil.rmtree(self.cache_path)
  98. info_output2 = self.darc('info', self.store_path + '::test')
  99. # info_output2 starts with some "initializing cache" text but should
  100. # end the same way as info_output
  101. assert info_output2.endswith(info_output)
  102. def test_corrupted_store(self):
  103. self.create_src_archive('test')
  104. self.darc('verify', self.store_path + '::test')
  105. fd = open(os.path.join(self.tmpdir, 'store', 'data', '0', '0'), 'r+')
  106. fd.seek(100)
  107. fd.write('X')
  108. fd.close()
  109. self.darc('verify', self.store_path + '::test', exit_code=1)
  110. def test_purge_store(self):
  111. src_dir = os.path.join(os.getcwd(), os.path.dirname(__file__))
  112. self.darc('init', '-p', '', self.store_path)
  113. self.darc('create', self.store_path + '::test1', src_dir)
  114. self.darc('create', self.store_path + '::test2', src_dir)
  115. self.darc('purge', self.store_path, '--daily=2')
  116. output = self.darc('list', self.store_path)
  117. assert 'test1' in output
  118. assert 'test2' in output
  119. self.darc('purge', self.store_path, '--daily=2', '--really')
  120. output = self.darc('list', self.store_path)
  121. assert 'test1' not in output
  122. assert 'test2' in output
  123. def suite():
  124. suite = unittest.TestSuite()
  125. suite.addTest(unittest.TestLoader().loadTestsFromTestCase(Test))
  126. suite.addTest(store.suite())
  127. suite.addTest(doctest.DocTestSuite(helpers))
  128. suite.addTest(lrucache.suite())
  129. return suite
  130. if __name__ == '__main__':
  131. unittest.TextTestRunner(verbosity=2).run(suite())