论坛二创第三方全网视频电视剧全网电影解析多线程下载

查看 69|回复 7
作者:蜗牛很牛   
第三方全网视频电视剧全网电影解析多线程下载
感谢原创。本人学艺不精,还请大家有好的意见一起交流。
[Python] 纯文本查看 复制代码
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import asyncio
import aiohttp
import aiofiles
import hashlib
import shutil
import os
import re
import requests
from tqdm import tqdm
import datetime
# Default HTTP request headers: identify as a desktop Chrome browser.
headers = {
    'user-agent': (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/124.0.0.0 Safari/537.36'
    ),
}
def download_m3u8(urls, film_name, path):
    """Fetch the playlist of every episode and store it on disk.

    For each entry a directory ``<path>/<film_name>/<episode name>`` is
    created and the playlist text is saved there as ``first.m3u8``.

    Args:
        urls: Iterable of mappings that provide a ``"name"`` and a ``"url"`` key.
        film_name: Name of the film; used as the directory name under ``path``.
        path: Base directory in which the film directory is created.

    Returns:
        A tuple ``(film_lists, film_name)`` where ``film_lists`` holds the
        cleaned episode names in the order of ``urls``.

    Raises:
        ValueError: If a response is not a finished playlist and contains no
            line mentioning ``m3u8`` to follow.
    """
    from urllib.parse import urljoin

    print("请等待 m3u8 文件下载完成!")
    film_lists = []
    film_dir = os.path.join(path, film_name)
    os.makedirs(film_dir, exist_ok=True)
    for url in tqdm(urls):
        episode_name = url["name"].strip().replace('\t', '')
        film_lists.append(episode_name)
        episode_dir = os.path.join(film_dir, episode_name)
        os.makedirs(episode_dir, exist_ok=True)
        response = requests.get(url['url'], headers=headers)
        if '#EXT-X-ENDLIST' in response.text:
            # Already the final playlist: reuse the body instead of fetching it twice.
            m3u8_content = response.text
        else:
            nested = re.search(r'.*m3u8.*', response.text)
            if nested is None:
                raise ValueError(f"no m3u8 reference found in playlist {url['url']}")
            # strip() drops a trailing '\r'; urljoin resolves relative references
            # and leaves absolute URLs untouched.
            m3u8_url = urljoin(url['url'], nested.group(0).strip())
            m3u8_content = requests.get(m3u8_url, headers=headers).text
        # Written as UTF-8 because the playlist is read back with encoding='utf-8'.
        with open(os.path.join(episode_dir, 'first.m3u8'), 'w', encoding='utf-8') as f:
            f.write(m3u8_content)
    return film_lists, film_name
def get_key(key_url):
    """Download ``key_url`` and return the raw response body as bytes."""
    key_response = requests.get(key_url)
    return key_response.content
async def download_one(path, url, sem, key, episode):
    """Download one media segment, optionally decrypt it, and save it.

    The segment is stored in ``path`` under the last ``/``-separated part of
    ``url``. Up to ten attempts are made; any exception triggers a retry.

    Args:
        path: Directory the segment file is written to.
        url: URL of the segment.
        sem: Semaphore limiting how many downloads run at the same time.
        key: AES key bytes; a falsy value means the segment is saved as-is.
        episode: Episode name, used only in progress messages.
    """
    file_name = url.split('/')[-1]
    async with sem:
        for _ in range(10):
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(url) as response:
                        # Treat HTTP errors as failures so an error page is
                        # never saved as segment data.
                        response.raise_for_status()
                        content = await response.read()
                if key:
                    # NOTE(review): the IV is sixteen ASCII '0' characters, not
                    # zero bytes or a value from the playlist — confirm this
                    # matches how the source encrypts its segments.
                    cipher = Cipher(algorithms.AES(key), modes.CBC(b"0000000000000000"),
                                    backend=default_backend())
                    decryptor = cipher.decryptor()
                    content = decryptor.update(content) + decryptor.finalize()
                async with aiofiles.open(os.path.join(path, file_name), mode='wb') as f:
                    await f.write(content)
            except Exception as e:
                # Deliberately broad: every failure is simply retried.
                print(f"{episode} - {file_name} 超时重试!", e)
            else:
                print(f"{episode} - {file_name} 完成片段!")
                break
        else:
            # All attempts failed; report it instead of finishing silently.
            print(f"{episode} - {file_name} 下载失败!")
def merge(path, episode):
    """Concatenate the downloaded segments of one episode into a single file.

    Segment names are taken, in order, from ``<path>/<episode>/first.m3u8``
    (last ``/``-separated part of every non-comment line). Their bytes are
    appended to ``<path>/<episode>.mp4`` and the merged segment files are
    deleted afterwards. The concatenation is done in Python, so no shell
    command is built from file names and the working directory is untouched.

    Args:
        path: Directory containing the episode directory; receives the output.
        episode: Episode directory name and base name of the output file.
    """
    print("开始合并")
    episode_dir = os.path.join(path, episode)
    with open(os.path.join(episode_dir, "first.m3u8"), "r", encoding='utf-8') as fa:
        ts_files = [
            line.strip().split('/')[-1]
            for line in fa
            # Blank lines would otherwise turn into empty file names.
            if line.strip() and not line.startswith("#")
        ]
    merged_paths = []
    with open(os.path.join(path, f"{episode}.mp4"), "wb") as merged_file:
        for ts_file in ts_files:
            ts_path = os.path.join(episode_dir, ts_file)
            try:
                with open(ts_path, "rb") as segment:
                    shutil.copyfileobj(segment, merged_file)
            except FileNotFoundError:
                # A segment whose download failed must not abort the whole merge.
                print(f"{episode} - {ts_file} 缺失,已跳过")
                continue
            merged_paths.append(ts_path)
    # dict.fromkeys de-duplicates: a segment listed twice is removed only once.
    for ts_path in dict.fromkeys(merged_paths):
        os.remove(ts_path)
    print("合并完成")
def videos_del(ts_files):
    """Delete every file named in ``ts_files``, in order."""
    for segment_path in ts_files:
        os.unlink(segment_path)
async def download_all_videos(film_list, film_name, film_path, m3u8_urls):
    """Download and merge every episode, one episode after another.

    For each episode the segment URLs are read from the stored
    ``first.m3u8``, downloaded concurrently through ``download_one`` and then
    merged by ``merge`` in a worker thread.

    Args:
        film_list: Episode names, parallel to ``m3u8_urls``.
        film_name: Film directory name below ``film_path``.
        film_path: Base download directory.
        m3u8_urls: Mappings with a ``"url"`` key, one per episode.
    """
    sem = asyncio.Semaphore(100)
    film_path = os.path.join(film_path, film_name)
    loop = asyncio.get_running_loop()
    for episode, m3u8_url in zip(film_list, m3u8_urls):
        episode_dir = os.path.join(film_path, episode)
        key_value = ''
        resp = requests.get(m3u8_url['url'])
        if 'AES-128' in resp.text:
            # NOTE(review): the key location is guessed by swapping the file
            # name; presumably the playlist's key URI would be more reliable.
            key_url = m3u8_url['url'].replace('index.m3u8', 'enc.key')
            key_value = get_key(key_url)
        with open(os.path.join(episode_dir, 'first.m3u8'), 'r', encoding='utf-8') as f:
            stripped_lines = (line.strip() for line in f)
            # Skip comments/tags and blank lines; everything else is a segment URL.
            segment_urls = [line for line in stripped_lines
                            if line and not line.startswith('#')]
        # A fresh task list per episode, so finished tasks are not waited on again.
        tasks = [
            asyncio.create_task(download_one(episode_dir, segment_url, sem, key_value, episode))
            for segment_url in segment_urls
        ]
        if not tasks:
            # asyncio.wait() rejects an empty collection; nothing to merge either.
            print(f"{episode} 没有可下载的片段,已跳过")
            continue
        await asyncio.wait(tasks)
        await loop.run_in_executor(None, merge, film_path, episode)
        print(f"{episode} 完成")
    print("全部下载完成!")
def get_md5_day():
    """Build the two day-dependent request tokens.

    Returns:
        A tuple ``(digest, day_token)``: ``digest`` is the MD5 hex digest of
        the first ten hex characters of the MD5 of the day-derived seed, and
        ``day_token`` is the current day of month plus 11372, as a string.
    """
    today = datetime.datetime.now().day
    # '+' binds tighter than '^': the seed is (day + 18) XOR 10.
    seed = str((today + 9 + 9) ^ 10)
    first_pass = hashlib.md5(seed.encode()).hexdigest()
    second_pass = hashlib.md5(first_pass[:10].encode()).hexdigest()
    return second_pass, str(today + 11372)
def get_m3u8_url(film):
    """Query the search API for ``film`` and return its ``data`` payload.

    Args:
        film: Search keyword, sent as the ``jx`` query parameter.

    Returns:
        The value stored under ``"data"`` in the JSON response, or ``None``
        when the status code is not 200, the body is not valid JSON, or the
        response has no ``"data"`` entry.
    """
    md5_hash, one_day = get_md5_day()
    params = {'z': md5_hash, 'jx': film, 's1ig': one_day, 'g': ''}
    response = requests.get('https://m1-a1.cloud.nnpp.vip:2223/api/v/', params=params, headers=headers)
    if response.status_code != 200:
        print(f"请求失败,状态码:{response.status_code}")
        return None
    try:
        response_data = response.json()
    except ValueError:
        # Every JSON decode error raised by requests (any version) derives from
        # ValueError; requests.exceptions.JSONDecodeError is absent in older releases.
        print("响应内容不是有效的 JSON 格式")
        return None
    if 'data' not in response_data:
        print("未搜索到相关片源")
        return None
    return response_data['data']
def _select_episodes(m3u8_urls, target_episode):
    """Return the entries of ``m3u8_urls`` chosen by a 1-based episode spec.

    Supported specs: ``"a-b"`` (inclusive range, clamped to the list),
    ``"a,b,c"`` (individual episodes, out-of-range numbers are dropped),
    a single number (falls back to the last episode when out of range) and
    an empty string (all episodes).
    """
    spec = target_episode.strip()
    if spec == '' or not m3u8_urls:
        return m3u8_urls
    last_idx = len(m3u8_urls) - 1
    if '-' in spec:
        start_idx, end_idx = (int(part) - 1 for part in spec.split('-'))
        # Clamp both ends so a negative start cannot wrap around the list.
        start_idx = max(start_idx, 0)
        end_idx = min(end_idx, last_idx)
        return m3u8_urls[start_idx:end_idx + 1]
    if ',' in spec:
        indexes = [int(part) - 1 for part in spec.split(',')]
        return [m3u8_urls[i] for i in indexes if 0 <= i <= last_idx]
    start_idx = int(spec) - 1
    if start_idx < 0 or start_idx > last_idx:
        start_idx = last_idx
    return [m3u8_urls[start_idx]]


def download_via_ui(film, selected_idx, episodes, path):
    """Search for ``film`` and download the requested episodes of one result.

    Args:
        film: Search keyword passed to ``get_m3u8_url``.
        selected_idx: Index of the chosen entry in the search results.
        episodes: Sequence whose second item, if present, is the episode spec
            (range ``"a-b"``, list ``"a,b"``, single number or empty for all).
            NOTE(review): only ``episodes[1]`` is read — confirm against the
            caller how this sequence is built.
        path: Base directory for the download.

    Raises:
        ValueError: If the episode spec contains a non-numeric part.
    """
    film_data = get_m3u8_url(film)
    if not film_data:
        return
    selected_film = film_data[selected_idx]
    film_name = selected_film['name']
    m3u8_urls = selected_film['source']['eps']
    target_episode = episodes[1] if len(episodes) > 1 else ''
    m3u8_list = _select_episodes(m3u8_urls, target_episode)
    if not m3u8_list:
        print("未找到可下载的剧集")
        return
    film_lists, _ = download_m3u8(m3u8_list, film_name, path)
    asyncio.run(download_all_videos(film_lists, film_name, path, m3u8_list))
def main_ui():
    def select_directory():
        path = filedialog.askdirectory()
        if path:
            entry_path.delete(0, tk.END)
            entry_path.insert(0, path)
    def search_film():
        film = entry_film.get().strip()
        if not film:
            messagebox.showwarning("警告", "请输入影片名")
            return
        film_data = get_m3u8_url(film)
        if not film_data:
            messagebox.showwarning("警告", "未搜索到相关片源")
            return
        combo_film['values'] = [f"{item['name']} ({len(item['source']['eps'])}集)" for item in film_data]
        combo_film.current(0)
    def start_download():
        film = entry_film.get().strip()
        episodes = entry_episodes.get().strip().split('-')
        path = entry_path.get().strip() or os.getcwd()
        selected_idx = combo_film.current()
        if not film or not episodes or not path or selected_idx

第三方, 影片

zelen   

谢谢楼主的分享!这个绝对要顶!!!
蛋疼王子   

谢谢分享
chaozhi   

搜索任何关键字都是返回:{'type': 4000}
未搜索到相关片源
gun008   

感谢分享!光是学习研究就很值当了
Haoyua   

感谢分享,这个应该不能下VIP吧。
搜索返回:{'type': 4000}
是哪个参数有问题吗,下载界面很容易卡顿
jtui6999   

收藏了,感谢分享!
xuanqi521   

不太好用啊
您需要登录后才可以回帖 登录 | 立即注册

返回顶部