文件加密的时候每个文件随机生成一个AES秘钥进行加密,然后用RSA把秘钥加密后写入到加密后的文件里,防止单个文件被破解。
[color=]注意:加密后的json文件是用来恢复文件夹名称的,生成的RSA公钥、RSA私钥是用来解密文档的,删了就彻底恢复不了了。
[Python] 纯文本查看 复制代码from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.PublicKey import RSA
from Crypto.Random import get_random_bytes
import os
from loguru import logger
import uuid
import json
# logger.add("log.log", rotation="10 MB")
# Constants
CHUNK_SIZE = 64 * 1024 # 64KB per chunk
RSA_KEY_SIZE = 2048
AES_BLOCK_SIZE = 16
# Padding function (PKCS7)
def pad(data):
padding_len = AES_BLOCK_SIZE - len(data) % AES_BLOCK_SIZE
return data + bytes([padding_len] * padding_len)
# Unpadding function (PKCS7)
def unpad(data):
padding_len = data[-1]
return data[:-padding_len]
# Generate RSA key
def generate_rsa_key():
if os.path.isfile("private_key.pem") or os.path.isfile("public_key.pem"):
logger.warning("KYE文件已存在")
else:
# Generate RSA key pair
rsa_key = RSA.generate(RSA_KEY_SIZE)
private_key = rsa_key.export_key()
public_key = rsa_key.publickey().export_key()
# Save RSA keys locally
with open("private_key.pem", "wb") as priv_file:
priv_file.write(private_key)
with open("public_key.pem", "wb") as pub_file:
pub_file.write(public_key)
# Encryption function
def encrypt_file(input_file, uuid_set):
# Generate a random AES key
aes_key = get_random_bytes(32)
# Initialize AES cipher in CBC mode
cipher_aes = AES.new(aes_key, AES.MODE_CBC)
# Read public key
with open("public_key.pem", "rb") as public_file:
public_key = RSA.import_key(public_file.read())
# Encrypt the AES key using RSA
cipher_rsa = PKCS1_OAEP.new(public_key)
enc_aes_key = cipher_rsa.encrypt(aes_key)
# Encrypt the file name
file_name = os.path.basename(input_file)
encrypted_file_name = cipher_aes.encrypt(pad(file_name.encode('utf-8')))
file_name_length = len(encrypted_file_name)
# Open output file
# output_file = "ex_video"
output_file = os.path.join(os.path.dirname(input_file), str(uuid.uuid4()))
while output_file in uuid_set:
output_file = str(uuid.uuid4())
uuid_set.append(output_file)
with open(input_file, "rb") as infile, open(output_file, "wb") as outfile:
# Write the IV, encrypted AES key, and encrypted file name length to the output file
outfile.write(cipher_aes.iv)
outfile.write(enc_aes_key)
outfile.write(file_name_length.to_bytes(2, 'big')) # 2 bytes to store the file name length
outfile.write(encrypted_file_name)
# Read and encrypt the file in chunks
while chunk := infile.read(CHUNK_SIZE):
if len(chunk) % AES_BLOCK_SIZE != 0:
chunk = pad(chunk)
encrypted_chunk = cipher_aes.encrypt(chunk)
outfile.write(encrypted_chunk)
# Delete the original file
os.remove(input_file)
logger.info(f"Encryption complete. Encrypted file saved as '{output_file}'.")
# Decryption function
def decrypt_file(encrypted_file):
# Read private key
with open("private_key.pem", "rb") as priv_file:
private_key = RSA.import_key(priv_file.read())
# Open files
with open(encrypted_file, "rb") as infile:
# Read the IV and the encrypted AES key
iv = infile.read(16)
enc_aes_key = infile.read(private_key.size_in_bytes())
# Decrypt the AES key using RSA
cipher_rsa = PKCS1_OAEP.new(private_key)
aes_key = cipher_rsa.decrypt(enc_aes_key)
# Initialize AES cipher for decryption
cipher_aes = AES.new(aes_key, AES.MODE_CBC, iv)
# Read and decrypt the file name length and file name
file_name_length = int.from_bytes(infile.read(2), 'big')
encrypted_file_name = infile.read(file_name_length)
file_name = unpad(cipher_aes.decrypt(encrypted_file_name)).decode('utf-8')
# Set output file to the original file name
output_file = os.path.join(os.path.dirname(encrypted_file), file_name)
with open(output_file, "wb") as outfile:
# Read and decrypt the file in chunks
next_chunk = b''
while chunk := infile.read(CHUNK_SIZE):
if next_chunk:
outfile.write(next_chunk)
next_chunk = cipher_aes.decrypt(chunk)
# Handle the final chunk separately to remove padding
outfile.write(unpad(next_chunk))
os.remove(encrypted_file)
logger.info(f"Decryption complete. Decrypted file saved as '{output_file}'.")
def get_all_file_paths(directory):
file_paths = []
for root, dirs, files in os.walk(directory):
for file in files:
file_paths.append(os.path.join(root, file))
return file_paths
# 混淆文件夹名
def rename_folders_and_save_config(root_dir, config_file):
all_file_paths = []
rename_map = {}
# 遍历指定目录及其子目录
for root, dirs, files in os.walk(root_dir, topdown=False):
# 收集所有子文件的路径
for file_name in files:
file_path = os.path.join(root, file_name)
all_file_paths.append(file_path)
# 修改当前目录下的所有子文件夹的名称
for folder_name in dirs:
# 生成一个随机的UUID
new_name = str(uuid.uuid4())
# 构建原始路径和新的路径
old_folder_path = os.path.join(root, folder_name)
new_folder_path = os.path.join(root, new_name)
# 保存原始名称和新名称的映射
rename_map[old_folder_path] = new_folder_path
# 修改文件夹名称
os.rename(old_folder_path, new_folder_path)
logger.info(f'Renamed folder: {old_folder_path} -> {new_folder_path}')
# 保存映射到配置文件
with open(config_file, 'w') as f:
json.dump(rename_map, f, indent=4)
return all_file_paths
# 还原文件夹名
def restore_folders_from_config(config_file):
max_depth = 0
# 从配置文件中读取映射
with open(config_file, 'r') as f:
rename_map = json.load(f)
# 遍历所有路径,计算最大层级数
for old_folder_path, new_folder_path in rename_map.items():
# 计算路径中的层级数
depth = old_folder_path.count(os.sep)
if depth > max_depth:
max_depth = depth
# 对路径长度进行排序,先还原子文件夹
sorted_paths = sorted(rename_map.items(), key=lambda x: len(x[0]), reverse=True)
while max_depth:
max_depth -= 1
for new_folder_path, old_folder_path in sorted_paths:
# 确保新路径存在
if os.path.exists(old_folder_path):
try:
os.rename(old_folder_path, new_folder_path)
logger.info(f'Restored folder: {new_folder_path} -> {old_folder_path}')
except Exception as e:
logger.error(f'Failed to restore folder {old_folder_path} -> {new_folder_path}: {e}')
else:
logger.warning(f'Folder not found: {old_folder_path}')
def hun_xiao(dirs, config_json):
rename_folders_and_save_config(dirs, config_json)
uuid_set = []
fpaths = get_all_file_paths(dirs)
logger.debug(fpaths)
for file in fpaths:
encrypt_file(file, uuid_set)
logger.debug(uuid_set)
logger.debug(len(uuid_set))
def hui_fu(dirs, config_json):
fpaths = get_all_file_paths(dirs)
logger.debug(fpaths)
for file in fpaths:
try:
decrypt_file(file)
except Exception as e:
logger.error(e)
restore_folders_from_config(config_json)
if __name__ == '__main__':
generate_rsa_key()
#混淆加密
hun_xiao(r"文档", r"wd.json")
#还原恢复
#hui_fu(r"文档", r"wd.json")