在日常使用中,我们常常希望动态远程操作防火墙,以保证风险应用的安全,此项目实现了使用钉钉机器人来远程控制Linux应用防火墙。可以在钉钉群以@机器人的方式,来实现控制防火墙开关,支持权限控制、定时关闭。
二、依赖环境
运行环境:Linux (测试系统为centos7)
防火墙:firewall,并处在运行状态。
机器人:钉钉机器人,并开启outgoing功能(该功能现在处在维护状态,新申请的机器人无法打开)
开发环境:python 3.7+ flask requests sqlite3
三、环境部署
启动firewalld
[Bash shell] 纯文本查看 复制代码sodu systemctl start firewalld
放行服务端口(自己指定端口号,应与下述main.py文件中,第16行端口保持一致)
[Bash shell] 纯文本查看 复制代码firewall-cmd --zone=public --add-port=你的端口/tcp --permanent
刷新防火墙
[Bash shell] 纯文本查看 复制代码firewall-cmd --reload
配置脚本内容,并运行脚本
[Bash shell] 纯文本查看 复制代码python3 main.py
查看提示信息
66.png (50.2 KB, 下载次数: 0)
下载附件
2023-2-17 12:40 上传
如果是云服务器,则在云服务器安全组上放开对应端口
访问提示站点尝试,如下则是正常
77.png (28.68 KB, 下载次数: 0)
下载附件
2023-2-17 12:41 上传
增加钉钉机器人,勾选outgoing,输入对应的提示信息
88.png (51.5 KB, 下载次数: 0)
下载附件
2023-2-17 12:43 上传
该功能现属于维护阶段,如放开使用,则如下
99.png (32.09 KB, 下载次数: 0)
下载附件
2023-2-17 12:44 上传
配置完成后,@机器人,查询帮助信息,如正常响应,则配置完成。
100.png (49.41 KB, 下载次数: 0)
下载附件
2023-2-17 12:45 上传
四、源代码
代码结构:
在同一文件夹下,两个py文件,分别为:firewall.py和main.py
101.png (3.62 KB, 下载次数: 0)
下载附件
2023-2-17 12:47 上传
mian.py代码如下(其中8-16行,都需要自己配置)
[Python] 纯文本查看 复制代码from flask import Flask
import requests,json,ipaddress,time,threading,json
from flask import request
from firewall import Firewall
app = Flask(__name__)
#机器人声明的token
TOKEN="123"
#设置机器人地址
robot_url="https://oapi.dingtalk.com/robot/send?access_token=*******"
#本机url_key,建议设置为复杂的key,不要出现特殊符号
url_key="xjcbkasbasba"
#本机公网地址
ip="127.0.0.1"
#本服务监听端口
port="8080"
control_url='/control/'+url_key
control_url_out=f"http://{ip}:{port}{control_url}"
print(f"""
请在您的钉钉机器人的Outgoing配置中输入如下信息:
post地址:{control_url_out}
Token:{TOKEN}
""")
fire_wall=Firewall(robot_url=robot_url)
fire_wall_check=threading.Thread(target=fire_wall.check_data)
fire_wall_check.start()
def is_valid_ip(ip:str):
try:
ipaddress.ip_address(ip)
return True
except ValueError:
return False
def send_word(word):
"""
"""
global robot_url
header={
"Content-Type":"application/json"
}
data={"msgtype": "text","text": {"content":word}}
for i in range(0,3):
try:
r=requests.post(url=robot_url,headers=header,timeout=60,json=data)
break
except Exception as err:
print(err)
@app.after_request
def modify_server_header(response):
response.headers['Server'] = 'Windows Server 8.0/IIS 10.0'
return response
#路由
@app.route(control_url,methods=['POST'])
def add_data():
word="命令识别失败,发送“帮助”,查看命令格式。"
value = request.headers.get("token")
if TOKEN!=value:
return "error power not allow!"
data=request.json
if data:
user_id=data["senderId"]
text=data["text"]["content"]
text=str(text).replace("\t","").replace(" "," ")
while " " in text:
text=text.replace(" ","")
if text[0]==" ":
text=text[1:]
text=text.split(" ")
if len(text)>0:
if "帮助" in text[0]:
word="""
帮助信息:
1.查看帮助信息 @机器人 帮助
2.查看自己的id @机器人 查询id
3.增加id @机器人 增加id 权限 id值
权限分为:1-管理员 可以增加id,增删放行规则
2-用户 可以增删放行规则
eg:@机器人 增加id 2 $=cndjbaxba
4.删除id @机器人 删除id id值
eg:@机器人 删除id $=cndjbaxba
5.增加放行规则 @机器人 放行 源ip地址 目的端口 放行时间
eg:@机器人 放行 127.0.0.1 8080 10
6.删除放行规则 @机器人 删除 源ip地址 目的端口
eg:@机器人 删除 127.0.0.1 8080
"""
elif "查询id" in text[0]:
word="您的ID是:"+str(user_id)
elif "放行" in text[0]:
if len(text)
firewall.py代码如下
[Python] 纯文本查看 复制代码import os,time,json
import ipaddress
import sqlite3
import requests
class Firewall(object):
def __init__(self,robot_url) -> None:
self.robot_url=robot_url
self.__base_order={
"增加放开规则":"firewall-cmd --permanent --add-rich-rule='rule family=\"ipv4\" source address=\"%s\" port protocol=\"tcp\" port=\"%d\" accept'",
"删除放开规则":"firewall-cmd --permanent --remove-rich-rule='rule family=\"ipv4\" source address=\"%s\" port protocol=\"tcp\" port=\"%d\" accept'",
"重新载入规则":"firewall-cmd --reload",
"新建数据表_规则表":"""
create table filewall_rule(
id INTEGER PRIMARY KEY AUTOINCREMENT,
ip_data varchar(200) not null,
port_data unsigned int not null,
create_time unsigned int not null,
out_time unsigned int not null,
enable_status unsigned int not null
)
""",
"新建数据表_权限表":"""
create table filewall_user(
user_id varchar(200) PRIMARY KEY not null,
power_id unsigned int not null,
create_time unsigned int not null,
enable_status unsigned int not null
)
""",
"增加规则数据":"insert into filewall_rule(ip_data,port_data,create_time,out_time,enable_status) values (?,?,?,?,?);",
"删除规则数据":"update filewall_rule set enable_status=0 where ip_data=? and port_data=?;",
"删除规则数据_id":"update filewall_rule set enable_status=0 where id=?;",
"检查哪些数据过期":"select id,ip_data,port_data,create_time,out_time from filewall_rule where enable_status=1 and out_time50:
[False,"时间范围超限"]
except Exception:
return [False,"时间格式错误"]
self.add_rule_sql(ip,port,time_data)
self.add_rule_os(ip,port)
return [True,""]
def rm_rule(self,ip:str,port:int,id=-1):
"""
删除某一规则
"""
ip=str(ip)
is_pass=True
if self.is_valid_ip(ip)==False:
return [False,"ip格式错误"]
try:
port=int(port)
except Exception:
return [False,"端口格式错误"]
try:
if id==-1:
self.rm_rule_sql(ip,port)
else:
self.rm_rule_sql_id(id)
except Exception as err:
print(err)
answer=self.rm_rule_os(ip,port)
if is_pass==False:
return [False,"sql操作错误"]
return [answer,""]
def check_data_sql(self):
"""
检查哪些数据过期,并删除
"""
time_now=int(time.time())
self.cursor.execute(self.__base_order["检查哪些数据过期"],(time_now,))
data=self.cursor.fetchall()
self.conn.commit()
all_data=[]
for now in data:
answer=self.rm_rule(ip=now[1],port=now[2],id=now[0])
all_data.append(
{
"id":now[0],
"ip":now[1],
"port":now[2],
"create_time":now[3],
"out_time":now[4],
"answer":answer
}
)
print(all_data)
return all_data
def send_word(self,word):
"""
向钉钉机器人发送消息
"""
header={
"Content-Type":"application/json"
}
data={"msgtype": "text","text": {"content":word}}
for i in range(0,3):
try:
r=requests.post(url=self.robot_url,headers=header,timeout=60,json=data)
break
except Exception as err:
print(err)
def check_data(self):
"""
检查数据是否超时
"""
while True:
time.sleep(10)
data=self.check_data_sql()
if len(data)>0:
for rm_data in data:
nid=rm_data["id"]
ip=rm_data["ip"]
port=rm_data["port"]
create_time=str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(rm_data["create_time"]))))
out_time=str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(rm_data["out_time"]))))
answer=str(rm_data["answer"])
word=f"""
任务超时删除:
任务ID:{nid}
ip:{ip}
port:{port}
创建时间:{create_time}
超时时间:{out_time}
删除结果:{answer}
"""
self.send_word(word=word)
def check_power(self,user_id:str,power_id):
"""
检查用户是否拥有此权限
"""
self.cursor.execute(self.__base_order["查询权限表数据"],(user_id,))
data=self.cursor.fetchall()
self.conn.commit()
is_pass=False
word=""
for user_data in data:
if user_id==user_data[0]:
if int(user_data[1])
五、附件
钉钉机器人控制防火墙工具.zip
(4.45 KB, 下载次数: 2)
2023-2-17 12:50 上传
点击文件名下载附件
下载积分: 吾爱币 -1 CB