Răsfoiți Sursa

Store: rename bands to segments

Jonas Borgström 14 ani în urmă
părinte
comite
4f7f1fdbe0
2 a modificat fișierele cu 88 adăugiri și 88 ștergeri
  1. 87 87
      darc/store.py
  2. 1 1
      darc/test.py

+ 87 - 87
darc/store.py

@@ -21,12 +21,12 @@ class Store(object):
     On disk layout:
     dir/README
     dir/config
-    dir/bands/<X / BANDS_PER_DIR>/<X>
-    dir/band
+    dir/data/<X / SEGMENTS_PER_DIR>/<X>
+    dir/segments
     dir/index
     """
-    DEFAULT_MAX_BAND_SIZE = 5 * 1024 * 1024
-    DEFAULT_BANDS_PER_DIR = 10000
+    DEFAULT_MAX_SEGMENT_SIZE = 5 * 1024 * 1024
+    DEFAULT_SEGMENTS_PER_DIR = 10000
 
     class DoesNotExist(KeyError):
         """Requested key does not exist"""
@@ -47,18 +47,18 @@ class Store(object):
             os.mkdir(path)
         with open(os.path.join(path, 'README'), 'wb') as fd:
             fd.write('This is a DARC store')
-        os.mkdir(os.path.join(path, 'bands'))
+        os.mkdir(os.path.join(path, 'data'))
         config = RawConfigParser()
         config.add_section('store')
         config.set('store', 'version', '1')
-        config.set('store', 'bands_per_dir', self.DEFAULT_BANDS_PER_DIR)
-        config.set('store', 'max_band_size', self.DEFAULT_MAX_BAND_SIZE)
-        config.set('store', 'next_band', '0')
+        config.set('store', 'segments_per_dir', self.DEFAULT_SEGMENTS_PER_DIR)
+        config.set('store', 'max_segment_size', self.DEFAULT_MAX_SEGMENT_SIZE)
+        config.set('store', 'next_segment', '0')
         config.add_section('meta')
         config.set('meta', 'manifest', '')
         config.set('meta', 'id', os.urandom(32).encode('hex'))
         NSIndex.create(os.path.join(path, 'index'))
-        self.write_dict(os.path.join(path, 'band'), {})
+        self.write_dict(os.path.join(path, 'segments'), {})
         with open(os.path.join(path, 'config'), 'w') as fd:
             config.write(fd)
 
@@ -79,14 +79,14 @@ class Store(object):
             fd.write(msgpack.packb(d))
         os.rename(filename+'.tmp', filename)
 
-    def delete_bands(self):
+    def delete_segments(self):
         delete_path = os.path.join(self.path, 'delete')
         if os.path.exists(delete_path):
-            bands = self.read_dict(os.path.join(self.path, 'band'))
-            for band in self.read_dict(delete_path):
-                assert bands.pop(band, 0) == 0
-                self.io.delete_band(band, missing_ok=True)
-            self.write_dict(os.path.join(self.path, 'band'), bands)
+            segments = self.read_dict(os.path.join(self.path, 'segments'))
+            for segment in self.read_dict(delete_path):
+                assert segments.pop(segment, 0) == 0
+                self.io.delete_segment(segment, missing_ok=True)
+            self.write_dict(os.path.join(self.path, 'segments'), segments)
 
     def begin_txn(self):
         txn_dir = os.path.join(self.path, 'txn.tmp')
@@ -94,7 +94,7 @@ class Store(object):
         os.mkdir(txn_dir)
         shutil.copy(os.path.join(self.path, 'config'), txn_dir)
         shutil.copy(os.path.join(self.path, 'index'), txn_dir)
-        shutil.copy(os.path.join(self.path, 'band'), txn_dir)
+        shutil.copy(os.path.join(self.path, 'segments'), txn_dir)
         os.rename(os.path.join(self.path, 'txn.tmp'),
                   os.path.join(self.path, 'txn.active'))
         self.compact = set()
@@ -108,9 +108,9 @@ class Store(object):
         """Commit transaction
         """
         meta = meta or self.meta
-        self.compact_bands()
+        self.compact_segments()
         self.io.close()
-        self.config.set('store', 'next_band', self.io.band + 1)
+        self.config.set('store', 'next_segment', self.io.segment + 1)
         self.config.remove_section('meta')
         self.config.add_section('meta')
         for k, v in meta.items():
@@ -118,30 +118,30 @@ class Store(object):
         with open(os.path.join(self.path, 'config'), 'w') as fd:
             self.config.write(fd)
         self.index.flush()
-        self.write_dict(os.path.join(self.path, 'band'), self.bands)
+        self.write_dict(os.path.join(self.path, 'segments'), self.segments)
         # If we crash before this line, the transaction will be
         # rolled back by open()
         os.rename(os.path.join(self.path, 'txn.active'),
                   os.path.join(self.path, 'txn.commit'))
         self.rollback()
 
-    def compact_bands(self):
-        """Compact sparse bands by copying data into new bands
+    def compact_segments(self):
+        """Compact sparse segments by copying data into new segments
         """
         if not self.compact:
             return
-        self.io.close_band()
+        self.io.close_segment()
         def lookup(key):
-            return self.index.get(key, (-1, -1))[0] == band
-        bands = self.bands
-        for band in self.compact:
-            if bands[band] > 0:
-                for key, data in self.io.iter_objects(band, lookup):
-                    new_band, offset = self.io.write(key, data)
-                    self.index[key] = new_band, offset
-                    bands.setdefault(new_band, 0)
-                    bands[new_band] += 1
-                    bands[band] -= 1
+            return self.index.get(key, (-1, -1))[0] == segment
+        segments = self.segments
+        for segment in self.compact:
+            if segments[segment] > 0:
+                for key, data in self.io.iter_objects(segment, lookup):
+                    new_segment, offset = self.io.write(key, data)
+                    self.index[key] = new_segment, offset
+                    segments.setdefault(new_segment, 0)
+                    segments[new_segment] += 1
+                    segments[segment] -= 1
         self.write_dict(os.path.join(self.path, 'delete'), tuple(self.compact))
 
     def rollback(self):
@@ -149,7 +149,7 @@ class Store(object):
         """
         # Commit any half committed transaction
         if os.path.exists(os.path.join(self.path, 'txn.commit')):
-            self.delete_bands()
+            self.delete_segments()
             os.rename(os.path.join(self.path, 'txn.commit'),
                       os.path.join(self.path, 'txn.tmp'))
 
@@ -161,30 +161,30 @@ class Store(object):
         if os.path.exists(txn_dir):
             shutil.copy(os.path.join(txn_dir, 'config'), self.path)
             shutil.copy(os.path.join(txn_dir, 'index'), self.path)
-            shutil.copy(os.path.join(txn_dir, 'band'), self.path)
+            shutil.copy(os.path.join(txn_dir, 'segments'), self.path)
             os.rename(txn_dir, os.path.join(self.path, 'txn.tmp'))
         # Remove partially removed transaction
         if os.path.exists(os.path.join(self.path, 'txn.tmp')):
             shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
         self.index = NSIndex(os.path.join(self.path, 'index'))
-        self.bands = self.read_dict(os.path.join(self.path, 'band'))
+        self.segments = self.read_dict(os.path.join(self.path, 'segments'))
         self.config = RawConfigParser()
         self.config.read(os.path.join(self.path, 'config'))
         if self.config.getint('store', 'version') != 1:
             raise Exception('%s Does not look like a darc store')
-        next_band = self.config.getint('store', 'next_band')
-        max_band_size = self.config.getint('store', 'max_band_size')
-        bands_per_dir = self.config.getint('store', 'bands_per_dir')
+        next_segment = self.config.getint('store', 'next_segment')
+        max_segment_size = self.config.getint('store', 'max_segment_size')
+        segments_per_dir = self.config.getint('store', 'segments_per_dir')
         self.meta = dict(self.config.items('meta'))
-        self.io = BandIO(self.path, next_band, max_band_size, bands_per_dir)
+        self.io = SegmentIO(self.path, next_segment, max_segment_size, segments_per_dir)
         self.io.cleanup()
         self.txn_active = False
 
     @deferrable
     def get(self, id):
         try:
-            band, offset = self.index[id]
-            return self.io.read(band, offset, id)
+            segment, offset = self.index[id]
+            return self.io.read(segment, offset, id)
         except KeyError:
             raise self.DoesNotExist
 
@@ -193,24 +193,24 @@ class Store(object):
         if not self.txn_active:
             self.begin_txn()
         try:
-            band, _ = self.index[id]
-            self.bands[band] -= 1
-            self.compact.add(band)
+            segment, _ = self.index[id]
+            self.segments[segment] -= 1
+            self.compact.add(segment)
         except KeyError:
             pass
-        band, offset = self.io.write(id, data)
-        self.bands.setdefault(band, 0)
-        self.bands[band] += 1
-        self.index[id] = band, offset
+        segment, offset = self.io.write(id, data)
+        self.segments.setdefault(segment, 0)
+        self.segments[segment] += 1
+        self.index[id] = segment, offset
 
     @deferrable
     def delete(self, id):
         if not self.txn_active:
             self.begin_txn()
         try:
-            band, offset = self.index.pop(id)
-            self.bands[band] -= 1
-            self.compact.add(band)
+            segment, offset = self.index.pop(id)
+            self.segments[segment] -= 1
+            self.compact.add(segment)
         except KeyError:
             raise self.DoesNotExist
 
@@ -218,85 +218,85 @@ class Store(object):
         pass
 
 
-class BandIO(object):
+class SegmentIO(object):
 
     header_fmt = struct.Struct('<IBI32s')
     assert header_fmt.size == 41
 
-    def __init__(self, path, nextband, limit, bands_per_dir, capacity=100):
+    def __init__(self, path, next_segment, limit, segments_per_dir, capacity=100):
         self.path = path
         self.fds = LRUCache(capacity)
-        self.band = nextband
+        self.segment = next_segment
         self.limit = limit
-        self.bands_per_dir = bands_per_dir
+        self.segments_per_dir = segments_per_dir
         self.offset = 0
 
     def close(self):
-        for band in self.fds.keys():
-            self.fds.pop(band).close()
+        for segment in self.fds.keys():
+            self.fds.pop(segment).close()
 	self.fds = None # Just to make sure we're disabled
 
     def cleanup(self):
-        """Delete band files left by aborted transactions
+        """Delete segment files left by aborted transactions
         """
-        band = self.band
+        segment = self.segment
         while True:
-            filename = self.band_filename(band)
+            filename = self.segment_filename(segment)
             if not os.path.exists(filename):
                 break
             os.unlink(filename)
-            band += 1
+            segment += 1
 
-    def band_filename(self, band):
-        return os.path.join(self.path, 'bands', str(band / self.bands_per_dir), str(band))
+    def segment_filename(self, segment):
+        return os.path.join(self.path, 'data', str(segment / self.segments_per_dir), str(segment))
 
-    def get_fd(self, band, write=False):
+    def get_fd(self, segment, write=False):
         try:
-            return self.fds[band]
+            return self.fds[segment]
         except KeyError:
-            if write and band % 1000 == 0:
-                dirname = os.path.join(self.path, 'bands', str(band / self.bands_per_dir))
+            if write and segment % self.segments_per_dir == 0:
+                dirname = os.path.join(self.path, 'data', str(segment / self.segments_per_dir))
                 if not os.path.exists(dirname):
                     os.mkdir(dirname)
-            fd = open(self.band_filename(band), write and 'w+' or 'rb')
-            self.fds[band] = fd
+            fd = open(self.segment_filename(segment), write and 'w+' or 'rb')
+            self.fds[segment] = fd
             return fd
 
-    def delete_band(self, band, missing_ok=False):
+    def delete_segment(self, segment, missing_ok=False):
         try:
-            os.unlink(self.band_filename(band))
+            os.unlink(self.segment_filename(segment))
         except OSError, e:
             if not missing_ok or e.errno != errno.ENOENT:
                 raise
 
-    def read(self, band, offset, id):
-        fd = self.get_fd(band)
+    def read(self, segment, offset, id):
+        fd = self.get_fd(segment)
         fd.seek(offset)
         data = fd.read(self.header_fmt.size)
         size, magic, hash, id_ = self.header_fmt.unpack(data)
         if magic != 0 or id != id_:
-            raise IntegrityError('Invalid band entry header')
+            raise IntegrityError('Invalid segment entry header')
         data = fd.read(size - self.header_fmt.size)
         if crc32(data) & 0xffffffff != hash:
-            raise IntegrityError('Band checksum mismatch')
+            raise IntegrityError('Segment checksum mismatch')
         return data
 
-    def iter_objects(self, band, lookup):
-        fd = self.get_fd(band)
+    def iter_objects(self, segment, lookup):
+        fd = self.get_fd(segment)
         fd.seek(0)
-        if fd.read(8) != 'DARCBAND':
-            raise IntegrityError('Invalid band header')
+        if fd.read(8) != 'DSEGMENT':
+            raise IntegrityError('Invalid segment header')
         offset = 8
         data = fd.read(self.header_fmt.size)
         while data:
             size, magic, hash, key = self.header_fmt.unpack(data)
             if magic != 0:
-                raise IntegrityError('Unknown band entry header')
+                raise IntegrityError('Unknown segment entry header')
             offset += size
             if lookup(key):
                 data = fd.read(size - self.header_fmt.size)
                 if crc32(data) & 0xffffffff != hash:
-                    raise IntegrityError('Band checksum mismatch')
+                    raise IntegrityError('Segment checksum mismatch')
                 yield key, data
             else:
                 fd.seek(offset)
@@ -305,21 +305,21 @@ class BandIO(object):
     def write(self, id, data):
         size = len(data) + self.header_fmt.size
         if self.offset and self.offset + size > self.limit:
-            self.close_band()
-        fd = self.get_fd(self.band, write=True)
+            self.close_segment()
+        fd = self.get_fd(self.segment, write=True)
         fd.seek(self.offset)
         if self.offset == 0:
-            fd.write('DARCBAND')
+            fd.write('DSEGMENT')
             self.offset = 8
         offset = self.offset
         hash = crc32(data) & 0xffffffff
         fd.write(self.header_fmt.pack(size, 0, hash, id))
         fd.write(data)
         self.offset += size
-        return self.band, offset
+        return self.segment, offset
 
-    def close_band(self):
-        self.band += 1
+    def close_segment(self):
+        self.segment += 1
         self.offset = 0
 
 

+ 1 - 1
darc/test.py

@@ -109,7 +109,7 @@ class Test(unittest.TestCase):
     def test_corrupted_store(self):
         self.create_src_archive('test')
         self.darc('verify', self.store_path + '::test')
-        fd = open(os.path.join(self.tmpdir, 'store', 'bands', '0', '0'), 'r+')
+        fd = open(os.path.join(self.tmpdir, 'store', 'data', '0', '0'), 'r+')
         fd.seek(100)
         fd.write('X')
         fd.close()