自己写的某网站视频下载爬虫

查看 14|回复 1
作者:ccxk   
主要思路:
  • 访问主页面抓取视频对应的下载ID(通过正则表达式来进行)
  • 抓包得到m3u8文件的请求链接,分析其构成
  • 通过ID找到对应的m3u8文件(video.m3u8和playlist.m3u8,一个对应视频切分信息,一个对应视频画质信息),获取视频切片数量信息
  • 通过浏览器开发者工具抓包分析,网站将mp4文件伪装成了jpg进行传输,根据获取的数量来依次下载



    1.png (140.25 KB, 下载次数: 0)
    下载附件
    2025-2-2 17:38 上传



    2.png (62.42 KB, 下载次数: 0)
    下载附件
    2025-2-2 17:39 上传

  • 最后通过FFmpeg来进行视频合成
  • 整理思路很简单,唯一稍微难点就是视频格式伪装不过非常常见
  • 存在问题:代码在pycharm中为什么占用内存很大时间稍微一长就会卡死

    [Asm] 纯文本查看 复制代码import re
    import requests
    import cloudscraper
    import concurrent.futures
    from lxml import etree
    import os
    import logging
    # 设置日志配置
    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
    logger = logging.getLogger(__name__)
    class VideoDownloader:
        def __init__(self, url):
            self.url = url
            # 在这里补充你的 headers
            self.headers = {
                "Referer": "https://",
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                              'Chrome/131.0.0.0 Safari/537.36',
                'Cookie': "省略"
            }
            self.scraper = cloudscraper.create_scraper()
        def fetch_page(self):
            """获取网页内容并解析"""
            try:
                response = self.scraper.get(url=self.url, headers=self.headers, timeout=2)
                html = etree.HTML(response.text)
                return html
            except Exception as e:
                logger.error(f"获取网页内容失败: {e}")
                return None
        def extract_video_uuid(self, html):
            """从页面中提取视频的UUID"""
            try:
                text = html.xpath("""/html/body/script[5]/text()""")[0]
                uuid_pattern = r'\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\b'
                video_num = re.search(uuid_pattern, text).group(0)
                return video_num
            except Exception as e:
                logger.error(f"提取视频UUID失败: {e}")
                return None
        def get_video_numbers(self, video_num, quality="1080p"):
            """获取视频切分数"""
            url_videolist = f"https://surrit.com/{video_num}/{quality}/"
            return self._get_video_part_number(url_videolist)
        def _get_video_part_number(self, url):
            """从视频链接中提取视频切分数"""
            try:
                response = self.scraper.get(url=url, headers=self.headers, timeout=2)
                temp = response.text
                pattern = r'video(\d+)\.jpeg'
                numbers = re.findall(pattern, temp)[-1]
                return int(numbers)
            except Exception as e:
                logger.error(f"获取视频切分数失败: {e}")
                return None
        def download_video_part(self, video_num, quality, part_number):
            """下载单个视频切片"""
            download_url = f"https://surrit.com/{video_num}/{quality}/video{part_number}"
            try:
                response = requests.get(url=download_url, headers=self.headers)
                with open(f"{part_number}.mp4", "wb") as f:
                    f.write(response.content)
                logger.info(f"下载视频切片 {part_number} 完成")
            except Exception as e:
                logger.error(f"下载视频切片 {part_number} 失败: {e}")
        def download_video(self, video_num, start, end, quality="1080p"):
            """使用多线程批量下载视频切片"""
            with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
                futures = []
                for i in range(start, end + 1):
                    futures.append(executor.submit(self.download_video_part, video_num, quality, i))
                # 等待所有任务完成
                for future in concurrent.futures.as_completed(futures):
                    future.result()
        def merge_videos(self, part_count):
            """合并所有下载的视频切片"""
            try:
                with open("file_list.txt", "w") as f:
                    for i in range(1, part_count + 1):
                        f.write(f"file '{i}.mp4'\n")
                os.system("ffmpeg -f concat -safe 0 -i file_list.txt -c copy output.mp4")
                logger.info("视频合并完成")
                # 删除临时文件
                os.remove("file_list.txt")
                # 删除下载的视频切片
                for i in range(1, part_count + 1):
                    try:
                        os.remove(f"{i}.mp4")
                        logger.info(f"删除视频切片: {i}.mp4")
                    except Exception as e:
                        logger.error(f"删除视频切片 {i}.mp4 时发生错误: {e}")
            except Exception as e:
                logger.error(f"合并视频失败: {e}")
    url = "https://"
    video_downloader = VideoDownloader(url)
    # 获取页面内容并提取视频UUID
    html = video_downloader.fetch_page()
    if html:
        video_num = video_downloader.extract_video_uuid(html)
        if video_num:
            logger.info(f"视频链接编号: {video_num}")
            # 获取视频的切分数
            numbers = video_downloader.get_video_numbers(video_num)
            if numbers:
                logger.info(f"视频切分数: {numbers}")
                # 批量下载视频切片
                video_downloader.download_video(video_num, 1, numbers)
    [color=]            
    [color=]#
    合并所有视频切片
    [color=]            
    video_downloader
    [color=].merge_videos(
    numbers)

    视频, 切片

  • CROW_df   

    修一下代码框吧,建议用 Markdown 格式贴代码。
    "
    您需要登录后才可以回帖 登录 | 立即注册

    返回顶部