linux防火墙firewall与钉钉机器人联动

查看 73|回复 5
作者:天域至尊   
一、项目背景
在日常使用中,我们常常希望动态远程操作防火墙,以保证风险应用的安全,此项目实现了使用钉钉机器人来远程控制Linux应用防火墙。可以在钉钉群以@机器人的方式,来实现控制防火墙开关,支持权限控制、定时关闭。
二、依赖环境
运行环境:Linux (测试系统为centos7)
防火墙:firewall,并处在运行状态。
机器人:钉钉机器人,并开启outgoing功能(该功能现在处在维护状态,新申请的机器人无法打开)
开发环境:python 3.7+ flask requests sqlite3
三、环境部署
启动firewalld
[Bash shell] 纯文本查看 复制代码sodu systemctl start firewalld
放行服务端口(自己指定端口号,应与下述main.py文件中,第16行端口保持一致)
[Bash shell] 纯文本查看 复制代码firewall-cmd --zone=public --add-port=你的端口/tcp --permanent
刷新防火墙
[Bash shell] 纯文本查看 复制代码firewall-cmd --reload
配置脚本内容,并运行脚本
[Bash shell] 纯文本查看 复制代码python3 main.py
查看提示信息


66.png (50.2 KB, 下载次数: 0)
下载附件
2023-2-17 12:40 上传

如果是云服务器,则在云服务器安全组上放开对应端口
访问提示站点尝试,如下则是正常


77.png (28.68 KB, 下载次数: 0)
下载附件
2023-2-17 12:41 上传

增加钉钉机器人,勾选outgoing,输入对应的提示信息


88.png (51.5 KB, 下载次数: 0)
下载附件
2023-2-17 12:43 上传

该功能现属于维护阶段,如放开使用,则如下


99.png (32.09 KB, 下载次数: 0)
下载附件
2023-2-17 12:44 上传

配置完成后,@机器人,查询帮助信息,如正常响应,则配置完成。


100.png (49.41 KB, 下载次数: 0)
下载附件
2023-2-17 12:45 上传

四、源代码
代码结构:
在同一文件夹下,两个py文件,分别为:firewall.py和main.py


101.png (3.62 KB, 下载次数: 0)
下载附件
2023-2-17 12:47 上传

mian.py代码如下(其中8-16行,都需要自己配置
[Python] 纯文本查看 复制代码from flask import Flask
import requests,json,ipaddress,time,threading,json
from flask import request
from firewall import Firewall
app = Flask(__name__)
#机器人声明的token
TOKEN="123"
#设置机器人地址
robot_url="https://oapi.dingtalk.com/robot/send?access_token=*******"
#本机url_key,建议设置为复杂的key,不要出现特殊符号
url_key="xjcbkasbasba"
#本机公网地址
ip="127.0.0.1"
#本服务监听端口
port="8080"
control_url='/control/'+url_key
control_url_out=f"http://{ip}:{port}{control_url}"
print(f"""
请在您的钉钉机器人的Outgoing配置中输入如下信息:
post地址:{control_url_out}
Token:{TOKEN}
""")
fire_wall=Firewall(robot_url=robot_url)
fire_wall_check=threading.Thread(target=fire_wall.check_data)
fire_wall_check.start()
def is_valid_ip(ip:str):
    try:
        ipaddress.ip_address(ip)
        return True
    except ValueError:
        return False
def send_word(word):
    """
    """
    global robot_url
    header={
        "Content-Type":"application/json"
    }
    data={"msgtype": "text","text": {"content":word}}
    for i in range(0,3):
        try:
            r=requests.post(url=robot_url,headers=header,timeout=60,json=data)
            break
        except Exception as err:
            print(err)
@app.after_request
def modify_server_header(response):
    response.headers['Server'] = 'Windows Server 8.0/IIS 10.0'
    return response
#路由
@app.route(control_url,methods=['POST'])
def add_data():
    word="命令识别失败,发送“帮助”,查看命令格式。"
    value = request.headers.get("token")
    if TOKEN!=value:
        return "error power not allow!"
    data=request.json
    if data:
        user_id=data["senderId"]
        text=data["text"]["content"]
        text=str(text).replace("\t","").replace("  "," ")
        while "  " in text:
            text=text.replace("  ","")
        if text[0]==" ":
            text=text[1:]
        text=text.split(" ")
        if len(text)>0:
            if "帮助" in text[0]:
                word="""
帮助信息:
    1.查看帮助信息 @机器人 帮助
    2.查看自己的id @机器人 查询id
    3.增加id @机器人 增加id 权限 id值
        权限分为:1-管理员 可以增加id,增删放行规则
                 2-用户 可以增删放行规则
            eg:@机器人 增加id 2 $=cndjbaxba
    4.删除id @机器人 删除id id值
            eg:@机器人 删除id $=cndjbaxba
    5.增加放行规则 @机器人 放行 源ip地址 目的端口 放行时间
        eg:@机器人 放行 127.0.0.1 8080 10
    6.删除放行规则 @机器人 删除 源ip地址 目的端口
        eg:@机器人 删除 127.0.0.1 8080
                """
            elif "查询id" in text[0]:
                word="您的ID是:"+str(user_id)
            elif "放行" in text[0]:
                if len(text)
firewall.py代码如下
[Python] 纯文本查看 复制代码import os,time,json
import ipaddress
import sqlite3
import requests
class Firewall(object):
    def __init__(self,robot_url) -> None:
        self.robot_url=robot_url
        self.__base_order={
            "增加放开规则":"firewall-cmd --permanent --add-rich-rule='rule family=\"ipv4\" source address=\"%s\" port protocol=\"tcp\" port=\"%d\" accept'",
            "删除放开规则":"firewall-cmd --permanent --remove-rich-rule='rule family=\"ipv4\" source address=\"%s\" port protocol=\"tcp\" port=\"%d\" accept'",
            "重新载入规则":"firewall-cmd --reload",
            "新建数据表_规则表":"""
                create table filewall_rule(
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    ip_data varchar(200) not null,
                    port_data unsigned int not null,
                    create_time unsigned int not null,
                    out_time unsigned int not null,
                    enable_status unsigned int not null
                )
            """,
            "新建数据表_权限表":"""
                create table filewall_user(
                    user_id varchar(200) PRIMARY KEY not null,
                    power_id unsigned int not null,
                    create_time unsigned int not null,
                    enable_status unsigned int not null
                )
            """,
            "增加规则数据":"insert into filewall_rule(ip_data,port_data,create_time,out_time,enable_status) values (?,?,?,?,?);",
            "删除规则数据":"update filewall_rule set enable_status=0 where ip_data=? and port_data=?;",
            "删除规则数据_id":"update filewall_rule set enable_status=0 where id=?;",
            "检查哪些数据过期":"select id,ip_data,port_data,create_time,out_time from filewall_rule where enable_status=1 and out_time50:
                [False,"时间范围超限"]
        except Exception:
            return [False,"时间格式错误"]
        
        self.add_rule_sql(ip,port,time_data)
        self.add_rule_os(ip,port)
        return [True,""]
   
    def rm_rule(self,ip:str,port:int,id=-1):
        """
        删除某一规则
        """
        ip=str(ip)
        is_pass=True
        if self.is_valid_ip(ip)==False:
            return [False,"ip格式错误"]
        try:
            port=int(port)
        except Exception:
            return [False,"端口格式错误"]
        try:
            if id==-1:
                self.rm_rule_sql(ip,port)
            else:
                self.rm_rule_sql_id(id)
        except Exception as err:
            print(err)
        answer=self.rm_rule_os(ip,port)
        if is_pass==False:
            return [False,"sql操作错误"]
        return [answer,""]
   
    def check_data_sql(self):
        """
        检查哪些数据过期,并删除
        """
        time_now=int(time.time())
        self.cursor.execute(self.__base_order["检查哪些数据过期"],(time_now,))
        data=self.cursor.fetchall()
        self.conn.commit()
        all_data=[]
        for now in data:
            answer=self.rm_rule(ip=now[1],port=now[2],id=now[0])
            all_data.append(
                    {
                        "id":now[0],
                        "ip":now[1],
                        "port":now[2],
                        "create_time":now[3],
                        "out_time":now[4],
                        "answer":answer
                    }
                )
            print(all_data)
        return all_data
   
    def send_word(self,word):
        """
        向钉钉机器人发送消息
        """
        header={
            "Content-Type":"application/json"
        }
        data={"msgtype": "text","text": {"content":word}}
        for i in range(0,3):
            try:
                r=requests.post(url=self.robot_url,headers=header,timeout=60,json=data)
                break
            except Exception as err:
                print(err)
    def check_data(self):
        """
        检查数据是否超时
        """
        while True:
            time.sleep(10)
            
            data=self.check_data_sql()
            
            if len(data)>0:
                for rm_data in data:
                    nid=rm_data["id"]
                    ip=rm_data["ip"]
                    port=rm_data["port"]
                    create_time=str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(rm_data["create_time"]))))
                    out_time=str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(rm_data["out_time"]))))
                    answer=str(rm_data["answer"])
                    word=f"""
任务超时删除:
    任务ID:{nid}
    ip:{ip}
    port:{port}
    创建时间:{create_time}
    超时时间:{out_time}
    删除结果:{answer}
                """
                    self.send_word(word=word)
   
    def check_power(self,user_id:str,power_id):
        """
        检查用户是否拥有此权限
        """
        self.cursor.execute(self.__base_order["查询权限表数据"],(user_id,))
        data=self.cursor.fetchall()
        self.conn.commit()
        is_pass=False
        word=""
        for user_data in data:
            if user_id==user_data[0]:
                if int(user_data[1])
五、附件

钉钉机器人控制防火墙工具.zip
(4.45 KB, 下载次数: 2)
2023-2-17 12:50 上传
点击文件名下载附件
下载积分: 吾爱币 -1 CB

机器人, 规则

jingyu2333   

感觉服务器的防火墙真的好难呀。
jingyu2333   

谁能弄个免费的服务器的防火墙?有这样的东西吗?
JuncoJet   


jingyu2333 发表于 2023-2-17 13:49
谁能弄个免费的服务器的防火墙?有这样的东西吗?

iptables
dujiu3611   

赞啊,我一直想找一个可以远程管理服务器防火墙的程序,为的就是方便偶尔的操作,感谢分享
wkdxz   

思路清晰 操作稳健  赞一个
您需要登录后才可以回帖 登录 | 立即注册

返回顶部