|
@@ -1376,39 +1376,51 @@ class LoggedIO:
|
|
|
safe_fadvise(fd.fileno(), 0, 0, "DONTNEED")
|
|
|
fd.close()
|
|
|
|
|
|
+ def get_segment_dirs(self, data_dir, start_index=MIN_SEGMENT_DIR_INDEX, end_index=MAX_SEGMENT_DIR_INDEX):
|
|
|
+ """Returns generator yielding required segment dirs in data_dir as `os.DirEntry` objects.
|
|
|
+ Start and end are inclusive.
|
|
|
+ """
|
|
|
+ segment_dirs = (
|
|
|
+ f
|
|
|
+ for f in os.scandir(data_dir)
|
|
|
+ if f.is_dir() and f.name.isdigit() and start_index <= int(f.name) <= end_index
|
|
|
+ )
|
|
|
+ return segment_dirs
|
|
|
+
|
|
|
+ def get_segment_files(self, segment_dir, start_index=MIN_SEGMENT_INDEX, end_index=MAX_SEGMENT_INDEX):
|
|
|
+ """Returns generator yielding required segment files in segment_dir as `os.DirEntry` objects.
|
|
|
+ Start and end are inclusive.
|
|
|
+ """
|
|
|
+ segment_files = (
|
|
|
+ f
|
|
|
+ for f in os.scandir(segment_dir)
|
|
|
+ if f.is_file() and f.name.isdigit() and start_index <= int(f.name) <= end_index
|
|
|
+ )
|
|
|
+ return segment_files
|
|
|
+
|
|
|
def segment_iterator(self, start_segment=None, end_segment=None, reverse=False):
|
|
|
if start_segment is None:
|
|
|
- start_segment = 0 if not reverse else 2**32 - 1
|
|
|
+ start_segment = MIN_SEGMENT_INDEX if not reverse else MAX_SEGMENT_INDEX
|
|
|
if end_segment is None:
|
|
|
- end_segment = 2**32 - 1 if not reverse else 0
|
|
|
+ end_segment = MAX_SEGMENT_INDEX if not reverse else MIN_SEGMENT_INDEX
|
|
|
data_path = os.path.join(self.path, "data")
|
|
|
start_segment_dir = start_segment // self.segments_per_dir
|
|
|
end_segment_dir = end_segment // self.segments_per_dir
|
|
|
- dirs = os.listdir(data_path)
|
|
|
if not reverse:
|
|
|
- dirs = [dir for dir in dirs if dir.isdigit() and start_segment_dir <= int(dir) <= end_segment_dir]
|
|
|
+ dirs = self.get_segment_dirs(data_path, start_index=start_segment_dir, end_index=end_segment_dir)
|
|
|
else:
|
|
|
- dirs = [dir for dir in dirs if dir.isdigit() and start_segment_dir >= int(dir) >= end_segment_dir]
|
|
|
- dirs = sorted(dirs, key=int, reverse=reverse)
|
|
|
+ dirs = self.get_segment_dirs(data_path, start_index=end_segment_dir, end_index=start_segment_dir)
|
|
|
+ dirs = sorted(dirs, key=lambda dir: int(dir.name), reverse=reverse)
|
|
|
for dir in dirs:
|
|
|
- filenames = os.listdir(os.path.join(data_path, dir))
|
|
|
if not reverse:
|
|
|
- filenames = [
|
|
|
- filename
|
|
|
- for filename in filenames
|
|
|
- if filename.isdigit() and start_segment <= int(filename) <= end_segment
|
|
|
- ]
|
|
|
+ files = self.get_segment_files(dir, start_index=start_segment, end_index=end_segment)
|
|
|
else:
|
|
|
- filenames = [
|
|
|
- filename
|
|
|
- for filename in filenames
|
|
|
- if filename.isdigit() and start_segment >= int(filename) >= end_segment
|
|
|
- ]
|
|
|
- filenames = sorted(filenames, key=int, reverse=reverse)
|
|
|
- for filename in filenames:
|
|
|
+ files = self.get_segment_files(dir, start_index=end_segment, end_index=start_segment)
|
|
|
+ files = sorted(files, key=lambda file: int(file.name), reverse=reverse)
|
|
|
+ for file in files:
|
|
|
# Note: Do not filter out logically deleted segments (see "File system interaction" above),
|
|
|
# since this is used by cleanup and txn state detection as well.
|
|
|
- yield int(filename), os.path.join(data_path, dir, filename)
|
|
|
+ yield int(file.name), file.path
|
|
|
|
|
|
def get_latest_segment(self):
|
|
|
for segment, filename in self.segment_iterator(reverse=True):
|