腾讯云轻量应用服务器CLI防火墙管理工具

查看 131|回复 9
作者:zunmx   
项目介绍
个人维护的腾讯云清量应用服务器,对于安全性来说,只能自己来维护,比如说SSH,或是说RDP服务,如果实在运维阶段,不得不开启,但是服务正常运行,就不需要对外开放,每次维护都需要登录到云厂商的控制台进行修改,当然会有大佬会使用API,或是说封装这个工具,我这里提供一个自己也在使用的工具
项目架构
主体部分
python:3.x
插件(包)
无额外模块
代码
注意: secret_id,secret_key 需要自行申请:https://console.cloud.tencent.com/cam/capi
region:区域
InstanceId:实例ID
区域两个可以在https://console.cloud.tencent.com/api/explorer?Product=lighthouse&Version=2020-03-24&Action=CreateFirewallRules 地址进行查阅,需要后面英文部分, 如ap-beijing


image.png (22.54 KB, 下载次数: 2)
下载附件
2024-5-22 17:25 上传

InstanceId: 在这里


image.png (83.64 KB, 下载次数: 2)
下载附件
2024-5-22 17:28 上传

import requests
import hashlib, hmac
import time
from datetime import datetime
import json
secret_id = "*******"
secret_key = "*******"
service = "lighthouse"
host = "lighthouse.tencentcloudapi.com"
endpoint = "https://" + host
region = "*******"
version = "2017-03-12"
algorithm = "TC3-HMAC-SHA256"
InstanceId = "*******"
def generate_auth(params):
    timestamp = int(time.time())
    date = datetime.utcfromtimestamp(timestamp).strftime("%Y-%m-%d")
    # ************* 步骤 1:拼接规范请求串 *************
    http_request_method = "POST"
    canonical_uri = "/"
    canonical_querystring = ""
    ct = "application/json; charset=utf-8"
    payload = json.dumps(params)
    canonical_headers = "content-type:%s\nhost:%s\n" % (ct, host)
    signed_headers = "content-type;host"
    hashed_request_payload = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    canonical_request = (http_request_method + "\n" +
                         canonical_uri + "\n" +
                         canonical_querystring + "\n" +
                         canonical_headers + "\n" +
                         signed_headers + "\n" +
                         hashed_request_payload)
    # ************* 步骤 2:拼接待签名字符串 *************
    credential_scope = date + "/" + service + "/" + "tc3_request"
    hashed_canonical_request = hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()
    string_to_sign = (algorithm + "\n" +
                      str(timestamp) + "\n" +
                      credential_scope + "\n" +
                      hashed_canonical_request)
    # ************* 步骤 3:计算签名 *************
    # 计算签名摘要函数
    def sign(key, msg):
        return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
    secret_date = sign(("TC3" + secret_key).encode("utf-8"), date)
    secret_service = sign(secret_date, service)
    secret_signing = sign(secret_service, "tc3_request")
    signature = hmac.new(secret_signing, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
    # ************* 步骤 4:拼接 Authorization *************
    authorization = (algorithm + " " +
                     "Credential=" + secret_id + "/" + credential_scope + ", " +
                     "SignedHeaders=" + signed_headers + ", " +
                     "Signature=" + signature)
    return authorization
def block(port, desc):
    param = {
        "InstanceId": InstanceId,
        "FirewallRules": [
            {
                "Protocol": "TCP",
                "Port": port,
                "CidrBlock": '0.0.0.0/0',
                "Action": "DROP",
                "FirewallRuleDescription": desc
            }
        ],
    }
    post = requests.post(endpoint, headers={"Content-Type": "application/json; charset=utf-8",
                                            "Authorization": generate_auth(param), "Host": host,
                                            "X-TC-Action": "CreateFirewallRules", "X-TC-Version": "2020-03-24",
                                            "X-TC-Timestamp": str(int(time.time())), "X-TC-Region": region},
                         data=json.dumps(param))
    return post
def delBlock(port):
    param = {
        "InstanceId": InstanceId,
        "FirewallRules": [
            {
                "Protocol": "TCP",
                "Port": port,
                "CidrBlock": "0.0.0.0/0",
                "Action": "DROP",
            }
        ],
    }
    post = requests.post(endpoint, headers={"Content-Type": "application/json; charset=utf-8",
                                            "Authorization": generate_auth(param), "Host": host,
                                            "X-TC-Action": "DeleteFirewallRules", "X-TC-Version": "2020-03-24",
                                            "X-TC-Timestamp": str(int(time.time())), "X-TC-Region": region},
                         data=json.dumps(param))
    return post
def getAllBlock(remark):
    rules = []
    param = {
        "InstanceId": InstanceId,
        "Offset": 0,
        "Limit": 100
    }
    while True:
        post = requests.post(endpoint, headers={"Content-Type": "application/json; charset=utf-8",
                                                "Authorization": generate_auth(param), "Host": host,
                                                "X-TC-Action": "DescribeFirewallRules", "X-TC-Version": "2020-03-24",
                                                "X-TC-Timestamp": str(int(time.time())), "X-TC-Region": region},
                             data=json.dumps(param))
        post_json = post.json()
        for i in post_json['Response']['FirewallRuleSet']:
            if str(i['FirewallRuleDescription']).startswith(remark):
                rules.append(i)
        if param['Offset'] > post_json['Response']['TotalCount']:
            break
        else:
            param['Offset'] += param['Limit']
    return rules
def deal(text, port, oper):
    if text == 'unknown':
        return "[×] 端口未定义在程序中"
    text = text.text
    print(text)
    if text.find("have already existed.") > -1:
        return f"[×] 已经存在此[{oper}]规则了。"
    elif text.find("are not found.") > -1:
        return "[×] 不存在此规则。"
    else:
        return f"[√] 端口 [{port}] [{oper}]成功"
def cmdhelp():
    print(f" 腾讯云私人防火墙配置终端")
    print(f" 腾讯云实例ID:{InstanceId}, 地域号:{region}")
    print("  [>] 参考命令:block/allow [port/sName] 阻塞/放行 访问端口")
    print("  [>] 参考命令:show 显示服务端口被封禁情况")
    print("  [>] 参考命令:custom 显示当前IP被封禁情况")
    print("  [>] 参考命令:help 显示帮助")
    print("  [>] 参考命令:exit/quit 退出服务")
if __name__ == "__main__":
    cmdhelp()
    print("[#] 当前被阻塞的服务如下所示")
    rules = getAllBlock("服务-")
    for i in rules:
        print(f"[#] {i}")
    while True:
        s = input("[$] 阁下的意图是? ")
        s = s.strip().lower()
        if s.startswith("block"):
            s = s[5:].strip()
            if s == "3306" or s == 'mysql':
                p = block('3306', "服务-MySQL")
            elif s == "6379" or s == 'redis':
                p = block('6379', "服务-Redis")
            elif s == "3389" or s == 'rdp':
                p = block('3389', "服务-RDP")
            else:
                p = "unknown"
            print("  " + deal(p, s, '阻塞'))
        elif s.startswith("allow"):
            s = s[5:].strip()
            if s == "3306" or s == 'mysql':
                p = delBlock('3306')
            elif s == "6379" or s == 'redis':
                p = delBlock('6379')
            elif s == "3389" or s == 'rdp':
                p = delBlock('3389')
            else:
                p = "unknown"
            print("  " + deal(p, s, '放行'))
        elif s == 'show':
            print("[#] 当前被阻塞的服务如下所示")
            rules = getAllBlock("服务-")
            for i in rules:
                print(f"[#] {i}")
        elif s == 'exit' or s == 'quit':
            exit(0)
        elif s == 'help':
            cmdhelp()
        elif s == 'custom':
            print("[#] 当前被阻塞的ip如下所示")
            rules = getAllBlock("个性化")
            for i in rules:
                print(f"[#] {i}")
        else:
            print("  [X] 未定义的操作符")
使用截图


image.png (491.09 KB, 下载次数: 2)
下载附件
2024-5-22 17:35 上传

如果板块放置不对,感谢审核调整,上一个帖子感谢hmily调整板块

腾讯, 下载次数

柒呀柒   

谢谢分享, 我每次都是扫码登录去放行端口,现在赶紧抄作业了。
zunmx
OP
  


柒呀柒 发表于 2024-5-22 17:54
谢谢分享, 我每次都是扫码登录去放行端口,现在赶紧抄作业了。

这个代码写了好久了,其实有些是可以优化的,
实际上delBlock是删除防火墙规则
block是添加防火墙规则
样例中输入blockrdp和block3389是相同效果,其实这里写的有些冗余了,可以封装一下的。
小三丶   

学到了,谢谢分享
zunmx
OP
  

忘记custom命令的解释了,因为我自己搭建了一个蜜罐系统,触发危险策略的话,就会封禁那个IP,封禁的原理也是通过云厂商的API添加防火墙规则,所以custom实际上是查看蜜罐封禁的IP,蜜罐系统通过记录的时间,对IP进行指定时间的封锁,到期后是会解封的。因为云厂商的防火墙规则是有最高数量限制的,如果您有兴趣,也可自己搭建一个蜜罐系统,触发防火墙规则,如果不需要,可以删除那部分代码。
conancheng   

感谢分享,不过个人还是习惯装个宝塔免费版,就完事了
zunmx
OP
  


conancheng 发表于 2024-5-23 09:49
感谢分享,不过个人还是习惯装个宝塔免费版,就完事了

上次用宝塔不记得多久了,通过系统级防火墙和云厂商的防火墙是存在一定的去别的。
比如说一个端口的请求会先经过云厂商的防火墙,再到主机的防火墙,如果通过云厂商的防火墙进行设置,那么请求就到不了主机,一定程度上也降低了服务器的负载。
比如说一个敏感端口,如ssh端口,只需要在运维的时候进行访问,如果不需要,就可以阻塞,这样也提高了服务器的安全性。
conancheng   


zunmx 发表于 2024-5-23 10:13
上次用宝塔不记得多久了,通过系统级防火墙和云厂商的防火墙是存在一定的去别的。
比如说一个端口的请求 ...

宝塔支持系统级防火墙配置和云厂商防火墙一键放行,你可以试试,挺简单的
zunmx
OP
  


conancheng 发表于 2024-5-23 12:02
宝塔支持系统级防火墙配置和云厂商防火墙一键放行,你可以试试,挺简单的

好久没用过了,变得这么强了,有时间试试
Bob5230   

谢谢分享!@
您需要登录后才可以回帖 登录 | 立即注册