helpers.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. import hashlib
  2. from time import mktime, strptime
  3. from datetime import datetime, timezone, timedelta
  4. import pytest
  5. import msgpack
  6. from ..helpers import adjust_patterns, exclude_path, Location, format_timedelta, ExcludePattern, make_path_safe, \
  7. prune_within, prune_split, \
  8. StableDict, int_to_bigint, bigint_to_int, parse_timestamp, CompressionSpec, ChunkerParams
  9. from . import BaseTestCase
  10. class BigIntTestCase(BaseTestCase):
  11. def test_bigint(self):
  12. self.assert_equal(int_to_bigint(0), 0)
  13. self.assert_equal(int_to_bigint(2**63-1), 2**63-1)
  14. self.assert_equal(int_to_bigint(-2**63+1), -2**63+1)
  15. self.assert_equal(int_to_bigint(2**63), b'\x00\x00\x00\x00\x00\x00\x00\x80\x00')
  16. self.assert_equal(int_to_bigint(-2**63), b'\x00\x00\x00\x00\x00\x00\x00\x80\xff')
  17. self.assert_equal(bigint_to_int(int_to_bigint(-2**70)), -2**70)
  18. self.assert_equal(bigint_to_int(int_to_bigint(2**70)), 2**70)
  19. class TestLocationWithoutEnv:
  20. def test_ssh(self, monkeypatch):
  21. monkeypatch.delenv('BORG_REPO', raising=False)
  22. assert repr(Location('ssh://user@host:1234/some/path::archive')) == \
  23. "Location(proto='ssh', user='user', host='host', port=1234, path='/some/path', archive='archive')"
  24. assert repr(Location('ssh://user@host:1234/some/path')) == \
  25. "Location(proto='ssh', user='user', host='host', port=1234, path='/some/path', archive=None)"
  26. def test_file(self, monkeypatch):
  27. monkeypatch.delenv('BORG_REPO', raising=False)
  28. assert repr(Location('file:///some/path::archive')) == \
  29. "Location(proto='file', user=None, host=None, port=None, path='/some/path', archive='archive')"
  30. assert repr(Location('file:///some/path')) == \
  31. "Location(proto='file', user=None, host=None, port=None, path='/some/path', archive=None)"
  32. def test_scp(self, monkeypatch):
  33. monkeypatch.delenv('BORG_REPO', raising=False)
  34. assert repr(Location('user@host:/some/path::archive')) == \
  35. "Location(proto='ssh', user='user', host='host', port=None, path='/some/path', archive='archive')"
  36. assert repr(Location('user@host:/some/path')) == \
  37. "Location(proto='ssh', user='user', host='host', port=None, path='/some/path', archive=None)"
  38. def test_folder(self, monkeypatch):
  39. monkeypatch.delenv('BORG_REPO', raising=False)
  40. assert repr(Location('path::archive')) == \
  41. "Location(proto='file', user=None, host=None, port=None, path='path', archive='archive')"
  42. assert repr(Location('path')) == \
  43. "Location(proto='file', user=None, host=None, port=None, path='path', archive=None)"
  44. def test_abspath(self, monkeypatch):
  45. monkeypatch.delenv('BORG_REPO', raising=False)
  46. assert repr(Location('/some/absolute/path::archive')) == \
  47. "Location(proto='file', user=None, host=None, port=None, path='/some/absolute/path', archive='archive')"
  48. assert repr(Location('/some/absolute/path')) == \
  49. "Location(proto='file', user=None, host=None, port=None, path='/some/absolute/path', archive=None)"
  50. def test_relpath(self, monkeypatch):
  51. monkeypatch.delenv('BORG_REPO', raising=False)
  52. assert repr(Location('some/relative/path::archive')) == \
  53. "Location(proto='file', user=None, host=None, port=None, path='some/relative/path', archive='archive')"
  54. assert repr(Location('some/relative/path')) == \
  55. "Location(proto='file', user=None, host=None, port=None, path='some/relative/path', archive=None)"
  56. def test_underspecified(self, monkeypatch):
  57. monkeypatch.delenv('BORG_REPO', raising=False)
  58. with pytest.raises(ValueError):
  59. Location('::archive')
  60. with pytest.raises(ValueError):
  61. Location('::')
  62. with pytest.raises(ValueError):
  63. Location()
  64. def test_no_double_colon(self, monkeypatch):
  65. monkeypatch.delenv('BORG_REPO', raising=False)
  66. with pytest.raises(ValueError):
  67. Location('ssh://localhost:22/path:archive')
  68. def test_canonical_path(self, monkeypatch):
  69. monkeypatch.delenv('BORG_REPO', raising=False)
  70. locations = ['some/path::archive', 'file://some/path::archive', 'host:some/path::archive',
  71. 'host:~user/some/path::archive', 'ssh://host/some/path::archive',
  72. 'ssh://user@host:1234/some/path::archive']
  73. for location in locations:
  74. assert Location(location).canonical_path() == \
  75. Location(Location(location).canonical_path()).canonical_path()
  76. class TestLocationWithEnv:
  77. def test_ssh(self, monkeypatch):
  78. monkeypatch.setenv('BORG_REPO', 'ssh://user@host:1234/some/path')
  79. assert repr(Location('::archive')) == \
  80. "Location(proto='ssh', user='user', host='host', port=1234, path='/some/path', archive='archive')"
  81. assert repr(Location()) == \
  82. "Location(proto='ssh', user='user', host='host', port=1234, path='/some/path', archive=None)"
  83. def test_file(self, monkeypatch):
  84. monkeypatch.setenv('BORG_REPO', 'file:///some/path')
  85. assert repr(Location('::archive')) == \
  86. "Location(proto='file', user=None, host=None, port=None, path='/some/path', archive='archive')"
  87. assert repr(Location()) == \
  88. "Location(proto='file', user=None, host=None, port=None, path='/some/path', archive=None)"
  89. def test_scp(self, monkeypatch):
  90. monkeypatch.setenv('BORG_REPO', 'user@host:/some/path')
  91. assert repr(Location('::archive')) == \
  92. "Location(proto='ssh', user='user', host='host', port=None, path='/some/path', archive='archive')"
  93. assert repr(Location()) == \
  94. "Location(proto='ssh', user='user', host='host', port=None, path='/some/path', archive=None)"
  95. def test_folder(self, monkeypatch):
  96. monkeypatch.setenv('BORG_REPO', 'path')
  97. assert repr(Location('::archive')) == \
  98. "Location(proto='file', user=None, host=None, port=None, path='path', archive='archive')"
  99. assert repr(Location()) == \
  100. "Location(proto='file', user=None, host=None, port=None, path='path', archive=None)"
  101. def test_abspath(self, monkeypatch):
  102. monkeypatch.setenv('BORG_REPO', '/some/absolute/path')
  103. assert repr(Location('::archive')) == \
  104. "Location(proto='file', user=None, host=None, port=None, path='/some/absolute/path', archive='archive')"
  105. assert repr(Location()) == \
  106. "Location(proto='file', user=None, host=None, port=None, path='/some/absolute/path', archive=None)"
  107. def test_relpath(self, monkeypatch):
  108. monkeypatch.setenv('BORG_REPO', 'some/relative/path')
  109. assert repr(Location('::archive')) == \
  110. "Location(proto='file', user=None, host=None, port=None, path='some/relative/path', archive='archive')"
  111. assert repr(Location()) == \
  112. "Location(proto='file', user=None, host=None, port=None, path='some/relative/path', archive=None)"
  113. class FormatTimedeltaTestCase(BaseTestCase):
  114. def test(self):
  115. t0 = datetime(2001, 1, 1, 10, 20, 3, 0)
  116. t1 = datetime(2001, 1, 1, 12, 20, 4, 100000)
  117. self.assert_equal(
  118. format_timedelta(t1 - t0),
  119. '2 hours 1.10 seconds'
  120. )
  121. class PatternTestCase(BaseTestCase):
  122. files = [
  123. '/etc/passwd', '/etc/hosts', '/home',
  124. '/home/user/.profile', '/home/user/.bashrc',
  125. '/home/user2/.profile', '/home/user2/public_html/index.html',
  126. '/var/log/messages', '/var/log/dmesg',
  127. ]
  128. def evaluate(self, paths, excludes):
  129. patterns = adjust_patterns(paths, [ExcludePattern(p) for p in excludes])
  130. return [path for path in self.files if not exclude_path(path, patterns)]
  131. def test(self):
  132. self.assert_equal(self.evaluate(['/'], []), self.files)
  133. self.assert_equal(self.evaluate([], []), self.files)
  134. self.assert_equal(self.evaluate(['/'], ['/h']), self.files)
  135. self.assert_equal(self.evaluate(['/'], ['/home']),
  136. ['/etc/passwd', '/etc/hosts', '/var/log/messages', '/var/log/dmesg'])
  137. self.assert_equal(self.evaluate(['/'], ['/home/']),
  138. ['/etc/passwd', '/etc/hosts', '/home', '/var/log/messages', '/var/log/dmesg'])
  139. self.assert_equal(self.evaluate(['/home/u'], []), [])
  140. self.assert_equal(self.evaluate(['/', '/home', '/etc/hosts'], ['/']), [])
  141. self.assert_equal(self.evaluate(['/home/'], ['/home/user2']),
  142. ['/home', '/home/user/.profile', '/home/user/.bashrc'])
  143. self.assert_equal(self.evaluate(['/'], ['*.profile', '/var/log']),
  144. ['/etc/passwd', '/etc/hosts', '/home', '/home/user/.bashrc', '/home/user2/public_html/index.html'])
  145. self.assert_equal(self.evaluate(['/'], ['/home/*/public_html', '*.profile', '*/log/*']),
  146. ['/etc/passwd', '/etc/hosts', '/home', '/home/user/.bashrc'])
  147. self.assert_equal(self.evaluate(['/etc/', '/var'], ['dmesg']),
  148. ['/etc/passwd', '/etc/hosts', '/var/log/messages', '/var/log/dmesg'])
  149. def test_compression_specs():
  150. with pytest.raises(ValueError):
  151. CompressionSpec('')
  152. assert CompressionSpec('0') == dict(name='zlib', level=0)
  153. assert CompressionSpec('1') == dict(name='zlib', level=1)
  154. assert CompressionSpec('9') == dict(name='zlib', level=9)
  155. with pytest.raises(ValueError):
  156. CompressionSpec('10')
  157. assert CompressionSpec('none') == dict(name='none')
  158. assert CompressionSpec('lz4') == dict(name='lz4')
  159. assert CompressionSpec('zlib') == dict(name='zlib', level=6)
  160. assert CompressionSpec('zlib,0') == dict(name='zlib', level=0)
  161. assert CompressionSpec('zlib,9') == dict(name='zlib', level=9)
  162. with pytest.raises(ValueError):
  163. CompressionSpec('zlib,9,invalid')
  164. assert CompressionSpec('lzma') == dict(name='lzma', level=6)
  165. assert CompressionSpec('lzma,0') == dict(name='lzma', level=0)
  166. assert CompressionSpec('lzma,9') == dict(name='lzma', level=9)
  167. with pytest.raises(ValueError):
  168. CompressionSpec('lzma,9,invalid')
  169. with pytest.raises(ValueError):
  170. CompressionSpec('invalid')
  171. def test_chunkerparams():
  172. assert ChunkerParams('19,23,21,4095') == (19, 23, 21, 4095)
  173. assert ChunkerParams('10,23,16,4095') == (10, 23, 16, 4095)
  174. with pytest.raises(ValueError):
  175. ChunkerParams('19,24,21,4095')
  176. class MakePathSafeTestCase(BaseTestCase):
  177. def test(self):
  178. self.assert_equal(make_path_safe('/foo/bar'), 'foo/bar')
  179. self.assert_equal(make_path_safe('/foo/bar'), 'foo/bar')
  180. self.assert_equal(make_path_safe('/f/bar'), 'f/bar')
  181. self.assert_equal(make_path_safe('fo/bar'), 'fo/bar')
  182. self.assert_equal(make_path_safe('../foo/bar'), 'foo/bar')
  183. self.assert_equal(make_path_safe('../../foo/bar'), 'foo/bar')
  184. self.assert_equal(make_path_safe('/'), '.')
  185. self.assert_equal(make_path_safe('/'), '.')
  186. class MockArchive:
  187. def __init__(self, ts):
  188. self.ts = ts
  189. def __repr__(self):
  190. return repr(self.ts)
  191. class PruneSplitTestCase(BaseTestCase):
  192. def test(self):
  193. def local_to_UTC(month, day):
  194. """Convert noon on the month and day in 2013 to UTC."""
  195. seconds = mktime(strptime('2013-%02d-%02d 12:00' % (month, day), '%Y-%m-%d %H:%M'))
  196. return datetime.fromtimestamp(seconds, tz=timezone.utc)
  197. def subset(lst, indices):
  198. return {lst[i] for i in indices}
  199. def dotest(test_archives, n, skip, indices):
  200. for ta in test_archives, reversed(test_archives):
  201. self.assert_equal(set(prune_split(ta, '%Y-%m', n, skip)),
  202. subset(test_archives, indices))
  203. test_pairs = [(1, 1), (2, 1), (2, 28), (3, 1), (3, 2), (3, 31), (5, 1)]
  204. test_dates = [local_to_UTC(month, day) for month, day in test_pairs]
  205. test_archives = [MockArchive(date) for date in test_dates]
  206. dotest(test_archives, 3, [], [6, 5, 2])
  207. dotest(test_archives, -1, [], [6, 5, 2, 0])
  208. dotest(test_archives, 3, [test_archives[6]], [5, 2, 0])
  209. dotest(test_archives, 3, [test_archives[5]], [6, 2, 0])
  210. dotest(test_archives, 3, [test_archives[4]], [6, 5, 2])
  211. dotest(test_archives, 0, [], [])
  212. class PruneWithinTestCase(BaseTestCase):
  213. def test(self):
  214. def subset(lst, indices):
  215. return {lst[i] for i in indices}
  216. def dotest(test_archives, within, indices):
  217. for ta in test_archives, reversed(test_archives):
  218. self.assert_equal(set(prune_within(ta, within)),
  219. subset(test_archives, indices))
  220. # 1 minute, 1.5 hours, 2.5 hours, 3.5 hours, 25 hours, 49 hours
  221. test_offsets = [60, 90*60, 150*60, 210*60, 25*60*60, 49*60*60]
  222. now = datetime.now(timezone.utc)
  223. test_dates = [now - timedelta(seconds=s) for s in test_offsets]
  224. test_archives = [MockArchive(date) for date in test_dates]
  225. dotest(test_archives, '1H', [0])
  226. dotest(test_archives, '2H', [0, 1])
  227. dotest(test_archives, '3H', [0, 1, 2])
  228. dotest(test_archives, '24H', [0, 1, 2, 3])
  229. dotest(test_archives, '26H', [0, 1, 2, 3, 4])
  230. dotest(test_archives, '2d', [0, 1, 2, 3, 4])
  231. dotest(test_archives, '50H', [0, 1, 2, 3, 4, 5])
  232. dotest(test_archives, '3d', [0, 1, 2, 3, 4, 5])
  233. dotest(test_archives, '1w', [0, 1, 2, 3, 4, 5])
  234. dotest(test_archives, '1m', [0, 1, 2, 3, 4, 5])
  235. dotest(test_archives, '1y', [0, 1, 2, 3, 4, 5])
  236. class StableDictTestCase(BaseTestCase):
  237. def test(self):
  238. d = StableDict(foo=1, bar=2, boo=3, baz=4)
  239. self.assert_equal(list(d.items()), [('bar', 2), ('baz', 4), ('boo', 3), ('foo', 1)])
  240. self.assert_equal(hashlib.md5(msgpack.packb(d)).hexdigest(), 'fc78df42cd60691b3ac3dd2a2b39903f')
  241. class TestParseTimestamp(BaseTestCase):
  242. def test(self):
  243. self.assert_equal(parse_timestamp('2015-04-19T20:25:00.226410'), datetime(2015, 4, 19, 20, 25, 0, 226410, timezone.utc))
  244. self.assert_equal(parse_timestamp('2015-04-19T20:25:00'), datetime(2015, 4, 19, 20, 25, 0, 0, timezone.utc))