第三方全网视频电视剧全网电影解析多线程下载

查看 106|回复 9
作者:我很忙!   
[Python] 纯文本查看 复制代码import pprint
import os
import time
import requests
import re
from tqdm import tqdm
import asyncio
import aiohttp
import aiofiles
import datetime
import hashlib
import shutil
from pathlib import Path
from Crypto.Cipher import AES
headers = {
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36',
}
def download_m3u8(urls, film_name, path):
    '''
    :param urls: m3u8所有主链接
    :param film_name: 片名
    :param path: 保存路径
    :return: 对应集数名列表。片名
    '''
    print("请等待m3u8文件下载完成!")
    film_lists = []
    for url in tqdm(urls):
        # 片源集数 删全部空
        film_del_blank = url["name"].replace('\t', '').strip()
        # 记录已下载的电影总集数
        film_lists.append(film_del_blank)
        # 访问m3u8连接
        response = requests.get(url['url'], headers=headers)
        # 判断电影名目录
        if not os.path.exists(fr'{path}/{film_name}'):
            # 创建文件夹 E:\电视剧\白色城堡
            os.makedirs(fr'{path}/{film_name}')
        # 判断电影集数文件是否存在
        if not os.path.exists(fr'{path}/{film_name}/{film_del_blank}'):
            # 如: E:\电视剧\白色城堡\第6集
            os.makedirs(fr'{path}/{film_name}/{film_del_blank}')
        # 是否为正常m3u8
        if '#EXT-X-ENDLIST' in response.text:
            m3u8_url = url['url']
        else:
            # 正则匹配
            pattern = re.compile(r'.*m3u8.*')
            # 匹配数据来源
            matches = pattern.findall(response.text)
            # 切割删除最后/的内容 并拼接上正则取出的新的连接
            m3u8_url = url['url'].replace('index.m3u8', matches[0])
        # 下载m3u8文件
        resp = requests.get(m3u8_url)
        with open(fr'{path}/{film_name}/{film_del_blank}/first.m3u8', 'w') as f:
            f.write(resp.text)
    return film_lists, film_name
def get_key(key_url):
    resp = requests.get(key_url)
    return resp.content # 直接拿字节,为了解密的时候,直接丢进去就可以了
async def download_one(path, url, sem, key, episode):
    '''
    功能:协程,每请求一次,下载一个片段,并直接解密
    :param path: 路径
    :param url: 链接
    :param sem: 协程并发数
    :param key: 秘钥
    :param episode : 集数
    :return: 返回空
    '''
    # E:\电视剧\孙悟空大战红孩儿/正片/https://c.baisiweiting.com:18443/hls/445/20240628/470661/plist462.ts
    file_name = url.split('/')[-1]
    async with sem:
        # 下载片段,如出错,重试20次
        for i in range(10):
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(url) as response:
                        content = await response.read()
                        if key:
                            aes = AES.new(key=key, mode=AES.MODE_CBC,IV=b"0000000000000000")  # 如果没有指定IV就输入十六个零 MODE ECB或CBC
                            content = aes.decrypt(content)
                        async with aiofiles.open(fr"{path}/{file_name}", mode='wb') as f:
                            await f.write(content)
                print(episode + "-" + file_name, '完成片段!')
                # E:/电视剧/孙悟空大战红孩儿/正片/
                break
            except Exception as e:
                print(episode + "-" + file_name,'超时重试!' ,e)
def merge(path,title, episode):
    '''
    功能:合并并且删除所有片段
    :param path: 例如:E:/电视剧/孙悟空大战红孩儿
    :param title: 片名:例如:孙悟空
    :return: 无返回值
    '''
    #path = E:/电视剧/孙悟空大战红孩儿/正片
    print("开始合并")
    file_list =[]
    #    merge('E:/电视剧/灵魂摆渡','灵魂摆渡','第1集')
    with open(f"{path}/{episode}/first.m3u8", "r", encoding='utf-8') as fa:
        for line in fa:
            # 判断数据前缀是否为#开头
            if line.startswith("#"):
                continue
            #去换行
            line = line.strip()
            # 将 m3u8地址添加到列表
            line = line.split('/')[-1]
            file_list.append(line)
    # 更换合并目录
    os.chdir(path+"/"+episode)
    n = 1
    temp =[]
    for i in range(len(file_list)):
        # 每20个合并一次
        file_name = file_list
        temp.append(file_name)
        if i != 0 and i % 100 == 0:
            # 可以合并一次
            cmd = f"copy /b {'+'.join(temp)} {n}.ts"
            r = os.popen(cmd)
            r.read()
            r.close()
            videos_del(temp)  # 删除已经合并的视频1
            temp = [] # 新列表
            n += 1
    # 需要把剩余的ts进行合并
    cmd = f"copy /b {'+'.join(temp)} {n}.ts"
    r = os.popen(cmd)
    r.read()
    r.close()
    n += 1
    # 第二次大合并
    videos_del(temp) # 删除已经合并的视频2
    last_temp = []
    for i in range(1, n):
        last_temp.append(f"{i}.ts")
    # 最后一次合并
    cmd = f"copy /b {'+'.join(last_temp)} {episode}.mp4"
    r = os.popen(cmd)
    r.read()
    for i in range(1,n):
        os.remove(f"{i}.ts")
    shutil.move(f"{episode}.mp4", path)
    time.sleep(1)
    r.close()
    # try:
    #     os.chdir('E:/')
    #     os.remove(f"{path}/{episode}/first.m3u8")
    #     # os.rmdir(path+"/"+episode)
    #     # shutil.rmtree(path+"/"+episode)
    #     print("合并完成")
    # except Exception as e:
    #     print('删除文件时异常,可手动删除!',e)
#删除已合并的视频
def videos_del(list):
    '''
    :param list: 需要删除的文件和路径
    :return: 无返回值
    '''
    for a in list:
        os.remove(a)
async def download_all_videos(film_list,film_name, film_path,m3u8_urls):
    '''
    功能:下载所有集数
    :param film_list: 集数名列表
    :param film_name: 片名:如 孙悟空
    :param film_path: 路径:如"E:/电视剧"
    :param m3u8_urls m3u8所有链接
    :return: 协程无返回值
    ''' # 参数/功能说明
    tasks = []
    # 协程数
    sem = asyncio.Semaphore(100)
    # 路径拼接  "E:/电视剧" 红孩儿
    film_path = fr"{film_path}/{film_name}"
    # film_list 集数列表 m3u8_urls链接列表 遍历
    for episode,m3u8_url in zip(film_list,m3u8_urls):
        key_value = ''
        resp = requests.get(m3u8_url['url'])
        # 判断m3u8文件中,是否为加密视频
        key_exists = True if 'AES-128' in resp.text else False
        # 读取秘钥
        if key_exists:
            # 与原m3u8链接替换enc
            key_url = m3u8_url['url'].replace('index.m3u8', 'enc.key')
            # 调用 请求key函数,返回key值
            key_value = get_key(key_url)
        with open(fr'{film_path}/{episode}/first.m3u8', 'r', encoding='utf-8') as f:
            for line in f:
                if line.startswith('#'):
                    continue
                line = line.strip()
                # 每次循环下载一条ts
                t = asyncio.create_task(download_one(fr'{film_path}/{episode}/', line, sem, key_value, episode))
                tasks.append(t)
            # 等待协程执行完
            await asyncio.wait(tasks)
            # 每次下完一集,直接合并和删除
            loop = asyncio.get_event_loop()
            await loop.run_in_executor(None, merge, film_path , film_name, episode)
            print(episode,"完成")
    print("全部下载完成!")
def get_md5_day():
    ''':return: 计算提交参数,并返回md5和s1ig值'''
    now = datetime.datetime.now() # 创建时间对象
    day_of_month = now.day # 取当前天数
    day_of_num = str(day_of_month + 9 + 9 ^ 10) # 天数与其他参数运算 转str
    md5_hash = hashlib.md5(day_of_num.encode()).hexdigest() # 结果转md5
    md5_hash = md5_hash[0:10] # 切片md5 前10
    md5_hash = hashlib.md5(md5_hash.encode()).hexdigest() # 前十 md5在进行md5加密
    return md5_hash, str(day_of_month + 11372)
def get_m3u8_url(film):
    '''
    :param film: 片名
    :return: 返回选中的片名,和所有片名对应的m3u8主链接
    '''
    md5_hash, one_day = get_md5_day()
    params = {
        'z': md5_hash,
        'jx': film,
        's1ig': one_day,
        'g': '',
    }
    response = requests.get('https://m1-a1.cloud.nnpp.vip:2223/api/v/', params=params, headers=headers)
    if response.status_code != 200:
        print(f"请求失败,可能出现验证码或其他问题,状态码:{response.status_code}")
        return None, None
    else:
        try:
            response_data = response.json()
        except requests.exceptions.JSONDecodeError:
            print("响应内容不是有效的 JSON 格式")
            return None, None
    index = 0
    if 'data' not in response_data:
        print('未搜索到相关片源,可能网站出现了验证码,去看看!')
        return None, None
    for i in response_data['data']:
        # 取片集数
        number = '\t'*1 + str(len(i['source']['eps'])).zfill(2) + '集' + "\t"*1
        index += 1
        print(f"【{index}】" + i['name'] + number + i['year'])
    if index == 1:
        inpurt_data = input("请输入1进行下载:")
    else:
        inpurt_data = input(f"请输入1 ~ {index} 进行下载:")
    try:
        inpurt_data = int(inpurt_data)
    except ValueError:
        print('非纯数字!')
        return None, None
    if not inpurt_data:
        return None, None
    if 0  index:
        main()
    else:
        target_episode = input("指定集数(如:3 = 从第3集往后全下,5+5 = 只下第五集, 5+6 = 下5,6集)留空全下:")
        eps_list = response_data['data'][inpurt_data - 1]['source']
        if target_episode == '':
            target_episode = "1"
        # 如存在组合0+0
        if target_episode.find('+') != -1:
            start_index = int(target_episode.split('+')[0]) - 1 # 取+号左边
            end_index = int(target_episode.split('+')[1]) - 1# 取+号左边
        else:
            start_index = int(target_episode) - 1
            end_index = len(eps_list['eps'])-1
        m3u8_list = []
        for index in range(start_index, end_index + 1):
            m3u8_list.append(eps_list['eps'][index])
        return response_data['data'][inpurt_data - 1]['name'], m3u8_list
def delete_subfolders_except_current(path):
    '''
    删除指定文件夹下的所有文件夹
    :param path: 路径
    :return: 无返回值
    '''
    current_dir = os.path.abspath(os.curdir)
    os.chdir(path)
    try:
        for item in os.listdir():
            full_path = os.path.join(os.getcwd(), item)
            if os.path.isdir(full_path) and item!= current_dir:
                try:
                    shutil.rmtree(full_path)
                    print(f"成功删除目录:{full_path}")
                except PermissionError as pe:
                    print(f"没有权限删除目录:{full_path}。错误信息:{pe}")
                except OSError as ose:
                    print(f"删除目录 {full_path} 时出现其他错误。错误信息:{ose}")
    except Exception as e:
        print(f"在遍历目录时出现错误:{e}")
def main():
    path = "E:/电视剧"  # 默认路径
    i = 0
    while True:
        if i >= 1:
            delete_subfolders_except_current(path + "/" + film_name)
        inpurt_film_name = input("-"*40+ "\n请输入需要下载的电影/电视剧名称:")
        if not inpurt_film_name:
            print('未输入片名')
            continue
        # 提取m3u8 返回电影名称,和m3u8主连接
        film_name, m3u8_urls = get_m3u8_url(inpurt_film_name)
        if film_name == None:
            continue
        # 下载m3u8
        film_list, film_names= download_m3u8(m3u8_urls, film_name, path)
        # 启动协程
        event_loop = asyncio.get_event_loop()
        event_loop.run_until_complete(download_all_videos(film_list, film_name, path, m3u8_urls))
        i += 1
if __name__ == '__main__':
    main()部分未搜索到的电影,或者偶尔m3u8下载空白时会报错,线程也会偶尔崩溃,多试几次应该没问题, 很多待优化,刚看视频学了几天python,写的有点乱,也欢迎各位大牛优化一下发我一份,界面没弄,还没学会, 目标网: ZGFnYS5jYw==     用BASE64解码自己去看把

集数, 电视剧

Do_zh   

这格式看着费劲。
HHJ200318   

看这很厉害,不知道我能不能学会
apples1949   

[Python] 纯文本查看 复制代码代码格式更好看
laolaide   

感谢分享
我很忙!
OP
  


apples1949 发表于 2024-8-31 09:11

刚才不会弄
我很忙!
OP
  


Do_zh 发表于 2024-8-31 09:03
这格式看着费劲。

改了改了,刚不会,
toloy   

我用油猴,方便
bacchus00   

好东西,谢谢分享啊,试试看
lingling134   

学了几天就会,已经很厉害了,看不懂的还在这
您需要登录后才可以回帖 登录 | 立即注册

返回顶部