如何使用 Python 加密大文件?
How to encrypt large file using Python?
我正在尝试加密大于 1GB 的文件。我不想把它全部读到记忆中。我为此任务选择了 Fernet (cryptography.fernet),因为它是最受推荐的(比非对称解决方案更快)。
我生成了密钥。然后我创建了一个脚本来加密:
key = Fernet(read_key())
with open(source, "rb") as src, open(destination, "wb") as dest:
for chunk in iter(lambda: src.read(4096), b""):
encrypted = key.encrypt(chunk)
dest.write(encrypted)
以及解密:
key = Fernet(read_key())
with open(source, "rb") as src, open(destination, "wb") as dest:
for chunk in iter(lambda: src.read(4096), b""):
decrypted = key.decrypt(chunk)
dest.write(decrypted)
加密有效 - 不足为奇,但解密无效。
首先,我认为它可能会起作用,但事实并非如此。我想加密时块大小会增加,然后当我读取 4096 字节时,它不是一个完整的加密块。我在尝试解密时遇到错误:
Traceback (most recent call last):
File "/redacted/path/venv/lib/python3.7/site-packages/cryptography/fernet.py", line 119, in _verify_signature
h.verify(data[-32:])
File "/redacted/path/venv/lib/python3.7/site-packages/cryptography/hazmat/primitives/hmac.py", line 74, in verify
ctx.verify(signature)
File "/redacted/path/venv/lib/python3.7/site-packages/cryptography/hazmat/backends/openssl/hmac.py", line 75, in verify
raise InvalidSignature("Signature did not match digest.")
cryptography.exceptions.InvalidSignature: Signature did not match digest.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/redacted/path/main.py", line 63, in <module>
decrypted = key.decrypt(chunk)
File "/redacted/path/venv/lib/python3.7/site-packages/cryptography/fernet.py", line 80, in decrypt
return self._decrypt_data(data, timestamp, time_info)
File "/redacted/path/venv/lib/python3.7/site-packages/cryptography/fernet.py", line 137, in _decrypt_data
self._verify_signature(data)
File "/redacted/path/venv/lib/python3.7/site-packages/cryptography/fernet.py", line 121, in _verify_signature
raise InvalidToken
cryptography.fernet.InvalidToken
有办法解决吗?也许有比 fernet 更好(更简单)的解决方案?
Fernet 不应以流媒体方式使用。他们在文档中解释说:
From the documentation (last section):
Limitations
Fernet is ideal for encrypting data that easily fits in memory. As a design feature it does not expose unauthenticated bytes. This means that the complete message contents must be available in memory, making Fernet generally unsuitable for very large files at this time.
由于内存限制,我们可以使用块来加密和解密。
#
# encrypt
#
key = b'Ke0Ft_85-bXQ8GLOOsEI6JeT2mD-GeI8pkcP_re8wio='
in_file_name = 'plain.txt'
out_file_name = 'encypted.txt'
with open(in_file_name, "rb") as fin, open(out_file_name, "wb") as fout:
while True:
block = fin.read(524288) # 2^19
if not block:
break
f = Fernet(key)
output = f.encrypt(block)
print('encrypted block size: ' + str(len(block))) # returns 699148
fout.write(output)
#
# decrypt
#
in_file_name = 'encrypted.txt'
out_file_name = 'plain2.txt'
with open(in_file_name, "rb") as fin, open(out_file_name, "wb") as fout:
while True:
block = fin.read(699148)
if not block:
break
f = Fernet(key)
output = f.decrypt(block)
fout.write(output)
块大小值确定如下:
从 4096 作为加密块大小开始,它产生了恒定字节数的一致输出,但最后一个 <4096 字节的块除外。最后,它被提高到 524288,它再次返回了一致的字节数 - 699148,除了 <699148 字节的最后一个块。
使用524288字节加解密699148字节,35GB以上大文件加解密成功
block = fin.read(524288) # 2^19
print('encrypted block size: ' + str(len(block))) # returns 699148
我刚刚 运行 遇到了同样的问题 - 我觉得你很痛苦兄弟。
Fernet 存在一些问题,使其与您的方法不兼容:
- Fernet 吐出 urlsafe_base64 编码数据。这意味着对于每消耗 3 个字节的未加密数据,Fernet 将吐出 4 个字节的加密数据。
这可以防止您在加密和解密时使用相同的“块大小”,因为解密块大小必须更大。不幸的是,用 urlsafe_b64decode
/urlsafe_b64encode
处理数据也不起作用,因为:
- Fernet 似乎在加密数据的某处添加了某种 digest/checksum/metadata。
可能有一种直接的方法可以计算出这个摘要有多大,并调整解密块的大小来适应这个——但我想避免用“魔法常量”做事,因为那感觉很恶心。
我最终确定的解决方案实际上非常优雅。它的工作原理如下:
加密:
- 读取
n
字节数据 (raw_chunk
)
- 使用 Fernet 加密
n
字节以创建 m
字节块(enc_chunk
)。
- 使用
len(enc_chunk).to_bytes(4, "big")
将加密块的大小写入文件
- 将加密块写入文件
- 当我阅读
b""
时中断
解密:
- 读取
4
字节数据 (size
)
- 如果数据是
b""
则中断
- 使用
int.from_bytes(size, "big")
(num_bytes
) 将这 4 个字节转换为整数
- 读取
num_bytes
个加密数据
- 使用 Fernet 毫无问题地解密此数据
您可以轻松地将任何 non-streaming 算法(如 Fernet)变成流式算法,只需将输入数据切成块并将块长度存储在加密文件中,@tlonny 已经 了。这只有在您负担得起任何格式的加密数据文件时才有可能。
可以通过不同的方式将块大小转换为字节。一个是使用 struct.pack() and struct.unpack() 就像我在下面的代码中所做的那样。另一种方法是使用 int(size).to_bytes(4, 'little')
和 size = int().from_bytes(size_bytes, 'little')
.
以下代码完整实现了 encrypt()
和 decrypt()
以及使用示例(将 2 MB 的随机数据加密为 64 KB 的块)。
def encrypt(key, fin, fout, *, block = 1 << 16):
import cryptography.fernet, struct
fernet = cryptography.fernet.Fernet(key)
with open(fin, 'rb') as fi, open(fout, 'wb') as fo:
while True:
chunk = fi.read(block)
if len(chunk) == 0:
break
enc = fernet.encrypt(chunk)
fo.write(struct.pack('<I', len(enc)))
fo.write(enc)
if len(chunk) < block:
break
def decrypt(key, fin, fout):
import cryptography.fernet, struct
fernet = cryptography.fernet.Fernet(key)
with open(fin, 'rb') as fi, open(fout, 'wb') as fo:
while True:
size_data = fi.read(4)
if len(size_data) == 0:
break
chunk = fi.read(struct.unpack('<I', size_data)[0])
dec = fernet.decrypt(chunk)
fo.write(dec)
def test():
import cryptography.fernet, secrets
key = cryptography.fernet.Fernet.generate_key()
with open('data.in', 'wb') as f:
data = secrets.token_bytes(1 << 21)
f.write(data)
encrypt(key, 'data.in', 'data.enc')
decrypt(key, 'data.enc', 'data.out')
with open('data.out', 'rb') as f:
assert data == f.read()
if __name__ == '__main__':
test()
我正在尝试加密大于 1GB 的文件。我不想把它全部读到记忆中。我为此任务选择了 Fernet (cryptography.fernet),因为它是最受推荐的(比非对称解决方案更快)。
我生成了密钥。然后我创建了一个脚本来加密:
key = Fernet(read_key())
with open(source, "rb") as src, open(destination, "wb") as dest:
for chunk in iter(lambda: src.read(4096), b""):
encrypted = key.encrypt(chunk)
dest.write(encrypted)
以及解密:
key = Fernet(read_key())
with open(source, "rb") as src, open(destination, "wb") as dest:
for chunk in iter(lambda: src.read(4096), b""):
decrypted = key.decrypt(chunk)
dest.write(decrypted)
加密有效 - 不足为奇,但解密无效。 首先,我认为它可能会起作用,但事实并非如此。我想加密时块大小会增加,然后当我读取 4096 字节时,它不是一个完整的加密块。我在尝试解密时遇到错误:
Traceback (most recent call last):
File "/redacted/path/venv/lib/python3.7/site-packages/cryptography/fernet.py", line 119, in _verify_signature
h.verify(data[-32:])
File "/redacted/path/venv/lib/python3.7/site-packages/cryptography/hazmat/primitives/hmac.py", line 74, in verify
ctx.verify(signature)
File "/redacted/path/venv/lib/python3.7/site-packages/cryptography/hazmat/backends/openssl/hmac.py", line 75, in verify
raise InvalidSignature("Signature did not match digest.")
cryptography.exceptions.InvalidSignature: Signature did not match digest.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/redacted/path/main.py", line 63, in <module>
decrypted = key.decrypt(chunk)
File "/redacted/path/venv/lib/python3.7/site-packages/cryptography/fernet.py", line 80, in decrypt
return self._decrypt_data(data, timestamp, time_info)
File "/redacted/path/venv/lib/python3.7/site-packages/cryptography/fernet.py", line 137, in _decrypt_data
self._verify_signature(data)
File "/redacted/path/venv/lib/python3.7/site-packages/cryptography/fernet.py", line 121, in _verify_signature
raise InvalidToken
cryptography.fernet.InvalidToken
有办法解决吗?也许有比 fernet 更好(更简单)的解决方案?
Fernet 不应以流媒体方式使用。他们在文档中解释说:
From the documentation (last section):
Limitations
Fernet is ideal for encrypting data that easily fits in memory. As a design feature it does not expose unauthenticated bytes. This means that the complete message contents must be available in memory, making Fernet generally unsuitable for very large files at this time.
由于内存限制,我们可以使用块来加密和解密。
#
# encrypt
#
key = b'Ke0Ft_85-bXQ8GLOOsEI6JeT2mD-GeI8pkcP_re8wio='
in_file_name = 'plain.txt'
out_file_name = 'encypted.txt'
with open(in_file_name, "rb") as fin, open(out_file_name, "wb") as fout:
while True:
block = fin.read(524288) # 2^19
if not block:
break
f = Fernet(key)
output = f.encrypt(block)
print('encrypted block size: ' + str(len(block))) # returns 699148
fout.write(output)
#
# decrypt
#
in_file_name = 'encrypted.txt'
out_file_name = 'plain2.txt'
with open(in_file_name, "rb") as fin, open(out_file_name, "wb") as fout:
while True:
block = fin.read(699148)
if not block:
break
f = Fernet(key)
output = f.decrypt(block)
fout.write(output)
块大小值确定如下:
从 4096 作为加密块大小开始,它产生了恒定字节数的一致输出,但最后一个 <4096 字节的块除外。最后,它被提高到 524288,它再次返回了一致的字节数 - 699148,除了 <699148 字节的最后一个块。
使用524288字节加解密699148字节,35GB以上大文件加解密成功
block = fin.read(524288) # 2^19
print('encrypted block size: ' + str(len(block))) # returns 699148
我刚刚 运行 遇到了同样的问题 - 我觉得你很痛苦兄弟。
Fernet 存在一些问题,使其与您的方法不兼容:
- Fernet 吐出 urlsafe_base64 编码数据。这意味着对于每消耗 3 个字节的未加密数据,Fernet 将吐出 4 个字节的加密数据。
这可以防止您在加密和解密时使用相同的“块大小”,因为解密块大小必须更大。不幸的是,用 urlsafe_b64decode
/urlsafe_b64encode
处理数据也不起作用,因为:
- Fernet 似乎在加密数据的某处添加了某种 digest/checksum/metadata。
可能有一种直接的方法可以计算出这个摘要有多大,并调整解密块的大小来适应这个——但我想避免用“魔法常量”做事,因为那感觉很恶心。
我最终确定的解决方案实际上非常优雅。它的工作原理如下:
加密:
- 读取
n
字节数据 (raw_chunk
) - 使用 Fernet 加密
n
字节以创建m
字节块(enc_chunk
)。 - 使用
len(enc_chunk).to_bytes(4, "big")
将加密块的大小写入文件 - 将加密块写入文件
- 当我阅读
b""
时中断
解密:
- 读取
4
字节数据 (size
) - 如果数据是
b""
则中断
- 使用
int.from_bytes(size, "big")
(num_bytes
) 将这 4 个字节转换为整数
- 读取
num_bytes
个加密数据 - 使用 Fernet 毫无问题地解密此数据
您可以轻松地将任何 non-streaming 算法(如 Fernet)变成流式算法,只需将输入数据切成块并将块长度存储在加密文件中,@tlonny 已经
可以通过不同的方式将块大小转换为字节。一个是使用 struct.pack() and struct.unpack() 就像我在下面的代码中所做的那样。另一种方法是使用 int(size).to_bytes(4, 'little')
和 size = int().from_bytes(size_bytes, 'little')
.
以下代码完整实现了 encrypt()
和 decrypt()
以及使用示例(将 2 MB 的随机数据加密为 64 KB 的块)。
def encrypt(key, fin, fout, *, block = 1 << 16):
import cryptography.fernet, struct
fernet = cryptography.fernet.Fernet(key)
with open(fin, 'rb') as fi, open(fout, 'wb') as fo:
while True:
chunk = fi.read(block)
if len(chunk) == 0:
break
enc = fernet.encrypt(chunk)
fo.write(struct.pack('<I', len(enc)))
fo.write(enc)
if len(chunk) < block:
break
def decrypt(key, fin, fout):
import cryptography.fernet, struct
fernet = cryptography.fernet.Fernet(key)
with open(fin, 'rb') as fi, open(fout, 'wb') as fo:
while True:
size_data = fi.read(4)
if len(size_data) == 0:
break
chunk = fi.read(struct.unpack('<I', size_data)[0])
dec = fernet.decrypt(chunk)
fo.write(dec)
def test():
import cryptography.fernet, secrets
key = cryptography.fernet.Fernet.generate_key()
with open('data.in', 'wb') as f:
data = secrets.token_bytes(1 << 21)
f.write(data)
encrypt(key, 'data.in', 'data.enc')
decrypt(key, 'data.enc', 'data.out')
with open('data.out', 'rb') as f:
assert data == f.read()
if __name__ == '__main__':
test()