test_attic.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. from collections import OrderedDict
  2. from flexmock import flexmock
  3. from atticmatic import attic as module
  4. from atticmatic.tests.builtins import builtins_mock
  5. def insert_subprocess_mock(check_call_command, **kwargs):
  6. subprocess = flexmock()
  7. subprocess.should_receive('check_call').with_args(check_call_command, **kwargs).once()
  8. flexmock(module).subprocess = subprocess
  9. def insert_subprocess_never():
  10. subprocess = flexmock()
  11. subprocess.should_receive('check_call').never()
  12. flexmock(module).subprocess = subprocess
  13. def insert_platform_mock():
  14. flexmock(module.platform).should_receive('node').and_return('host')
  15. def insert_datetime_mock():
  16. flexmock(module).datetime = flexmock().should_receive('now').and_return(
  17. flexmock().should_receive('isoformat').and_return('now').mock
  18. ).mock
  19. def test_create_archive_should_call_attic_with_parameters():
  20. insert_subprocess_mock(
  21. ('attic', 'create', '--exclude-from', 'excludes', 'repo::host-now', 'foo', 'bar'),
  22. )
  23. insert_platform_mock()
  24. insert_datetime_mock()
  25. module.create_archive(
  26. excludes_filename='excludes',
  27. verbose=False,
  28. source_directories='foo bar',
  29. repository='repo',
  30. )
  31. def test_create_archive_with_verbose_should_call_attic_with_verbose_parameters():
  32. insert_subprocess_mock(
  33. (
  34. 'attic', 'create', '--exclude-from', 'excludes', 'repo::host-now', 'foo', 'bar',
  35. '--verbose', '--stats',
  36. ),
  37. )
  38. insert_platform_mock()
  39. insert_datetime_mock()
  40. module.create_archive(
  41. excludes_filename='excludes',
  42. verbose=True,
  43. source_directories='foo bar',
  44. repository='repo',
  45. )
  46. BASE_PRUNE_FLAGS = (
  47. ('--keep-daily', '1'),
  48. ('--keep-weekly', '2'),
  49. ('--keep-monthly', '3'),
  50. )
  51. def test_make_prune_flags_should_return_flags_from_config():
  52. retention_config = OrderedDict(
  53. (
  54. ('keep_daily', 1),
  55. ('keep_weekly', 2),
  56. ('keep_monthly', 3),
  57. )
  58. )
  59. result = module._make_prune_flags(retention_config)
  60. assert tuple(result) == BASE_PRUNE_FLAGS
  61. def test_prune_archives_should_call_attic_with_parameters():
  62. retention_config = flexmock()
  63. flexmock(module).should_receive('_make_prune_flags').with_args(retention_config).and_return(
  64. BASE_PRUNE_FLAGS,
  65. )
  66. insert_subprocess_mock(
  67. (
  68. 'attic', 'prune', 'repo', '--keep-daily', '1', '--keep-weekly', '2', '--keep-monthly',
  69. '3',
  70. ),
  71. )
  72. module.prune_archives(
  73. verbose=False,
  74. repository='repo',
  75. retention_config=retention_config,
  76. )
  77. def test_prune_archives_with_verbose_should_call_attic_with_verbose_parameters():
  78. retention_config = flexmock()
  79. flexmock(module).should_receive('_make_prune_flags').with_args(retention_config).and_return(
  80. BASE_PRUNE_FLAGS,
  81. )
  82. insert_subprocess_mock(
  83. (
  84. 'attic', 'prune', 'repo', '--keep-daily', '1', '--keep-weekly', '2', '--keep-monthly',
  85. '3', '--verbose',
  86. ),
  87. )
  88. module.prune_archives(
  89. repository='repo',
  90. verbose=True,
  91. retention_config=retention_config,
  92. )
  93. def test_parse_checks_returns_them_as_tuple():
  94. checks = module._parse_checks({'checks': 'foo disabled bar'})
  95. assert checks == ('foo', 'bar')
  96. def test_parse_checks_with_missing_value_returns_defaults():
  97. checks = module._parse_checks({})
  98. assert checks == module.DEFAULT_CHECKS
  99. def test_parse_checks_with_blank_value_returns_defaults():
  100. checks = module._parse_checks({'checks': ''})
  101. assert checks == module.DEFAULT_CHECKS
  102. def test_parse_checks_with_disabled_returns_no_checks():
  103. checks = module._parse_checks({'checks': 'disabled'})
  104. assert checks == ()
  105. def test_make_check_flags_with_checks_returns_flags():
  106. flags = module._make_check_flags(('foo', 'bar'))
  107. assert flags == ('--foo-only', '--bar-only')
  108. def test_make_check_flags_with_default_checks_returns_no_flags():
  109. flags = module._make_check_flags(module.DEFAULT_CHECKS)
  110. assert flags == ()
  111. def test_check_archives_should_call_attic_with_parameters():
  112. consistency_config = flexmock()
  113. flexmock(module).should_receive('_parse_checks').and_return(flexmock())
  114. flexmock(module).should_receive('_make_check_flags').and_return(())
  115. stdout = flexmock()
  116. insert_subprocess_mock(
  117. ('attic', 'check', 'repo'),
  118. stdout=stdout,
  119. )
  120. insert_platform_mock()
  121. insert_datetime_mock()
  122. builtins_mock().should_receive('open').and_return(stdout)
  123. flexmock(module.os).should_receive('devnull')
  124. module.check_archives(
  125. verbose=False,
  126. repository='repo',
  127. consistency_config=consistency_config,
  128. )
  129. def test_check_archives_with_verbose_should_call_attic_with_verbose_parameters():
  130. consistency_config = flexmock()
  131. flexmock(module).should_receive('_parse_checks').and_return(flexmock())
  132. flexmock(module).should_receive('_make_check_flags').and_return(())
  133. insert_subprocess_mock(
  134. ('attic', 'check', 'repo', '--verbose'),
  135. stdout=None,
  136. )
  137. insert_platform_mock()
  138. insert_datetime_mock()
  139. module.check_archives(
  140. verbose=True,
  141. repository='repo',
  142. consistency_config=consistency_config,
  143. )
  144. def test_check_archives_without_any_checks_should_bail():
  145. consistency_config = flexmock()
  146. flexmock(module).should_receive('_parse_checks').and_return(())
  147. insert_subprocess_never()
  148. module.check_archives(
  149. verbose=False,
  150. repository='repo',
  151. consistency_config=consistency_config,
  152. )