Browse Source

Added basic unit tests.

Jonas Borgström 14 years ago
parent
commit
5cd5d761cd
2 changed files with 77 additions and 4 deletions
  1. 30 4
      dedupestore/archiver.py
  2. 47 0
      dedupestore/test.py

+ 30 - 4
dedupestore/archiver.py

@@ -4,6 +4,7 @@ import logging
 import zlib
 import cPickle
 import argparse
+import sys
 
 from chunkifier import chunkify
 from cache import Cache, NS_ARCHIVES, NS_CHUNKS
@@ -12,6 +13,17 @@ from helpers import location_validator, pretty_size
 
 CHUNK_SIZE = 55001
 
+class LevelFilter(logging.Filter):
+
+    def __init__(self, *args, **kwargs):
+        super(LevelFilter, self).__init__(args, **kwargs)
+        self.count = {}
+
+    def filter(self, record):
+        self.count.setdefault(record.levelname, 0)
+        self.count[record.levelname] += 1
+        return record
+
 
 class Archive(object):
 
@@ -172,20 +184,29 @@ class Archiver(object):
         cache = Cache(store)
         return store, cache
 
+    def exit_code_from_logger(self):
+        if not self.level_filter.count.get('ERROR'):
+            return 0
+        else:
+            return 1
+
     def do_create(self, args):
         store, cache = self.open_store(args.archive)
         archive = Archive(store, cache)
         archive.create(args.archive.archive, args.paths, cache)
+        return self.exit_code_from_logger()
 
     def do_extract(self, args):
         store, cache = self.open_store(args.archive)
         archive = Archive(store, cache, args.archive.archive)
         archive.extract(args.dest)
+        return self.exit_code_from_logger()
 
     def do_delete(self, args):
         store, cache = self.open_store(args.archive)
         archive = Archive(store, cache, args.archive.archive)
         archive.delete(cache)
+        return self.exit_code_from_logger()
 
     def do_list(self, args):
         store, cache = self.open_store(args.src)
@@ -195,11 +216,13 @@ class Archiver(object):
         else:
             for archive in sorted(cache.archives):
                 print archive
+        return self.exit_code_from_logger()
 
     def do_verify(self, args):
         store, cache = self.open_store(args.archive)
         archive = Archive(store, cache, args.archive.archive)
         archive.verify()
+        return self.exit_code_from_logger()
 
     def do_info(self, args):
         store, cache = self.open_store(args.archive)
@@ -208,8 +231,9 @@ class Archiver(object):
         print 'Original size:', pretty_size(stats['osize'])
         print 'Compressed size:', pretty_size(stats['csize'])
         print 'Unique data:', pretty_size(stats['usize'])
+        return self.exit_code_from_logger()
 
-    def run(self):
+    def run(self, args=None):
         parser = argparse.ArgumentParser(description='Dedupestore')
         parser.add_argument('-v', '--verbose', dest='verbose', action='store_true',
                             default=False,
@@ -255,16 +279,18 @@ class Archiver(object):
                                type=location_validator(archive=True),
                                help='Archive to display information about')
 
-        args = parser.parse_args()
+        args = parser.parse_args(args)
         if args.verbose:
             logging.basicConfig(level=logging.INFO, format='%(message)s')
         else:
             logging.basicConfig(level=logging.WARNING, format='%(message)s')
-        args.func(args)
+        self.level_filter = LevelFilter()
+        logging.getLogger('').addFilter(self.level_filter)
+        return args.func(args)
 
 def main():
     archiver = Archiver()
-    archiver.run()
+    sys.exit(archiver.run())
 
 if __name__ == '__main__':
     main()

+ 47 - 0
dedupestore/test.py

@@ -0,0 +1,47 @@
+import os
+import shutil
+import tempfile
+import unittest
+from archiver import Archiver
+
+
+class Test(unittest.TestCase):
+
+    def setUp(self):
+        self.archiver = Archiver()
+        self.tmpdir = tempfile.mkdtemp()
+        self.store_path = os.path.join(self.tmpdir, 'store')
+
+    def tearDown(self):
+        shutil.rmtree(self.tmpdir)
+
+    def dedupestore(self, *args, **kwargs):
+        exit_code = kwargs.get('exit_code', 0)
+        self.assertEqual(exit_code, self.archiver.run(args))
+
+    def create_src_archive(self, name):
+        src_dir = os.path.join(os.getcwd(), os.path.dirname(__file__))
+        self.dedupestore('create', self.store_path + '::' + name, src_dir)
+
+    def test_basic_functionality(self):
+        self.create_src_archive('test')
+        self.dedupestore('list', self.store_path)
+        self.dedupestore('list', self.store_path + '::test')
+        self.dedupestore('info', self.store_path + '::test')
+        self.dedupestore('verify', self.store_path + '::test')
+        dest_dir = os.path.join(self.tmpdir, 'dest')
+        self.dedupestore('extract', self.store_path + '::test', dest_dir)
+        self.dedupestore('delete', self.store_path + '::test')
+
+    def test_corrupted_store(self):
+        self.create_src_archive('test')
+        self.dedupestore('verify', self.store_path + '::test')
+        fd = open(os.path.join(self.tmpdir, 'store', 'bands', '0'), 'r+')
+        fd.seek(1000)
+        fd.write('X')
+        fd.close()
+        self.dedupestore('verify', self.store_path + '::test', exit_code=1)
+
+
+if __name__ == '__main__':
+    unittest.main()