具体操作步骤看另一个大佬的帖子:https://www.52pojie.cn/forum.php?mod=viewthread&tid=2039517
之前网站在请求头中新增了 time 与 Authorization 两个参数;通过分析前端 JS(关键函数为 mm() 和 insertStr())已逆向出生成 Authorization 的思路,并把对应逻辑转成了 Python 实现。
[JavaScript] 纯文本查看 复制代码function mm() {
var _0x225fa4 = _0x2bf3ed
, _0x4f8827 = {
'EUYYc': function (_0x1d2445, _0x43d351, _0x474683, _0x54342b) {
return _0x1d2445(_0x43d351, _0x474683, _0x54342b);
},
'URHSO': function (_0x47d3f7, _0x19593f) {
return _0x47d3f7(_0x19593f);
},
'flkYq': function (_0x345ed7, _0x1d1dda) {
return _0x345ed7(_0x1d1dda);
},
'encpW': function (_0x22ff4f, _0x213ac1) {
return _0x22ff4f + _0x213ac1;
},
'BeObx': function (_0x470f0c, _0x51bbf5) {
return _0x470f0c(_0x51bbf5);
},
'lyLcO': _0x225fa4(0x2a3, 'oTSr')
}
, _0x2ab54b = _0x225fa4(0x198, 'G&(C')[_0x225fa4(0x157, 'c8oo')]('|')
, _0x3fa870 = 0x0;
while (!![]) {
switch (_0x2ab54b[_0x3fa870++]) {
case '0':
key = _0x4f8827[_0x225fa4(0x219, 'S[e7')](insertStr, key, new Date(time)['getUTCHours'](), new Date(time)['getUTCMonth']());
continue;
case '1':
key = _0x4f8827['URHSO'](btoa, _0x4f8827[_0x225fa4(0x1af, 'Hb)d')](unescape, encodeURIComponent(key)));
continue;
case '2':
key = _0x4f8827[_0x225fa4(0x222, '7e*5')](insertStr, key, new Date(time)[_0x225fa4(0x29a, 'Tr4!')](), new Date(time)['getUTCHours']())[_0x225fa4(0x16d, 'w6t(')](0x7, 0x1b);
continue;
case '3':
key = _0x4f8827['URHSO'](md5, key);
continue;
case '4':
key = _0x4f8827['encpW'](_0x4f8827[_0x225fa4(0x221, 'L[1K')]($, _0x4f8827[_0x225fa4(0x187, 'Xu$a')])[_0x225fa4(0x1ea, 'rK1w')]() + _0x4f8827['encpW'](time, '')['slice'](0x8, 0xd), $(_0x225fa4(0x161, 'm]%d'))[_0x225fa4(0x209, 'M@J)')]());
continue;
}
break;
}
}
function insertStr(_0x519b42, _0x4ee216, _0x32239d) {
var _0x55487f = _0x2bf3ed
, _0xf96269 = {
'lvCuF': function(_0x2d908a, _0x28e020) {
return _0x2d908a + _0x28e020;
}
};
return key = _0xf96269['lvCuF'](_0x519b42[_0x55487f(0x185, 'DgJo')](0x0, _0x4ee216) + _0x32239d, _0x519b42['slice'](_0x4ee216)),
key;
}
对应逻辑转成了 Python 实现:
[Python] 纯文本查看 复制代码 def mm(self, xmphone: str, xmpwd: str, time_ms: int) -> str:
"""构建秘钥"""
steps = ["4", "1", "0", "2", "3"]
key = ""
utc_time = datetime.utcfromtimestamp(time_ms / 1000.0)
utc_month = utc_time.month - 1
for step in steps:
if step == "4":
time_str = str(time_ms)
key = f"{xmphone}{time_str[8:13]}{xmpwd}"
elif step == "1":
key = base64.b64encode(key.encode("utf-8")).decode("utf-8")
elif step == "0":
key = self.insert_str(key, utc_time.hour, utc_month)
elif step == "2":
key = self.insert_str(key, utc_month, utc_time.hour)[7:27]
elif step == "3":
key = hashlib.md5(key.encode("utf-8")).hexdigest()
return key
def insert_str(self, s: str, pos: int, insert_value) -> str:
s = s or ""
return s[:pos] + str(insert_value) + s[pos:]
更新后完整代码:
[Python] 纯文本查看 复制代码#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import requests
import random
import time
import json
import logging
from datetime import datetime
import base64
import hashlib
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# 多账号配置
ACCOUNTS = [
{"username": "账号1", "password": "密码1"},
{"username": "账号2", "password": "密码2"},
]
# 步数范围配置
STEP_RANGES = {
8: {"min": 6000, "max": 10000},
12: {"min": 8000, "max": 14000},
16: {"min": 10000, "max": 18000},
20: {"min": 12000, "max": 22000},
22: {"min": 15000, "max": 24000}
}
# 默认步数(当不在指定时间段时使用)
DEFAULT_STEPS = 24465
class StepSubmitter:
def __init__(self):
self.session = requests.Session()
# 设置浏览器般的请求头
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.7339.128 Safari/537.36',
'Accept': 'application/json, text/javascript, */*; q=0.01',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
'Origin': 'https://m.cqzz.top',
'Referer': 'https://m.cqzz.top/',
'X-Requested-With': 'XMLHttpRequest'
}
self.base_url = 'https://wzz.wangzouzou.com/motion/api/motion/Xiaomi'
def get_current_steps(self):
"""根据当前时间获取对应的步数范围"""
current_hour = datetime.now().hour
logger.info(f"当前时间: {datetime.now()}, 小时: {current_hour}")
# 找到最接近的配置时间段
closest_hour = None
min_diff = float('inf')
for hour in STEP_RANGES.keys():
diff = abs(current_hour - hour)
if diff str:
s = s or ""
return s[:pos] + str(insert_value) + s[pos:]
def mm(self, xmphone: str, xmpwd: str, time_ms: int) -> str:
"""构建秘钥"""
steps = ["4", "1", "0", "2", "3"]
key = ""
utc_time = datetime.utcfromtimestamp(time_ms / 1000.0)
utc_month = utc_time.month - 1
for step in steps:
if step == "4":
time_str = str(time_ms)
key = f"{xmphone}{time_str[8:13]}{xmpwd}"
elif step == "1":
key = base64.b64encode(key.encode("utf-8")).decode("utf-8")
elif step == "0":
key = self.insert_str(key, utc_time.hour, utc_month)
elif step == "2":
key = self.insert_str(key, utc_month, utc_time.hour)[7:27]
elif step == "3":
key = hashlib.md5(key.encode("utf-8")).hexdigest()
return key
def submit_steps(self, username, password, steps):
"""提交步数到服务器"""
try:
# 先验证凭证格式
is_valid, message = self.validate_credentials(username, password)
if not is_valid:
return False, f"验证失败: {message}"
# 准备请求数据
data = {
'phone': username,
'pwd': password,
'num': steps
}
logger.info(f"准备提交 - 账号: {username}, 步数: {steps}")
time_val = int(time.time() * 1000)
self.headers["time"] = str(time_val)
self.headers["Authorization"] = self.mm(username, password, time_val)
# 发送请求
response = self.session.post(
self.base_url,
data=data,
headers=self.headers,
timeout=30,
proxies={"http": None, "https": None}
)
# 解析响应
if response.status_code == 200:
result = response.json()
if result.get('code') == 200:
return True, f"提交成功! 步数: {steps}"
else:
error_msg = result.get('data', '未知错误')
# 处理频繁提交的情况
if '频繁' in error_msg:
return False, "提交过于频繁,请稍后再试"
else:
return False, f"提交失败: {error_msg}"
else:
return False, f"HTTP错误: {response.status_code}"
except requests.exceptions.RequestException as e:
return False, f"网络请求错误: {str(e)}"
except json.JSONDecodeError:
return False, "响应解析错误"
except Exception as e:
return False, f"未知错误: {str(e)}"
def run(self):
"""主执行函数"""
logger.info("开始执行步数提交任务")
logger.info(f"共有 {len(ACCOUNTS)} 个账号需要处理")
success_count = 0
fail_count = 0
for i, account in enumerate(ACCOUNTS, 1):
logger.info(f"处理第 {i}/{len(ACCOUNTS)} 个账号: {account['username']}")
try:
# 获取当前应提交的步数
steps = self.get_current_steps()
# 提交步数
success, message = self.submit_steps(
account['username'],
account['password'],
steps
)
if success:
success_count += 1
logger.info(f"✓ 账号 {account['username']} - {message}")
else:
fail_count += 1
logger.error(f"✗ 账号 {account['username']} - {message}")
except Exception as e:
fail_count += 1
logger.error(f"✗ 账号 {account['username']} - 处理异常: {str(e)}")
# 账号间间隔(最后一个账号不需要等待)
if i

