[Python] 某站有声小说全本下载代码分享

查看 61|回复 5
作者:nguaduot   
因为喜欢子慕播音的《全职高手》,所以花了点时间研究 ting55,用 Python 写了全本下载工具,分享一些经验。
关于 ting55 的访问限制
有声小说的相关信息(比如书名、播音演员、章节名等)可以从 HTML 页面抓取(参考下文源码),不是难点,难点在于音频资源接口的处理。
ting55 核心接口是 ting55.com/nlinka,POST 调用,用于解析音频资源直链 URL(参考下文源码),ting55 主要对该接口做了访问控制:
  • 同一 IP 短时间连续请求间隔不得低于6秒(否则返回 503 状态码),不超过6次,否则返回空的 URL;
  • 单 IP 超过限制大概30分钟后可继续请求,一天能发起有效请求(获取到音频直链)20~40次(不严谨测试)。

    注意,不要频繁请求该接口,无论是否成功获取到音频 URL,都会被视为请求一次,导致推迟下次请求时间。
    代码实现
    访问控制搞清楚后,来想想思路,多线程不用考虑,意义不大。有两种思路:
  • 基于定时器,卡着时间请求;
  • 基于代{过}{滤}理 IP,单 IP 受限后更换 IP 继续。

    本工具初版是按第一种思路实现,但过于缓慢,在服务器上挂一天只能获取几十章,《全职高手》全本1104章,难等,于是按第二种思路更新了代码(参考下文源码)。
    下文代码为当前版本,后续可能更新,追踪开源代码即可:Gitee - nguaduot / audiobook
    代{过}{滤}理 IP
    参考:https://www.dailiservers.com/free-proxy-list/
    将搜集的代{过}{滤}理 IP 按行分割存放在文本文件中,通过参数引入即可。运行后会逐个检查使用,失效代{过}{滤}理会即时从文件中剔除。
    使用参考
    命令行工具调用,建议先查看命令参数:python audiobook.py -h
    [Asm] 纯文本查看 复制代码usage: audiobook.py [-h] [--name NAME] [--actor ACTOR] [--start START] [--end END] [--out OUT]
                        [--proxy PROXY]
                        url
    audiobook thief, just for https://ting55.com/
    positional arguments:
      url            novel url (e.g. https://ting55.com/book/9200)
    options:
      -h, --help     show this help message and exit
      --name NAME    novel name
      --actor ACTOR  voice actor
      --start START  starting chapter index (default 1)
      --end END      last chapter index (default 1)
      --out OUT      output folder (default same as novel name)
      --proxy PROXY  text file containing proxy IPs (e.g. 81.10.80.155:8080)
    下载《全职高手》前500章,并手动设置小说名和播音演员(用于音频文件命名):
    [color=]python audiobook.py https://ting55.com/book/9200 --name
    [color=]全职高手
    [color=] --actor
    [color=]子慕
    [color=] --start 1 --end 500
    下载《雪中悍刀行》第1章(省略参数则自动抓取小说名和播音演员,不过只下载第1章):
    [color=]python audiobook.py https://ting55.com/book/1419
    运行后会自动生成代{过}{滤}理文件 audiobook.proxy,为纯文本文件。如需使用代{过}{滤}理,打开按行添加代{过}{滤}理即可,或手动指定其他代{过}{滤}理文件。


    屏幕截图 2023-09-18 224711.png (367.35 KB, 下载次数: 0)
    下载附件
    2023-9-18 22:48 上传

    TODO
  • ting55《全职高手》全本免费,
    [color=]本工具未对收费内容做研究
    ,后续会研究其 APP,抓包分析一下接口。

    [Python] 纯文本查看 复制代码#!/usr/bin/python3
    # -*- coding: utf-8 -*-
    import argparse
    import datetime
    import math
    import os
    import re
    import time
    import urllib.parse
    from typing import List, Dict, Tuple
    import requests
    class Proxy(object):
        """
        https://www.dailiservers.com/free-proxy-list/
        https://proxyscrape.com/free-proxy-list-f
        """
        DEF_PROXY_FILE_NAME = "audiobook.proxy"
        def __init__(self, file_proxy: str):
            self.file_proxy = file_proxy
            if not self.file_proxy:
                self.file_proxy = os.path.join(os.path.dirname(__file__), self.DEF_PROXY_FILE_NAME)
            self.data_work = self.__load()
            self.data_sleep = []
            self.data_dead = []
        @staticmethod
        def __ip_with_port(proxy) -> bool:
            return True if re.match("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$", proxy) else False
        def __load(self) -> List[str]:
            """
            始终将空字符串作为第一个值,表示不使用代{过}{滤}理
            """
            if not os.path.isfile(self.file_proxy):
                return [""]
            data = set()
            with open(self.file_proxy, "r", encoding="utf-8") as f:
                for line in f.readlines():
                    line = line.strip()
                    if self.__ip_with_port(line):
                        data.add(line)
            return [""] + list(data)
        def get(self) -> str:
            return self.data_work[0] if self.data_work else ""
        def count(self) -> int:
            return len(self.data_work)
        def empty(self) -> bool:
            return self.count() == 0
        def feedback_dead(self, proxy: str):
            if proxy in self.data_work:
                self.data_work.remove(proxy)
            self.data_dead.append(proxy)
            self.save()
        def feedback_sleep(self, proxy: str):
            if proxy in self.data_work:
                self.data_work.remove(proxy)
            # self.data_work.append(proxy)
            self.data_sleep.append(proxy)
            self.save()
        def save(self):
            with open(self.file_proxy, "w", encoding="utf-8") as f:
                f.write("\n".join([p for p in self.data_work if self.__ip_with_port(p)]))
                f.write("\n".join([p for p in self.data_sleep if self.__ip_with_port(p)]))
    class Ting55Thief(object):
        """
        恋听网 https://ting55.com/
        访问限制规则:
        1. 同一 IP 短时间只能请求6次 nlinka 接口
        """
        URL_NOVEL = "https://ting55.com/book/%s"  # GET
        URL_CHAPTER = "https://ting55.com/book/%s-%d"  # GET
        URL_API_RES = "https://ting55.com/nlinka"  # POST
        USER_AGENT = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
                      " AppleWebKit/537.36 (KHTML, like Gecko)"
                      " Chrome/116.0.0.0 Safari/537.36 Edg/116.0.1938.76")
        NLINKA_INTERVAL = 6  # nlinka 请求间隔时长
        FILE_SIZE_VALID = 100 * 1024  # 有效文件大小校验
        def __init__(self, novel_id: str, novel_name: str, voice_actor: str, folder_out: str, file_proxy: str):
            self.novel_id = novel_id  # 小说ID
            self.novel_name = novel_name  # 小说名
            self.voice_actor = voice_actor  # 播音演员
            self.folder_out = folder_out  # 音频资源输出文件夹
            self.chapter_max = 1  # 最大章节,用于固定宽度命名
            self.log_nlinka = [0.0]  # nlinka 请求时间日志
            self.proxy = Proxy(file_proxy)  # 代{过}{滤}理,用于 nlinka 接口
            self.session = requests.session()
            self.session.headers.update({
                "User-Agent": self.USER_AGENT,  # 必填
            })
            self.fix_novel_info()  # 修正小说名、播音演员、输出文件夹、最大章节
        def __res_url(self, url_page: str, chapter_id: int, token: str, proxy="") -> Tuple[str, int, str]:
            """
            响应示例:{'ourl': '', 'plink': '', 'url': '', 'status': 1}
            不使用代{过}{滤}理:proxies={"http": "", "https": ""}
            返回值:-2 - 错误,-1 - 该资源失效,0 - 失败,1 - 成功且还剩余请求次数
            """
            t = time.time()
            try:
                res = self.session.post(self.URL_API_RES, data={
                    "bookId": self.novel_id,  # 必填
                    "page": chapter_id  # 必填
                    # "isPay": 0  # 选填
                }, headers={
                    "Referer": url_page,  # 必填
                    "xt": token,  # 必填
                    "l": "1"  # 必填
                }, proxies={
                    "http": proxy,
                    "https": proxy
                }, timeout=8)
                self.log_nlinka.append(t)  # 无论是否获取到 URL,都计1次
                if res.status_code != 200:
                    if res.status_code == 503:
                        return "", 0, "503: too fast"
                    return "", 0, "status code %d" % res.status_code
                data = res.json()
                if data["status"] != 1:
                    if data["status"] == -1:
                        return "", -1, "non-free content"
                    return "", -1, "unexpected response: %s" % data
                if not data["url"]:  # 达到请求限制
                    return "", 0, "too many requests"
                return data["url"], 1, ""
            except requests.exceptions.ReadTimeout:
                return "", -2, "timeout"
            except requests.exceptions.ProxyError:
                return "", -2, "useless proxy"
            except requests.exceptions.ConnectionError:
                return "", -2, "connection error"
        def fix_novel_info(self):
            res = self.session.get(self.URL_NOVEL % self.novel_id)
            text = res.text
            novel_name, voice_actor, chapter_max = "", "", 1
            match = re.search("播音:]+class=\"by\"[^>]*>(.+?)", text)
            if match:
                voice_actor = match.group(1).strip()
            match = re.search("class=\"binfo\">(.+?)", text)
            if match:
                novel_name = match.group(1).strip()
                if voice_actor:
                    novel_name = re.sub("有声小说$", "", novel_name)
                    novel_name = re.sub("[((]子慕[))]", "", novel_name)
                    novel_name = novel_name.strip()
            match = re.search(">(\\d+)", text)
            if match:
                chapter_max = int(match.group(1))
            if not self.novel_name:  # 修正小说名
                self.novel_name = novel_name if novel_name else "未知小说"
            if not self.voice_actor:  # 修正播音演员
                self.voice_actor = voice_actor if voice_actor else "未知播音"
            if not self.folder_out:  # 修正输出文件夹
                self.folder_out = os.path.join(os.path.dirname(__file__), self.novel_name)
            self.chapter_max = chapter_max  # 修正最大章节
        def chapter_url(self, chapter_id: int) -> str:
            return self.URL_CHAPTER % (self.novel_id, chapter_id)
        def file_name(self, chapter_id: int, chapter_name: str, fmt: str) -> str:
            """
            格式:{小说名}-{播音演员}-{章节索引}-{章节名}.{扩展名}
            """
            pattern = "%%s-%%s-%%0%dd-%%s%%s" % len(str(self.chapter_max))
            name = pattern % (self.novel_name, self.voice_actor, chapter_id, chapter_name, fmt)
            name = re.sub("[/\\\\:*?\"|]+", "_", name)  # 替换不可用字符
            return name
        def get(self, chapter_id: int) -> Tuple[str, str]:
            """
            Args:
                chapter_id:
            Returns:
                音频资源直链、章节名
            """
            url_page = self.chapter_url(chapter_id)
            res = self.session.get(url_page)
            text = res.text
            match = re.search("name=\"_c\" content=\"(.+?)\"", text)
            if not match:
                print("token not found")
                return "", ""
            token = match.group(1)
            while True:
                proxy = self.proxy.get()
                print("try proxy %s" % (proxy if proxy else "null"))
                url_audio, status, msg = self.__res_url(url_page, chapter_id, token, proxy)
                if status == 1:
                    break
                if status == -1:  # 非免费资源或资源失效
                    print(msg)
                    break
                if status == 0:  # 代{过}{滤}理有效但数据异常
                    print(msg)
                    self.proxy.feedback_sleep(proxy)
                else:  # -2 无效代{过}{滤}理
                    print(msg)
                    self.proxy.feedback_dead(proxy)
                if self.proxy.empty():  # 代{过}{滤}理耗尽
                    break
            if not url_audio:
                print("error in parsing chapter url")
                return "", ""
            match = re.search(">\\s*第(\\d+)章\\s*(.+?)在线收听\\s* str:
            """
            链接有时效
            Args:
                chapter_url:
                chapter_id:
                chapter_name:
            Returns:
                下载完成文件路径
            """
            if not chapter_url:
                print("empty chapter url")
                return ""
            _, fmt = os.path.splitext(urllib.parse.urlparse(chapter_url).path)
            name = self.file_name(chapter_id, chapter_name, fmt)
            if not os.path.isdir(self.folder_out):
                os.mkdir(self.folder_out)
            path = os.path.join(self.folder_out, name)
            res = self.session.get(chapter_url)
            with open(path, "wb") as f:
                f.write(res.content)
            if os.path.getsize(path)  List[int]:
            data = []
            if not os.path.isdir(self.folder_out):
                return []
            for name in os.listdir(self.folder_out):
                m = re.match("%s-%s-(\\d+)-.+" % (self.novel_name, self.voice_actor), name)
                if m:
                    data.append(int(m.group(1)))
            return data
        def available(self) -> bool:
            return time.time() - self.log_nlinka[-1] > self.NLINKA_INTERVAL
        def wait(self) -> int:
            return max(math.ceil(self.NLINKA_INTERVAL - (time.time() - self.log_nlinka[-1])), 0)
        def config(self) -> Dict:
            return {
                "novel_id": self.novel_id,
                "novel_name": self.novel_name,
                "voice_actor": self.voice_actor,
                "chapter_max": self.chapter_max,
                "folder_out": self.folder_out,
                "file_proxy": self.proxy.file_proxy
            }
        @staticmethod
        def parse_novel_id(novel_url: str) -> str:
            if not novel_url:
                return ""
            m = re.search("ting55\\.com/book/(\\d+)", novel_url)
            if m:
                return m.group(1)
            return ""
    def parse_config() -> Dict:
        """
        argparse:
        https://docs.python.org/zh-cn/3/howto/argparse.html
        """
        parser = argparse.ArgumentParser(description="audiobook thief, just for https://ting55.com/")
        parser.add_argument(
            "url",
            type=str,
            help="novel url (e.g. https://ting55.com/book/9200)"
        )
        parser.add_argument(
            "--name",
            type=str, default="", required=False,
            help="novel name"
        )
        parser.add_argument(
            "--actor",
            type=str, default="", required=False,
            help="voice actor"
        )
        parser.add_argument(
            "--start",
            type=int, default=1, required=False,
            help="starting chapter index (default 1)"
        )
        parser.add_argument(
            "--end",
            type=int, default=1, required=False,
            help="last chapter index (default 1)"
        )
        parser.add_argument(
            "--out",
            type=str, default="", required=False,
            help="output folder (default same as novel name)"
        )
        parser.add_argument(
            "--proxy",
            type=str, default="", required=False,
            help="text file containing proxy IPs (e.g. 81.10.80.155:8080)"
        )
        args = parser.parse_args()
        novel_id = Ting55Thief.parse_novel_id(args.url)
        start = args.start
        end = args.end
        if end  str:
        return datetime.datetime.now().strftime("%H:%M:%S")
    def run():
        thief = Ting55Thief(CONFIG["id"], CONFIG["name"], CONFIG["actor"], CONFIG["out"], CONFIG["proxy"])
        print("config:", thief.config())
        data_downloaded = thief.downloaded()
        data_todo = [i for i in range(CONFIG["start"], CONFIG["end"] + 1) if i not in data_downloaded]
        print("todo: %d" % len(data_todo))
        if not data_todo:
            return
        index = 0
        while True:
            if index >= len(data_todo):
                break
            chapter_id = data_todo[index]
            print("[%s] %d/%d: %s" % (now(), chapter_id, CONFIG["end"], thief.chapter_url(chapter_id)))
            chapter_url, chapter_name = thief.get(chapter_id)
            print("[%s] %d/%d: %s %s" % (
                now(), chapter_id, CONFIG["end"],
                chapter_name if chapter_name else "null", chapter_url if chapter_url else "null"
            ))
            if not chapter_url or not chapter_name:
                break
            file_saved = thief.download(chapter_url, chapter_id, chapter_name)
            if not file_saved:
                break
            print("[%s] %d/%d: %s %.2fmb" % (
                now(), chapter_id, CONFIG["end"], file_saved, os.path.getsize(file_saved) / 1024 / 1024
            ))
            index += 1
            print("sleep %d sec..." % thief.wait())
            time.sleep(thief.wait())
    if __name__ == "__main__":
        VERSION = "1.5.230918"
        print("audiobook thief v%s gitee.com/nguaduot/audiobook" % VERSION)
        CONFIG = parse_config()
        run()

    小说, 代码

  • pentium   

    感谢分享,源码继续研究
    magicny   

    不明觉厉,多谢分享。
    MAOSKE   

    感谢分享!
    指尖的阳光   

    能分享成品工具使用吗
    adm1nSQL   

    好帖帮顶
    您需要登录后才可以回帖 登录 | 立即注册

    返回顶部