123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- """A thin OpenSSL wrapper
- This could be replaced by PyCrypto or something similar when the performance
- of their PBKDF2 implementation is comparable to the OpenSSL version.
- """
- from libc.stdlib cimport malloc, free
- API_VERSION = 2
- cdef extern from "openssl/rand.h":
- int RAND_bytes(unsigned char *buf, int num)
- cdef extern from "openssl/evp.h":
- ctypedef struct EVP_MD:
- pass
- ctypedef struct EVP_CIPHER:
- pass
- ctypedef struct EVP_CIPHER_CTX:
- unsigned char *iv
- pass
- ctypedef struct ENGINE:
- pass
- const EVP_MD *EVP_sha256()
- const EVP_CIPHER *EVP_aes_256_ctr()
- void EVP_CIPHER_CTX_init(EVP_CIPHER_CTX *a)
- void EVP_CIPHER_CTX_cleanup(EVP_CIPHER_CTX *a)
- int EVP_EncryptInit_ex(EVP_CIPHER_CTX *ctx, const EVP_CIPHER *cipher, ENGINE *impl,
- const unsigned char *key, const unsigned char *iv)
- int EVP_DecryptInit_ex(EVP_CIPHER_CTX *ctx, const EVP_CIPHER *cipher, ENGINE *impl,
- const unsigned char *key, const unsigned char *iv)
- int EVP_EncryptUpdate(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl,
- const unsigned char *in_, int inl)
- int EVP_DecryptUpdate(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl,
- const unsigned char *in_, int inl)
- int EVP_EncryptFinal_ex(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl)
- int EVP_DecryptFinal_ex(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl)
- int PKCS5_PBKDF2_HMAC(const char *password, int passwordlen,
- const unsigned char *salt, int saltlen, int iter,
- const EVP_MD *digest,
- int keylen, unsigned char *out)
- import struct
- _int = struct.Struct('>I')
- _long = struct.Struct('>Q')
- bytes_to_int = lambda x, offset=0: _int.unpack_from(x, offset)[0]
- bytes_to_long = lambda x, offset=0: _long.unpack_from(x, offset)[0]
- long_to_bytes = lambda x: _long.pack(x)
- def num_aes_blocks(length):
- """Return the number of AES blocks required to encrypt/decrypt *length* bytes of data.
- Note: this is only correct for modes without padding, like AES-CTR.
- """
- return (length + 15) // 16
- def pbkdf2_sha256(password, salt, iterations, size):
- """Password based key derivation function 2 (RFC2898)
- """
- cdef unsigned char *key = <unsigned char *>malloc(size)
- if not key:
- raise MemoryError
- try:
- rv = PKCS5_PBKDF2_HMAC(password, len(password), salt, len(salt), iterations, EVP_sha256(), size, key)
- if not rv:
- raise Exception('PKCS5_PBKDF2_HMAC failed')
- return key[:size]
- finally:
- free(key)
- def get_random_bytes(n):
- """Return n cryptographically strong pseudo-random bytes
- """
- cdef unsigned char *buf = <unsigned char *>malloc(n)
- if not buf:
- raise MemoryError
- try:
- if RAND_bytes(buf, n) < 1:
- raise Exception('RAND_bytes failed')
- return buf[:n]
- finally:
- free(buf)
- cdef class AES:
- """A thin wrapper around the OpenSSL EVP cipher API
- """
- cdef EVP_CIPHER_CTX ctx
- cdef int is_encrypt
- def __cinit__(self, is_encrypt, key, iv=None):
- EVP_CIPHER_CTX_init(&self.ctx)
- self.is_encrypt = is_encrypt
- # Set cipher type and mode
- cipher_mode = EVP_aes_256_ctr()
- if self.is_encrypt:
- if not EVP_EncryptInit_ex(&self.ctx, cipher_mode, NULL, NULL, NULL):
- raise Exception('EVP_EncryptInit_ex failed')
- else: # decrypt
- if not EVP_DecryptInit_ex(&self.ctx, cipher_mode, NULL, NULL, NULL):
- raise Exception('EVP_DecryptInit_ex failed')
- self.reset(key, iv)
- def __dealloc__(self):
- EVP_CIPHER_CTX_cleanup(&self.ctx)
- def reset(self, key=None, iv=None):
- cdef const unsigned char *key2 = NULL
- cdef const unsigned char *iv2 = NULL
- if key:
- key2 = key
- if iv:
- iv2 = iv
- # Initialise key and IV
- if self.is_encrypt:
- if not EVP_EncryptInit_ex(&self.ctx, NULL, NULL, key2, iv2):
- raise Exception('EVP_EncryptInit_ex failed')
- else: # decrypt
- if not EVP_DecryptInit_ex(&self.ctx, NULL, NULL, key2, iv2):
- raise Exception('EVP_DecryptInit_ex failed')
- @property
- def iv(self):
- return self.ctx.iv[:16]
- def encrypt(self, data):
- cdef int inl = len(data)
- cdef int ctl = 0
- cdef int outl = 0
- # note: modes that use padding, need up to one extra AES block (16b)
- cdef unsigned char *out = <unsigned char *>malloc(inl+16)
- if not out:
- raise MemoryError
- try:
- if not EVP_EncryptUpdate(&self.ctx, out, &outl, data, inl):
- raise Exception('EVP_EncryptUpdate failed')
- ctl = outl
- if not EVP_EncryptFinal_ex(&self.ctx, out+ctl, &outl):
- raise Exception('EVP_EncryptFinal failed')
- ctl += outl
- return out[:ctl]
- finally:
- free(out)
- def decrypt(self, data):
- cdef int inl = len(data)
- cdef int ptl = 0
- cdef int outl = 0
- # note: modes that use padding, need up to one extra AES block (16b).
- # This is what the openssl docs say. I am not sure this is correct,
- # but OTOH it will not cause any harm if our buffer is a little bigger.
- cdef unsigned char *out = <unsigned char *>malloc(inl+16)
- if not out:
- raise MemoryError
- try:
- if not EVP_DecryptUpdate(&self.ctx, out, &outl, data, inl):
- raise Exception('EVP_DecryptUpdate failed')
- ptl = outl
- if EVP_DecryptFinal_ex(&self.ctx, out+ptl, &outl) <= 0:
- # this error check is very important for modes with padding or
- # authentication. for them, a failure here means corrupted data.
- # CTR mode does not use padding nor authentication.
- raise Exception('EVP_DecryptFinal failed')
- ptl += outl
- return out[:ptl]
- finally:
- free(out)
|