如何实现一个实时转发 Stream 流的接口

查看 12|回复 0
作者:WindyXu   
想写一个实时转发 Stream 流内容的接口,不怎么懂 stream 流相关的知识
之前的接口是
async def index(path: str, request: Request):
    data = await request.json()
    resp = c.send_request(path, data)
    headers = dict(resp.headers)
    async def generate():
        for chunk in resp.iter_lines():
            if chunk:
                yield chunk + b'\n\n'
    return StreamingResponse(generate(), headers=headers)
问题是,这个会等接受完全部的 Stream 流信息,才会转发,想实现的效果是实时转发 stream 流信息,具体一点就是,接收到一行或者指定大小的信息块,就转发出去。
想法是将 resp = c.send_request(path, data)这块改造成异步的,然后实时转发。
目前的代码是
async def index(path: str, request: Request):
    data = await request.json()
    async with httpx.AsyncClient() as client:
        async with client.stream('POST', path, json=data) as response:
            resp_headers = dict(response.headers)
            async def generate():
                async for line in response.aiter_lines():
                    if line:
                        yield line.encode('utf-8') + b'\n\n'
            return StreamingResponse(generate(), headers=resp_headers)
但是会报:

httpx.StreamClosed: Attempted to read or stream content, but the stream has been closed.

不知道怎么改,求赐教
您需要登录后才可以回帖 登录 | 立即注册

返回顶部