Python 日志记录以自定义格式加密
Python logging encrypt with custom format
我正在尝试创建一个自定义的日志记录器类(class),在打印日志的同时把日志保存到加密文件中。我参考了这种做法。
这是我的代码:
import base64
import logging
from pprint import pprint
from Cryptodome.Cipher import AES
from Cryptodome.Hash import SHA256
from Cryptodome.Hash import MD5
from Cryptodome import Random
class logger:
    """
    Log to the console in plain text and to a file with encrypted fields.
    """

    class EncryptedLogFormatter(logging.Formatter):
        """Formatter that AES-CBC encrypts individual record fields."""

        def __init__(self, key, fmt=None, datefmt=None):
            """
            Derive the 16-byte AES key from ``key`` and set up the formatter.

            ``fmt`` and ``datefmt`` are forwarded to ``logging.Formatter``.
            """
            self._key = self.hash_gen(key, 16)
            super(logger.EncryptedLogFormatter, self).__init__(fmt=fmt, datefmt=datefmt)

        @staticmethod
        def hash_gen(key, size):
            """
            Return the first ``size`` bytes of the MD5 digest of ``key``.
            """
            digest = MD5.new(key.encode('utf-8')).digest()
            return digest[:size]

        def _encrypt_text(self, text):
            """
            Return ``base64(iv + AES-CBC(text))`` as a str, using a fresh IV.
            """
            iv = Random.new().read(AES.block_size)
            cipher = AES.new(self._key, AES.MODE_CBC, iv)
            # PKCS#7-style padding; the length is counted in characters.
            pad_len = AES.block_size - len(text) % AES.block_size
            padded = text + chr(pad_len) * pad_len
            return base64.b64encode(iv + cipher.encrypt(padded.encode())).decode()

        def format(self, record):
            """
            Encrypt ``msg``, ``asctime`` and ``levelname`` on the record in
            place, then let ``logging.Formatter`` render the line.

            NOTE: ``record.asctime`` is read directly, so it must already be
            set on the record. The base ``format()`` recomputes ``asctime``
            when the format string uses it, which is why the timestamp still
            appears in plain text in the output.
            """
            # Read every field first, before any of them is replaced.
            plain_fields = (
                ('msg', record.msg),
                ('asctime', record.asctime),
                ('levelname', record.levelname),
            )
            for attr_name, plain_value in plain_fields:
                if plain_value:  # empty values are left untouched
                    setattr(record, attr_name, self._encrypt_text(plain_value))
            return super(logger.EncryptedLogFormatter, self).format(record)

    def __init__(self, key, filename, level=logging.INFO, fmt='%(asctime)s:%(levelname)s: %(message)s', datefmt="%Y-%m-%d %H:%M:%S"):
        """
        Attach a plain-text console handler and an encrypting file handler
        to the root logger, and set the root logger's level to ``level``.
        """
        root = logging.getLogger()
        root.setLevel(level)
        console_handler = logging.StreamHandler()
        file_handler = logging.FileHandler(filename)
        console_handler.setFormatter(logging.Formatter(fmt=fmt, datefmt=datefmt))
        file_handler.setFormatter(logger.EncryptedLogFormatter(key, fmt, datefmt))
        for handler in (console_handler, file_handler):
            root.addHandler(handler)

    def print(self, message):
        """Log ``message`` at INFO level through the root logger."""
        logging.info(message)
if __name__ == "__main__":
    # Demo: emit one INFO message to the console and to the encrypted file.
    logg = logger(key="abcdefg", filename='Some path')
    logg.print("Hello")
控制台输出:
2018-08-12 13:21:07:INFO: Hello
文件输出:
2018-08-12 13:21:07:QcMrG7d7gvxwiagidFozC2v4kQukgnbXv5Hs2rMDAZQ=: Px4ZlIE7usOTTtbURDjrGW4VBXaIKH/F3vhs9pj5G3o=
看起来 asctime 并没有被加密。
我想要的是:沿用用户传入的格式,并对时间、级别和消息进行加密。最好是直接对整行日志进行加密,但我不知道如何按照用户传入的格式生成完整的日志行。
正如 @AntiMatterDynamite 所说,可以先调用 super().format() 生成完整的日志行,然后再对整行进行加密:
import base64
import logging
from pprint import pprint
from Cryptodome.Cipher import AES
from Cryptodome.Hash import SHA256
from Cryptodome.Hash import MD5
from Cryptodome import Random
class logger:
    """
    Log to the console in plain text and to a file as encrypted lines.

    Each line written to the file is the fully formatted log line,
    AES-CBC encrypted and stored as ``base64(iv + ciphertext)``.
    """

    class EncryptedLogFormatter(logging.Formatter):
        """Formatter that encrypts the complete formatted log line."""

        def __init__(self, key, fmt=None, datefmt=None):
            """
            :param key: passphrase (str) from which the AES key is derived
            :param fmt: format string forwarded to ``logging.Formatter``
            :param datefmt: date format forwarded to ``logging.Formatter``
            """
            self._key = self.hash_gen(key, 16)
            super(logger.EncryptedLogFormatter, self).__init__(fmt=fmt, datefmt=datefmt)

        @staticmethod
        def hash_gen(key, size):
            """
            Return the first ``size`` bytes of the MD5 digest of ``key``.

            An MD5 digest is 16 bytes long, so at most 16 bytes are returned.

            :param key: passphrase as str; it is UTF-8 encoded before hashing
            :param size: number of leading digest bytes to keep
            :return: ``bytes`` used as the AES key
            """
            # NOTE(review): a bare MD5 digest is a weak key derivation. It is
            # kept unchanged so that already-written log files can still be
            # decrypted; consider a proper KDF for new deployments.
            digest = MD5.new(key.encode('utf-8')).digest()
            return digest[:size]

        def format(self, record):
            """
            Format ``record`` and return the whole line encrypted.

            :param record: the ``logging.LogRecord`` to render
            :return: ``base64(iv + AES-CBC(line))`` as str, or the formatted
                line unchanged when it is empty
            """
            # Encrypt the whole formatted line instead of only record.msg.
            message = super().format(record)
            if not message:
                # Nothing to encrypt; still return a str so the handler can
                # write it (returning None would break the handler).
                return message
            # Pad the *encoded* bytes: non-ASCII characters encode to more
            # than one byte, so a character count would give a wrong length.
            data = message.encode()
            padding = AES.block_size - len(data) % AES.block_size
            data += bytes([padding]) * padding  # PKCS#7 padding
            iv = Random.new().read(AES.block_size)  # fresh IV for every line
            cipher = AES.new(self._key, AES.MODE_CBC, iv)
            # Prepend the IV so the line can be decrypted later.
            return base64.b64encode(iv + cipher.encrypt(data)).decode()

    def __init__(self, key, filename, level=logging.INFO, fmt='%(asctime)s:%(levelname)s: %(message)s', datefmt="%Y-%m-%d %H:%M:%S"):
        """
        Attach a plain-text console handler and an encrypting file handler
        to the root logger.

        :param key: passphrase used to derive the file encryption key
        :param filename: path of the encrypted log file
        :param level: level set on the root logger
        :param fmt: log line format shared by both handlers
        :param datefmt: date format shared by both handlers
        """
        # NOTE: handlers are added to the root logger on every instantiation,
        # so creating several instances adds several pairs of handlers.
        root = logging.getLogger()
        root.setLevel(level)
        ch = logging.StreamHandler()
        fh = logging.FileHandler(filename)
        ch.setFormatter(logging.Formatter(fmt=fmt, datefmt=datefmt))
        fh.setFormatter(logger.EncryptedLogFormatter(key, fmt, datefmt))
        root.addHandler(ch)
        root.addHandler(fh)

    def print(self, message):
        """Log ``message`` at INFO level through the root logger."""
        logging.info(message)
if __name__ == "__main__":
    # Demo: emit one INFO message to the console and to the encrypted file.
    log = logger(key="abcdefg", filename='Some path')
    log.print("Hello")
我正在尝试创建一个自定义的日志记录器类(class),在打印日志的同时把日志保存到加密文件中。我使用的是下面这段代码:
import base64
import logging
from pprint import pprint
from Cryptodome.Cipher import AES
from Cryptodome.Hash import SHA256
from Cryptodome.Hash import MD5
from Cryptodome import Random
class logger:
    """
    Log to the console in plain text and to a file with encrypted fields.
    """

    class EncryptedLogFormatter(logging.Formatter):
        """Formatter that AES-CBC encrypts individual record fields."""

        def __init__(self, key, fmt=None, datefmt=None):
            """
            Derive the 16-byte AES key from ``key`` and set up the formatter.

            ``fmt`` and ``datefmt`` are forwarded to ``logging.Formatter``.
            """
            self._key = self.hash_gen(key, 16)
            super(logger.EncryptedLogFormatter, self).__init__(fmt=fmt, datefmt=datefmt)

        @staticmethod
        def hash_gen(key, size):
            """
            Return the first ``size`` bytes of the MD5 digest of ``key``.
            """
            digest = MD5.new(key.encode('utf-8')).digest()
            return digest[:size]

        def _encrypt_text(self, text):
            """
            Return ``base64(iv + AES-CBC(text))`` as a str, using a fresh IV.
            """
            iv = Random.new().read(AES.block_size)
            cipher = AES.new(self._key, AES.MODE_CBC, iv)
            # PKCS#7-style padding; the length is counted in characters.
            pad_len = AES.block_size - len(text) % AES.block_size
            padded = text + chr(pad_len) * pad_len
            return base64.b64encode(iv + cipher.encrypt(padded.encode())).decode()

        def format(self, record):
            """
            Encrypt ``msg``, ``asctime`` and ``levelname`` on the record in
            place, then let ``logging.Formatter`` render the line.

            NOTE: ``record.asctime`` is read directly, so it must already be
            set on the record. The base ``format()`` recomputes ``asctime``
            when the format string uses it, which is why the timestamp still
            appears in plain text in the output.
            """
            # Read every field first, before any of them is replaced.
            plain_fields = (
                ('msg', record.msg),
                ('asctime', record.asctime),
                ('levelname', record.levelname),
            )
            for attr_name, plain_value in plain_fields:
                if plain_value:  # empty values are left untouched
                    setattr(record, attr_name, self._encrypt_text(plain_value))
            return super(logger.EncryptedLogFormatter, self).format(record)

    def __init__(self, key, filename, level=logging.INFO, fmt='%(asctime)s:%(levelname)s: %(message)s', datefmt="%Y-%m-%d %H:%M:%S"):
        """
        Attach a plain-text console handler and an encrypting file handler
        to the root logger, and set the root logger's level to ``level``.
        """
        root = logging.getLogger()
        root.setLevel(level)
        console_handler = logging.StreamHandler()
        file_handler = logging.FileHandler(filename)
        console_handler.setFormatter(logging.Formatter(fmt=fmt, datefmt=datefmt))
        file_handler.setFormatter(logger.EncryptedLogFormatter(key, fmt, datefmt))
        for handler in (console_handler, file_handler):
            root.addHandler(handler)

    def print(self, message):
        """Log ``message`` at INFO level through the root logger."""
        logging.info(message)
if __name__ == "__main__":
    # Demo: emit one INFO message to the console and to the encrypted file.
    logg = logger(key="abcdefg", filename='Some path')
    logg.print("Hello")
控制台输出:
2018-08-12 13:21:07:INFO: Hello
文件输出:
2018-08-12 13:21:07:QcMrG7d7gvxwiagidFozC2v4kQukgnbXv5Hs2rMDAZQ=: Px4ZlIE7usOTTtbURDjrGW4VBXaIKH/F3vhs9pj5G3o=
看起来 asctime 并没有被加密。
我想要的是:沿用用户传入的格式,并对时间、级别和消息进行加密。最好是直接对整行日志进行加密,但我不知道如何按照用户传入的格式生成完整的日志行。
正如 @AntiMatterDynamite 所说,可以先调用 super().format() 生成完整的日志行,然后再对整行进行加密:
import base64
import logging
from pprint import pprint
from Cryptodome.Cipher import AES
from Cryptodome.Hash import SHA256
from Cryptodome.Hash import MD5
from Cryptodome import Random
class logger:
    """
    Log to the console in plain text and to a file as encrypted lines.

    Each line written to the file is the fully formatted log line,
    AES-CBC encrypted and stored as ``base64(iv + ciphertext)``.
    """

    class EncryptedLogFormatter(logging.Formatter):
        """Formatter that encrypts the complete formatted log line."""

        def __init__(self, key, fmt=None, datefmt=None):
            """
            :param key: passphrase (str) from which the AES key is derived
            :param fmt: format string forwarded to ``logging.Formatter``
            :param datefmt: date format forwarded to ``logging.Formatter``
            """
            self._key = self.hash_gen(key, 16)
            super(logger.EncryptedLogFormatter, self).__init__(fmt=fmt, datefmt=datefmt)

        @staticmethod
        def hash_gen(key, size):
            """
            Return the first ``size`` bytes of the MD5 digest of ``key``.

            An MD5 digest is 16 bytes long, so at most 16 bytes are returned.

            :param key: passphrase as str; it is UTF-8 encoded before hashing
            :param size: number of leading digest bytes to keep
            :return: ``bytes`` used as the AES key
            """
            # NOTE(review): a bare MD5 digest is a weak key derivation. It is
            # kept unchanged so that already-written log files can still be
            # decrypted; consider a proper KDF for new deployments.
            digest = MD5.new(key.encode('utf-8')).digest()
            return digest[:size]

        def format(self, record):
            """
            Format ``record`` and return the whole line encrypted.

            :param record: the ``logging.LogRecord`` to render
            :return: ``base64(iv + AES-CBC(line))`` as str, or the formatted
                line unchanged when it is empty
            """
            # Encrypt the whole formatted line instead of only record.msg.
            message = super().format(record)
            if not message:
                # Nothing to encrypt; still return a str so the handler can
                # write it (returning None would break the handler).
                return message
            # Pad the *encoded* bytes: non-ASCII characters encode to more
            # than one byte, so a character count would give a wrong length.
            data = message.encode()
            padding = AES.block_size - len(data) % AES.block_size
            data += bytes([padding]) * padding  # PKCS#7 padding
            iv = Random.new().read(AES.block_size)  # fresh IV for every line
            cipher = AES.new(self._key, AES.MODE_CBC, iv)
            # Prepend the IV so the line can be decrypted later.
            return base64.b64encode(iv + cipher.encrypt(data)).decode()

    def __init__(self, key, filename, level=logging.INFO, fmt='%(asctime)s:%(levelname)s: %(message)s', datefmt="%Y-%m-%d %H:%M:%S"):
        """
        Attach a plain-text console handler and an encrypting file handler
        to the root logger.

        :param key: passphrase used to derive the file encryption key
        :param filename: path of the encrypted log file
        :param level: level set on the root logger
        :param fmt: log line format shared by both handlers
        :param datefmt: date format shared by both handlers
        """
        # NOTE: handlers are added to the root logger on every instantiation,
        # so creating several instances adds several pairs of handlers.
        root = logging.getLogger()
        root.setLevel(level)
        ch = logging.StreamHandler()
        fh = logging.FileHandler(filename)
        ch.setFormatter(logging.Formatter(fmt=fmt, datefmt=datefmt))
        fh.setFormatter(logger.EncryptedLogFormatter(key, fmt, datefmt))
        root.addHandler(ch)
        root.addHandler(fh)

    def print(self, message):
        """Log ``message`` at INFO level through the root logger."""
        logging.info(message)
if __name__ == "__main__":
    # Demo: emit one INFO message to the console and to the encrypted file.
    log = logger(key="abcdefg", filename='Some path')
    log.print("Hello")