Python Faust await agent.ask() не возвращает ответ и зависает вызывающая его функция
Я новичок в Python, играю с вещами, пытаюсь обмениваться услугами Python через Kafka, используя Faust. Итак, у меня есть небольшой проект PoC. Определение приложения Faust:
# app.py
import faust as f
from models import ReadRequest, ReadResponse
app = f.App("faust-app", broker="kafka://localhost:9092", store="rocksdb://")
topics = {"read-request": app.topic("read-request", value_type=ReadRequest)}
def get_app() -> f.types.AppT:
return app
def get_topic(name: str) -> f.types.TopicT:
return topics[name]
Мой агент чтения БД:
# reader.py
import pandas as pd
from pymongo import MongoClient
from app import get_app, get_topic
client = MongoClient()
app = get_app()
req_topic = get_topic("read-request")
@app.agent(req_topic)
async def read_request(requests):
async for request in requests:
db = client.test
coll = db[request["collection"]]
result = coll.find(request["query"])
df = pd.DataFrame(result)
response = {
"id": request["id"],
"data": list(df.loc[:, df.columns != "_id"].to_dict(orient="records")),
}
print(response) # debug <1>
yield response
Модельные определения:
# models.py
import faust as f
class ReadRequest(f.Record):
id: int
collection: str
query: dict
Контрольная работа agent.ask()
# test.py
import asyncio
from reader import read_request
from models import ReadRequest
async def run():
result = await read_request.ask(ReadRequest(id=1, collection="test", query={}))
print(result) # debug <2>
if __name__ == "__main__":
asyncio.run(run())
Итак, у меня есть Zookeeper, Kafka Server, Mongodb и Faust работник reader
Бег. Все использует из коробки конфиги.
Когда я бегу python3 test.py
понятно debug <1>
вывод на печать как положено, но debug <2>
никогда не уходит, и исполнение висит там.
Поэтому я предполагаю, что все делаю правильно. У кого-нибудь есть какие-нибудь подсказки?