from configparser import RawConfigParser
from binascii import hexlify
from itertools import islice
import errno
import logging
import os
import shutil
import struct
import sys
from zlib import crc32

from .helpers import Error, IntegrityError, read_msgpack, write_msgpack, unhexlify, have_cython
if have_cython():
    # hashindex is a compiled (Cython) extension; only importable when built.
    from .hashindex import NSIndex
from .locking import UpgradableLock
from .lrucache import LRUCache

logger = logging.getLogger(__name__)

# Hard upper bound on a single stored object (header + data), enforced in LoggedIO._read().
MAX_OBJECT_SIZE = 20 * 1024 * 1024

# Every segment file starts with this magic.
MAGIC = b'BORG_SEG'
MAGIC_LEN = len(MAGIC)

# Segment entry tags.
TAG_PUT = 0
TAG_DELETE = 1
TAG_COMMIT = 2
  23. class Repository:
  24. """Filesystem based transactional key value store
  25. On disk layout:
  26. dir/README
  27. dir/config
  28. dir/data/<X / SEGMENTS_PER_DIR>/<X>
  29. dir/index.X
  30. dir/hints.X
  31. """
  32. DEFAULT_MAX_SEGMENT_SIZE = 5 * 1024 * 1024
  33. DEFAULT_SEGMENTS_PER_DIR = 10000
  34. class DoesNotExist(Error):
  35. """Repository {} does not exist."""
  36. class AlreadyExists(Error):
  37. """Repository {} already exists."""
  38. class InvalidRepository(Error):
  39. """{} is not a valid repository."""
  40. class CheckNeeded(Error):
  41. """Inconsistency detected. Please run "borg check {}"."""
  42. class ObjectNotFound(Error):
  43. """Object with key {} not found in repository {}."""
  44. def __init__(self, path, create=False, exclusive=False):
  45. self.path = os.path.abspath(path)
  46. self.io = None
  47. self.lock = None
  48. self.index = None
  49. self._active_txn = False
  50. if create:
  51. self.create(self.path)
  52. self.open(self.path, exclusive)
  53. def __del__(self):
  54. self.close()
  55. def __repr__(self):
  56. return '<%s %s>' % (self.__class__.__name__, self.path)
  57. def create(self, path):
  58. """Create a new empty repository at `path`
  59. """
  60. if os.path.exists(path) and (not os.path.isdir(path) or os.listdir(path)):
  61. raise self.AlreadyExists(path)
  62. if not os.path.exists(path):
  63. os.mkdir(path)
  64. with open(os.path.join(path, 'README'), 'w') as fd:
  65. fd.write('This is a Borg repository\n')
  66. os.mkdir(os.path.join(path, 'data'))
  67. config = RawConfigParser()
  68. config.add_section('repository')
  69. config.set('repository', 'version', '1')
  70. config.set('repository', 'segments_per_dir', self.DEFAULT_SEGMENTS_PER_DIR)
  71. config.set('repository', 'max_segment_size', self.DEFAULT_MAX_SEGMENT_SIZE)
  72. config.set('repository', 'id', hexlify(os.urandom(32)).decode('ascii'))
  73. self.save_config(path, config)
  74. def save_config(self, path, config):
  75. config_path = os.path.join(path, 'config')
  76. with open(config_path, 'w') as fd:
  77. config.write(fd)
  78. def save_key(self, keydata):
  79. assert self.config
  80. keydata = keydata.decode('utf-8') # remote repo: msgpack issue #99, getting bytes
  81. self.config.set('repository', 'key', keydata)
  82. self.save_config(self.path, self.config)
  83. def load_key(self):
  84. keydata = self.config.get('repository', 'key')
  85. return keydata.encode('utf-8') # remote repo: msgpack issue #99, returning bytes
  86. def destroy(self):
  87. """Destroy the repository at `self.path`
  88. """
  89. self.close()
  90. os.remove(os.path.join(self.path, 'config')) # kill config first
  91. shutil.rmtree(self.path)
  92. def get_index_transaction_id(self):
  93. indices = sorted((int(name[6:]) for name in os.listdir(self.path) if name.startswith('index.') and name[6:].isdigit()))
  94. if indices:
  95. return indices[-1]
  96. else:
  97. return None
  98. def get_transaction_id(self):
  99. index_transaction_id = self.get_index_transaction_id()
  100. segments_transaction_id = self.io.get_segments_transaction_id()
  101. if index_transaction_id is not None and segments_transaction_id is None:
  102. raise self.CheckNeeded(self.path)
  103. # Attempt to automatically rebuild index if we crashed between commit
  104. # tag write and index save
  105. if index_transaction_id != segments_transaction_id:
  106. if index_transaction_id is not None and index_transaction_id > segments_transaction_id:
  107. replay_from = None
  108. else:
  109. replay_from = index_transaction_id
  110. self.replay_segments(replay_from, segments_transaction_id)
  111. return self.get_index_transaction_id()
  112. def open(self, path, exclusive):
  113. self.path = path
  114. if not os.path.isdir(path):
  115. raise self.DoesNotExist(path)
  116. self.lock = UpgradableLock(os.path.join(path, 'lock'), exclusive).acquire()
  117. self.config = RawConfigParser()
  118. self.config.read(os.path.join(self.path, 'config'))
  119. if 'repository' not in self.config.sections() or self.config.getint('repository', 'version') != 1:
  120. raise self.InvalidRepository(path)
  121. self.max_segment_size = self.config.getint('repository', 'max_segment_size')
  122. self.segments_per_dir = self.config.getint('repository', 'segments_per_dir')
  123. self.id = unhexlify(self.config.get('repository', 'id').strip())
  124. self.io = LoggedIO(self.path, self.max_segment_size, self.segments_per_dir)
  125. def close(self):
  126. if self.lock:
  127. if self.io:
  128. self.io.close()
  129. self.io = None
  130. self.lock.release()
  131. self.lock = None
  132. def commit(self):
  133. """Commit transaction
  134. """
  135. self.io.write_commit()
  136. self.compact_segments()
  137. self.write_index()
  138. self.rollback()
  139. def open_index(self, transaction_id):
  140. if transaction_id is None:
  141. return NSIndex()
  142. return NSIndex.read((os.path.join(self.path, 'index.%d') % transaction_id).encode('utf-8'))
  143. def prepare_txn(self, transaction_id, do_cleanup=True):
  144. self._active_txn = True
  145. try:
  146. self.lock.upgrade()
  147. except UpgradableLock.ExclusiveLockFailed:
  148. # if upgrading the lock to exclusive fails, we do not have an
  149. # active transaction. this is important for "serve" mode, where
  150. # the repository instance lives on - even if exceptions happened.
  151. self._active_txn = False
  152. raise
  153. if not self.index:
  154. self.index = self.open_index(transaction_id)
  155. if transaction_id is None:
  156. self.segments = {}
  157. self.compact = set()
  158. else:
  159. if do_cleanup:
  160. self.io.cleanup(transaction_id)
  161. hints = read_msgpack(os.path.join(self.path, 'hints.%d' % transaction_id))
  162. if hints[b'version'] != 1:
  163. raise ValueError('Unknown hints file version: %d' % hints['version'])
  164. self.segments = hints[b'segments']
  165. self.compact = set(hints[b'compact'])
  166. def write_index(self):
  167. hints = {b'version': 1,
  168. b'segments': self.segments,
  169. b'compact': list(self.compact)}
  170. transaction_id = self.io.get_segments_transaction_id()
  171. write_msgpack(os.path.join(self.path, 'hints.%d' % transaction_id), hints)
  172. self.index.write(os.path.join(self.path, 'index.tmp'))
  173. os.rename(os.path.join(self.path, 'index.tmp'),
  174. os.path.join(self.path, 'index.%d' % transaction_id))
  175. # Remove old indices
  176. current = '.%d' % transaction_id
  177. for name in os.listdir(self.path):
  178. if not name.startswith('index.') and not name.startswith('hints.'):
  179. continue
  180. if name.endswith(current):
  181. continue
  182. os.unlink(os.path.join(self.path, name))
  183. self.index = None
  184. def compact_segments(self):
  185. """Compact sparse segments by copying data into new segments
  186. """
  187. if not self.compact:
  188. return
  189. index_transaction_id = self.get_index_transaction_id()
  190. segments = self.segments
  191. for segment in sorted(self.compact):
  192. if self.io.segment_exists(segment):
  193. for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
  194. if tag == TAG_PUT and self.index.get(key, (-1, -1)) == (segment, offset):
  195. new_segment, offset = self.io.write_put(key, data)
  196. self.index[key] = new_segment, offset
  197. segments.setdefault(new_segment, 0)
  198. segments[new_segment] += 1
  199. segments[segment] -= 1
  200. elif tag == TAG_DELETE:
  201. if index_transaction_id is None or segment > index_transaction_id:
  202. self.io.write_delete(key)
  203. assert segments[segment] == 0
  204. self.io.write_commit()
  205. for segment in sorted(self.compact):
  206. assert self.segments.pop(segment) == 0
  207. self.io.delete_segment(segment)
  208. self.compact = set()
  209. def replay_segments(self, index_transaction_id, segments_transaction_id):
  210. self.prepare_txn(index_transaction_id, do_cleanup=False)
  211. for segment, filename in self.io.segment_iterator():
  212. if index_transaction_id is not None and segment <= index_transaction_id:
  213. continue
  214. if segment > segments_transaction_id:
  215. break
  216. self.segments[segment] = 0
  217. for tag, key, offset in self.io.iter_objects(segment):
  218. if tag == TAG_PUT:
  219. try:
  220. s, _ = self.index[key]
  221. self.compact.add(s)
  222. self.segments[s] -= 1
  223. except KeyError:
  224. pass
  225. self.index[key] = segment, offset
  226. self.segments[segment] += 1
  227. elif tag == TAG_DELETE:
  228. try:
  229. s, _ = self.index.pop(key)
  230. self.segments[s] -= 1
  231. self.compact.add(s)
  232. except KeyError:
  233. pass
  234. self.compact.add(segment)
  235. elif tag == TAG_COMMIT:
  236. continue
  237. else:
  238. raise self.CheckNeeded(self.path)
  239. if self.segments[segment] == 0:
  240. self.compact.add(segment)
  241. self.write_index()
  242. self.rollback()
  243. def check(self, repair=False):
  244. """Check repository consistency
  245. This method verifies all segment checksums and makes sure
  246. the index is consistent with the data stored in the segments.
  247. """
  248. error_found = False
  249. def report_error(msg):
  250. nonlocal error_found
  251. error_found = True
  252. logger.error(msg)
  253. assert not self._active_txn
  254. try:
  255. transaction_id = self.get_transaction_id()
  256. current_index = self.open_index(transaction_id)
  257. except Exception:
  258. transaction_id = self.io.get_segments_transaction_id()
  259. current_index = None
  260. if transaction_id is None:
  261. transaction_id = self.get_index_transaction_id()
  262. if transaction_id is None:
  263. transaction_id = self.io.get_latest_segment()
  264. if repair:
  265. self.io.cleanup(transaction_id)
  266. segments_transaction_id = self.io.get_segments_transaction_id()
  267. self.prepare_txn(None)
  268. for segment, filename in self.io.segment_iterator():
  269. if segment > transaction_id:
  270. continue
  271. try:
  272. objects = list(self.io.iter_objects(segment))
  273. except IntegrityError as err:
  274. report_error(str(err))
  275. objects = []
  276. if repair:
  277. self.io.recover_segment(segment, filename)
  278. objects = list(self.io.iter_objects(segment))
  279. self.segments[segment] = 0
  280. for tag, key, offset in objects:
  281. if tag == TAG_PUT:
  282. try:
  283. s, _ = self.index[key]
  284. self.compact.add(s)
  285. self.segments[s] -= 1
  286. except KeyError:
  287. pass
  288. self.index[key] = segment, offset
  289. self.segments[segment] += 1
  290. elif tag == TAG_DELETE:
  291. try:
  292. s, _ = self.index.pop(key)
  293. self.segments[s] -= 1
  294. self.compact.add(s)
  295. except KeyError:
  296. pass
  297. self.compact.add(segment)
  298. elif tag == TAG_COMMIT:
  299. continue
  300. else:
  301. report_error('Unexpected tag {} in segment {}'.format(tag, segment))
  302. # We might need to add a commit tag if no committed segment is found
  303. if repair and segments_transaction_id is None:
  304. report_error('Adding commit tag to segment {}'.format(transaction_id))
  305. self.io.segment = transaction_id + 1
  306. self.io.write_commit()
  307. if current_index and not repair:
  308. if len(current_index) != len(self.index):
  309. report_error('Index object count mismatch. {} != {}'.format(len(current_index), len(self.index)))
  310. elif current_index:
  311. for key, value in self.index.iteritems():
  312. if current_index.get(key, (-1, -1)) != value:
  313. report_error('Index mismatch for key {}. {} != {}'.format(key, value, current_index.get(key, (-1, -1))))
  314. if repair:
  315. self.compact_segments()
  316. self.write_index()
  317. self.rollback()
  318. return not error_found or repair
  319. def rollback(self):
  320. """
  321. """
  322. self.index = None
  323. self._active_txn = False
  324. def __len__(self):
  325. if not self.index:
  326. self.index = self.open_index(self.get_transaction_id())
  327. return len(self.index)
  328. def __contains__(self, id):
  329. if not self.index:
  330. self.index = self.open_index(self.get_transaction_id())
  331. return id in self.index
  332. def list(self, limit=None, marker=None):
  333. if not self.index:
  334. self.index = self.open_index(self.get_transaction_id())
  335. return [id_ for id_, _ in islice(self.index.iteritems(marker=marker), limit)]
  336. def get(self, id_):
  337. if not self.index:
  338. self.index = self.open_index(self.get_transaction_id())
  339. try:
  340. segment, offset = self.index[id_]
  341. return self.io.read(segment, offset, id_)
  342. except KeyError:
  343. raise self.ObjectNotFound(id_, self.path)
  344. def get_many(self, ids, is_preloaded=False):
  345. for id_ in ids:
  346. yield self.get(id_)
  347. def put(self, id, data, wait=True):
  348. if not self._active_txn:
  349. self.prepare_txn(self.get_transaction_id())
  350. try:
  351. segment, _ = self.index[id]
  352. self.segments[segment] -= 1
  353. self.compact.add(segment)
  354. segment = self.io.write_delete(id)
  355. self.segments.setdefault(segment, 0)
  356. self.compact.add(segment)
  357. except KeyError:
  358. pass
  359. segment, offset = self.io.write_put(id, data)
  360. self.segments.setdefault(segment, 0)
  361. self.segments[segment] += 1
  362. self.index[id] = segment, offset
  363. def delete(self, id, wait=True):
  364. if not self._active_txn:
  365. self.prepare_txn(self.get_transaction_id())
  366. try:
  367. segment, offset = self.index.pop(id)
  368. except KeyError:
  369. raise self.ObjectNotFound(id, self.path)
  370. self.segments[segment] -= 1
  371. self.compact.add(segment)
  372. segment = self.io.write_delete(id)
  373. self.compact.add(segment)
  374. self.segments.setdefault(segment, 0)
  375. def preload(self, ids):
  376. """Preload objects (only applies to remote repositories)
  377. """
  378. class LoggedIO:
  379. header_fmt = struct.Struct('<IIB')
  380. assert header_fmt.size == 9
  381. put_header_fmt = struct.Struct('<IIB32s')
  382. assert put_header_fmt.size == 41
  383. header_no_crc_fmt = struct.Struct('<IB')
  384. assert header_no_crc_fmt.size == 5
  385. crc_fmt = struct.Struct('<I')
  386. assert crc_fmt.size == 4
  387. _commit = header_no_crc_fmt.pack(9, TAG_COMMIT)
  388. COMMIT = crc_fmt.pack(crc32(_commit)) + _commit
  389. def __init__(self, path, limit, segments_per_dir, capacity=90):
  390. self.path = path
  391. self.fds = LRUCache(capacity,
  392. dispose=lambda fd: fd.close())
  393. self.segment = 0
  394. self.limit = limit
  395. self.segments_per_dir = segments_per_dir
  396. self.offset = 0
  397. self._write_fd = None
  398. def close(self):
  399. self.close_segment()
  400. self.fds.clear()
  401. self.fds = None # Just to make sure we're disabled
  402. def segment_iterator(self, reverse=False):
  403. data_path = os.path.join(self.path, 'data')
  404. dirs = sorted((dir for dir in os.listdir(data_path) if dir.isdigit()), key=int, reverse=reverse)
  405. for dir in dirs:
  406. filenames = os.listdir(os.path.join(data_path, dir))
  407. sorted_filenames = sorted((filename for filename in filenames
  408. if filename.isdigit()), key=int, reverse=reverse)
  409. for filename in sorted_filenames:
  410. yield int(filename), os.path.join(data_path, dir, filename)
  411. def get_latest_segment(self):
  412. for segment, filename in self.segment_iterator(reverse=True):
  413. return segment
  414. return None
  415. def get_segments_transaction_id(self):
  416. """Verify that the transaction id is consistent with the index transaction id
  417. """
  418. for segment, filename in self.segment_iterator(reverse=True):
  419. if self.is_committed_segment(filename):
  420. return segment
  421. return None
  422. def cleanup(self, transaction_id):
  423. """Delete segment files left by aborted transactions
  424. """
  425. self.segment = transaction_id + 1
  426. for segment, filename in self.segment_iterator(reverse=True):
  427. if segment > transaction_id:
  428. os.unlink(filename)
  429. else:
  430. break
  431. def is_committed_segment(self, filename):
  432. """Check if segment ends with a COMMIT_TAG tag
  433. """
  434. with open(filename, 'rb') as fd:
  435. try:
  436. fd.seek(-self.header_fmt.size, os.SEEK_END)
  437. except OSError as e:
  438. # return False if segment file is empty or too small
  439. if e.errno == errno.EINVAL:
  440. return False
  441. raise e
  442. return fd.read(self.header_fmt.size) == self.COMMIT
  443. def segment_filename(self, segment):
  444. return os.path.join(self.path, 'data', str(segment // self.segments_per_dir), str(segment))
  445. def get_write_fd(self, no_new=False):
  446. if not no_new and self.offset and self.offset > self.limit:
  447. self.close_segment()
  448. if not self._write_fd:
  449. if self.segment % self.segments_per_dir == 0:
  450. dirname = os.path.join(self.path, 'data', str(self.segment // self.segments_per_dir))
  451. if not os.path.exists(dirname):
  452. os.mkdir(dirname)
  453. self._write_fd = open(self.segment_filename(self.segment), 'ab')
  454. self._write_fd.write(MAGIC)
  455. self.offset = MAGIC_LEN
  456. return self._write_fd
  457. def get_fd(self, segment):
  458. try:
  459. return self.fds[segment]
  460. except KeyError:
  461. fd = open(self.segment_filename(segment), 'rb')
  462. self.fds[segment] = fd
  463. return fd
  464. def delete_segment(self, segment):
  465. if segment in self.fds:
  466. del self.fds[segment]
  467. try:
  468. os.unlink(self.segment_filename(segment))
  469. except OSError:
  470. pass
  471. def segment_exists(self, segment):
  472. return os.path.exists(self.segment_filename(segment))
  473. def iter_objects(self, segment, include_data=False):
  474. fd = self.get_fd(segment)
  475. fd.seek(0)
  476. if fd.read(MAGIC_LEN) != MAGIC:
  477. raise IntegrityError('Invalid segment magic [segment {}, offset {}]'.format(segment, 0))
  478. offset = MAGIC_LEN
  479. header = fd.read(self.header_fmt.size)
  480. while header:
  481. size, tag, key, data = self._read(fd, self.header_fmt, header, segment, offset,
  482. (TAG_PUT, TAG_DELETE, TAG_COMMIT))
  483. if include_data:
  484. yield tag, key, offset, data
  485. else:
  486. yield tag, key, offset
  487. offset += size
  488. header = fd.read(self.header_fmt.size)
  489. def recover_segment(self, segment, filename):
  490. if segment in self.fds:
  491. del self.fds[segment]
  492. with open(filename, 'rb') as fd:
  493. data = memoryview(fd.read())
  494. os.rename(filename, filename + '.beforerecover')
  495. logger.info('attempting to recover ' + filename)
  496. with open(filename, 'wb') as fd:
  497. fd.write(MAGIC)
  498. while len(data) >= self.header_fmt.size:
  499. crc, size, tag = self.header_fmt.unpack(data[:self.header_fmt.size])
  500. if size < self.header_fmt.size or size > len(data):
  501. data = data[1:]
  502. continue
  503. if crc32(data[4:size]) & 0xffffffff != crc:
  504. data = data[1:]
  505. continue
  506. fd.write(data[:size])
  507. data = data[size:]
  508. def read(self, segment, offset, id):
  509. if segment == self.segment and self._write_fd:
  510. self._write_fd.flush()
  511. fd = self.get_fd(segment)
  512. fd.seek(offset)
  513. header = fd.read(self.put_header_fmt.size)
  514. size, tag, key, data = self._read(fd, self.put_header_fmt, header, segment, offset, (TAG_PUT, ))
  515. if id != key:
  516. raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format(
  517. segment, offset))
  518. return data
  519. def _read(self, fd, fmt, header, segment, offset, acceptable_tags):
  520. # some code shared by read() and iter_objects()
  521. try:
  522. hdr_tuple = fmt.unpack(header)
  523. except struct.error as err:
  524. raise IntegrityError('Invalid segment entry header [segment {}, offset {}]: {}'.format(
  525. segment, offset, err))
  526. if fmt is self.put_header_fmt:
  527. crc, size, tag, key = hdr_tuple
  528. elif fmt is self.header_fmt:
  529. crc, size, tag = hdr_tuple
  530. key = None
  531. else:
  532. raise TypeError("_read called with unsupported format")
  533. if size > MAX_OBJECT_SIZE or size < fmt.size:
  534. raise IntegrityError('Invalid segment entry size [segment {}, offset {}]'.format(
  535. segment, offset))
  536. length = size - fmt.size
  537. data = fd.read(length)
  538. if len(data) != length:
  539. raise IntegrityError('Segment entry data short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
  540. segment, offset, length, len(data)))
  541. if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc:
  542. raise IntegrityError('Segment entry checksum mismatch [segment {}, offset {}]'.format(
  543. segment, offset))
  544. if tag not in acceptable_tags:
  545. raise IntegrityError('Invalid segment entry header, did not get acceptable tag [segment {}, offset {}]'.format(
  546. segment, offset))
  547. if key is None and tag in (TAG_PUT, TAG_DELETE):
  548. key, data = data[:32], data[32:]
  549. return size, tag, key, data
  550. def write_put(self, id, data):
  551. size = len(data) + self.put_header_fmt.size
  552. fd = self.get_write_fd()
  553. offset = self.offset
  554. header = self.header_no_crc_fmt.pack(size, TAG_PUT)
  555. crc = self.crc_fmt.pack(crc32(data, crc32(id, crc32(header))) & 0xffffffff)
  556. fd.write(b''.join((crc, header, id, data)))
  557. self.offset += size
  558. return self.segment, offset
  559. def write_delete(self, id):
  560. fd = self.get_write_fd()
  561. header = self.header_no_crc_fmt.pack(self.put_header_fmt.size, TAG_DELETE)
  562. crc = self.crc_fmt.pack(crc32(id, crc32(header)) & 0xffffffff)
  563. fd.write(b''.join((crc, header, id)))
  564. self.offset += self.put_header_fmt.size
  565. return self.segment
  566. def write_commit(self):
  567. fd = self.get_write_fd(no_new=True)
  568. header = self.header_no_crc_fmt.pack(self.header_fmt.size, TAG_COMMIT)
  569. crc = self.crc_fmt.pack(crc32(header) & 0xffffffff)
  570. fd.write(b''.join((crc, header)))
  571. self.close_segment()
  572. def close_segment(self):
  573. if self._write_fd:
  574. self.segment += 1
  575. self.offset = 0
  576. self._write_fd.flush()
  577. os.fsync(self._write_fd.fileno())
  578. if hasattr(os, 'posix_fadvise'): # python >= 3.3, only on UNIX
  579. # tell the OS that it does not need to cache what we just wrote,
  580. # avoids spoiling the cache for the OS and other processes.
  581. os.posix_fadvise(self._write_fd.fileno(), 0, 0, os.POSIX_FADV_DONTNEED)
  582. self._write_fd.close()
  583. self._write_fd = None