代码如下,实现的效果就是在几分钟内尝试登录多少次,就会自动封禁ip,24小时后解除(这个解除是触发器触发的时候进行检测的,所以只有下次登录的时候触发一下才能解除)
然后搜索 计划任务程序
创建一个计划任务,
在触发器里可以手动编辑xml,输入如下内容,可以根据你自己的需求来定义,
然后安装python,在操作里面选择启动程序,启动python如C:\Python39\python.exe ,添加参数如 D:\zixing\safe\secwall.py ,起始于如 D:\zixing\safe
其他配置根据你的需求来,然后就可以测试一下远程连接失败N次后是否会被防火墙屏蔽(当然前提是开了防火墙才可以)
[Asm] 纯文本查看 复制代码
*[System[(Level=1 or Level=2 or Level=3 or Level=4 or Level=0 or Level=5) and (EventID=4625)]]
[Python] 纯文本查看 复制代码import subprocess
import re
from datetime import datetime, timedelta
import json
import os
LOG_FILE = "ip_attempts.json"
BLOCK_RULES_FILE = "blocked_ips.json"
MAX_ATTEMPTS = 3
TIME_WINDOW_MINUTES = 3
def load_json(file):
if os.path.exists(file):
with open(file, "r") as f:
return json.load(f)
return {}
def save_json(data, file):
with open(file, "w") as f:
json.dump(data, f)
def get_failed_login_events():
command = 'wevtutil qe Security "/q:*[System[EventID=4625]]" /rd:true /f:text'
try:
output = subprocess.check_output(command, shell=True).decode(
"gbk", errors="ignore"
)
except subprocess.CalledProcessError as e:
print(f"Error reading event log: {e}")
return []
return output.split("\n\n")
def parse_event(event):
ip_pattern = re.compile(r"源网络地址:\s+(?P\d+\.\d+\.\d+\.\d+)")
time_pattern = re.compile(r"Date:\s+(?P[\d\-T:.]+)\d*Z")
ip_match = ip_pattern.search(event)
time_match = time_pattern.search(event)
if ip_match and time_match:
ip = ip_match.group("ip")
event_time = time_match.group("datetime")
event_time = event_time[:23] # 截取前23个字符(YYYY-MM-DDTHH:MM:SS.sss)
event_time = datetime.fromisoformat(event_time)
return ip, event_time
return None, None
def block_ip(ip, blocked_ips):
rule_name = f"Block {ip} for 24 hours"
command = f'netsh advfirewall firewall add rule name="{rule_name}" dir=in action=block remoteip={ip} enable=yes'
try:
subprocess.check_call(command, shell=True)
expire_time = (datetime.now() + timedelta(hours=24)).isoformat()
blocked_ips[ip] = expire_time
print(f"IP {ip} 被阻止到 {expire_time}")
except subprocess.CalledProcessError as e:
print(f"Error blocking IP {ip}: {e}")
def remove_expired_blocks(blocked_ips):
now = datetime.now()
for ip, expire_time in list(blocked_ips.items()):
if datetime.fromisoformat(expire_time) cutoff_time
]
if not ip_attempts[ip]:
del ip_attempts[ip]
def main():
ip_attempts = load_json(LOG_FILE)
blocked_ips = load_json(BLOCK_RULES_FILE)
events = get_failed_login_events()
for event in events:
ip, event_time = parse_event(event)
if ip and event_time:
if ip not in ip_attempts:
ip_attempts[ip] = []
if(event_time.isoformat() not in ip_attempts[ip]):
ip_attempts[ip].append(event_time.isoformat())
remove_old_attempts(ip_attempts)
remove_expired_blocks(blocked_ips)
to_block = []
for ip, attempts in ip_attempts.items():
if len(attempts) >= MAX_ATTEMPTS:
to_block.append(ip)
else:
print(f"IP {ip} 尝试次数: {len(attempts)}")
for ip in to_block:
if ip not in blocked_ips:
block_ip(ip, blocked_ips)
del ip_attempts[ip]
save_json(ip_attempts, LOG_FILE)
save_json(blocked_ips, BLOCK_RULES_FILE)
if __name__ == "__main__":
main()
secwall.zip
(1.49 KB, 下载次数: 27)
2024-10-10 14:20 上传
点击文件名下载附件
下载积分: 吾爱币 -1 CB