import base64
import json
import re
import sys
import time

import requests
from rich.console import Console
from rich.table import Table
from hm3u8dl_cli import m3u8download
from hm3u8dl_cli import idm


class Xet:
    """Resolve and download course media from a xiaoeknow H5 shop."""

    def __init__(self, APPID, cookie: str = ''):
        """
        Build a parser bound to one shop.

        :param APPID: required, e.g. ``appy6xyj9p87665``
        :param cookie: optional; when empty, an anonymous cookie is requested
            via :meth:`get_cookie`
        """
        # Example play URL:
        # https://appy6xyj9p87665.h5.xiaoeknow.com/p/course/video/v_62217c28e4b066e9608b63e8?product_id=p_621f4eb7e4b02b8258503f35
        self.APPID = APPID
        self.COOKIE = cookie
        if not self.COOKIE:
            self.COOKIE = self.get_cookie(self.APPID)
        self.HEADERS = {
            'referer': f'https://{self.APPID}.h5.xiaoeknow.com',
            'user-agent': 'Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Mobile Safari/537.36 Edg/105.0.1343.33',
            'cookie': self.COOKIE
        }

    def get_cookie(self, APPID):
        """
        Fetch the ``anony_token=...`` cookie from the shop home page.

        :param APPID: shop id used to build the host name
        :return: the ``anony_token=<value>`` string, or ``None`` when the
            request fails or no such cookie is set
        """
        playurl = f'https://{APPID}.h5.xiaoeknow.com'
        try:
            response = requests.get(playurl, allow_redirects=False)
        except requests.RequestException:
            return None
        # The header may be absent; fall back to '' so the search just misses.
        set_cookie = response.headers.get('set-cookie') or ''
        match = re.search(r'(anony_token=.+?);', set_cookie)
        return match.group(1) if match else None

    def get_cookie_from_pic(self, APPID):
        """
        Request a login QR code and poll its authorization status.

        NOTE(review): the polling loop has no exit condition, so this method
        never returns; the status payload schema is not visible here, so no
        termination check has been added.

        :param APPID: shop id used to build the host name
        """
        url = f'https://{APPID}.h5.xiaoeknow.com/p/decorate/personal_center'
        session = requests.session()
        redirect_headers = session.get(url, allow_redirects=False).headers
        Location = redirect_headers['Location']
        # Result is unused; the request is kept because it runs on the shared
        # session (presumably to pick up cookies - confirm).
        session.get(Location)

        wxurl = f'https://{APPID}.h5.xiaoeknow.com/xe.account-platform.account.auth.get_qr_code'
        data = {
            'app_id': APPID,
            'bizData[login_key]': 0
        }
        response3 = session.post(wxurl, data=data).json()
        print(response3)
        code = response3['data']['code']
        qr_code_url = response3['data']['qr_code_url']
        pic = session.get(qr_code_url, allow_redirects=True)
        print(pic.headers)

        # Poll the QR code status once per second.
        authorize_status_url = f'https://{APPID}.h5.xiaoeknow.com/xe.account-platform.account.auth.authorize_status'
        data_authorize_status = {
            'app_id': APPID,
            'bizData[code]': code,
            'bizData[login_key]': ''
        }
        while True:
            response_authorize_status = session.post(authorize_status_url, data=data_authorize_status).json()
            print(response_authorize_status)
            time.sleep(1)

    def get_all_resourceId_from_productId(self, APPID, product_id):
        """
        Collect the resources of every product listed under a topic.

        :param APPID: shop id used to build the host name
        :param product_id: sent as ``bizData[topic_id]``
        :return: concatenated result lists of
            :meth:`get_resourceId_from_productId` for each listed product
        """
        url = f'https://{APPID}.h5.xiaoeknow.com/get/topic_products'
        payload = {
            'bizData[page_index]': 1,
            'bizData[page_size]': 100,
            'bizData[topic_id]': product_id
        }
        response = requests.post(url, headers=self.HEADERS, data=payload).json()
        all_resource_ids = []
        for product in response['data']:
            all_resource_ids += self.get_resourceId_from_productId(APPID, product['id'])
        return all_resource_ids

    def get_resourceId_from_productId(self, APPID, product_id):
        """
        List the resources of a column (first page, up to 100 entries).

        :param APPID: shop id used to build the API host name
        :param product_id: sent as ``bizData[column_id]``
        :return: list of dicts with ``resource_title``, ``resource_id`` and
            ``playurl`` keys
        :raises RuntimeError: when the request fails or the body is not JSON
        """
        url = f'https://{APPID}.h5.xiaoeknow.com/xe.course.business.column.items.get/2.0.0'
        data = {
            'bizData[column_id]': product_id,
            'bizData[page_index]': 1,
            'bizData[page_size]': 100,
            'bizData[sort]': 'desc'
        }
        try:
            response = requests.post(url=url, headers=self.HEADERS, data=data).json()
        except (requests.RequestException, ValueError) as err:
            raise RuntimeError('解析失败') from err

        resource_ids = []
        for item in response['data']['list']:
            resource_id = item['resource_id']
            resource_ids.append({
                'resource_title': item['resource_title'],
                'resource_id': resource_id,
                'playurl': f'https://{self.APPID}.h5.xiaoeknow.com/p/course/video/{resource_id}?product_id={product_id}'
            })
        return resource_ids

    def get_resourceId_from_productId2(self, APPID, product_id):
        """
        Fallback listing that reads resources from the topic-products API.

        Prints the response (``None`` if the request itself failed) and exits
        the process when the listing cannot be obtained.

        :param APPID: unused; the host name is built from ``self.APPID``
        :param product_id: sent as ``bizData[topic_id]``
        :return: list of dicts with ``resource_title``, ``resource_id`` and
            ``playurl`` keys
        """
        url = f'https://{self.APPID}.h5.xiaoeknow.com/get/topic_products'
        data = {
            'bizData[topic_id]': product_id,
            'bizData[page_index]': 1,
            'bizData[page_size]': 100,
        }
        # Bound before the try so the handler can always print it.
        response = None
        try:
            response = requests.post(url=url, headers=self.HEADERS, data=data).json()
            resource_ids = []
            for item in response['data'][0]['resource_list']:
                resource_id = item['id']
                resource_ids.append({
                    'resource_title': item['resource_name'],
                    'resource_id': resource_id,
                    'playurl': f'https://{self.APPID}.h5.xiaoeknow.com/p/course/video/{resource_id}?product_id={product_id}'
                })
            return resource_ids
        except (requests.RequestException, ValueError, KeyError, IndexError, TypeError):
            print(response)
            sys.exit(0)

    def get_USERID_signature(self, APPID, resource_id):
        """
        Query the wechat-init endpoint for the user id and share signature.

        Appending ``uid=<USERID>`` to the key URL makes it accessible.

        :param APPID: shop id used to build the host name
        :param resource_id: sent as ``bizData[resource_id]``
        :return: ``(USERID, nonceStr, share_title, signature)``
        """
        url = f'https://{APPID}.h5.xiaoeknow.com/xe.course.business.wechat.init'
        wechat_init_url = f'https://{APPID}.h5.xiaoeknow.com'
        data = {
            'bizData[resource_id]': resource_id,
            'bizData[wechat_init_url]': wechat_init_url
        }
        response = requests.post(url, headers=self.HEADERS, data=data).json()
        USERID = response['data']['commonData']['USERID']
        wxData = response['data']['wxData']
        return USERID, wxData['nonceStr'], wxData['share_title'], wxData['signature']

    def dec_m3u8(self, t):
        """
        Decode an obfuscated base64 string.

        Removes every ``_ba`` and maps ``@ # $ %`` to ``1 2 3 4`` before
        base64-decoding.

        :param t: obfuscated text
        :return: decoded text
        """
        cleaned = t.replace('_ba', '').translate(str.maketrans('@#$%', '1234'))
        return base64.b64decode(cleaned).decode()

    def get_detail_info(self, APPID, resource_id: str, product_id, title=None):
        """
        Resolve stream info for a resource, or download it directly.

        * ``v_`` ids: returns a list of stream dicts when the response has
          ``video_info``; downloads the file via ``idm`` and returns ``None``
          when it has ``audio_info``.
        * ``l_`` ids: downloads the look-back recording and returns ``None``.
        * any other prefix: returns ``None`` without doing anything.

        :param APPID: shop id used to build the host name
        :param resource_id: resource id; its prefix selects the branch
        :param product_id: sent as ``bizData[product_id]`` for ``v_`` ids
        :param title: download title for ``l_`` recordings
        :return: list of dicts with ``m3u8url``, ``baseuri``, ``param`` and
            ``file_name`` keys, or ``None``
        :raises RuntimeError: when the response cannot be interpreted or a
            download fails
        """
        if resource_id.startswith('v_'):
            url = f'https://{APPID}.h5.xiaoeknow.com/xe.course.business.video.detail_info.get/2.0.0'
            data = {
                'bizData[resource_id]': resource_id,
                'bizData[product_id]': product_id,
                'bizData[opr_sys]': 'Win32'
            }
            response = requests.post(url, headers=self.HEADERS, data=data).json()
            msg = response.get('msg') if isinstance(response, dict) else response
            try:
                detail = response['data']
                if 'video_info' in detail:
                    file_name = detail['video_info']['file_name']
                    video_urls = json.loads(self.dec_m3u8(detail['video_urls']))
                    results = []
                    for video_url in video_urls:
                        ext = video_url['ext']
                        # The final segment URL is baseuri + content + param.
                        results.append({
                            'm3u8url': video_url['url'],
                            'baseuri': ext['host'] + '/' + ext['path'] + '/',
                            'param': '?' + ext['param'],
                            'file_name': file_name
                        })
                    return results
                if 'audio_info' in detail:
                    audio_info = detail['audio_info']
                    title = audio_info['title']
                    audio_url = audio_info['audio_url']
                    print(title)
                    idm.download(audio_url, save_name=title + '.mp3')
                    return None
            except Exception as err:
                # Broad on purpose: wraps response parsing and a third-party
                # download call; the cause is chained, not swallowed.
                print(msg)
                raise RuntimeError(f'failed to resolve {resource_id}: {msg}') from err
            # Neither video nor audio info was present in the response.
            print(msg)
            raise RuntimeError(f'no video_info/audio_info for {resource_id}: {msg}')
        elif resource_id.startswith('l_'):
            url = f'https://{APPID}.h5.xiaoeknow.com/_alive/v3/get_lookback_url?alive_id={resource_id}'
            response = requests.get(url, headers=self.HEADERS).json()
            try:
                aliveVideoMp4Url = response['data']['aliveVideoMp4Url']
                aliveVideoUrl = response['data']['aliveVideoUrl']
                # Try the MP4 URL first, then the other look-back URL.
                if aliveVideoMp4Url != '':
                    m3u8download(aliveVideoMp4Url, title=title)
                elif aliveVideoUrl != '':
                    m3u8download(aliveVideoUrl, title=title)
            except Exception as err:
                print(response)
                raise RuntimeError(f'failed to download look-back {resource_id}') from err
        return None

    def decrypt(self, m3u8url: str) -> str:
        """
        Rewrite an encrypted-host link to its unencrypted form.

        :param m3u8url: an m3u8 or ts link
        :return: the rewritten link (unchanged when no known host matches)
        """
        replace_header = ['encrypt-k-vod.xet.tech']
        # Alternative host: '1252524126.vod2.myqcloud.com'
        true_header = 'live-video-tx.xiaoeknow.com'
        for host in replace_header:
            if host in m3u8url:
                m3u8url = m3u8url.replace(host, true_header)
                # NOTE(review): source layout was lost; this rewrite is
                # assumed to apply only to links with a matched host - confirm.
                m3u8url = re.sub(r'_\d+', '', m3u8url).replace('.ts', '.m3u8').split('?')[0]
        return m3u8url

    def decrypt_key(self, USERID, key0):
        """
        XOR the base64-encoded key with the leading bytes of the user id.

        :param USERID: user id string
        :param key0: base64-encoded key
        :return: base64-encoded XOR result
        :raises ValueError: when the encoded user id is shorter than the key
        """
        uid = USERID.encode()
        raw_key = base64.b64decode(key0)
        if len(uid) < len(raw_key):
            raise ValueError(
                f'USERID is {len(uid)} bytes but the key needs {len(raw_key)}'
            )
        key = bytes(k ^ u for k, u in zip(raw_key, uid))
        return base64.b64encode(key).decode()

    def run(self, APPID, resource_id, product_id, title=None):
        """
        Resolve one resource and download each of its streams.

        :param APPID: shop id
        :param resource_id: resource to download
        :param product_id: product the resource belongs to
        :param title: forwarded to :meth:`get_detail_info`; streams from
            ``v_`` resources are saved under the server-provided file name
        """
        results = self.get_detail_info(APPID, resource_id, product_id, title=title)
        if results is None:
            return
        USERID, _nonce_str, _share_title, _signature = self.get_USERID_signature(APPID, resource_id)
        for result in results:
            baseuri = result['baseuri']
            param = result['param']
            file_name = result['file_name']
            response = requests.get(result['m3u8url']).text
            # Resolve the key (downloads reportedly work without it too).
            keyurl = re.findall('URI="(.+?)"', response)[0] + f'&uid={USERID}'
            key0 = base64.b64encode(requests.get(keyurl).content).decode()
            key = self.decrypt_key(USERID, key0)
            # Derive the playlist name from the first ts segment name.
            temp_url = re.findall(r'(v\..+?)\.ts', response)[0].replace("_0", "") + '.m3u8'
            m3u8url = baseuri + temp_url + param
            print(file_name, key, m3u8url)
            m3u8download(m3u8url=m3u8url, title=file_name, key=key, headers=self.HEADERS)

    def listSort(self, List1):
        """
        Show the resources in a table and let the user pick which to download.

        Accepted input: a single number (``5``), an inclusive range
        (``4-10``), or space-separated entries (``4 10``); entries may also
        be ranges. Numbers are 1-based.

        :param List1: list of dicts with ``resource_title`` and
            ``resource_id``
        :return: the selected entries; ``List1`` itself when it has exactly
            one entry
        :raises ValueError: when ``List1`` is empty or an entry has no number
        """
        if not List1:
            raise ValueError('列表获取错误')
        if len(List1) == 1:
            return List1

        table = Table()
        console = Console(color_system='256', style=None)
        table.add_column('[red]sn')
        table.add_column('[red]title')
        table.add_column('[red]resource_id')
        for sn, item in enumerate(List1, start=1):
            table.add_row(str(sn), item['resource_title'], item['resource_id'])
        console.print(table)

        numbers = input('输入下载序列(① 5 ② 4-10 ③ 4 10):')
        selected = []
        # split() without arguments tolerates repeated or trailing spaces.
        for token in numbers.split():
            found = re.findall(r'\d+', token)
            if '-' in token:
                if len(found) < 2:
                    raise ValueError(f'invalid range: {token!r}')
                selected.extend(List1[int(found[0]) - 1:int(found[1])])
            else:
                if not found:
                    raise ValueError(f'invalid selection: {token!r}')
                selected.append(List1[int(found[0]) - 1])
        if not selected:
            raise ValueError('no selection entered')
        return selected

    def single_parse(self, resource_id, product_id=None):
        """
        Download a single resource.

        :param resource_id: e.g. ``v_62217c28e4b066e9608b63e8``
        :param product_id: e.g. ``p_621f4eb7e4b02b8258503f35``
        """
        self.run(self.APPID, resource_id, product_id)

    def batch_parse(self, product_id):
        """
        List a product's resources, prompt for a selection and download it.

        :param product_id: e.g. ``p_621f4eb7e4b02b8258503f35``
        """
        # Alternative: self.get_all_resourceId_from_productId(self.APPID, product_id)
        infos = self.get_resourceId_from_productId(self.APPID, product_id)
        if not infos:
            # Fall back to the topic-products listing.
            infos = self.get_resourceId_from_productId2(self.APPID, product_id)
        infos = self.listSort(infos)
        for info in infos:
            self.run(self.APPID, info['resource_id'], product_id, title=info['resource_title'])


if __name__ == '__main__':
    # cookie = 'anony_token=b9235d170d35c7122fd60f04188e3bcc;'
    xet = Xet('appy6xyj9p87665', cookie='')
    # Batch download
    xet.batch_parse('p_621f4eb7e4b02b8258503f35')
    # Single download
    # xet.single_parse('v_62fa64a3e4b050af23a99220','')
hecoter12138 发表于 2023-11-26 10:44 [mw_shl_code=python,true]import re, sys import time import requests, json, base64 感谢分享!还缺少2个包,能继续分享下吗? 谢谢! from rich.table import Table from hm3u8dl_cli import m3u8download
baliao 发表于 2023-11-26 11:13 感谢分享!还缺少2个包,能继续分享下吗? 谢谢! from rich.table import Table from hm3u8dl_cli import ... pip install rich hm3u8dl_cli
hecoter12138 发表于 2023-11-26 11:13 pip install rich,hm3u8dl_cli 感谢大佬快速的指导! 在安装hm3u8dl_cli 包,提示这个错误, 百度了下还是没找到解决方案,请教下,大概如何解决这个? 谢谢! c:\py38\Scripts> c:\py38\Scripts>pip install hm3u8dl_cli --trusted-host pypi.douban.com Looking in indexes: http://pypi.douban.com/simple Collecting hm3u8dl_cli Using cached https://mirrors.cloud.tencent.com/pypi/packages/91/6c/92533a7089f1ab1161b1930ca59980bc2fbcf804d981740c42788b9fc71f/hm3u8dl_cli-0.4.9-py3-none-any.whl Requirement already satisfied: pycryptodome in c:\py38\lib\site-packages (from hm3u8dl_cli) (3.19.0) Requirement already satisfied: rich in c:\py38\lib\site-packages (from hm3u8dl_cli) (13.6.0) Requirement already satisfied: tornado in c:\py38\lib\site-packages (from hm3u8dl_cli) (6.3.3) Requirement already satisfied: retry in c:\py38\lib\site-packages (from hm3u8dl_cli) (0.9.2) Requirement already satisfied: multiprocess in c:\py38\lib\site-packages (from hm3u8dl_cli) (0.70.15) Requirement already satisfied: tqdm in c:\py38\lib\site-packages (from hm3u8dl_cli) (4.66.1) Requirement already satisfied: m3u8 in c:\py38\lib\site-packages (from hm3u8dl_cli) (3.6.0) Requirement already satisfied: requests in c:\py38\lib\site-packages (from hm3u8dl_cli) (2.31.0) Requirement already satisfied: typing-extensions=4.0.0; python_version hm3u8dl_cli) (4.8.0) Requirement already satisfied: pygments=2.13.0 in c:\py38\lib\site-packages (from rich->hm3u8dl_cli) (2.16.1) Requirement already satisfied: markdown-it-py>=2.2.0 in c:\py38\lib\site-packages (from rich->hm3u8dl_cli) (3.0.0) Requirement already satisfied: decorator>=3.4.2 in c:\py38\lib\site-packages (from retry->hm3u8dl_cli) (5.1.1) Requirement already satisfied: py=1.4.26 in c:\py38\lib\site-packages (from retry->hm3u8dl_cli) (1.11.0) Requirement already satisfied: dill>=0.3.7 in c:\py38\lib\site-packages (from multiprocess->hm3u8dl_cli) (0.3.7) Requirement already satisfied: colorama; platform_system == "Windows" in c:\py38\lib\site-packages (from tqdm->hm3u8dl_cli) (0.4.6) 
Requirement already satisfied: iso8601 in c:\py38\lib\site-packages (from m3u8->hm3u8dl_cli) (2.1.0) Requirement already satisfied: urllib3=1.21.1 in c:\py38\lib\site-packages (from requests->hm3u8dl_cli) (2.0.7) Requirement already satisfied: idna=2.5 in c:\py38\lib\site-packages (from requests->hm3u8dl_cli) (3.4) Requirement already satisfied: charset-normalizer=2 in c:\py38\lib\site-packages (from requests->hm3u8dl_cli) (3.3.1) Requirement already satisfied: certifi>=2017.4.17 in c:\py38\lib\site-packages (from requests->hm3u8dl_cli) (2023.7.22) Requirement already satisfied: mdurl~=0.1 in c:\py38\lib\site-packages (from markdown-it-py>=2.2.0->rich->hm3u8dl_cli) (0.1.2) Installing collected packages: hm3u8dl-cli ERROR: Could not install packages due to an EnvironmentError: [Errno 22] Invalid argument: 'C:\\Users\\Administrator\\AppData\\Local\\Temp\\pip-install-3ob6clet\\hm3u8dl-cli\\hm3u8dl_cli\\Tools\\youkudecrypt.exe' WARNING: You are using pip version 19.2.3, however version 23.3.1 is available. You should consider upgrading via the 'python -m pip install --upgrade pip' command. c:\py38\Scripts>