【Pyton】 根据关键词爬取音乐网音乐

查看 132|回复 9
作者:Cristy   
该功能块会实现搜索、下载某音乐网站的功能。可以当做一个便捷下载器使用。
输入的关键字会自动创建文件夹,将下载好的歌曲放入对应的文件夹中。
执行效果


image.png (43.17 KB, 下载次数: 0)
下载附件
2023-1-11 12:15 上传



image.png (84.64 KB, 下载次数: 0)
下载附件
2023-1-11 12:16 上传

本机使用的python3.9
目前有两个版本都有点小问题,后续会慢慢调整。
初始版本:
    问题:单线程下载速度慢。
新版本:
    多线程下载速度快。
    问题:目前下载一个关键字没有问题。
    但是多个关键字的歌曲会出现问题。获取到多个关键字的所有歌曲后,会下载多次放到各个目录下。
    比如设定2个关键字A、B,每个下10首,则会出现A的10首歌即出现在A目录下也出现在B目录下,B的10首也是。会下载多次
源代码
# 初始版本
[Python] 纯文本查看 复制代码# 下载MP3文件到本地
import os
import requests
import json
import urllib.parse
#  ---------启动配置区----------
keyNames = ["张国荣"]
save_path = "D:/Music"
# 每个关键字下载多少首   每个关键词下载 pageNum * 10首
pageNum = 1
# 单线程下载
def DownloadFile(mp3_url, save_url, file_name):
    try:
        if mp3_url is None or save_url is None or file_name is None:
            print('参数错误')
            return None
        # 文件夹不存在,则创建文件夹
        folder = os.path.exists(save_url)
        if not folder:
            os.makedirs(save_url)
        # 读取MP3资源
        res = requests.get(mp3_url, stream=True)
        # 获取文件地址
        file_path = os.path.join(save_url, file_name)
        print('开始写入文件:', file_path)
        # 打开本地文件夹路径file_path,以二进制流方式写入,保存到本地
        with open(file_path, 'wb') as fd:
            for chunk in res.iter_content():
                fd.write(chunk)
        print(file_name + ' 成功下载!')
    except:
        print("下载程序错误!")
def downLoadMp3Files(url, save_url, title, author):
    file_name = title + "-" + author + ".mp3"
    DownloadFile(url, save_url, file_name)
def getPageList(author, pageNum):
    url = "https://www.jbsou.cn/"
    files = [
    ]
    headers = {
        'authority': 'www.jbsou.cn',
        'cookie': '__51cke__=; __51uvsct__JPfsNuTH3tFBO3If=1; __51vcke__JPfsNuTH3tFBO3If=7ce649a4-2f3a-5f38-a46e-c0ba70372790; __51vuft__JPfsNuTH3tFBO3If=1657457772373; Hm_lvt_76749151b22324218af38671cb634aaa=1657457772; __gads=ID=9569f23632163ae7-22f1a6fda4d30046:T=1657457774:RT=1657457774:S=ALNI_MaiKWrecyIGhnohZrRQXzpbNmAhWw; __gpi=UID=00000641ebdfbb03:T=1657457774:RT=1657457774:S=ALNI_MZ77MogL7uZoU0-4rFToiZw2Tno4Q; __tins__19428461={"sid": 1657457772361, "vd": 3, "expires": 1657459620332}; __51laig__=3; __vtins__JPfsNuTH3tFBO3If={"sid": "497a83dc-a208-56b0-b362-e29c476ca102", "vd": 3, "stt": 47972, "dr": 9098, "expires": 1657459620342, "ct": 1657457820342}; Hm_lpvt_76749151b22324218af38671cb634aaa=1657457820',
        'origin': 'https://www.jbsou.cn',
        'referer': 'https://www.jbsou.cn/?name=' + urllib.parse.quote(author.encode('gb2312')) + '&type=netease',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
        'x-requested-with': 'XMLHttpRequest'
    }
    resultList = []
    for page in range(pageNum):
        payload = {'input': author,
                   'filter': 'name',
                   'type': 'netease',
                   'page': str(page + 1)}
        response = requests.request("POST", url, headers=headers, data=payload, files=files)
        try:
            songList = json.loads(response.text)["data"]
            for i in songList:
                tempDict = {}
                tempDict["url"] = i["url"]
                tempDict["author"] = i["author"]
                tempDict["title"] = i["title"]
                resultList.append(tempDict)
            # print("——【{}】歌曲第{}页信息获取如下——\n{}".format(author,page,resultList))
            print("第{}页一共有{}首歌".format(page + 1, len(songList)))
        except:
            print("获取歌曲信息错误!")
    return resultList
for keyName in keyNames:
    save_url = save_path + "/" + keyName + "/"
    # 获取所有页面歌曲信息
    mp3pageList = getPageList(keyName, pageNum)
    print("——【{}】获取下载信息完毕,共有{}首歌曲".format(keyName, len(mp3pageList)))
    # 打印所有歌曲信息
    for i in mp3pageList:
        print(i)
    # 下载歌曲
    for i in mp3pageList:
        # 单线程下载
        downLoadMp3Files(i["url"], save_url, i["title"], i["author"])
    print("——【{}】所有歌曲下载完毕,共有{}首歌曲!!——".format(keyName, len(mp3pageList)))
# 新版
[Python] 纯文本查看 复制代码# -*- coding: UTF-8 -*-
# 多线程相关
from __future__ import annotations
# 用于显示进度条
import random
from tqdm import tqdm
# 用于发起网络请求
import requests
# 用于多线程操作
import multitasking
import signal
# 导入 retry 库以方便进行下载出错重试
from retry import retry
# 常用导入
# 下载MP3文件到本地
import time
import json
import urllib.parse
import os
from multiprocessing.dummy import Pool as ThreadPool  # 线程池
from sys import stdout
"""
    配置区域
"""
# 存储路径
save_path = "G:/"
# 关键字
keyNames = ["小虎队", "邓丽君", "郭富城", "刘德华", "张学友"]
waittime = 0  # 每次下载等待时间
# 每个关键字下载多少首   每个关键词下载 songNum * 10首
songNum = 5
# 从第一页开始下载
start_page = 1
# 是否使用多线程
use_multhread = True
# 是否使用分块下载
use_block = False
totalThread = 8  # 多线程数量
############################################################
# 公用变量
resultListMul = []
user_agent_list = [
    "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; …) Gecko/20100101 Firefox/61.0",
    "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.186 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.62 Safari/537.36",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36",
    "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)",
    "Mozilla/5.0 (Macintosh; U; PPC Mac OS X 10.5; en-US; rv:1.9.2.15) Gecko/20110303 Firefox/3.6.15",
]
# 此处为多线程处理事件
def doingFunction(i):
    downLoadMp3Files(i["url"], save_url, i["title"], i["author"])
# 多线程工具
def mulTool(totalThread, inputList, function):
    """
    :param totalThread: 线程数
    :param inputList: 传入的参数列表(列表里依次存放每一次处理的参数)
    :param function: 每个线程处理的函数名
    :return:
    """
    pool = ThreadPool(totalThread)  # 创建一个包含 totalThread 个线程的线程池
    pool.map(function, inputList)
    pool.close()  # 关闭线程池的写入
    pool.join()  # 阻塞,保证子线程运行完毕后再继续主进程
    print("main func end")
# 下载文件方法
def DownloadFile(mp3_url, save_url, file_name, otherFuncUse=0):
    try:
        if mp3_url is None or save_url is None or file_name is None:
            print('参数错误')
            return None
        # 读取MP3资源
        res = requests.get(mp3_url, stream=True)
        if otherFuncUse != 0:
            if "Location" not in res.headers or "404" in res.headers["Location"] or "Content-Length" not in res.headers:
                print("文件名:{} - url:{}  获取服务器端文件为空,不做处理!".format(file_name, mp3_url))
                return
        try:
            if "很抱歉,你要查找的网页找不到" in res.content.decode('utf-8'):
                print("文件名:{} - url:{}  页面404,不做处理!".format(file_name, mp3_url))
                return
        except:
            None
        # 获取文件地址
        file_path = os.path.join(save_url, file_name)
        print('开始写入文件:', file_path)
        if "Content-Length" in res.headers:
            filesize = res.headers["Content-Length"]
        else:
            filesize = len(res.content)
        with open(file_path, "wb") as f:
            f.write(res.content)
            chunk_size = 128
            times = int(filesize) // chunk_size
            show = 1 / times
            show2 = 1 / times
            start = 1
            for chunk in res.iter_content(chunk_size):
                f.write(chunk)
                if start  list[tuple[int, int]]:
    # 分多块
    parts = [(start, min(start + step, total)) for start in range(0, total, step)]
    return parts
def get_file_size(url: str, raise_error: bool = False) -> int:
    '''
    获取文件大小
    Parameters
    ----------
    url : 文件直链
    raise_error : 如果无法获取文件大小,是否引发错误
    Return
    ------
    文件大小(B为单位)
    如果不支持则会报错
    '''
    # allow_redirects=True 自动处理302
    # response = requests.head(url, allow_redirects=True)
    session = requests.session()
    while (1):
        try:
            response = session.get(url, headers=headers)
            if "404" in url or ("Location" in response.headers and "404" in response.headers["Location"]):
                return None
            if (response.status_code == requests.codes.ok):
                break
            elif (response.status_code == 302 or response.status_code == 301):
                print(url + " --重定向到--> " + response.headers["Location"])
                url = response.headers["Location"]
            else:
                print("出问题啦!等待3秒继续!", response.status_code)
                time.sleep(5)
        except Exception as e:
            print("出问题啦!等待3秒继续!", e)
            if '404' in e:
                return None
            time.sleep(3)
    file_size = response.headers.get('Content-Length')
    if file_size is None:
        if raise_error is True:
            raise ValueError('该文件不支持多线程分段下载!')
        return file_size
    return int(file_size)
def downloadMulTool(save_url: str, url: str, file_name: str, retry_times: int = 3, each_size=1000 * B) -> None:
    '''
    根据文件直链和文件名下载文件
    Parameters
    ----------
    save_url: 存储路径
    url : 文件直链
    file_name : 文件名
    retry_times: 可选的,每次连接失败重试次数
    Return
    ------
    None
    '''
    file_path = os.path.join(save_url, file_name)
    for i in range(5):
        file_size = get_file_size(url)
        if file_size is None or file_size = 4 and (file_size is None or file_size  None:
        '''
        根据文件起止位置下载文件
        Parameters
        ----------
        start : 开始位置
        end : 结束位置
        '''
        _headers = headers.copy()
        # 分段下载的核心
        _headers['Range'] = f'bytes={start}-{end}'
        # 发起请求并获取响应(流式)
        response = session.get(url, headers=_headers, stream=True)
        # 每次读取的流式响应大小
        chunk_size = 128
        # 暂存已获取的响应,后续循环写入
        chunks = []
        for chunk in response.iter_content(chunk_size=chunk_size):
            # 暂存获取的响应
            chunks.append(chunk)
            # 更新进度条
            bar.update(chunk_size)
        f.seek(start)
        for chunk in chunks:
            f.write(chunk)
        # 释放已写入的资源
        del chunks
    session = requests.Session()
    # 分块文件如果比文件大,就取文件大小为分块大小
    each_size = min(each_size, file_size)
    print("文件名:{} -- 文件大小:{} -- 分块大小:{}".format(file_name, file_size, each_size))
    # 分块
    parts = split(0, file_size, each_size, file_size)
    print(f'分块数:{len(parts)}')
    # 创建进度条
    bar = tqdm(total=file_size, desc=f'下载文件:{file_name}')
    for part in parts:
        start, end = part
        start_download(start, end)
    # 等待全部线程结束
    multitasking.wait_for_tasks()
    f.close()
    bar.close()
# ----多线程下载工具 end ----
def downLoadMp3Files(url, save_url, title, author):
    # 文件夹不存在,则创建文件夹
    folder = os.path.exists(save_url)
    if not folder:
        try:
            os.makedirs(save_url)
        except:
            print("{}  创建失败!".format(folder))
    file_name = title + "-" + author + ".mp3"
    if use_block:
        downloadMulTool(save_url, url, file_name)
    else:
        DownloadFile(url, save_url, file_name)
# 单线程获取列表
def getPageList(author, pageNum):
    url = "https://www.jbsou.cn/"
    files = [
    ]
    headers = {
        'authority': 'www.jbsou.cn',
        'cookie': '__51cke__=; __51uvsct__JPfsNuTH3tFBO3If=1; __51vcke__JPfsNuTH3tFBO3If=7ce649a4-2f3a-5f38-a46e-c0ba70372790; __51vuft__JPfsNuTH3tFBO3If=1657457772373; Hm_lvt_76749151b22324218af38671cb634aaa=1657457772; __gads=ID=9569f23632163ae7-22f1a6fda4d30046:T=1657457774:RT=1657457774:S=ALNI_MaiKWrecyIGhnohZrRQXzpbNmAhWw; __gpi=UID=00000641ebdfbb03:T=1657457774:RT=1657457774:S=ALNI_MZ77MogL7uZoU0-4rFToiZw2Tno4Q; __tins__19428461={"sid": 1657457772361, "vd": 3, "expires": 1657459620332}; __51laig__=3; __vtins__JPfsNuTH3tFBO3If={"sid": "497a83dc-a208-56b0-b362-e29c476ca102", "vd": 3, "stt": 47972, "dr": 9098, "expires": 1657459620342, "ct": 1657457820342}; Hm_lpvt_76749151b22324218af38671cb634aaa=1657457820',
        'origin': 'https://www.jbsou.cn',
        'referer': 'https://www.jbsou.cn/?name=' + urllib.parse.quote(author.encode('gb2312')) + '&type=netease',
        'User-Agent': random.choice(user_agent_list),
        'x-requested-with': 'XMLHttpRequest'
    }
    resultList = []
    for page in range(pageNum):
        payload = {'input': author,
                   'filter': 'name',
                   'type': 'netease',
                   'page': str(page + 1)}
        temp = 0
        nowpageSongNum = 0
        while 1:
            try:
                response = requests.request("POST", url, headers=headers, data=payload, files=files)
                songList = json.loads(response.text)["data"]
                for i in songList:
                    tempDict = {}
                    tempDict["url"] = i["url"]
                    tempDict["author"] = i["author"]
                    tempDict["title"] = i["title"]
                    resultList.append(tempDict)
                # print("——【{}】歌曲第{}页信息获取如下——\n{}".format(author,page,resultList))
                print("第{}页一共有{}首歌".format(page + 1, len(songList)))
                nowpageSongNum = len(songList)
                break
            except Exception as e:
                temp += 1
                print("获取歌曲信息错误,准备重试!错误信息:{}".format(e))
            if temp == 5:
                print("获取歌曲信息多次错误!跳过处理")
                break
        if nowpageSongNum

文件, 多线程

爱的人   

建议改成面向对象
ppmjb   

多谢分享。
bywdyz2005   

学到了,谢谢
thengcee   

感谢楼主分享!
FIzz001   

很不错,谢谢分享
suilibin   

感谢分享!
iapeng   

不错,学习了
a725   

多谢分享!
风流人物   

有点东西,感谢分享
您需要登录后才可以回帖 登录 | 立即注册

返回顶部