# repository.py
  1. from configparser import ConfigParser
  2. from binascii import unhexlify
  3. from datetime import datetime
  4. from itertools import islice
  5. import errno
  6. import logging
  7. logger = logging.getLogger(__name__)
  8. import os
  9. import shutil
  10. import struct
  11. from zlib import crc32
  12. import msgpack
  13. from .constants import * # NOQA
  14. from .helpers import Error, ErrorWithTraceback, IntegrityError, Location, ProgressIndicatorPercent, bin_to_hex
  15. from .hashindex import NSIndex
  16. from .locking import UpgradableLock, LockError, LockErrorT
  17. from .lrucache import LRUCache
  18. from .platform import SyncFile, sync_dir
  19. MAX_OBJECT_SIZE = 20 * 1024 * 1024
  20. MAGIC = b'BORG_SEG'
  21. MAGIC_LEN = len(MAGIC)
  22. TAG_PUT = 0
  23. TAG_DELETE = 1
  24. TAG_COMMIT = 2
  25. class Repository:
  26. """Filesystem based transactional key value store
  27. On disk layout:
  28. dir/README
  29. dir/config
  30. dir/data/<X // SEGMENTS_PER_DIR>/<X>
  31. dir/index.X
  32. dir/hints.X
  33. """
  34. class DoesNotExist(Error):
  35. """Repository {} does not exist."""
  36. class AlreadyExists(Error):
  37. """Repository {} already exists."""
  38. class InvalidRepository(Error):
  39. """{} is not a valid repository. Check repo config."""
  40. class CheckNeeded(ErrorWithTraceback):
  41. """Inconsistency detected. Please run "borg check {}"."""
  42. class ObjectNotFound(ErrorWithTraceback):
  43. """Object with key {} not found in repository {}."""
  44. def __init__(self, path, create=False, exclusive=False, lock_wait=None, lock=True):
  45. self.path = os.path.abspath(path)
  46. self._location = Location('file://%s' % self.path)
  47. self.io = None
  48. self.lock = None
  49. self.index = None
  50. self._active_txn = False
  51. self.lock_wait = lock_wait
  52. self.do_lock = lock
  53. self.do_create = create
  54. self.exclusive = exclusive
  55. def __del__(self):
  56. if self.lock:
  57. self.close()
  58. assert False, "cleanup happened in Repository.__del__"
  59. def __repr__(self):
  60. return '<%s %s>' % (self.__class__.__name__, self.path)
  61. def __enter__(self):
  62. if self.do_create:
  63. self.do_create = False
  64. self.create(self.path)
  65. self.open(self.path, self.exclusive, lock_wait=self.lock_wait, lock=self.do_lock)
  66. return self
  67. def __exit__(self, exc_type, exc_val, exc_tb):
  68. if exc_type is not None:
  69. self.rollback()
  70. self.close()
  71. @property
  72. def id_str(self):
  73. return bin_to_hex(self.id)
  74. def create(self, path):
  75. """Create a new empty repository at `path`
  76. """
  77. if os.path.exists(path) and (not os.path.isdir(path) or os.listdir(path)):
  78. raise self.AlreadyExists(path)
  79. if not os.path.exists(path):
  80. os.mkdir(path)
  81. with open(os.path.join(path, 'README'), 'w') as fd:
  82. fd.write('This is a Borg repository\n')
  83. os.mkdir(os.path.join(path, 'data'))
  84. config = ConfigParser(interpolation=None)
  85. config.add_section('repository')
  86. config.set('repository', 'version', '1')
  87. config.set('repository', 'segments_per_dir', str(DEFAULT_SEGMENTS_PER_DIR))
  88. config.set('repository', 'max_segment_size', str(DEFAULT_MAX_SEGMENT_SIZE))
  89. config.set('repository', 'append_only', '0')
  90. config.set('repository', 'id', bin_to_hex(os.urandom(32)))
  91. self.save_config(path, config)
  92. def save_config(self, path, config):
  93. config_path = os.path.join(path, 'config')
  94. with open(config_path, 'w') as fd:
  95. config.write(fd)
  96. def save_key(self, keydata):
  97. assert self.config
  98. keydata = keydata.decode('utf-8') # remote repo: msgpack issue #99, getting bytes
  99. self.config.set('repository', 'key', keydata)
  100. self.save_config(self.path, self.config)
  101. def load_key(self):
  102. keydata = self.config.get('repository', 'key')
  103. return keydata.encode('utf-8') # remote repo: msgpack issue #99, returning bytes
  104. def destroy(self):
  105. """Destroy the repository at `self.path`
  106. """
  107. if self.append_only:
  108. raise ValueError(self.path + " is in append-only mode")
  109. self.close()
  110. os.remove(os.path.join(self.path, 'config')) # kill config first
  111. shutil.rmtree(self.path)
  112. def get_index_transaction_id(self):
  113. indices = sorted((int(name[6:]) for name in os.listdir(self.path) if name.startswith('index.') and name[6:].isdigit()))
  114. if indices:
  115. return indices[-1]
  116. else:
  117. return None
  118. def get_transaction_id(self):
  119. index_transaction_id = self.get_index_transaction_id()
  120. segments_transaction_id = self.io.get_segments_transaction_id()
  121. if index_transaction_id is not None and segments_transaction_id is None:
  122. raise self.CheckNeeded(self.path)
  123. # Attempt to automatically rebuild index if we crashed between commit
  124. # tag write and index save
  125. if index_transaction_id != segments_transaction_id:
  126. if index_transaction_id is not None and index_transaction_id > segments_transaction_id:
  127. replay_from = None
  128. else:
  129. replay_from = index_transaction_id
  130. self.replay_segments(replay_from, segments_transaction_id)
  131. return self.get_index_transaction_id()
  132. def break_lock(self):
  133. UpgradableLock(os.path.join(self.path, 'lock')).break_lock()
  134. def open(self, path, exclusive, lock_wait=None, lock=True):
  135. self.path = path
  136. if not os.path.isdir(path):
  137. raise self.DoesNotExist(path)
  138. if lock:
  139. self.lock = UpgradableLock(os.path.join(path, 'lock'), exclusive, timeout=lock_wait).acquire()
  140. else:
  141. self.lock = None
  142. self.config = ConfigParser(interpolation=None)
  143. self.config.read(os.path.join(self.path, 'config'))
  144. if 'repository' not in self.config.sections() or self.config.getint('repository', 'version') != 1:
  145. raise self.InvalidRepository(path)
  146. self.max_segment_size = self.config.getint('repository', 'max_segment_size')
  147. self.segments_per_dir = self.config.getint('repository', 'segments_per_dir')
  148. self.append_only = self.config.getboolean('repository', 'append_only', fallback=False)
  149. self.id = unhexlify(self.config.get('repository', 'id').strip())
  150. self.io = LoggedIO(self.path, self.max_segment_size, self.segments_per_dir)
  151. def close(self):
  152. if self.lock:
  153. if self.io:
  154. self.io.close()
  155. self.io = None
  156. self.lock.release()
  157. self.lock = None
  158. def commit(self, save_space=False):
  159. """Commit transaction
  160. """
  161. self.io.write_commit()
  162. if not self.append_only:
  163. self.compact_segments(save_space=save_space)
  164. self.write_index()
  165. self.rollback()
  166. def open_index(self, transaction_id):
  167. if transaction_id is None:
  168. return NSIndex()
  169. return NSIndex.read((os.path.join(self.path, 'index.%d') % transaction_id).encode('utf-8'))
  170. def prepare_txn(self, transaction_id, do_cleanup=True):
  171. self._active_txn = True
  172. try:
  173. self.lock.upgrade()
  174. except (LockError, LockErrorT):
  175. # if upgrading the lock to exclusive fails, we do not have an
  176. # active transaction. this is important for "serve" mode, where
  177. # the repository instance lives on - even if exceptions happened.
  178. self._active_txn = False
  179. raise
  180. if not self.index or transaction_id is None:
  181. self.index = self.open_index(transaction_id)
  182. if transaction_id is None:
  183. self.segments = {} # XXX bad name: usage_count_of_segment_x = self.segments[x]
  184. self.compact = set() # XXX bad name: segments_needing_compaction = self.compact
  185. else:
  186. if do_cleanup:
  187. self.io.cleanup(transaction_id)
  188. with open(os.path.join(self.path, 'hints.%d' % transaction_id), 'rb') as fd:
  189. hints = msgpack.unpack(fd)
  190. if hints[b'version'] != 1:
  191. raise ValueError('Unknown hints file version: %d' % hints['version'])
  192. self.segments = hints[b'segments']
  193. self.compact = set(hints[b'compact'])
  194. def write_index(self):
  195. hints = {b'version': 1,
  196. b'segments': self.segments,
  197. b'compact': list(self.compact)}
  198. transaction_id = self.io.get_segments_transaction_id()
  199. hints_file = os.path.join(self.path, 'hints.%d' % transaction_id)
  200. with open(hints_file + '.tmp', 'wb') as fd:
  201. msgpack.pack(hints, fd)
  202. fd.flush()
  203. os.fsync(fd.fileno())
  204. os.rename(hints_file + '.tmp', hints_file)
  205. self.index.write(os.path.join(self.path, 'index.tmp'))
  206. os.rename(os.path.join(self.path, 'index.tmp'),
  207. os.path.join(self.path, 'index.%d' % transaction_id))
  208. if self.append_only:
  209. with open(os.path.join(self.path, 'transactions'), 'a') as log:
  210. print('transaction %d, UTC time %s' % (transaction_id, datetime.utcnow().isoformat()), file=log)
  211. # Remove old indices
  212. current = '.%d' % transaction_id
  213. for name in os.listdir(self.path):
  214. if not name.startswith('index.') and not name.startswith('hints.'):
  215. continue
  216. if name.endswith(current):
  217. continue
  218. os.unlink(os.path.join(self.path, name))
  219. self.index = None
  220. def compact_segments(self, save_space=False):
  221. """Compact sparse segments by copying data into new segments
  222. """
  223. if not self.compact:
  224. return
  225. index_transaction_id = self.get_index_transaction_id()
  226. segments = self.segments
  227. unused = [] # list of segments, that are not used anymore
  228. def complete_xfer():
  229. # complete the transfer (usually exactly when some target segment
  230. # is full, or at the very end when everything is processed)
  231. nonlocal unused
  232. # commit the new, compact, used segments
  233. self.io.write_commit()
  234. # get rid of the old, sparse, unused segments. free space.
  235. for segment in unused:
  236. assert self.segments.pop(segment) == 0
  237. self.io.delete_segment(segment)
  238. unused = []
  239. for segment in sorted(self.compact):
  240. if self.io.segment_exists(segment):
  241. for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
  242. if tag == TAG_PUT and self.index.get(key, (-1, -1)) == (segment, offset):
  243. try:
  244. new_segment, offset = self.io.write_put(key, data, raise_full=save_space)
  245. except LoggedIO.SegmentFull:
  246. complete_xfer()
  247. new_segment, offset = self.io.write_put(key, data)
  248. self.index[key] = new_segment, offset
  249. segments.setdefault(new_segment, 0)
  250. segments[new_segment] += 1
  251. segments[segment] -= 1
  252. elif tag == TAG_DELETE:
  253. if index_transaction_id is None or segment > index_transaction_id:
  254. try:
  255. self.io.write_delete(key, raise_full=save_space)
  256. except LoggedIO.SegmentFull:
  257. complete_xfer()
  258. self.io.write_delete(key)
  259. assert segments[segment] == 0
  260. unused.append(segment)
  261. complete_xfer()
  262. self.compact = set()
  263. def replay_segments(self, index_transaction_id, segments_transaction_id):
  264. self.prepare_txn(index_transaction_id, do_cleanup=False)
  265. try:
  266. segment_count = sum(1 for _ in self.io.segment_iterator())
  267. pi = ProgressIndicatorPercent(total=segment_count, msg="Replaying segments %3.0f%%", same_line=True)
  268. for i, (segment, filename) in enumerate(self.io.segment_iterator()):
  269. pi.show(i)
  270. if index_transaction_id is not None and segment <= index_transaction_id:
  271. continue
  272. if segment > segments_transaction_id:
  273. break
  274. objects = self.io.iter_objects(segment)
  275. self._update_index(segment, objects)
  276. pi.finish()
  277. self.write_index()
  278. finally:
  279. self.rollback()
  280. def _update_index(self, segment, objects, report=None):
  281. """some code shared between replay_segments and check"""
  282. self.segments[segment] = 0
  283. for tag, key, offset in objects:
  284. if tag == TAG_PUT:
  285. try:
  286. s, _ = self.index[key]
  287. self.compact.add(s)
  288. self.segments[s] -= 1
  289. except KeyError:
  290. pass
  291. self.index[key] = segment, offset
  292. self.segments[segment] += 1
  293. elif tag == TAG_DELETE:
  294. try:
  295. s, _ = self.index.pop(key)
  296. self.segments[s] -= 1
  297. self.compact.add(s)
  298. except KeyError:
  299. pass
  300. self.compact.add(segment)
  301. elif tag == TAG_COMMIT:
  302. continue
  303. else:
  304. msg = 'Unexpected tag {} in segment {}'.format(tag, segment)
  305. if report is None:
  306. raise self.CheckNeeded(msg)
  307. else:
  308. report(msg)
  309. if self.segments[segment] == 0:
  310. self.compact.add(segment)
  311. def check(self, repair=False, save_space=False):
  312. """Check repository consistency
  313. This method verifies all segment checksums and makes sure
  314. the index is consistent with the data stored in the segments.
  315. """
  316. if self.append_only and repair:
  317. raise ValueError(self.path + " is in append-only mode")
  318. error_found = False
  319. def report_error(msg):
  320. nonlocal error_found
  321. error_found = True
  322. logger.error(msg)
  323. logger.info('Starting repository check')
  324. assert not self._active_txn
  325. try:
  326. transaction_id = self.get_transaction_id()
  327. current_index = self.open_index(transaction_id)
  328. except Exception:
  329. transaction_id = self.io.get_segments_transaction_id()
  330. current_index = None
  331. if transaction_id is None:
  332. transaction_id = self.get_index_transaction_id()
  333. if transaction_id is None:
  334. transaction_id = self.io.get_latest_segment()
  335. if repair:
  336. self.io.cleanup(transaction_id)
  337. segments_transaction_id = self.io.get_segments_transaction_id()
  338. self.prepare_txn(None) # self.index, self.compact, self.segments all empty now!
  339. segment_count = sum(1 for _ in self.io.segment_iterator())
  340. pi = ProgressIndicatorPercent(total=segment_count, msg="Checking segments %3.1f%%", step=0.1, same_line=True)
  341. for i, (segment, filename) in enumerate(self.io.segment_iterator()):
  342. pi.show(i)
  343. if segment > transaction_id:
  344. continue
  345. try:
  346. objects = list(self.io.iter_objects(segment))
  347. except IntegrityError as err:
  348. report_error(str(err))
  349. objects = []
  350. if repair:
  351. self.io.recover_segment(segment, filename)
  352. objects = list(self.io.iter_objects(segment))
  353. self._update_index(segment, objects, report_error)
  354. pi.finish()
  355. # self.index, self.segments, self.compact now reflect the state of the segment files up to <transaction_id>
  356. # We might need to add a commit tag if no committed segment is found
  357. if repair and segments_transaction_id is None:
  358. report_error('Adding commit tag to segment {}'.format(transaction_id))
  359. self.io.segment = transaction_id + 1
  360. self.io.write_commit()
  361. if current_index and not repair:
  362. # current_index = "as found on disk"
  363. # self.index = "as rebuilt in-memory from segments"
  364. if len(current_index) != len(self.index):
  365. report_error('Index object count mismatch. {} != {}'.format(len(current_index), len(self.index)))
  366. elif current_index:
  367. for key, value in self.index.iteritems():
  368. if current_index.get(key, (-1, -1)) != value:
  369. report_error('Index mismatch for key {}. {} != {}'.format(key, value, current_index.get(key, (-1, -1))))
  370. if repair:
  371. self.compact_segments(save_space=save_space)
  372. self.write_index()
  373. self.rollback()
  374. if error_found:
  375. if repair:
  376. logger.info('Completed repository check, errors found and repaired.')
  377. else:
  378. logger.error('Completed repository check, errors found.')
  379. else:
  380. logger.info('Completed repository check, no problems found.')
  381. return not error_found or repair
  382. def rollback(self):
  383. """
  384. """
  385. self.index = None
  386. self._active_txn = False
  387. def __len__(self):
  388. if not self.index:
  389. self.index = self.open_index(self.get_transaction_id())
  390. return len(self.index)
  391. def __contains__(self, id):
  392. if not self.index:
  393. self.index = self.open_index(self.get_transaction_id())
  394. return id in self.index
  395. def list(self, limit=None, marker=None):
  396. if not self.index:
  397. self.index = self.open_index(self.get_transaction_id())
  398. return [id_ for id_, _ in islice(self.index.iteritems(marker=marker), limit)]
  399. def get(self, id_):
  400. if not self.index:
  401. self.index = self.open_index(self.get_transaction_id())
  402. try:
  403. segment, offset = self.index[id_]
  404. return self.io.read(segment, offset, id_)
  405. except KeyError:
  406. raise self.ObjectNotFound(id_, self.path) from None
  407. def get_many(self, ids, is_preloaded=False):
  408. for id_ in ids:
  409. yield self.get(id_)
  410. def put(self, id, data, wait=True):
  411. if not self._active_txn:
  412. self.prepare_txn(self.get_transaction_id())
  413. try:
  414. segment, _ = self.index[id]
  415. self.segments[segment] -= 1
  416. self.compact.add(segment)
  417. segment = self.io.write_delete(id)
  418. self.segments.setdefault(segment, 0)
  419. self.compact.add(segment)
  420. except KeyError:
  421. pass
  422. segment, offset = self.io.write_put(id, data)
  423. self.segments.setdefault(segment, 0)
  424. self.segments[segment] += 1
  425. self.index[id] = segment, offset
  426. def delete(self, id, wait=True):
  427. if not self._active_txn:
  428. self.prepare_txn(self.get_transaction_id())
  429. try:
  430. segment, offset = self.index.pop(id)
  431. except KeyError:
  432. raise self.ObjectNotFound(id, self.path) from None
  433. self.segments[segment] -= 1
  434. self.compact.add(segment)
  435. segment = self.io.write_delete(id)
  436. self.compact.add(segment)
  437. self.segments.setdefault(segment, 0)
  438. def preload(self, ids):
  439. """Preload objects (only applies to remote repositories)
  440. """
class LoggedIO:
    """Segment-file I/O layer: an append-only log of CRC-framed entries.

    Each segment file starts with MAGIC, followed by entries framed as
    crc32(4) | size(4) | tag(1) [| key(32)] [| data].  The CRC covers
    everything after the crc field itself.
    """

    class SegmentFull(Exception):
        """raised when a segment is full, before opening next"""

    # entry header without key: crc32, size, tag
    header_fmt = struct.Struct('<IIB')
    assert header_fmt.size == 9
    # entry header for PUT/DELETE: crc32, size, tag, 32 byte key
    put_header_fmt = struct.Struct('<IIB32s')
    assert put_header_fmt.size == 41
    # header part that is covered by the crc: size, tag
    header_no_crc_fmt = struct.Struct('<IB')
    assert header_no_crc_fmt.size == 5
    crc_fmt = struct.Struct('<I')
    assert crc_fmt.size == 4

    # precomputed on-disk bytes of a commit tag entry (size 9 == header_fmt.size)
    _commit = header_no_crc_fmt.pack(9, TAG_COMMIT)
    COMMIT = crc_fmt.pack(crc32(_commit)) + _commit

    def __init__(self, path, limit, segments_per_dir, capacity=90):
        # :param path: repository base directory
        # :param limit: max segment size; a new segment is started beyond it
        # :param segments_per_dir: number of segment files per data subdirectory
        # :param capacity: max number of read fds kept open in the LRU cache
        self.path = path
        self.fds = LRUCache(capacity,
                            dispose=self.close_fd)
        self.segment = 0
        self.limit = limit
        self.segments_per_dir = segments_per_dir
        self.offset = 0
        self._write_fd = None

    def close(self):
        """Close the write fd and all cached read fds; disables this instance."""
        self.close_segment()
        self.fds.clear()
        self.fds = None  # Just to make sure we're disabled

    def close_fd(self, fd):
        # drop the page cache for this file before closing, we won't re-read it soon
        if hasattr(os, 'posix_fadvise'):  # only on UNIX
            os.posix_fadvise(fd.fileno(), 0, 0, os.POSIX_FADV_DONTNEED)
        fd.close()

    def segment_iterator(self, reverse=False):
        """Yield (segment_number, filename) for all segment files, in
        numeric order (or reversed)."""
        data_path = os.path.join(self.path, 'data')
        dirs = sorted((dir for dir in os.listdir(data_path) if dir.isdigit()), key=int, reverse=reverse)
        for dir in dirs:
            filenames = os.listdir(os.path.join(data_path, dir))
            sorted_filenames = sorted((filename for filename in filenames
                                       if filename.isdigit()), key=int, reverse=reverse)
            for filename in sorted_filenames:
                yield int(filename), os.path.join(data_path, dir, filename)

    def get_latest_segment(self):
        """Return the highest existing segment number, or None."""
        for segment, filename in self.segment_iterator(reverse=True):
            return segment
        return None

    def get_segments_transaction_id(self):
        """Return the last committed segment.
        """
        for segment, filename in self.segment_iterator(reverse=True):
            if self.is_committed_segment(segment):
                return segment
        return None

    def cleanup(self, transaction_id):
        """Delete segment files left by aborted transactions
        """
        self.segment = transaction_id + 1
        for segment, filename in self.segment_iterator(reverse=True):
            if segment > transaction_id:
                os.unlink(filename)
            else:
                break

    def is_committed_segment(self, segment):
        """Check if segment ends with a COMMIT_TAG tag
        """
        try:
            iterator = self.iter_objects(segment)
        except IntegrityError:
            return False
        with open(self.segment_filename(segment), 'rb') as fd:
            try:
                # fast path: check the last header-sized chunk of the file
                fd.seek(-self.header_fmt.size, os.SEEK_END)
            except OSError as e:
                # return False if segment file is empty or too small
                if e.errno == errno.EINVAL:
                    return False
                raise e
            if fd.read(self.header_fmt.size) != self.COMMIT:
                return False
        # slow path: also verify every entry before the commit is readable
        seen_commit = False
        while True:
            try:
                tag, key, offset = next(iterator)
            except IntegrityError:
                return False
            except StopIteration:
                break
            if tag == TAG_COMMIT:
                seen_commit = True
                continue
            if seen_commit:
                # an entry after a commit tag means the file is inconsistent
                return False
        return seen_commit

    def segment_filename(self, segment):
        """Map a segment number to its file path under data/<dir>/<segment>."""
        return os.path.join(self.path, 'data', str(segment // self.segments_per_dir), str(segment))

    def get_write_fd(self, no_new=False, raise_full=False):
        """Return the current write fd, rolling over to a new segment when
        the size limit is exceeded (unless no_new; raise SegmentFull if
        raise_full)."""
        if not no_new and self.offset and self.offset > self.limit:
            if raise_full:
                raise self.SegmentFull
            self.close_segment()
        if not self._write_fd:
            if self.segment % self.segments_per_dir == 0:
                dirname = os.path.join(self.path, 'data', str(self.segment // self.segments_per_dir))
                if not os.path.exists(dirname):
                    os.mkdir(dirname)
                    # make sure the directory entry is durable
                    sync_dir(os.path.join(self.path, 'data'))
            self._write_fd = SyncFile(self.segment_filename(self.segment))
            self._write_fd.write(MAGIC)
            self.offset = MAGIC_LEN
        return self._write_fd

    def get_fd(self, segment):
        """Return a (cached) read fd for `segment`."""
        try:
            return self.fds[segment]
        except KeyError:
            fd = open(self.segment_filename(segment), 'rb')
            self.fds[segment] = fd
            return fd

    def close_segment(self):
        """Finish the current write segment and advance the segment counter."""
        if self._write_fd:
            self.segment += 1
            self.offset = 0
            self._write_fd.close()
            self._write_fd = None

    def delete_segment(self, segment):
        """Remove a segment file (and its cached fd); missing file is ok."""
        if segment in self.fds:
            del self.fds[segment]
        try:
            os.unlink(self.segment_filename(segment))
        except FileNotFoundError:
            pass

    def segment_exists(self, segment):
        return os.path.exists(self.segment_filename(segment))

    def iter_objects(self, segment, include_data=False):
        """Yield (tag, key, offset[, data]) for each entry in `segment`.

        :raises IntegrityError: on bad magic or any malformed entry
        """
        fd = self.get_fd(segment)
        fd.seek(0)
        if fd.read(MAGIC_LEN) != MAGIC:
            raise IntegrityError('Invalid segment magic [segment {}, offset {}]'.format(segment, 0))
        offset = MAGIC_LEN
        header = fd.read(self.header_fmt.size)
        while header:
            size, tag, key, data = self._read(fd, self.header_fmt, header, segment, offset,
                                              (TAG_PUT, TAG_DELETE, TAG_COMMIT))
            if include_data:
                yield tag, key, offset, data
            else:
                yield tag, key, offset
            offset += size
            header = fd.read(self.header_fmt.size)

    def recover_segment(self, segment, filename):
        """Salvage readable entries from a corrupt segment by scanning for
        positions whose header size and crc both check out."""
        if segment in self.fds:
            del self.fds[segment]
        with open(filename, 'rb') as fd:
            data = memoryview(fd.read())
        os.rename(filename, filename + '.beforerecover')
        logger.info('attempting to recover ' + filename)
        with open(filename, 'wb') as fd:
            fd.write(MAGIC)
            while len(data) >= self.header_fmt.size:
                crc, size, tag = self.header_fmt.unpack(data[:self.header_fmt.size])
                if size < self.header_fmt.size or size > len(data):
                    # implausible size -> resync byte-by-byte
                    data = data[1:]
                    continue
                if crc32(data[4:size]) & 0xffffffff != crc:
                    data = data[1:]
                    continue
                fd.write(data[:size])
                data = data[size:]

    def read(self, segment, offset, id):
        """Read and verify the PUT entry for `id` at (segment, offset)."""
        if segment == self.segment and self._write_fd:
            # reading from the segment currently being written: flush first
            self._write_fd.sync()
        fd = self.get_fd(segment)
        fd.seek(offset)
        header = fd.read(self.put_header_fmt.size)
        size, tag, key, data = self._read(fd, self.put_header_fmt, header, segment, offset, (TAG_PUT, ))
        if id != key:
            raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format(
                segment, offset))
        return data

    def _read(self, fd, fmt, header, segment, offset, acceptable_tags):
        # some code shared by read() and iter_objects()
        try:
            hdr_tuple = fmt.unpack(header)
        except struct.error as err:
            raise IntegrityError('Invalid segment entry header [segment {}, offset {}]: {}'.format(
                segment, offset, err)) from None
        if fmt is self.put_header_fmt:
            crc, size, tag, key = hdr_tuple
        elif fmt is self.header_fmt:
            crc, size, tag = hdr_tuple
            key = None
        else:
            raise TypeError("_read called with unsupported format")
        if size > MAX_OBJECT_SIZE or size < fmt.size:
            raise IntegrityError('Invalid segment entry size [segment {}, offset {}]'.format(
                segment, offset))
        length = size - fmt.size
        data = fd.read(length)
        if len(data) != length:
            raise IntegrityError('Segment entry data short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
                segment, offset, length, len(data)))
        # crc covers everything after the crc field: header remainder (+key) + data
        if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc:
            raise IntegrityError('Segment entry checksum mismatch [segment {}, offset {}]'.format(
                segment, offset))
        if tag not in acceptable_tags:
            raise IntegrityError('Invalid segment entry header, did not get acceptable tag [segment {}, offset {}]'.format(
                segment, offset))
        if key is None and tag in (TAG_PUT, TAG_DELETE):
            # header_fmt was used, so the 32 byte key is the start of `data`
            key, data = data[:32], data[32:]
        return size, tag, key, data

    def write_put(self, id, data, raise_full=False):
        """Append a PUT entry; return (segment, offset) where it was stored."""
        fd = self.get_write_fd(raise_full=raise_full)
        size = len(data) + self.put_header_fmt.size
        offset = self.offset
        header = self.header_no_crc_fmt.pack(size, TAG_PUT)
        crc = self.crc_fmt.pack(crc32(data, crc32(id, crc32(header))) & 0xffffffff)
        fd.write(b''.join((crc, header, id, data)))
        self.offset += size
        return self.segment, offset

    def write_delete(self, id, raise_full=False):
        """Append a DELETE tombstone for `id`; return the segment number."""
        fd = self.get_write_fd(raise_full=raise_full)
        header = self.header_no_crc_fmt.pack(self.put_header_fmt.size, TAG_DELETE)
        crc = self.crc_fmt.pack(crc32(id, crc32(header)) & 0xffffffff)
        fd.write(b''.join((crc, header, id)))
        self.offset += self.put_header_fmt.size
        return self.segment

    def write_commit(self):
        """Sync all pending writes, then append a COMMIT tag and close the
        segment -- the commit tag must hit the disk after the data."""
        fd = self.get_write_fd(no_new=True)
        fd.sync()
        header = self.header_no_crc_fmt.pack(self.header_fmt.size, TAG_COMMIT)
        crc = self.crc_fmt.pack(crc32(header) & 0xffffffff)
        fd.write(b''.join((crc, header)))
        self.close_segment()