import concurrent.futures
import logging
import os
import queue
import sys

import requests
def init_logging():
    """Configure the root logger for INFO-and-above output.

    Every record is rendered as tab-separated fields: timestamp, level,
    logger name, thread name and message.
    """
    fields = (
        '%(asctime)s',
        '[%(levelname)s]',
        '[%(name)s]',
        '[%(threadName)s]',
        '%(message)s',
    )
    logging.basicConfig(level=logging.INFO, format='\t'.join(fields))
def request_data_version(timeout: float = 30) -> [str]:
    """Fetch the list of catalogue data URLs from ``data_version.json``.

    The endpoint is expected to answer with a JSON object whose ``urls``
    field is a comma-separated string; that string is split into a list.

    Args:
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The data URLs, or an empty list when the request fails or the body
        does not have the expected shape.

    Raises:
        SystemExit: With code -1 when the server answers with a non-200
            status.
    """
    # Target URL
    data_url = "https://s-file-1.ykt.cbern.com.cn/zxx/ndrs/resources/tch_material/version/data_version.json"
    logging.info("start to request url: %s", data_url)
    try:
        response = requests.get(data_url, timeout=timeout)
    except requests.exceptions.RequestException:
        # logging.exception already appends the traceback; passing the
        # exception as an extra positional argument breaks %-formatting.
        logging.exception("failed to request data_version")
        return []

    logging.info("status_code %s, body: %s", response.status_code, response.text)
    if response.status_code != 200:
        logging.warning("could not get right response, system exit -1.")
        sys.exit(-1)

    try:
        return response.json()["urls"].split(",")
    except (ValueError, KeyError, TypeError, AttributeError):
        # Invalid JSON, missing "urls" key, or "urls" is not a string.
        logging.exception("unexpected data_version payload")
        return []
def request_single_data_url(data_url: str, timeout: float = 30) -> [dict]:
    """Download one catalogue file and parse its entries into book records.

    Args:
        data_url: URL of a JSON document that contains an iterable of raw
            book entries.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The records produced by ``parse_book_data`` for the entries it
        accepted; entries for which it returns an empty dict are logged
        and skipped. An empty list is returned when the request fails,
        the status is not 200, or the body is not valid JSON, so callers
        can always iterate the result.
    """
    logging.info("start to request url: %s", data_url)
    try:
        response = requests.get(data_url, timeout=timeout)
        logging.info("status_code %s", response.status_code)
        if response.status_code != 200:
            logging.warning("could not get right response from %s", data_url)
            return []
        data = response.json()
    except (requests.exceptions.RequestException, ValueError):
        # ValueError covers JSON decoding errors raised by older versions
        # of requests that do not wrap them in a RequestException.
        logging.exception("failed to request single_data_url")
        return []

    books = []
    for d in data:
        book_data = parse_book_data(d)
        if book_data:
            books.append(book_data)
        else:
            logging.warning("skip this book: %s", d['title'])
    return books
def parse_book_data(data: dict) -> dict:
    """Turn one raw catalogue entry into the record used for downloading.

    Args:
        data: Raw entry with ``id``, ``title`` and ``tag_list`` keys; each
            tag is a dict carrying a ``tag_name``.

    Returns:
        A dict with ``id``, ``name`` and ``dirs`` keys, where ``dirs`` is
        built from the three highest-ranked tags. An empty dict is
        returned when the entry has fewer than three tags or when its
        top-ranked tag name does not contain "版".

    Raises:
        KeyError: If ``id`` or ``title`` is missing from ``data``.
    """
    book_id = data['id']
    book_name = data['title']
    book_tags = data.get('tag_list') or []
    # The directory path is built from three tags, so an entry with fewer
    # cannot be placed; skip it instead of failing with IndexError.
    if len(book_tags) < 3:
        return {}
    book_tags = sorted(book_tags, key=custom_sort_tag_list, reverse=True)
    if '版' not in book_tags[0]['tag_name']:
        return {}
    return {"id": book_id, "name": book_name, "dirs": get_dirs_from_tags(book_tags)}
def get_dirs_from_tags(book_tags) -> str:
    """Join the names of the first three tags into a relative path.

    Args:
        book_tags: Sequence of tag dicts, each with a ``tag_name`` key;
            at least three items are required.

    Returns:
        ``"<first>/<second>/<third>"`` built from the tag names.
    """
    top, middle, leaf = (book_tags[position]['tag_name'] for position in range(3))
    return "{}/{}/{}".format(top, middle, leaf)
def custom_sort_tag_list(tag):
    """Sort key that ranks a tag by the category its name falls into.

    The tag name is classified by the first rule that matches, checked in
    this order: edition (contains "版"), grade (contains "年级"), semester
    (contains "册"), school level (contains "小学", "初中" or "高中").
    A name matching none of these counts as a course name unless it
    contains "教材", in which case it is placed nowhere.

    Args:
        tag: Dict with a ``tag_name`` key.

    Returns:
        ``(edition, school_level, grade, course_name, semester)`` with the
        tag name in the slot of its category and ``''`` everywhere else.
    """
    label = tag['tag_name']
    # Checked top to bottom; the first hit decides the slot.
    rules = (
        ('edition', ("版",)),
        ('grade', ("年级",)),
        ('semester', ("册",)),
        ('school_level', ('小学', '初中', '高中')),
    )
    slots = dict.fromkeys(
        ('edition', 'school_level', 'grade', 'course_name', 'semester'), ''
    )
    for slot, markers in rules:
        if any(marker in label for marker in markers):
            slots[slot] = label
            break
    else:
        if '教材' not in label:
            slots['course_name'] = label
    return (
        slots['edition'],
        slots['school_level'],
        slots['grade'],
        slots['course_name'],
        slots['semester'],
    )
def download_book(book, timeout: float = 60):
    """Download one book's PDF below the module-level ``download_base_dir``.

    The file is written to
    ``<download_base_dir>/<book['dirs']>/<book['name']>.pdf``; missing
    directories are created. Network errors, filesystem errors and
    non-200 responses are logged and the function returns normally, so a
    single bad book does not abort the calling worker.

    Args:
        book: Record with ``id``, ``name`` and ``dirs`` keys.
        timeout: Seconds to wait for the server before giving up.
    """
    file_dir = f"{download_base_dir}/{book['dirs']}"
    file_name = f"{book['name']}.pdf"
    file_full_path = os.path.join(file_dir, file_name)
    logging.info("start download book: %s/%s", file_dir, file_name)
    download_url = f"https://r1-ndr.ykt.cbern.com.cn/edu_product/esp/assets_document/{book['id']}.pkg/pdf.pdf"
    try:
        # exist_ok avoids the check-then-create race when several threads
        # need the same directory at the same time.
        os.makedirs(file_dir, exist_ok=True)
        response = requests.get(download_url, timeout=timeout)
        if response.status_code != 200:
            logging.warning("failed to download book, status_code: %s", response.status_code)
            return
        with open(file_full_path, "wb") as file:
            file.write(response.content)
    except (requests.exceptions.RequestException, OSError):
        logging.exception("failed to download book: %s", file_full_path)
        return
    logging.info("download book: %s successfully", file_full_path)
def multi_thread_download_books(shared_queue: queue.Queue):
    """Worker loop: download books from the queue until it is empty.

    Each item is passed to ``download_book``. A failure on one item is
    logged and the loop moves on to the next one; ``task_done()`` is
    called for every item taken, whether or not its download succeeded.

    Args:
        shared_queue: Queue of book records shared between the workers.
    """
    while True:
        try:
            item = shared_queue.get_nowait()
        except queue.Empty:
            logging.warning("queue is empty")
            return
        try:
            download_book(item)
        except Exception:
            # Worker boundary: keep draining the queue after a bad item
            # instead of letting the exception end this thread.
            logging.exception("failed to download book: %s", item)
        finally:
            shared_queue.task_done()
def request_all_books_to_shared_queue():
    """Collect the book records of every catalogue URL into one queue.

    Calls ``request_data_version`` for the catalogue URLs and
    ``request_single_data_url`` for each of them.

    Returns:
        A ``queue.Queue`` holding every book record that was returned.
    """
    # Unbounded on purpose: nothing consumes the queue while this function
    # fills it, so a size limit would make put() block forever as soon as
    # the limit is reached.
    shared_queue = queue.Queue()
    # "or []" tolerates helpers that return None after a failed request.
    for url in request_data_version() or []:
        for book in request_single_data_url(url) or []:
            shared_queue.put(book)
    return shared_queue
if __name__ == '__main__':
    init_logging()
    # Folder where the downloaded files are saved (read by download_book)
    download_base_dir = 'd:/book3'
    # Number of worker threads
    thread_count = 8
    all_books_queue = request_all_books_to_shared_queue()
    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_count) as executor:
        futures = [
            executor.submit(multi_thread_download_books, all_books_queue)
            for _ in range(thread_count)
        ]
        # An exception raised in a worker is stored in its future; report
        # it here, otherwise it would go unnoticed.
        for future in concurrent.futures.as_completed(futures):
            error = future.exception()
            if error is not None:
                logging.error("download worker failed: %s", error, exc_info=error)
    logging.info("Done!!!")