| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 | 
							- from UserDict import DictMixin
 
- from heapq import heappush, heapify, heapreplace, heappop
 
- import unittest
 
- class LRUCache(DictMixin):
 
-     """Heap queue based Least Recently Used Cache implementation
 
-     """
 
-     class Node(object):
 
-         """Internal cache node
 
-         """
 
-         __slots__ = ('key', 'value', 't')
 
-         def __init__(self, key, value, t):
 
-             self.key = key
 
-             self.value = value
 
-             self.t = t
 
-         def __cmp__(self, other):
 
-             return cmp(self.t, other.t)
 
-     def __init__(self, size):
 
-         self._heap = []
 
-         self._dict = {}
 
-         self.size = size
 
-         self._t = 0
 
-     def __setitem__(self, key, value):
 
-         self._t += 1
 
-         try:
 
-             node = self._dict[key]
 
-             node.value = value
 
-             node.t = self._t
 
-             heapify(self._heap)
 
-         except KeyError:
 
-             node = self.Node(key, value, self._t)
 
-             self._dict[key] = node
 
-             if len(self) < self.size:
 
-                 heappush(self._heap, node)
 
-             else:
 
-                 old = heapreplace(self._heap, node)
 
-                 del self._dict[old.key]
 
-     def __getitem__(self, key):
 
-         node = self._dict[key]
 
-         self[key] = node.value
 
-         return node.value
 
-     def __delitem__(self, key):
 
-         node = self._dict[key]
 
-         del self._dict[key]
 
-         self._heap.remove(node)
 
-         heapify(self._heap)
 
-     def __iter__(self):
 
-         copy = self._heap[:]
 
-         while copy:
 
-             yield heappop(copy).key
 
-     def iteritems(self):
 
-         copy = self._heap[:]
 
-         while copy:
 
-             node = heappop(copy)
 
-             yield node.key, node.value
 
-     def keys(self):
 
-         return self._dict.keys()
 
-     def __contains__(self, key):
 
-         return key in self._dict
 
-     def __len__(self):
 
-         return len(self._heap)
 
- class LRUCacheTestCase(unittest.TestCase):
 
-     def test(self):
 
-         c = LRUCache(2)
 
-         self.assertEqual(len(c), 0)
 
-         for i, x in enumerate('abc'):
 
-             c[x] = i
 
-         self.assertEqual(len(c), 2)
 
-         self.assertEqual(list(c), ['b', 'c'])
 
-         self.assertEqual(list(c.iteritems()), [('b', 1), ('c', 2)])
 
-         self.assertEqual(False, 'a' in c)
 
-         self.assertEqual(True, 'b' in c)
 
-         self.assertRaises(KeyError, lambda: c['a'])
 
-         self.assertEqual(c['b'], 1)
 
-         self.assertEqual(c['c'], 2)
 
-         c['d'] = 3
 
-         self.assertEqual(len(c), 2)
 
-         self.assertEqual(c['c'], 2)
 
-         self.assertEqual(c['d'], 3)
 
-         c['c'] = 22
 
-         c['e'] = 4
 
-         self.assertEqual(len(c), 2)
 
-         self.assertRaises(KeyError, lambda: c['d'])
 
-         self.assertEqual(c['c'], 22)
 
-         self.assertEqual(c['e'], 4)
 
-         del c['c']
 
-         self.assertEqual(len(c), 1)
 
-         self.assertRaises(KeyError, lambda: c['c'])
 
-         self.assertEqual(c['e'], 4)
 
- def suite():
 
-     return unittest.TestLoader().loadTestsFromTestCase(LRUCacheTestCase)
 
- if __name__ == '__main__':
 
-     unittest.main()
 
 
  |