Selaa lähdekoodia

More bandstore optimizations

Jonas Borgström 14 vuotta sitten
vanhempi
sitoutus
5cb2d80cb5
4 muutettua tiedostoa jossa 46 lisäystä ja 42 poistoa
  1. 1 6
      dedupestore/archiver.py
  2. 33 32
      dedupestore/bandstore.py
  3. 2 2
      dedupestore/cache.py
  4. 10 2
      dedupestore/test.py

+ 1 - 6
dedupestore/archiver.py

@@ -121,8 +121,6 @@ class Archive(object):
                         if hashlib.sha256(data).digest() != cid:
                             raise Exception('Invalid chunk checksum')
                         data = zlib.decompress(data)
-#                        if hashlib.sha256(data).digest() != id:
-#                            raise Exception('Invalid chunk checksum')
                         fd.write(data)
 
     def verify(self):
@@ -138,10 +136,6 @@ class Archive(object):
                     if (hashlib.sha256(data).digest() != cid):
                         logging.error('%s ... ERROR', item['path'])
                         break
-#                    if (hashlib.sha256(data).digest() != cid or
-#                        hashlib.sha256(zlib.decompress(data)).digest() != id):
-#                        logging.error('%s ... ERROR', item['path'])
-#                        break
                 else:
                     logging.info('%s ... OK', item['path'])
 
@@ -198,6 +192,7 @@ class Archive(object):
             fd = open(path, 'rb')
         except IOError, e:
             logging.error(e)
+            return
         with fd:
             path = path.lstrip('/\\:')
             logging.info(path)

+ 33 - 32
dedupestore/bandstore.py

@@ -1,4 +1,3 @@
-#!/usr/bin/env python
 import os
 import tempfile
 import shutil
@@ -7,6 +6,8 @@ import sqlite3
 import uuid
 import fcntl
 
+Binary = sqlite3.Binary
+
 
 class BandStore(object):
     """
@@ -40,6 +41,7 @@ class BandStore(object):
         fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
         self.path = path
         self.cnx = sqlite3.connect(db_path)
+        self.cnx.text_factory = str
         self.cursor = self.cnx.cursor()
         self._begin()
 
@@ -62,17 +64,14 @@ class BandStore(object):
         while os.path.exists(self.band_filename(band)):
             os.unlink(self.band_filename(band))
             band += 1
-        self.delete_bands()
 
     def create(self, path):
         os.mkdir(path)
         os.mkdir(os.path.join(path, 'bands'))
         cnx = sqlite3.connect(os.path.join(path, 'dedupestore.db'))
-        cnx.execute('CREATE TABLE objects(ns TEXT NOT NULL, id NOT NULL, '
+        cnx.execute('CREATE TABLE objects(ns BINARY NOT NULL, id BINARY NOT NULL, '
                     'band NOT NULL, offset NOT NULL, size NOT NULL)')
         cnx.execute('CREATE UNIQUE INDEX objects_pk ON objects(ns, id)')
-        cnx.execute('CREATE TABLE to_delete(band NOT NULL)')
-        cnx.execute('CREATE UNIQUE INDEX to_delete_pk ON to_delete(band)')
         cnx.execute('CREATE TABLE system(uuid NOT NULL, tid NOT NULL, '
                     'nextband NOT NULL, version NOT NULL, bandlimit NOT NULL)')
         cnx.execute('INSERT INTO system VALUES(?,?,?,?,?)',
@@ -89,28 +88,24 @@ class BandStore(object):
         """
         """
         self.band = None
-        self.cursor.executemany('INSERT INTO to_delete(band) VALUES(?)',
-                                [[d] for d in self.to_delete])
-        self.cursor.execute('UPDATE system SET tid=tid+1, nextband=?',
-                            (self.nextband,))
-        self.cnx.commit()
+        self.delete_bands()
         self.tid += 1
         self._begin()
 
     def delete_bands(self):
-        self.cursor.execute('SELECT band FROM to_delete')
-        to_delete = [r[0] for r in self.cursor.fetchall()]
-        for b in to_delete:
+        for b in self.to_delete:
             objects = self.cursor.execute('SELECT ns, id, offset, size '
                                           'FROM objects WHERE band=? ORDER BY offset',
                                           (b,)).fetchall()
             for o in objects:
                 band, offset, size = self.store_data(self.retrieve_data(b, *o[2:]))
                 self.cursor.execute('UPDATE objects SET band=?, offset=?, size=? '
-                                    'WHERE ns=? AND id=?', (band, offset, size, o[0], o[1]))
-            self.cursor.execute('DELETE FROM to_delete WHERE band=?', (b,))
-            self.cursor.execute('UPDATE system SET nextband=?', (self.nextband,))
-            self.cnx.commit()
+                                    'WHERE ns=? AND id=?', (band, offset, size,
+                                    Binary(o[0]), Binary(o[1])))
+        self.cursor.execute('UPDATE system SET tid=tid+1, nextband=?',
+                            (self.nextband,))
+        self.cnx.commit()
+        for b in self.to_delete:
             os.unlink(self.band_filename(b))
 
     def rollback(self):
@@ -123,7 +118,7 @@ class BandStore(object):
         """
         """
         self.cursor.execute('SELECT band, offset, size FROM objects WHERE ns=? and id=?',
-                            (ns.encode('hex'), id.encode('hex')))
+                            (Binary(ns), Binary(id)))
         row = self.cursor.fetchone()
         if row:
             return self.retrieve_data(*row)
@@ -134,6 +129,8 @@ class BandStore(object):
         return os.path.join(self.path, 'bands', str(band / 1000), str(band))
 
     def retrieve_data(self, band, offset, size):
+        if self.write_band == band:
+            self.write_fd.flush()
         if self.read_band != band:
             self.read_band = band
             if self.read_fd:
@@ -147,7 +144,9 @@ class BandStore(object):
             self.write_band = self.nextband
             self.nextband += 1
             if self.write_band % 1000 == 0:
-                os.mkdir(os.path.join(self.path, 'bands', str(self.write_band / 1000)))
+                path = os.path.join(self.path, 'bands', str(self.write_band / 1000))
+                if not os.path.exists(path):
+                    os.mkdir(path)
             assert not os.path.exists(self.band_filename(self.write_band))
             self.write_fd = open(self.band_filename(self.write_band), 'ab')
         band = self.write_band
@@ -164,8 +163,7 @@ class BandStore(object):
             band, offset, size = self.store_data(data)
             self.cursor.execute('INSERT INTO objects (ns, id, band, offset, size) '
                                 'VALUES(?, ?, ?, ?, ?)',
-                                (ns.encode('hex'), id.encode('hex'),
-                                band, offset, size))
+                                (Binary(ns), Binary(id), band, offset, size))
         except sqlite3.IntegrityError:
             raise self.AlreadyExists
 
@@ -173,28 +171,28 @@ class BandStore(object):
         """
         """
         self.cursor.execute('SELECT band FROM objects WHERE ns=? and id=?',
-                            (ns.encode('hex'), id.encode('hex')))
+                            (Binary(ns), Binary(id)))
         row = self.cursor.fetchone()
         if not row:
             raise self.DoesNotExist
         self.cursor.execute('DELETE FROM objects WHERE ns=? AND id=?',
-                           (ns.encode('hex'), id.encode('hex')))
+                           (Binary(ns), Binary(id)))
         self.to_delete.add(row[0])
 
     def list(self, ns, prefix='', marker=None, max_keys=1000000):
         """
         """
-        condition = ''
+        sql = 'SELECT id FROM objects WHERE ns=:ns'
+        args = dict(ns=Binary(ns))
         if prefix:
-            condition += ' AND id LIKE :prefix'
+            args['prefix'] = Binary(prefix)
+            args['end'] = Binary(prefix + chr(255))
+            sql += ' AND id >= :prefix AND id < :end'
         if marker:
-            condition += ' AND id >= :marker'
-        args = dict(ns=ns.encode('hex'), prefix=prefix.encode('hex') + '%',
-                    marker=marker and marker.encode('hex'))
-        for row in self.cursor.execute('SELECT id FROM objects WHERE '
-                                'ns=:ns ' + condition + ' LIMIT ' + str(max_keys),
-                                args):
-            yield row[0].decode('hex')
+            sql += ' AND id >= :marker'
+            args['marker'] = Binary(marker)
+        for row in self.cursor.execute(sql + ' LIMIT ' + str(max_keys), args):
+            yield str(row[0])
 
 
 class BandStoreTestCase(unittest.TestCase):
@@ -250,5 +248,8 @@ class BandStoreTestCase(unittest.TestCase):
             ['SOMEID12', 'SOMEID123'])
 
 
+def suite():
+    return unittest.TestLoader().loadTestsFromTestCase(BandStoreTestCase)
+
 if __name__ == '__main__':
     unittest.main()

+ 2 - 2
dedupestore/cache.py

@@ -3,8 +3,8 @@ import os
 import zlib
 import msgpack
 
-NS_ARCHIVES = 'ARCHIVES'
-NS_CHUNKS = 'CHUNKS'
+NS_ARCHIVES = 'A'
+NS_CHUNKS = 'C'
 
 
 class Cache(object):

+ 10 - 2
dedupestore/test.py

@@ -2,7 +2,9 @@ import os
 import shutil
 import tempfile
 import unittest
-from archiver import Archiver
+
+from .archiver import Archiver
+from . import bandstore
 
 
 class Test(unittest.TestCase):
@@ -54,5 +56,11 @@ class Test(unittest.TestCase):
         self.assertEqual(os.readlink(os.path.join(dest, 'link')), '/tmp/somewhere')
 
 
+def suite():
+    suite = unittest.TestSuite()
+    suite.addTest(unittest.TestLoader().loadTestsFromTestCase(Test))
+    suite.addTest(bandstore.suite())
+    return suite
+
 if __name__ == '__main__':
-    unittest.main()
+    unittest.TextTestRunner(verbosity=2).run(suite())