zf逆向

查看 23|回复 2
作者:qixireal   
zf逆向
声明
本文章中所有内容仅供学习交流使用,不用于其他任何目的,不提供完整代码,抓包内容、敏感网址、数据接口等均已做脱敏处理,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关!
目录
[ol]

  • 目标

  • 抓包分析

  • 验证逻辑分析

  • 验证逻辑跟栈

  • 扣 js 代码

  • python 实现

  • 结论

  • 目标
    ● 逆向目标:获取每次下载时候验证接口的 payload
    ● 逆向网址:aHR0cHM6Ly9oYW93YWxscGFwZXIuY29tLw==

  • 抓包分析
    直接找到我们需要的壁纸图片,点击下载后出现验证,接着点击验证开始抓包
    首先通过游览器抓包发现没有找到图片包,选择使用 reqable 进行抓包,抓包结果如下:
    图片数据接口如上所示,点击小黄鸟的右边下载可以看到数据接口,此接口数据中无加密参数,其中 cookie 测试发现只需要:askId。
    接着分析参数来源以及 url 来源,往上看可以看到接口:/link/common/file/getCompleteUrl/17187074423967104 返回最终 url
    [/ol]
    找接口/link/common/file/getCompleteUrl/17187074423967104 发现来自于,每张图片唯一标识为:/link/common/file/getCompleteUrl/后面的数字,当然可以看到此接口需要 token 和 cookie,且都是 token 和 cookie 的值一致。
    目前为止需要处理的参数有:
    ● 接口数字
    ● token (也是 cookie 中的 ackid)
    继续分析:
    ● 接口数字可以从最开始访问官网,找到每张图片的 href 属性,其中 href 属性中的数字就是需要的接口数字
    ● token 可以从 cookie 中拿到,主要通过删除 cookie 反复访问网址发现:访问每张图片详情存在 set-cookie 字段,其中含有 cookie 的 ackid 值。那么到目前为止,需要处理的参数已经处理好了。
    ● 此时如果走流程:
    访问初始页面 = 》 拿到每个壁纸的详情页 = 》 接着组成/link/common/file/getCompleteUrl/ = 》 访问这个组成的接口会发现无法走通,显示:验证错误
    确实,在我们正规手段访问每张图片下载时,需要走验证的,直接通过上述流程显然无法绕过验证。

  • 验证逻辑分析

  • 依旧查看抓包结果:下面接口返回数据 challenge 以及 algorithm 明显标识:sha-256,但是具体实现不太清楚

  • 同时可以看到下面接口以 payload 为参数

  • base64 解码 payload 为,可以想到,需要对/link/pc/certify/challenge接口返回的数据处理后形成下面数据中的 number 和 took,再通过 base64 编码后得到 payload,访问这个接口验证成功后才能访问产生最终图片的接口

  • 验证逻辑跟栈
    首次通过接口找到发起项:
    打开源代码后,打上断点
    [/ol]
    最后通过不断跟栈发现到达了一个异步链:
    打上两个断点仔细检查发现,在 xn 处进行了加密,开始 n 进行输入,后续出现了 c,而 c 就是我们要找的 number 和 took
    更改断点可以看到:
    那么进入 xn()函数发现,xn()实现是,其中 ma 就是给 c 赋值的:
    async function xn(n) {
    let c = null;
    if ("Worker"in window) {
    try {
    c = await ma(n, n.maxNumber || n.maxnumber || Xe())
    } catch (v) {
    I(v)
    }
    if (c?.number !== void 0 || "obfuscated"in n)
    return {
    data: n,
    solution: c
    }
    }
    if ("obfuscated"in n) {
    const v = await pi(n.obfuscated, n.key, n.maxNumber || n.maxnumber);
    return {
    data: n,
    solution: await v.promise
    }
    }
    return {
    data: n,
    solution: await di(n.challenge, n.salt, n.algorithm, n.maxNumber || n.maxnumber || Xe()).promise
    }
    }
    接着查看 ma()实现:
    async function ma(n, c=typeof ve() == "number" ? ve() : n.maxNumber || n.maxnumber || Xe(), v=Math.ceil(dt())) {
    const y = [];
    v = Math.min(16, c, Math.max(1, v));
    for (let x = 0; x $= await Promise.all(y.map( (x, _) => { const H = _ * p; return new Promise(Q => { x.addEventListener("message", le => { if (le.data) for (const qe of y) qe !== x && qe.postMessage({ type: "abort" }); Q(le.data) } ), x.postMessage({ payload: n, max: H + p, start: H, type: "work" }) } ) } )); for (const x of y) x.terminate(); return$.find(x => !!x) || null
    }
    为这个 ma()开始和 return 三个地方打上断点,其中 参数 n 就是返回 chanllege 的 json 数据,c 为固定的 75000,其中这部分代码连续发送 8 次消息,产生 payload:
    return new Promise(Q => {
    x.addEventListener("message", le => {
    if (le.data)
    for (const qe of y)
    qe !== x && qe.postMessage({
    type: "abort"
    });
    Q(le.data)
    }
    ),
    x.postMessage({
    payload: n,
    max: H + p,
    start: H,
    type: "work"
    })
    }
    接着断点打在这个 ma()函数的上述 return 这,查看函数执行以及服务器返回,可以看到:
    那么分析发现,这个响应是个自执行函数,其中就涉及了暴力解密,number 和 took 就是解密时间和标识,都由这个产生,下面图片可以轻松看出:
  • 扣 js 代码
    ● 扣代码比较简单,直接将这个 blob :*这个接口中的 js 数据全部复制,当然需要进行更改,去掉其中 onmessege 的相关操作,改为直接函数传参操作,其中最开始的属于游览器环境才有:
    [/ol]
    ● js 代码:
    async function main(data) {
    "use strict";
    const encoder = new TextEncoder();
    function hexEncode(buffer) {
        return [...new Uint8Array(buffer)].map(byte => byte.toString(16).padStart(2, "0")).join("");
    }
    async function computeHash(text, number, algorithm) {
        if (typeof crypto === "undefined" || !("subtle" in crypto) || !("digest" in crypto.subtle)) {
            throw new Error("Web Crypto is not available. Secure context is required.");
        }
        return hexEncode(await crypto.subtle.digest(algorithm.toUpperCase(), encoder.encode(text + number)));
    }
    async function findMatchingNumber(challenge, salt, algorithm, maxNumber, startNumber = 0) {
        const startTime = Date.now();
        for (let i = startNumber; i
    }
  • python 实现
    ● 最后决定使用 playwright 打开游览器执行这段 js,代码如下,使用异步 api
    [/ol]
    import json
    from urllib.parse import urlparse, unquote
    from lxml import html
    import logging
    import os
    import base64
    from playwright.async_api import async_playwright
    from contextlib import asynccontextmanager
    import aiofiles
    from httpx import AsyncClient
    import asyncio
    初始化配置
    logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36',
    'Accept': "application/json",
    'Accept-Language': "zh-CN,zh;q=0.9",
    'Cache-Control': "no-cache",
    'Pragma': "no-cache",
    }
    class ZFWallpaperSpider:
    def init(self, max_concurrent=5, download_folder="C:\Users\Administrator\Desktop\waller_paper"):
    self.semaphore = asyncio.Semaphore(max_concurrent)
    self.download_folder = download_folder
    self.client = None
    self.browser = None
    self.js_code = None
    async def init_resources(self):
        """初始化HTTP客户端和浏览器"""
        self.client = AsyncClient(headers=headers)
        # 启动Playwright浏览器实例
        playwright = await async_playwright().start()
        self.browser = await playwright.chromium.launch(headless=True)
        # 预加载JS代码
        js_file_path = 'js_folder/lgx_20250909_zf.js'
        if not os.path.exists(js_file_path):
            raise FileNotFoundError(f"找不到 JS 文件: {js_file_path}")
        async with aiofiles.open(js_file_path, encoding='utf-8') as f:
            self.js_code = await f.read()
    async def close_resources(self):
        """关闭所有资源"""
        if self.client:
            await self.client.aclose()
        if self.browser:
            await self.browser.close()
    @asynccontextmanager
    async def get_page(self):
        """获取浏览器页面的上下文管理器"""
        page = await self.browser.new_page()
        try:
            await page.goto("https://www.baidu.com")
            yield page
        finally:
            await page.close()
    async def save_image(self, data, url):
        """
        保存二进制图片数据到本地
        :param data: 二进制数据
        :param url: 下载的url
        :return: 保存的文件路径
        """
        # 创建保存目录
        os.makedirs(self.download_folder, exist_ok=True)
        # 解析URL,获取路径部分
        parsed_url = urlparse(url)
        path = parsed_url.path
        if "mp4" in path:
            return path
        # 提取文件名(含参数)
        filename_with_params = path.split("/")[-1]
        # 去除参数,保留纯文件名
        filename = filename_with_params.split("?")[0]
        # 完整路径
        file_path = os.path.join(self.download_folder, filename)
        # 保存文件
        try:
            async with aiofiles.open(file_path, 'wb') as f:
                await f.write(data)
            logging.info(f"✅ 图片/动态壁纸保存成功: {file_path}")
            return file_path
        except Exception as e:
            logging.error(f"❌ 保存失败: {e}")
            return None
    async def challenge(self):
        url = "/link/pc/certify/challenge"
        response = await self.client.get(url)
        return response.json()
    async def get_payload(self, data=None):
        """
        执行JS代码生成payload
        """
        async with self.get_page() as page:
            try:
                # 执行JS函数并传入参数
                js_result = await page.evaluate(self.js_code, data)
                js_result = json.loads(js_result)
                js_result['signature'] = data['signature']
                logging.info(f"JavaScript 返回值: {js_result}")
                return js_result
            except Exception as e:
                logging.error(f"执行 JavaScript 时出错: {e}")
                return None
    async def get_verify(self, payload):
        # 将payload进行base64编码
        json_bytes = payload.encode('utf-8')
        encoded_bytes = base64.b64encode(json_bytes)
        encoded_str = encoded_bytes.decode('utf-8')
        url = f"/link/pc/certify/verify?payload={encoded_str}"
        cookie_data = dict(self.client.cookies).get('askId')
        if cookie_data is None:
            raise ValueError("没有cookie,失败了,ackid没有")
        ack = unquote(cookie_data)
        self.client.headers.update({
            'token': f"{ack}",
        })
        response = await self.client.post(url)
        logging.info(response.text)
        return response
    async def fetch_list(self):
        data_list = []
        for page in range(7, 8):
            url = f"/homeView?isSel=true&page={page}"
            response = await self.client.get(url)
            tree = html.fromstring(response.text)
            cards = tree.xpath('//div[@class="card"]')
            for card in cards:
                href = card.xpath("./div[2]/div/a/@href")[0]
                data_list.append({'url': href})
        return data_list
    async def fetch_detail(self, response_data):
        logging.info("start...-----------------------------------------------")
        base_url = '/link/common/file/getCompleteUrl/'
        url = response_data.get('url').replace('/homeViewLook/', base_url)
        async with self.semaphore:
            # 伪装请求头
            referer = 'https://haowallpaper.com' + response_data.get('url')
            self.client.headers.update({'Referer': referer})
            # 获取cookie
            await self.client.get(referer)
            # 进行challenge和verify
            challenge_data = await self.challenge()
            logging.info(f"challege返回值为:{challenge_data}")
            payload = await self.get_payload(data=challenge_data)
            if payload is None:
                raise ValueError("payload没有产生,应该是js执行错误")
            payload_str = json.dumps(payload)
            await self.get_verify(payload_str)
            logging.info(f"cookie为:{dict(self.client.cookies)}")
            # 发送请求获取最终url
            logging.info(f"正在爬取:{url}")
            response = await self.client.get(url=url)
            logging.info(f"获取最终页面response:{response.text}")
            finally_url = response.json()['data']
            picture = await self.client.get(url=finally_url)
            success = await self.save_image(picture.content, url=finally_url)
            data = {
                'url': finally_url,
                'success': success
            }
            logging.info("end...-----------------------------------------------")
            return data
    async def run_tasks(self):
        """主流程"""
        try:
            await self.init_resources()
            # 获取列表数据
            job_ids = await self.fetch_list()
            # 并发抓取详情
            tasks = [self.fetch_detail(job_id) for job_id in job_ids]
            data_results = await asyncio.gather(*tasks)
            return data_results
        finally:
            await self.close_resources()
    if name == "main":
    spider = ZFWallpaperSpider(max_concurrent=1)
    results = asyncio.run(spider.run_tasks())
    for r in results:
    print(r)
    print("成功抓取的壁纸数:", len(results))
  • 结论
    ● 整个逆向过程并不难,主要集中在跟栈方向,当然存在其他快速定位值的办法,比如 hook ,不过我没尝试
    ● 这个网址检查单个 ip 只能访问 5 次,5 次后需要登陆,这部分我没处理
    ● python 代码相应文件导入需要更改,所有 url 主机域名需自行添加(脱敏处理)
    ● 如果有疑问或者问题的欢迎留言交流
    [/ol]
    再次声明
    本文章中所有内容仅供学习交流使用,不用于其他任何目的,不提供完整代码,抓包内容、敏感网址、数据接口等均已做脱敏处理,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关,如有侵权,请联系删除!

    接口, 代码

  • wujunfengpojie   

    以为是zf的php框架,一看不是,学习了
    13955925361   

    律师函警告
    您需要登录后才可以回帖 登录 | 立即注册

    返回顶部