# lrucache.py (3.0 KB)

import unittest
from heapq import heappush, heapify, heapreplace, heappop

try:
    from UserDict import DictMixin
except ImportError:
    # Python 3 has no UserDict module; MutableMapping is the ABC that
    # supplies the same mixin methods on top of the five core dunders.
    from collections.abc import MutableMapping as DictMixin
  4. class LRUCache(DictMixin):
  5. """Heap queue based Least Recently Used Cache implementation
  6. """
  7. class Node(object):
  8. """Internal cache node
  9. """
  10. __slots__ = ('key', 'value', 't')
  11. def __init__(self, key, value, t):
  12. self.key = key
  13. self.value = value
  14. self.t = t
  15. def __cmp__(self, other):
  16. return cmp(self.t, other.t)
  17. def __init__(self, size):
  18. self.size = size
  19. self._t = 0
  20. self.clear()
  21. def clear(self):
  22. self._heap = []
  23. self._dict = {}
  24. def __setitem__(self, key, value):
  25. self._t += 1
  26. try:
  27. node = self._dict[key]
  28. node.value = value
  29. node.t = self._t
  30. heapify(self._heap)
  31. except KeyError:
  32. node = self.Node(key, value, self._t)
  33. self._dict[key] = node
  34. if len(self) < self.size:
  35. heappush(self._heap, node)
  36. else:
  37. old = heapreplace(self._heap, node)
  38. del self._dict[old.key]
  39. def __getitem__(self, key):
  40. node = self._dict[key]
  41. self[key] = node.value
  42. return node.value
  43. def __delitem__(self, key):
  44. node = self._dict[key]
  45. del self._dict[key]
  46. self._heap.remove(node)
  47. heapify(self._heap)
  48. def __iter__(self):
  49. copy = self._heap[:]
  50. while copy:
  51. yield heappop(copy).key
  52. def iteritems(self):
  53. copy = self._heap[:]
  54. while copy:
  55. node = heappop(copy)
  56. yield node.key, node.value
  57. def keys(self):
  58. return self._dict.keys()
  59. def __contains__(self, key):
  60. return key in self._dict
  61. def __len__(self):
  62. return len(self._heap)
  63. class LRUCacheTestCase(unittest.TestCase):
  64. def test(self):
  65. c = LRUCache(2)
  66. self.assertEqual(len(c), 0)
  67. for i, x in enumerate('abc'):
  68. c[x] = i
  69. self.assertEqual(len(c), 2)
  70. self.assertEqual(list(c), ['b', 'c'])
  71. self.assertEqual(list(c.iteritems()), [('b', 1), ('c', 2)])
  72. self.assertEqual(False, 'a' in c)
  73. self.assertEqual(True, 'b' in c)
  74. self.assertRaises(KeyError, lambda: c['a'])
  75. self.assertEqual(c['b'], 1)
  76. self.assertEqual(c['c'], 2)
  77. c['d'] = 3
  78. self.assertEqual(len(c), 2)
  79. self.assertEqual(c['c'], 2)
  80. self.assertEqual(c['d'], 3)
  81. c['c'] = 22
  82. c['e'] = 4
  83. self.assertEqual(len(c), 2)
  84. self.assertRaises(KeyError, lambda: c['d'])
  85. self.assertEqual(c['c'], 22)
  86. self.assertEqual(c['e'], 4)
  87. del c['c']
  88. self.assertEqual(len(c), 1)
  89. self.assertRaises(KeyError, lambda: c['c'])
  90. self.assertEqual(c['e'], 4)
  91. def suite():
  92. return unittest.TestLoader().loadTestsFromTestCase(LRUCacheTestCase)
  93. if __name__ == '__main__':
  94. unittest.main()