代码如下:
main.py
from fastapi import FastAPI
from test_workflow import test
app = FastAPI(
title="Data Services",
description="REST API for data services",
version="0.1.0",
)
@app.get("/")
async def root():
return {"message": "Data Services"}
@app.get("/test")
def trigger():
test()
return "completed"
test_workflow.py
from prefect import flow, task
import time
@task
def sleep1():
print("sleep1")
time.sleep(5) //模拟需要一定运行时间的计算流
return 1
@flow()
def test():
task_1 = sleep1.submit()
x = task_1.result()
print(x)
需首先运行下列命令开启服务:
$ prefect server start
$ uvicorn main:app --reload
如果尝试 GET /test ,会得到如下错误:
RuntimeError: is bound to a different event loop
如果 FastAPI 中定义为异步函数async def trigger():, 则顺利运行不会报错,但问题是这样就丧失了并行性,FastAPI 一次只能处理一个 /test 请求,显然不符合需求。
想了很多办法都没法解决,有没有懂这方面的帮忙看看?