import errno
import logging
import os
import shutil
import struct
from binascii import hexlify, unhexlify
from collections import defaultdict
from configparser import ConfigParser
from datetime import datetime
from functools import partial
from itertools import islice
from zlib import crc32

import msgpack

from .constants import *  # NOQA
from .hashindex import NSIndex
from .helpers import Error, ErrorWithTraceback, IntegrityError, format_file_size, parse_file_size
from .helpers import Location
from .helpers import ProgressIndicatorPercent
from .helpers import bin_to_hex
from .helpers import yes
from .locking import Lock, LockError, LockErrorT
from .logger import create_logger
from .lrucache import LRUCache
from .platform import SaveFile, SyncFile, sync_dir

logger = logging.getLogger(__name__)

MAX_OBJECT_SIZE = 20 * 1024 * 1024
MAGIC = b'BORG_SEG'
MAGIC_LEN = len(MAGIC)
TAG_PUT = 0
TAG_DELETE = 1
TAG_COMMIT = 2

FreeSpace = partial(defaultdict, int)
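
# Note (illustrative, not part of the original file): FreeSpace maps a segment
# number to the number of freeable (superseded) bytes in that segment; unknown
# segments default to 0, so "compact[segment] += size" needs no initialization:
#
#   compact = FreeSpace()
#   compact[5] += 100   # segment 5 now has 100 freeable bytes
#   compact[7]          # -> 0 (default)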


class Repository:
    """
    Filesystem based transactional key value store

    Transactionality is achieved by using a log (aka journal) to record changes. The log is a series of numbered files
    called segments. Each segment is a series of log entries. The segment number together with the offset of each
    entry relative to its segment start establishes an ordering of the log entries. This is the "definition" of
    time for the purposes of the log.

    Log entries are either PUT, DELETE or COMMIT.

    A COMMIT is always the final log entry in a segment and marks all data from the beginning of the log until the
    segment ending with the COMMIT as committed and consistent. The segment number of a segment ending with a COMMIT
    is called the transaction ID of that commit, and a segment ending with a COMMIT is called committed.

    When reading from a repository it is first checked whether the last segment is committed. If it is not, then
    all segments after the last committed segment are deleted; they contain log entries whose consistency is not
    established by a COMMIT.

    Note that the COMMIT can't establish consistency by itself, but only manages to do so with proper support from
    the platform (including the hardware). See platform.base.SyncFile for details.

    A PUT inserts a key-value pair. The value is stored in the log entry, hence the repository implements
    full data logging, meaning that all data is consistent, not just metadata (which is common in file systems).

    A DELETE marks a key as deleted.

    For a given key only the last entry regarding the key, which is called current (all other entries are called
    superseded), is relevant: If there is no entry or the last entry is a DELETE then the key does not exist.
    Otherwise the last PUT defines the value of the key.

    By superseding a PUT (with either another PUT or a DELETE) the log entry becomes obsolete. A segment containing
    such obsolete entries is called sparse, while a segment containing no such entries is called compact.

    Sparse segments can be compacted and thereby disk space freed. This destroys the transaction for which the
    superseded entries were current.

    On disk layout:

    dir/README
    dir/config
    dir/data/<X // SEGMENTS_PER_DIR>/<X>
    dir/index.X
    dir/hints.X
    """

    class DoesNotExist(Error):
        """Repository {} does not exist."""

    class AlreadyExists(Error):
        """Repository {} already exists."""

    class InvalidRepository(Error):
        """{} is not a valid repository. Check repo config."""

    class CheckNeeded(ErrorWithTraceback):
        """Inconsistency detected. Please run "borg check {}"."""

    class ObjectNotFound(ErrorWithTraceback):
        """Object with key {} not found in repository {}."""

        def __init__(self, id, repo):
            if isinstance(id, bytes):
                id = bin_to_hex(id)
            super().__init__(id, repo)

    class InsufficientFreeSpaceError(Error):
        """Insufficient free space to complete transaction (required: {}, available: {})."""

    def __init__(self, path, create=False, exclusive=False, lock_wait=None, lock=True, append_only=False):
        self.path = os.path.abspath(path)
        self._location = Location('file://%s' % self.path)
        self.io = None
        self.lock = None
        self.index = None
        # This is an index of shadowed log entries during this transaction. Consider the following sequence:
        # segment_n PUT A, segment_x DELETE A
        # After the "DELETE A" in segment_x the shadow index will contain "A -> [n]".
        self.shadow_index = {}
        self._active_txn = False
        self.lock_wait = lock_wait
        self.do_lock = lock
        self.do_create = create
        self.exclusive = exclusive
        self.append_only = append_only
        self.hostname_is_unique = yes(env_var_override='BORG_HOSTNAME_IS_UNIQUE', env_msg=None, prompt=False)
        if self.hostname_is_unique:
            logger.info('Enabled removal of stale repository locks')

    def __del__(self):
        if self.lock:
            self.close()
            assert False, "cleanup happened in Repository.__del__"

    def __repr__(self):
        return '<%s %s>' % (self.__class__.__name__, self.path)

    def __enter__(self):
        if self.do_create:
            self.do_create = False
            self.create(self.path)
        self.open(self.path, bool(self.exclusive), lock_wait=self.lock_wait, lock=self.do_lock)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            no_space_left_on_device = exc_type is OSError and exc_val.errno == errno.ENOSPC
            # The ENOSPC could have originated somewhere else besides the Repository. The cleanup is always safe,
            # unless EIO or FS corruption ensues, which is why we specifically check for ENOSPC.
            if self._active_txn and no_space_left_on_device:
                logger.warning('No space left on device, cleaning up partial transaction to free space.')
                cleanup = True
            else:
                cleanup = False
            self.rollback(cleanup)
        self.close()

    @property
    def id_str(self):
        return bin_to_hex(self.id)

    def create(self, path):
        """Create a new empty repository at `path`
        """
        if os.path.exists(path) and (not os.path.isdir(path) or os.listdir(path)):
            raise self.AlreadyExists(path)
        if not os.path.exists(path):
            os.mkdir(path)
        with open(os.path.join(path, 'README'), 'w') as fd:
            fd.write('This is a Borg repository\n')
        os.mkdir(os.path.join(path, 'data'))
        config = ConfigParser(interpolation=None)
        config.add_section('repository')
        config.set('repository', 'version', '1')
        config.set('repository', 'segments_per_dir', str(DEFAULT_SEGMENTS_PER_DIR))
        config.set('repository', 'max_segment_size', str(DEFAULT_MAX_SEGMENT_SIZE))
        config.set('repository', 'append_only', str(int(self.append_only)))
        config.set('repository', 'additional_free_space', '0')
        config.set('repository', 'id', bin_to_hex(os.urandom(32)))
        self.save_config(path, config)

    def save_config(self, path, config):
        config_path = os.path.join(path, 'config')
        with SaveFile(config_path) as fd:
            config.write(fd)

    def save_key(self, keydata):
        assert self.config
        keydata = keydata.decode('utf-8')  # remote repo: msgpack issue #99, getting bytes
        self.config.set('repository', 'key', keydata)
        self.save_config(self.path, self.config)

    def load_key(self):
        keydata = self.config.get('repository', 'key')
        return keydata.encode('utf-8')  # remote repo: msgpack issue #99, returning bytes

    def get_free_nonce(self):
        if not self.lock.got_exclusive_lock():
            raise AssertionError("bug in code, exclusive lock should exist here")
        nonce_path = os.path.join(self.path, 'nonce')
        try:
            with open(nonce_path, 'r') as fd:
                return int.from_bytes(unhexlify(fd.read()), byteorder='big')
        except FileNotFoundError:
            return None

    def commit_nonce_reservation(self, next_unreserved, start_nonce):
        if not self.lock.got_exclusive_lock():
            raise AssertionError("bug in code, exclusive lock should exist here")
        if self.get_free_nonce() != start_nonce:
            raise Exception("nonce space reservation with mismatched previous state")
        nonce_path = os.path.join(self.path, 'nonce')
        with SaveFile(nonce_path, binary=False) as fd:
            fd.write(bin_to_hex(next_unreserved.to_bytes(8, byteorder='big')))
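
    # Note (illustrative, not part of the original file): the nonce file holds
    # the next free nonce as 16 hex digits of an 8-byte big-endian integer,
    # e.g. committing next_unreserved=0x1000 writes '0000000000001000'.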

    def destroy(self):
        """Destroy the repository at `self.path`
        """
        if self.append_only:
            raise ValueError(self.path + " is in append-only mode")
        self.close()
        os.remove(os.path.join(self.path, 'config'))  # kill config first
        shutil.rmtree(self.path)

    def get_index_transaction_id(self):
        indices = sorted(int(fn[6:])
                         for fn in os.listdir(self.path)
                         if fn.startswith('index.') and fn[6:].isdigit() and os.stat(os.path.join(self.path, fn)).st_size != 0)
        if indices:
            return indices[-1]
        else:
            return None

    def check_transaction(self):
        index_transaction_id = self.get_index_transaction_id()
        segments_transaction_id = self.io.get_segments_transaction_id()
        if index_transaction_id is not None and segments_transaction_id is None:
            raise self.CheckNeeded(self.path)
        # Attempt to automatically rebuild index if we crashed between commit
        # tag write and index save
        if index_transaction_id != segments_transaction_id:
            if index_transaction_id is not None and index_transaction_id > segments_transaction_id:
                replay_from = None
            else:
                replay_from = index_transaction_id
            self.replay_segments(replay_from, segments_transaction_id)
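
    # Recovery cases handled above (sketch, not part of the original file):
    #   index txn | segments txn | action
    #   ----------+--------------+------------------------------------------
    #   None      | None         | nothing to do (empty repository)
    #   N         | None         | raise CheckNeeded (segments missing)
    #   None      | M            | replay all segments up to M
    #   N (< M)   | M            | replay segments N+1 .. M
    #   N (> M)   | M            | replay everything (index newer than data)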

    def get_transaction_id(self):
        self.check_transaction()
        return self.get_index_transaction_id()

    def break_lock(self):
        Lock(os.path.join(self.path, 'lock')).break_lock()

    def open(self, path, exclusive, lock_wait=None, lock=True):
        self.path = path
        if not os.path.isdir(path):
            raise self.DoesNotExist(path)
        if lock:
            self.lock = Lock(os.path.join(path, 'lock'), exclusive, timeout=lock_wait,
                             kill_stale_locks=self.hostname_is_unique).acquire()
        else:
            self.lock = None
        self.config = ConfigParser(interpolation=None)
        self.config.read(os.path.join(self.path, 'config'))
        if 'repository' not in self.config.sections() or self.config.getint('repository', 'version') != 1:
            self.close()
            raise self.InvalidRepository(path)
        self.max_segment_size = self.config.getint('repository', 'max_segment_size')
        self.segments_per_dir = self.config.getint('repository', 'segments_per_dir')
        self.additional_free_space = parse_file_size(self.config.get('repository', 'additional_free_space', fallback=0))
        # append_only can be set in the constructor
        # it shouldn't be overridden (True -> False) here
        self.append_only = self.append_only or self.config.getboolean('repository', 'append_only', fallback=False)
        self.id = unhexlify(self.config.get('repository', 'id').strip())
        self.io = LoggedIO(self.path, self.max_segment_size, self.segments_per_dir)

    def close(self):
        if self.lock:
            if self.io:
                self.io.close()
                self.io = None
            self.lock.release()
            self.lock = None

    def commit(self, save_space=False):
        """Commit transaction
        """
        # save_space is not used anymore, but stays for RPC/API compatibility.
        self.check_free_space()
        self.io.write_commit()
        if not self.append_only:
            self.compact_segments()
        self.write_index()
        self.rollback()

    def open_index(self, transaction_id, auto_recover=True):
        if transaction_id is None:
            return NSIndex()
        index_path = os.path.join(self.path, 'index.%d' % transaction_id).encode('utf-8')
        try:
            return NSIndex.read(index_path)
        except RuntimeError as error:
            assert str(error) == 'hashindex_read failed'  # everything else means we're in *deep* trouble
            logger.warning('Repository index missing or corrupted, trying to recover')
            os.unlink(index_path)
            if not auto_recover:
                raise
            self.prepare_txn(self.get_transaction_id())
            # don't leave an open transaction around
            self.commit()
            return self.open_index(self.get_transaction_id())

    def prepare_txn(self, transaction_id, do_cleanup=True):
        self._active_txn = True
        if not self.lock.got_exclusive_lock():
            if self.exclusive is not None:
                # self.exclusive is either True or False, thus a new client is active here.
                # if it is False and we get here, the caller did not use exclusive=True although
                # it is needed for a write operation. if it is True and we get here, something else
                # went very wrong, because we should have an exclusive lock, but we don't.
                raise AssertionError("bug in code, exclusive lock should exist here")
            # if we are here, this is an old client talking to a new server (expecting lock upgrade).
            # or we are replaying segments and might need a lock upgrade for that.
            try:
                self.lock.upgrade()
            except (LockError, LockErrorT):
                # if upgrading the lock to exclusive fails, we do not have an
                # active transaction. this is important for "serve" mode, where
                # the repository instance lives on - even if exceptions happened.
                self._active_txn = False
                raise
        if not self.index or transaction_id is None:
            try:
                self.index = self.open_index(transaction_id, False)
            except RuntimeError:
                self.check_transaction()
                self.index = self.open_index(transaction_id, False)
        if transaction_id is None:
            self.segments = {}  # XXX bad name: usage_count_of_segment_x = self.segments[x]
            self.compact = FreeSpace()  # XXX bad name: freeable_space_of_segment_x = self.compact[x]
            self.shadow_index.clear()
        else:
            if do_cleanup:
                self.io.cleanup(transaction_id)
            hints_path = os.path.join(self.path, 'hints.%d' % transaction_id)
            index_path = os.path.join(self.path, 'index.%d' % transaction_id)
            try:
                with open(hints_path, 'rb') as fd:
                    hints = msgpack.unpack(fd)
            except (msgpack.UnpackException, msgpack.ExtraData, FileNotFoundError) as e:
                logger.warning('Repository hints file missing or corrupted, trying to recover')
                if not isinstance(e, FileNotFoundError):
                    os.unlink(hints_path)
                # index must exist at this point
                os.unlink(index_path)
                self.check_transaction()
                self.prepare_txn(transaction_id)
                return
            if hints[b'version'] == 1:
                logger.debug('Upgrading from v1 hints.%d', transaction_id)
                self.segments = hints[b'segments']
                self.compact = FreeSpace()
                for segment in sorted(hints[b'compact']):
                    logger.debug('Rebuilding sparse info for segment %d', segment)
                    self._rebuild_sparse(segment)
                logger.debug('Upgrade to v2 hints complete')
            elif hints[b'version'] != 2:
                raise ValueError('Unknown hints file version: %d' % hints[b'version'])
            else:
                self.segments = hints[b'segments']
                self.compact = FreeSpace(hints[b'compact'])
            # Drop uncommitted segments in the shadow index
            for key, shadowed_segments in self.shadow_index.items():
                for segment in list(shadowed_segments):
                    if segment > transaction_id:
                        shadowed_segments.remove(segment)

    def write_index(self):
        hints = {b'version': 2,
                 b'segments': self.segments,
                 b'compact': self.compact}
        transaction_id = self.io.get_segments_transaction_id()
        assert transaction_id is not None
        hints_file = os.path.join(self.path, 'hints.%d' % transaction_id)
        with open(hints_file + '.tmp', 'wb') as fd:
            msgpack.pack(hints, fd)
            fd.flush()
            os.fsync(fd.fileno())
        os.rename(hints_file + '.tmp', hints_file)
        self.index.write(os.path.join(self.path, 'index.tmp'))
        os.rename(os.path.join(self.path, 'index.tmp'),
                  os.path.join(self.path, 'index.%d' % transaction_id))
        if self.append_only:
            with open(os.path.join(self.path, 'transactions'), 'a') as log:
                print('transaction %d, UTC time %s' % (transaction_id, datetime.utcnow().isoformat()), file=log)
        # Remove old auxiliary files
        current = '.%d' % transaction_id
        for name in os.listdir(self.path):
            if not name.startswith(('index.', 'hints.')):
                continue
            if name.endswith(current):
                continue
            os.unlink(os.path.join(self.path, name))
        self.index = None

    def check_free_space(self):
        """Pre-commit check for sufficient free space to actually perform the commit."""
        # As a baseline we take four times the current (on-disk) index size.
        # At this point the index may only be updated by compaction, which won't resize it.
        # We still apply a factor of four so that a later, separate invocation can free space
        # (journaling all deletes for all chunks is one index size) or still make minor additions
        # (which may grow the index up to twice its current size).
        # Note that in a subsequent operation the committed index is still on-disk, therefore we
        # arrive at index_size * (1 + 2 + 1).
        # In that order: journaled deletes (1), hashtable growth (2), persisted index (1).
        required_free_space = self.index.size() * 4
        # Conservatively estimate hints file size:
        # 10 bytes for each segment-refcount pair, 10 bytes for each segment-space pair
        # Assume maximum of 5 bytes per integer. Segment numbers will usually be packed more densely (1-3 bytes),
        # as will refcounts and free space integers. For 5 MiB segments this estimate is good to ~20 PB repo size.
        # Add 4K to generously account for constant format overhead.
        hints_size = len(self.segments) * 10 + len(self.compact) * 10 + 4096
        required_free_space += hints_size
        required_free_space += self.additional_free_space
        if not self.append_only:
            # Keep one full worst-case segment free in non-append-only mode
            required_free_space += self.max_segment_size + MAX_OBJECT_SIZE
        try:
            st_vfs = os.statvfs(self.path)
        except OSError as os_error:
            logger.warning('Failed to check free space before committing: ' + str(os_error))
            return
        # f_bavail: even as root - don't touch the Federal Block Reserve!
        free_space = st_vfs.f_bavail * st_vfs.f_bsize
        logger.debug('check_free_space: required bytes {}, free bytes {}'.format(required_free_space, free_space))
        if free_space < required_free_space:
            self.rollback(cleanup=True)
            formatted_required = format_file_size(required_free_space)
            formatted_free = format_file_size(free_space)
            raise self.InsufficientFreeSpaceError(formatted_required, formatted_free)
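
    # Worked example of the estimate above (illustrative, not part of the
    # original file): a 40 MiB on-disk index, 10,000 segments, 5 MiB
    # max_segment_size, non-append-only, additional_free_space = 0:
    #   index:   4 * 40 MiB                        = 160 MiB
    #   hints:   10000 * 10 + 10000 * 10 + 4096    ~ 0.2 MiB
    #   segment: 5 MiB + MAX_OBJECT_SIZE (20 MiB)  = 25 MiB
    #   total required_free_space                  ~ 185 MiB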

    def compact_segments(self):
        """Compact sparse segments by copying data into new segments
        """
        if not self.compact:
            return
        index_transaction_id = self.get_index_transaction_id()
        segments = self.segments
        unused = []  # list of segments that are not used anymore
        logger = create_logger('borg.debug.compact_segments')

        def complete_xfer(intermediate=True):
            # complete the current transfer (when some target segment is full)
            nonlocal unused
            # commit the new, compact, used segments
            segment = self.io.write_commit(intermediate=intermediate)
            logger.debug('complete_xfer: wrote %scommit at segment %d', 'intermediate ' if intermediate else '', segment)
            # get rid of the old, sparse, unused segments. free space.
            for segment in unused:
                logger.debug('complete_xfer: deleting unused segment %d', segment)
                assert self.segments.pop(segment) == 0
                self.io.delete_segment(segment)
                del self.compact[segment]
            unused = []

        logger.debug('compaction started.')
        pi = ProgressIndicatorPercent(total=len(self.compact), msg='Compacting segments %3.0f%%', step=1)
        for segment, freeable_space in sorted(self.compact.items()):
            if not self.io.segment_exists(segment):
                logger.warning('segment %d not found, but listed in compaction data', segment)
                del self.compact[segment]
                pi.show()
                continue
            segment_size = self.io.segment_size(segment)
            if segment_size > 0.2 * self.max_segment_size and freeable_space < 0.15 * segment_size:
                logger.debug('not compacting segment %d (only %d bytes are sparse)', segment, freeable_space)
                pi.show()
                continue
            segments.setdefault(segment, 0)
            logger.debug('compacting segment %d with usage count %d and %d freeable bytes',
                         segment, segments[segment], freeable_space)
            for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
                if tag == TAG_COMMIT:
                    continue
                in_index = self.index.get(key)
                is_index_object = in_index == (segment, offset)
                if tag == TAG_PUT and is_index_object:
                    try:
                        new_segment, offset = self.io.write_put(key, data, raise_full=True)
                    except LoggedIO.SegmentFull:
                        complete_xfer()
                        new_segment, offset = self.io.write_put(key, data)
                    self.index[key] = new_segment, offset
                    segments.setdefault(new_segment, 0)
                    segments[new_segment] += 1
                    segments[segment] -= 1
                elif tag == TAG_PUT and not is_index_object:
                    # If this is a PUT shadowed by a later tag, then it will be gone when this segment is deleted after
                    # this loop. Therefore it is removed from the shadow index.
                    try:
                        self.shadow_index[key].remove(segment)
                    except (KeyError, ValueError):
                        pass
                elif tag == TAG_DELETE and not in_index:
                    # If the shadow index doesn't contain this key, then we can't say if there's a shadowed older tag,
                    # therefore we do not drop the delete, but write it to a current segment.
                    shadowed_put_exists = key not in self.shadow_index or any(
                        # If the key is in the shadow index and there is any segment with an older PUT of this
                        # key, we have a shadowed put.
                        shadowed < segment for shadowed in self.shadow_index[key])
                    delete_is_not_stable = index_transaction_id is None or segment > index_transaction_id
                    if shadowed_put_exists or delete_is_not_stable:
                        # (introduced in 6425d16aa84be1eaaf88)
                        # This is needed to avoid object un-deletion if we crash between the commit and the deletion
                        # of old segments in complete_xfer().
                        #
                        # However, this only happens if the crash also affects the FS to the effect that file deletions
                        # did not materialize consistently after journal recovery. If they always materialize in-order
                        # then this is not a problem, because the old segment containing a deleted object would be
                        # deleted before the segment containing the delete.
                        #
                        # Consider the following series of operations if we would not do this, i.e. if this entire
                        # if-block would be removed.
                        # Columns are segments, lines are different keys (line 1 = some key, line 2 = some other key)
                        # Legend: P=TAG_PUT, D=TAG_DELETE, c=commit, i=index is written for latest commit
                        #
                        # Segment | 1     | 2   | 3
                        # --------+-------+-----+------
                        # Key 1   | P     | D   |
                        # Key 2   | P     |     | P
                        # commits |   c i |   c |   c i
                        # --------+-------+-----+------
                        #                       ^- compact_segments starts
                        #                           ^- complete_xfer commits, after that complete_xfer deletes
                        #                              segments 1 and 2 (and then the index would be written).
                        #
                        # Now we crash. But only segment 2 gets deleted, while segment 1 is still around. Now key 1
                        # is suddenly undeleted (because the delete in segment 2 is now missing).
                        # Again, note the requirement here. We delete these in the correct order so that this doesn't
                        # happen, and only if the FS materialization of these deletes is reordered or parts dropped
                        # can this happen.
                        # In this case it doesn't cause outright corruption, 'just' an index count mismatch, which
                        # will be fixed by borg-check --repair.
                        #
                        # Note that in this check the index state is the proxy for a "most definitely settled"
                        # repository state, i.e. the assumption is that *all* operations on segments <= index state
                        # are completed and stable.
                        try:
                            new_segment, size = self.io.write_delete(key, raise_full=True)
                        except LoggedIO.SegmentFull:
                            complete_xfer()
                            new_segment, size = self.io.write_delete(key)
                        self.compact[new_segment] += size
                        segments.setdefault(new_segment, 0)
            assert segments[segment] == 0
            unused.append(segment)
            pi.show()
        pi.finish()
        complete_xfer(intermediate=False)
        logger.debug('compaction completed.')

    def replay_segments(self, index_transaction_id, segments_transaction_id):
        # fake an old client, so that in case we do not have an exclusive lock yet, prepare_txn will upgrade the lock:
        remember_exclusive = self.exclusive
        self.exclusive = None
        self.prepare_txn(index_transaction_id, do_cleanup=False)
        try:
            segment_count = sum(1 for _ in self.io.segment_iterator())
            pi = ProgressIndicatorPercent(total=segment_count, msg="Replaying segments %3.0f%%")
            for i, (segment, filename) in enumerate(self.io.segment_iterator()):
                pi.show(i)
                if index_transaction_id is not None and segment <= index_transaction_id:
                    continue
                if segment > segments_transaction_id:
                    break
                objects = self.io.iter_objects(segment)
                self._update_index(segment, objects)
            pi.finish()
            self.write_index()
        finally:
            self.exclusive = remember_exclusive
            self.rollback()

    def _update_index(self, segment, objects, report=None):
        """some code shared between replay_segments and check"""
        self.segments[segment] = 0
        for tag, key, offset, size in objects:
            if tag == TAG_PUT:
                try:
                    # If this PUT supersedes an older PUT, mark the old segment for compaction and count the free space
                    s, _ = self.index[key]
                    self.compact[s] += size
                    self.segments[s] -= 1
                except KeyError:
                    pass
                self.index[key] = segment, offset
                self.segments[segment] += 1
            elif tag == TAG_DELETE:
                try:
                    # if the deleted PUT is not in the index, there is nothing to clean up
                    s, offset = self.index.pop(key)
                except KeyError:
                    pass
                else:
                    if self.io.segment_exists(s):
                        # the old index is not necessarily valid for this transaction (e.g. compaction); if the segment
                        # is already gone, then it was already compacted.
                        self.segments[s] -= 1
                        size = self.io.read(s, offset, key, read_data=False)
                        self.compact[s] += size
            elif tag == TAG_COMMIT:
                continue
            else:
                msg = 'Unexpected tag {} in segment {}'.format(tag, segment)
                if report is None:
                    raise self.CheckNeeded(msg)
                else:
                    report(msg)
        if self.segments[segment] == 0:
            self.compact[segment] += self.io.segment_size(segment)
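
    # Bookkeeping invariant maintained above (sketch, not part of the original
    # file): self.segments[x] counts the PUT entries in segment x that the
    # index still references, and self.compact[x] accumulates the bytes in x
    # that compaction could reclaim. E.g. when a PUT of some key in segment 1
    # is superseded by a newer PUT in segment 2, this does segments[1] -= 1
    # and compact[1] += <size of the old entry>.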

    def _rebuild_sparse(self, segment):
        """Rebuild sparse bytes count for a single segment relative to the current index."""
        self.compact[segment] = 0
        if self.segments[segment] == 0:
            self.compact[segment] += self.io.segment_size(segment)
            return
        for tag, key, offset, size in self.io.iter_objects(segment, read_data=False):
            if tag == TAG_PUT:
                if self.index.get(key, (-1, -1)) != (segment, offset):
                    # This PUT is superseded later
                    self.compact[segment] += size
            elif tag == TAG_DELETE:
                # The outcome of the DELETE has been recorded in the PUT branch already
                self.compact[segment] += size

    def check(self, repair=False, save_space=False):
        """Check repository consistency

        This method verifies all segment checksums and makes sure
        the index is consistent with the data stored in the segments.
        """
        if self.append_only and repair:
            raise ValueError(self.path + " is in append-only mode")
        error_found = False

        def report_error(msg):
            nonlocal error_found
            error_found = True
            logger.error(msg)

        logger.info('Starting repository check')
        assert not self._active_txn
        try:
            transaction_id = self.get_transaction_id()
            current_index = self.open_index(transaction_id)
            logger.debug('Read committed index of transaction %d', transaction_id)
        except Exception as exc:
            transaction_id = self.io.get_segments_transaction_id()
            current_index = None
            logger.debug('Failed to read committed index (%s)', exc)
        if transaction_id is None:
            logger.debug('No segments transaction found')
            transaction_id = self.get_index_transaction_id()
        if transaction_id is None:
            logger.debug('No index transaction found, trying latest segment')
            transaction_id = self.io.get_latest_segment()
        if repair:
            self.io.cleanup(transaction_id)
        segments_transaction_id = self.io.get_segments_transaction_id()
        logger.debug('Segment transaction is %s', segments_transaction_id)
        logger.debug('Determined transaction is %s', transaction_id)
        self.prepare_txn(None)  # self.index, self.compact, self.segments all empty now!
        segment_count = sum(1 for _ in self.io.segment_iterator())
        logger.debug('Found %d segments', segment_count)
        pi = ProgressIndicatorPercent(total=segment_count, msg="Checking segments %3.1f%%", step=0.1)
        for i, (segment, filename) in enumerate(self.io.segment_iterator()):
            pi.show(i)
            if segment > transaction_id:
                continue
            try:
                objects = list(self.io.iter_objects(segment))
            except IntegrityError as err:
                report_error(str(err))
                objects = []
                if repair:
                    self.io.recover_segment(segment, filename)
                    objects = list(self.io.iter_objects(segment))
            self._update_index(segment, objects, report_error)
        pi.finish()
        # self.index, self.segments, self.compact now reflect the state of the segment files up to <transaction_id>
        # We might need to add a commit tag if no committed segment is found
        if repair and segments_transaction_id is None:
            report_error('Adding commit tag to segment {}'.format(transaction_id))
            self.io.segment = transaction_id + 1
            self.io.write_commit()
        logger.info('Starting repository index check')
        if current_index and not repair:
            # current_index = "as found on disk"
            # self.index = "as rebuilt in-memory from segments"
            if len(current_index) != len(self.index):
                report_error('Index object count mismatch.')
                logger.error('committed index: %d objects', len(current_index))
                logger.error('rebuilt index:   %d objects', len(self.index))
                line_format = '%-64s %-16s %-16s'
                not_found = '<not found>'
                logger.warning(line_format, 'ID', 'rebuilt index', 'committed index')
                for key, value in self.index.iteritems():
                    current_value = current_index.get(key, not_found)
                    if current_value != value:
                        logger.warning(line_format, bin_to_hex(key), value, current_value)
                for key, current_value in current_index.iteritems():
                    if key in self.index:
                        continue
                    value = self.index.get(key, not_found)
                    if current_value != value:
                        logger.warning(line_format, bin_to_hex(key), value, current_value)
        elif current_index:
            for key, value in self.index.iteritems():
                if current_index.get(key, (-1, -1)) != value:
                    report_error('Index mismatch for key {}. {} != {}'.format(key, value, current_index.get(key, (-1, -1))))
        if repair:
            self.compact_segments()
            self.write_index()
        self.rollback()
        if error_found:
            if repair:
                logger.info('Completed repository check, errors found and repaired.')
            else:
                logger.error('Completed repository check, errors found.')
        else:
            logger.info('Completed repository check, no problems found.')
        return not error_found or repair

    def rollback(self, cleanup=False):
        """Roll back the active transaction: discard the in-memory index and,
        if cleanup is requested, delete uncommitted segment files.
        """
        if cleanup:
            self.io.cleanup(self.io.get_segments_transaction_id())
        self.index = None
        self._active_txn = False

    def __len__(self):
        if not self.index:
            self.index = self.open_index(self.get_transaction_id())
        return len(self.index)

    def __contains__(self, id):
        if not self.index:
            self.index = self.open_index(self.get_transaction_id())
        return id in self.index

    def list(self, limit=None, marker=None):
        """
        list <limit> IDs starting from after id <marker> - in index (pseudo-random) order.
        """
        if not self.index:
            self.index = self.open_index(self.get_transaction_id())
        return [id_ for id_, _ in islice(self.index.iteritems(marker=marker), limit)]

    def scan(self, limit=None, marker=None):
        """
        list <limit> IDs starting from after id <marker> - in on-disk order, so that a client
        fetching data in this order does linear reads and reuses stuff from disk cache.

        We rely on repository.check() having run already (either now or some time before) and that:
        - if we are called from a borg check command, self.index is a valid, fresh, in-sync repo index.
        - if we are called from elsewhere, either self.index or the on-disk index is valid and in-sync.
        - the repository segments are valid (no CRC errors).
          if we encounter CRC errors in segment entry headers, the rest of the segment is skipped.
        """
        if limit is not None and limit < 1:
            raise ValueError('please use limit > 0 or limit = None')
        if not self.index:
            transaction_id = self.get_transaction_id()
            self.index = self.open_index(transaction_id)
        at_start = marker is None
        # smallest valid seg is <uint32> 0, smallest valid offs is <uint32> 8
        start_segment, start_offset = (0, 0) if at_start else self.index[marker]
        result = []
        for segment, filename in self.io.segment_iterator(start_segment):
            obj_iterator = self.io.iter_objects(segment, start_offset, read_data=False, include_data=False)
            while True:
                try:
                    tag, id, offset, size = next(obj_iterator)
                except (StopIteration, IntegrityError):
                    # either end-of-segment or an error - we can not seek to objects at
                    # higher offsets than one that has an error in the header fields
                    break
                if start_offset > 0:
                    # we are using a marker and the marker points to the last object we have already
                    # returned in the previous scan() call - thus, we need to skip this one object.
                    # also, for the next segment, we need to start at offset 0.
                    start_offset = 0
                    continue
                if tag == TAG_PUT and (segment, offset) == self.index.get(id):
                    # we have found an existing and current object
                    result.append(id)
                    if len(result) == limit:
                        return result
        return result
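
    # Illustrative pagination over scan() (sketch, not part of the original
    # file; assumes an open repo object and a hypothetical process() callable):
    #   marker = None
    #   while True:
    #       ids = repo.scan(limit=1000, marker=marker)
    #       if not ids:
    #           break
    #       for data in repo.get_many(ids):
    #           process(data)
    #       marker = ids[-1]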

    def get(self, id_):
        if not self.index:
            self.index = self.open_index(self.get_transaction_id())
        try:
            segment, offset = self.index[id_]
            return self.io.read(segment, offset, id_)
        except KeyError:
            raise self.ObjectNotFound(id_, self.path) from None

    def get_many(self, ids, is_preloaded=False):
        for id_ in ids:
            yield self.get(id_)

    def put(self, id, data, wait=True):
        if not self._active_txn:
            self.prepare_txn(self.get_transaction_id())
        try:
            segment, offset = self.index[id]
        except KeyError:
            pass
        else:
            self.segments[segment] -= 1
            size = self.io.read(segment, offset, id, read_data=False)
            self.compact[segment] += size
            segment, size = self.io.write_delete(id)
            self.compact[segment] += size
            self.segments.setdefault(segment, 0)
        segment, offset = self.io.write_put(id, data)
        self.segments.setdefault(segment, 0)
        self.segments[segment] += 1
        self.index[id] = segment, offset

    def delete(self, id, wait=True):
        if not self._active_txn:
            self.prepare_txn(self.get_transaction_id())
        try:
            segment, offset = self.index.pop(id)
        except KeyError:
            raise self.ObjectNotFound(id, self.path) from None
        self.shadow_index.setdefault(id, []).append(segment)
        self.segments[segment] -= 1
        size = self.io.read(segment, offset, id, read_data=False)
        self.compact[segment] += size
        segment, size = self.io.write_delete(id)
        self.compact[segment] += size
        self.segments.setdefault(segment, 0)

    def preload(self, ids):
        """Preload objects (only applies to remote repositories)
        """


class LoggedIO:

    class SegmentFull(Exception):
        """raised when a segment is full, before opening next"""

    header_fmt = struct.Struct('<IIB')
    assert header_fmt.size == 9
    put_header_fmt = struct.Struct('<IIB32s')
    assert put_header_fmt.size == 41
    header_no_crc_fmt = struct.Struct('<IB')
    assert header_no_crc_fmt.size == 5
    crc_fmt = struct.Struct('<I')
    assert crc_fmt.size == 4

    _commit = header_no_crc_fmt.pack(9, TAG_COMMIT)
    COMMIT = crc_fmt.pack(crc32(_commit)) + _commit
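
    # On-disk entry layout implied by the formats above (illustrative, not part
    # of the original file); the crc32 covers everything after the crc field:
    #   PUT:    <crc32:4><size:4><tag:1=0><key:32><data:size-41>
    #   DELETE: <crc32:4><size:4=41><tag:1=1><key:32>
    #   COMMIT: <crc32:4><size:4=9><tag:1=2>
    # e.g. the COMMIT constant is crc32(b'\x09\x00\x00\x00\x02') + that header.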

    def __init__(self, path, limit, segments_per_dir, capacity=90):
        self.path = path
        self.fds = LRUCache(capacity, dispose=self.close_fd)
        self.segment = 0
        self.limit = limit
        self.segments_per_dir = segments_per_dir
        self.offset = 0
        self._write_fd = None

    def close(self):
        self.close_segment()
        self.fds.clear()
        self.fds = None  # Just to make sure we're disabled

    def close_fd(self, fd):
        if hasattr(os, 'posix_fadvise'):  # only on UNIX
            os.posix_fadvise(fd.fileno(), 0, 0, os.POSIX_FADV_DONTNEED)
        fd.close()

    def segment_iterator(self, segment=None, reverse=False):
        if segment is None:
            segment = 0 if not reverse else 2 ** 32 - 1
        data_path = os.path.join(self.path, 'data')
        start_segment_dir = segment // self.segments_per_dir
        dirs = os.listdir(data_path)
        if not reverse:
            dirs = [dir for dir in dirs if dir.isdigit() and int(dir) >= start_segment_dir]
        else:
            dirs = [dir for dir in dirs if dir.isdigit() and int(dir) <= start_segment_dir]
        dirs = sorted(dirs, key=int, reverse=reverse)
        for dir in dirs:
            filenames = os.listdir(os.path.join(data_path, dir))
            if not reverse:
                filenames = [filename for filename in filenames if filename.isdigit() and int(filename) >= segment]
            else:
                filenames = [filename for filename in filenames if filename.isdigit() and int(filename) <= segment]
            filenames = sorted(filenames, key=int, reverse=reverse)
            for filename in filenames:
                yield int(filename), os.path.join(data_path, dir, filename)

    def get_latest_segment(self):
        for segment, filename in self.segment_iterator(reverse=True):
            return segment
        return None

    def get_segments_transaction_id(self):
        """Return the last committed segment.
        """
        for segment, filename in self.segment_iterator(reverse=True):
            if self.is_committed_segment(segment):
                return segment
        return None

    def cleanup(self, transaction_id):
        """Delete segment files left by aborted transactions
        """
        self.segment = transaction_id + 1
        for segment, filename in self.segment_iterator(reverse=True):
            if segment > transaction_id:
                os.unlink(filename)
            else:
                break

    def is_committed_segment(self, segment):
        """Check if segment ends with a COMMIT tag
        """
        try:
            iterator = self.iter_objects(segment)
        except IntegrityError:
            return False
        with open(self.segment_filename(segment), 'rb') as fd:
            try:
                fd.seek(-self.header_fmt.size, os.SEEK_END)
            except OSError as e:
                # return False if segment file is empty or too small
                if e.errno == errno.EINVAL:
                    return False
                raise e
            if fd.read(self.header_fmt.size) != self.COMMIT:
                return False
        seen_commit = False
        while True:
            try:
                tag, key, offset, _ = next(iterator)
            except IntegrityError:
                return False
            except StopIteration:
                break
            if tag == TAG_COMMIT:
                seen_commit = True
                continue
            if seen_commit:
                return False
        return seen_commit

    def segment_filename(self, segment):
        return os.path.join(self.path, 'data', str(segment // self.segments_per_dir), str(segment))
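
    # Example path mapping (illustrative, not part of the original file): with
    # segments_per_dir = 1000, segment 12345 lives at <repo>/data/12/12345 and
    # segment 7 at <repo>/data/0/7.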

    def get_write_fd(self, no_new=False, raise_full=False):
        if not no_new and self.offset and self.offset > self.limit:
            if raise_full:
                raise self.SegmentFull
            self.close_segment()
        if not self._write_fd:
            if self.segment % self.segments_per_dir == 0:
                dirname = os.path.join(self.path, 'data', str(self.segment // self.segments_per_dir))
                if not os.path.exists(dirname):
                    os.mkdir(dirname)
                    sync_dir(os.path.join(self.path, 'data'))
            self._write_fd = SyncFile(self.segment_filename(self.segment), binary=True)
            self._write_fd.write(MAGIC)
            self.offset = MAGIC_LEN
        return self._write_fd

    def get_fd(self, segment):
        try:
            return self.fds[segment]
        except KeyError:
            fd = open(self.segment_filename(segment), 'rb')
            self.fds[segment] = fd
            return fd

    def close_segment(self):
        if self._write_fd:
            self.segment += 1
            self.offset = 0
            self._write_fd.close()
            self._write_fd = None

    def delete_segment(self, segment):
        if segment in self.fds:
            del self.fds[segment]
        try:
            os.unlink(self.segment_filename(segment))
        except FileNotFoundError:
            pass

    def segment_exists(self, segment):
        return os.path.exists(self.segment_filename(segment))

    def segment_size(self, segment):
        return os.path.getsize(self.segment_filename(segment))

    def iter_objects(self, segment, offset=0, include_data=False, read_data=True):
        """
        Return object iterator for *segment*.

        If read_data is False then include_data must be False as well.
        Integrity checks are skipped: all data obtained from the iterator must be considered informational.

        The iterator returns four-tuples of (tag, key, offset, data|size).
        """
        fd = self.get_fd(segment)
        fd.seek(offset)
        if offset == 0:
            # we are touching this segment for the first time, check the MAGIC.
            # Repository.scan() calls us with offset > 0 when it continues an ongoing iteration
            # from a marker position - but then we have checked the magic before already.
            if fd.read(MAGIC_LEN) != MAGIC:
                raise IntegrityError('Invalid segment magic [segment {}, offset {}]'.format(segment, 0))
            offset = MAGIC_LEN
        header = fd.read(self.header_fmt.size)
        while header:
            size, tag, key, data = self._read(fd, self.header_fmt, header, segment, offset,
                                              (TAG_PUT, TAG_DELETE, TAG_COMMIT),
                                              read_data=read_data)
            if include_data:
                yield tag, key, offset, data
            else:
                yield tag, key, offset, size
            offset += size
            # we must get the fd via get_fd() here again as we yielded to our caller and it might
            # have triggered closing of the fd we had before (e.g. by calling io.read() for
            # different segment(s)).
            # by calling get_fd() here again we also make our fd "recently used" so it likely
            # does not get kicked out of self.fds LRUcache.
            fd = self.get_fd(segment)
            fd.seek(offset)
            header = fd.read(self.header_fmt.size)

    def recover_segment(self, segment, filename):
        if segment in self.fds:
            del self.fds[segment]
        with open(filename, 'rb') as fd:
            data = memoryview(fd.read())
        os.rename(filename, filename + '.beforerecover')
        logger.info('attempting to recover ' + filename)
        with open(filename, 'wb') as fd:
            fd.write(MAGIC)
            while len(data) >= self.header_fmt.size:
                crc, size, tag = self.header_fmt.unpack(data[:self.header_fmt.size])
                if size < self.header_fmt.size or size > len(data):
                    data = data[1:]
                    continue
                if crc32(data[4:size]) & 0xffffffff != crc:
                    data = data[1:]
                    continue
                fd.write(data[:size])
                data = data[size:]
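
    # Recovery strategy above (sketch of the idea, not part of the original
    # file): treat the damaged segment as a byte stream and re-synchronize by
    # sliding forward one byte at a time until a header parses with a plausible
    # size and a matching crc32; only such verified entries are copied into the
    # rewritten segment file, everything else is dropped.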

    def read(self, segment, offset, id, read_data=True):
        """
        Read entry from *segment* at *offset* with *id*.

        If read_data is False the size of the entry is returned instead and integrity checks are skipped.
        The return value should thus be considered informational.
        """
        if segment == self.segment and self._write_fd:
            self._write_fd.sync()
        fd = self.get_fd(segment)
        fd.seek(offset)
        header = fd.read(self.put_header_fmt.size)
        size, tag, key, data = self._read(fd, self.put_header_fmt, header, segment, offset, (TAG_PUT, ), read_data)
        if id != key:
            raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format(
                segment, offset))
        return data if read_data else size

    def _read(self, fd, fmt, header, segment, offset, acceptable_tags, read_data=True):
        # some code shared by read() and iter_objects()
        try:
            hdr_tuple = fmt.unpack(header)
        except struct.error as err:
            raise IntegrityError('Invalid segment entry header [segment {}, offset {}]: {}'.format(
                segment, offset, err)) from None
        if fmt is self.put_header_fmt:
            crc, size, tag, key = hdr_tuple
        elif fmt is self.header_fmt:
            crc, size, tag = hdr_tuple
            key = None
        else:
            raise TypeError("_read called with unsupported format")
        if size > MAX_OBJECT_SIZE:
            # if you get this on an archive made with borg < 1.0.7 and millions of files and
            # you need to restore it, you can disable this check by using "if False:" above.
            raise IntegrityError('Invalid segment entry size {} - too big [segment {}, offset {}]'.format(
                size, segment, offset))
        if size < fmt.size:
            raise IntegrityError('Invalid segment entry size {} - too small [segment {}, offset {}]'.format(
                size, segment, offset))
        length = size - fmt.size
        if read_data:
            data = fd.read(length)
            if len(data) != length:
                raise IntegrityError('Segment entry data short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
                    segment, offset, length, len(data)))
            if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc:
                raise IntegrityError('Segment entry checksum mismatch [segment {}, offset {}]'.format(
                    segment, offset))
            if key is None and tag in (TAG_PUT, TAG_DELETE):
                key, data = data[:32], data[32:]
        else:
            if key is None and tag in (TAG_PUT, TAG_DELETE):
                key = fd.read(32)
                length -= 32
                if len(key) != 32:
                    raise IntegrityError('Segment entry key short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
                        segment, offset, 32, len(key)))
            oldpos = fd.tell()
            seeked = fd.seek(length, os.SEEK_CUR) - oldpos
            data = None
            if seeked != length:
                raise IntegrityError('Segment entry data short seek [segment {}, offset {}]: expected {}, got {} bytes'.format(
                    segment, offset, length, seeked))
        if tag not in acceptable_tags:
            raise IntegrityError('Invalid segment entry header, did not get acceptable tag [segment {}, offset {}]'.format(
                segment, offset))
        return size, tag, key, data

    def write_put(self, id, data, raise_full=False):
        data_size = len(data)
        if data_size > MAX_DATA_SIZE:
            # this would push the segment entry size beyond MAX_OBJECT_SIZE.
            raise IntegrityError('More than allowed put data [{} > {}]'.format(data_size, MAX_DATA_SIZE))
        fd = self.get_write_fd(raise_full=raise_full)
        size = data_size + self.put_header_fmt.size
        offset = self.offset
        header = self.header_no_crc_fmt.pack(size, TAG_PUT)
        crc = self.crc_fmt.pack(crc32(data, crc32(id, crc32(header))) & 0xffffffff)
        fd.write(b''.join((crc, header, id, data)))
        self.offset += size
        return self.segment, offset

    def write_delete(self, id, raise_full=False):
        fd = self.get_write_fd(raise_full=raise_full)
        header = self.header_no_crc_fmt.pack(self.put_header_fmt.size, TAG_DELETE)
        crc = self.crc_fmt.pack(crc32(id, crc32(header)) & 0xffffffff)
        fd.write(b''.join((crc, header, id)))
        self.offset += self.put_header_fmt.size
        return self.segment, self.put_header_fmt.size

    def write_commit(self, intermediate=False):
        if intermediate:
            # Intermediate commits go directly into the current segment - this makes checking their validity more
            # expensive, but is faster and reduces clobber.
            fd = self.get_write_fd()
            fd.sync()
        else:
            self.close_segment()
            fd = self.get_write_fd()
        header = self.header_no_crc_fmt.pack(self.header_fmt.size, TAG_COMMIT)
        crc = self.crc_fmt.pack(crc32(header) & 0xffffffff)
        fd.write(b''.join((crc, header)))
        self.close_segment()
        return self.segment - 1  # close_segment() increments it


MAX_DATA_SIZE = MAX_OBJECT_SIZE - LoggedIO.put_header_fmt.size
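
# Illustrative end-to-end usage (sketch, not part of the original file; keys
# must be 32 bytes, matching the '32s' field of put_header_fmt):
#   with Repository('/tmp/repo', create=True, exclusive=True) as repo:
#       key = os.urandom(32)
#       repo.put(key, b'some object data')
#       repo.commit()
#       assert repo.get(key) == b'some object data'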