helpers.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. from time import mktime, strptime
  2. from datetime import datetime, timezone
  3. import os
  4. import tempfile
  5. import unittest
  6. from attic.helpers import adjust_patterns, exclude_path, Location, format_timedelta, IncludePattern, ExcludePattern, make_path_safe, UpgradableLock, prune_split, to_localtime
  7. from attic.testsuite import AtticTestCase
  8. class LocationTestCase(AtticTestCase):
  9. def test(self):
  10. self.assert_equal(
  11. repr(Location('ssh://user@host:1234/some/path::archive')),
  12. "Location(proto='ssh', user='user', host='host', port=1234, path='/some/path', archive='archive')"
  13. )
  14. self.assert_equal(
  15. repr(Location('file:///some/path::archive')),
  16. "Location(proto='file', user=None, host=None, port=None, path='/some/path', archive='archive')"
  17. )
  18. self.assert_equal(
  19. repr(Location('user@host:/some/path::archive')),
  20. "Location(proto='ssh', user='user', host='host', port=None, path='/some/path', archive='archive')"
  21. )
  22. self.assert_equal(
  23. repr(Location('mybackup.attic::archive')),
  24. "Location(proto='file', user=None, host=None, port=None, path='mybackup.attic', archive='archive')"
  25. )
  26. self.assert_equal(
  27. repr(Location('/some/absolute/path::archive')),
  28. "Location(proto='file', user=None, host=None, port=None, path='/some/absolute/path', archive='archive')"
  29. )
  30. self.assert_equal(
  31. repr(Location('some/relative/path::archive')),
  32. "Location(proto='file', user=None, host=None, port=None, path='some/relative/path', archive='archive')"
  33. )
  34. class FormatTimedeltaTestCase(AtticTestCase):
  35. def test(self):
  36. t0 = datetime(2001, 1, 1, 10, 20, 3, 0)
  37. t1 = datetime(2001, 1, 1, 12, 20, 4, 100000)
  38. self.assert_equal(
  39. format_timedelta(t1 - t0),
  40. '2 hours 1.10 seconds'
  41. )
  42. class PatternTestCase(AtticTestCase):
  43. files = [
  44. '/etc/passwd', '/etc/hosts', '/home',
  45. '/home/user/.profile', '/home/user/.bashrc',
  46. '/home/user2/.profile', '/home/user2/public_html/index.html',
  47. '/var/log/messages', '/var/log/dmesg',
  48. ]
  49. def evaluate(self, paths, excludes):
  50. patterns = adjust_patterns(paths, [ExcludePattern(p) for p in excludes])
  51. return [path for path in self.files if not exclude_path(path, patterns)]
  52. def test(self):
  53. self.assert_equal(self.evaluate(['/'], []), self.files)
  54. self.assert_equal(self.evaluate([], []), self.files)
  55. self.assert_equal(self.evaluate(['/'], ['/h']), self.files)
  56. self.assert_equal(self.evaluate(['/'], ['/home']),
  57. ['/etc/passwd', '/etc/hosts', '/var/log/messages', '/var/log/dmesg'])
  58. self.assert_equal(self.evaluate(['/'], ['/home/']),
  59. ['/etc/passwd', '/etc/hosts', '/home', '/var/log/messages', '/var/log/dmesg'])
  60. self.assert_equal(self.evaluate(['/home/u'], []), [])
  61. self.assert_equal(self.evaluate(['/', '/home', '/etc/hosts'], ['/']), [])
  62. self.assert_equal(self.evaluate(['/home/'], ['/home/user2']),
  63. ['/home', '/home/user/.profile', '/home/user/.bashrc'])
  64. self.assert_equal(self.evaluate(['/'], ['*.profile', '/var/log']),
  65. ['/etc/passwd', '/etc/hosts', '/home', '/home/user/.bashrc', '/home/user2/public_html/index.html'])
  66. self.assert_equal(self.evaluate(['/'], ['/home/*/public_html', '*.profile', '*/log/*']),
  67. ['/etc/passwd', '/etc/hosts', '/home', '/home/user/.bashrc'])
  68. self.assert_equal(self.evaluate(['/etc/', '/var'], ['dmesg']),
  69. ['/etc/passwd', '/etc/hosts', '/var/log/messages', '/var/log/dmesg'])
  70. class MakePathSafeTestCase(AtticTestCase):
  71. def test(self):
  72. self.assert_equal(make_path_safe('/foo/bar'), 'foo/bar')
  73. self.assert_equal(make_path_safe('/foo/bar'), 'foo/bar')
  74. self.assert_equal(make_path_safe('../foo/bar'), 'foo/bar')
  75. self.assert_equal(make_path_safe('../../foo/bar'), 'foo/bar')
  76. self.assert_equal(make_path_safe('/'), '.')
  77. self.assert_equal(make_path_safe('/'), '.')
  78. class UpgradableLockTestCase(AtticTestCase):
  79. def test(self):
  80. file = tempfile.NamedTemporaryFile()
  81. lock = UpgradableLock(file.name)
  82. lock.upgrade()
  83. lock.upgrade()
  84. lock.release()
  85. @unittest.skipIf(os.getuid() == 0, 'Root can always open files for writing')
  86. def test_read_only_lock_file(self):
  87. file = tempfile.NamedTemporaryFile()
  88. os.chmod(file.name, 0o444)
  89. lock = UpgradableLock(file.name)
  90. self.assert_raises(UpgradableLock.LockUpgradeFailed, lock.upgrade)
  91. lock.release()
  92. class MockArchive(object):
  93. def __init__(self, ts):
  94. self.ts = ts
  95. def __repr__(self):
  96. return repr(self.ts)
  97. class PruneSplitTestCase(AtticTestCase):
  98. def test(self):
  99. def local_to_UTC(month, day):
  100. 'Convert noon on the month and day in 2013 to UTC.'
  101. seconds = mktime(strptime('2013-%02d-%02d 12:00' % (month, day), '%Y-%m-%d %H:%M'))
  102. return datetime.fromtimestamp(seconds, tz=timezone.utc)
  103. def subset(lst, indices):
  104. return {lst[i] for i in indices}
  105. def dotest(test_archives, n, skip, indices):
  106. for ta in test_archives, reversed(test_archives):
  107. self.assert_equal(set(prune_split(ta, '%Y-%m', n, skip)),
  108. subset(test_archives, indices))
  109. test_pairs = [(1,1), (2,1), (2,28), (3,1), (3,2), (3,31), (5,1)]
  110. test_dates = [local_to_UTC(month, day) for month, day in test_pairs]
  111. test_archives = [MockArchive(date) for date in test_dates]
  112. dotest(test_archives, 3, [], [6, 5, 2])
  113. dotest(test_archives, -1, [], [6, 5, 2, 0])
  114. dotest(test_archives, 3, [test_archives[6]], [5, 2, 0])
  115. dotest(test_archives, 3, [test_archives[5]], [6, 2, 0])
  116. dotest(test_archives, 3, [test_archives[4]], [6, 5, 2])
  117. dotest(test_archives, 0, [], [])