# Thanks to the original author. I am still learning, so suggestions for
# improvement are very welcome.
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import asyncio
import aiohttp
import aiofiles
import hashlib
import shutil
import os
import re
import requests
from tqdm import tqdm
import datetime
# Browser-like User-Agent sent with the requests that pass ``headers=headers``.
headers = {
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36',
}
def download_m3u8(urls, film_name, path):
    """Download the playlist of every episode into its own directory.

    For each entry a directory ``<path>/<film_name>/<episode name>`` is
    created and the playlist text is stored there as ``first.m3u8``.  When
    the fetched playlist does not contain ``#EXT-X-ENDLIST`` the first line
    mentioning ``m3u8`` is followed and that nested playlist is stored
    instead.

    Args:
        urls: Iterable of mappings with a ``"name"`` (episode title) and a
            ``"url"`` (playlist address) key.
        film_name: Film title; used as the parent directory name.
        path: Base directory under which the film directory is created.

    Returns:
        A ``(film_lists, film_name)`` tuple; ``film_lists`` holds the cleaned
        episode names in processing order.

    Raises:
        ValueError: If a playlist without ``#EXT-X-ENDLIST`` has no line
            mentioning ``m3u8``.
    """
    from urllib.parse import urljoin

    print("请等待 m3u8 文件下载完成!")
    film_lists = []
    film_dir = os.path.join(path, film_name)
    os.makedirs(film_dir, exist_ok=True)
    for url in tqdm(urls):
        episode_name = url["name"].strip().replace('\t', '')
        film_lists.append(episode_name)
        episode_dir = os.path.join(film_dir, episode_name)
        os.makedirs(episode_dir, exist_ok=True)

        response = requests.get(url['url'], headers=headers)
        if '#EXT-X-ENDLIST' in response.text:
            # Already the final playlist: reuse it instead of fetching the
            # same URL a second time.
            m3u8_content = response.text
        else:
            nested = re.search(r'.*m3u8.*', response.text)
            if nested is None:
                raise ValueError(
                    f"no nested m3u8 entry found in playlist {url['url']!r}"
                )
            # The entry may be relative to the outer playlist and may carry
            # a trailing '\r' from CRLF line endings; urljoin leaves
            # absolute URLs untouched.
            m3u8_url = urljoin(url['url'], nested.group(0).strip())
            m3u8_content = requests.get(m3u8_url, headers=headers).text

        # Written as UTF-8 because every reader of first.m3u8 in this file
        # opens it with encoding='utf-8'.
        with open(os.path.join(episode_dir, 'first.m3u8'), 'w', encoding='utf-8') as f:
            f.write(m3u8_content)
    return film_lists, film_name
def get_key(key_url):
    """Fetch *key_url* with a GET request and return the raw response bytes."""
    key_response = requests.get(key_url)
    return key_response.content
async def download_one(path, url, sem, key, episode):
    """Download one media segment, optionally decrypt it, and save it.

    The segment is stored in *path* under the last ``/``-separated component
    of *url*.  Up to 10 attempts are made; any failure (network error, HTTP
    error status, decryption error, write error) triggers a retry.

    Args:
        path: Directory the segment file is written to.
        url: Address of the segment.
        sem: Semaphore limiting how many downloads run at the same time.
        key: AES key bytes, or a falsy value when the segment is not
            encrypted.
        episode: Episode name, used only in progress messages.
    """
    file_name = url.split('/')[-1]
    async with sem:
        # One session is shared by all attempts for this segment.
        async with aiohttp.ClientSession() as session:
            for _ in range(10):
                try:
                    async with session.get(url) as response:
                        # Without this an HTTP error page would be saved as
                        # if it were segment data.
                        response.raise_for_status()
                        content = await response.read()
                    if key:
                        # NOTE(review): the IV is sixteen ASCII '0'
                        # characters, not the playlist's IV attribute —
                        # confirm this matches the source streams.
                        cipher = Cipher(algorithms.AES(key), modes.CBC(b"0000000000000000"),
                                        backend=default_backend())
                        decryptor = cipher.decryptor()
                        content = decryptor.update(content) + decryptor.finalize()
                    async with aiofiles.open(os.path.join(path, file_name), mode='wb') as f:
                        await f.write(content)
                    print(f"{episode} - {file_name} 完成片段!")
                    break
                except Exception as e:
                    # Deliberately broad: every kind of failure is retried.
                    print(f"{episode} - {file_name} 超时重试!", e)
            else:
                # All attempts failed; report it instead of giving up
                # silently.
                print(f"{episode} - {file_name} 下载失败,已放弃!")
def merge(path, episode):
    """Concatenate an episode's downloaded segments into ``<episode>.mp4``.

    The segment order is taken from ``<path>/<episode>/first.m3u8``: every
    non-blank line that does not start with ``#`` names one segment, and the
    last ``/``-separated component is its file name inside the episode
    directory.  The segments are joined byte-for-byte into
    ``<path>/<episode>.mp4`` and then deleted.

    Args:
        path: Film directory that contains the episode directory and
            receives the merged file.
        episode: Episode name (directory name and output file stem).

    Raises:
        FileNotFoundError: If a segment listed in the playlist is missing.
            Nothing is written or deleted in that case.
    """
    print("开始合并")
    episode_dir = os.path.join(path, episode)
    with open(os.path.join(episode_dir, "first.m3u8"), "r", encoding='utf-8') as fa:
        ts_files = [
            line.strip().split('/')[-1]
            for line in fa
            if line.strip() and not line.startswith("#")
        ]

    ts_paths = [os.path.join(episode_dir, name) for name in ts_files]
    missing = [ts_path for ts_path in ts_paths if not os.path.isfile(ts_path)]
    if missing:
        raise FileNotFoundError(
            f"{episode}: 缺少 {len(missing)} 个片段文件,例如 {missing[0]}"
        )

    # Plain binary concatenation (what ``copy /b a+b`` does), done in Python
    # so it needs no shell, works with names containing spaces, and leaves
    # the process working directory alone.
    partial_path = os.path.join(episode_dir, f"{episode}.mp4.part")
    with open(partial_path, "wb") as merged:
        for ts_path in ts_paths:
            with open(ts_path, "rb") as segment:
                shutil.copyfileobj(segment, merged)
    # Publish the result only once it is complete.
    os.replace(partial_path, os.path.join(path, f"{episode}.mp4"))

    # dict.fromkeys de-duplicates while keeping order, so a segment listed
    # twice in the playlist is removed only once.
    for ts_path in dict.fromkeys(ts_paths):
        os.remove(ts_path)
    print("合并完成")
def videos_del(ts_files):
    """Delete every file named in *ts_files*, in order."""
    for doomed in ts_files:
        os.unlink(doomed)
async def download_all_videos(film_list, film_name, film_path, m3u8_urls):
    """Download and merge every episode, one episode at a time.

    For each ``(episode, m3u8_url)`` pair the stored
    ``<film_path>/<film_name>/<episode>/first.m3u8`` is read, every
    non-blank, non-``#`` line is downloaded as a segment (at most 100 at a
    time), and the segments are then merged in a worker thread.

    Args:
        film_list: Episode names, parallel to *m3u8_urls*.
        film_name: Film title (directory name below *film_path*).
        film_path: Base download directory.
        m3u8_urls: Mappings with a ``"url"`` key holding each episode's
            playlist address.
    """
    sem = asyncio.Semaphore(100)
    film_path = os.path.join(film_path, film_name)
    loop = asyncio.get_running_loop()
    for episode, m3u8_url in zip(film_list, m3u8_urls):
        key_value = ''
        resp = requests.get(m3u8_url['url'])
        if 'AES-128' in resp.text:
            # NOTE(review): assumes the key sits next to the playlist as
            # 'enc.key' — confirm against the source site.
            key_url = m3u8_url['url'].replace('index.m3u8', 'enc.key')
            key_value = get_key(key_url)

        episode_dir = os.path.join(film_path, episode)
        with open(os.path.join(episode_dir, 'first.m3u8'), 'r', encoding='utf-8') as f:
            segment_urls = [
                line.strip()
                for line in f
                if line.strip() and not line.startswith('#')
            ]
        if not segment_urls:
            # asyncio.wait() rejects an empty collection and there is
            # nothing to merge.
            print(f"{episode} 未找到可下载的片段,已跳过")
            continue

        # A fresh task list per episode, so finished tasks from earlier
        # episodes are not waited on again.
        tasks = [
            asyncio.create_task(
                download_one(episode_dir, segment_url, sem, key_value, episode))
            for segment_url in segment_urls
        ]
        await asyncio.wait(tasks)
        await loop.run_in_executor(None, merge, film_path, episode)
        print(f"{episode} 完成")
    print("全部下载完成!")
def get_md5_day():
    """Derive two tokens from the current local day of the month.

    Returns:
        A ``(digest, day_token)`` tuple.  ``digest`` is the MD5 hex digest of
        the first ten hex characters of ``md5(str((day + 18) ^ 10))``;
        ``day_token`` is ``str(day + 11372)``.
    """
    day = datetime.datetime.now().day
    # '+' binds tighter than '^', so this equals ``day + 9 + 9 ^ 10``.
    seed = str((day + 18) ^ 10)
    short_digest = hashlib.md5(seed.encode()).hexdigest()[:10]
    return hashlib.md5(short_digest.encode()).hexdigest(), str(day + 11372)
def get_m3u8_url(film):
    """Query the search API for *film* and return the ``data`` entry.

    Args:
        film: Search text, sent as the ``jx`` query parameter.

    Returns:
        The value stored under ``"data"`` in the JSON response, or ``None``
        when the status code is not 200, the body is not valid JSON, or the
        response has no ``"data"`` entry.
    """
    signature, day_token = get_md5_day()
    response = requests.get(
        'https://m1-a1.cloud.nnpp.vip:2223/api/v/',
        params={'z': signature, 'jx': film, 's1ig': day_token, 'g': ''},
        headers=headers,
    )
    if response.status_code != 200:
        print(f"请求失败,状态码:{response.status_code}")
        return None

    try:
        payload = response.json()
    except requests.exceptions.JSONDecodeError:
        print("响应内容不是有效的 JSON 格式")
        return None

    if 'data' in payload:
        return payload['data']
    print("未搜索到相关片源")
    return None
def _select_episodes(m3u8_urls, target_episode):
    """Return the entries of *m3u8_urls* picked by a 1-based selection string.

    Accepted forms: ``""`` (everything), ``"a-b"`` (inclusive range),
    ``"a,b,c"`` (list) and ``"a"`` (single episode).  Ranges and single
    numbers are clamped to the available episodes; out-of-range list items
    are dropped.
    """
    total = len(m3u8_urls)
    target_episode = target_episode.strip()
    if not target_episode:
        return m3u8_urls
    if '-' in target_episode:
        start_idx, end_idx = (int(part) - 1 for part in target_episode.split('-'))
        start_idx = max(start_idx, 0)
        end_idx = min(end_idx, total - 1)
        return m3u8_urls[start_idx:end_idx + 1]
    if ',' in target_episode:
        indexes = [int(part) - 1 for part in target_episode.split(',')]
        # Skip numbers outside 1..total; a negative index would otherwise
        # silently wrap around to the end of the list.
        return [m3u8_urls[i] for i in indexes if 0 <= i < total]
    if not total:
        return []
    start_idx = min(max(int(target_episode) - 1, 0), total - 1)
    return [m3u8_urls[start_idx]]


def download_via_ui(film, selected_idx, episodes, path):
    """Search for *film*, pick one result, and download the chosen episodes.

    Args:
        film: Search text passed to ``get_m3u8_url``.
        selected_idx: Index of the search result to download.
        episodes: Sequence whose second item, when present, is the episode
            selection string (``""``, ``"a"``, ``"a-b"`` or ``"a,b,c"``,
            1-based).  With fewer than two items every episode is
            downloaded.
        path: Base download directory.

    Raises:
        ValueError: If the selection string contains a non-numeric part.
    """
    film_data = get_m3u8_url(film)
    if not film_data:
        return
    selected_film = film_data[selected_idx]
    film_name = selected_film['name']
    m3u8_urls = selected_film['source']['eps']
    # NOTE(review): the UI builds ``episodes`` with ``.split('-')``, so the
    # range form can never arrive here intact and a lone number selects
    # everything — confirm the intended contract with the caller.
    target_episode = episodes[1] if len(episodes) > 1 else ''
    m3u8_list = _select_episodes(m3u8_urls, target_episode)
    if not m3u8_list:
        print("未找到可下载的剧集")
        return
    film_lists, _ = download_m3u8(m3u8_list, film_name, path)
    asyncio.run(download_all_videos(film_lists, film_name, path, m3u8_list))
def main_ui():
def select_directory():
path = filedialog.askdirectory()
if path:
entry_path.delete(0, tk.END)
entry_path.insert(0, path)
def search_film():
film = entry_film.get().strip()
if not film:
messagebox.showwarning("警告", "请输入影片名")
return
film_data = get_m3u8_url(film)
if not film_data:
messagebox.showwarning("警告", "未搜索到相关片源")
return
combo_film['values'] = [f"{item['name']} ({len(item['source']['eps'])}集)" for item in film_data]
combo_film.current(0)
def start_download():
film = entry_film.get().strip()
episodes = entry_episodes.get().strip().split('-')
path = entry_path.get().strip() or os.getcwd()
selected_idx = combo_film.current()
if not film or not episodes or not path or selected_idx