repository.py 37 KB


  1. from configparser import ConfigParser
  2. from binascii import unhexlify
  3. from datetime import datetime
  4. from itertools import islice
  5. import errno
  6. import logging
  7. logger = logging.getLogger(__name__)
  8. import os
  9. import shutil
  10. import struct
  11. from collections import defaultdict
  12. from functools import partial
  13. from zlib import crc32
  14. import msgpack
  15. from .constants import * # NOQA
  16. from .helpers import Error, ErrorWithTraceback, IntegrityError, InternalOSError, Location, ProgressIndicatorPercent, \
  17. bin_to_hex
  18. from .hashindex import NSIndex
  19. from .locking import UpgradableLock, LockError, LockErrorT
  20. from .lrucache import LRUCache
  21. from .platform import SyncFile, sync_dir
# Upper bound for the total size (header included) of one log entry; LoggedIO._read rejects larger entries.
MAX_OBJECT_SIZE = 20 * 1024 * 1024
# Magic bytes written at the very start of every segment file.
MAGIC = b'BORG_SEG'
MAGIC_LEN = len(MAGIC)
# Tags identifying the kind of a log entry.
TAG_PUT = 0
TAG_DELETE = 1
TAG_COMMIT = 2
# Factory for an int-valued defaultdict; Repository.compact maps segment number -> freeable bytes with it.
FreeSpace = partial(defaultdict, int)
class Repository:
    """
    Filesystem based transactional key value store

    Transactionality is achieved by using a log (aka journal) to record changes. The log is a series of numbered files
    called segments. Each segment is a series of log entries. The segment number together with the offset of each
    entry relative to its segment start establishes an ordering of the log entries. This is the "definition" of
    time for the purposes of the log.

    Log entries are either PUT, DELETE or COMMIT.

    A COMMIT is always the final log entry in a segment and marks all data from the beginning of the log until the
    segment ending with the COMMIT as committed and consistent. The segment number of a segment ending with a COMMIT
    is called the transaction ID of that commit, and a segment ending with a COMMIT is called committed.

    When reading from a repository it is first checked whether the last segment is committed. If it is not, then
    all segments after the last committed segment are deleted; they contain log entries whose consistency is not
    established by a COMMIT.

    Note that the COMMIT can't establish consistency by itself, but only manages to do so with proper support from
    the platform (including the hardware). See platform_base.SyncFile for details.

    A PUT inserts a key-value pair. The value is stored in the log entry, hence the repository implements
    full data logging, meaning that all data is consistent, not just metadata (which is common in file systems).

    A DELETE marks a key as deleted.

    For a given key only the last entry regarding the key, which is called current (all other entries are called
    superseded), is relevant: If there is no entry or the last entry is a DELETE then the key does not exist.
    Otherwise the last PUT defines the value of the key.

    By superseding a PUT (with either another PUT or a DELETE) the log entry becomes obsolete. A segment containing
    such obsolete entries is called sparse, while a segment containing no such entries is called compact.

    Sparse segments can be compacted and thereby disk space freed. This destroys the transaction for which the
    superseded entries were current.

    On disk layout:

    dir/README
    dir/config
    dir/data/<X // SEGMENTS_PER_DIR>/<X>
    dir/index.X
    dir/hints.X
    """
    # Raised by open() when the repository path is not a directory.
    # NOTE(review): the docstring looks like a message template consumed by the Error base class - do not reword.
    class DoesNotExist(Error):
        """Repository {} does not exist."""
    # Raised by create() when the target path exists and is a non-directory or a non-empty directory.
    # NOTE(review): the docstring looks like a message template consumed by the Error base class - do not reword.
    class AlreadyExists(Error):
        """Repository {} already exists."""
    # Raised by open() when the config lacks a [repository] section or its version is not 1.
    # NOTE(review): the docstring looks like a message template consumed by the Error base class - do not reword.
    class InvalidRepository(Error):
        """{} is not a valid repository. Check repo config."""
    # Raised by check_transaction() (index without committed segments) and by _update_index() (unexpected tag).
    # NOTE(review): the docstring looks like a message template consumed by the Error base class - do not reword.
    class CheckNeeded(ErrorWithTraceback):
        """Inconsistency detected. Please run "borg check {}"."""
    # Raised by get() and delete() when the requested key is not in the index.
    # NOTE(review): the docstring looks like a message template consumed by the Error base class - do not reword.
    class ObjectNotFound(ErrorWithTraceback):
        """Object with key {} not found in repository {}."""
  72. def __init__(self, path, create=False, exclusive=False, lock_wait=None, lock=True):
  73. self.path = os.path.abspath(path)
  74. self._location = Location('file://%s' % self.path)
  75. self.io = None
  76. self.lock = None
  77. self.index = None
  78. self._active_txn = False
  79. self.lock_wait = lock_wait
  80. self.do_lock = lock
  81. self.do_create = create
  82. self.exclusive = exclusive
  83. def __del__(self):
  84. if self.lock:
  85. self.close()
  86. assert False, "cleanup happened in Repository.__del__"
  87. def __repr__(self):
  88. return '<%s %s>' % (self.__class__.__name__, self.path)
  89. def __enter__(self):
  90. if self.do_create:
  91. self.do_create = False
  92. self.create(self.path)
  93. self.open(self.path, self.exclusive, lock_wait=self.lock_wait, lock=self.do_lock)
  94. return self
  95. def __exit__(self, exc_type, exc_val, exc_tb):
  96. if exc_type is not None:
  97. self.rollback()
  98. self.close()
  99. @property
  100. def id_str(self):
  101. return bin_to_hex(self.id)
  102. def create(self, path):
  103. """Create a new empty repository at `path`
  104. """
  105. if os.path.exists(path) and (not os.path.isdir(path) or os.listdir(path)):
  106. raise self.AlreadyExists(path)
  107. if not os.path.exists(path):
  108. os.mkdir(path)
  109. with open(os.path.join(path, 'README'), 'w') as fd:
  110. fd.write('This is a Borg repository\n')
  111. os.mkdir(os.path.join(path, 'data'))
  112. config = ConfigParser(interpolation=None)
  113. config.add_section('repository')
  114. config.set('repository', 'version', '1')
  115. config.set('repository', 'segments_per_dir', str(DEFAULT_SEGMENTS_PER_DIR))
  116. config.set('repository', 'max_segment_size', str(DEFAULT_MAX_SEGMENT_SIZE))
  117. config.set('repository', 'append_only', '0')
  118. config.set('repository', 'id', bin_to_hex(os.urandom(32)))
  119. self.save_config(path, config)
  120. def save_config(self, path, config):
  121. config_path = os.path.join(path, 'config')
  122. with open(config_path, 'w') as fd:
  123. config.write(fd)
  124. def save_key(self, keydata):
  125. assert self.config
  126. keydata = keydata.decode('utf-8') # remote repo: msgpack issue #99, getting bytes
  127. self.config.set('repository', 'key', keydata)
  128. self.save_config(self.path, self.config)
  129. def load_key(self):
  130. keydata = self.config.get('repository', 'key')
  131. return keydata.encode('utf-8') # remote repo: msgpack issue #99, returning bytes
  132. def destroy(self):
  133. """Destroy the repository at `self.path`
  134. """
  135. if self.append_only:
  136. raise ValueError(self.path + " is in append-only mode")
  137. self.close()
  138. os.remove(os.path.join(self.path, 'config')) # kill config first
  139. shutil.rmtree(self.path)
  140. def get_index_transaction_id(self):
  141. indices = sorted((int(name[6:]) for name in os.listdir(self.path) if name.startswith('index.') and name[6:].isdigit()))
  142. if indices:
  143. return indices[-1]
  144. else:
  145. return None
  146. def check_transaction(self):
  147. index_transaction_id = self.get_index_transaction_id()
  148. segments_transaction_id = self.io.get_segments_transaction_id()
  149. if index_transaction_id is not None and segments_transaction_id is None:
  150. raise self.CheckNeeded(self.path)
  151. # Attempt to automatically rebuild index if we crashed between commit
  152. # tag write and index save
  153. if index_transaction_id != segments_transaction_id:
  154. if index_transaction_id is not None and index_transaction_id > segments_transaction_id:
  155. replay_from = None
  156. else:
  157. replay_from = index_transaction_id
  158. self.replay_segments(replay_from, segments_transaction_id)
  159. def get_transaction_id(self):
  160. self.check_transaction()
  161. return self.get_index_transaction_id()
  162. def break_lock(self):
  163. UpgradableLock(os.path.join(self.path, 'lock')).break_lock()
  164. def open(self, path, exclusive, lock_wait=None, lock=True):
  165. self.path = path
  166. if not os.path.isdir(path):
  167. raise self.DoesNotExist(path)
  168. if lock:
  169. self.lock = UpgradableLock(os.path.join(path, 'lock'), exclusive, timeout=lock_wait).acquire()
  170. else:
  171. self.lock = None
  172. self.config = ConfigParser(interpolation=None)
  173. self.config.read(os.path.join(self.path, 'config'))
  174. if 'repository' not in self.config.sections() or self.config.getint('repository', 'version') != 1:
  175. raise self.InvalidRepository(path)
  176. self.max_segment_size = self.config.getint('repository', 'max_segment_size')
  177. self.segments_per_dir = self.config.getint('repository', 'segments_per_dir')
  178. self.append_only = self.config.getboolean('repository', 'append_only', fallback=False)
  179. self.id = unhexlify(self.config.get('repository', 'id').strip())
  180. self.io = LoggedIO(self.path, self.max_segment_size, self.segments_per_dir)
  181. def close(self):
  182. if self.lock:
  183. if self.io:
  184. self.io.close()
  185. self.io = None
  186. self.lock.release()
  187. self.lock = None
  188. def commit(self, save_space=False):
  189. """Commit transaction
  190. """
  191. self.io.write_commit()
  192. if not self.append_only:
  193. self.compact_segments(save_space=save_space)
  194. self.write_index()
  195. self.rollback()
    def open_index(self, transaction_id, auto_recover=True):
        """Load the on-disk index file ``index.<transaction_id>``.

        :param transaction_id: transaction id of the index to read; None yields a new empty NSIndex
        :param auto_recover: if reading fails with RuntimeError, remove the index file, run a
            prepare_txn()/commit() cycle (commit writes a fresh index) and load the resulting index;
            if False, the RuntimeError is re-raised after the index file was removed
        :raises InternalOSError: wraps any OSError from reading or unlinking the index file
        """
        if transaction_id is None:
            return NSIndex()
        # NSIndex.read is given a bytes path
        index_path = os.path.join(self.path, 'index.%d' % transaction_id).encode('utf-8')
        try:
            return NSIndex.read(index_path)
        except RuntimeError as error:
            assert str(error) == 'hashindex_read failed'  # everything else means we're in *deep* trouble
            logger.warning('Repository index missing or corrupted, trying to recover')
            try:
                os.unlink(index_path)
            except OSError as e:
                raise InternalOSError(e) from None
            if not auto_recover:
                raise
            self.prepare_txn(self.get_transaction_id())
            # don't leave an open transaction around
            self.commit()
            return self.open_index(self.get_transaction_id())
        except OSError as e:
            raise InternalOSError(e) from None
    def prepare_txn(self, transaction_id, do_cleanup=True):
        """Start a transaction based on the committed state *transaction_id*.

        Upgrades the lock, loads the index (if not loaded yet) and sets up ``self.segments`` and
        ``self.compact`` either empty (*transaction_id* is None) or from the ``hints.<transaction_id>`` file.

        :param transaction_id: id of the last committed transaction, or None for an empty state
        :param do_cleanup: if true, call ``self.io.cleanup(transaction_id)`` before reading the hints
        :raises LockError, LockErrorT: re-raised if the lock upgrade fails (no transaction is active then)
        :raises InternalOSError: wraps an OSError (other than FileNotFoundError) from reading the hints file
        :raises ValueError: the hints file has an unknown version
        """
        self._active_txn = True
        try:
            self.lock.upgrade()
        except (LockError, LockErrorT):
            # if upgrading the lock to exclusive fails, we do not have an
            # active transaction. this is important for "serve" mode, where
            # the repository instance lives on - even if exceptions happened.
            self._active_txn = False
            raise
        if not self.index or transaction_id is None:
            try:
                self.index = self.open_index(transaction_id, False)
            except RuntimeError:
                # open_index() removed the unreadable index; check_transaction() then takes care of it
                self.check_transaction()
                self.index = self.open_index(transaction_id, False)
        if transaction_id is None:
            self.segments = {}  # XXX bad name: usage_count_of_segment_x = self.segments[x]
            self.compact = FreeSpace()  # XXX bad name: freeable_space_of_segment_x = self.compact[x]
        else:
            if do_cleanup:
                self.io.cleanup(transaction_id)
            hints_path = os.path.join(self.path, 'hints.%d' % transaction_id)
            index_path = os.path.join(self.path, 'index.%d' % transaction_id)
            try:
                with open(hints_path, 'rb') as fd:
                    hints = msgpack.unpack(fd)
            except (msgpack.UnpackException, msgpack.ExtraData, FileNotFoundError) as e:
                logger.warning('Repository hints file missing or corrupted, trying to recover')
                if not isinstance(e, FileNotFoundError):
                    os.unlink(hints_path)
                # index must exist at this point
                os.unlink(index_path)
                # with the index gone, check_transaction() replays the segments; then start over
                self.check_transaction()
                self.prepare_txn(transaction_id)
                return
            except OSError as os_error:
                raise InternalOSError(os_error) from None
            if hints[b'version'] == 1:
                # v1 hints: sparse info is recomputed per segment via _rebuild_sparse()
                logger.debug('Upgrading from v1 hints.%d', transaction_id)
                self.segments = hints[b'segments']
                self.compact = FreeSpace()
                for segment in sorted(hints[b'compact']):
                    logger.debug('Rebuilding sparse info for segment %d', segment)
                    self._rebuild_sparse(segment)
                logger.debug('Upgrade to v2 hints complete')
            elif hints[b'version'] != 2:
                raise ValueError('Unknown hints file version: %d' % hints[b'version'])
            else:
                self.segments = hints[b'segments']
                self.compact = FreeSpace(hints[b'compact'])
  268. def write_index(self):
  269. hints = {b'version': 2,
  270. b'segments': self.segments,
  271. b'compact': self.compact}
  272. transaction_id = self.io.get_segments_transaction_id()
  273. hints_file = os.path.join(self.path, 'hints.%d' % transaction_id)
  274. with open(hints_file + '.tmp', 'wb') as fd:
  275. msgpack.pack(hints, fd)
  276. fd.flush()
  277. os.fsync(fd.fileno())
  278. os.rename(hints_file + '.tmp', hints_file)
  279. self.index.write(os.path.join(self.path, 'index.tmp'))
  280. os.rename(os.path.join(self.path, 'index.tmp'),
  281. os.path.join(self.path, 'index.%d' % transaction_id))
  282. if self.append_only:
  283. with open(os.path.join(self.path, 'transactions'), 'a') as log:
  284. print('transaction %d, UTC time %s' % (transaction_id, datetime.utcnow().isoformat()), file=log)
  285. # Remove old auxiliary files
  286. current = '.%d' % transaction_id
  287. for name in os.listdir(self.path):
  288. if not name.startswith(('index.', 'hints.')):
  289. continue
  290. if name.endswith(current):
  291. continue
  292. os.unlink(os.path.join(self.path, name))
  293. self.index = None
    def compact_segments(self, save_space=False):
        """Compact sparse segments by copying data into new segments

        Every segment listed in ``self.compact`` is processed (unless it is skipped as not worthwhile, see
        below): PUT entries that are still current according to the index are re-written into the segment
        currently being written, after which the old segment is scheduled for deletion.

        :param save_space: if true, writes are made with ``raise_full=True`` so that a full target segment
            triggers a commit plus deletion of the already processed old segments before writing continues
        """
        if not self.compact:
            return
        index_transaction_id = self.get_index_transaction_id()
        segments = self.segments
        unused = []  # list of segments, that are not used anymore

        def complete_xfer():
            # complete the transfer (usually exactly when some target segment
            # is full, or at the very end when everything is processed)
            nonlocal unused
            # commit the new, compact, used segments
            self.io.write_commit()
            # get rid of the old, sparse, unused segments. free space.
            for segment in unused:
                assert self.segments.pop(segment) == 0
                self.io.delete_segment(segment)
                del self.compact[segment]
            unused = []

        # sorted() materializes the items, so deleting from self.compact inside the loop is fine
        for segment, freeable_space in sorted(self.compact.items()):
            if not self.io.segment_exists(segment):
                del self.compact[segment]
                continue
            segment_size = self.io.segment_size(segment)
            # skip segments that are not small and would free less than 15% of their size
            if segment_size > 0.2 * self.max_segment_size and freeable_space < 0.15 * segment_size:
                logger.debug('not compacting segment %d for later (only %d bytes are sparse)',
                             segment, freeable_space)
                continue
            segments.setdefault(segment, 0)
            for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
                # only a PUT the index still points to is current and needs to be moved
                if tag == TAG_PUT and self.index.get(key, (-1, -1)) == (segment, offset):
                    try:
                        new_segment, offset = self.io.write_put(key, data, raise_full=save_space)
                    except LoggedIO.SegmentFull:
                        complete_xfer()
                        new_segment, offset = self.io.write_put(key, data)
                    self.index[key] = new_segment, offset
                    segments.setdefault(new_segment, 0)
                    segments[new_segment] += 1
                    segments[segment] -= 1
                elif tag == TAG_DELETE:
                    # DELETEs are carried over only if the segment is newer than the on-disk index (or no index exists)
                    if index_transaction_id is None or segment > index_transaction_id:
                        try:
                            self.io.write_delete(key, raise_full=save_space)
                        except LoggedIO.SegmentFull:
                            complete_xfer()
                            self.io.write_delete(key)
            assert segments[segment] == 0
            unused.append(segment)
        complete_xfer()
  345. def replay_segments(self, index_transaction_id, segments_transaction_id):
  346. self.prepare_txn(index_transaction_id, do_cleanup=False)
  347. try:
  348. segment_count = sum(1 for _ in self.io.segment_iterator())
  349. pi = ProgressIndicatorPercent(total=segment_count, msg="Replaying segments %3.0f%%", same_line=True)
  350. for i, (segment, filename) in enumerate(self.io.segment_iterator()):
  351. pi.show(i)
  352. if index_transaction_id is not None and segment <= index_transaction_id:
  353. continue
  354. if segment > segments_transaction_id:
  355. break
  356. objects = self.io.iter_objects(segment)
  357. self._update_index(segment, objects)
  358. pi.finish()
  359. self.write_index()
  360. finally:
  361. self.rollback()
    def _update_index(self, segment, objects, report=None):
        """some code shared between replay_segments and check

        Applies the log entries of *segment* to ``self.index``, ``self.segments`` and ``self.compact``.

        :param segment: number of the segment the entries come from
        :param objects: iterable of (tag, key, offset, size) tuples as produced by LoggedIO.iter_objects
        :param report: optional callable taking a message; if None, an unexpected tag raises CheckNeeded
        """
        self.segments[segment] = 0
        for tag, key, offset, size in objects:
            if tag == TAG_PUT:
                try:
                    # If this PUT supersedes an older PUT, mark the old segment for compaction and count the free space
                    # NOTE(review): `size` is the size of the *new* entry, not of the superseded one - confirm that
                    # this approximation is intended.
                    s, _ = self.index[key]
                    self.compact[s] += size
                    self.segments[s] -= 1
                except KeyError:
                    pass
                self.index[key] = segment, offset
                self.segments[segment] += 1
            elif tag == TAG_DELETE:
                try:
                    # if the deleted PUT is not in the index, there is nothing to clean up
                    s, offset = self.index.pop(key)
                except KeyError:
                    pass
                else:
                    if self.io.segment_exists(s):
                        # the old index is not necessarily valid for this transaction (e.g. compaction); if the segment
                        # is already gone, then it was already compacted.
                        self.segments[s] -= 1
                        size = self.io.read(s, offset, key, read_data=False)
                        self.compact[s] += size
            elif tag == TAG_COMMIT:
                continue
            else:
                msg = 'Unexpected tag {} in segment {}'.format(tag, segment)
                if report is None:
                    raise self.CheckNeeded(msg)
                else:
                    report(msg)
        # a segment without any current PUT can be freed completely
        if self.segments[segment] == 0:
            self.compact[segment] += self.io.segment_size(segment)
  399. def _rebuild_sparse(self, segment):
  400. """Rebuild sparse bytes count for a single segment relative to the current index."""
  401. self.compact[segment] = 0
  402. if self.segments[segment] == 0:
  403. self.compact[segment] += self.io.segment_size(segment)
  404. return
  405. for tag, key, offset, size in self.io.iter_objects(segment, read_data=False):
  406. if tag == TAG_PUT:
  407. if self.index.get(key, (-1, -1)) != (segment, offset):
  408. # This PUT is superseded later
  409. self.compact[segment] += size
  410. elif tag == TAG_DELETE:
  411. # The outcome of the DELETE has been recorded in the PUT branch already
  412. self.compact[segment] += size
    def check(self, repair=False, save_space=False):
        """Check repository consistency

        This method verifies all segment checksums and makes sure
        the index is consistent with the data stored in the segments.

        :param repair: if true, damaged segments are recovered, a missing commit tag is added and
            segments are compacted and a new index is written
        :param save_space: passed on to compact_segments() when repairing
        :return: True if no error was found or if *repair* was requested, else False
        :raises ValueError: *repair* was requested for an append-only repository
        """
        if self.append_only and repair:
            raise ValueError(self.path + " is in append-only mode")
        error_found = False

        def report_error(msg):
            nonlocal error_found
            error_found = True
            logger.error(msg)

        logger.info('Starting repository check')
        assert not self._active_txn
        try:
            transaction_id = self.get_transaction_id()
            current_index = self.open_index(transaction_id)
        except Exception:
            # fall back: last committed segment, else on-disk index id, else latest segment
            transaction_id = self.io.get_segments_transaction_id()
            current_index = None
        if transaction_id is None:
            transaction_id = self.get_index_transaction_id()
        if transaction_id is None:
            transaction_id = self.io.get_latest_segment()
        if repair:
            self.io.cleanup(transaction_id)
        segments_transaction_id = self.io.get_segments_transaction_id()
        self.prepare_txn(None)  # self.index, self.compact, self.segments all empty now!
        segment_count = sum(1 for _ in self.io.segment_iterator())
        pi = ProgressIndicatorPercent(total=segment_count, msg="Checking segments %3.1f%%", step=0.1, same_line=True)
        for i, (segment, filename) in enumerate(self.io.segment_iterator()):
            pi.show(i)
            if segment > transaction_id:
                continue
            try:
                # list() forces reading the whole segment, so checksum errors surface here
                objects = list(self.io.iter_objects(segment))
            except IntegrityError as err:
                report_error(str(err))
                objects = []
                if repair:
                    self.io.recover_segment(segment, filename)
                    objects = list(self.io.iter_objects(segment))
            self._update_index(segment, objects, report_error)
        pi.finish()
        # self.index, self.segments, self.compact now reflect the state of the segment files up to <transaction_id>
        # We might need to add a commit tag if no committed segment is found
        if repair and segments_transaction_id is None:
            report_error('Adding commit tag to segment {}'.format(transaction_id))
            self.io.segment = transaction_id + 1
            self.io.write_commit()
        if current_index and not repair:
            # current_index = "as found on disk"
            # self.index = "as rebuilt in-memory from segments"
            if len(current_index) != len(self.index):
                report_error('Index object count mismatch. {} != {}'.format(len(current_index), len(self.index)))
            elif current_index:
                for key, value in self.index.iteritems():
                    if current_index.get(key, (-1, -1)) != value:
                        report_error('Index mismatch for key {}. {} != {}'.format(key, value, current_index.get(key, (-1, -1))))
        if repair:
            self.compact_segments(save_space=save_space)
            self.write_index()
        self.rollback()
        if error_found:
            if repair:
                logger.info('Completed repository check, errors found and repaired.')
            else:
                logger.error('Completed repository check, errors found.')
        else:
            logger.info('Completed repository check, no problems found.')
        return not error_found or repair
  484. def rollback(self):
  485. """
  486. """
  487. self.index = None
  488. self._active_txn = False
  489. def __len__(self):
  490. if not self.index:
  491. self.index = self.open_index(self.get_transaction_id())
  492. return len(self.index)
  493. def __contains__(self, id):
  494. if not self.index:
  495. self.index = self.open_index(self.get_transaction_id())
  496. return id in self.index
  497. def list(self, limit=None, marker=None):
  498. if not self.index:
  499. self.index = self.open_index(self.get_transaction_id())
  500. return [id_ for id_, _ in islice(self.index.iteritems(marker=marker), limit)]
  501. def get(self, id_):
  502. if not self.index:
  503. self.index = self.open_index(self.get_transaction_id())
  504. try:
  505. segment, offset = self.index[id_]
  506. return self.io.read(segment, offset, id_)
  507. except KeyError:
  508. raise self.ObjectNotFound(id_, self.path) from None
  509. def get_many(self, ids, is_preloaded=False):
  510. for id_ in ids:
  511. yield self.get(id_)
  512. def put(self, id, data, wait=True):
  513. if not self._active_txn:
  514. self.prepare_txn(self.get_transaction_id())
  515. try:
  516. segment, offset = self.index[id]
  517. except KeyError:
  518. pass
  519. else:
  520. self.segments[segment] -= 1
  521. size = self.io.read(segment, offset, id, read_data=False)
  522. self.compact[segment] += size
  523. segment, size = self.io.write_delete(id)
  524. self.compact[segment] += size
  525. self.segments.setdefault(segment, 0)
  526. segment, offset = self.io.write_put(id, data)
  527. self.segments.setdefault(segment, 0)
  528. self.segments[segment] += 1
  529. self.index[id] = segment, offset
  530. def delete(self, id, wait=True):
  531. if not self._active_txn:
  532. self.prepare_txn(self.get_transaction_id())
  533. try:
  534. segment, offset = self.index.pop(id)
  535. except KeyError:
  536. raise self.ObjectNotFound(id, self.path) from None
  537. self.segments[segment] -= 1
  538. size = self.io.read(segment, offset, id, read_data=False)
  539. self.compact[segment] += size
  540. segment, size = self.io.write_delete(id)
  541. self.compact[segment] += size
  542. self.segments.setdefault(segment, 0)
    def preload(self, ids):
        """Preload objects (only applies to remote repositories)

        This implementation does nothing with *ids*.
        """
class LoggedIO:
    # Reads and writes the segment files below <path>/data.

    class SegmentFull(Exception):
        """raised when a segment is full, before opening next"""

    # generic entry header: crc32, size, tag
    header_fmt = struct.Struct('<IIB')
    assert header_fmt.size == 9
    # PUT/DELETE entry header: crc32, size, tag, 32 byte key
    put_header_fmt = struct.Struct('<IIB32s')
    assert put_header_fmt.size == 41
    # the part of the header covered by the crc32: size, tag
    header_no_crc_fmt = struct.Struct('<IB')
    assert header_no_crc_fmt.size == 5
    crc_fmt = struct.Struct('<I')
    assert crc_fmt.size == 4

    # the complete, constant COMMIT entry (9 bytes); is_committed_segment() compares the file tail with it
    _commit = header_no_crc_fmt.pack(9, TAG_COMMIT)
    COMMIT = crc_fmt.pack(crc32(_commit)) + _commit
  559. def __init__(self, path, limit, segments_per_dir, capacity=90):
  560. self.path = path
  561. self.fds = LRUCache(capacity,
  562. dispose=self.close_fd)
  563. self.segment = 0
  564. self.limit = limit
  565. self.segments_per_dir = segments_per_dir
  566. self.offset = 0
  567. self._write_fd = None
  568. def close(self):
  569. self.close_segment()
  570. self.fds.clear()
  571. self.fds = None # Just to make sure we're disabled
  572. def close_fd(self, fd):
  573. if hasattr(os, 'posix_fadvise'): # only on UNIX
  574. os.posix_fadvise(fd.fileno(), 0, 0, os.POSIX_FADV_DONTNEED)
  575. fd.close()
  576. def segment_iterator(self, reverse=False):
  577. data_path = os.path.join(self.path, 'data')
  578. dirs = sorted((dir for dir in os.listdir(data_path) if dir.isdigit()), key=int, reverse=reverse)
  579. for dir in dirs:
  580. filenames = os.listdir(os.path.join(data_path, dir))
  581. sorted_filenames = sorted((filename for filename in filenames
  582. if filename.isdigit()), key=int, reverse=reverse)
  583. for filename in sorted_filenames:
  584. yield int(filename), os.path.join(data_path, dir, filename)
  585. def get_latest_segment(self):
  586. for segment, filename in self.segment_iterator(reverse=True):
  587. return segment
  588. return None
  589. def get_segments_transaction_id(self):
  590. """Return the last committed segment.
  591. """
  592. for segment, filename in self.segment_iterator(reverse=True):
  593. if self.is_committed_segment(segment):
  594. return segment
  595. return None
  596. def cleanup(self, transaction_id):
  597. """Delete segment files left by aborted transactions
  598. """
  599. self.segment = transaction_id + 1
  600. for segment, filename in self.segment_iterator(reverse=True):
  601. if segment > transaction_id:
  602. os.unlink(filename)
  603. else:
  604. break
    def is_committed_segment(self, segment):
        """Check if segment ends with a COMMIT_TAG tag

        Returns True only if the last 9 bytes of the segment file equal the COMMIT entry and iterating
        all entries of the segment raises no IntegrityError and finds no entry after a COMMIT.
        """
        try:
            iterator = self.iter_objects(segment)
        except IntegrityError:
            return False
        # quick check first: compare the tail of the file with the constant COMMIT entry
        with open(self.segment_filename(segment), 'rb') as fd:
            try:
                fd.seek(-self.header_fmt.size, os.SEEK_END)
            except OSError as e:
                # return False if segment file is empty or too small
                if e.errno == errno.EINVAL:
                    return False
                raise e
            if fd.read(self.header_fmt.size) != self.COMMIT:
                return False
        # thorough check: walk all entries, the COMMIT has to be the final one
        seen_commit = False
        while True:
            try:
                tag, key, offset, _ = next(iterator)
            except IntegrityError:
                return False
            except StopIteration:
                break
            if tag == TAG_COMMIT:
                seen_commit = True
                continue
            if seen_commit:
                return False
        return seen_commit
  636. def segment_filename(self, segment):
  637. return os.path.join(self.path, 'data', str(segment // self.segments_per_dir), str(segment))
  638. def get_write_fd(self, no_new=False, raise_full=False):
  639. if not no_new and self.offset and self.offset > self.limit:
  640. if raise_full:
  641. raise self.SegmentFull
  642. self.close_segment()
  643. if not self._write_fd:
  644. if self.segment % self.segments_per_dir == 0:
  645. dirname = os.path.join(self.path, 'data', str(self.segment // self.segments_per_dir))
  646. if not os.path.exists(dirname):
  647. os.mkdir(dirname)
  648. sync_dir(os.path.join(self.path, 'data'))
  649. self._write_fd = SyncFile(self.segment_filename(self.segment))
  650. self._write_fd.write(MAGIC)
  651. self.offset = MAGIC_LEN
  652. return self._write_fd
  653. def get_fd(self, segment):
  654. try:
  655. return self.fds[segment]
  656. except KeyError:
  657. fd = open(self.segment_filename(segment), 'rb')
  658. self.fds[segment] = fd
  659. return fd
  660. def close_segment(self):
  661. if self._write_fd:
  662. self.segment += 1
  663. self.offset = 0
  664. self._write_fd.close()
  665. self._write_fd = None
  666. def delete_segment(self, segment):
  667. if segment in self.fds:
  668. del self.fds[segment]
  669. try:
  670. os.unlink(self.segment_filename(segment))
  671. except FileNotFoundError:
  672. pass
  673. def segment_exists(self, segment):
  674. return os.path.exists(self.segment_filename(segment))
  675. def segment_size(self, segment):
  676. return os.path.getsize(self.segment_filename(segment))
  677. def iter_objects(self, segment, include_data=False, read_data=True):
  678. """
  679. Return object iterator for *segment*.
  680. If read_data is False then include_data must be False as well.
  681. Integrity checks are skipped: all data obtained from the iterator must be considered informational.
  682. The iterator returns four-tuples of (tag, key, offset, data|size).
  683. """
  684. fd = self.get_fd(segment)
  685. fd.seek(0)
  686. if fd.read(MAGIC_LEN) != MAGIC:
  687. raise IntegrityError('Invalid segment magic [segment {}, offset {}]'.format(segment, 0))
  688. offset = MAGIC_LEN
  689. header = fd.read(self.header_fmt.size)
  690. while header:
  691. size, tag, key, data = self._read(fd, self.header_fmt, header, segment, offset,
  692. (TAG_PUT, TAG_DELETE, TAG_COMMIT),
  693. read_data=read_data)
  694. if include_data:
  695. yield tag, key, offset, data
  696. else:
  697. yield tag, key, offset, size
  698. offset += size
  699. header = fd.read(self.header_fmt.size)
  700. def recover_segment(self, segment, filename):
  701. if segment in self.fds:
  702. del self.fds[segment]
  703. with open(filename, 'rb') as fd:
  704. data = memoryview(fd.read())
  705. os.rename(filename, filename + '.beforerecover')
  706. logger.info('attempting to recover ' + filename)
  707. with open(filename, 'wb') as fd:
  708. fd.write(MAGIC)
  709. while len(data) >= self.header_fmt.size:
  710. crc, size, tag = self.header_fmt.unpack(data[:self.header_fmt.size])
  711. if size < self.header_fmt.size or size > len(data):
  712. data = data[1:]
  713. continue
  714. if crc32(data[4:size]) & 0xffffffff != crc:
  715. data = data[1:]
  716. continue
  717. fd.write(data[:size])
  718. data = data[size:]
  719. def read(self, segment, offset, id, read_data=True):
  720. """
  721. Read entry from *segment* at *offset* with *id*.
  722. If read_data is False the size of the entry is returned instead and integrity checks are skipped.
  723. The return value should thus be considered informational.
  724. """
  725. if segment == self.segment and self._write_fd:
  726. self._write_fd.sync()
  727. fd = self.get_fd(segment)
  728. fd.seek(offset)
  729. header = fd.read(self.put_header_fmt.size)
  730. size, tag, key, data = self._read(fd, self.put_header_fmt, header, segment, offset, (TAG_PUT, ), read_data)
  731. if id != key:
  732. raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format(
  733. segment, offset))
  734. return data if read_data else size
    def _read(self, fd, fmt, header, segment, offset, acceptable_tags, read_data=True):
        """Parse one log entry whose *header* bytes were already read from *fd*.

        :param fd: file object positioned right behind *header*
        :param fmt: the struct used for *header*, either put_header_fmt or header_fmt
        :param header: the raw header bytes
        :param segment: segment number, used for error messages
        :param offset: offset of the entry, used for error messages
        :param acceptable_tags: tags the caller accepts
        :param read_data: if true, read the payload and verify the crc32; if false, only seek over it
        :return: (size, tag, key, data); data is None if read_data is false
        :raises IntegrityError: bad header, size, checksum or tag, or a short read / seek
        :raises TypeError: *fmt* is neither of the two supported structs
        """
        # some code shared by read() and iter_objects()
        try:
            hdr_tuple = fmt.unpack(header)
        except struct.error as err:
            raise IntegrityError('Invalid segment entry header [segment {}, offset {}]: {}'.format(
                segment, offset, err)) from None
        if fmt is self.put_header_fmt:
            crc, size, tag, key = hdr_tuple
        elif fmt is self.header_fmt:
            crc, size, tag = hdr_tuple
            key = None
        else:
            raise TypeError("_read called with unsupported format")
        if size > MAX_OBJECT_SIZE or size < fmt.size:
            raise IntegrityError('Invalid segment entry size [segment {}, offset {}]'.format(
                segment, offset))
        # number of bytes of this entry following the header
        length = size - fmt.size
        if read_data:
            data = fd.read(length)
            if len(data) != length:
                raise IntegrityError('Segment entry data short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
                    segment, offset, length, len(data)))
            # the crc covers everything behind the crc field itself (first 4 header bytes)
            if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc:
                raise IntegrityError('Segment entry checksum mismatch [segment {}, offset {}]'.format(
                    segment, offset))
            if key is None and tag in (TAG_PUT, TAG_DELETE):
                # short header was used: the key is the first 32 bytes of what was read
                key, data = data[:32], data[32:]
        else:
            if key is None and tag in (TAG_PUT, TAG_DELETE):
                key = fd.read(32)
                length -= 32
                if len(key) != 32:
                    raise IntegrityError('Segment entry key short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
                        segment, offset, 32, len(key)))
            # skip the payload without reading it
            oldpos = fd.tell()
            seeked = fd.seek(length, os.SEEK_CUR) - oldpos
            data = None
            if seeked != length:
                raise IntegrityError('Segment entry data short seek [segment {}, offset {}]: expected {}, got {} bytes'.format(
                    segment, offset, length, seeked))
        if tag not in acceptable_tags:
            raise IntegrityError('Invalid segment entry header, did not get acceptable tag [segment {}, offset {}]'.format(
                segment, offset))
        return size, tag, key, data
  780. def write_put(self, id, data, raise_full=False):
  781. fd = self.get_write_fd(raise_full=raise_full)
  782. size = len(data) + self.put_header_fmt.size
  783. offset = self.offset
  784. header = self.header_no_crc_fmt.pack(size, TAG_PUT)
  785. crc = self.crc_fmt.pack(crc32(data, crc32(id, crc32(header))) & 0xffffffff)
  786. fd.write(b''.join((crc, header, id, data)))
  787. self.offset += size
  788. return self.segment, offset
  789. def write_delete(self, id, raise_full=False):
  790. fd = self.get_write_fd(raise_full=raise_full)
  791. header = self.header_no_crc_fmt.pack(self.put_header_fmt.size, TAG_DELETE)
  792. crc = self.crc_fmt.pack(crc32(id, crc32(header)) & 0xffffffff)
  793. fd.write(b''.join((crc, header, id)))
  794. self.offset += self.put_header_fmt.size
  795. return self.segment, self.put_header_fmt.size
  796. def write_commit(self):
  797. self.close_segment()
  798. fd = self.get_write_fd()
  799. header = self.header_no_crc_fmt.pack(self.header_fmt.size, TAG_COMMIT)
  800. crc = self.crc_fmt.pack(crc32(header) & 0xffffffff)
  801. fd.write(b''.join((crc, header)))
  802. self.close_segment()