import os,re,time,json,aiohttp,asyncio
# Chapter page URLs; filled by get_list() and read by main().
url_list = []

# Browser-like User-Agent header sent with every request.
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.87 Safari/537.36",
}

# Relative path: a "txt" directory is created under the current working directory.
directory = "txt"

# exist_ok=True avoids the check-then-create race of testing
# os.path.exists() before calling makedirs().
os.makedirs(directory, exist_ok=True)
async def fetch_post(url, headers, data):
    """POST ``data`` to ``url`` with ``headers`` and return the body as text.

    A fresh ``aiohttp.ClientSession`` is opened for this single request and
    closed again before the function returns.
    """
    async with aiohttp.ClientSession() as client, client.post(
        url, headers=headers, data=data
    ) as reply:
        body = await reply.text()
    return body
async def fetch_get(url, headers):
    """GET ``url`` with ``headers`` and return the response body as text.

    A fresh ``aiohttp.ClientSession`` is opened for this single request and
    closed again before the function returns.
    """
    async with aiohttp.ClientSession() as client, client.get(
        url, headers=headers
    ) as reply:
        body = await reply.text()
    return body
async def get_list(bookid):
    """Fetch the chapter list of a book and queue every chapter page URL.

    Posts ``bookid`` to the chapter-list API, decodes the JSON reply and, for
    each entry in the ``chapterViewList`` of every ``chapterList`` item,
    appends the matching reader URL to the module-level ``url_list``.

    Returns:
        True once all chapter URLs have been appended.
    """
    payload = await fetch_post(
        "https://bookapi.zongheng.com/api/chapter/getChapterList",
        data={"bookId": bookid},
        headers=headers,
    )
    groups = json.loads(payload)["result"]["chapterList"]
    url_list.extend(
        f"https://read.zongheng.com/chapter/{bookid}/{entry['chapterId']}.html"
        for group in groups
        for entry in group["chapterViewList"]
    )
    return True
async def get_text(url):
    """Download one chapter page and save its text under ``directory``.

    The text of the element with class ``title_txtbox`` becomes the file name
    and the text of every ``<p>`` inside ``div.content`` becomes the file
    body, each paragraph followed by a blank line.

    Args:
        url: Chapter page URL to fetch.
    """
    # Imported locally because the file's import line does not bring
    # BeautifulSoup into scope; without this the call below is a NameError.
    from bs4 import BeautifulSoup

    html = await fetch_get(url, headers=headers)
    soup = BeautifulSoup(html, "html.parser")
    title = soup.find(class_="title_txtbox").text  # chapter title
    body = soup.find("div", class_="content")  # chapter body container
    p_text = "".join(f"{paragraph.text}\n\n" for paragraph in body.find_all("p"))

    # Strip characters that Windows rejects in file names, plus "/" which is
    # a path separator everywhere, so the title is safe to use as a file name.
    name = re.sub(r'[\\/:*?"<>|&]', "", title.strip())

    # Write the chapter text into the module's output directory.
    file_name = os.path.join(directory, name + ".txt")
    await sava_file(file_name, p_text)
    await asyncio.sleep(2)
    print(name)
async def sava_file(name, text):
    """Write ``text`` to the file ``name`` as UTF-8 without blocking the loop.

    Args:
        name: Path of the file to create or overwrite.
        text: String content to write.
    """

    def _write():
        with open(name, "w", encoding="utf8") as f:
            f.write(text)

    # File I/O is blocking; run it in the default executor so other
    # coroutines keep making progress while the file is written.
    await asyncio.get_running_loop().run_in_executor(None, _write)
async def main():
    """Download every chapter queued in ``url_list`` concurrently."""
    # gather() wraps each coroutine in a task itself, so neither an explicit
    # ensure_future() call nor a handle to the running loop is needed.
    await asyncio.gather(*(get_text(url) for url in url_list))
# Fetch the chapter list first; chapters are downloaded only if it returned
# a truthy result.
Chapter = asyncio.run(get_list("1249806"))
print(f"长度:{len(url_list)}")
print(url_list)
if Chapter:
    asyncio.run(main())
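# Multithreaded crawler for the same novel site: https://www.52pojie.cn/thread-1834722-1-1.html
# This script is based on the same source code, rewritten with async I/O so the
# crawl finishes in seconds. I could not find a good way to deal with blocking
# network requests, so I learned asyncio instead.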