A folder is created automatically for each keyword you enter, and the downloaded songs are saved into the corresponding folder.
Sample run:
[two screenshots of the run, uploaded 2023-1-11]
Written and tested with Python 3.9 on my machine.
There are two versions at the moment; both still have minor issues that I will keep tweaking.
Initial version:
Problem: single-threaded, so downloads are slow.
New version:
Multithreaded, so downloads are fast.
Problem: downloading a single keyword works fine.
With several keywords, however, things go wrong: once the songs for all keywords have been collected, every song is downloaded multiple times and placed into each keyword's directory.
For example, with two keywords A and B and 10 songs each, A's 10 songs end up in both the A folder and the B folder, and so do B's 10 songs, so everything is downloaded more than once. One possible fix is sketched below.
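A possible direction for fixing this (a minimal sketch, not the script's current code): record the target folder on each song dict at collection time, so the thread-pool worker no longer reads the shared save_url global. The sketch reuses getPageList, downLoadMp3Files, mulTool, totalThread, songNum and keyNames from the new version; the per-song "save_url" key and the exact call shapes are assumptions.

[Python]
# Sketch: tag every song with its own folder before handing all keywords to the thread pool.
resultListMul = []
for keyName in keyNames:
    keyword_dir = save_path + "/" + keyName + "/"
    for song in getPageList(keyName, songNum):
        song["save_url"] = keyword_dir  # hypothetical per-song key
        resultListMul.append(song)

def doingFunction(song):
    # Each song dict now carries the folder it belongs to, so no global save_url is needed.
    downLoadMp3Files(song["url"], song["save_url"], song["title"], song["author"])

mulTool(totalThread, resultListMul, doingFunction)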
Source code
# Initial version
[Python]
# Download MP3 files to the local machine
import os
import requests
import json
import urllib.parse

# --------- configuration ----------
keyNames = ["张国荣"]
save_path = "D:/Music"
# How many songs to download per keyword: pageNum * 10 songs
pageNum = 1


# Single-threaded download
def DownloadFile(mp3_url, save_url, file_name):
    try:
        if mp3_url is None or save_url is None or file_name is None:
            print('Invalid arguments')
            return None
        # Create the target folder if it does not exist yet
        folder = os.path.exists(save_url)
        if not folder:
            os.makedirs(save_url)
        # Fetch the MP3 resource as a stream
        res = requests.get(mp3_url, stream=True)
        # Build the local file path
        file_path = os.path.join(save_url, file_name)
        print('Writing file:', file_path)
        # Open file_path and write the response to disk in binary mode
        with open(file_path, 'wb') as fd:
            for chunk in res.iter_content():
                fd.write(chunk)
        print(file_name + ' downloaded successfully!')
    except:
        print("Download failed!")


def downLoadMp3Files(url, save_url, title, author):
    file_name = title + "-" + author + ".mp3"
    DownloadFile(url, save_url, file_name)


def getPageList(author, pageNum):
    url = "https://www.jbsou.cn/"
    files = []
    headers = {
        'authority': 'www.jbsou.cn',
        'cookie': '__51cke__=; __51uvsct__JPfsNuTH3tFBO3If=1; __51vcke__JPfsNuTH3tFBO3If=7ce649a4-2f3a-5f38-a46e-c0ba70372790; __51vuft__JPfsNuTH3tFBO3If=1657457772373; Hm_lvt_76749151b22324218af38671cb634aaa=1657457772; __gads=ID=9569f23632163ae7-22f1a6fda4d30046:T=1657457774:RT=1657457774:S=ALNI_MaiKWrecyIGhnohZrRQXzpbNmAhWw; __gpi=UID=00000641ebdfbb03:T=1657457774:RT=1657457774:S=ALNI_MZ77MogL7uZoU0-4rFToiZw2Tno4Q; __tins__19428461={"sid": 1657457772361, "vd": 3, "expires": 1657459620332}; __51laig__=3; __vtins__JPfsNuTH3tFBO3If={"sid": "497a83dc-a208-56b0-b362-e29c476ca102", "vd": 3, "stt": 47972, "dr": 9098, "expires": 1657459620342, "ct": 1657457820342}; Hm_lpvt_76749151b22324218af38671cb634aaa=1657457820',
        'origin': 'https://www.jbsou.cn',
        'referer': 'https://www.jbsou.cn/?name=' + urllib.parse.quote(author.encode('gb2312')) + '&type=netease',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
        'x-requested-with': 'XMLHttpRequest'
    }
    resultList = []
    for page in range(pageNum):
        payload = {'input': author,
                   'filter': 'name',
                   'type': 'netease',
                   'page': str(page + 1)}
        response = requests.request("POST", url, headers=headers, data=payload, files=files)
        try:
            songList = json.loads(response.text)["data"]
            for i in songList:
                tempDict = {}
                tempDict["url"] = i["url"]
                tempDict["author"] = i["author"]
                tempDict["title"] = i["title"]
                resultList.append(tempDict)
            # print("-- songs found for [{}], page {} --\n{}".format(author, page, resultList))
            print("Page {} contains {} songs".format(page + 1, len(songList)))
        except:
            print("Failed to fetch song information!")
    return resultList


for keyName in keyNames:
    save_url = save_path + "/" + keyName + "/"
    # Collect song information from every page
    mp3pageList = getPageList(keyName, pageNum)
    print("-- [{}] song list fetched, {} songs in total --".format(keyName, len(mp3pageList)))
    # Print every song found
    for i in mp3pageList:
        print(i)
    # Download the songs
    for i in mp3pageList:
        # Single-threaded download
        downLoadMp3Files(i["url"], save_url, i["title"], i["author"])
    print("-- [{}] all songs downloaded, {} in total! --".format(keyName, len(mp3pageList)))
# New version
[Python]
# -*- coding: UTF-8 -*-
# Allows the newer type-hint syntax used further down
from __future__ import annotations
# Used to pick a random User-Agent
import random
# Used to display a progress bar
from tqdm import tqdm
# Used to make HTTP requests
import requests
# Multithreading: task-based chunked downloads
import multitasking
import signal
# retry makes it easy to retry downloads that error out
from retry import retry
# Common imports for downloading MP3 files to the local machine
import time
import json
import urllib.parse
import os
from multiprocessing.dummy import Pool as ThreadPool  # thread pool
from sys import stdout

"""
Configuration
"""
# Where downloads are stored
save_path = "G:/"
# Keywords to search for
keyNames = ["小虎队", "邓丽君", "郭富城", "刘德华", "张学友"]
waittime = 0  # how long to wait between downloads
# How many songs to download per keyword: songNum * 10 songs
songNum = 5
# Start downloading from this page
start_page = 1
# Whether to use multithreading
use_multhread = True
# Whether to use chunked (block) downloads
use_block = False
totalThread = 8  # number of threads

############################################################
# Shared variables
resultListMul = []
user_agent_list = [
    "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; …) Gecko/20100101 Firefox/61.0",
    "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.186 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.62 Safari/537.36",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36",
    "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)",
    "Mozilla/5.0 (Macintosh; U; PPC Mac OS X 10.5; en-US; rv:1.9.2.15) Gecko/20110303 Firefox/3.6.15",
]
# Worker executed by each thread in the pool
def doingFunction(i):
    downLoadMp3Files(i["url"], save_url, i["title"], i["author"])


# Thread-pool helper
def mulTool(totalThread, inputList, function):
    """
    :param totalThread: number of threads
    :param inputList: list of arguments (one entry per call of `function`)
    :param function: the function each thread runs
    :return:
    """
    pool = ThreadPool(totalThread)  # create a pool with totalThread threads
    pool.map(function, inputList)
    pool.close()  # stop accepting new tasks
    pool.join()   # block until all worker threads have finished
    print("main func end")
# File download helper
def DownloadFile(mp3_url, save_url, file_name, otherFuncUse=0):
    try:
        if mp3_url is None or save_url is None or file_name is None:
            print('Invalid arguments')
            return None
        # Fetch the MP3 resource as a stream
        res = requests.get(mp3_url, stream=True)
        if otherFuncUse != 0:
            if "Location" not in res.headers or "404" in res.headers["Location"] or "Content-Length" not in res.headers:
                print("File: {} - url: {} server returned an empty file, skipping!".format(file_name, mp3_url))
                return
            try:
                # The site returns a Chinese "page not found" body for dead links; keep the original match string
                if "很抱歉,你要查找的网页找不到" in res.content.decode('utf-8'):
                    print("File: {} - url: {} page is a 404, skipping!".format(file_name, mp3_url))
                    return
            except:
                pass
        # Build the local file path
        file_path = os.path.join(save_url, file_name)
        print('Writing file:', file_path)
        if "Content-Length" in res.headers:
            filesize = res.headers["Content-Length"]
        else:
            filesize = len(res.content)
        with open(file_path, "wb") as f:
            chunk_size = 128
            times = max(int(filesize) // chunk_size, 1)  # avoid division by zero for tiny files
            show = 1 / times
            show2 = 1 / times
            start = 1
            for chunk in res.iter_content(chunk_size):
                f.write(chunk)
                if start <= times:
                    # Simple single-line percentage progress indicator
                    stdout.write("\rProgress: {:.2f}%".format(show * start * 100))
                    stdout.flush()
                    start += 1
            stdout.write("\n")
        print(file_name + ' downloaded successfully!')
    except Exception as e:
        print("Download failed!", e)


# ---- multithreaded (chunked) download helpers: start ----
# Byte unit for the default chunk size
B = 1
# Request headers shared by the helpers below
headers = {
    'User-Agent': random.choice(user_agent_list)
}


def split(start: int, end: int, step: int, total: int) -> list[tuple[int, int]]:
    # Split [0, total) into (start, end) byte ranges of at most `step` bytes each
    parts = [(start, min(start + step, total)) for start in range(0, total, step)]
    return parts
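# Illustrative example for split (based on the list comprehension above, not real output):
# split(0, 2500, 1000, 2500) -> [(0, 1000), (1000, 2000), (2000, 2500)]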
def get_file_size(url: str, raise_error: bool = False) -> int:
    '''
    Get the size of the remote file.
    Parameters
    ----------
    url : direct link to the file
    raise_error : whether to raise if the file size cannot be determined
    Return
    ------
    File size in bytes; raises if unsupported and raise_error is True
    '''
    # allow_redirects=True would handle 302 automatically
    # response = requests.head(url, allow_redirects=True)
    session = requests.session()
    while (1):
        try:
            response = session.get(url, headers=headers)
            if "404" in url or ("Location" in response.headers and "404" in response.headers["Location"]):
                return None
            if (response.status_code == requests.codes.ok):
                break
            elif (response.status_code == 302 or response.status_code == 301):
                print(url + " --redirected to--> " + response.headers["Location"])
                url = response.headers["Location"]
            else:
                print("Something went wrong, retrying in 5 seconds!", response.status_code)
                time.sleep(5)
        except Exception as e:
            print("Something went wrong, retrying in 3 seconds!", e)
            if '404' in str(e):
                return None
            time.sleep(3)
    file_size = response.headers.get('Content-Length')
    if file_size is None:
        if raise_error is True:
            raise ValueError('This file does not support chunked multithreaded download!')
        return file_size
    return int(file_size)
def downloadMulTool(save_url: str, url: str, file_name: str, retry_times: int = 3, each_size=1000 * B) -> None:
    '''
    Download a file given its direct link and file name.
    Parameters
    ----------
    save_url: directory to save into
    url : direct link to the file
    file_name : name of the file
    retry_times: optional, number of retries per failed connection
    Return
    ------
    None
    '''
    file_path = os.path.join(save_url, file_name)
    # Retry a few times in case the file size cannot be determined right away
    for i in range(5):
        file_size = get_file_size(url)
        if file_size is None or file_size <= 0:
            time.sleep(1)
            continue
        break
    if i >= 4 and (file_size is None or file_size <= 0):
        print("File: {} - url: {} could not determine the file size, skipping!".format(file_name, url))
        return
    # Open the target file once; every task writes its own byte range into it
    f = open(file_path, 'wb')

    @retry(tries=retry_times)
    @multitasking.task
    def start_download(start: int, end: int) -> None:
        '''
        Download the given byte range of the file
        Parameters
        ----------
        start : start offset
        end : end offset
        '''
        _headers = headers.copy()
        # The core of segmented downloading: request only this byte range
        _headers['Range'] = f'bytes={start}-{end}'
        # Make the request and get a streaming response
        response = session.get(url, headers=_headers, stream=True)
        # Size of each chunk read from the streaming response
        chunk_size = 128
        # Buffer the received chunks, then write them in one go
        chunks = []
        for chunk in response.iter_content(chunk_size=chunk_size):
            # Buffer the chunk
            chunks.append(chunk)
            # Update the progress bar
            bar.update(chunk_size)
        f.seek(start)
        for chunk in chunks:
            f.write(chunk)
        # Free the buffered chunks
        del chunks

    session = requests.Session()
    # If the chunk size is larger than the file, just use the file size
    each_size = min(each_size, file_size)
    print("File: {} -- size: {} -- chunk size: {}".format(file_name, file_size, each_size))
    # Split into chunks
    parts = split(0, file_size, each_size, file_size)
    print(f'Number of chunks: {len(parts)}')
    # Create the progress bar
    bar = tqdm(total=file_size, desc=f'Downloading: {file_name}')
    for part in parts:
        start, end = part
        start_download(start, end)
    # Wait for all tasks to finish
    multitasking.wait_for_tasks()
    f.close()
    bar.close()
# ---- multithreaded (chunked) download helpers: end ----
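# Example call of downloadMulTool (a sketch with placeholder arguments; the real call
# is made from downLoadMp3Files below when use_block is True):
# downloadMulTool("G:/小虎队/", "<mp3 direct link>", "<title>-<artist>.mp3")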
def downLoadMp3Files(url, save_url, title, author):
    # Create the target folder if it does not exist yet
    folder = os.path.exists(save_url)
    if not folder:
        try:
            os.makedirs(save_url)
        except:
            print("Failed to create {}!".format(save_url))
    file_name = title + "-" + author + ".mp3"
    if use_block:
        downloadMulTool(save_url, url, file_name)
    else:
        DownloadFile(url, save_url, file_name)


# Fetch the song list (single-threaded)
def getPageList(author, pageNum):
    url = "https://www.jbsou.cn/"
    files = []
    headers = {
        'authority': 'www.jbsou.cn',
        'cookie': '__51cke__=; __51uvsct__JPfsNuTH3tFBO3If=1; __51vcke__JPfsNuTH3tFBO3If=7ce649a4-2f3a-5f38-a46e-c0ba70372790; __51vuft__JPfsNuTH3tFBO3If=1657457772373; Hm_lvt_76749151b22324218af38671cb634aaa=1657457772; __gads=ID=9569f23632163ae7-22f1a6fda4d30046:T=1657457774:RT=1657457774:S=ALNI_MaiKWrecyIGhnohZrRQXzpbNmAhWw; __gpi=UID=00000641ebdfbb03:T=1657457774:RT=1657457774:S=ALNI_MZ77MogL7uZoU0-4rFToiZw2Tno4Q; __tins__19428461={"sid": 1657457772361, "vd": 3, "expires": 1657459620332}; __51laig__=3; __vtins__JPfsNuTH3tFBO3If={"sid": "497a83dc-a208-56b0-b362-e29c476ca102", "vd": 3, "stt": 47972, "dr": 9098, "expires": 1657459620342, "ct": 1657457820342}; Hm_lpvt_76749151b22324218af38671cb634aaa=1657457820',
        'origin': 'https://www.jbsou.cn',
        'referer': 'https://www.jbsou.cn/?name=' + urllib.parse.quote(author.encode('gb2312')) + '&type=netease',
        'User-Agent': random.choice(user_agent_list),
        'x-requested-with': 'XMLHttpRequest'
    }
    resultList = []
    for page in range(pageNum):
        payload = {'input': author,
                   'filter': 'name',
                   'type': 'netease',
                   'page': str(page + 1)}
        temp = 0
        nowpageSongNum = 0
        while 1:
            try:
                response = requests.request("POST", url, headers=headers, data=payload, files=files)
                songList = json.loads(response.text)["data"]
                for i in songList:
                    tempDict = {}
                    tempDict["url"] = i["url"]
                    tempDict["author"] = i["author"]
                    tempDict["title"] = i["title"]
                    resultList.append(tempDict)
                # print("-- songs found for [{}], page {} --\n{}".format(author, page, resultList))
                print("Page {} contains {} songs".format(page + 1, len(songList)))
                nowpageSongNum = len(songList)
                break
            except Exception as e:
                temp += 1
                print("Failed to fetch song information, retrying! Error: {}".format(e))
                if temp == 5:
                    print("Failed to fetch song information several times, skipping!")
                    break
if nowpageSongNum