helpers.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568
  1. import hashlib
  2. from time import mktime, strptime
  3. from datetime import datetime, timezone, timedelta
  4. from io import StringIO
  5. import os
  6. import pytest
  7. import sys
  8. import msgpack
  9. import msgpack.fallback
  10. from ..helpers import adjust_patterns, exclude_path, Location, format_file_size, format_timedelta, IncludePattern, ExcludePattern, make_path_safe, \
  11. prune_within, prune_split, get_cache_dir, Statistics, is_slow_msgpack, yes, \
  12. StableDict, int_to_bigint, bigint_to_int, parse_timestamp, CompressionSpec, ChunkerParams
  13. from . import BaseTestCase, environment_variable, FakeInputs
  14. class BigIntTestCase(BaseTestCase):
  15. def test_bigint(self):
  16. self.assert_equal(int_to_bigint(0), 0)
  17. self.assert_equal(int_to_bigint(2**63-1), 2**63-1)
  18. self.assert_equal(int_to_bigint(-2**63+1), -2**63+1)
  19. self.assert_equal(int_to_bigint(2**63), b'\x00\x00\x00\x00\x00\x00\x00\x80\x00')
  20. self.assert_equal(int_to_bigint(-2**63), b'\x00\x00\x00\x00\x00\x00\x00\x80\xff')
  21. self.assert_equal(bigint_to_int(int_to_bigint(-2**70)), -2**70)
  22. self.assert_equal(bigint_to_int(int_to_bigint(2**70)), 2**70)
  23. class TestLocationWithoutEnv:
  24. def test_ssh(self, monkeypatch):
  25. monkeypatch.delenv('BORG_REPO', raising=False)
  26. assert repr(Location('ssh://user@host:1234/some/path::archive')) == \
  27. "Location(proto='ssh', user='user', host='host', port=1234, path='/some/path', archive='archive')"
  28. assert repr(Location('ssh://user@host:1234/some/path')) == \
  29. "Location(proto='ssh', user='user', host='host', port=1234, path='/some/path', archive=None)"
  30. def test_file(self, monkeypatch):
  31. monkeypatch.delenv('BORG_REPO', raising=False)
  32. assert repr(Location('file:///some/path::archive')) == \
  33. "Location(proto='file', user=None, host=None, port=None, path='/some/path', archive='archive')"
  34. assert repr(Location('file:///some/path')) == \
  35. "Location(proto='file', user=None, host=None, port=None, path='/some/path', archive=None)"
  36. def test_scp(self, monkeypatch):
  37. monkeypatch.delenv('BORG_REPO', raising=False)
  38. assert repr(Location('user@host:/some/path::archive')) == \
  39. "Location(proto='ssh', user='user', host='host', port=None, path='/some/path', archive='archive')"
  40. assert repr(Location('user@host:/some/path')) == \
  41. "Location(proto='ssh', user='user', host='host', port=None, path='/some/path', archive=None)"
  42. def test_folder(self, monkeypatch):
  43. monkeypatch.delenv('BORG_REPO', raising=False)
  44. assert repr(Location('path::archive')) == \
  45. "Location(proto='file', user=None, host=None, port=None, path='path', archive='archive')"
  46. assert repr(Location('path')) == \
  47. "Location(proto='file', user=None, host=None, port=None, path='path', archive=None)"
  48. def test_abspath(self, monkeypatch):
  49. monkeypatch.delenv('BORG_REPO', raising=False)
  50. assert repr(Location('/some/absolute/path::archive')) == \
  51. "Location(proto='file', user=None, host=None, port=None, path='/some/absolute/path', archive='archive')"
  52. assert repr(Location('/some/absolute/path')) == \
  53. "Location(proto='file', user=None, host=None, port=None, path='/some/absolute/path', archive=None)"
  54. def test_relpath(self, monkeypatch):
  55. monkeypatch.delenv('BORG_REPO', raising=False)
  56. assert repr(Location('some/relative/path::archive')) == \
  57. "Location(proto='file', user=None, host=None, port=None, path='some/relative/path', archive='archive')"
  58. assert repr(Location('some/relative/path')) == \
  59. "Location(proto='file', user=None, host=None, port=None, path='some/relative/path', archive=None)"
  60. def test_underspecified(self, monkeypatch):
  61. monkeypatch.delenv('BORG_REPO', raising=False)
  62. with pytest.raises(ValueError):
  63. Location('::archive')
  64. with pytest.raises(ValueError):
  65. Location('::')
  66. with pytest.raises(ValueError):
  67. Location()
  68. def test_no_double_colon(self, monkeypatch):
  69. monkeypatch.delenv('BORG_REPO', raising=False)
  70. with pytest.raises(ValueError):
  71. Location('ssh://localhost:22/path:archive')
  72. def test_no_slashes(self, monkeypatch):
  73. monkeypatch.delenv('BORG_REPO', raising=False)
  74. with pytest.raises(ValueError):
  75. Location('/some/path/to/repo::archive_name_with/slashes/is_invalid')
  76. def test_canonical_path(self, monkeypatch):
  77. monkeypatch.delenv('BORG_REPO', raising=False)
  78. locations = ['some/path::archive', 'file://some/path::archive', 'host:some/path::archive',
  79. 'host:~user/some/path::archive', 'ssh://host/some/path::archive',
  80. 'ssh://user@host:1234/some/path::archive']
  81. for location in locations:
  82. assert Location(location).canonical_path() == \
  83. Location(Location(location).canonical_path()).canonical_path()
  84. class TestLocationWithEnv:
  85. def test_ssh(self, monkeypatch):
  86. monkeypatch.setenv('BORG_REPO', 'ssh://user@host:1234/some/path')
  87. assert repr(Location('::archive')) == \
  88. "Location(proto='ssh', user='user', host='host', port=1234, path='/some/path', archive='archive')"
  89. assert repr(Location()) == \
  90. "Location(proto='ssh', user='user', host='host', port=1234, path='/some/path', archive=None)"
  91. def test_file(self, monkeypatch):
  92. monkeypatch.setenv('BORG_REPO', 'file:///some/path')
  93. assert repr(Location('::archive')) == \
  94. "Location(proto='file', user=None, host=None, port=None, path='/some/path', archive='archive')"
  95. assert repr(Location()) == \
  96. "Location(proto='file', user=None, host=None, port=None, path='/some/path', archive=None)"
  97. def test_scp(self, monkeypatch):
  98. monkeypatch.setenv('BORG_REPO', 'user@host:/some/path')
  99. assert repr(Location('::archive')) == \
  100. "Location(proto='ssh', user='user', host='host', port=None, path='/some/path', archive='archive')"
  101. assert repr(Location()) == \
  102. "Location(proto='ssh', user='user', host='host', port=None, path='/some/path', archive=None)"
  103. def test_folder(self, monkeypatch):
  104. monkeypatch.setenv('BORG_REPO', 'path')
  105. assert repr(Location('::archive')) == \
  106. "Location(proto='file', user=None, host=None, port=None, path='path', archive='archive')"
  107. assert repr(Location()) == \
  108. "Location(proto='file', user=None, host=None, port=None, path='path', archive=None)"
  109. def test_abspath(self, monkeypatch):
  110. monkeypatch.setenv('BORG_REPO', '/some/absolute/path')
  111. assert repr(Location('::archive')) == \
  112. "Location(proto='file', user=None, host=None, port=None, path='/some/absolute/path', archive='archive')"
  113. assert repr(Location()) == \
  114. "Location(proto='file', user=None, host=None, port=None, path='/some/absolute/path', archive=None)"
  115. def test_relpath(self, monkeypatch):
  116. monkeypatch.setenv('BORG_REPO', 'some/relative/path')
  117. assert repr(Location('::archive')) == \
  118. "Location(proto='file', user=None, host=None, port=None, path='some/relative/path', archive='archive')"
  119. assert repr(Location()) == \
  120. "Location(proto='file', user=None, host=None, port=None, path='some/relative/path', archive=None)"
  121. def test_no_slashes(self, monkeypatch):
  122. monkeypatch.setenv('BORG_REPO', '/some/absolute/path')
  123. with pytest.raises(ValueError):
  124. Location('::archive_name_with/slashes/is_invalid')
  125. class FormatTimedeltaTestCase(BaseTestCase):
  126. def test(self):
  127. t0 = datetime(2001, 1, 1, 10, 20, 3, 0)
  128. t1 = datetime(2001, 1, 1, 12, 20, 4, 100000)
  129. self.assert_equal(
  130. format_timedelta(t1 - t0),
  131. '2 hours 1.10 seconds'
  132. )
  133. class PatternTestCase(BaseTestCase):
  134. files = [
  135. '/etc/passwd', '/etc/hosts', '/home',
  136. '/home/user/.profile', '/home/user/.bashrc',
  137. '/home/user2/.profile', '/home/user2/public_html/index.html',
  138. '/var/log/messages', '/var/log/dmesg',
  139. ]
  140. def evaluate(self, paths, excludes):
  141. patterns = adjust_patterns(paths, [ExcludePattern(p) for p in excludes])
  142. return [path for path in self.files if not exclude_path(path, patterns)]
  143. def test(self):
  144. self.assert_equal(self.evaluate(['/'], []), self.files)
  145. self.assert_equal(self.evaluate([], []), self.files)
  146. self.assert_equal(self.evaluate(['/'], ['/h']), self.files)
  147. self.assert_equal(self.evaluate(['/'], ['/home']),
  148. ['/etc/passwd', '/etc/hosts', '/var/log/messages', '/var/log/dmesg'])
  149. self.assert_equal(self.evaluate(['/'], ['/home/']),
  150. ['/etc/passwd', '/etc/hosts', '/home', '/var/log/messages', '/var/log/dmesg'])
  151. self.assert_equal(self.evaluate(['/home/u'], []), [])
  152. self.assert_equal(self.evaluate(['/', '/home', '/etc/hosts'], ['/']), [])
  153. self.assert_equal(self.evaluate(['/home/'], ['/home/user2']),
  154. ['/home', '/home/user/.profile', '/home/user/.bashrc'])
  155. self.assert_equal(self.evaluate(['/'], ['*.profile', '/var/log']),
  156. ['/etc/passwd', '/etc/hosts', '/home', '/home/user/.bashrc', '/home/user2/public_html/index.html'])
  157. self.assert_equal(self.evaluate(['/'], ['/home/*/public_html', '*.profile', '*/log/*']),
  158. ['/etc/passwd', '/etc/hosts', '/home', '/home/user/.bashrc'])
  159. self.assert_equal(self.evaluate(['/etc/', '/var'], ['dmesg']),
  160. ['/etc/passwd', '/etc/hosts', '/var/log/messages', '/var/log/dmesg'])
  161. @pytest.mark.skipif(sys.platform in ('darwin',), reason='all but OS X test')
  162. class PatternNonAsciiTestCase(BaseTestCase):
  163. def testComposedUnicode(self):
  164. pattern = 'b\N{LATIN SMALL LETTER A WITH ACUTE}'
  165. i = IncludePattern(pattern)
  166. e = ExcludePattern(pattern)
  167. assert i.match("b\N{LATIN SMALL LETTER A WITH ACUTE}/foo")
  168. assert not i.match("ba\N{COMBINING ACUTE ACCENT}/foo")
  169. assert e.match("b\N{LATIN SMALL LETTER A WITH ACUTE}/foo")
  170. assert not e.match("ba\N{COMBINING ACUTE ACCENT}/foo")
  171. def testDecomposedUnicode(self):
  172. pattern = 'ba\N{COMBINING ACUTE ACCENT}'
  173. i = IncludePattern(pattern)
  174. e = ExcludePattern(pattern)
  175. assert not i.match("b\N{LATIN SMALL LETTER A WITH ACUTE}/foo")
  176. assert i.match("ba\N{COMBINING ACUTE ACCENT}/foo")
  177. assert not e.match("b\N{LATIN SMALL LETTER A WITH ACUTE}/foo")
  178. assert e.match("ba\N{COMBINING ACUTE ACCENT}/foo")
  179. def testInvalidUnicode(self):
  180. pattern = str(b'ba\x80', 'latin1')
  181. i = IncludePattern(pattern)
  182. e = ExcludePattern(pattern)
  183. assert not i.match("ba/foo")
  184. assert i.match(str(b"ba\x80/foo", 'latin1'))
  185. assert not e.match("ba/foo")
  186. assert e.match(str(b"ba\x80/foo", 'latin1'))
  187. @pytest.mark.skipif(sys.platform not in ('darwin',), reason='OS X test')
  188. class OSXPatternNormalizationTestCase(BaseTestCase):
  189. def testComposedUnicode(self):
  190. pattern = 'b\N{LATIN SMALL LETTER A WITH ACUTE}'
  191. i = IncludePattern(pattern)
  192. e = ExcludePattern(pattern)
  193. assert i.match("b\N{LATIN SMALL LETTER A WITH ACUTE}/foo")
  194. assert i.match("ba\N{COMBINING ACUTE ACCENT}/foo")
  195. assert e.match("b\N{LATIN SMALL LETTER A WITH ACUTE}/foo")
  196. assert e.match("ba\N{COMBINING ACUTE ACCENT}/foo")
  197. def testDecomposedUnicode(self):
  198. pattern = 'ba\N{COMBINING ACUTE ACCENT}'
  199. i = IncludePattern(pattern)
  200. e = ExcludePattern(pattern)
  201. assert i.match("b\N{LATIN SMALL LETTER A WITH ACUTE}/foo")
  202. assert i.match("ba\N{COMBINING ACUTE ACCENT}/foo")
  203. assert e.match("b\N{LATIN SMALL LETTER A WITH ACUTE}/foo")
  204. assert e.match("ba\N{COMBINING ACUTE ACCENT}/foo")
  205. def testInvalidUnicode(self):
  206. pattern = str(b'ba\x80', 'latin1')
  207. i = IncludePattern(pattern)
  208. e = ExcludePattern(pattern)
  209. assert not i.match("ba/foo")
  210. assert i.match(str(b"ba\x80/foo", 'latin1'))
  211. assert not e.match("ba/foo")
  212. assert e.match(str(b"ba\x80/foo", 'latin1'))
  213. def test_compression_specs():
  214. with pytest.raises(ValueError):
  215. CompressionSpec('')
  216. assert CompressionSpec('0') == dict(name='zlib', level=0)
  217. assert CompressionSpec('1') == dict(name='zlib', level=1)
  218. assert CompressionSpec('9') == dict(name='zlib', level=9)
  219. with pytest.raises(ValueError):
  220. CompressionSpec('10')
  221. assert CompressionSpec('none') == dict(name='none')
  222. assert CompressionSpec('lz4') == dict(name='lz4')
  223. assert CompressionSpec('zlib') == dict(name='zlib', level=6)
  224. assert CompressionSpec('zlib,0') == dict(name='zlib', level=0)
  225. assert CompressionSpec('zlib,9') == dict(name='zlib', level=9)
  226. with pytest.raises(ValueError):
  227. CompressionSpec('zlib,9,invalid')
  228. assert CompressionSpec('lzma') == dict(name='lzma', level=6)
  229. assert CompressionSpec('lzma,0') == dict(name='lzma', level=0)
  230. assert CompressionSpec('lzma,9') == dict(name='lzma', level=9)
  231. with pytest.raises(ValueError):
  232. CompressionSpec('lzma,9,invalid')
  233. with pytest.raises(ValueError):
  234. CompressionSpec('invalid')
  235. def test_chunkerparams():
  236. assert ChunkerParams('19,23,21,4095') == (19, 23, 21, 4095)
  237. assert ChunkerParams('10,23,16,4095') == (10, 23, 16, 4095)
  238. with pytest.raises(ValueError):
  239. ChunkerParams('19,24,21,4095')
  240. class MakePathSafeTestCase(BaseTestCase):
  241. def test(self):
  242. self.assert_equal(make_path_safe('/foo/bar'), 'foo/bar')
  243. self.assert_equal(make_path_safe('/foo/bar'), 'foo/bar')
  244. self.assert_equal(make_path_safe('/f/bar'), 'f/bar')
  245. self.assert_equal(make_path_safe('fo/bar'), 'fo/bar')
  246. self.assert_equal(make_path_safe('../foo/bar'), 'foo/bar')
  247. self.assert_equal(make_path_safe('../../foo/bar'), 'foo/bar')
  248. self.assert_equal(make_path_safe('/'), '.')
  249. self.assert_equal(make_path_safe('/'), '.')
  250. class MockArchive:
  251. def __init__(self, ts):
  252. self.ts = ts
  253. def __repr__(self):
  254. return repr(self.ts)
  255. class PruneSplitTestCase(BaseTestCase):
  256. def test(self):
  257. def local_to_UTC(month, day):
  258. """Convert noon on the month and day in 2013 to UTC."""
  259. seconds = mktime(strptime('2013-%02d-%02d 12:00' % (month, day), '%Y-%m-%d %H:%M'))
  260. return datetime.fromtimestamp(seconds, tz=timezone.utc)
  261. def subset(lst, indices):
  262. return {lst[i] for i in indices}
  263. def dotest(test_archives, n, skip, indices):
  264. for ta in test_archives, reversed(test_archives):
  265. self.assert_equal(set(prune_split(ta, '%Y-%m', n, skip)),
  266. subset(test_archives, indices))
  267. test_pairs = [(1, 1), (2, 1), (2, 28), (3, 1), (3, 2), (3, 31), (5, 1)]
  268. test_dates = [local_to_UTC(month, day) for month, day in test_pairs]
  269. test_archives = [MockArchive(date) for date in test_dates]
  270. dotest(test_archives, 3, [], [6, 5, 2])
  271. dotest(test_archives, -1, [], [6, 5, 2, 0])
  272. dotest(test_archives, 3, [test_archives[6]], [5, 2, 0])
  273. dotest(test_archives, 3, [test_archives[5]], [6, 2, 0])
  274. dotest(test_archives, 3, [test_archives[4]], [6, 5, 2])
  275. dotest(test_archives, 0, [], [])
  276. class PruneWithinTestCase(BaseTestCase):
  277. def test(self):
  278. def subset(lst, indices):
  279. return {lst[i] for i in indices}
  280. def dotest(test_archives, within, indices):
  281. for ta in test_archives, reversed(test_archives):
  282. self.assert_equal(set(prune_within(ta, within)),
  283. subset(test_archives, indices))
  284. # 1 minute, 1.5 hours, 2.5 hours, 3.5 hours, 25 hours, 49 hours
  285. test_offsets = [60, 90*60, 150*60, 210*60, 25*60*60, 49*60*60]
  286. now = datetime.now(timezone.utc)
  287. test_dates = [now - timedelta(seconds=s) for s in test_offsets]
  288. test_archives = [MockArchive(date) for date in test_dates]
  289. dotest(test_archives, '1H', [0])
  290. dotest(test_archives, '2H', [0, 1])
  291. dotest(test_archives, '3H', [0, 1, 2])
  292. dotest(test_archives, '24H', [0, 1, 2, 3])
  293. dotest(test_archives, '26H', [0, 1, 2, 3, 4])
  294. dotest(test_archives, '2d', [0, 1, 2, 3, 4])
  295. dotest(test_archives, '50H', [0, 1, 2, 3, 4, 5])
  296. dotest(test_archives, '3d', [0, 1, 2, 3, 4, 5])
  297. dotest(test_archives, '1w', [0, 1, 2, 3, 4, 5])
  298. dotest(test_archives, '1m', [0, 1, 2, 3, 4, 5])
  299. dotest(test_archives, '1y', [0, 1, 2, 3, 4, 5])
  300. class StableDictTestCase(BaseTestCase):
  301. def test(self):
  302. d = StableDict(foo=1, bar=2, boo=3, baz=4)
  303. self.assert_equal(list(d.items()), [('bar', 2), ('baz', 4), ('boo', 3), ('foo', 1)])
  304. self.assert_equal(hashlib.md5(msgpack.packb(d)).hexdigest(), 'fc78df42cd60691b3ac3dd2a2b39903f')
  305. class TestParseTimestamp(BaseTestCase):
  306. def test(self):
  307. self.assert_equal(parse_timestamp('2015-04-19T20:25:00.226410'), datetime(2015, 4, 19, 20, 25, 0, 226410, timezone.utc))
  308. self.assert_equal(parse_timestamp('2015-04-19T20:25:00'), datetime(2015, 4, 19, 20, 25, 0, 0, timezone.utc))
  309. def test_get_cache_dir():
  310. """test that get_cache_dir respects environement"""
  311. # reset BORG_CACHE_DIR in order to test default
  312. old_env = None
  313. if os.environ.get('BORG_CACHE_DIR'):
  314. old_env = os.environ['BORG_CACHE_DIR']
  315. del(os.environ['BORG_CACHE_DIR'])
  316. assert get_cache_dir() == os.path.join(os.path.expanduser('~'), '.cache', 'borg')
  317. os.environ['XDG_CACHE_HOME'] = '/var/tmp/.cache'
  318. assert get_cache_dir() == os.path.join('/var/tmp/.cache', 'borg')
  319. os.environ['BORG_CACHE_DIR'] = '/var/tmp'
  320. assert get_cache_dir() == '/var/tmp'
  321. # reset old env
  322. if old_env is not None:
  323. os.environ['BORG_CACHE_DIR'] = old_env
  324. @pytest.fixture()
  325. def stats():
  326. stats = Statistics()
  327. stats.update(20, 10, unique=True)
  328. return stats
  329. def test_stats_basic(stats):
  330. assert stats.osize == 20
  331. assert stats.csize == stats.usize == 10
  332. stats.update(20, 10, unique=False)
  333. assert stats.osize == 40
  334. assert stats.csize == 20
  335. assert stats.usize == 10
  336. def tests_stats_progress(stats, columns=80):
  337. os.environ['COLUMNS'] = str(columns)
  338. out = StringIO()
  339. stats.show_progress(stream=out)
  340. s = '20 B O 10 B C 10 B D 0 N '
  341. buf = ' ' * (columns - len(s))
  342. assert out.getvalue() == s + buf + "\r"
  343. out = StringIO()
  344. stats.update(10**3, 0, unique=False)
  345. stats.show_progress(item={b'path': 'foo'}, final=False, stream=out)
  346. s = '1.02 kB O 10 B C 10 B D 0 N foo'
  347. buf = ' ' * (columns - len(s))
  348. assert out.getvalue() == s + buf + "\r"
  349. out = StringIO()
  350. stats.show_progress(item={b'path': 'foo'*40}, final=False, stream=out)
  351. s = '1.02 kB O 10 B C 10 B D 0 N foofoofoofoofoofoofoofo...oofoofoofoofoofoofoofoofoo'
  352. buf = ' ' * (columns - len(s))
  353. assert out.getvalue() == s + buf + "\r"
  354. def test_stats_format(stats):
  355. assert str(stats) == """\
  356. Original size Compressed size Deduplicated size
  357. This archive: 20 B 10 B 10 B"""
  358. s = "{0.osize_fmt}".format(stats)
  359. assert s == "20 B"
  360. # kind of redundant, but id is variable so we can't match reliably
  361. assert repr(stats) == '<Statistics object at {:#x} (20, 10, 10)>'.format(id(stats))
  362. def test_file_size():
  363. """test the size formatting routines"""
  364. si_size_map = {
  365. 0: '0 B', # no rounding necessary for those
  366. 1: '1 B',
  367. 142: '142 B',
  368. 999: '999 B',
  369. 1000: '1.00 kB', # rounding starts here
  370. 1001: '1.00 kB', # should be rounded away
  371. 1234: '1.23 kB', # should be rounded down
  372. 1235: '1.24 kB', # should be rounded up
  373. 1010: '1.01 kB', # rounded down as well
  374. 999990000: '999.99 MB', # rounded down
  375. 999990001: '999.99 MB', # rounded down
  376. 999995000: '1.00 GB', # rounded up to next unit
  377. 10**6: '1.00 MB', # and all the remaining units, megabytes
  378. 10**9: '1.00 GB', # gigabytes
  379. 10**12: '1.00 TB', # terabytes
  380. 10**15: '1.00 PB', # petabytes
  381. 10**18: '1.00 EB', # exabytes
  382. 10**21: '1.00 ZB', # zottabytes
  383. 10**24: '1.00 YB', # yottabytes
  384. }
  385. for size, fmt in si_size_map.items():
  386. assert format_file_size(size) == fmt
  387. def test_file_size_precision():
  388. assert format_file_size(1234, precision=1) == '1.2 kB' # rounded down
  389. assert format_file_size(1254, precision=1) == '1.3 kB' # rounded up
  390. assert format_file_size(999990000, precision=1) == '1.0 GB' # and not 999.9 MB or 1000.0 MB
  391. def test_is_slow_msgpack():
  392. saved_packer = msgpack.Packer
  393. try:
  394. msgpack.Packer = msgpack.fallback.Packer
  395. assert is_slow_msgpack()
  396. finally:
  397. msgpack.Packer = saved_packer
  398. # this assumes that we have fast msgpack on test platform:
  399. assert not is_slow_msgpack()
  400. def test_yes_simple():
  401. input = FakeInputs(['y', 'Y', 'yes', 'Yes', ])
  402. assert yes(input=input)
  403. assert yes(input=input)
  404. assert yes(input=input)
  405. assert yes(input=input)
  406. input = FakeInputs(['n', 'N', 'no', 'No', ])
  407. assert not yes(input=input)
  408. assert not yes(input=input)
  409. assert not yes(input=input)
  410. assert not yes(input=input)
  411. def test_yes_custom():
  412. input = FakeInputs(['YES', 'SURE', 'NOPE', ])
  413. assert yes(truish=('YES', ), input=input)
  414. assert yes(truish=('SURE', ), input=input)
  415. assert not yes(falsish=('NOPE', ), input=input)
  416. def test_yes_env():
  417. input = FakeInputs(['n', 'n'])
  418. with environment_variable(OVERRIDE_THIS='nonempty'):
  419. assert yes(env_var_override='OVERRIDE_THIS', input=input)
  420. with environment_variable(OVERRIDE_THIS=None): # env not set
  421. assert not yes(env_var_override='OVERRIDE_THIS', input=input)
  422. def test_yes_defaults():
  423. input = FakeInputs(['invalid', '', ' '])
  424. assert not yes(input=input) # default=False
  425. assert not yes(input=input)
  426. assert not yes(input=input)
  427. input = FakeInputs(['invalid', '', ' '])
  428. assert yes(default=True, input=input)
  429. assert yes(default=True, input=input)
  430. assert yes(default=True, input=input)
  431. ifile = StringIO()
  432. assert yes(default_notty=True, ifile=ifile)
  433. assert not yes(default_notty=False, ifile=ifile)
  434. input = FakeInputs([])
  435. assert yes(default_eof=True, input=input)
  436. assert not yes(default_eof=False, input=input)
  437. with pytest.raises(ValueError):
  438. yes(default=None)
  439. with pytest.raises(ValueError):
  440. yes(default_notty='invalid')
  441. with pytest.raises(ValueError):
  442. yes(default_eof='invalid')
  443. def test_yes_retry():
  444. input = FakeInputs(['foo', 'bar', 'y', ])
  445. assert yes(retry_msg='Retry: ', input=input)
  446. input = FakeInputs(['foo', 'bar', 'N', ])
  447. assert not yes(retry_msg='Retry: ', input=input)
  448. def test_yes_output(capfd):
  449. input = FakeInputs(['invalid', 'y', 'n'])
  450. assert yes(msg='intro-msg', false_msg='false-msg', true_msg='true-msg', retry_msg='retry-msg', input=input)
  451. out, err = capfd.readouterr()
  452. assert out == ''
  453. assert 'intro-msg' in err
  454. assert 'retry-msg' in err
  455. assert 'true-msg' in err
  456. assert not yes(msg='intro-msg', false_msg='false-msg', true_msg='true-msg', retry_msg='retry-msg', input=input)
  457. out, err = capfd.readouterr()
  458. assert out == ''
  459. assert 'intro-msg' in err
  460. assert 'retry-msg' not in err
  461. assert 'false-msg' in err