crypto.pyx 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. """A thin OpenSSL wrapper
  2. This could be replaced by PyCrypto maybe?
  3. """
  4. from libc.stdlib cimport malloc, free
  5. from cpython.buffer cimport PyBUF_SIMPLE, PyObject_GetBuffer, PyBuffer_Release
  6. API_VERSION = 3
  7. cdef extern from "openssl/rand.h":
  8. int RAND_bytes(unsigned char *buf, int num)
  9. cdef extern from "openssl/evp.h":
  10. ctypedef struct EVP_MD:
  11. pass
  12. ctypedef struct EVP_CIPHER:
  13. pass
  14. ctypedef struct EVP_CIPHER_CTX:
  15. unsigned char *iv
  16. pass
  17. ctypedef struct ENGINE:
  18. pass
  19. const EVP_CIPHER *EVP_aes_256_ctr()
  20. void EVP_CIPHER_CTX_init(EVP_CIPHER_CTX *a)
  21. void EVP_CIPHER_CTX_cleanup(EVP_CIPHER_CTX *a)
  22. int EVP_EncryptInit_ex(EVP_CIPHER_CTX *ctx, const EVP_CIPHER *cipher, ENGINE *impl,
  23. const unsigned char *key, const unsigned char *iv)
  24. int EVP_DecryptInit_ex(EVP_CIPHER_CTX *ctx, const EVP_CIPHER *cipher, ENGINE *impl,
  25. const unsigned char *key, const unsigned char *iv)
  26. int EVP_EncryptUpdate(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl,
  27. const unsigned char *in_, int inl)
  28. int EVP_DecryptUpdate(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl,
  29. const unsigned char *in_, int inl)
  30. int EVP_EncryptFinal_ex(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl)
  31. int EVP_DecryptFinal_ex(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl)
  32. EVP_MD *EVP_sha256() nogil
  33. cdef extern from "openssl/hmac.h":
  34. unsigned char *HMAC(const EVP_MD *evp_md,
  35. const void *key, int key_len,
  36. const unsigned char *data, int data_len,
  37. unsigned char *md, unsigned int *md_len) nogil
  38. import struct
  39. _int = struct.Struct('>I')
  40. _long = struct.Struct('>Q')
  41. bytes_to_int = lambda x, offset=0: _int.unpack_from(x, offset)[0]
  42. bytes_to_long = lambda x, offset=0: _long.unpack_from(x, offset)[0]
  43. long_to_bytes = lambda x: _long.pack(x)
  44. cdef Py_buffer ro_buffer(object data) except *:
  45. cdef Py_buffer view
  46. PyObject_GetBuffer(data, &view, PyBUF_SIMPLE)
  47. return view
  48. def num_aes_blocks(int length):
  49. """Return the number of AES blocks required to encrypt/decrypt *length* bytes of data.
  50. Note: this is only correct for modes without padding, like AES-CTR.
  51. """
  52. return (length + 15) // 16
  53. cdef class AES:
  54. """A thin wrapper around the OpenSSL EVP cipher API
  55. """
  56. cdef EVP_CIPHER_CTX ctx
  57. cdef int is_encrypt
  58. def __cinit__(self, is_encrypt, key, iv=None):
  59. EVP_CIPHER_CTX_init(&self.ctx)
  60. self.is_encrypt = is_encrypt
  61. # Set cipher type and mode
  62. cipher_mode = EVP_aes_256_ctr()
  63. if self.is_encrypt:
  64. if not EVP_EncryptInit_ex(&self.ctx, cipher_mode, NULL, NULL, NULL):
  65. raise Exception('EVP_EncryptInit_ex failed')
  66. else: # decrypt
  67. if not EVP_DecryptInit_ex(&self.ctx, cipher_mode, NULL, NULL, NULL):
  68. raise Exception('EVP_DecryptInit_ex failed')
  69. self.reset(key, iv)
  70. def __dealloc__(self):
  71. EVP_CIPHER_CTX_cleanup(&self.ctx)
  72. def reset(self, key=None, iv=None):
  73. cdef const unsigned char *key2 = NULL
  74. cdef const unsigned char *iv2 = NULL
  75. if key:
  76. key2 = key
  77. if iv:
  78. iv2 = iv
  79. # Initialise key and IV
  80. if self.is_encrypt:
  81. if not EVP_EncryptInit_ex(&self.ctx, NULL, NULL, key2, iv2):
  82. raise Exception('EVP_EncryptInit_ex failed')
  83. else: # decrypt
  84. if not EVP_DecryptInit_ex(&self.ctx, NULL, NULL, key2, iv2):
  85. raise Exception('EVP_DecryptInit_ex failed')
  86. @property
  87. def iv(self):
  88. return self.ctx.iv[:16]
  89. def encrypt(self, data):
  90. cdef Py_buffer data_buf = ro_buffer(data)
  91. cdef int inl = len(data)
  92. cdef int ctl = 0
  93. cdef int outl = 0
  94. # note: modes that use padding, need up to one extra AES block (16b)
  95. cdef unsigned char *out = <unsigned char *>malloc(inl+16)
  96. if not out:
  97. raise MemoryError
  98. try:
  99. if not EVP_EncryptUpdate(&self.ctx, out, &outl, <const unsigned char*> data_buf.buf, inl):
  100. raise Exception('EVP_EncryptUpdate failed')
  101. ctl = outl
  102. if not EVP_EncryptFinal_ex(&self.ctx, out+ctl, &outl):
  103. raise Exception('EVP_EncryptFinal failed')
  104. ctl += outl
  105. return out[:ctl]
  106. finally:
  107. free(out)
  108. PyBuffer_Release(&data_buf)
  109. def decrypt(self, data):
  110. cdef Py_buffer data_buf = ro_buffer(data)
  111. cdef int inl = len(data)
  112. cdef int ptl = 0
  113. cdef int outl = 0
  114. # note: modes that use padding, need up to one extra AES block (16b).
  115. # This is what the openssl docs say. I am not sure this is correct,
  116. # but OTOH it will not cause any harm if our buffer is a little bigger.
  117. cdef unsigned char *out = <unsigned char *>malloc(inl+16)
  118. if not out:
  119. raise MemoryError
  120. try:
  121. if not EVP_DecryptUpdate(&self.ctx, out, &outl, <const unsigned char*> data_buf.buf, inl):
  122. raise Exception('EVP_DecryptUpdate failed')
  123. ptl = outl
  124. if EVP_DecryptFinal_ex(&self.ctx, out+ptl, &outl) <= 0:
  125. # this error check is very important for modes with padding or
  126. # authentication. for them, a failure here means corrupted data.
  127. # CTR mode does not use padding nor authentication.
  128. raise Exception('EVP_DecryptFinal failed')
  129. ptl += outl
  130. return out[:ptl]
  131. finally:
  132. free(out)
  133. PyBuffer_Release(&data_buf)
  134. def hmac_sha256(key, data):
  135. md = bytes(32)
  136. cdef Py_buffer data_buf = ro_buffer(data)
  137. cdef const unsigned char *key_ptr = key
  138. cdef int key_len = len(key)
  139. cdef unsigned char *md_ptr = md
  140. try:
  141. with nogil:
  142. rc = HMAC(EVP_sha256(), key_ptr, key_len, <const unsigned char*> data_buf.buf, data_buf.len, md_ptr, NULL)
  143. if rc != md_ptr:
  144. raise Exception('HMAC(EVP_sha256) failed')
  145. finally:
  146. PyBuffer_Release(&data_buf)
  147. return md