import gzip
import json
import random
import ssl
import string
import time
import urllib.error
import urllib.parse
import urllib.request
from io import BytesIO
def generate_random_username(length: int = 7) -> str:
    """Return a random username made only of decimal digits.

    Args:
        length: Number of digits to generate (7 by default).

    Returns:
        A string of ``length`` characters, each drawn from ``0``-``9``.
    """
    return ''.join(random.choices(string.digits, k=length))
def generate_random_password(length: int = 6) -> str:
    """Return a random password of lowercase ASCII letters and digits.

    Args:
        length: Number of characters to generate (6 by default).

    Returns:
        A string of ``length`` characters drawn from ``a``-``z`` and ``0``-``9``.
    """
    alphabet = string.ascii_lowercase + string.digits
    return ''.join(random.choices(alphabet, k=length))
def generate_random_command(length: int = 6) -> str:
    """Return a random ``command`` value made only of decimal digits.

    Args:
        length: Number of digits to generate (6 by default).

    Returns:
        A string of ``length`` characters, each drawn from ``0``-``9``.
    """
    return ''.join(random.choices(string.digits, k=length))
def _decode_response_body(raw_data, content_encoding):
    """Turn raw response bytes into text, un-gzipping them when declared as gzip.

    Args:
        raw_data: The undecoded response body.
        content_encoding: Lower-cased value of the ``Content-Encoding`` header.

    Returns:
        The body as text. When the body is declared gzip but cannot be
        decompressed or decoded, the first 100 raw bytes decoded leniently.
    """
    if content_encoding != 'gzip':
        return raw_data.decode('utf-8', errors='replace')
    try:
        return gzip.decompress(raw_data).decode('utf-8')
    except Exception:
        # Deliberate best-effort: a corrupt stream can raise several unrelated
        # exception types, and a short raw preview is more useful than a crash.
        return raw_data[:100].decode('utf-8', errors='replace')


def register_account(token, device_id, recommend_code, api_url):
    """Send one registration request with random credentials and report the result.

    Args:
        token: Value sent in the ``token`` request header.
        device_id: Value sent in the ``deviceId`` request header.
        recommend_code: Sent both as the ``recommend`` header and in the JSON body.
        api_url: URL the JSON payload is POSTed to.

    Returns:
        A dict that always contains ``success``, ``username``, ``password`` and
        ``command``. When the server answers HTTP 200 with a JSON body it also
        contains ``response`` (the parsed JSON); otherwise it contains ``error``
        and, whenever a body was received, ``raw_data``.
    """
    username = generate_random_username()
    password = generate_random_password()
    command = generate_random_command()
    account = {
        "username": username,
        "password": password,
        "command": command,
    }

    # Build the request body.
    payload = {
        "command": command,
        "password": password,
        "recommend": recommend_code,
        "userName": username
    }
    json_data = json.dumps(payload).encode('utf-8')

    # Build the request headers (advertising gzip support).
    headers = {
        "recommend": recommend_code,
        "Cache-Control": "no-cache",
        "token": token,
        "deviceId": device_id,
        "client": "app",
        "deviceType": "Android",
        "Content-Type": "application/json; charset=UTF-8",
        "Host": "ldys.sq1005.top",
        "Connection": "Keep-Alive",
        "Accept-Encoding": "gzip",
        "User-Agent": "okhttp/4.12.0"
    }

    try:
        # NOTE(review): certificate and hostname verification are switched off,
        # so an HTTPS endpoint is not authenticated. Kept as-is to preserve
        # behavior; re-enable verification unless there is a reason not to.
        context = ssl.create_default_context()
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE

        request = urllib.request.Request(
            api_url,
            data=json_data,
            headers=headers,
            method="POST"
        )
        try:
            with urllib.request.urlopen(request, context=context, timeout=15) as response:
                status_code = response.getcode()
                content_encoding = response.getheader('Content-Encoding', '').lower()
                raw_data = response.read()
        except urllib.error.HTTPError as http_err:
            # urlopen raises for non-2xx statuses; the exception still carries
            # the status, headers and body, so report it as an HTTP error
            # instead of a generic request failure.
            status_code = http_err.code
            content_encoding = (http_err.headers.get('Content-Encoding') or '').lower()
            raw_data = http_err.read()

        response_data = _decode_response_body(raw_data, content_encoding)
    except Exception as e:
        # Boundary handler: any transport failure becomes a failed result so
        # the caller's batch loop keeps running.
        return {
            "success": False,
            "error": f"请求异常: {str(e)}",
            **account
        }

    if status_code != 200:
        return {
            "success": False,
            "error": f"HTTP错误 {status_code}: {response_data[:50]}...",
            **account,
            "raw_data": response_data
        }

    try:
        result = json.loads(response_data)
    except json.JSONDecodeError:
        return {
            "success": False,
            "error": f"JSON解析失败: {response_data[:50]}...",
            **account,
            "raw_data": response_data
        }

    return {
        "success": True,
        **account,
        "response": result
    }
def main():
    """Run one test registration, then optionally a user-sized batch of 1-30.

    Prints the configuration, performs a single registration and shows its
    result, then asks on stdin whether to continue. If the user agrees and
    enters a count between 1 and 30, that many registrations are attempted
    with a 2-second pause between them, followed by a summary.
    """
    # Configuration (adjust to your environment).
    # NOTE(review): the token is hard-coded in source; consider loading it
    # from the environment or a config file instead.
    token = "vprZRV2dq1ehB/vr84O2yojcT3DVjCI0ibsZ2nYWL+E="
    device_id = "97f85c93f17c66d6"
    recommend_code = "jWELhX"
    api_url = "http://ldys.sq1005.top/api/v1/app/user/register"

    print("=== 注册工具(gzip修复版)===")
    print(f"token: {token[:8]}...{token[-8:]}")
    print(f"deviceId: {device_id}")
    print(f"推荐码: {recommend_code}")
    print(f"API: {api_url}")

    # Single trial run before any batch.
    print("\n先进行1次注册测试...")
    result = register_account(token, device_id, recommend_code, api_url)

    # Show the trial result.
    if result["success"]:
        print("\n✅ 测试注册成功!")
        print(f"用户名: {result['username']}")
        print(f"密码: {result['password']}")
        print(f"响应: {json.dumps(result['response'], ensure_ascii=False, indent=2)}")
    else:
        print("\n❌ 测试注册失败!")
        print(f"用户名: {result['username']}")
        print(f"错误: {result['error']}")
        if 'raw_data' in result:
            print(f"原始响应: {result['raw_data'][:200]}...")

    # Ask whether to continue with a batch; anything but "y" ends the program.
    while True:
        choice = input("\n测试完成,是否继续批量注册?(y/n): ").lower()
        if choice != 'y':
            print("程序结束。")
            return
        try:
            count = int(input("请输入注册次数 (1-30): "))
        except ValueError:
            print("请输入有效数字!")
            continue
        if count < 1 or count > 30:
            print("次数应在1-30之间,避免触发防刷!")
            continue
        break

    # Batch registration.
    success_list = []
    fail_list = []
    print(f"\n开始注册 {count} 个账号,每注册间隔2秒...")
    for i in range(1, count + 1):
        print(f"\n--- 注册第 {i}/{count} 个账号 ---")
        result = register_account(token, device_id, recommend_code, api_url)
        if result["success"]:
            success_list.append({
                "username": result["username"],
                "password": result["password"],
                "command": result["command"]
            })
            # The parsed JSON is not guaranteed to be an object, so only call
            # .get() on a dict and otherwise print the value itself.
            response = result["response"]
            message = response.get('msg', '无消息') if isinstance(response, dict) else response
            print("✅ 注册成功!")
            print(f"用户名: {result['username']}")
            print(f"密码: {result['password']}")
            print(f"command: {result['command']}")
            print(f"响应: {message}")
        else:
            fail_list.append({
                "username": result["username"],
                "password": result["password"],
                "command": result["command"],
                "error": result["error"]
            })
            print("❌ 注册失败!")
            print(f"用户名: {result['username']}")
            print(f"密码: {result['password']}")
            print(f"command: {result['command']}")
            print(f"错误: {result['error']}")

        # Show progress as a 20-character bar.
        progress = int(i / count * 20)
        print(f"进度: [{'#' * progress}{'-' * (20 - progress)}] {i}/{count}")

        # The pause separates consecutive requests; nothing follows the last one.
        if i < count:
            time.sleep(2)

    # Summary.
    print("\n=== 注册结果汇总 ===")
    print(f"总注册: {count} 个")
    print(f"成功: {len(success_list)} 个")
    print(f"失败: {len(fail_list)} 个")

    # List the accounts that were created.
    if success_list:
        print("\n=== 成功账号列表 ===")
        for idx, account in enumerate(success_list, 1):
            print(f"{idx}. 用户名: {account['username']} 密码: {account['password']} command: {account['command']}")
if __name__ == "__ma