中小学教材下载

查看 37|回复 0
作者:boychn   
[Python] 纯文本查看 复制代码
import concurrent.futures
import logging
import os
import queue
import sys
import requests
def init_logging():
    """Configure the root logger: INFO level, tab-separated record fields."""
    # Field order: timestamp, level, logger name, thread name, message.
    record_layout = (
        '%(asctime)s\t[%(levelname)s]\t'
        '[%(name)s]\t[%(threadName)s]\t%(message)s'
    )
    logging.basicConfig(level=logging.INFO, format=record_layout)
def request_data_version() -> list:
    """Fetch the data-version index and return the data-file URLs it lists.

    The index is a JSON document whose ``urls`` field is a comma-separated
    string of URLs.

    Returns:
        A list of URL strings. An empty list is returned when the request
        fails (connection error, timeout, undecodable body), so the result
        can always be iterated.

    Raises:
        SystemExit: with code -1 when the server answers with a non-200
            status code.
    """
    # Target URL
    data_url = "https://s-file-1.ykt.cbern.com.cn/zxx/ndrs/resources/tch_material/version/data_version.json"
    try:
        logging.info("start to request url: %s", data_url)
        # A timeout keeps a stalled connection from hanging the script forever.
        response = requests.get(data_url, timeout=30)
        logging.info("status_code %s, body: %s", response.status_code, response.text)
        if response.status_code != 200:
            logging.warning("could not get right response, system exit -1.")
            sys.exit(-1)
        return response.json()["urls"].split(",")
    except requests.exceptions.RequestException:
        # logging.exception already appends the active traceback; passing the
        # exception as an extra positional argument would break %-formatting.
        logging.exception("failed to request data_version")
        return []
def request_single_data_url(data_url: str) -> list:
    """Download one data file and return the usable books it describes.

    Every record of the JSON array is passed through ``parse_book_data``;
    records for which it returns an empty dict are skipped with a warning.

    Args:
        data_url: URL of a JSON data file containing a list of book records.

    Returns:
        A list of parsed book dicts. The list is empty when the request
        fails or the server answers with a non-200 status, so the result can
        always be iterated.
    """
    logging.info("start to request url: %s", data_url)
    try:
        # A timeout keeps a stalled connection from hanging the script forever.
        response = requests.get(data_url, timeout=30)
        logging.info("status_code %s", response.status_code)
        if response.status_code != 200:
            logging.warning("unexpected status_code %s, no books read from: %s",
                            response.status_code, data_url)
            return []
        data = response.json()
    except requests.exceptions.RequestException:
        # logging.exception already appends the active traceback; passing the
        # exception as an extra positional argument would break %-formatting.
        logging.exception("failed to request single_data_url")
        return []
    books = []
    for d in data:
        book_data = parse_book_data(d)
        if book_data:
            books.append(book_data)
        else:
            logging.warning("skip this book: %s", d['title'])
    return books
def parse_book_data(data: dict) -> dict:
    """Extract the id, title and target directory of one book record.

    Args:
        data: A book record with ``id`` and ``title`` keys and a ``tag_list``
            of tag dicts, each carrying a ``tag_name``.

    Returns:
        ``{"id": ..., "name": ..., "dirs": ...}``, or an empty dict when the
        book should be skipped: it has no tags, or its highest-ranked tag
        (after sorting with ``custom_sort_tag_list`` in descending order)
        does not contain "版".
    """
    book_id = data['id']
    book_name = data['title']
    raw_tags = data.get('tag_list') or []
    if not raw_tags:
        # Without tags there is nothing to rank; indexing [0] below would
        # raise IndexError instead of skipping the book.
        return {}
    ranked_tags = sorted(raw_tags, key=custom_sort_tag_list, reverse=True)
    if '版' not in ranked_tags[0]['tag_name']:
        return {}
    return {"id": book_id, "name": book_name, "dirs": get_dirs_from_tags(ranked_tags)}
def get_dirs_from_tags(book_tags) -> str:
    """Build the relative directory path from the leading tags of a book.

    Args:
        book_tags: Sequence of tag dicts, each with a ``tag_name`` key,
            already in the order the path segments should appear.

    Returns:
        The names of the first three tags joined with "/". When fewer than
        three tags are present the path simply has fewer segments (an empty
        string for no tags) instead of raising IndexError.
    """
    return "/".join(str(tag['tag_name']) for tag in book_tags[:3])
def custom_sort_tag_list(tag):
    """Return the sort key of a tag: a 5-tuple with the tag name in one slot.

    The slots are, in order: edition, school level, grade, course name,
    semester. The tag name is placed in the first slot whose rule matches
    and every other slot stays an empty string; a tag matching no rule
    yields five empty strings.
    """
    label = tag['tag_name']
    if "版" in label:
        slot = 0  # edition
    elif "年级" in label:
        slot = 2  # grade
    elif "册" in label:
        slot = 4  # semester
    elif any(level in label for level in ('小学', '初中', '高中')):
        slot = 1  # school level
    elif '教材' not in label:
        slot = 3  # course name
    else:
        return '', '', '', '', ''
    key = [''] * 5
    key[slot] = label
    return tuple(key)
def download_book(book):
    """Download one book's PDF to ``<download_base_dir>/<dirs>/<name>.pdf``.

    Reads the module-level ``download_base_dir`` for the destination root.
    Network errors and non-200 responses are logged rather than raised, so
    one failed book does not stop the caller from processing the next one.

    Args:
        book: Dict with ``id``, ``name`` and ``dirs`` keys.
    """
    file_dir = f"{download_base_dir}/{book['dirs']}"
    file_name = f"{book['name']}.pdf"
    file_full_path = os.path.join(file_dir, file_name)
    logging.info("start download book: %s/%s", file_dir, file_name)
    # exist_ok=True makes creation safe when several threads need the same
    # folder at once, without a separate exists() check.
    os.makedirs(file_dir, exist_ok=True)
    download_url = f"https://r1-ndr.ykt.cbern.com.cn/edu_product/esp/assets_document/{book['id']}.pkg/pdf.pdf"
    try:
        # (connect, read) timeouts keep a stalled transfer from blocking forever.
        response = requests.get(download_url, timeout=(10, 120))
    except requests.exceptions.RequestException:
        logging.exception("failed to download book: %s", file_full_path)
        return
    if response.status_code != 200:
        logging.warning("failed to download book, status_code: %s", response.status_code)
        return
    with open(file_full_path, "wb") as file:
        file.write(response.content)
    logging.info("download book: %s successfully", file_full_path)
def multi_thread_download_books(shared_queue: queue.Queue):
    """Worker loop: download queued books until the queue is empty.

    A failure while downloading one book is logged and the loop moves on to
    the next item; ``task_done()`` is called for every item taken, whether
    or not its download succeeded.

    Args:
        shared_queue: Queue of book dicts to pass to ``download_book``.
    """
    while True:
        try:
            item = shared_queue.get_nowait()
        except queue.Empty:
            logging.warning("queue is empty")
            return
        try:
            download_book(item)
        except Exception:
            # Worker boundary: log and continue so a single bad book does not
            # end this worker while items are still queued.
            logging.exception("failed to download book: %s", item)
        finally:
            shared_queue.task_done()
def request_all_books_to_shared_queue():
    """Collect every book from every data file into a queue.

    Returns:
        A ``queue.Queue`` holding one parsed book dict per downloadable book.
    """
    # Unbounded on purpose: the queue is filled completely before it is
    # returned, so no consumer is draining it yet and a bounded queue's
    # blocking put() would wait forever once the limit was reached.
    shared_queue = queue.Queue()
    # "or []" tolerates a fetch helper that returned None instead of a list.
    for url in request_data_version() or []:
        for book in request_single_data_url(url) or []:
            shared_queue.put(book)
    return shared_queue
if __name__ == '__main__':
    init_logging()
    # Folder the downloaded files are saved into (read as a global by download_book)
    download_base_dir = 'd:/book3'
    # Number of worker threads
    thread_count = 8
    all_books_queue = request_all_books_to_shared_queue()
    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_count) as executor:
        # Every worker drains the same queue until it is empty.
        futures = [
            executor.submit(multi_thread_download_books, all_books_queue)
            for _ in range(thread_count)
        ]
        # An exception raised inside a worker is stored on its future and
        # stays invisible unless it is asked for, so report it explicitly.
        for future in concurrent.futures.as_completed(futures):
            error = future.exception()
            if error is not None:
                logging.error("download worker failed: %s", error, exc_info=error)
    logging.info("Done!!!")

中小学, 教材

您需要登录后才可以回帖 登录 | 立即注册

返回顶部