1.png (140.25 KB, 下载次数: 0)
下载附件
2025-2-2 17:38 上传
2.png (62.42 KB, 下载次数: 0)
下载附件
2025-2-2 17:39 上传
[Asm] 纯文本查看 复制代码import re
import requests
import cloudscraper
import concurrent.futures
from lxml import etree
import os
import logging
# 设置日志配置
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
class VideoDownloader:
def __init__(self, url):
self.url = url
# 在这里补充你的 headers
self.headers = {
"Referer": "https://",
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/131.0.0.0 Safari/537.36',
'Cookie': "省略"
}
self.scraper = cloudscraper.create_scraper()
def fetch_page(self):
"""获取网页内容并解析"""
try:
response = self.scraper.get(url=self.url, headers=self.headers, timeout=2)
html = etree.HTML(response.text)
return html
except Exception as e:
logger.error(f"获取网页内容失败: {e}")
return None
def extract_video_uuid(self, html):
"""从页面中提取视频的UUID"""
try:
text = html.xpath("""/html/body/script[5]/text()""")[0]
uuid_pattern = r'\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\b'
video_num = re.search(uuid_pattern, text).group(0)
return video_num
except Exception as e:
logger.error(f"提取视频UUID失败: {e}")
return None
def get_video_numbers(self, video_num, quality="1080p"):
"""获取视频切分数"""
url_videolist = f"https://surrit.com/{video_num}/{quality}/"
return self._get_video_part_number(url_videolist)
def _get_video_part_number(self, url):
"""从视频链接中提取视频切分数"""
try:
response = self.scraper.get(url=url, headers=self.headers, timeout=2)
temp = response.text
pattern = r'video(\d+)\.jpeg'
numbers = re.findall(pattern, temp)[-1]
return int(numbers)
except Exception as e:
logger.error(f"获取视频切分数失败: {e}")
return None
def download_video_part(self, video_num, quality, part_number):
"""下载单个视频切片"""
download_url = f"https://surrit.com/{video_num}/{quality}/video{part_number}"
try:
response = requests.get(url=download_url, headers=self.headers)
with open(f"{part_number}.mp4", "wb") as f:
f.write(response.content)
logger.info(f"下载视频切片 {part_number} 完成")
except Exception as e:
logger.error(f"下载视频切片 {part_number} 失败: {e}")
def download_video(self, video_num, start, end, quality="1080p"):
"""使用多线程批量下载视频切片"""
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = []
for i in range(start, end + 1):
futures.append(executor.submit(self.download_video_part, video_num, quality, i))
# 等待所有任务完成
for future in concurrent.futures.as_completed(futures):
future.result()
def merge_videos(self, part_count):
"""合并所有下载的视频切片"""
try:
with open("file_list.txt", "w") as f:
for i in range(1, part_count + 1):
f.write(f"file '{i}.mp4'\n")
os.system("ffmpeg -f concat -safe 0 -i file_list.txt -c copy output.mp4")
logger.info("视频合并完成")
# 删除临时文件
os.remove("file_list.txt")
# 删除下载的视频切片
for i in range(1, part_count + 1):
try:
os.remove(f"{i}.mp4")
logger.info(f"删除视频切片: {i}.mp4")
except Exception as e:
logger.error(f"删除视频切片 {i}.mp4 时发生错误: {e}")
except Exception as e:
logger.error(f"合并视频失败: {e}")
url = "https://"
video_downloader = VideoDownloader(url)
# 获取页面内容并提取视频UUID
html = video_downloader.fetch_page()
if html:
video_num = video_downloader.extract_video_uuid(html)
if video_num:
logger.info(f"视频链接编号: {video_num}")
# 获取视频的切分数
numbers = video_downloader.get_video_numbers(video_num)
if numbers:
logger.info(f"视频切分数: {numbers}")
# 批量下载视频切片
video_downloader.download_video(video_num, 1, numbers)
[color=]
[color=]#
合并所有视频切片
[color=]
video_downloader
[color=].merge_videos(
numbers)