请大佬帮忙瞄一眼我这个丑陋的异步协程代码

查看 63|回复 1
作者:ohayoo   
迫于机器配置太低,用多进程多线程,一秒钟才处理几百条 uri ,于是想着来用异步协程来试下,看着文档写出了这样一个丑陋的代码,搞了几万条 uri 测试了下,好像也没啥问题,不打印结果到屏幕的话,一秒钟差不多可以处理 1000 条,大概有这么几个步骤:
  • 1 、uris.txt 有几千万条 uri ,于是每次读 1000 行,避免内存占用过多
  • 2 、利用 Semaphore 来控制并发数量为 100 ,避免 API 端把我给 ban 了
  • 3 、复用 session

    我现在的困惑是:
  • 1 、我这样写,上面 3 点真的有达到目的吗?
  • 2 、最后面 if name == 'main'下面那一坨,在 for 循环里面写 asyncio.run()总觉得怪怪的,但是不这样写,又不知道要怎么写
  • 3 、弱弱的再问个小白问题,不是说事件循环才是 asyncio 的核心嘛?可我这里面也没用 asyncio.get_event_loop(),为啥也能跑得这么顺畅呢?
  • 4 、如果要让代码优雅一点应该怎么修改呢?

    耽误大佬周五下午一点点时间,帮忙瞅一眼,不胜感激!
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import asyncio
    from asyncio import Semaphore
    from aiohttp import ClientSession
    from itertools import islice
    def get_lines_iterator(filename, n=1000):
        with open(filename) as fp:
            while True:
                lines = list(islice(fp, n))
                if lines:
                    yield lines
                else:
                    break
    async def delete_file(uri: str,
                          session: ClientSession,
                          sem: Semaphore) -> int:
        headers = {'Authorization': 'xxxxxxxxxxx'}
        url = api + uri
        async with sem, session.delete(url, headers=headers) as response:
            return uri, response.status
    # 写法 1:
    # async def main(uris):
    #     sem = Semaphore(100)
    #     async with ClientSession() as session:
    #         tasks = [delete_file(uri, session, sem) for uri in uris]
    #         await asyncio.gather(*tasks)
    # 写法 2:
    async def main(uris):
        sem = Semaphore(100)
        async with ClientSession() as session:
            async with asyncio.TaskGroup() as group:
                result = [group.create_task(delete_file(
                    uri, session, sem)) for uri in uris]
                return result
    if __name__ == '__main__':
        for lines in get_lines_iterator("uris.txt"):
            uris = [uri.strip() for uri in lines]
            result = asyncio.run(main(uris))
            for x in result:
                print(x.result())

    URI, uris, Async, SEM

  • fzzff   
    以下是对代码进行优化的建议:
    1. 使用异步文件读取:可以使用`aiofiles`库来实现异步文件读取,从而避免阻塞事件循环。这将使得文件的读取操作也能并发进行,提高效率。
    2. 使用异步上下文管理器:`aiohttp`支持异步上下文管理器,你可以使用`async with`语法来创建`ClientSession`,这样会更加简洁,而且会在任务完成后自动关闭会话。
    3. 使用`asyncio.as_completed`:在并发执行任务时,可以使用`asyncio.as_completed`来获取已完成的任务,而不是等待所有任务都完成再处理结果。这样可以更早地得到一部分结果,并在需要时立即处理。
    4. 异常处理:对于异步代码,异常处理十分重要。可以在任务执行时捕获异常,并记录错误信息,以便后续分析和处理。
    下面是优化后的代码:
    ```python
    import asyncio
    import aiofiles
    from aiohttp import ClientSession
    async def delete_file(session: ClientSession, sem: asyncio.Semaphore, uri: str):
    headers = {'Authorization': 'xxxxxxxxxxx'}
    url = api + uri
    try:
    async with sem:
    async with session.delete(url, headers=headers) as response:
    return uri, response.status
    except Exception as e:
    # 处理异常,比如记录错误日志
    print(f"Error occurred while processing {uri}: {str(e)}")
    return uri, None
    async def main(uris):
    sem = asyncio.Semaphore(100)
    async with ClientSession() as session:
    tasks = [delete_file(session, sem, uri) for uri in uris]
    for future in asyncio.as_completed(tasks):
    uri, status = await future
    if status is not None:
    print(f"{uri}: {status}")
    else:
    print(f"{uri}: Error")
    async def read_uris(filename):
    async with aiofiles.open(filename, mode='r') as fp:
    async for line in fp:
    yield line.strip()
    if __name__ == '__main__':
    asyncio.run(main(read_uris("uris.txt")))
    ```
    在优化后的代码中,我们使用`aiofiles`库来异步读取文件,并使用`async for`来逐行获取 URI 。同时,我们使用`asyncio.as_completed`来处理已完成的任务,这样在某些任务执行较慢时,可以更早地输出结果,提高实时性。另外,我们在`delete_file`函数中增加了异常处理,确保在出现异常时不会导致整个任务中断。
    您需要登录后才可以回帖 登录 | 立即注册

    返回顶部