import errno
import mmap
import os
import shutil
import stat
import struct
import time
from collections import defaultdict
from configparser import ConfigParser
from functools import partial
from itertools import islice

from .constants import *  # NOQA
from .hashindex import NSIndex
from .helpers import Error, ErrorWithTraceback, IntegrityError, format_file_size, parse_file_size
from .helpers import Location
from .helpers import ProgressIndicatorPercent
from .helpers import bin_to_hex, hex_to_bin
from .helpers import secure_erase, safe_unlink
from .helpers import Manifest
from .helpers import msgpack
from .helpers import utcnow
from .locking import Lock, LockError, LockErrorT
from .logger import create_logger
from .lrucache import LRUCache
from .platform import SaveFile, SyncFile, sync_dir, safe_fadvise
from .algorithms.checksums import crc32
from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError

logger = create_logger(__name__)

MAGIC = b'BORG_SEG'
MAGIC_LEN = len(MAGIC)
ATTIC_MAGIC = b'ATTICSEG'
assert len(ATTIC_MAGIC) == MAGIC_LEN

TAG_PUT = 0
TAG_DELETE = 1
TAG_COMMIT = 2

# Highest ID usable as TAG_* value
#
# Code may expect not to find any tags exceeding this value. In particular,
# in order to speed up `borg check --repair`, any tag greater than MAX_TAG_ID
# is assumed to be corrupted. When increasing this value, in order to add more
# tags, keep in mind that old versions of Borg accessing a new repository
# may not be able to handle the new tags.
MAX_TAG_ID = 15

FreeSpace = partial(defaultdict, int)
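
# Usage sketch for FreeSpace (illustrative, not called like this anywhere): it
# maps a segment number to the number of freeable bytes in that segment and
# auto-initializes missing entries to 0:
#   fs = FreeSpace()
#   fs[42] += 100    # no KeyError on first access; dict(fs) == {42: 100}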


class Repository:
    """
    Filesystem-based transactional key-value store.

    Transactionality is achieved by using a log (aka journal) to record changes. The log is a series of numbered files
    called segments. Each segment is a series of log entries. The segment number together with the offset of each
    entry relative to its segment start establishes an ordering of the log entries. This is the "definition" of
    time for the purposes of the log.

    Log entries are either PUT, DELETE or COMMIT.

    A COMMIT is always the final log entry in a segment and marks all data from the beginning of the log until the
    segment ending with the COMMIT as committed and consistent. The segment number of a segment ending with a COMMIT
    is called the transaction ID of that commit, and a segment ending with a COMMIT is called committed.

    When reading from a repository it is first checked whether the last segment is committed. If it is not, then
    all segments after the last committed segment are deleted; they contain log entries whose consistency is not
    established by a COMMIT.

    Note that the COMMIT can't establish consistency by itself, but only manages to do so with proper support from
    the platform (including the hardware). See platform.base.SyncFile for details.

    A PUT inserts a key-value pair. The value is stored in the log entry, hence the repository implements
    full data logging, meaning that all data is consistent, not just metadata (which is common in file systems).

    A DELETE marks a key as deleted.

    For a given key only the last entry regarding the key, which is called current (all other entries are called
    superseded), is relevant: If there is no entry or the last entry is a DELETE then the key does not exist.
    Otherwise the last PUT defines the value of the key.

    By superseding a PUT (with either another PUT or a DELETE) the log entry becomes obsolete. A segment containing
    such obsolete entries is called sparse, while a segment containing no such entries is called compact.

    Sparse segments can be compacted and thereby disk space freed. This destroys the transaction for which the
    superseded entries were current.

    On-disk layout:

    dir/README
    dir/config
    dir/data/<X // SEGMENTS_PER_DIR>/<X>
    dir/index.X
    dir/hints.X

    File system interaction
    -----------------------

    LoggedIO generally tries to rely on common behaviors across transactional file systems.

    Segments that are deleted are truncated first, which avoids problems if the FS needs to
    allocate space to delete the dirent of the segment. This mostly affects CoW file systems;
    traditional journaling file systems have a fairly good grip on this problem.

    Note that deletion, i.e. unlink(2), is atomic on every file system that uses inode reference
    counts, which includes pretty much all of them. To remove a dirent the inode's reference count has
    to be decreased, but you cannot decrease the reference count before removing the dirent nor can you
    decrease the reference count after removing the dirent. File systems solve this with a lock,
    and by ensuring it all stays within the same FS transaction.

    Truncation is generally not atomic in itself, and combining truncate(2) and unlink(2) is of
    course never guaranteed to be atomic. Truncation in a classic extent-based FS is done in
    roughly two phases: first the extents are removed, then the inode is updated. (In practice
    this is of course way more complex.)

    LoggedIO gracefully handles truncate/unlink splits as long as the truncate resulted in
    a zero-length file. Zero-length segments are considered to not exist, while LoggedIO.cleanup()
    will still get rid of them.
    """

    class AlreadyExists(Error):
        """A repository already exists at {}."""
        exit_mcode = 10

    class AtticRepository(Error):
        """Attic repository detected. Please run "borg upgrade {}"."""
        exit_mcode = 11

    class CheckNeeded(ErrorWithTraceback):
        """Inconsistency detected. Please run "borg check {}"."""
        exit_mcode = 12

    class DoesNotExist(Error):
        """Repository {} does not exist."""
        exit_mcode = 13

    class InsufficientFreeSpaceError(Error):
        """Insufficient free space to complete transaction (required: {}, available: {})."""
        exit_mcode = 14

    class InvalidRepository(Error):
        """{} is not a valid repository. Check repo config."""
        exit_mcode = 15

    class InvalidRepositoryConfig(Error):
        """{} does not have a valid configuration. Check repo config [{}]."""
        exit_mcode = 16

    class ObjectNotFound(ErrorWithTraceback):
        """Object with key {} not found in repository {}."""
        exit_mcode = 17

        def __init__(self, id, repo):
            if isinstance(id, bytes):
                id = bin_to_hex(id)
            super().__init__(id, repo)

    class ParentPathDoesNotExist(Error):
        """The parent path of the repo directory [{}] does not exist."""
        exit_mcode = 18

    class PathAlreadyExists(Error):
        """There is already something at {}."""
        exit_mcode = 19

    class StorageQuotaExceeded(Error):
        """The storage quota ({}) has been exceeded ({}). Try deleting some archives."""
        exit_mcode = 20

    class PathPermissionDenied(Error):
        """Permission denied to {}."""
        exit_mcode = 21

    def __init__(self, path, create=False, exclusive=False, lock_wait=None, lock=True,
                 append_only=False, storage_quota=None, check_segment_magic=True,
                 make_parent_dirs=False):
        self.path = os.path.abspath(path)
        self._location = Location('file://%s' % self.path)
        self.io = None  # type: LoggedIO
        self.lock = None
        self.index = None
        # This is an index of shadowed log entries during this transaction. Consider the following sequence:
        # segment_n PUT A, segment_x DELETE A
        # After the "DELETE A" in segment_x the shadow index will contain "A -> [n]".
        # .delete() is updating this index, it is persisted into "hints" file and is later used by .compact_segments().
        # We need the entries in the shadow_index to not accidentally drop the "DELETE A" when we compact segment_x
        # only (and we do not compact segment_n), because DELETE A is still needed then because PUT A will be still
        # there. Otherwise chunk A would reappear although it was previously deleted.
        self.shadow_index = {}
        self._active_txn = False
        self.lock_wait = lock_wait
        self.do_lock = lock
        self.do_create = create
        self.created = False
        self.exclusive = exclusive
        self.append_only = append_only
        self.storage_quota = storage_quota
        self.storage_quota_use = 0
        self.transaction_doomed = None
        self.check_segment_magic = check_segment_magic
        self.make_parent_dirs = make_parent_dirs

    def __del__(self):
        if self.lock:
            self.close()
            assert False, "cleanup happened in Repository.__del__"

    def __repr__(self):
        return f'<{self.__class__.__name__} {self.path}>'

    def __enter__(self):
        if self.do_create:
            self.do_create = False
            self.create(self.path)
            self.created = True
        self.open(self.path, bool(self.exclusive), lock_wait=self.lock_wait, lock=self.do_lock)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            no_space_left_on_device = exc_type is OSError and exc_val.errno == errno.ENOSPC
            # The ENOSPC could have originated somewhere else besides the Repository. The cleanup is always safe,
            # unless EIO or FS corruption ensues, which is why we specifically check for ENOSPC.
            if self._active_txn and no_space_left_on_device:
                logger.warning('No space left on device, cleaning up partial transaction to free space.')
                cleanup = True
            else:
                cleanup = False
            self._rollback(cleanup=cleanup)
        self.close()

    @property
    def id_str(self):
        return bin_to_hex(self.id)

    @staticmethod
    def is_repository(path):
        """Check whether there is already a Borg repository at *path*."""
        try:
            # Use binary mode to avoid troubles if a README contains some stuff not in our locale
            with open(os.path.join(path, 'README'), 'rb') as fd:
                # Read only the first ~100 bytes (if any), in case some README file we stumble upon is large.
                readme_head = fd.read(100)
                # The first comparison captures our current variant (REPOSITORY_README), the second comparison
                # is an older variant of the README file (used by 1.0.x).
                return b'Borg Backup repository' in readme_head or b'Borg repository' in readme_head
        except OSError:
            # Ignore FileNotFound, PermissionError, ...
            return False

    def check_can_create_repository(self, path):
        """
        Raise an exception if a repository already exists at *path* or any parent directory.

        Checking parent directories is done for two reasons:
        (1) It's just a weird thing to do, and usually not intended. A Borg using the "parent" repository
            may be confused, or we may accidentally put stuff into the "data/" or "data/<n>/" directories.
        (2) When implementing repository quotas (which we currently don't), it's important to prohibit
            folks from creating quota-free repositories. Since no one can create a repository within another
            repository, users can only use the quota'd repository, when their --restrict-to-path points
            at the user's repository.
        """
        try:
            st = os.stat(path)
        except FileNotFoundError:
            pass  # nothing there!
        except PermissionError:
            raise self.PathPermissionDenied(path) from None
        else:
            # there is something already there!
            if self.is_repository(path):
                raise self.AlreadyExists(path)
            if not stat.S_ISDIR(st.st_mode):
                raise self.PathAlreadyExists(path)
            try:
                files = os.listdir(path)
            except PermissionError:
                raise self.PathPermissionDenied(path) from None
            else:
                if files:  # a dir, but not empty
                    raise self.PathAlreadyExists(path)
                else:  # an empty directory is acceptable for us.
                    pass

        while True:
            # Check all parent directories for Borg's repository README
            previous_path = path
            # Thus, path = previous_path/..
            path = os.path.abspath(os.path.join(previous_path, os.pardir))
            if path == previous_path:
                # We reached the root of the directory hierarchy (/.. = / and C:\.. = C:\).
                break
            if self.is_repository(path):
                raise self.AlreadyExists(path)

    def create(self, path):
        """Create a new empty repository at `path`"""
        self.check_can_create_repository(path)
        if self.make_parent_dirs:
            parent_path = os.path.join(path, os.pardir)
            os.makedirs(parent_path, exist_ok=True)
        if not os.path.exists(path):
            try:
                os.mkdir(path)
            except FileNotFoundError as err:
                raise self.ParentPathDoesNotExist(path) from err
        with open(os.path.join(path, 'README'), 'w') as fd:
            fd.write(REPOSITORY_README)
        os.mkdir(os.path.join(path, 'data'))
        config = ConfigParser(interpolation=None)
        config.add_section('repository')
        config.set('repository', 'version', '1')
        config.set('repository', 'segments_per_dir', str(DEFAULT_SEGMENTS_PER_DIR))
        config.set('repository', 'max_segment_size', str(DEFAULT_MAX_SEGMENT_SIZE))
        config.set('repository', 'append_only', str(int(self.append_only)))
        if self.storage_quota:
            config.set('repository', 'storage_quota', str(self.storage_quota))
        else:
            config.set('repository', 'storage_quota', '0')
        config.set('repository', 'additional_free_space', '0')
        config.set('repository', 'id', bin_to_hex(os.urandom(32)))
        self.save_config(path, config)
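
    # The config written by create() looks roughly like this (illustrative values;
    # the actual defaults come from DEFAULT_SEGMENTS_PER_DIR / DEFAULT_MAX_SEGMENT_SIZE):
    #
    #   [repository]
    #   version = 1
    #   segments_per_dir = 1000
    #   max_segment_size = 524288000
    #   append_only = 0
    #   storage_quota = 0
    #   additional_free_space = 0
    #   id = <64 hex digits>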

    def save_config(self, path, config):
        config_path = os.path.join(path, 'config')
        old_config_path = os.path.join(path, 'config.old')

        if os.path.isfile(old_config_path):
            logger.warning("Old config file not securely erased on previous config update")
            secure_erase(old_config_path, avoid_collateral_damage=True)

        if os.path.isfile(config_path):
            link_error_msg = ("Failed to securely erase old repository config file (hardlinks not supported). "
                              "Old repokey data, if any, might persist on physical storage.")
            try:
                os.link(config_path, old_config_path)
            except OSError as e:
                if e.errno in (errno.EMLINK, errno.ENOSYS, errno.EPERM, errno.EACCES, errno.ENOTSUP, errno.EIO):
                    logger.warning(link_error_msg)
                else:
                    raise
            except AttributeError:
                # some python ports have no os.link, see #4901
                logger.warning(link_error_msg)

        try:
            with SaveFile(config_path) as fd:
                config.write(fd)
        except PermissionError as e:
            # error is only a problem if we even had a lock
            if self.do_lock:
                raise
            logger.warning("%s: Failed writing to '%s'. This is expected when working on "
                           "read-only repositories." % (e.strerror, e.filename))

        if os.path.isfile(old_config_path):
            secure_erase(old_config_path, avoid_collateral_damage=True)

    def save_key(self, keydata):
        assert self.config
        keydata = keydata.decode('utf-8')  # remote repo: msgpack issue #99, getting bytes
        self.config.set('repository', 'key', keydata)
        self.save_config(self.path, self.config)

    def load_key(self):
        keydata = self.config.get('repository', 'key', fallback='').strip()
        # note: if we return an empty string, it means there is no repo key
        return keydata.encode('utf-8')  # remote repo: msgpack issue #99, returning bytes

    def get_free_nonce(self):
        if self.do_lock and not self.lock.got_exclusive_lock():
            raise AssertionError("bug in code, exclusive lock should exist here")
        nonce_path = os.path.join(self.path, 'nonce')
        try:
            with open(nonce_path) as fd:
                nonce_hex = fd.read().strip()
        except FileNotFoundError:
            return None
        else:
            try:
                nonce_bytes = hex_to_bin(nonce_hex, length=8)
            except ValueError as e:
                raise Error(f"Repository has an invalid nonce file: {e}") from None
            return int.from_bytes(nonce_bytes, byteorder='big')
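
    # The nonce file holds an 8-byte big-endian integer, hex-encoded. A round-trip
    # sketch of that encoding (illustrative value):
    #   bin_to_hex((0x1122).to_bytes(8, 'big'))                           -> '0000000000001122'
    #   int.from_bytes(hex_to_bin('0000000000001122', length=8), 'big')  -> 0x1122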

    def commit_nonce_reservation(self, next_unreserved, start_nonce):
        if self.do_lock and not self.lock.got_exclusive_lock():
            raise AssertionError("bug in code, exclusive lock should exist here")
        if self.get_free_nonce() != start_nonce:
            raise Exception("nonce space reservation with mismatched previous state")
        nonce_path = os.path.join(self.path, 'nonce')
        try:
            with SaveFile(nonce_path, binary=False) as fd:
                fd.write(bin_to_hex(next_unreserved.to_bytes(8, byteorder='big')))
        except PermissionError as e:
            # error is only a problem if we even had a lock
            if self.do_lock:
                raise
            logger.warning("%s: Failed writing to '%s'. This is expected when working on "
                           "read-only repositories." % (e.strerror, e.filename))

    def destroy(self):
        """Destroy the repository at `self.path`"""
        if self.append_only:
            raise ValueError(self.path + " is in append-only mode")
        self.close()
        os.remove(os.path.join(self.path, 'config'))  # kill config first
        shutil.rmtree(self.path)

    def get_index_transaction_id(self):
        indices = sorted(int(fn[6:])
                         for fn in os.listdir(self.path)
                         if fn.startswith('index.') and fn[6:].isdigit()
                         and os.stat(os.path.join(self.path, fn)).st_size != 0)
        if indices:
            return indices[-1]
        else:
            return None
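
    # Illustrative example: with non-empty files 'index.5' and 'index.12' in the
    # repo dir, int('index.12'[6:]) == 12 is the highest id, so 12 is returned.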

    def check_transaction(self):
        index_transaction_id = self.get_index_transaction_id()
        segments_transaction_id = self.io.get_segments_transaction_id()
        if index_transaction_id is not None and segments_transaction_id is None:
            # we have a transaction id from the index, but we did not find *any*
            # commit in the segment files (thus no segments transaction id).
            # this can happen if a lot of segment files are lost, e.g. due to a
            # filesystem or hardware malfunction. it means we have no identifiable
            # valid (committed) state of the repo which we could use.
            msg = '%s" - although likely this is "beyond repair' % self.path  # dirty hack
            raise self.CheckNeeded(msg)
        # Attempt to automatically rebuild index if we crashed between commit
        # tag write and index save.
        if index_transaction_id != segments_transaction_id:
            if index_transaction_id is not None and index_transaction_id > segments_transaction_id:
                replay_from = None
            else:
                replay_from = index_transaction_id
            self.replay_segments(replay_from, segments_transaction_id)

    def get_transaction_id(self):
        self.check_transaction()
        return self.get_index_transaction_id()

    def break_lock(self):
        Lock(os.path.join(self.path, 'lock')).break_lock()

    def migrate_lock(self, old_id, new_id):
        # note: only needed for local repos
        if self.lock is not None:
            self.lock.migrate_lock(old_id, new_id)

    def open(self, path, exclusive, lock_wait=None, lock=True):
        self.path = path
        try:
            st = os.stat(path)
        except FileNotFoundError:
            raise self.DoesNotExist(path)
        if not stat.S_ISDIR(st.st_mode):
            raise self.InvalidRepository(path)
        if lock:
            self.lock = Lock(os.path.join(path, 'lock'), exclusive, timeout=lock_wait).acquire()
        else:
            self.lock = None
        self.config = ConfigParser(interpolation=None)
        try:
            with open(os.path.join(self.path, 'config')) as fd:
                self.config.read_file(fd)
        except FileNotFoundError:
            self.close()
            raise self.InvalidRepository(self.path)
        if 'repository' not in self.config.sections():
            self.close()
            raise self.InvalidRepositoryConfig(path, 'no repository section found')
        repo_version = self.config.getint('repository', 'version')
        if repo_version != 1:
            self.close()
            raise self.InvalidRepositoryConfig(
                path,
                'repository version %d is not supported by this borg version' % repo_version
            )
        self.max_segment_size = parse_file_size(self.config.get('repository', 'max_segment_size'))
        if self.max_segment_size >= MAX_SEGMENT_SIZE_LIMIT:
            self.close()
            raise self.InvalidRepositoryConfig(path, 'max_segment_size >= %d' % MAX_SEGMENT_SIZE_LIMIT)  # issue 3592
        self.segments_per_dir = self.config.getint('repository', 'segments_per_dir')
        self.additional_free_space = parse_file_size(self.config.get('repository', 'additional_free_space', fallback=0))
        # append_only can be set in the constructor
        # it shouldn't be overridden (True -> False) here
        self.append_only = self.append_only or self.config.getboolean('repository', 'append_only', fallback=False)
        if self.storage_quota is None:
            # self.storage_quota is None => no explicit storage_quota was specified, use repository setting.
            self.storage_quota = parse_file_size(self.config.get('repository', 'storage_quota', fallback=0))
        self.id = hex_to_bin(self.config.get('repository', 'id').strip(), length=32)
        self.io = LoggedIO(self.path, self.max_segment_size, self.segments_per_dir)
        if self.check_segment_magic:
            # read a segment and check whether we are dealing with a non-upgraded Attic repository
            segment = self.io.get_latest_segment()
            if segment is not None and self.io.get_segment_magic(segment) == ATTIC_MAGIC:
                self.close()
                raise self.AtticRepository(path)

    def close(self):
        if self.lock:
            if self.io:
                self.io.close()
                self.io = None
            self.lock.release()
            self.lock = None

    def commit(self, save_space=False, compact=True, threshold=0.1, cleanup_commits=False):
        """Commit transaction"""
        # save_space is not used anymore, but stays for RPC/API compatibility.
        if self.transaction_doomed:
            exception = self.transaction_doomed
            self.rollback()
            raise exception
        self.check_free_space()
        self.log_storage_quota()
        segment = self.io.write_commit()
        self.segments.setdefault(segment, 0)
        self.compact[segment] += LoggedIO.header_fmt.size
        if compact and not self.append_only:
            if cleanup_commits:
                # due to bug #2850, there might be a lot of commit-only segment files.
                # this is for a one-time cleanup of these 17-byte files.
                for segment, filename in self.io.segment_iterator():
                    if os.path.getsize(filename) == 17:
                        self.segments[segment] = 0
                        self.compact[segment] = LoggedIO.header_fmt.size
            self.compact_segments(threshold)
        self.write_index()
        self.rollback()

    def _read_integrity(self, transaction_id, key):
        integrity_file = 'integrity.%d' % transaction_id
        integrity_path = os.path.join(self.path, integrity_file)
        try:
            with open(integrity_path, 'rb') as fd:
                integrity = msgpack.unpack(fd)
        except FileNotFoundError:
            return
        if integrity.get(b'version') != 2:
            logger.warning('Unknown integrity data version %r in %s', integrity.get(b'version'), integrity_file)
            return
        return integrity[key].decode()

    def open_index(self, transaction_id, auto_recover=True):
        if transaction_id is None:
            return NSIndex()
        index_path = os.path.join(self.path, 'index.%d' % transaction_id)
        integrity_data = self._read_integrity(transaction_id, b'index')
        try:
            with IntegrityCheckedFile(index_path, write=False, integrity_data=integrity_data) as fd:
                return NSIndex.read(fd)
        except (ValueError, OSError, FileIntegrityError) as exc:
            logger.warning('Repository index missing or corrupted, trying to recover from: %s', exc)
            os.unlink(index_path)
            if not auto_recover:
                raise
            self.prepare_txn(self.get_transaction_id())
            # don't leave an open transaction around
            self.commit(compact=False)
            return self.open_index(self.get_transaction_id())

    def prepare_txn(self, transaction_id, do_cleanup=True):
        self._active_txn = True
        if self.do_lock and not self.lock.got_exclusive_lock():
            if self.exclusive is not None:
                # self.exclusive is either True or False, thus a new client is active here.
                # if it is False and we get here, the caller did not use exclusive=True although
                # it is needed for a write operation. if it is True and we get here, something else
                # went very wrong, because we should have an exclusive lock, but we don't.
                raise AssertionError("bug in code, exclusive lock should exist here")
            # if we are here, this is an old client talking to a new server (expecting lock upgrade),
            # or we are replaying segments and might need a lock upgrade for that.
            try:
                self.lock.upgrade()
            except (LockError, LockErrorT):
                # if upgrading the lock to exclusive fails, we do not have an
                # active transaction. this is important for "serve" mode, where
                # the repository instance lives on - even if exceptions happened.
                self._active_txn = False
                raise
        if not self.index or transaction_id is None:
            try:
                self.index = self.open_index(transaction_id, auto_recover=False)
            except (ValueError, OSError, FileIntegrityError) as exc:
                logger.warning('Checking repository transaction due to previous error: %s', exc)
                self.check_transaction()
                self.index = self.open_index(transaction_id, auto_recover=False)
        if transaction_id is None:
            self.segments = {}  # XXX bad name: usage_count_of_segment_x = self.segments[x]
            self.compact = FreeSpace()  # XXX bad name: freeable_space_of_segment_x = self.compact[x]
            self.storage_quota_use = 0
            self.shadow_index.clear()
        else:
            if do_cleanup:
                self.io.cleanup(transaction_id)
            hints_path = os.path.join(self.path, 'hints.%d' % transaction_id)
            index_path = os.path.join(self.path, 'index.%d' % transaction_id)
            integrity_data = self._read_integrity(transaction_id, b'hints')
            try:
                with IntegrityCheckedFile(hints_path, write=False, integrity_data=integrity_data) as fd:
                    hints = msgpack.unpack(fd)
            except (msgpack.UnpackException, FileNotFoundError, FileIntegrityError) as e:
                logger.warning('Repository hints file missing or corrupted, trying to recover: %s', e)
                if not isinstance(e, FileNotFoundError):
                    os.unlink(hints_path)
                # index must exist at this point
                os.unlink(index_path)
                self.check_transaction()
                self.prepare_txn(transaction_id)
                return
            if hints[b'version'] == 1:
                logger.debug('Upgrading from v1 hints.%d', transaction_id)
                self.segments = hints[b'segments']
                self.compact = FreeSpace()
                self.storage_quota_use = 0
                self.shadow_index = {}
                for segment in sorted(hints[b'compact']):
                    logger.debug('Rebuilding sparse info for segment %d', segment)
                    self._rebuild_sparse(segment)
                logger.debug('Upgrade to v2 hints complete')
            elif hints[b'version'] != 2:
                raise ValueError('Unknown hints file version: %d' % hints[b'version'])
            else:
                self.segments = hints[b'segments']
                self.compact = FreeSpace(hints[b'compact'])
                self.storage_quota_use = hints.get(b'storage_quota_use', 0)
                self.shadow_index = hints.get(b'shadow_index', {})
            self.log_storage_quota()
            # Drop uncommitted segments in the shadow index
            for key, shadowed_segments in self.shadow_index.items():
                for segment in list(shadowed_segments):
                    if segment > transaction_id:
                        shadowed_segments.remove(segment)

    def write_index(self):
        def flush_and_sync(fd):
            fd.flush()
            os.fsync(fd.fileno())

        def rename_tmp(file):
            os.replace(file + ".tmp", file)

        hints = {
            b'version': 2,
            b'segments': self.segments,
            b'compact': self.compact,
            b'storage_quota_use': self.storage_quota_use,
            b'shadow_index': self.shadow_index,
        }
        integrity = {
            # Integrity version started at 2, the current hints version.
            # Thus, integrity version == hints version, for now.
            b'version': 2,
        }
        transaction_id = self.io.get_segments_transaction_id()
        assert transaction_id is not None

        # Log transaction in append-only mode
        if self.append_only:
            with open(os.path.join(self.path, 'transactions'), 'a') as log:
                print('transaction %d, UTC time %s' % (
                      transaction_id, utcnow().strftime(ISO_FORMAT)), file=log)
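        # The appended line looks roughly like this (exact timestamp format is
        # whatever ISO_FORMAT specifies; values illustrative):
        #   transaction 7, UTC time 2023-01-01T12:00:00.000000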

        # Write hints file
        hints_name = 'hints.%d' % transaction_id
        hints_file = os.path.join(self.path, hints_name)
        with IntegrityCheckedFile(hints_file + '.tmp', filename=hints_name, write=True) as fd:
            msgpack.pack(hints, fd)
            flush_and_sync(fd)
        integrity[b'hints'] = fd.integrity_data

        # Write repository index
        index_name = 'index.%d' % transaction_id
        index_file = os.path.join(self.path, index_name)
        with IntegrityCheckedFile(index_file + '.tmp', filename=index_name, write=True) as fd:
            # XXX: Consider using SyncFile for index write-outs.
            self.index.write(fd)
            flush_and_sync(fd)
        integrity[b'index'] = fd.integrity_data

        # Write integrity file, containing checksums of the hints and index files
        integrity_name = 'integrity.%d' % transaction_id
        integrity_file = os.path.join(self.path, integrity_name)
        with open(integrity_file + '.tmp', 'wb') as fd:
            msgpack.pack(integrity, fd)
            flush_and_sync(fd)

        # Rename the integrity file first
        rename_tmp(integrity_file)
        sync_dir(self.path)
        # Rename the others after the integrity file is hypothetically on disk
        rename_tmp(hints_file)
        rename_tmp(index_file)
        sync_dir(self.path)

        # Remove old auxiliary files
        current = '.%d' % transaction_id
        for name in os.listdir(self.path):
            if not name.startswith(('index.', 'hints.', 'integrity.')):
                continue
            if name.endswith(current):
                continue
            os.unlink(os.path.join(self.path, name))
        self.index = None

    def check_free_space(self):
        """Pre-commit check for sufficient free space to actually perform the commit."""
        # As a baseline we take four times the current (on-disk) index size.
        # At this point the index may only be updated by compaction, which won't resize it.
        # We still apply a factor of four so that a later, separate invocation can free space
        # (journaling all deletes for all chunks is one index size) or still make minor additions
        # (which may grow the index up to twice its current size).
        # Note that in a subsequent operation the committed index is still on-disk, therefore we
        # arrive at index_size * (1 + 2 + 1).
        # In that order: journaled deletes (1), hashtable growth (2), persisted index (1).
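        # Worked example (illustrative): with a 100 MiB on-disk index, the baseline
        # requirement is 400 MiB = 100 (journaled deletes) + 200 (hashtable growth)
        # + 100 (persisted index).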
        required_free_space = self.index.size() * 4

        # Conservatively estimate hints file size:
        # 10 bytes for each segment-refcount pair, 10 bytes for each segment-space pair
        # Assume maximum of 5 bytes per integer. Segment numbers will usually be packed more densely (1-3 bytes),
        # as will refcounts and free space integers. For 5 MiB segments this estimate is good to ~20 PB repo size.
        # Add 4K to generously account for constant format overhead.
        hints_size = len(self.segments) * 10 + len(self.compact) * 10 + 4096
        required_free_space += hints_size

        required_free_space += self.additional_free_space
        if not self.append_only:
            full_segment_size = self.max_segment_size + MAX_OBJECT_SIZE
            if len(self.compact) < 10:
                # This is mostly for the test suite to avoid overestimated free space needs. This can be annoying
                # if TMP is a small-ish tmpfs.
                compact_working_space = 0
                for segment, free in self.compact.items():
                    try:
                        compact_working_space += self.io.segment_size(segment) - free
                    except FileNotFoundError:
                        # looks like self.compact is referring to a non-existent segment file, ignore it.
                        pass
                logger.debug('check_free_space: few segments, not requiring a full free segment')
                compact_working_space = min(compact_working_space, full_segment_size)
                logger.debug('check_free_space: calculated working space for compact as %d bytes', compact_working_space)
                required_free_space += compact_working_space
            else:
                # Keep one full worst-case segment free in non-append-only mode
                required_free_space += full_segment_size

        try:
            free_space = shutil.disk_usage(self.path).free
        except OSError as os_error:
            logger.warning('Failed to check free space before committing: ' + str(os_error))
            return
        logger.debug(f'check_free_space: required bytes {required_free_space}, free bytes {free_space}')
        if free_space < required_free_space:
            if self.created:
                logger.error('Not enough free space to initialize repository at this location.')
                self.destroy()
            else:
                self._rollback(cleanup=True)
            formatted_required = format_file_size(required_free_space)
            formatted_free = format_file_size(free_space)
            raise self.InsufficientFreeSpaceError(formatted_required, formatted_free)

    def log_storage_quota(self):
        if self.storage_quota:
            logger.info('Storage quota: %s out of %s used.',
                        format_file_size(self.storage_quota_use), format_file_size(self.storage_quota))

    def compact_segments(self, threshold):
        """Compact sparse segments by copying data into new segments"""
        if not self.compact:
            logger.debug('nothing to do: compact empty')
            return
        quota_use_before = self.storage_quota_use
        index_transaction_id = self.get_index_transaction_id()
        segments = self.segments
        unused = []  # list of segments that are not used anymore

        def complete_xfer(intermediate=True):
            # complete the current transfer (when some target segment is full)
            nonlocal unused
            # commit the new, compact, used segments
            segment = self.io.write_commit(intermediate=intermediate)
            self.segments.setdefault(segment, 0)
            self.compact[segment] += LoggedIO.header_fmt.size
            logger.debug('complete_xfer: wrote %scommit at segment %d', 'intermediate ' if intermediate else '', segment)
            # get rid of the old, sparse, unused segments. free space.
            for segment in unused:
                logger.debug('complete_xfer: deleting unused segment %d', segment)
                count = self.segments.pop(segment)
                assert count == 0, 'Corrupted segment reference count - corrupted index or hints'
                self.io.delete_segment(segment)
                del self.compact[segment]
            unused = []

        logger.debug('Compaction started (threshold is %i%%).', threshold * 100)
        pi = ProgressIndicatorPercent(total=len(self.compact), msg='Compacting segments %3.0f%%', step=1,
                                      msgid='repository.compact_segments')
        for segment, freeable_space in sorted(self.compact.items()):
            if not self.io.segment_exists(segment):
                logger.warning('segment %d not found, but listed in compaction data', segment)
                del self.compact[segment]
                pi.show()
                continue
            segment_size = self.io.segment_size(segment)
            freeable_ratio = 1.0 * freeable_space / segment_size
            # we want to compact if:
            # - we can free a considerable relative amount of space (freeable_ratio over some threshold)
            if not (freeable_ratio > threshold):
                logger.debug('not compacting segment %d (maybe freeable: %2.2f%% [%d bytes])',
                             segment, freeable_ratio * 100.0, freeable_space)
                pi.show()
                continue
            segments.setdefault(segment, 0)
            logger.debug('compacting segment %d with usage count %d (maybe freeable: %2.2f%% [%d bytes])',
                         segment, segments[segment], freeable_ratio * 100.0, freeable_space)
            for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
                if tag == TAG_COMMIT:
                    continue
                in_index = self.index.get(key)
                is_index_object = in_index == (segment, offset)
                if tag == TAG_PUT and is_index_object:
                    try:
                        new_segment, offset = self.io.write_put(key, data, raise_full=True)
                    except LoggedIO.SegmentFull:
                        complete_xfer()
                        new_segment, offset = self.io.write_put(key, data)
                    self.index[key] = new_segment, offset
                    segments.setdefault(new_segment, 0)
                    segments[new_segment] += 1
                    segments[segment] -= 1
                elif tag == TAG_PUT and not is_index_object:
                    # If this is a PUT shadowed by a later tag, then it will be gone when this segment is deleted after
                    # this loop. Therefore it is removed from the shadow index.
                    try:
                        self.shadow_index[key].remove(segment)
                    except (KeyError, ValueError):
                        # do not remove entry with empty shadowed_segments list here,
                        # it is needed for shadowed_put_exists code (see below)!
                        pass
                    self.storage_quota_use -= len(data) + self.io.put_header_fmt.size
                elif tag == TAG_DELETE and not in_index:
                    # If the shadow index doesn't contain this key, then we can't say if there's a shadowed older tag,
                    # therefore we do not drop the delete, but write it to a current segment.
                    key_not_in_shadow_index = key not in self.shadow_index
                    # If the key is in the shadow index and there is any segment with an older PUT of this
                    # key, we have a shadowed put.
                    shadowed_put_exists = key_not_in_shadow_index or any(
                        shadowed < segment for shadowed in self.shadow_index[key])
                    delete_is_not_stable = index_transaction_id is None or segment > index_transaction_id
                    if shadowed_put_exists or delete_is_not_stable:
                        # (introduced in 6425d16aa84be1eaaf88)
                        # This is needed to avoid object un-deletion if we crash between the commit and the deletion
                        # of old segments in complete_xfer().
                        #
                        # However, this only happens if the crash also affects the FS to the effect that file deletions
                        # did not materialize consistently after journal recovery. If they always materialize in-order
                        # then this is not a problem, because the old segment containing a deleted object would be
                        # deleted before the segment containing the delete.
                        #
                        # Consider the following series of operations if we would not do this, i.e. if this entire
                        # if-branch were removed.
                        # Columns are segments, lines are different keys (line 1 = some key, line 2 = some other key)
                        # Legend: P=TAG_PUT, D=TAG_DELETE, c=commit, i=index is written for latest commit
                        #
                        # Segment | 1     | 2   | 3
                        # --------+-------+-----+------
                        # Key 1   | P     | D   |
                        # Key 2   | P     |     | P
                        # commits |   c i |   c |   c i
                        # --------+-------+-----+------
                        #                       ^- compact_segments starts
                        #                           ^- complete_xfer commits, after that complete_xfer deletes
                        #                              segments 1 and 2 (and then the index would be written).
                        #
                        # Now we crash. But only segment 2 gets deleted, while segment 1 is still around. Now key 1
                        # is suddenly undeleted (because the delete in segment 2 is now missing).
                        # Again, note the requirement here: we delete these in the correct order so that this doesn't
                        # happen, and it can only happen if the FS materialization of these deletes is reordered or
                        # parts are dropped.
                        # In this case it doesn't cause outright corruption, 'just' an index count mismatch, which
                        # will be fixed by borg-check --repair.
                        #
                        # Note that in this check the index state is the proxy for a "most definitely settled"
                        # repository state, i.e. the assumption is that *all* operations on segments <= index state
                        # are completed and stable.
                        try:
                            new_segment, size = self.io.write_delete(key, raise_full=True)
                        except LoggedIO.SegmentFull:
                            complete_xfer()
                            new_segment, size = self.io.write_delete(key)
                        self.compact[new_segment] += size
                        segments.setdefault(new_segment, 0)
                    else:
                        logger.debug('dropping DEL for id %s - seg %d, iti %r, knisi %r, spe %r, dins %r, si %r',
                                     bin_to_hex(key), segment, index_transaction_id,
                                     key_not_in_shadow_index, shadowed_put_exists, delete_is_not_stable,
                                     self.shadow_index.get(key))
                        # we did not keep the delete tag for key (see if-branch)
                        if not self.shadow_index[key]:
                            # shadowed segments list is empty -> remove it
                            del self.shadow_index[key]
            assert segments[segment] == 0, 'Corrupted segment reference count - corrupted index or hints'
            unused.append(segment)
            pi.show()
        pi.finish()
        complete_xfer(intermediate=False)
        self.io.clear_empty_dirs()
        quota_use_after = self.storage_quota_use
        logger.info('compaction freed about %s repository space.', format_file_size(quota_use_before - quota_use_after))
        logger.debug('compaction completed.')
  829. def replay_segments(self, index_transaction_id, segments_transaction_id):
  830. # fake an old client, so that in case we do not have an exclusive lock yet, prepare_txn will upgrade the lock:
  831. remember_exclusive = self.exclusive
  832. self.exclusive = None
  833. self.prepare_txn(index_transaction_id, do_cleanup=False)
  834. try:
  835. segment_count = sum(1 for _ in self.io.segment_iterator())
  836. pi = ProgressIndicatorPercent(total=segment_count, msg='Replaying segments %3.0f%%',
  837. msgid='repository.replay_segments')
  838. for i, (segment, filename) in enumerate(self.io.segment_iterator()):
  839. pi.show(i)
  840. if index_transaction_id is not None and segment <= index_transaction_id:
  841. continue
  842. if segment > segments_transaction_id:
  843. break
  844. objects = self.io.iter_objects(segment)
  845. self._update_index(segment, objects)
  846. pi.finish()
  847. self.write_index()
  848. finally:
  849. self.exclusive = remember_exclusive
  850. self.rollback()
  851. def _update_index(self, segment, objects, report=None):
  852. """some code shared between replay_segments and check"""
  853. self.segments[segment] = 0
  854. for tag, key, offset, size in objects:
  855. if tag == TAG_PUT:
  856. try:
  857. # If this PUT supersedes an older PUT, mark the old segment for compaction and count the free space
  858. s, _ = self.index[key]
  859. self.compact[s] += size
  860. self.segments[s] -= 1
  861. self.shadow_index.setdefault(key, []).append(s)
  862. except KeyError:
  863. pass
  864. self.index[key] = segment, offset
  865. self.segments[segment] += 1
  866. self.storage_quota_use += size # note: size already includes the put_header_fmt overhead
  867. elif tag == TAG_DELETE:
  868. try:
  869. # if the deleted PUT is not in the index, there is nothing to clean up
  870. s, offset = self.index.pop(key)
  871. except KeyError:
  872. pass
  873. else:
  874. if self.io.segment_exists(s):
  875. # the old index is not necessarily valid for this transaction (e.g. compaction); if the segment
  876. # is already gone, then it was already compacted.
  877. self.segments[s] -= 1
  878. size = self.io.read(s, offset, key, read_data=False)
  879. self.compact[s] += size
  880. self.shadow_index.setdefault(key, []).append(s)
  881. elif tag == TAG_COMMIT:
  882. continue
  883. else:
  884. msg = f'Unexpected tag {tag} in segment {segment}'
  885. if report is None:
  886. raise self.CheckNeeded(msg)
  887. else:
  888. report(msg)
  889. if self.segments[segment] == 0:
  890. self.compact[segment] = self.io.segment_size(segment)
  891. def _rebuild_sparse(self, segment):
  892. """Rebuild sparse bytes count for a single segment relative to the current index."""
  893. try:
  894. segment_size = self.io.segment_size(segment)
  895. except FileNotFoundError:
  896. # segment does not exist any more, remove it from the mappings
  897. # note: no need to self.compact.pop(segment), as we start from empty mapping.
  898. self.segments.pop(segment)
  899. return
  900. if self.segments[segment] == 0:
  901. self.compact[segment] = segment_size
  902. return
  903. self.compact[segment] = 0
  904. for tag, key, offset, size in self.io.iter_objects(segment, read_data=False):
  905. if tag == TAG_PUT:
  906. if self.index.get(key, (-1, -1)) != (segment, offset):
  907. # This PUT is superseded later
  908. self.compact[segment] += size
  909. elif tag == TAG_DELETE:
  910. # The outcome of the DELETE has been recorded in the PUT branch already
  911. self.compact[segment] += size
  912. def check(self, repair=False, save_space=False, max_duration=0):
  913. """Check repository consistency
  914. This method verifies all segment checksums and makes sure
  915. the index is consistent with the data stored in the segments.
  916. """
  917. if self.append_only and repair:
  918. raise ValueError(self.path + " is in append-only mode")
  919. error_found = False
  920. def report_error(msg, *args):
  921. nonlocal error_found
  922. error_found = True
  923. logger.error(msg, *args)
  924. logger.info('Starting repository check')
  925. assert not self._active_txn
  926. try:
  927. transaction_id = self.get_transaction_id()
  928. current_index = self.open_index(transaction_id)
  929. logger.debug('Read committed index of transaction %d', transaction_id)
  930. except Exception as exc:
  931. transaction_id = self.io.get_segments_transaction_id()
  932. current_index = None
  933. logger.debug('Failed to read committed index (%s)', exc)
  934. if transaction_id is None:
  935. logger.debug('No segments transaction found')
  936. transaction_id = self.get_index_transaction_id()
  937. if transaction_id is None:
  938. logger.debug('No index transaction found, trying latest segment')
  939. transaction_id = self.io.get_latest_segment()
  940. if transaction_id is None:
  941. report_error('This repository contains no valid data.')
  942. return False
  943. if repair:
  944. self.io.cleanup(transaction_id)
  945. segments_transaction_id = self.io.get_segments_transaction_id()
  946. logger.debug('Segment transaction is %s', segments_transaction_id)
  947. logger.debug('Determined transaction is %s', transaction_id)
  948. self.prepare_txn(None) # self.index, self.compact, self.segments, self.shadow_index all empty now!
  949. segment_count = sum(1 for _ in self.io.segment_iterator())
  950. logger.debug('Found %d segments', segment_count)
  951. partial = bool(max_duration)
  952. assert not (repair and partial)
  953. mode = 'partial' if partial else 'full'
  954. if partial:
  955. # continue a past partial check (if any) or start one from beginning
  956. last_segment_checked = self.config.getint('repository', 'last_segment_checked', fallback=-1)
  957. logger.info('skipping to segments >= %d', last_segment_checked + 1)
  958. else:
  959. # start from the beginning and also forget about any potential past partial checks
  960. last_segment_checked = -1
  961. self.config.remove_option('repository', 'last_segment_checked')
  962. self.save_config(self.path, self.config)
  963. t_start = time.monotonic()
  964. pi = ProgressIndicatorPercent(total=segment_count, msg='Checking segments %3.1f%%', step=0.1,
  965. msgid='repository.check')
  966. segment = -1 # avoid uninitialized variable if there are no segment files at all
  967. for i, (segment, filename) in enumerate(self.io.segment_iterator()):
  968. pi.show(i)
  969. if segment <= last_segment_checked:
  970. continue
  971. if segment > transaction_id:
  972. continue
  973. logger.debug('checking segment file %s...', filename)
  974. try:
  975. objects = list(self.io.iter_objects(segment))
  976. except IntegrityError as err:
  977. report_error(str(err))
  978. objects = []
  979. if repair:
  980. self.io.recover_segment(segment, filename)
  981. objects = list(self.io.iter_objects(segment))
  982. if not partial:
  983. self._update_index(segment, objects, report_error)
  984. if partial and time.monotonic() > t_start + max_duration:
  985. logger.info('finished partial segment check, last segment checked is %d', segment)
  986. self.config.set('repository', 'last_segment_checked', str(segment))
  987. self.save_config(self.path, self.config)
  988. break
  989. else:
  990. logger.info('finished segment check at segment %d', segment)
  991. self.config.remove_option('repository', 'last_segment_checked')
  992. self.save_config(self.path, self.config)
  993. pi.finish()
        # self.index, self.segments, self.compact now reflect the state of the segment files up to <transaction_id>.
        # We might need to add a commit tag if no committed segment is found.
        if repair and segments_transaction_id is None:
            report_error(f'Adding commit tag to segment {transaction_id}')
            self.io.segment = transaction_id + 1
            self.io.write_commit()
        if not partial:
            logger.info('Starting repository index check')
            if current_index and not repair:
                # current_index = "as found on disk"
                # self.index = "as rebuilt in-memory from segments"
                if len(current_index) != len(self.index):
                    report_error('Index object count mismatch.')
                    report_error('committed index: %d objects', len(current_index))
                    report_error('rebuilt index: %d objects', len(self.index))
                else:
                    logger.info('Index object count match.')
                line_format = 'ID: %-64s rebuilt index: %-16s committed index: %-16s'
                not_found = '<not found>'
                for key, value in self.index.iteritems():
                    current_value = current_index.get(key, not_found)
                    if current_value != value:
                        report_error(line_format, bin_to_hex(key), value, current_value)
                for key, current_value in current_index.iteritems():
                    if key in self.index:
                        continue
                    value = self.index.get(key, not_found)
                    if current_value != value:
                        report_error(line_format, bin_to_hex(key), value, current_value)
            if repair:
                self.write_index()
        self.rollback()
        if error_found:
            if repair:
                logger.info('Finished %s repository check, errors found and repaired.', mode)
            else:
                logger.error('Finished %s repository check, errors found.', mode)
        else:
            logger.info('Finished %s repository check, no problems found.', mode)
        return not error_found or repair
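
    # A minimal usage sketch (assumptions: the repo path is hypothetical, and
    # check() accepts repair/max_duration parameters as the code above suggests):
    #
    #   with Repository('/path/to/repo', exclusive=True) as repository:
    #       # time-boxed partial check; 'last_segment_checked' is persisted in the
    #       # repo config, so a later call with max_duration continues where this
    #       # one stopped instead of re-checking from segment 0.
    #       ok = repository.check(repair=False, max_duration=3600)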

    def scan_low_level(self, segment=None, offset=None):
        """Very low level scan over all segment file entries.

        It does NOT care about what's committed and what not.
        It does NOT care whether an object might be deleted or superseded later.
        It just yields anything it finds in the segment files.

        This is intended as a last-resort way to get access to all repo contents of damaged repos,
        when there is uncommitted, but valuable data in there...

        When segment or segment+offset is given, limit processing to this location only.
        """
        for current_segment, filename in self.io.segment_iterator(segment=segment):
            if segment is not None and current_segment > segment:
                break
            try:
                for tag, key, current_offset, data in self.io.iter_objects(segment=current_segment,
                                                                           offset=offset or 0, include_data=True):
                    if offset is not None and current_offset > offset:
                        break
                    yield key, data, tag, current_segment, current_offset
            except IntegrityError as err:
                logger.error('Segment %d (%s) has IntegrityError(s) [%s] - skipping.' % (
                             current_segment, filename, str(err)))
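
    # Salvage sketch for a damaged repo (store_object() is a hypothetical callback;
    # the tuple layout matches the yield statement above):
    #
    #   for key, data, tag, segment, offset in repository.scan_low_level():
    #       if tag == TAG_PUT:
    #           store_object(key, data)  # committed or not - everything readable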

    def _rollback(self, *, cleanup):
        """Roll back the active transaction; if cleanup is True, also delete
        uncommitted segment files left behind by it.
        """
        if cleanup:
            self.io.cleanup(self.io.get_segments_transaction_id())
        self.index = None
        self._active_txn = False
        self.transaction_doomed = None

    def rollback(self):
        # note: when used in remote mode, this is time limited, see RemoteRepository.shutdown_time.
        self._rollback(cleanup=False)

    def __len__(self):
        if not self.index:
            self.index = self.open_index(self.get_transaction_id())
        return len(self.index)

    def __contains__(self, id):
        if not self.index:
            self.index = self.open_index(self.get_transaction_id())
        return id in self.index

    def list(self, limit=None, marker=None):
        """
        list <limit> IDs starting from after id <marker> - in index (pseudo-random) order.
        """
        if not self.index:
            self.index = self.open_index(self.get_transaction_id())
        return [id_ for id_, _ in islice(self.index.iteritems(marker=marker), limit)]

    def scan(self, limit=None, marker=None):
        """
        list <limit> IDs starting from after id <marker> - in on-disk order, so that a client
        fetching data in this order does linear reads and reuses stuff from the disk cache.

        We rely on repository.check() having run already (either now or some time before) and that:

        - if we are called from a borg check command, self.index is a valid, fresh, in-sync repo index.
        - if we are called from elsewhere, either self.index or the on-disk index is valid and in-sync.
        - the repository segments are valid (no CRC errors).
          If we encounter CRC errors in segment entry headers, the rest of the segment is skipped.
        """
        if limit is not None and limit < 1:
            raise ValueError('please use limit > 0 or limit = None')
        if not self.index:
            transaction_id = self.get_transaction_id()
            self.index = self.open_index(transaction_id)
        at_start = marker is None
        # smallest valid seg is <uint32> 0, smallest valid offs is <uint32> 8
        start_segment, start_offset = (0, 0) if at_start else self.index[marker]
        result = []
        for segment, filename in self.io.segment_iterator(start_segment):
            obj_iterator = self.io.iter_objects(segment, start_offset, read_data=False, include_data=False)
            while True:
                try:
                    tag, id, offset, size = next(obj_iterator)
                except (StopIteration, IntegrityError):
                    # either end-of-segment or an error - we can not seek to objects at
                    # higher offsets than one that has an error in the header fields
                    break
                if start_offset > 0:
                    # we are using a marker and the marker points to the last object we have already
                    # returned in the previous scan() call - thus, we need to skip this one object.
                    # also, for the next segment, we need to start at offset 0.
                    start_offset = 0
                    continue
                if tag == TAG_PUT and (segment, offset) == self.index.get(id):
                    # we have found an existing and current object
                    result.append(id)
                    if len(result) == limit:
                        return result
        return result
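
    # Pagination sketch (process() is a hypothetical callback): fetch IDs in
    # on-disk order, passing the last returned ID as the next call's marker:
    #
    #   marker = None
    #   while True:
    #       ids = repository.scan(limit=1000, marker=marker)
    #       if not ids:
    #           break
    #       process(ids)
    #       marker = ids[-1]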

    def get(self, id):
        if not self.index:
            self.index = self.open_index(self.get_transaction_id())
        try:
            segment, offset = self.index[id]
            return self.io.read(segment, offset, id)
        except KeyError:
            raise self.ObjectNotFound(id, self.path) from None

    def get_many(self, ids, is_preloaded=False):
        for id_ in ids:
            yield self.get(id_)

    def put(self, id, data, wait=True):
        """put a repo object

        Note: when doing calls with wait=False this gets async and caller must
        deal with async results / exceptions later.
        """
        if not self._active_txn:
            self.prepare_txn(self.get_transaction_id())
        try:
            segment, offset = self.index[id]
        except KeyError:
            pass
        else:
            # this put call supersedes a previous put to the same id.
            # it is essential to do a delete first to get correct quota bookkeeping
            # and also a correctly updated shadow_index, so that the compaction code
            # does not wrongly resurrect an old PUT by dropping a DEL that is still needed.
            self._delete(id, segment, offset)
        segment, offset = self.io.write_put(id, data)
        self.storage_quota_use += len(data) + self.io.put_header_fmt.size
        self.segments.setdefault(segment, 0)
        self.segments[segment] += 1
        self.index[id] = segment, offset
        if self.storage_quota and self.storage_quota_use > self.storage_quota:
            self.transaction_doomed = self.StorageQuotaExceeded(
                format_file_size(self.storage_quota), format_file_size(self.storage_quota_use))
            raise self.transaction_doomed
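
    # Bookkeeping sketch (numbers derived from the code above): a put() of a
    # 1 MiB object accounts 1048576 + 41 bytes (payload + put_header_fmt.size)
    # against storage_quota_use; once that total exceeds storage_quota, the
    # StorageQuotaExceeded exception above dooms the transaction.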

    def delete(self, id, wait=True):
        """delete a repo object

        Note: when doing calls with wait=False this gets async and caller must
        deal with async results / exceptions later.
        """
        if not self._active_txn:
            self.prepare_txn(self.get_transaction_id())
        try:
            segment, offset = self.index.pop(id)
        except KeyError:
            raise self.ObjectNotFound(id, self.path) from None
        self._delete(id, segment, offset)

    def _delete(self, id, segment, offset):
        # common code used by put() and delete()
        # because we'll write a DEL tag to the repository, we must update the shadow index.
        # this is always true, no matter whether we are called from put() or delete().
        # the compaction code needs this to not drop DEL tags if they are still required
        # to keep a PUT in an earlier segment in the "effectively deleted" state.
        self.shadow_index.setdefault(id, []).append(segment)
        self.segments[segment] -= 1
        size = self.io.read(segment, offset, id, read_data=False)
        self.compact[segment] += size
        segment, size = self.io.write_delete(id)
        self.compact[segment] += size
        self.segments.setdefault(segment, 0)

    def async_response(self, wait=True):
        """Get one async result (only applies to remote repositories).

        async commands (== calls with wait=False, e.g. delete and put) have no results,
        but may raise exceptions. These async exceptions must get collected later via
        async_response() calls. Repeat the call until it returns None.
        The previous calls might either return one (non-None) result or raise an exception.
        If wait=True is given and there are outstanding responses, it will wait for them
        to arrive. With wait=False, it will only return already received responses.
        """

    def preload(self, ids):
        """Preload objects (only applies to remote repositories)"""


class LoggedIO:

    class SegmentFull(Exception):
        """raised when a segment is full, before opening next"""

    header_fmt = struct.Struct('<IIB')
    assert header_fmt.size == 9
    put_header_fmt = struct.Struct('<IIB32s')
    assert put_header_fmt.size == 41
    header_no_crc_fmt = struct.Struct('<IB')
    assert header_no_crc_fmt.size == 5
    crc_fmt = struct.Struct('<I')
    assert crc_fmt.size == 4

    _commit = header_no_crc_fmt.pack(9, TAG_COMMIT)
    COMMIT = crc_fmt.pack(crc32(_commit)) + _commit
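
    # Entry layout sketch (derived from the formats above): every entry is
    #   <crc32 4B> <size 4B LE, incl. crc+size+tag> <tag 1B> [<key 32B> [<data>]]
    # where the crc covers everything after the crc field itself, so e.g. the
    # 9-byte COMMIT entry can be verified with:
    #
    #   crc, = crc_fmt.unpack(COMMIT[:4])
    #   assert crc32(COMMIT[4:]) & 0xffffffff == crc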

    def __init__(self, path, limit, segments_per_dir, capacity=90):
        self.path = path
        self.fds = LRUCache(capacity, dispose=self._close_fd)
        self.segment = 0
        self.limit = limit
        self.segments_per_dir = segments_per_dir
        self.offset = 0
        self._write_fd = None
        self._fds_cleaned = 0

    def close(self):
        self.close_segment()
        self.fds.clear()
        self.fds = None  # Just to make sure we're disabled

    def _close_fd(self, ts_fd):
        ts, fd = ts_fd
        safe_fadvise(fd.fileno(), 0, 0, 'DONTNEED')
        fd.close()

    def get_segment_dirs(self, data_dir, start_index=MIN_SEGMENT_DIR_INDEX, end_index=MAX_SEGMENT_DIR_INDEX):
        """Returns generator yielding required segment dirs in data_dir as `os.DirEntry` objects.
        Start and end are inclusive.
        """
        segment_dirs = (
            f
            for f in os.scandir(data_dir)
            if f.is_dir() and f.name.isdigit() and start_index <= int(f.name) <= end_index
        )
        return segment_dirs

    def get_segment_files(self, segment_dir, start_index=MIN_SEGMENT_INDEX, end_index=MAX_SEGMENT_INDEX):
        """Returns generator yielding required segment files in segment_dir as `os.DirEntry` objects.
        Start and end are inclusive.
        """
        segment_files = (
            f
            for f in os.scandir(segment_dir)
            if f.is_file() and f.name.isdigit() and start_index <= int(f.name) <= end_index
        )
        return segment_files

    def segment_iterator(self, segment=None, reverse=False):
        if segment is None:
            segment = MIN_SEGMENT_INDEX if not reverse else MAX_SEGMENT_INDEX
        start_segment_dir = segment // self.segments_per_dir
        data_path = os.path.join(self.path, 'data')
        if not reverse:
            dirs = self.get_segment_dirs(data_path, start_index=start_segment_dir)
        else:
            dirs = self.get_segment_dirs(data_path, end_index=start_segment_dir)
        dirs = sorted(dirs, key=lambda dir: int(dir.name), reverse=reverse)
        for dir in dirs:
            if not reverse:
                files = self.get_segment_files(dir, start_index=segment)
            else:
                files = self.get_segment_files(dir, end_index=segment)
            files = sorted(files, key=lambda file: int(file.name), reverse=reverse)
            for file in files:
                # Note: Do not filter out logically deleted segments (see "File system interaction" above),
                # since this is used by cleanup and txn state detection as well.
                yield int(file.name), file.path

    def get_latest_segment(self):
        for segment, filename in self.segment_iterator(reverse=True):
            return segment
        return None

    def get_segments_transaction_id(self):
        """Return the last committed segment.
        """
        for segment, filename in self.segment_iterator(reverse=True):
            if self.is_committed_segment(segment):
                return segment
        return None

    def cleanup(self, transaction_id):
        """Delete segment files left by aborted transactions.
        """
        self.close_segment()
        self.segment = transaction_id + 1
        count = 0
        for segment, filename in self.segment_iterator(reverse=True):
            if segment > transaction_id:
                self.delete_segment(segment)
                count += 1
            else:
                break
        logger.debug('Cleaned up %d uncommitted segment files (== everything after segment %d).',
                     count, transaction_id)

    def is_committed_segment(self, segment):
        """Check if segment ends with a COMMIT tag.
        """
        try:
            iterator = self.iter_objects(segment)
        except IntegrityError:
            return False
        with open(self.segment_filename(segment), 'rb') as fd:
            try:
                fd.seek(-self.header_fmt.size, os.SEEK_END)
            except OSError as e:
                # return False if segment file is empty or too small
                if e.errno == errno.EINVAL:
                    return False
                raise e
            if fd.read(self.header_fmt.size) != self.COMMIT:
                return False
        seen_commit = False
        while True:
            try:
                tag, key, offset, _ = next(iterator)
            except IntegrityError:
                return False
            except StopIteration:
                break
            if tag == TAG_COMMIT:
                seen_commit = True
                continue
            if seen_commit:
                return False
        return seen_commit

    def segment_filename(self, segment):
        return os.path.join(self.path, 'data', str(segment // self.segments_per_dir), str(segment))
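
    # Path mapping example: with segments_per_dir = 1000, segment 123456 lives at
    # <self.path>/data/123/123456, because 123456 // 1000 == 123.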

    def get_write_fd(self, no_new=False, want_new=False, raise_full=False):
        if not no_new and (want_new or self.offset and self.offset > self.limit):
            if raise_full:
                raise self.SegmentFull
            self.close_segment()
        if not self._write_fd:
            if self.segment % self.segments_per_dir == 0:
                dirname = os.path.join(self.path, 'data', str(self.segment // self.segments_per_dir))
                if not os.path.exists(dirname):
                    os.mkdir(dirname)
                    sync_dir(os.path.join(self.path, 'data'))
            self._write_fd = SyncFile(self.segment_filename(self.segment), binary=True)
            self._write_fd.write(MAGIC)
            self.offset = MAGIC_LEN
            if self.segment in self.fds:
                # we may have a cached fd for a segment file we already deleted and
                # we are now writing a new segment file to the same file name. get rid
                # of the cached fd that still refers to the old file, so it will later
                # get repopulated (on demand) with a fd that refers to the new file.
                del self.fds[self.segment]
        return self._write_fd

    def get_fd(self, segment):
        # note: get_fd() returns a fd with undefined file pointer position,
        # so callers must always seek() to the desired position afterwards.
        now = time.monotonic()

        def open_fd():
            fd = open(self.segment_filename(segment), 'rb')
            self.fds[segment] = (now, fd)
            return fd

        def clean_old():
            # we regularly get rid of all old FDs here:
            if now - self._fds_cleaned > FD_MAX_AGE // 8:
                self._fds_cleaned = now
                for k, ts_fd in list(self.fds.items()):
                    ts, fd = ts_fd
                    if now - ts > FD_MAX_AGE:
                        # we do not want to touch long-unused file handles to
                        # avoid ESTALE issues (e.g. on network filesystems).
                        del self.fds[k]

        clean_old()
        try:
            ts, fd = self.fds[segment]
        except KeyError:
            fd = open_fd()
        else:
            # we only have fresh enough stuff here.
            # update the timestamp of the lru cache entry.
            self.fds.upd(segment, (now, fd))
        return fd

    def close_segment(self):
        # set self._write_fd to None early to guard against reentry from error handling code paths:
        fd, self._write_fd = self._write_fd, None
        if fd is not None:
            self.segment += 1
            self.offset = 0
            fd.close()

    def delete_segment(self, segment):
        if segment in self.fds:
            del self.fds[segment]
        try:
            safe_unlink(self.segment_filename(segment))
        except FileNotFoundError:
            pass

    def clear_empty_dirs(self):
        """Delete empty segment dirs, i.e. those with no segment files.
        """
        data_dir = os.path.join(self.path, 'data')
        segment_dirs = self.get_segment_dirs(data_dir)
        for segment_dir in segment_dirs:
            try:
                # os.rmdir will only delete the directory if it is empty,
                # so we don't need to explicitly check for emptiness first.
                os.rmdir(segment_dir)
            except OSError:
                # OSError is raised by os.rmdir if the directory is not empty. This is expected.
                # Its subclass FileNotFoundError may be raised if the directory already does not exist. Ignorable.
                pass
        sync_dir(data_dir)

    def segment_exists(self, segment):
        filename = self.segment_filename(segment)
        # When deleting segments, they are first truncated. If truncate(2) and unlink(2) are split
        # across FS transactions, then logically deleted segments will show up as truncated.
        return os.path.exists(filename) and os.path.getsize(filename)

    def segment_size(self, segment):
        return os.path.getsize(self.segment_filename(segment))

    def get_segment_magic(self, segment):
        fd = self.get_fd(segment)
        fd.seek(0)
        return fd.read(MAGIC_LEN)

    def iter_objects(self, segment, offset=0, include_data=False, read_data=True):
        """
        Return object iterator for *segment*.

        If read_data is False then include_data must be False as well.
        Integrity checks are skipped: all data obtained from the iterator must be considered informational.

        The iterator returns four-tuples of (tag, key, offset, data|size).
        """
        fd = self.get_fd(segment)
        fd.seek(offset)
        if offset == 0:
            # we are touching this segment for the first time, check the MAGIC.
            # Repository.scan() calls us with offset > 0 when it continues an ongoing iteration
            # from a marker position - but then we have checked the magic before already.
            if fd.read(MAGIC_LEN) != MAGIC:
                raise IntegrityError(f'Invalid segment magic [segment {segment}, offset 0]')
            offset = MAGIC_LEN
        header = fd.read(self.header_fmt.size)
        while header:
            size, tag, key, data = self._read(fd, self.header_fmt, header, segment, offset,
                                              (TAG_PUT, TAG_DELETE, TAG_COMMIT),
                                              read_data=read_data)
            if include_data:
                yield tag, key, offset, data
            else:
                yield tag, key, offset, size
            offset += size
            # we must get the fd via get_fd() here again as we yielded to our caller and it might
            # have triggered closing of the fd we had before (e.g. by calling io.read() for
            # different segment(s)).
            # by calling get_fd() here again we also make our fd "recently used" so it likely
            # does not get kicked out of the self.fds LRUCache.
            fd = self.get_fd(segment)
            fd.seek(offset)
            header = fd.read(self.header_fmt.size)
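
    # Metadata-only iteration sketch (io is a hypothetical LoggedIO instance):
    # with read_data=False - as Repository.scan() uses it - the iterator yields
    # entry sizes instead of payloads, and skips payload reads and CRC checks:
    #
    #   for tag, key, offset, size in io.iter_objects(segment, read_data=False):
    #       ...  # walk the segment by headers only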

    def recover_segment(self, segment, filename):
        logger.info('attempting to recover ' + filename)
        if segment in self.fds:
            del self.fds[segment]
        if os.path.getsize(filename) < MAGIC_LEN + self.header_fmt.size:
            # this is either a zero-byte file (which would crash mmap() below) or otherwise
            # just too small to be a valid non-empty segment file, so do a shortcut here:
            with SaveFile(filename, binary=True) as fd:
                fd.write(MAGIC)
            return
        with SaveFile(filename, binary=True) as dst_fd:
            with open(filename, 'rb') as src_fd:
                # note: file must not be 0 size or mmap() will crash.
                with mmap.mmap(src_fd.fileno(), 0, access=mmap.ACCESS_READ) as mm:
                    # memoryview context manager is problematic, see https://bugs.python.org/issue35686
                    data = memoryview(mm)
                    d = data
                    try:
                        dst_fd.write(MAGIC)
                        while len(d) >= self.header_fmt.size:
                            crc, size, tag = self.header_fmt.unpack(d[:self.header_fmt.size])
                            if size > MAX_OBJECT_SIZE or tag > MAX_TAG_ID or size < self.header_fmt.size \
                                    or size > len(d) or crc32(d[4:size]) & 0xffffffff != crc:
                                d = d[1:]
                                continue
                            dst_fd.write(d[:size])
                            d = d[size:]
                    finally:
                        del d
                        data.release()

    def read(self, segment, offset, id, read_data=True):
        """
        Read entry from *segment* at *offset* with *id*.

        If read_data is False the size of the entry is returned instead and integrity checks are skipped.
        The return value should thus be considered informational.
        """
        if segment == self.segment and self._write_fd:
            self._write_fd.sync()
        fd = self.get_fd(segment)
        fd.seek(offset)
        header = fd.read(self.put_header_fmt.size)
        size, tag, key, data = self._read(fd, self.put_header_fmt, header, segment, offset, (TAG_PUT, ), read_data)
        if id != key:
            raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format(
                segment, offset))
        return data if read_data else size

    def _read(self, fd, fmt, header, segment, offset, acceptable_tags, read_data=True):
        # some code shared by read() and iter_objects()
        # See comment on MAX_TAG_ID for details.
        assert max(acceptable_tags) <= MAX_TAG_ID, 'Exceeding MAX_TAG_ID will break backwards compatibility'
        try:
            hdr_tuple = fmt.unpack(header)
        except struct.error as err:
            raise IntegrityError('Invalid segment entry header [segment {}, offset {}]: {}'.format(
                segment, offset, err)) from None
        if fmt is self.put_header_fmt:
            crc, size, tag, key = hdr_tuple
        elif fmt is self.header_fmt:
            crc, size, tag = hdr_tuple
            key = None
        else:
            raise TypeError("_read called with unsupported format")
        if size > MAX_OBJECT_SIZE:
            # if you get this on an archive made with borg < 1.0.7 and millions of files and
            # you need to restore it, you can disable this check by using "if False:" above.
            raise IntegrityError('Invalid segment entry size {} - too big [segment {}, offset {}]'.format(
                size, segment, offset))
        if size < fmt.size:
            raise IntegrityError('Invalid segment entry size {} - too small [segment {}, offset {}]'.format(
                size, segment, offset))
        length = size - fmt.size
        if read_data:
            data = fd.read(length)
            if len(data) != length:
                raise IntegrityError('Segment entry data short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
                    segment, offset, length, len(data)))
            if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc:
                raise IntegrityError('Segment entry checksum mismatch [segment {}, offset {}]'.format(
                    segment, offset))
            if key is None and tag in (TAG_PUT, TAG_DELETE):
                key, data = data[:32], data[32:]
        else:
            if key is None and tag in (TAG_PUT, TAG_DELETE):
                key = fd.read(32)
                length -= 32
                if len(key) != 32:
                    raise IntegrityError('Segment entry key short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
                        segment, offset, 32, len(key)))
            oldpos = fd.tell()
            seeked = fd.seek(length, os.SEEK_CUR) - oldpos
            data = None
            if seeked != length:
                raise IntegrityError('Segment entry data short seek [segment {}, offset {}]: expected {}, got {} bytes'.format(
                    segment, offset, length, seeked))
        if tag not in acceptable_tags:
            raise IntegrityError('Invalid segment entry header, did not get acceptable tag [segment {}, offset {}]'.format(
                segment, offset))
        return size, tag, key, data

    def write_put(self, id, data, raise_full=False):
        data_size = len(data)
        if data_size > MAX_DATA_SIZE:
            # this would push the segment entry size beyond MAX_OBJECT_SIZE.
            raise IntegrityError(f'More than allowed put data [{data_size} > {MAX_DATA_SIZE}]')
        fd = self.get_write_fd(want_new=(id == Manifest.MANIFEST_ID), raise_full=raise_full)
        size = data_size + self.put_header_fmt.size
        offset = self.offset
        header = self.header_no_crc_fmt.pack(size, TAG_PUT)
        crc = self.crc_fmt.pack(crc32(data, crc32(id, crc32(header))) & 0xffffffff)
        fd.write(b''.join((crc, header, id, data)))
        self.offset += size
        return self.segment, offset

    def write_delete(self, id, raise_full=False):
        fd = self.get_write_fd(want_new=(id == Manifest.MANIFEST_ID), raise_full=raise_full)
        header = self.header_no_crc_fmt.pack(self.put_header_fmt.size, TAG_DELETE)
        crc = self.crc_fmt.pack(crc32(id, crc32(header)) & 0xffffffff)
        fd.write(b''.join((crc, header, id)))
        self.offset += self.put_header_fmt.size
        return self.segment, self.put_header_fmt.size
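
    # Worked size example (from the formats above): a PUT of a 100-byte payload
    # writes 4 (crc) + 4 (size) + 1 (tag) + 32 (key) + 100 = 141 bytes, i.e.
    # data_size + put_header_fmt.size; a DELETE always writes exactly
    # put_header_fmt.size == 41 bytes (crc + size + tag + key).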

    def write_commit(self, intermediate=False):
        # Intermediate commits go directly into the current segment - this makes checking their validity more
        # expensive, but is faster and reduces clobber. Final commits go into a new segment.
        fd = self.get_write_fd(want_new=not intermediate, no_new=intermediate)
        if intermediate:
            fd.sync()
        header = self.header_no_crc_fmt.pack(self.header_fmt.size, TAG_COMMIT)
        crc = self.crc_fmt.pack(crc32(header) & 0xffffffff)
        fd.write(b''.join((crc, header)))
        self.close_segment()
        return self.segment - 1  # close_segment() increments it


assert LoggedIO.put_header_fmt.size == 41  # see constants.MAX_OBJECT_SIZE