123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482 |
- import hashlib
- from time import mktime, strptime
- from datetime import datetime, timezone, timedelta
- from io import StringIO
- import os
- import pytest
- import sys
- import msgpack
- from ..helpers import adjust_patterns, exclude_path, Location, format_file_size, format_timedelta, IncludePattern, ExcludePattern, make_path_safe, \
- prune_within, prune_split, get_cache_dir, Statistics, \
- StableDict, int_to_bigint, bigint_to_int, parse_timestamp, CompressionSpec, ChunkerParams
- from . import BaseTestCase
- class BigIntTestCase(BaseTestCase):
- def test_bigint(self):
- self.assert_equal(int_to_bigint(0), 0)
- self.assert_equal(int_to_bigint(2**63-1), 2**63-1)
- self.assert_equal(int_to_bigint(-2**63+1), -2**63+1)
- self.assert_equal(int_to_bigint(2**63), b'\x00\x00\x00\x00\x00\x00\x00\x80\x00')
- self.assert_equal(int_to_bigint(-2**63), b'\x00\x00\x00\x00\x00\x00\x00\x80\xff')
- self.assert_equal(bigint_to_int(int_to_bigint(-2**70)), -2**70)
- self.assert_equal(bigint_to_int(int_to_bigint(2**70)), 2**70)
- class TestLocationWithoutEnv:
- def test_ssh(self, monkeypatch):
- monkeypatch.delenv('BORG_REPO', raising=False)
- assert repr(Location('ssh://user@host:1234/some/path::archive')) == \
- "Location(proto='ssh', user='user', host='host', port=1234, path='/some/path', archive='archive')"
- assert repr(Location('ssh://user@host:1234/some/path')) == \
- "Location(proto='ssh', user='user', host='host', port=1234, path='/some/path', archive=None)"
- def test_file(self, monkeypatch):
- monkeypatch.delenv('BORG_REPO', raising=False)
- assert repr(Location('file:///some/path::archive')) == \
- "Location(proto='file', user=None, host=None, port=None, path='/some/path', archive='archive')"
- assert repr(Location('file:///some/path')) == \
- "Location(proto='file', user=None, host=None, port=None, path='/some/path', archive=None)"
- def test_scp(self, monkeypatch):
- monkeypatch.delenv('BORG_REPO', raising=False)
- assert repr(Location('user@host:/some/path::archive')) == \
- "Location(proto='ssh', user='user', host='host', port=None, path='/some/path', archive='archive')"
- assert repr(Location('user@host:/some/path')) == \
- "Location(proto='ssh', user='user', host='host', port=None, path='/some/path', archive=None)"
- def test_folder(self, monkeypatch):
- monkeypatch.delenv('BORG_REPO', raising=False)
- assert repr(Location('path::archive')) == \
- "Location(proto='file', user=None, host=None, port=None, path='path', archive='archive')"
- assert repr(Location('path')) == \
- "Location(proto='file', user=None, host=None, port=None, path='path', archive=None)"
- def test_abspath(self, monkeypatch):
- monkeypatch.delenv('BORG_REPO', raising=False)
- assert repr(Location('/some/absolute/path::archive')) == \
- "Location(proto='file', user=None, host=None, port=None, path='/some/absolute/path', archive='archive')"
- assert repr(Location('/some/absolute/path')) == \
- "Location(proto='file', user=None, host=None, port=None, path='/some/absolute/path', archive=None)"
- def test_relpath(self, monkeypatch):
- monkeypatch.delenv('BORG_REPO', raising=False)
- assert repr(Location('some/relative/path::archive')) == \
- "Location(proto='file', user=None, host=None, port=None, path='some/relative/path', archive='archive')"
- assert repr(Location('some/relative/path')) == \
- "Location(proto='file', user=None, host=None, port=None, path='some/relative/path', archive=None)"
- def test_underspecified(self, monkeypatch):
- monkeypatch.delenv('BORG_REPO', raising=False)
- with pytest.raises(ValueError):
- Location('::archive')
- with pytest.raises(ValueError):
- Location('::')
- with pytest.raises(ValueError):
- Location()
- def test_no_double_colon(self, monkeypatch):
- monkeypatch.delenv('BORG_REPO', raising=False)
- with pytest.raises(ValueError):
- Location('ssh://localhost:22/path:archive')
- def test_no_slashes(self, monkeypatch):
- monkeypatch.delenv('BORG_REPO', raising=False)
- with pytest.raises(ValueError):
- Location('/some/path/to/repo::archive_name_with/slashes/is_invalid')
- def test_canonical_path(self, monkeypatch):
- monkeypatch.delenv('BORG_REPO', raising=False)
- locations = ['some/path::archive', 'file://some/path::archive', 'host:some/path::archive',
- 'host:~user/some/path::archive', 'ssh://host/some/path::archive',
- 'ssh://user@host:1234/some/path::archive']
- for location in locations:
- assert Location(location).canonical_path() == \
- Location(Location(location).canonical_path()).canonical_path()
- class TestLocationWithEnv:
- def test_ssh(self, monkeypatch):
- monkeypatch.setenv('BORG_REPO', 'ssh://user@host:1234/some/path')
- assert repr(Location('::archive')) == \
- "Location(proto='ssh', user='user', host='host', port=1234, path='/some/path', archive='archive')"
- assert repr(Location()) == \
- "Location(proto='ssh', user='user', host='host', port=1234, path='/some/path', archive=None)"
- def test_file(self, monkeypatch):
- monkeypatch.setenv('BORG_REPO', 'file:///some/path')
- assert repr(Location('::archive')) == \
- "Location(proto='file', user=None, host=None, port=None, path='/some/path', archive='archive')"
- assert repr(Location()) == \
- "Location(proto='file', user=None, host=None, port=None, path='/some/path', archive=None)"
- def test_scp(self, monkeypatch):
- monkeypatch.setenv('BORG_REPO', 'user@host:/some/path')
- assert repr(Location('::archive')) == \
- "Location(proto='ssh', user='user', host='host', port=None, path='/some/path', archive='archive')"
- assert repr(Location()) == \
- "Location(proto='ssh', user='user', host='host', port=None, path='/some/path', archive=None)"
- def test_folder(self, monkeypatch):
- monkeypatch.setenv('BORG_REPO', 'path')
- assert repr(Location('::archive')) == \
- "Location(proto='file', user=None, host=None, port=None, path='path', archive='archive')"
- assert repr(Location()) == \
- "Location(proto='file', user=None, host=None, port=None, path='path', archive=None)"
- def test_abspath(self, monkeypatch):
- monkeypatch.setenv('BORG_REPO', '/some/absolute/path')
- assert repr(Location('::archive')) == \
- "Location(proto='file', user=None, host=None, port=None, path='/some/absolute/path', archive='archive')"
- assert repr(Location()) == \
- "Location(proto='file', user=None, host=None, port=None, path='/some/absolute/path', archive=None)"
- def test_relpath(self, monkeypatch):
- monkeypatch.setenv('BORG_REPO', 'some/relative/path')
- assert repr(Location('::archive')) == \
- "Location(proto='file', user=None, host=None, port=None, path='some/relative/path', archive='archive')"
- assert repr(Location()) == \
- "Location(proto='file', user=None, host=None, port=None, path='some/relative/path', archive=None)"
- def test_no_slashes(self, monkeypatch):
- monkeypatch.setenv('BORG_REPO', '/some/absolute/path')
- with pytest.raises(ValueError):
- Location('::archive_name_with/slashes/is_invalid')
- class FormatTimedeltaTestCase(BaseTestCase):
- def test(self):
- t0 = datetime(2001, 1, 1, 10, 20, 3, 0)
- t1 = datetime(2001, 1, 1, 12, 20, 4, 100000)
- self.assert_equal(
- format_timedelta(t1 - t0),
- '2 hours 1.10 seconds'
- )
- class PatternTestCase(BaseTestCase):
- files = [
- '/etc/passwd', '/etc/hosts', '/home',
- '/home/user/.profile', '/home/user/.bashrc',
- '/home/user2/.profile', '/home/user2/public_html/index.html',
- '/var/log/messages', '/var/log/dmesg',
- ]
- def evaluate(self, paths, excludes):
- patterns = adjust_patterns(paths, [ExcludePattern(p) for p in excludes])
- return [path for path in self.files if not exclude_path(path, patterns)]
- def test(self):
- self.assert_equal(self.evaluate(['/'], []), self.files)
- self.assert_equal(self.evaluate([], []), self.files)
- self.assert_equal(self.evaluate(['/'], ['/h']), self.files)
- self.assert_equal(self.evaluate(['/'], ['/home']),
- ['/etc/passwd', '/etc/hosts', '/var/log/messages', '/var/log/dmesg'])
- self.assert_equal(self.evaluate(['/'], ['/home/']),
- ['/etc/passwd', '/etc/hosts', '/home', '/var/log/messages', '/var/log/dmesg'])
- self.assert_equal(self.evaluate(['/home/u'], []), [])
- self.assert_equal(self.evaluate(['/', '/home', '/etc/hosts'], ['/']), [])
- self.assert_equal(self.evaluate(['/home/'], ['/home/user2']),
- ['/home', '/home/user/.profile', '/home/user/.bashrc'])
- self.assert_equal(self.evaluate(['/'], ['*.profile', '/var/log']),
- ['/etc/passwd', '/etc/hosts', '/home', '/home/user/.bashrc', '/home/user2/public_html/index.html'])
- self.assert_equal(self.evaluate(['/'], ['/home/*/public_html', '*.profile', '*/log/*']),
- ['/etc/passwd', '/etc/hosts', '/home', '/home/user/.bashrc'])
- self.assert_equal(self.evaluate(['/etc/', '/var'], ['dmesg']),
- ['/etc/passwd', '/etc/hosts', '/var/log/messages', '/var/log/dmesg'])
- @pytest.mark.skipif(sys.platform in ('darwin',), reason='all but OS X test')
- class PatternNonAsciiTestCase(BaseTestCase):
- def testComposedUnicode(self):
- pattern = 'b\N{LATIN SMALL LETTER A WITH ACUTE}'
- i = IncludePattern(pattern)
- e = ExcludePattern(pattern)
- assert i.match("b\N{LATIN SMALL LETTER A WITH ACUTE}/foo")
- assert not i.match("ba\N{COMBINING ACUTE ACCENT}/foo")
- assert e.match("b\N{LATIN SMALL LETTER A WITH ACUTE}/foo")
- assert not e.match("ba\N{COMBINING ACUTE ACCENT}/foo")
- def testDecomposedUnicode(self):
- pattern = 'ba\N{COMBINING ACUTE ACCENT}'
- i = IncludePattern(pattern)
- e = ExcludePattern(pattern)
- assert not i.match("b\N{LATIN SMALL LETTER A WITH ACUTE}/foo")
- assert i.match("ba\N{COMBINING ACUTE ACCENT}/foo")
- assert not e.match("b\N{LATIN SMALL LETTER A WITH ACUTE}/foo")
- assert e.match("ba\N{COMBINING ACUTE ACCENT}/foo")
- def testInvalidUnicode(self):
- pattern = str(b'ba\x80', 'latin1')
- i = IncludePattern(pattern)
- e = ExcludePattern(pattern)
- assert not i.match("ba/foo")
- assert i.match(str(b"ba\x80/foo", 'latin1'))
- assert not e.match("ba/foo")
- assert e.match(str(b"ba\x80/foo", 'latin1'))
- @pytest.mark.skipif(sys.platform not in ('darwin',), reason='OS X test')
- class OSXPatternNormalizationTestCase(BaseTestCase):
- def testComposedUnicode(self):
- pattern = 'b\N{LATIN SMALL LETTER A WITH ACUTE}'
- i = IncludePattern(pattern)
- e = ExcludePattern(pattern)
- assert i.match("b\N{LATIN SMALL LETTER A WITH ACUTE}/foo")
- assert i.match("ba\N{COMBINING ACUTE ACCENT}/foo")
- assert e.match("b\N{LATIN SMALL LETTER A WITH ACUTE}/foo")
- assert e.match("ba\N{COMBINING ACUTE ACCENT}/foo")
- def testDecomposedUnicode(self):
- pattern = 'ba\N{COMBINING ACUTE ACCENT}'
- i = IncludePattern(pattern)
- e = ExcludePattern(pattern)
- assert i.match("b\N{LATIN SMALL LETTER A WITH ACUTE}/foo")
- assert i.match("ba\N{COMBINING ACUTE ACCENT}/foo")
- assert e.match("b\N{LATIN SMALL LETTER A WITH ACUTE}/foo")
- assert e.match("ba\N{COMBINING ACUTE ACCENT}/foo")
- def testInvalidUnicode(self):
- pattern = str(b'ba\x80', 'latin1')
- i = IncludePattern(pattern)
- e = ExcludePattern(pattern)
- assert not i.match("ba/foo")
- assert i.match(str(b"ba\x80/foo", 'latin1'))
- assert not e.match("ba/foo")
- assert e.match(str(b"ba\x80/foo", 'latin1'))
- def test_compression_specs():
- with pytest.raises(ValueError):
- CompressionSpec('')
- assert CompressionSpec('0') == dict(name='zlib', level=0)
- assert CompressionSpec('1') == dict(name='zlib', level=1)
- assert CompressionSpec('9') == dict(name='zlib', level=9)
- with pytest.raises(ValueError):
- CompressionSpec('10')
- assert CompressionSpec('none') == dict(name='none')
- assert CompressionSpec('lz4') == dict(name='lz4')
- assert CompressionSpec('zlib') == dict(name='zlib', level=6)
- assert CompressionSpec('zlib,0') == dict(name='zlib', level=0)
- assert CompressionSpec('zlib,9') == dict(name='zlib', level=9)
- with pytest.raises(ValueError):
- CompressionSpec('zlib,9,invalid')
- assert CompressionSpec('lzma') == dict(name='lzma', level=6)
- assert CompressionSpec('lzma,0') == dict(name='lzma', level=0)
- assert CompressionSpec('lzma,9') == dict(name='lzma', level=9)
- with pytest.raises(ValueError):
- CompressionSpec('lzma,9,invalid')
- with pytest.raises(ValueError):
- CompressionSpec('invalid')
- def test_chunkerparams():
- assert ChunkerParams('19,23,21,4095') == (19, 23, 21, 4095)
- assert ChunkerParams('10,23,16,4095') == (10, 23, 16, 4095)
- with pytest.raises(ValueError):
- ChunkerParams('19,24,21,4095')
- class MakePathSafeTestCase(BaseTestCase):
- def test(self):
- self.assert_equal(make_path_safe('/foo/bar'), 'foo/bar')
- self.assert_equal(make_path_safe('/foo/bar'), 'foo/bar')
- self.assert_equal(make_path_safe('/f/bar'), 'f/bar')
- self.assert_equal(make_path_safe('fo/bar'), 'fo/bar')
- self.assert_equal(make_path_safe('../foo/bar'), 'foo/bar')
- self.assert_equal(make_path_safe('../../foo/bar'), 'foo/bar')
- self.assert_equal(make_path_safe('/'), '.')
- self.assert_equal(make_path_safe('/'), '.')
- class MockArchive:
- def __init__(self, ts):
- self.ts = ts
- def __repr__(self):
- return repr(self.ts)
- class PruneSplitTestCase(BaseTestCase):
- def test(self):
- def local_to_UTC(month, day):
- """Convert noon on the month and day in 2013 to UTC."""
- seconds = mktime(strptime('2013-%02d-%02d 12:00' % (month, day), '%Y-%m-%d %H:%M'))
- return datetime.fromtimestamp(seconds, tz=timezone.utc)
- def subset(lst, indices):
- return {lst[i] for i in indices}
- def dotest(test_archives, n, skip, indices):
- for ta in test_archives, reversed(test_archives):
- self.assert_equal(set(prune_split(ta, '%Y-%m', n, skip)),
- subset(test_archives, indices))
- test_pairs = [(1, 1), (2, 1), (2, 28), (3, 1), (3, 2), (3, 31), (5, 1)]
- test_dates = [local_to_UTC(month, day) for month, day in test_pairs]
- test_archives = [MockArchive(date) for date in test_dates]
- dotest(test_archives, 3, [], [6, 5, 2])
- dotest(test_archives, -1, [], [6, 5, 2, 0])
- dotest(test_archives, 3, [test_archives[6]], [5, 2, 0])
- dotest(test_archives, 3, [test_archives[5]], [6, 2, 0])
- dotest(test_archives, 3, [test_archives[4]], [6, 5, 2])
- dotest(test_archives, 0, [], [])
- class PruneWithinTestCase(BaseTestCase):
- def test(self):
- def subset(lst, indices):
- return {lst[i] for i in indices}
- def dotest(test_archives, within, indices):
- for ta in test_archives, reversed(test_archives):
- self.assert_equal(set(prune_within(ta, within)),
- subset(test_archives, indices))
- # 1 minute, 1.5 hours, 2.5 hours, 3.5 hours, 25 hours, 49 hours
- test_offsets = [60, 90*60, 150*60, 210*60, 25*60*60, 49*60*60]
- now = datetime.now(timezone.utc)
- test_dates = [now - timedelta(seconds=s) for s in test_offsets]
- test_archives = [MockArchive(date) for date in test_dates]
- dotest(test_archives, '1H', [0])
- dotest(test_archives, '2H', [0, 1])
- dotest(test_archives, '3H', [0, 1, 2])
- dotest(test_archives, '24H', [0, 1, 2, 3])
- dotest(test_archives, '26H', [0, 1, 2, 3, 4])
- dotest(test_archives, '2d', [0, 1, 2, 3, 4])
- dotest(test_archives, '50H', [0, 1, 2, 3, 4, 5])
- dotest(test_archives, '3d', [0, 1, 2, 3, 4, 5])
- dotest(test_archives, '1w', [0, 1, 2, 3, 4, 5])
- dotest(test_archives, '1m', [0, 1, 2, 3, 4, 5])
- dotest(test_archives, '1y', [0, 1, 2, 3, 4, 5])
- class StableDictTestCase(BaseTestCase):
- def test(self):
- d = StableDict(foo=1, bar=2, boo=3, baz=4)
- self.assert_equal(list(d.items()), [('bar', 2), ('baz', 4), ('boo', 3), ('foo', 1)])
- self.assert_equal(hashlib.md5(msgpack.packb(d)).hexdigest(), 'fc78df42cd60691b3ac3dd2a2b39903f')
- class TestParseTimestamp(BaseTestCase):
- def test(self):
- self.assert_equal(parse_timestamp('2015-04-19T20:25:00.226410'), datetime(2015, 4, 19, 20, 25, 0, 226410, timezone.utc))
- self.assert_equal(parse_timestamp('2015-04-19T20:25:00'), datetime(2015, 4, 19, 20, 25, 0, 0, timezone.utc))
- def test_get_cache_dir():
- """test that get_cache_dir respects environement"""
- # reset BORG_CACHE_DIR in order to test default
- old_env = None
- if os.environ.get('BORG_CACHE_DIR'):
- old_env = os.environ['BORG_CACHE_DIR']
- del(os.environ['BORG_CACHE_DIR'])
- assert get_cache_dir() == os.path.join(os.path.expanduser('~'), '.cache', 'borg')
- os.environ['XDG_CACHE_HOME'] = '/var/tmp/.cache'
- assert get_cache_dir() == os.path.join('/var/tmp/.cache', 'borg')
- os.environ['BORG_CACHE_DIR'] = '/var/tmp'
- assert get_cache_dir() == '/var/tmp'
- # reset old env
- if old_env is not None:
- os.environ['BORG_CACHE_DIR'] = old_env
- @pytest.fixture()
- def stats():
- stats = Statistics()
- stats.update(20, 10, unique=True)
- return stats
- def test_stats_basic(stats):
- assert stats.osize == 20
- assert stats.csize == stats.usize == 10
- stats.update(20, 10, unique=False)
- assert stats.osize == 40
- assert stats.csize == 20
- assert stats.usize == 10
- def tests_stats_progress(stats, columns=80):
- os.environ['COLUMNS'] = str(columns)
- out = StringIO()
- stats.show_progress(stream=out)
- s = '20 B O 10 B C 10 B D 0 N '
- buf = ' ' * (columns - len(s))
- assert out.getvalue() == s + buf + "\r"
- out = StringIO()
- stats.update(10**3, 0, unique=False)
- stats.show_progress(item={b'path': 'foo'}, final=False, stream=out)
- s = '1.02 kB O 10 B C 10 B D 0 N foo'
- buf = ' ' * (columns - len(s))
- assert out.getvalue() == s + buf + "\r"
- out = StringIO()
- stats.show_progress(item={b'path': 'foo'*40}, final=False, stream=out)
- s = '1.02 kB O 10 B C 10 B D 0 N foofoofoofoofoofoofoofo...oofoofoofoofoofoofoofoofoo'
- buf = ' ' * (columns - len(s))
- assert out.getvalue() == s + buf + "\r"
- def test_stats_format(stats):
- assert str(stats) == """\
- Original size Compressed size Deduplicated size
- This archive: 20 B 10 B 10 B"""
- s = "{0.osize_fmt}".format(stats)
- assert s == "20 B"
- # kind of redundant, but id is variable so we can't match reliably
- assert repr(stats) == '<Statistics object at {:#x} (20, 10, 10)>'.format(id(stats))
- def test_file_size():
- """test the size formatting routines"""
- si_size_map = {
- 0: '0 B', # no rounding necessary for those
- 1: '1 B',
- 142: '142 B',
- 999: '999 B',
- 1000: '1.00 kB', # rounding starts here
- 1001: '1.00 kB', # should be rounded away
- 1234: '1.23 kB', # should be rounded down
- 1235: '1.24 kB', # should be rounded up
- 1010: '1.01 kB', # rounded down as well
- 999990000: '999.99 MB', # rounded down
- 999990001: '999.99 MB', # rounded down
- 999995000: '1.00 GB', # rounded up to next unit
- 10**6: '1.00 MB', # and all the remaining units, megabytes
- 10**9: '1.00 GB', # gigabytes
- 10**12: '1.00 TB', # terabytes
- 10**15: '1.00 PB', # petabytes
- 10**18: '1.00 EB', # exabytes
- 10**21: '1.00 ZB', # zottabytes
- 10**24: '1.00 YB', # yottabytes
- }
- for size, fmt in si_size_map.items():
- assert format_file_size(size) == fmt
- def test_file_size_precision():
- assert format_file_size(1234, precision=1) == '1.2 kB' # rounded down
- assert format_file_size(1254, precision=1) == '1.3 kB' # rounded up
- assert format_file_size(999990000, precision=1) == '1.0 GB' # and not 999.9 MB or 1000.0 MB
|