"""Tests for the borg repository layer (segments, transactions, quota, corruption handling)."""
  1. import io
  2. import logging
  3. import os
  4. import shutil
  5. import sys
  6. import tempfile
  7. from unittest.mock import patch
  8. import pytest
  9. from ..hashindex import NSIndex
  10. from ..helpers import Location
  11. from ..helpers import IntegrityError
  12. from ..helpers import msgpack
  13. from ..locking import Lock, LockFailed
  14. from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed, handle_remote_line
  15. from ..repository import Repository, LoggedIO, MAGIC, MAX_DATA_SIZE, TAG_DELETE, TAG_PUT2, TAG_PUT, TAG_COMMIT
  16. from ..repoobj import RepoObj
  17. from . import BaseTestCase
  18. from .hashindex import H
  19. UNSPECIFIED = object() # for default values where we can't use None
  20. def fchunk(data, meta=b""):
  21. # create a raw chunk that has valid RepoObj layout, but does not use encryption or compression.
  22. meta_len = RepoObj.meta_len_hdr.pack(len(meta))
  23. assert isinstance(data, bytes)
  24. chunk = meta_len + meta + data
  25. return chunk
  26. def pchunk(chunk):
  27. # parse data and meta from a raw chunk made by fchunk
  28. meta_len_size = RepoObj.meta_len_hdr.size
  29. meta_len = chunk[:meta_len_size]
  30. meta_len = RepoObj.meta_len_hdr.unpack(meta_len)[0]
  31. meta = chunk[meta_len_size : meta_len_size + meta_len]
  32. data = chunk[meta_len_size + meta_len :]
  33. return data, meta
  34. def pdchunk(chunk):
  35. # parse only data from a raw chunk made by fchunk
  36. return pchunk(chunk)[0]
  37. class RepositoryTestCaseBase(BaseTestCase):
  38. key_size = 32
  39. exclusive = True
  40. def open(self, create=False, exclusive=UNSPECIFIED):
  41. if exclusive is UNSPECIFIED:
  42. exclusive = self.exclusive
  43. return Repository(os.path.join(self.tmppath, "repository"), exclusive=exclusive, create=create)
  44. def setUp(self):
  45. self.tmppath = tempfile.mkdtemp()
  46. self.repository = self.open(create=True)
  47. self.repository.__enter__()
  48. def tearDown(self):
  49. self.repository.__exit__(None, None, None)
  50. shutil.rmtree(self.tmppath)
  51. def reopen(self, exclusive=UNSPECIFIED):
  52. self.repository.close()
  53. self.repository = self.open(exclusive=exclusive)
  54. def add_keys(self):
  55. self.repository.put(H(0), fchunk(b"foo"))
  56. self.repository.put(H(1), fchunk(b"bar"))
  57. self.repository.put(H(3), fchunk(b"bar"))
  58. self.repository.commit(compact=False)
  59. self.repository.put(H(1), fchunk(b"bar2"))
  60. self.repository.put(H(2), fchunk(b"boo"))
  61. self.repository.delete(H(3))
  62. def repo_dump(self, label=None):
  63. label = label + ": " if label is not None else ""
  64. H_trans = {H(i): i for i in range(10)}
  65. H_trans[None] = -1 # key == None appears in commits
  66. tag_trans = {TAG_PUT2: "put2", TAG_PUT: "put", TAG_DELETE: "del", TAG_COMMIT: "comm"}
  67. for segment, fn in self.repository.io.segment_iterator():
  68. for tag, key, offset, size, _ in self.repository.io.iter_objects(segment):
  69. print("%s%s H(%d) -> %s[%d..+%d]" % (label, tag_trans[tag], H_trans[key], fn, offset, size))
  70. print()
  71. class RepositoryTestCase(RepositoryTestCaseBase):
  72. def test1(self):
  73. for x in range(100):
  74. self.repository.put(H(x), fchunk(b"SOMEDATA"))
  75. key50 = H(50)
  76. self.assert_equal(pdchunk(self.repository.get(key50)), b"SOMEDATA")
  77. self.repository.delete(key50)
  78. self.assert_raises(Repository.ObjectNotFound, lambda: self.repository.get(key50))
  79. self.repository.commit(compact=False)
  80. self.repository.close()
  81. with self.open() as repository2:
  82. self.assert_raises(Repository.ObjectNotFound, lambda: repository2.get(key50))
  83. for x in range(100):
  84. if x == 50:
  85. continue
  86. self.assert_equal(pdchunk(repository2.get(H(x))), b"SOMEDATA")
  87. def test2(self):
  88. """Test multiple sequential transactions"""
  89. self.repository.put(H(0), fchunk(b"foo"))
  90. self.repository.put(H(1), fchunk(b"foo"))
  91. self.repository.commit(compact=False)
  92. self.repository.delete(H(0))
  93. self.repository.put(H(1), fchunk(b"bar"))
  94. self.repository.commit(compact=False)
  95. self.assert_equal(pdchunk(self.repository.get(H(1))), b"bar")
  96. def test_read_data(self):
  97. meta, data = b"meta", b"data"
  98. meta_len = RepoObj.meta_len_hdr.pack(len(meta))
  99. chunk_complete = meta_len + meta + data
  100. chunk_short = meta_len + meta
  101. self.repository.put(H(0), chunk_complete)
  102. self.repository.commit(compact=False)
  103. self.assert_equal(self.repository.get(H(0)), chunk_complete)
  104. self.assert_equal(self.repository.get(H(0), read_data=True), chunk_complete)
  105. self.assert_equal(self.repository.get(H(0), read_data=False), chunk_short)
  106. def test_consistency(self):
  107. """Test cache consistency"""
  108. self.repository.put(H(0), fchunk(b"foo"))
  109. self.assert_equal(pdchunk(self.repository.get(H(0))), b"foo")
  110. self.repository.put(H(0), fchunk(b"foo2"))
  111. self.assert_equal(pdchunk(self.repository.get(H(0))), b"foo2")
  112. self.repository.put(H(0), fchunk(b"bar"))
  113. self.assert_equal(pdchunk(self.repository.get(H(0))), b"bar")
  114. self.repository.delete(H(0))
  115. self.assert_raises(Repository.ObjectNotFound, lambda: self.repository.get(H(0)))
  116. def test_consistency2(self):
  117. """Test cache consistency2"""
  118. self.repository.put(H(0), fchunk(b"foo"))
  119. self.assert_equal(pdchunk(self.repository.get(H(0))), b"foo")
  120. self.repository.commit(compact=False)
  121. self.repository.put(H(0), fchunk(b"foo2"))
  122. self.assert_equal(pdchunk(self.repository.get(H(0))), b"foo2")
  123. self.repository.rollback()
  124. self.assert_equal(pdchunk(self.repository.get(H(0))), b"foo")
  125. def test_overwrite_in_same_transaction(self):
  126. """Test cache consistency2"""
  127. self.repository.put(H(0), fchunk(b"foo"))
  128. self.repository.put(H(0), fchunk(b"foo2"))
  129. self.repository.commit(compact=False)
  130. self.assert_equal(pdchunk(self.repository.get(H(0))), b"foo2")
  131. def test_single_kind_transactions(self):
  132. # put
  133. self.repository.put(H(0), fchunk(b"foo"))
  134. self.repository.commit(compact=False)
  135. self.repository.close()
  136. # replace
  137. self.repository = self.open()
  138. with self.repository:
  139. self.repository.put(H(0), fchunk(b"bar"))
  140. self.repository.commit(compact=False)
  141. # delete
  142. self.repository = self.open()
  143. with self.repository:
  144. self.repository.delete(H(0))
  145. self.repository.commit(compact=False)
  146. def test_list(self):
  147. for x in range(100):
  148. self.repository.put(H(x), fchunk(b"SOMEDATA"))
  149. self.repository.commit(compact=False)
  150. all = self.repository.list()
  151. self.assert_equal(len(all), 100)
  152. first_half = self.repository.list(limit=50)
  153. self.assert_equal(len(first_half), 50)
  154. self.assert_equal(first_half, all[:50])
  155. second_half = self.repository.list(marker=first_half[-1])
  156. self.assert_equal(len(second_half), 50)
  157. self.assert_equal(second_half, all[50:])
  158. self.assert_equal(len(self.repository.list(limit=50)), 50)
  159. def test_scan(self):
  160. for x in range(100):
  161. self.repository.put(H(x), fchunk(b"SOMEDATA"))
  162. self.repository.commit(compact=False)
  163. all, _ = self.repository.scan()
  164. assert len(all) == 100
  165. first_half, state = self.repository.scan(limit=50)
  166. assert len(first_half) == 50
  167. assert first_half == all[:50]
  168. second_half, _ = self.repository.scan(state=state)
  169. assert len(second_half) == 50
  170. assert second_half == all[50:]
  171. # check result order == on-disk order (which is hash order)
  172. for x in range(100):
  173. assert all[x] == H(x)
  174. def test_scan_modify(self):
  175. for x in range(100):
  176. self.repository.put(H(x), fchunk(b"ORIGINAL"))
  177. self.repository.commit(compact=False)
  178. # now we scan, read and modify chunks at the same time
  179. count = 0
  180. ids, _ = self.repository.scan()
  181. for id in ids:
  182. # scan results are in same order as we put the chunks into the repo (into the segment file)
  183. assert id == H(count)
  184. chunk = self.repository.get(id)
  185. # check that we **only** get data that was committed when we started scanning
  186. # and that we do not run into the new data we put into the repo.
  187. assert pdchunk(chunk) == b"ORIGINAL"
  188. count += 1
  189. self.repository.put(id, fchunk(b"MODIFIED"))
  190. assert count == 100
  191. self.repository.commit()
  192. # now we have committed all the modified chunks, and **only** must get the modified ones.
  193. count = 0
  194. ids, _ = self.repository.scan()
  195. for id in ids:
  196. # scan results are in same order as we put the chunks into the repo (into the segment file)
  197. assert id == H(count)
  198. chunk = self.repository.get(id)
  199. assert pdchunk(chunk) == b"MODIFIED"
  200. count += 1
  201. assert count == 100
  202. def test_max_data_size(self):
  203. max_data = b"x" * (MAX_DATA_SIZE - RepoObj.meta_len_hdr.size)
  204. self.repository.put(H(0), fchunk(max_data))
  205. self.assert_equal(pdchunk(self.repository.get(H(0))), max_data)
  206. self.assert_raises(IntegrityError, lambda: self.repository.put(H(1), fchunk(max_data + b"x")))
  207. def test_set_flags(self):
  208. id = H(0)
  209. self.repository.put(id, fchunk(b""))
  210. self.assert_equal(self.repository.flags(id), 0x00000000) # init == all zero
  211. self.repository.flags(id, mask=0x00000001, value=0x00000001)
  212. self.assert_equal(self.repository.flags(id), 0x00000001)
  213. self.repository.flags(id, mask=0x00000002, value=0x00000002)
  214. self.assert_equal(self.repository.flags(id), 0x00000003)
  215. self.repository.flags(id, mask=0x00000001, value=0x00000000)
  216. self.assert_equal(self.repository.flags(id), 0x00000002)
  217. self.repository.flags(id, mask=0x00000002, value=0x00000000)
  218. self.assert_equal(self.repository.flags(id), 0x00000000)
  219. def test_get_flags(self):
  220. id = H(0)
  221. self.repository.put(id, fchunk(b""))
  222. self.assert_equal(self.repository.flags(id), 0x00000000) # init == all zero
  223. self.repository.flags(id, mask=0xC0000003, value=0x80000001)
  224. self.assert_equal(self.repository.flags(id, mask=0x00000001), 0x00000001)
  225. self.assert_equal(self.repository.flags(id, mask=0x00000002), 0x00000000)
  226. self.assert_equal(self.repository.flags(id, mask=0x40000008), 0x00000000)
  227. self.assert_equal(self.repository.flags(id, mask=0x80000000), 0x80000000)
  228. def test_flags_many(self):
  229. ids_flagged = [H(0), H(1)]
  230. ids_default_flags = [H(2), H(3)]
  231. [self.repository.put(id, fchunk(b"")) for id in ids_flagged + ids_default_flags]
  232. self.repository.flags_many(ids_flagged, mask=0xFFFFFFFF, value=0xDEADBEEF)
  233. self.assert_equal(list(self.repository.flags_many(ids_default_flags)), [0x00000000, 0x00000000])
  234. self.assert_equal(list(self.repository.flags_many(ids_flagged)), [0xDEADBEEF, 0xDEADBEEF])
  235. self.assert_equal(list(self.repository.flags_many(ids_flagged, mask=0xFFFF0000)), [0xDEAD0000, 0xDEAD0000])
  236. self.assert_equal(list(self.repository.flags_many(ids_flagged, mask=0x0000FFFF)), [0x0000BEEF, 0x0000BEEF])
  237. def test_flags_persistence(self):
  238. self.repository.put(H(0), fchunk(b"default"))
  239. self.repository.put(H(1), fchunk(b"one one zero"))
  240. # we do not set flags for H(0), so we can later check their default state.
  241. self.repository.flags(H(1), mask=0x00000007, value=0x00000006)
  242. self.repository.commit(compact=False)
  243. self.repository.close()
  244. self.repository = self.open()
  245. with self.repository:
  246. # we query all flags to check if the initial flags were all zero and
  247. # only the ones we explicitly set to one are as expected.
  248. self.assert_equal(self.repository.flags(H(0), mask=0xFFFFFFFF), 0x00000000)
  249. self.assert_equal(self.repository.flags(H(1), mask=0xFFFFFFFF), 0x00000006)
  250. class LocalRepositoryTestCase(RepositoryTestCaseBase):
  251. # test case that doesn't work with remote repositories
  252. def _assert_sparse(self):
  253. # The superseded 123456... PUT
  254. assert self.repository.compact[0] == 41 + 8 + len(fchunk(b"123456789"))
  255. # a COMMIT
  256. assert self.repository.compact[1] == 9
  257. # The DELETE issued by the superseding PUT (or issued directly)
  258. assert self.repository.compact[2] == 41
  259. self.repository._rebuild_sparse(0)
  260. assert self.repository.compact[0] == 41 + 8 + len(fchunk(b"123456789")) # 9 is chunk or commit?
  261. def test_sparse1(self):
  262. self.repository.put(H(0), fchunk(b"foo"))
  263. self.repository.put(H(1), fchunk(b"123456789"))
  264. self.repository.commit(compact=False)
  265. self.repository.put(H(1), fchunk(b"bar"))
  266. self._assert_sparse()
  267. def test_sparse2(self):
  268. self.repository.put(H(0), fchunk(b"foo"))
  269. self.repository.put(H(1), fchunk(b"123456789"))
  270. self.repository.commit(compact=False)
  271. self.repository.delete(H(1))
  272. self._assert_sparse()
  273. def test_sparse_delete(self):
  274. ch0 = fchunk(b"1245")
  275. self.repository.put(H(0), ch0)
  276. self.repository.delete(H(0))
  277. self.repository.io._write_fd.sync()
  278. # The on-line tracking works on a per-object basis...
  279. assert self.repository.compact[0] == 41 + 8 + 41 + len(ch0)
  280. self.repository._rebuild_sparse(0)
  281. # ...while _rebuild_sparse can mark whole segments as completely sparse (which then includes the segment magic)
  282. assert self.repository.compact[0] == 41 + 8 + 41 + len(ch0) + len(MAGIC)
  283. self.repository.commit(compact=True)
  284. assert 0 not in [segment for segment, _ in self.repository.io.segment_iterator()]
  285. def test_uncommitted_garbage(self):
  286. # uncommitted garbage should be no problem, it is cleaned up automatically.
  287. # we just have to be careful with invalidation of cached FDs in LoggedIO.
  288. self.repository.put(H(0), fchunk(b"foo"))
  289. self.repository.commit(compact=False)
  290. # write some crap to a uncommitted segment file
  291. last_segment = self.repository.io.get_latest_segment()
  292. with open(self.repository.io.segment_filename(last_segment + 1), "wb") as f:
  293. f.write(MAGIC + b"crapcrapcrap")
  294. self.repository.close()
  295. # usually, opening the repo and starting a transaction should trigger a cleanup.
  296. self.repository = self.open()
  297. with self.repository:
  298. self.repository.put(H(0), fchunk(b"bar")) # this may trigger compact_segments()
  299. self.repository.commit(compact=True)
  300. # the point here is that nothing blows up with an exception.
  301. class RepositoryCommitTestCase(RepositoryTestCaseBase):
  302. def test_replay_of_missing_index(self):
  303. self.add_keys()
  304. for name in os.listdir(self.repository.path):
  305. if name.startswith("index."):
  306. os.unlink(os.path.join(self.repository.path, name))
  307. self.reopen()
  308. with self.repository:
  309. self.assert_equal(len(self.repository), 3)
  310. self.assert_equal(self.repository.check(), True)
  311. def test_crash_before_compact_segments(self):
  312. self.add_keys()
  313. self.repository.compact_segments = None
  314. try:
  315. self.repository.commit(compact=True)
  316. except TypeError:
  317. pass
  318. self.reopen()
  319. with self.repository:
  320. self.assert_equal(len(self.repository), 3)
  321. self.assert_equal(self.repository.check(), True)
  322. def test_crash_before_write_index(self):
  323. self.add_keys()
  324. self.repository.write_index = None
  325. try:
  326. self.repository.commit(compact=False)
  327. except TypeError:
  328. pass
  329. self.reopen()
  330. with self.repository:
  331. self.assert_equal(len(self.repository), 3)
  332. self.assert_equal(self.repository.check(), True)
  333. def test_replay_lock_upgrade_old(self):
  334. self.add_keys()
  335. for name in os.listdir(self.repository.path):
  336. if name.startswith("index."):
  337. os.unlink(os.path.join(self.repository.path, name))
  338. with patch.object(Lock, "upgrade", side_effect=LockFailed) as upgrade:
  339. self.reopen(exclusive=None) # simulate old client that always does lock upgrades
  340. with self.repository:
  341. # the repo is only locked by a shared read lock, but to replay segments,
  342. # we need an exclusive write lock - check if the lock gets upgraded.
  343. self.assert_raises(LockFailed, lambda: len(self.repository))
  344. upgrade.assert_called_once_with()
  345. def test_replay_lock_upgrade(self):
  346. self.add_keys()
  347. for name in os.listdir(self.repository.path):
  348. if name.startswith("index."):
  349. os.unlink(os.path.join(self.repository.path, name))
  350. with patch.object(Lock, "upgrade", side_effect=LockFailed) as upgrade:
  351. self.reopen(exclusive=False) # current client usually does not do lock upgrade, except for replay
  352. with self.repository:
  353. # the repo is only locked by a shared read lock, but to replay segments,
  354. # we need an exclusive write lock - check if the lock gets upgraded.
  355. self.assert_raises(LockFailed, lambda: len(self.repository))
  356. upgrade.assert_called_once_with()
  357. def test_crash_before_deleting_compacted_segments(self):
  358. self.add_keys()
  359. self.repository.io.delete_segment = None
  360. try:
  361. self.repository.commit(compact=False)
  362. except TypeError:
  363. pass
  364. self.reopen()
  365. with self.repository:
  366. self.assert_equal(len(self.repository), 3)
  367. self.assert_equal(self.repository.check(), True)
  368. self.assert_equal(len(self.repository), 3)
  369. def test_ignores_commit_tag_in_data(self):
  370. self.repository.put(H(0), LoggedIO.COMMIT)
  371. self.reopen()
  372. with self.repository:
  373. io = self.repository.io
  374. assert not io.is_committed_segment(io.get_latest_segment())
  375. def test_moved_deletes_are_tracked(self):
  376. self.repository.put(H(1), fchunk(b"1"))
  377. self.repository.put(H(2), fchunk(b"2"))
  378. self.repository.commit(compact=False)
  379. self.repo_dump("p1 p2 c")
  380. self.repository.delete(H(1))
  381. self.repository.commit(compact=True)
  382. self.repo_dump("d1 cc")
  383. last_segment = self.repository.io.get_latest_segment() - 1
  384. num_deletes = 0
  385. for tag, key, offset, size, _ in self.repository.io.iter_objects(last_segment):
  386. if tag == TAG_DELETE:
  387. assert key == H(1)
  388. num_deletes += 1
  389. assert num_deletes == 1
  390. assert last_segment in self.repository.compact
  391. self.repository.put(H(3), fchunk(b"3"))
  392. self.repository.commit(compact=True)
  393. self.repo_dump("p3 cc")
  394. assert last_segment not in self.repository.compact
  395. assert not self.repository.io.segment_exists(last_segment)
  396. for segment, _ in self.repository.io.segment_iterator():
  397. for tag, key, offset, size, _ in self.repository.io.iter_objects(segment):
  398. assert tag != TAG_DELETE
  399. assert key != H(1)
  400. # after compaction, there should be no empty shadowed_segments lists left over.
  401. # we have no put or del any more for H(1), so we lost knowledge about H(1).
  402. assert H(1) not in self.repository.shadow_index
  403. def test_shadowed_entries_are_preserved(self):
  404. get_latest_segment = self.repository.io.get_latest_segment
  405. self.repository.put(H(1), fchunk(b"1"))
  406. # This is the segment with our original PUT of interest
  407. put_segment = get_latest_segment()
  408. self.repository.commit(compact=False)
  409. # We now delete H(1), and force this segment not to be compacted, which can happen
  410. # if it's not sparse enough (symbolized by H(2) here).
  411. self.repository.delete(H(1))
  412. self.repository.put(H(2), fchunk(b"1"))
  413. delete_segment = get_latest_segment()
  414. # We pretend these are mostly dense (not sparse) and won't be compacted
  415. del self.repository.compact[put_segment]
  416. del self.repository.compact[delete_segment]
  417. self.repository.commit(compact=True)
  418. # Now we perform an unrelated operation on the segment containing the DELETE,
  419. # causing it to be compacted.
  420. self.repository.delete(H(2))
  421. self.repository.commit(compact=True)
  422. assert self.repository.io.segment_exists(put_segment)
  423. assert not self.repository.io.segment_exists(delete_segment)
  424. # Basic case, since the index survived this must be ok
  425. assert H(1) not in self.repository
  426. # Nuke index, force replay
  427. os.unlink(os.path.join(self.repository.path, "index.%d" % get_latest_segment()))
  428. # Must not reappear
  429. assert H(1) not in self.repository
  430. def test_shadow_index_rollback(self):
  431. self.repository.put(H(1), fchunk(b"1"))
  432. self.repository.delete(H(1))
  433. assert self.repository.shadow_index[H(1)] == [0]
  434. self.repository.commit(compact=True)
  435. self.repo_dump("p1 d1 cc")
  436. # note how an empty list means that nothing is shadowed for sure
  437. assert self.repository.shadow_index[H(1)] == [] # because the delete is considered unstable
  438. self.repository.put(H(1), b"1")
  439. self.repository.delete(H(1))
  440. self.repo_dump("p1 d1")
  441. # 0 put/delete; 1 commit; 2 compacted; 3 commit; 4 put/delete
  442. assert self.repository.shadow_index[H(1)] == [4]
  443. self.repository.rollback()
  444. self.repo_dump("r")
  445. self.repository.put(H(2), fchunk(b"1"))
  446. # After the rollback segment 4 shouldn't be considered anymore
  447. assert self.repository.shadow_index[H(1)] == [] # because the delete is considered unstable
  448. class RepositoryAppendOnlyTestCase(RepositoryTestCaseBase):
  449. def open(self, create=False):
  450. return Repository(os.path.join(self.tmppath, "repository"), exclusive=True, create=create, append_only=True)
  451. def test_destroy_append_only(self):
  452. # Can't destroy append only repo (via the API)
  453. with self.assert_raises(ValueError):
  454. self.repository.destroy()
  455. assert self.repository.append_only
  456. def test_append_only(self):
  457. def segments_in_repository():
  458. return len(list(self.repository.io.segment_iterator()))
  459. self.repository.put(H(0), fchunk(b"foo"))
  460. self.repository.commit(compact=False)
  461. self.repository.append_only = False
  462. assert segments_in_repository() == 2
  463. self.repository.put(H(0), fchunk(b"foo"))
  464. self.repository.commit(compact=True)
  465. # normal: compact squashes the data together, only one segment
  466. assert segments_in_repository() == 2
  467. self.repository.append_only = True
  468. assert segments_in_repository() == 2
  469. self.repository.put(H(0), fchunk(b"foo"))
  470. self.repository.commit(compact=False)
  471. # append only: does not compact, only new segments written
  472. assert segments_in_repository() == 4
  473. class RepositoryFreeSpaceTestCase(RepositoryTestCaseBase):
  474. def test_additional_free_space(self):
  475. self.add_keys()
  476. self.repository.config.set("repository", "additional_free_space", "1000T")
  477. self.repository.save_key(b"shortcut to save_config")
  478. self.reopen()
  479. with self.repository:
  480. self.repository.put(H(0), fchunk(b"foobar"))
  481. with pytest.raises(Repository.InsufficientFreeSpaceError):
  482. self.repository.commit(compact=False)
  483. assert os.path.exists(self.repository.path)
  484. def test_create_free_space(self):
  485. self.repository.additional_free_space = 1e20
  486. with pytest.raises(Repository.InsufficientFreeSpaceError):
  487. self.add_keys()
  488. assert not os.path.exists(self.repository.path)
  489. class QuotaTestCase(RepositoryTestCaseBase):
  490. def test_tracking(self):
  491. assert self.repository.storage_quota_use == 0
  492. ch1 = fchunk(bytes(1234))
  493. self.repository.put(H(1), ch1)
  494. assert self.repository.storage_quota_use == len(ch1) + 41 + 8
  495. ch2 = fchunk(bytes(5678))
  496. self.repository.put(H(2), ch2)
  497. assert self.repository.storage_quota_use == len(ch1) + len(ch2) + 2 * (41 + 8)
  498. self.repository.delete(H(1))
  499. assert self.repository.storage_quota_use == len(ch1) + len(ch2) + 2 * (41 + 8) # we have not compacted yet
  500. self.repository.commit(compact=False)
  501. assert self.repository.storage_quota_use == len(ch1) + len(ch2) + 2 * (41 + 8) # we have not compacted yet
  502. self.reopen()
  503. with self.repository:
  504. # Open new transaction; hints and thus quota data is not loaded unless needed.
  505. ch3 = fchunk(b"")
  506. self.repository.put(H(3), ch3)
  507. self.repository.delete(H(3))
  508. assert self.repository.storage_quota_use == len(ch1) + len(ch2) + len(ch3) + 3 * (
  509. 41 + 8
  510. ) # we have not compacted yet
  511. self.repository.commit(compact=True)
  512. assert self.repository.storage_quota_use == len(ch2) + 41 + 8
  513. def test_exceed_quota(self):
  514. assert self.repository.storage_quota_use == 0
  515. self.repository.storage_quota = 80
  516. ch1 = fchunk(b"x" * 7)
  517. self.repository.put(H(1), ch1)
  518. assert self.repository.storage_quota_use == len(ch1) + 41 + 8
  519. self.repository.commit(compact=False)
  520. with pytest.raises(Repository.StorageQuotaExceeded):
  521. ch2 = fchunk(b"y" * 13)
  522. self.repository.put(H(2), ch2)
  523. assert self.repository.storage_quota_use == len(ch1) + len(ch2) + (41 + 8) * 2 # check ch2!?
  524. with pytest.raises(Repository.StorageQuotaExceeded):
  525. self.repository.commit(compact=False)
  526. assert self.repository.storage_quota_use == len(ch1) + len(ch2) + (41 + 8) * 2 # check ch2!?
  527. self.reopen()
  528. with self.repository:
  529. self.repository.storage_quota = 150
  530. # Open new transaction; hints and thus quota data is not loaded unless needed.
  531. self.repository.put(H(1), ch1)
  532. assert (
  533. self.repository.storage_quota_use == len(ch1) * 2 + (41 + 8) * 2
  534. ) # we have 2 puts for H(1) here and not yet compacted.
  535. self.repository.commit(compact=True)
  536. assert self.repository.storage_quota_use == len(ch1) + 41 + 8 # now we have compacted.
  537. class RepositoryAuxiliaryCorruptionTestCase(RepositoryTestCaseBase):
  538. def setUp(self):
  539. super().setUp()
  540. self.repository.put(H(0), fchunk(b"foo"))
  541. self.repository.commit(compact=False)
  542. self.repository.close()
  543. def do_commit(self):
  544. with self.repository:
  545. self.repository.put(H(0), fchunk(b"fox"))
  546. self.repository.commit(compact=False)
  547. def test_corrupted_hints(self):
  548. with open(os.path.join(self.repository.path, "hints.1"), "ab") as fd:
  549. fd.write(b"123456789")
  550. self.do_commit()
  551. def test_deleted_hints(self):
  552. os.unlink(os.path.join(self.repository.path, "hints.1"))
  553. self.do_commit()
  554. def test_deleted_index(self):
  555. os.unlink(os.path.join(self.repository.path, "index.1"))
  556. self.do_commit()
  557. def test_unreadable_hints(self):
  558. hints = os.path.join(self.repository.path, "hints.1")
  559. os.unlink(hints)
  560. os.mkdir(hints)
  561. with self.assert_raises(OSError):
  562. self.do_commit()
  563. def test_index(self):
  564. with open(os.path.join(self.repository.path, "index.1"), "wb") as fd:
  565. fd.write(b"123456789")
  566. self.do_commit()
  567. def test_index_outside_transaction(self):
  568. with open(os.path.join(self.repository.path, "index.1"), "wb") as fd:
  569. fd.write(b"123456789")
  570. with self.repository:
  571. assert len(self.repository) == 1
  572. def _corrupt_index(self):
  573. # HashIndex is able to detect incorrect headers and file lengths,
  574. # but on its own it can't tell if the data is correct.
  575. index_path = os.path.join(self.repository.path, "index.1")
  576. with open(index_path, "r+b") as fd:
  577. index_data = fd.read()
  578. # Flip one bit in a key stored in the index
  579. corrupted_key = (int.from_bytes(H(0), "little") ^ 1).to_bytes(32, "little")
  580. corrupted_index_data = index_data.replace(H(0), corrupted_key)
  581. assert corrupted_index_data != index_data
  582. assert len(corrupted_index_data) == len(index_data)
  583. fd.seek(0)
  584. fd.write(corrupted_index_data)
  585. def test_index_corrupted(self):
  586. # HashIndex is able to detect incorrect headers and file lengths,
  587. # but on its own it can't tell if the data itself is correct.
  588. self._corrupt_index()
  589. with self.repository:
  590. # Data corruption is detected due to mismatching checksums
  591. # and fixed by rebuilding the index.
  592. assert len(self.repository) == 1
  593. assert pdchunk(self.repository.get(H(0))) == b"foo"
  594. def test_index_corrupted_without_integrity(self):
  595. self._corrupt_index()
  596. integrity_path = os.path.join(self.repository.path, "integrity.1")
  597. os.unlink(integrity_path)
  598. with self.repository:
  599. # Since the corrupted key is not noticed, the repository still thinks
  600. # it contains one key...
  601. assert len(self.repository) == 1
  602. with pytest.raises(Repository.ObjectNotFound):
  603. # ... but the real, uncorrupted key is not found in the corrupted index.
  604. self.repository.get(H(0))
  605. def test_unreadable_index(self):
  606. index = os.path.join(self.repository.path, "index.1")
  607. os.unlink(index)
  608. os.mkdir(index)
  609. with self.assert_raises(OSError):
  610. self.do_commit()
  611. def test_unknown_integrity_version(self):
  612. # For now an unknown integrity data version is ignored and not an error.
  613. integrity_path = os.path.join(self.repository.path, "integrity.1")
  614. with open(integrity_path, "r+b") as fd:
  615. msgpack.pack(
  616. {
  617. # Borg only understands version 2
  618. b"version": 4.7
  619. },
  620. fd,
  621. )
  622. fd.truncate()
  623. with self.repository:
  624. # No issues accessing the repository
  625. assert len(self.repository) == 1
  626. assert pdchunk(self.repository.get(H(0))) == b"foo"
def _subtly_corrupted_hints_setup(self):
    """Produce a hints file with a wrong segment refcount (1 forced to 0).

    Two append-only commits leave the latest transaction's hints in
    hints.5; that file is then rewritten with hints["segments"][2] set
    to 0. The corresponding integrity.5 file still describes the
    original hints, so the corruption is detectable unless a test
    removes that file first.
    """
    with self.repository:
        self.repository.append_only = True
        assert len(self.repository) == 1
        assert pdchunk(self.repository.get(H(0))) == b"foo"
        self.repository.put(H(1), fchunk(b"bar"))
        self.repository.put(H(2), fchunk(b"baz"))
        self.repository.commit(compact=False)
        # overwrite H(2) so segment refcounts actually differ
        self.repository.put(H(2), fchunk(b"bazz"))
        self.repository.commit(compact=False)
    hints_path = os.path.join(self.repository.path, "hints.5")
    with open(hints_path, "r+b") as fd:
        hints = msgpack.unpack(fd)
        fd.seek(0)
        # Corrupt segment refcount
        assert hints["segments"][2] == 1
        hints["segments"][2] = 0
        msgpack.pack(hints, fd)
        fd.truncate()
  646. def test_subtly_corrupted_hints(self):
  647. self._subtly_corrupted_hints_setup()
  648. with self.repository:
  649. self.repository.append_only = False
  650. self.repository.put(H(3), fchunk(b"1234"))
  651. # Do a compaction run. Succeeds, since the failed checksum prompted a rebuild of the index+hints.
  652. self.repository.commit(compact=True)
  653. assert len(self.repository) == 4
  654. assert pdchunk(self.repository.get(H(0))) == b"foo"
  655. assert pdchunk(self.repository.get(H(1))) == b"bar"
  656. assert pdchunk(self.repository.get(H(2))) == b"bazz"
  657. def test_subtly_corrupted_hints_without_integrity(self):
  658. self._subtly_corrupted_hints_setup()
  659. integrity_path = os.path.join(self.repository.path, "integrity.5")
  660. os.unlink(integrity_path)
  661. with self.repository:
  662. self.repository.append_only = False
  663. self.repository.put(H(3), fchunk(b"1234"))
  664. # Do a compaction run. Fails, since the corrupted refcount was not detected and leads to an assertion failure.
  665. with pytest.raises(AssertionError) as exc_info:
  666. self.repository.commit(compact=True)
  667. assert "Corrupted segment reference count" in str(exc_info.value)
class RepositoryCheckTestCase(RepositoryTestCaseBase):
    """Tests for Repository.check(), with and without repair=True."""

    def list_indices(self):
        # All index.N files currently present in the repository directory.
        return [name for name in os.listdir(os.path.join(self.tmppath, "repository")) if name.startswith("index.")]

    def check(self, repair=False, status=True):
        """Run a check and assert its boolean result is *status*; also assert
        that no temporary files were left behind by the check."""
        self.assert_equal(self.repository.check(repair=repair), status)
        # Make sure no tmp files are left behind
        self.assert_equal(
            [name for name in os.listdir(os.path.join(self.tmppath, "repository")) if "tmp" in name],
            [],
            "Found tmp files",
        )

    def get_objects(self, *ids):
        # Fetch (and decode) each object; raises if any id is unreadable/missing.
        for id_ in ids:
            pdchunk(self.repository.get(H(id_)))

    def add_objects(self, segments):
        # Each inner id list is committed separately (compact=False), so each
        # outer element ends up in its own transaction.
        for ids in segments:
            for id_ in ids:
                self.repository.put(H(id_), fchunk(b"data"))
            self.repository.commit(compact=False)

    def get_head(self):
        # Highest (newest) segment number found in data/0.
        return sorted(int(n) for n in os.listdir(os.path.join(self.tmppath, "repository", "data", "0")) if n.isdigit())[
            -1
        ]

    def open_index(self):
        # Open the index belonging to the newest segment.
        return NSIndex.read(os.path.join(self.tmppath, "repository", f"index.{self.get_head()}"))

    def corrupt_object(self, id_):
        """Overwrite the start of object *id_*'s data inside its segment file."""
        idx = self.open_index()
        segment, offset, _ = idx[H(id_)]
        with open(os.path.join(self.tmppath, "repository", "data", "0", str(segment)), "r+b") as fd:
            fd.seek(offset)
            fd.write(b"BOOM")

    def delete_segment(self, segment):
        self.repository.io.delete_segment(segment)

    def delete_index(self):
        os.unlink(os.path.join(self.tmppath, "repository", f"index.{self.get_head()}"))

    def rename_index(self, new_name):
        os.replace(
            os.path.join(self.tmppath, "repository", f"index.{self.get_head()}"),
            os.path.join(self.tmppath, "repository", new_name),
        )

    def list_objects(self):
        # ids of all stored objects as plain ints (test keys from H() are
        # ASCII-numeric, so int() recovers the id - see testsuite helpers).
        return {int(key) for key in self.repository.list()}

    def test_repair_corrupted_segment(self):
        self.add_objects([[1, 2, 3], [4, 5], [6]])
        self.assert_equal({1, 2, 3, 4, 5, 6}, self.list_objects())
        self.check(status=True)
        self.corrupt_object(5)
        self.assert_raises(IntegrityError, lambda: self.get_objects(5))
        self.repository.rollback()
        # Make sure a regular check does not repair anything
        self.check(status=False)
        self.check(status=False)
        # Make sure a repair actually repairs the repo
        self.check(repair=True, status=True)
        self.get_objects(4)
        self.check(status=True)
        # the corrupted object 5 is gone, everything else survived
        self.assert_equal({1, 2, 3, 4, 6}, self.list_objects())

    def test_repair_missing_segment(self):
        self.add_objects([[1, 2, 3], [4, 5, 6]])
        self.assert_equal({1, 2, 3, 4, 5, 6}, self.list_objects())
        self.check(status=True)
        self.delete_segment(2)
        self.repository.rollback()
        self.check(repair=True, status=True)
        # only the objects from the surviving first transaction remain
        self.assert_equal({1, 2, 3}, self.list_objects())

    def test_repair_missing_commit_segment(self):
        self.add_objects([[1, 2, 3], [4, 5, 6]])
        self.delete_segment(3)
        # without its commit segment, the second transaction is not visible
        self.assert_raises(Repository.ObjectNotFound, lambda: self.get_objects(4))
        self.assert_equal({1, 2, 3}, self.list_objects())

    def test_repair_corrupted_commit_segment(self):
        self.add_objects([[1, 2, 3], [4, 5, 6]])
        # overwrite the last byte of the commit segment of transaction 2
        with open(os.path.join(self.tmppath, "repository", "data", "0", "3"), "r+b") as fd:
            fd.seek(-1, os.SEEK_END)
            fd.write(b"X")
        self.assert_raises(Repository.ObjectNotFound, lambda: self.get_objects(4))
        self.check(status=True)
        self.get_objects(3)
        self.assert_equal({1, 2, 3}, self.list_objects())

    def test_repair_no_commits(self):
        self.add_objects([[1, 2, 3]])
        # corrupt the last byte of the only (commit) segment
        with open(os.path.join(self.tmppath, "repository", "data", "0", "1"), "r+b") as fd:
            fd.seek(-1, os.SEEK_END)
            fd.write(b"X")
        self.assert_raises(Repository.CheckNeeded, lambda: self.get_objects(4))
        self.check(status=False)
        self.check(status=False)
        self.assert_equal(self.list_indices(), ["index.1"])
        self.check(repair=True, status=True)
        # repair wrote a new index for the rebuilt state
        self.assert_equal(self.list_indices(), ["index.2"])
        self.check(status=True)
        self.get_objects(3)
        self.assert_equal({1, 2, 3}, self.list_objects())

    def test_repair_missing_index(self):
        self.add_objects([[1, 2, 3], [4, 5, 6]])
        self.delete_index()
        self.check(status=True)
        self.get_objects(4)
        self.assert_equal({1, 2, 3, 4, 5, 6}, self.list_objects())

    def test_repair_index_too_new(self):
        self.add_objects([[1, 2, 3], [4, 5, 6]])
        self.assert_equal(self.list_indices(), ["index.3"])
        # an index numbered beyond the newest segment is replaced by check()
        self.rename_index("index.100")
        self.check(status=True)
        self.assert_equal(self.list_indices(), ["index.3"])
        self.get_objects(4)
        self.assert_equal({1, 2, 3, 4, 5, 6}, self.list_objects())

    def test_crash_before_compact(self):
        self.repository.put(H(0), fchunk(b"data"))
        self.repository.put(H(0), fchunk(b"data2"))
        # Simulate a crash before compact
        with patch.object(Repository, "compact_segments") as compact:
            self.repository.commit(compact=True)
            # 0.1 is presumably the default compaction threshold - confirm
            # against Repository.commit if this ever fails
            compact.assert_called_once_with(0.1)
        self.reopen()
        with self.repository:
            self.check(repair=True)
            self.assert_equal(pdchunk(self.repository.get(H(0))), b"data2")
class RepositoryHintsTestCase(RepositoryTestCaseBase):
    """Tests for the hints data (shadow_index, compact, segments): persistence
    across close/reopen and maintenance across commits/compaction."""

    def test_hints_persistence(self):
        self.repository.put(H(0), fchunk(b"data"))
        self.repository.delete(H(0))
        self.repository.commit(compact=False)
        # snapshot the hints state to compare against after reopen
        shadow_index_expected = self.repository.shadow_index
        compact_expected = self.repository.compact
        segments_expected = self.repository.segments
        # close and re-open the repository (create fresh Repository instance) to
        # check whether hints were persisted to / reloaded from disk
        self.reopen()
        with self.repository:
            # see also do_compact()
            self.repository.put(H(42), fchunk(b"foobar"))  # this will call prepare_txn() and load the hints data
            # check if hints persistence worked:
            self.assert_equal(shadow_index_expected, self.repository.shadow_index)
            self.assert_equal(compact_expected, self.repository.compact)
            del self.repository.segments[2]  # ignore the segment created by put(H(42), ...)
            self.assert_equal(segments_expected, self.repository.segments)

    def test_hints_behaviour(self):
        self.repository.put(H(0), fchunk(b"data"))
        # nothing deleted yet, so no shadowed entries and nothing to compact
        self.assert_equal(self.repository.shadow_index, {})
        assert len(self.repository.compact) == 0
        self.repository.delete(H(0))
        self.repository.commit(compact=False)
        # now there should be an entry for H(0) in shadow_index
        self.assert_in(H(0), self.repository.shadow_index)
        self.assert_equal(len(self.repository.shadow_index[H(0)]), 1)
        self.assert_in(0, self.repository.compact)  # segment 0 can be compacted
        self.repository.put(H(42), fchunk(b"foobar"))  # see also do_compact()
        self.repository.commit(compact=True, threshold=0.0)  # compact completely!
        # nothing to compact any more! no info left about stuff that does not exist any more:
        self.assert_not_in(H(0), self.repository.shadow_index)
        # segment 0 was compacted away, no info about it left:
        self.assert_not_in(0, self.repository.compact)
        self.assert_not_in(0, self.repository.segments)
  822. class RemoteRepositoryTestCase(RepositoryTestCase):
  823. repository = None # type: RemoteRepository
  824. def open(self, create=False):
  825. return RemoteRepository(
  826. Location("ssh://__testsuite__" + os.path.join(self.tmppath, "repository")), exclusive=True, create=create
  827. )
  828. def _get_mock_args(self):
  829. class MockArgs:
  830. remote_path = "borg"
  831. umask = 0o077
  832. debug_topics = []
  833. rsh = None
  834. def __contains__(self, item):
  835. # To behave like argparse.Namespace
  836. return hasattr(self, item)
  837. return MockArgs()
  838. def test_invalid_rpc(self):
  839. self.assert_raises(InvalidRPCMethod, lambda: self.repository.call("__init__", {}))
  840. def test_rpc_exception_transport(self):
  841. s1 = "test string"
  842. try:
  843. self.repository.call("inject_exception", {"kind": "DoesNotExist"})
  844. except Repository.DoesNotExist as e:
  845. assert len(e.args) == 1
  846. assert e.args[0] == self.repository.location.processed
  847. try:
  848. self.repository.call("inject_exception", {"kind": "AlreadyExists"})
  849. except Repository.AlreadyExists as e:
  850. assert len(e.args) == 1
  851. assert e.args[0] == self.repository.location.processed
  852. try:
  853. self.repository.call("inject_exception", {"kind": "CheckNeeded"})
  854. except Repository.CheckNeeded as e:
  855. assert len(e.args) == 1
  856. assert e.args[0] == self.repository.location.processed
  857. try:
  858. self.repository.call("inject_exception", {"kind": "IntegrityError"})
  859. except IntegrityError as e:
  860. assert len(e.args) == 1
  861. assert e.args[0] == s1
  862. try:
  863. self.repository.call("inject_exception", {"kind": "PathNotAllowed"})
  864. except PathNotAllowed as e:
  865. assert len(e.args) == 1
  866. assert e.args[0] == "foo"
  867. try:
  868. self.repository.call("inject_exception", {"kind": "ObjectNotFound"})
  869. except Repository.ObjectNotFound as e:
  870. assert len(e.args) == 2
  871. assert e.args[0] == s1
  872. assert e.args[1] == self.repository.location.processed
  873. try:
  874. self.repository.call("inject_exception", {"kind": "InvalidRPCMethod"})
  875. except InvalidRPCMethod as e:
  876. assert len(e.args) == 1
  877. assert e.args[0] == s1
  878. try:
  879. self.repository.call("inject_exception", {"kind": "divide"})
  880. except RemoteRepository.RPCError as e:
  881. assert e.unpacked
  882. assert e.get_message() == "ZeroDivisionError: integer division or modulo by zero\n"
  883. assert e.exception_class == "ZeroDivisionError"
  884. assert len(e.exception_full) > 0
  885. def test_ssh_cmd(self):
  886. args = self._get_mock_args()
  887. self.repository._args = args
  888. assert self.repository.ssh_cmd(Location("ssh://example.com/foo")) == ["ssh", "example.com"]
  889. assert self.repository.ssh_cmd(Location("ssh://user@example.com/foo")) == ["ssh", "user@example.com"]
  890. assert self.repository.ssh_cmd(Location("ssh://user@example.com:1234/foo")) == [
  891. "ssh",
  892. "-p",
  893. "1234",
  894. "user@example.com",
  895. ]
  896. os.environ["BORG_RSH"] = "ssh --foo"
  897. assert self.repository.ssh_cmd(Location("ssh://example.com/foo")) == ["ssh", "--foo", "example.com"]
  898. def test_borg_cmd(self):
  899. assert self.repository.borg_cmd(None, testing=True) == [sys.executable, "-m", "borg", "serve"]
  900. args = self._get_mock_args()
  901. # XXX without next line we get spurious test fails when using pytest-xdist, root cause unknown:
  902. logging.getLogger().setLevel(logging.INFO)
  903. # note: test logger is on info log level, so --info gets added automagically
  904. assert self.repository.borg_cmd(args, testing=False) == ["borg", "serve", "--info"]
  905. args.remote_path = "borg-0.28.2"
  906. assert self.repository.borg_cmd(args, testing=False) == ["borg-0.28.2", "serve", "--info"]
  907. args.debug_topics = ["something_client_side", "repository_compaction"]
  908. assert self.repository.borg_cmd(args, testing=False) == [
  909. "borg-0.28.2",
  910. "serve",
  911. "--info",
  912. "--debug-topic=borg.debug.repository_compaction",
  913. ]
  914. args = self._get_mock_args()
  915. args.storage_quota = 0
  916. assert self.repository.borg_cmd(args, testing=False) == ["borg", "serve", "--info"]
  917. args.storage_quota = 314159265
  918. assert self.repository.borg_cmd(args, testing=False) == ["borg", "serve", "--info", "--storage-quota=314159265"]
  919. args.rsh = "ssh -i foo"
  920. self.repository._args = args
  921. assert self.repository.ssh_cmd(Location("ssh://example.com/foo")) == ["ssh", "-i", "foo", "example.com"]
  922. class RemoteLegacyFree(RepositoryTestCaseBase):
  923. # Keep testing this so we can someday safely remove the legacy tuple format.
  924. def open(self, create=False):
  925. with patch.object(RemoteRepository, "dictFormat", True):
  926. return RemoteRepository(
  927. Location("ssh://__testsuite__" + os.path.join(self.tmppath, "repository")),
  928. exclusive=True,
  929. create=create,
  930. )
  931. def test_legacy_free(self):
  932. # put
  933. self.repository.put(H(0), fchunk(b"foo"))
  934. self.repository.commit(compact=False)
  935. self.repository.close()
  936. # replace
  937. self.repository = self.open()
  938. with self.repository:
  939. self.repository.put(H(0), fchunk(b"bar"))
  940. self.repository.commit(compact=False)
  941. # delete
  942. self.repository = self.open()
  943. with self.repository:
  944. self.repository.delete(H(0))
  945. self.repository.commit(compact=False)
  946. class RemoteRepositoryCheckTestCase(RepositoryCheckTestCase):
  947. def open(self, create=False):
  948. return RemoteRepository(
  949. Location("ssh://__testsuite__" + os.path.join(self.tmppath, "repository")), exclusive=True, create=create
  950. )
  951. def test_crash_before_compact(self):
  952. # skip this test, we can't mock-patch a Repository class in another process!
  953. pass
  954. def test_repair_missing_commit_segment(self):
  955. # skip this test, files in RemoteRepository cannot be deleted
  956. pass
  957. def test_repair_missing_segment(self):
  958. # skip this test, files in RemoteRepository cannot be deleted
  959. pass
  960. class RemoteLoggerTestCase(BaseTestCase):
  961. def setUp(self):
  962. self.stream = io.StringIO()
  963. self.handler = logging.StreamHandler(self.stream)
  964. logging.getLogger().handlers[:] = [self.handler]
  965. logging.getLogger("borg.repository").handlers[:] = []
  966. logging.getLogger("borg.repository.foo").handlers[:] = []
  967. # capture stderr
  968. sys.stderr.flush()
  969. self.old_stderr = sys.stderr
  970. self.stderr = sys.stderr = io.StringIO()
  971. def tearDown(self):
  972. sys.stderr = self.old_stderr
  973. def test_stderr_messages(self):
  974. handle_remote_line("unstructured stderr message\n")
  975. self.assert_equal(self.stream.getvalue(), "")
  976. # stderr messages don't get an implicit newline
  977. self.assert_equal(self.stderr.getvalue(), "Remote: unstructured stderr message\n")
  978. def test_stderr_progress_messages(self):
  979. handle_remote_line("unstructured stderr progress message\r")
  980. self.assert_equal(self.stream.getvalue(), "")
  981. # stderr messages don't get an implicit newline
  982. self.assert_equal(self.stderr.getvalue(), "Remote: unstructured stderr progress message\r")
  983. def test_pre11_format_messages(self):
  984. self.handler.setLevel(logging.DEBUG)
  985. logging.getLogger().setLevel(logging.DEBUG)
  986. handle_remote_line("$LOG INFO Remote: borg < 1.1 format message\n")
  987. self.assert_equal(self.stream.getvalue(), "Remote: borg < 1.1 format message\n")
  988. self.assert_equal(self.stderr.getvalue(), "")
  989. def test_post11_format_messages(self):
  990. self.handler.setLevel(logging.DEBUG)
  991. logging.getLogger().setLevel(logging.DEBUG)
  992. handle_remote_line("$LOG INFO borg.repository Remote: borg >= 1.1 format message\n")
  993. self.assert_equal(self.stream.getvalue(), "Remote: borg >= 1.1 format message\n")
  994. self.assert_equal(self.stderr.getvalue(), "")
  995. def test_remote_messages_screened(self):
  996. # default borg config for root logger
  997. self.handler.setLevel(logging.WARNING)
  998. logging.getLogger().setLevel(logging.WARNING)
  999. handle_remote_line("$LOG INFO borg.repository Remote: new format info message\n")
  1000. self.assert_equal(self.stream.getvalue(), "")
  1001. self.assert_equal(self.stderr.getvalue(), "")
  1002. def test_info_to_correct_local_child(self):
  1003. logging.getLogger("borg.repository").setLevel(logging.INFO)
  1004. logging.getLogger("borg.repository.foo").setLevel(logging.INFO)
  1005. # default borg config for root logger
  1006. self.handler.setLevel(logging.WARNING)
  1007. logging.getLogger().setLevel(logging.WARNING)
  1008. child_stream = io.StringIO()
  1009. child_handler = logging.StreamHandler(child_stream)
  1010. child_handler.setLevel(logging.INFO)
  1011. logging.getLogger("borg.repository").handlers[:] = [child_handler]
  1012. foo_stream = io.StringIO()
  1013. foo_handler = logging.StreamHandler(foo_stream)
  1014. foo_handler.setLevel(logging.INFO)
  1015. logging.getLogger("borg.repository.foo").handlers[:] = [foo_handler]
  1016. handle_remote_line("$LOG INFO borg.repository Remote: new format child message\n")
  1017. self.assert_equal(foo_stream.getvalue(), "")
  1018. self.assert_equal(child_stream.getvalue(), "Remote: new format child message\n")
  1019. self.assert_equal(self.stream.getvalue(), "")
  1020. self.assert_equal(self.stderr.getvalue(), "")