|
@@ -3,6 +3,7 @@ import mmap
|
|
|
import os
|
|
|
import shutil
|
|
|
import struct
|
|
|
+import time
|
|
|
from binascii import hexlify, unhexlify
|
|
|
from collections import defaultdict
|
|
|
from configparser import ConfigParser
|
|
@@ -1164,20 +1165,21 @@ class LoggedIO:
|
|
|
|
|
|
def __init__(self, path, limit, segments_per_dir, capacity=90):
|
|
|
self.path = path
|
|
|
- self.fds = LRUCache(capacity,
|
|
|
- dispose=self.close_fd)
|
|
|
+ self.fds = LRUCache(capacity, dispose=self._close_fd)
|
|
|
self.segment = 0
|
|
|
self.limit = limit
|
|
|
self.segments_per_dir = segments_per_dir
|
|
|
self.offset = 0
|
|
|
self._write_fd = None
|
|
|
+ self._fds_cleaned = 0
|
|
|
|
|
|
def close(self):
|
|
|
self.close_segment()
|
|
|
self.fds.clear()
|
|
|
self.fds = None # Just to make sure we're disabled
|
|
|
|
|
|
- def close_fd(self, fd):
|
|
|
+ def _close_fd(self, ts_fd):
|
|
|
+ ts, fd = ts_fd
|
|
|
safe_fadvise(fd.fileno(), 0, 0, 'DONTNEED')
|
|
|
fd.close()
|
|
|
|
|
@@ -1291,13 +1293,37 @@ class LoggedIO:
|
|
|
return self._write_fd
|
|
|
|
|
|
def get_fd(self, segment):
|
|
|
- try:
|
|
|
- return self.fds[segment]
|
|
|
- except KeyError:
|
|
|
+ # note: get_fd() returns a fd with undefined file pointer position,
|
|
|
+ # so callers must always seek() to desired position afterwards.
|
|
|
+ now = time.monotonic()
|
|
|
+
|
|
|
+ def open_fd():
|
|
|
fd = open(self.segment_filename(segment), 'rb')
|
|
|
- self.fds[segment] = fd
|
|
|
+ self.fds[segment] = (now, fd)
|
|
|
return fd
|
|
|
|
|
|
+ def clean_old():
|
|
|
+ # we regularly get rid of all old FDs here:
|
|
|
+ if now - self._fds_cleaned > FD_MAX_AGE // 8:
|
|
|
+ self._fds_cleaned = now
|
|
|
+ for k, ts_fd in list(self.fds.items()):
|
|
|
+ ts, fd = ts_fd
|
|
|
+ if now - ts > FD_MAX_AGE:
|
|
|
+ # we do not want to touch long-unused file handles to
|
|
|
+ # avoid ESTALE issues (e.g. on network filesystems).
|
|
|
+ del self.fds[k]
|
|
|
+
|
|
|
+ clean_old()
|
|
|
+ try:
|
|
|
+ ts, fd = self.fds[segment]
|
|
|
+ except KeyError:
|
|
|
+ fd = open_fd()
|
|
|
+ else:
|
|
|
+ # we only have fresh enough stuff here.
|
|
|
+ # update the timestamp of the lru cache entry.
|
|
|
+ self.fds.upd(segment, (now, fd))
|
|
|
+ return fd
|
|
|
+
|
|
|
def close_segment(self):
|
|
|
# set self._write_fd to None early to guard against reentry from error handling code paths:
|
|
|
fd, self._write_fd = self._write_fd, None
|