# hashindex.py (3.6 KB)
  1. import hashlib
  2. import os
  3. import tempfile
  4. from ..hashindex import NSIndex, ChunkIndex
  5. from . import BaseTestCase
  6. def H(x):
  7. # make some 32byte long thing that depends on x
  8. return bytes('%-0.32d' % x, 'ascii')
  9. class HashIndexTestCase(BaseTestCase):
  10. def _generic_test(self, cls, make_value, sha):
  11. idx = cls()
  12. self.assert_equal(len(idx), 0)
  13. # Test set
  14. for x in range(100):
  15. idx[bytes('%-32d' % x, 'ascii')] = make_value(x)
  16. self.assert_equal(len(idx), 100)
  17. for x in range(100):
  18. self.assert_equal(idx[bytes('%-32d' % x, 'ascii')], make_value(x))
  19. # Test update
  20. for x in range(100):
  21. idx[bytes('%-32d' % x, 'ascii')] = make_value(x * 2)
  22. self.assert_equal(len(idx), 100)
  23. for x in range(100):
  24. self.assert_equal(idx[bytes('%-32d' % x, 'ascii')], make_value(x * 2))
  25. # Test delete
  26. for x in range(50):
  27. del idx[bytes('%-32d' % x, 'ascii')]
  28. self.assert_equal(len(idx), 50)
  29. idx_name = tempfile.NamedTemporaryFile()
  30. idx.write(idx_name.name)
  31. del idx
  32. # Verify file contents
  33. with open(idx_name.name, 'rb') as fd:
  34. self.assert_equal(hashlib.sha256(fd.read()).hexdigest(), sha)
  35. # Make sure we can open the file
  36. idx = cls.read(idx_name.name)
  37. self.assert_equal(len(idx), 50)
  38. for x in range(50, 100):
  39. self.assert_equal(idx[bytes('%-32d' % x, 'ascii')], make_value(x * 2))
  40. idx.clear()
  41. self.assert_equal(len(idx), 0)
  42. idx.write(idx_name.name)
  43. del idx
  44. self.assert_equal(len(cls.read(idx_name.name)), 0)
  45. def test_nsindex(self):
  46. self._generic_test(NSIndex, lambda x: (x, x),
  47. '861d6d60069ea45e39d36bed2bdc1d0c07981e0641955f897ac6848be429abac')
  48. def test_chunkindex(self):
  49. self._generic_test(ChunkIndex, lambda x: (x, x, x),
  50. '69464bd0ebbc5866b9f95d838bc48617d21bfe3dcf294682a5c21a2ef6b9dc0b')
  51. def test_resize(self):
  52. n = 2000 # Must be >= MIN_BUCKETS
  53. idx_name = tempfile.NamedTemporaryFile()
  54. idx = NSIndex()
  55. idx.write(idx_name.name)
  56. initial_size = os.path.getsize(idx_name.name)
  57. self.assert_equal(len(idx), 0)
  58. for x in range(n):
  59. idx[bytes('%-32d' % x, 'ascii')] = x, x
  60. idx.write(idx_name.name)
  61. self.assert_true(initial_size < os.path.getsize(idx_name.name))
  62. for x in range(n):
  63. del idx[bytes('%-32d' % x, 'ascii')]
  64. self.assert_equal(len(idx), 0)
  65. idx.write(idx_name.name)
  66. self.assert_equal(initial_size, os.path.getsize(idx_name.name))
  67. def test_iteritems(self):
  68. idx = NSIndex()
  69. for x in range(100):
  70. idx[bytes('%-0.32d' % x, 'ascii')] = x, x
  71. all = list(idx.iteritems())
  72. self.assert_equal(len(all), 100)
  73. second_half = list(idx.iteritems(marker=all[49][0]))
  74. self.assert_equal(len(second_half), 50)
  75. self.assert_equal(second_half, all[50:])
  76. def test_chunkindex_merge(self):
  77. idx1 = ChunkIndex()
  78. idx1[H(1)] = 1, 100, 100
  79. idx1[H(2)] = 2, 200, 200
  80. idx1[H(3)] = 3, 300, 300
  81. # no H(4) entry
  82. idx2 = ChunkIndex()
  83. idx2[H(1)] = 4, 100, 100
  84. idx2[H(2)] = 5, 200, 200
  85. # no H(3) entry
  86. idx2[H(4)] = 6, 400, 400
  87. idx1.merge(idx2)
  88. assert idx1[H(1)] == (5, 100, 100)
  89. assert idx1[H(2)] == (7, 200, 200)
  90. assert idx1[H(3)] == (3, 300, 300)
  91. assert idx1[H(4)] == (6, 400, 400)