import json
from urllib import parse

import requests

from Aria2_RPC import Aria2Download
class AlistDownload:
    """Recursively list an Alist share and hand every file to aria2.

    Constructing an instance performs the whole traversal: the path part of
    ``url`` is listed through ``POST /api/fs/list``, sub-directories are
    followed recursively, and each file is queued with
    ``Aria2Download.addUri(url, directory, filename)``.

    Attributes:
        headers: HTTP headers sent with every listing request.
        host: ``scheme://netloc`` of the Alist server.
        download_dir: Local folder prefix under which the remote directory
            layout is reproduced.
        session: ``requests.Session`` shared by all listing requests so the
            connection to the server is reused.
        aria2: The ``Aria2Download`` client that receives the downloads.
    """

    # Seconds to wait for a listing request before giving up; without a
    # timeout a stalled server would block the traversal forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, url, download_dir=r"D:\download"):
        """Parse ``url`` and immediately crawl the directory it points to.

        Args:
            url: Address of an Alist directory, e.g. ``http://host/some/dir``.
            download_dir: Local folder prefix for the downloads. The default
                keeps the location that used to be hard-coded.
        """
        self.headers = {
            "Accept": "application/json, text/plain, */*",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
            "Content-Type": "application/json;charset=UTF-8",
            "Accept-Encoding": "gzip, deflate",
            "Accept-Language": "zh-CN,zh;q=0.9"
        }
        parseresult = parse.urlparse(url)
        # The listing API takes the decoded path, so undo the URL escaping.
        path = parse.unquote(parseresult.path)
        self.host = f"{parseresult.scheme}://{parseresult.netloc}"
        self.download_dir = download_dir
        self.session = requests.Session()
        self.aria2 = Aria2Download()
        try:
            self.get_list(path)
        finally:
            self.session.close()
        print("遍历完成")

    def get_list(self, path):
        """List ``path`` on the server, queue its files and recurse into dirs.

        A directory that cannot be listed (network error, non-200 status,
        invalid JSON, or a JSON body without a ``data`` object) is reported
        on stdout and skipped, so one bad directory does not abort the rest
        of the traversal.

        Args:
            path: Decoded remote directory path, e.g. ``/movies``. A trailing
                slash is tolerated.
        """
        # Strip trailing slashes so joining never produces "//name"; the
        # root directory then becomes "" and is sent to the API as "/".
        base = path.rstrip("/")
        payload = {"path": base or "/", "password": "", "page": 1, "per_page": 0, "refresh": False}
        try:
            response = self.session.post(
                self.host + "/api/fs/list",
                data=json.dumps(payload),
                headers=self.headers,
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.RequestException as exc:
            print(f"request failed for {path}: {exc}")
            return

        try:
            if response.status_code != 200:
                print(response.status_code)
                return
            body = response.json()
        except ValueError as exc:
            # requests raises a ValueError subclass when the body is not JSON.
            print(f"invalid JSON for {path}: {exc}")
            return
        finally:
            response.close()

        data = body.get("data") if isinstance(body, dict) else None
        if not isinstance(data, dict):
            # No usable payload (e.g. an API-level error); indexing it would
            # raise TypeError, so report what the server said and move on.
            code = body.get("code") if isinstance(body, dict) else None
            message = body.get("message") if isinstance(body, dict) else None
            print(f"list failed for {path}: {code} {message}")
            return

        # Collect first, act afterwards: every entry of this directory is
        # printed before any recursion starts, and entries are processed in
        # the order the server listed them.
        entries = []
        for file_info in data.get("content") or []:
            name = file_info["name"]
            remote_path = f"{base}/{name}"
            if file_info.get("is_dir"):
                print("dir", remote_path)
                entries.append((True, remote_path, name))
            else:
                # Percent-encode the path so names with spaces, '#', '?' or
                # '%' still form a valid download URL.
                file_download_url = self.host + "/d" + parse.quote(remote_path)
                print("file", file_download_url)
                entries.append((False, file_download_url, name))

        for is_dir, target, name in entries:
            if is_dir:
                self.get_list(target)
            else:
                # Mirror the remote directory below the local download folder.
                self.aria2.addUri(target, self.download_dir + base, name)
if __name__ == "__main__":
    # Script entry point: replace the placeholder below with the address of
    # the Alist directory to crawl. Creating the object runs the traversal.
    alist_url = "http://此处填入Alist网盘的地址"
    AlistDownload(alist_url)
# For details of the Aria2 API, see the article "基于Aria2的API接口进行批量下载源码"
# (source code for batch downloading through the Aria2 API).