声明
本文章中所有内容仅供学习交流使用,不用于其他任何目的,不提供完整代码,抓包内容、敏感网址、数据接口等均已做脱敏处理,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关!
目录
[ol]
目标
抓包分析
验证逻辑分析
验证逻辑跟栈
扣 js 代码
python 实现
结论
目标
● 逆向目标:获取每次下载时候验证接口的 payload
● 逆向网址:aHR0cHM6Ly9oYW93YWxscGFwZXIuY29tLw==
抓包分析
直接找到我们需要的壁纸图片,点击下载后出现验证,接着点击验证开始抓包
首先通过游览器抓包发现没有找到图片包,选择使用 reqable 进行抓包,抓包结果如下:
图片数据接口如上所示,点击小黄鸟的右边下载可以看到数据接口,此接口数据中无加密参数,其中 cookie 测试发现只需要:askId。
接着分析参数来源以及 url 来源,往上看可以看到接口:/link/common/file/getCompleteUrl/17187074423967104 返回最终 url
[/ol]
找接口/link/common/file/getCompleteUrl/17187074423967104 发现来自于,每张图片唯一标识为:/link/common/file/getCompleteUrl/后面的数字,当然可以看到此接口需要 token 和 cookie,且都是 token 和 cookie 的值一致。
目前为止需要处理的参数有:
● 接口数字
● token (也是 cookie 中的 ackid)
继续分析:
● 接口数字可以从最开始访问官网,找到每张图片的 href 属性,其中 href 属性中的数字就是需要的接口数字
● token 可以从 cookie 中拿到,主要通过删除 cookie 反复访问网址发现:访问每张图片详情存在 set-cookie 字段,其中含有 cookie 的 ackid 值。那么到目前为止,需要处理的参数已经处理好了。
● 此时如果走流程:
访问初始页面 = 》 拿到每个壁纸的详情页 = 》 接着组成/link/common/file/getCompleteUrl/ = 》 访问这个组成的接口会发现无法走通,显示:验证错误
确实,在我们正规手段访问每张图片下载时,需要走验证的,直接通过上述流程显然无法绕过验证。
验证逻辑分析
依旧查看抓包结果:下面接口返回数据 challenge 以及 algorithm 明显标识:sha-256,但是具体实现不太清楚
同时可以看到下面接口以 payload 为参数
base64 解码 payload 为,可以想到,需要对/link/pc/certify/challenge接口返回的数据处理后形成下面数据中的 number 和 took,再通过 base64 编码后得到 payload,访问这个接口验证成功后才能访问产生最终图片的接口
验证逻辑跟栈
首次通过接口找到发起项:
打开源代码后,打上断点
[/ol]
最后通过不断跟栈发现到达了一个异步链:
打上两个断点仔细检查发现,在 xn 处进行了加密,开始 n 进行输入,后续出现了 c,而 c 就是我们要找的 number 和 took
更改断点可以看到:
那么进入 xn()函数发现,xn()实现是,其中 ma 就是给 c 赋值的:
async function xn(n) {
let c = null;
if ("Worker"in window) {
try {
c = await ma(n, n.maxNumber || n.maxnumber || Xe())
} catch (v) {
I(v)
}
if (c?.number !== void 0 || "obfuscated"in n)
return {
data: n,
solution: c
}
}
if ("obfuscated"in n) {
const v = await pi(n.obfuscated, n.key, n.maxNumber || n.maxnumber);
return {
data: n,
solution: await v.promise
}
}
return {
data: n,
solution: await di(n.challenge, n.salt, n.algorithm, n.maxNumber || n.maxnumber || Xe()).promise
}
}
接着查看 ma()实现:
async function ma(n, c=typeof ve() == "number" ? ve() : n.maxNumber || n.maxnumber || Xe(), v=Math.ceil(dt())) {
const y = [];
v = Math.min(16, c, Math.max(1, v));
for (let x = 0; x $= await Promise.all(y.map( (x, _) => { const H = _ * p; return new Promise(Q => { x.addEventListener("message", le => { if (le.data) for (const qe of y) qe !== x && qe.postMessage({ type: "abort" }); Q(le.data) } ), x.postMessage({ payload: n, max: H + p, start: H, type: "work" }) } ) } )); for (const x of y) x.terminate(); return$.find(x => !!x) || null
}
为这个 ma()开始和 return 三个地方打上断点,其中 参数 n 就是返回 chanllege 的 json 数据,c 为固定的 75000,其中这部分代码连续发送 8 次消息,产生 payload:
return new Promise(Q => {
x.addEventListener("message", le => {
if (le.data)
for (const qe of y)
qe !== x && qe.postMessage({
type: "abort"
});
Q(le.data)
}
),
x.postMessage({
payload: n,
max: H + p,
start: H,
type: "work"
})
}
接着断点打在这个 ma()函数的上述 return 这,查看函数执行以及服务器返回,可以看到:
那么分析发现,这个响应是个自执行函数,其中就涉及了暴力解密,number 和 took 就是解密时间和标识,都由这个产生,下面图片可以轻松看出:
● 扣代码比较简单,直接将这个 blob :*这个接口中的 js 数据全部复制,当然需要进行更改,去掉其中 onmessege 的相关操作,改为直接函数传参操作,其中最开始的属于游览器环境才有:
[/ol]
● js 代码:
async function main(data) {
"use strict";
const encoder = new TextEncoder();
function hexEncode(buffer) {
return [...new Uint8Array(buffer)].map(byte => byte.toString(16).padStart(2, "0")).join("");
}
async function computeHash(text, number, algorithm) {
if (typeof crypto === "undefined" || !("subtle" in crypto) || !("digest" in crypto.subtle)) {
throw new Error("Web Crypto is not available. Secure context is required.");
}
return hexEncode(await crypto.subtle.digest(algorithm.toUpperCase(), encoder.encode(text + number)));
}
async function findMatchingNumber(challenge, salt, algorithm, maxNumber, startNumber = 0) {
const startTime = Date.now();
for (let i = startNumber; i
}
● 最后决定使用 playwright 打开游览器执行这段 js,代码如下,使用异步 api
[/ol]
import json
from urllib.parse import urlparse, unquote
from lxml import html
import logging
import os
import base64
from playwright.async_api import async_playwright
from contextlib import asynccontextmanager
import aiofiles
from httpx import AsyncClient
import asyncio
初始化配置
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36',
'Accept': "application/json",
'Accept-Language': "zh-CN,zh;q=0.9",
'Cache-Control': "no-cache",
'Pragma': "no-cache",
}
class ZFWallpaperSpider:
def init(self, max_concurrent=5, download_folder="C:\Users\Administrator\Desktop\waller_paper"):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.download_folder = download_folder
self.client = None
self.browser = None
self.js_code = None
async def init_resources(self):
"""初始化HTTP客户端和浏览器"""
self.client = AsyncClient(headers=headers)
# 启动Playwright浏览器实例
playwright = await async_playwright().start()
self.browser = await playwright.chromium.launch(headless=True)
# 预加载JS代码
js_file_path = 'js_folder/lgx_20250909_zf.js'
if not os.path.exists(js_file_path):
raise FileNotFoundError(f"找不到 JS 文件: {js_file_path}")
async with aiofiles.open(js_file_path, encoding='utf-8') as f:
self.js_code = await f.read()
async def close_resources(self):
"""关闭所有资源"""
if self.client:
await self.client.aclose()
if self.browser:
await self.browser.close()
@asynccontextmanager
async def get_page(self):
"""获取浏览器页面的上下文管理器"""
page = await self.browser.new_page()
try:
await page.goto("https://www.baidu.com")
yield page
finally:
await page.close()
async def save_image(self, data, url):
"""
保存二进制图片数据到本地
:param data: 二进制数据
:param url: 下载的url
:return: 保存的文件路径
"""
# 创建保存目录
os.makedirs(self.download_folder, exist_ok=True)
# 解析URL,获取路径部分
parsed_url = urlparse(url)
path = parsed_url.path
if "mp4" in path:
return path
# 提取文件名(含参数)
filename_with_params = path.split("/")[-1]
# 去除参数,保留纯文件名
filename = filename_with_params.split("?")[0]
# 完整路径
file_path = os.path.join(self.download_folder, filename)
# 保存文件
try:
async with aiofiles.open(file_path, 'wb') as f:
await f.write(data)
logging.info(f"✅ 图片/动态壁纸保存成功: {file_path}")
return file_path
except Exception as e:
logging.error(f"❌ 保存失败: {e}")
return None
async def challenge(self):
url = "/link/pc/certify/challenge"
response = await self.client.get(url)
return response.json()
async def get_payload(self, data=None):
"""
执行JS代码生成payload
"""
async with self.get_page() as page:
try:
# 执行JS函数并传入参数
js_result = await page.evaluate(self.js_code, data)
js_result = json.loads(js_result)
js_result['signature'] = data['signature']
logging.info(f"JavaScript 返回值: {js_result}")
return js_result
except Exception as e:
logging.error(f"执行 JavaScript 时出错: {e}")
return None
async def get_verify(self, payload):
# 将payload进行base64编码
json_bytes = payload.encode('utf-8')
encoded_bytes = base64.b64encode(json_bytes)
encoded_str = encoded_bytes.decode('utf-8')
url = f"/link/pc/certify/verify?payload={encoded_str}"
cookie_data = dict(self.client.cookies).get('askId')
if cookie_data is None:
raise ValueError("没有cookie,失败了,ackid没有")
ack = unquote(cookie_data)
self.client.headers.update({
'token': f"{ack}",
})
response = await self.client.post(url)
logging.info(response.text)
return response
async def fetch_list(self):
data_list = []
for page in range(7, 8):
url = f"/homeView?isSel=true&page={page}"
response = await self.client.get(url)
tree = html.fromstring(response.text)
cards = tree.xpath('//div[@class="card"]')
for card in cards:
href = card.xpath("./div[2]/div/a/@href")[0]
data_list.append({'url': href})
return data_list
async def fetch_detail(self, response_data):
logging.info("start...-----------------------------------------------")
base_url = '/link/common/file/getCompleteUrl/'
url = response_data.get('url').replace('/homeViewLook/', base_url)
async with self.semaphore:
# 伪装请求头
referer = 'https://haowallpaper.com' + response_data.get('url')
self.client.headers.update({'Referer': referer})
# 获取cookie
await self.client.get(referer)
# 进行challenge和verify
challenge_data = await self.challenge()
logging.info(f"challege返回值为:{challenge_data}")
payload = await self.get_payload(data=challenge_data)
if payload is None:
raise ValueError("payload没有产生,应该是js执行错误")
payload_str = json.dumps(payload)
await self.get_verify(payload_str)
logging.info(f"cookie为:{dict(self.client.cookies)}")
# 发送请求获取最终url
logging.info(f"正在爬取:{url}")
response = await self.client.get(url=url)
logging.info(f"获取最终页面response:{response.text}")
finally_url = response.json()['data']
picture = await self.client.get(url=finally_url)
success = await self.save_image(picture.content, url=finally_url)
data = {
'url': finally_url,
'success': success
}
logging.info("end...-----------------------------------------------")
return data
async def run_tasks(self):
"""主流程"""
try:
await self.init_resources()
# 获取列表数据
job_ids = await self.fetch_list()
# 并发抓取详情
tasks = [self.fetch_detail(job_id) for job_id in job_ids]
data_results = await asyncio.gather(*tasks)
return data_results
finally:
await self.close_resources()
if name == "main":
spider = ZFWallpaperSpider(max_concurrent=1)
results = asyncio.run(spider.run_tasks())
for r in results:
print(r)
print("成功抓取的壁纸数:", len(results))
● 整个逆向过程并不难,主要集中在跟栈方向,当然存在其他快速定位值的办法,比如 hook ,不过我没尝试
● 这个网址检查单个 ip 只能访问 5 次,5 次后需要登陆,这部分我没处理
● python 代码相应文件导入需要更改,所有 url 主机域名需自行添加(脱敏处理)
● 如果有疑问或者问题的欢迎留言交流
[/ol]
再次声明
本文章中所有内容仅供学习交流使用,不用于其他任何目的,不提供完整代码,抓包内容、敏感网址、数据接口等均已做脱敏处理,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关,如有侵权,请联系删除!