Source code for futurefinity.security

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
#   Copyright 2016 Futur Solo
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from futurefinity.utils import ensure_str, ensure_bytes, FutureFinityError

from typing import Optional

import io
import os
import hmac
import time
import base64
import string
import struct
import random
import hashlib

try:  # Try to load cryptography.
    from cryptography.hazmat.primitives.ciphers import (
        Cipher as AESCipher,
        algorithms as aes_algorithms,
        modes as aes_modes
    )
    from cryptography.hazmat.backends import default_backend as aes_backend

except ImportError:  # Point cryptography to None if they are not found.
    AESCipher = aes_algorithms = aes_modes = aes_backend = None


[docs]def get_random_str(length: int) -> str: """ Generate a random string with specified length by ``random.SystemRandom``. If ``random.SystemRandom`` is not presented, it will raise an `NotImplementedError`. """ random_generator = random.SystemRandom() random_string = "" for i in range(0, length): random_string += random_generator.choice( string.ascii_letters + string.digits) return random_string
class BaseSecurityContext: def __init__(self, security_secret: str): self._security_secret = hashlib.sha256( ensure_bytes(security_secret)).digest() def lookup_origin_text( self, secure_text: str, valid_length: Optional[int]=None) -> Optional[str]: """ Validate and/or Decrypt the secured text and return the origin text. If the content and the signature mismatches or unable to decrypt, ``None`` will be returned. :arg secure_text: is a base64 encoded string, which contains the generated time, the content, and a signature. :arg valid_length: the length that the content should be valid. The unit is second. It you want the content be always valid, set the length to ``None``. """ raise NotImplementedError def generate_secure_text(self, origin_text: str) -> str: """ Sign and/or Encrypt the origin text by the implemented algorithm. :arg origin_text: is a string that will be encrypted by the provided security secret. """ raise NotImplementedError
[docs]class HMACSecurityContext(BaseSecurityContext): """ Security Context uses HMAC. It Uses Built-in ``hmac`` library to create a signature for the given content. It doesn't encrypt the content, but only ensures the integrity of the content. No one can forge the content, or replace the content without the security secret. :arg security_secret: is a string or bytes for the generate signature. This can be generated by ``futurefinity.security.get_random_str``. """ def lookup_origin_text(self, secure_text: str, valid_length: Optional[int]=None) -> str: try: signed_text_reader = io.BytesIO(base64.b64decode(secure_text)) except: # Unable to decode base64 into bytes. return None try: iv = signed_text_reader.read(16) length = struct.unpack("l", signed_text_reader.read(8))[0] content = signed_text_reader.read(length) signature = signed_text_reader.read(32) except: # Unable to split the secure_text. return None hash = hmac.new(iv + self._security_secret, digestmod=hashlib.sha256) hash.update(content) if not hmac.compare_digest(signature, hash.digest()): return None # Signature Mismatch. timestamp = struct.unpack("l", content[:8])[0] text = content[8:] if valid_length and int(time.time()) - timestamp > valid_length: return None # Data Expired return ensure_str(text) def generate_secure_text(self, origin_text: str) -> str: if not isinstance(origin_text, str): raise TypeError("origin_text should be a string.") iv = os.urandom(16) content = struct.pack("l", int(time.time())) content += ensure_bytes(origin_text) hash = hmac.new(iv + self._security_secret, digestmod=hashlib.sha256) hash.update(content) signature = hash.digest() final_signed_text = iv final_signed_text += struct.pack("l", len(content)) final_signed_text += content final_signed_text += signature return ensure_str(base64.b64encode(final_signed_text))
[docs]class AESGCMSecurityContext(BaseSecurityContext): """ Security Context uses AES GCM. **This is the default security context of FutureFinity.** It Uses ``cryptography`` library which can be installed via pip to encrypt/decrypt the content and ensure the intrgrity of the content by AES GCM. If ``cryptography`` is not installed, it will raise an `FutureFinityError`. :arg security_secret: is a string or bytes for the generate signature. This can be generated by ``futurefinity.security.get_random_str``. """ def __init__(self, security_secret: str): if None in [AESCipher, aes_algorithms, aes_modes, aes_backend]: raise FutureFinityError( "Currently, `futurefinity.security.AESGCMSecurityContext` " "needs Cryptography to work. Please install it before " "using security features(such as security_secret), " "or turn aes_security to False in Application Settings.") BaseSecurityContext.__init__(self, security_secret) def lookup_origin_text( self, secure_text: str, valid_length: Optional[int]=None) -> str: try: encrypted_text_reader = io.BytesIO(base64.b64decode(secure_text)) except: # Unable to decode the data. return None try: iv = encrypted_text_reader.read(16) length = struct.unpack("l", encrypted_text_reader.read(8))[0] ciphertext = encrypted_text_reader.read(length) tag = encrypted_text_reader.read(16) except: # Unable to split the data. return None decryptor = AESCipher( aes_algorithms.AES(self._security_secret), aes_modes.GCM(iv, tag), backend=aes_backend() ).decryptor() try: content = decryptor.update(ciphertext) + decryptor.finalize() except: # Unable to decrypt and/or verify the data. return None timestamp = struct.unpack("l", content[:8])[0] text = content[8:] if valid_length and int(time.time()) - timestamp > valid_length: return None # Data Expired. return ensure_str(text) def generate_secure_text(self, origin_text: str) -> str: if not isinstance(origin_text, str): raise TypeError("origin_text should be a string.") iv = os.urandom(16) content = struct.pack("l", int(time.time())) content += ensure_bytes(origin_text) encryptor = AESCipher( aes_algorithms.AES(self._security_secret), aes_modes.GCM(iv), backend=aes_backend() ).encryptor() ciphertext = encryptor.update(content) + encryptor.finalize() final_encrypted_text = iv final_encrypted_text += struct.pack("l", len(ciphertext)) final_encrypted_text += ciphertext final_encrypted_text += encryptor.tag return ensure_str(base64.b64encode(final_encrypted_text))