个人维护的腾讯云清量应用服务器,对于安全性来说,只能自己来维护,比如说SSH,或是说RDP服务,如果实在运维阶段,不得不开启,但是服务正常运行,就不需要对外开放,每次维护都需要登录到云厂商的控制台进行修改,当然会有大佬会使用API,或是说封装这个工具,我这里提供一个自己也在使用的工具
项目架构
主体部分
python:3.x
插件(包)
无额外模块
代码
注意: secret_id,secret_key 需要自行申请:https://console.cloud.tencent.com/cam/capi
region:区域
InstanceId:实例ID
区域两个可以在https://console.cloud.tencent.com/api/explorer?Product=lighthouse&Version=2020-03-24&Action=CreateFirewallRules 地址进行查阅,需要后面英文部分, 如ap-beijing
image.png (22.54 KB, 下载次数: 2)
下载附件
2024-5-22 17:25 上传
InstanceId: 在这里
image.png (83.64 KB, 下载次数: 2)
下载附件
2024-5-22 17:28 上传
import requests
import hashlib, hmac
import time
from datetime import datetime
import json
secret_id = "*******"
secret_key = "*******"
service = "lighthouse"
host = "lighthouse.tencentcloudapi.com"
endpoint = "https://" + host
region = "*******"
version = "2017-03-12"
algorithm = "TC3-HMAC-SHA256"
InstanceId = "*******"
def generate_auth(params):
timestamp = int(time.time())
date = datetime.utcfromtimestamp(timestamp).strftime("%Y-%m-%d")
# ************* 步骤 1:拼接规范请求串 *************
http_request_method = "POST"
canonical_uri = "/"
canonical_querystring = ""
ct = "application/json; charset=utf-8"
payload = json.dumps(params)
canonical_headers = "content-type:%s\nhost:%s\n" % (ct, host)
signed_headers = "content-type;host"
hashed_request_payload = hashlib.sha256(payload.encode("utf-8")).hexdigest()
canonical_request = (http_request_method + "\n" +
canonical_uri + "\n" +
canonical_querystring + "\n" +
canonical_headers + "\n" +
signed_headers + "\n" +
hashed_request_payload)
# ************* 步骤 2:拼接待签名字符串 *************
credential_scope = date + "/" + service + "/" + "tc3_request"
hashed_canonical_request = hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()
string_to_sign = (algorithm + "\n" +
str(timestamp) + "\n" +
credential_scope + "\n" +
hashed_canonical_request)
# ************* 步骤 3:计算签名 *************
# 计算签名摘要函数
def sign(key, msg):
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
secret_date = sign(("TC3" + secret_key).encode("utf-8"), date)
secret_service = sign(secret_date, service)
secret_signing = sign(secret_service, "tc3_request")
signature = hmac.new(secret_signing, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
# ************* 步骤 4:拼接 Authorization *************
authorization = (algorithm + " " +
"Credential=" + secret_id + "/" + credential_scope + ", " +
"SignedHeaders=" + signed_headers + ", " +
"Signature=" + signature)
return authorization
def block(port, desc):
param = {
"InstanceId": InstanceId,
"FirewallRules": [
{
"Protocol": "TCP",
"Port": port,
"CidrBlock": '0.0.0.0/0',
"Action": "DROP",
"FirewallRuleDescription": desc
}
],
}
post = requests.post(endpoint, headers={"Content-Type": "application/json; charset=utf-8",
"Authorization": generate_auth(param), "Host": host,
"X-TC-Action": "CreateFirewallRules", "X-TC-Version": "2020-03-24",
"X-TC-Timestamp": str(int(time.time())), "X-TC-Region": region},
data=json.dumps(param))
return post
def delBlock(port):
param = {
"InstanceId": InstanceId,
"FirewallRules": [
{
"Protocol": "TCP",
"Port": port,
"CidrBlock": "0.0.0.0/0",
"Action": "DROP",
}
],
}
post = requests.post(endpoint, headers={"Content-Type": "application/json; charset=utf-8",
"Authorization": generate_auth(param), "Host": host,
"X-TC-Action": "DeleteFirewallRules", "X-TC-Version": "2020-03-24",
"X-TC-Timestamp": str(int(time.time())), "X-TC-Region": region},
data=json.dumps(param))
return post
def getAllBlock(remark):
rules = []
param = {
"InstanceId": InstanceId,
"Offset": 0,
"Limit": 100
}
while True:
post = requests.post(endpoint, headers={"Content-Type": "application/json; charset=utf-8",
"Authorization": generate_auth(param), "Host": host,
"X-TC-Action": "DescribeFirewallRules", "X-TC-Version": "2020-03-24",
"X-TC-Timestamp": str(int(time.time())), "X-TC-Region": region},
data=json.dumps(param))
post_json = post.json()
for i in post_json['Response']['FirewallRuleSet']:
if str(i['FirewallRuleDescription']).startswith(remark):
rules.append(i)
if param['Offset'] > post_json['Response']['TotalCount']:
break
else:
param['Offset'] += param['Limit']
return rules
def deal(text, port, oper):
if text == 'unknown':
return "[×] 端口未定义在程序中"
text = text.text
print(text)
if text.find("have already existed.") > -1:
return f"[×] 已经存在此[{oper}]规则了。"
elif text.find("are not found.") > -1:
return "[×] 不存在此规则。"
else:
return f"[√] 端口 [{port}] [{oper}]成功"
def cmdhelp():
print(f" 腾讯云私人防火墙配置终端")
print(f" 腾讯云实例ID:{InstanceId}, 地域号:{region}")
print(" [>] 参考命令:block/allow [port/sName] 阻塞/放行 访问端口")
print(" [>] 参考命令:show 显示服务端口被封禁情况")
print(" [>] 参考命令:custom 显示当前IP被封禁情况")
print(" [>] 参考命令:help 显示帮助")
print(" [>] 参考命令:exit/quit 退出服务")
if __name__ == "__main__":
cmdhelp()
print("[#] 当前被阻塞的服务如下所示")
rules = getAllBlock("服务-")
for i in rules:
print(f"[#] {i}")
while True:
s = input("[$] 阁下的意图是? ")
s = s.strip().lower()
if s.startswith("block"):
s = s[5:].strip()
if s == "3306" or s == 'mysql':
p = block('3306', "服务-MySQL")
elif s == "6379" or s == 'redis':
p = block('6379', "服务-Redis")
elif s == "3389" or s == 'rdp':
p = block('3389', "服务-RDP")
else:
p = "unknown"
print(" " + deal(p, s, '阻塞'))
elif s.startswith("allow"):
s = s[5:].strip()
if s == "3306" or s == 'mysql':
p = delBlock('3306')
elif s == "6379" or s == 'redis':
p = delBlock('6379')
elif s == "3389" or s == 'rdp':
p = delBlock('3389')
else:
p = "unknown"
print(" " + deal(p, s, '放行'))
elif s == 'show':
print("[#] 当前被阻塞的服务如下所示")
rules = getAllBlock("服务-")
for i in rules:
print(f"[#] {i}")
elif s == 'exit' or s == 'quit':
exit(0)
elif s == 'help':
cmdhelp()
elif s == 'custom':
print("[#] 当前被阻塞的ip如下所示")
rules = getAllBlock("个性化")
for i in rules:
print(f"[#] {i}")
else:
print(" [X] 未定义的操作符")
使用截图
image.png (491.09 KB, 下载次数: 2)
下载附件
2024-5-22 17:35 上传
如果板块放置不对,感谢审核调整,上一个帖子感谢hmily调整板块