test_attic.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. from collections import OrderedDict
  2. try:
  3. # Python 2
  4. import __builtin__ as builtins
  5. except ImportError:
  6. # Python 3
  7. import builtins
  8. from flexmock import flexmock
  9. from nose.tools import assert_raises
  10. from atticmatic import attic as module
  11. class MockCalledProcessError(Exception):
  12. def __init__(self, output):
  13. self.output = output
  14. def insert_subprocess_check_output_mock(call_command, error_output=None, **kwargs):
  15. subprocess = flexmock(CalledProcessError=MockCalledProcessError, STDOUT=flexmock())
  16. expectation = subprocess.should_receive('check_output').with_args(
  17. call_command,
  18. stderr=subprocess.STDOUT,
  19. **kwargs
  20. ).once()
  21. if error_output:
  22. expectation.and_raise(MockCalledProcessError, output=error_output)
  23. flexmock(builtins).should_receive('print')
  24. flexmock(module).subprocess = subprocess
  25. return subprocess
  26. def insert_subprocess_check_call_mock(call_command, **kwargs):
  27. subprocess = flexmock()
  28. subprocess.should_receive('check_call').with_args(
  29. call_command,
  30. **kwargs
  31. ).once()
  32. flexmock(module).subprocess = subprocess
  33. return subprocess
  34. def insert_platform_mock():
  35. flexmock(module).platform = flexmock().should_receive('node').and_return('host').mock
  36. def insert_datetime_mock():
  37. flexmock(module).datetime = flexmock().should_receive('now').and_return(
  38. flexmock().should_receive('isoformat').and_return('now').mock
  39. ).mock
  40. def test_create_archive_should_call_attic_with_parameters():
  41. insert_subprocess_check_output_mock(
  42. ('attic', 'create', '--exclude-from', 'excludes', 'repo::host-now', 'foo', 'bar'),
  43. )
  44. insert_platform_mock()
  45. insert_datetime_mock()
  46. module.create_archive(
  47. excludes_filename='excludes',
  48. verbose=False,
  49. source_directories='foo bar',
  50. repository='repo',
  51. )
  52. def test_create_archive_with_verbose_should_call_attic_with_verbose_parameters():
  53. insert_subprocess_check_output_mock(
  54. (
  55. 'attic', 'create', '--exclude-from', 'excludes', 'repo::host-now', 'foo', 'bar',
  56. '--verbose', '--stats',
  57. ),
  58. )
  59. insert_platform_mock()
  60. insert_datetime_mock()
  61. module.create_archive(
  62. excludes_filename='excludes',
  63. verbose=True,
  64. source_directories='foo bar',
  65. repository='repo',
  66. )
  67. def test_create_archive_with_missing_repository_should_raise():
  68. insert_subprocess_check_output_mock(
  69. ('attic', 'create', '--exclude-from', 'excludes', 'repo::host-now', 'foo', 'bar'),
  70. error_output='Error: Repository repo does not exist',
  71. )
  72. insert_platform_mock()
  73. insert_datetime_mock()
  74. with assert_raises(RuntimeError):
  75. module.create_archive(
  76. excludes_filename='excludes',
  77. verbose=False,
  78. source_directories='foo bar',
  79. repository='repo',
  80. )
  81. def test_create_archive_with_other_error_should_raise():
  82. subprocess = insert_subprocess_check_output_mock(
  83. ('attic', 'create', '--exclude-from', 'excludes', 'repo::host-now', 'foo', 'bar'),
  84. error_output='Something went wrong',
  85. )
  86. insert_platform_mock()
  87. insert_datetime_mock()
  88. with assert_raises(subprocess.CalledProcessError):
  89. module.create_archive(
  90. excludes_filename='excludes',
  91. verbose=False,
  92. source_directories='foo bar',
  93. repository='repo',
  94. )
  95. BASE_PRUNE_FLAGS = (
  96. ('--keep-daily', '1'),
  97. ('--keep-weekly', '2'),
  98. ('--keep-monthly', '3'),
  99. )
  100. def test_make_prune_flags_should_return_flags_from_config():
  101. retention_config = OrderedDict(
  102. (
  103. ('keep_daily', 1),
  104. ('keep_weekly', 2),
  105. ('keep_monthly', 3),
  106. )
  107. )
  108. result = module.make_prune_flags(retention_config)
  109. assert tuple(result) == BASE_PRUNE_FLAGS
  110. def test_prune_archives_should_call_attic_with_parameters():
  111. retention_config = flexmock()
  112. flexmock(module).should_receive('make_prune_flags').with_args(retention_config).and_return(
  113. BASE_PRUNE_FLAGS,
  114. )
  115. insert_subprocess_check_call_mock(
  116. (
  117. 'attic', 'prune', 'repo', '--keep-daily', '1', '--keep-weekly', '2', '--keep-monthly',
  118. '3',
  119. ),
  120. )
  121. module.prune_archives(
  122. verbose=False,
  123. repository='repo',
  124. retention_config=retention_config,
  125. )
  126. def test_prune_archives_with_verbose_should_call_attic_with_verbose_parameters():
  127. retention_config = flexmock()
  128. flexmock(module).should_receive('make_prune_flags').with_args(retention_config).and_return(
  129. BASE_PRUNE_FLAGS,
  130. )
  131. insert_subprocess_check_call_mock(
  132. (
  133. 'attic', 'prune', 'repo', '--keep-daily', '1', '--keep-weekly', '2', '--keep-monthly',
  134. '3', '--verbose',
  135. ),
  136. )
  137. module.prune_archives(
  138. repository='repo',
  139. verbose=True,
  140. retention_config=retention_config,
  141. )
  142. def test_check_archives_should_call_attic_with_parameters():
  143. stdout = flexmock()
  144. insert_subprocess_check_call_mock(
  145. ('attic', 'check', 'repo'),
  146. stdout=stdout,
  147. )
  148. insert_platform_mock()
  149. insert_datetime_mock()
  150. flexmock(module).open = lambda filename, mode: stdout
  151. flexmock(module).os = flexmock().should_receive('devnull').mock
  152. module.check_archives(
  153. verbose=False,
  154. repository='repo',
  155. )
  156. def test_check_archives_with_verbose_should_call_attic_with_verbose_parameters():
  157. insert_subprocess_check_call_mock(
  158. ('attic', 'check', 'repo', '--verbose'),
  159. stdout=None,
  160. )
  161. insert_platform_mock()
  162. insert_datetime_mock()
  163. module.check_archives(
  164. verbose=True,
  165. repository='repo',
  166. )