Python Faust await agent.ask() не возвращает ответ и зависает вызывающая его функция

Я новичок в Python, играю с вещами, пытаюсь обмениваться услугами Python через Kafka, используя Faust. Итак, у меня есть небольшой проект PoC. Определение приложения Faust:

# app.py

import faust as f
from models import ReadRequest, ReadResponse

app = f.App("faust-app", broker="kafka://localhost:9092", store="rocksdb://")

topics = {"read-request": app.topic("read-request", value_type=ReadRequest)}


def get_app() -> f.types.AppT:
    return app


def get_topic(name: str) -> f.types.TopicT:
    return topics[name]

Мой агент чтения БД:

# reader.py

import pandas as pd
from pymongo import MongoClient
from app import get_app, get_topic

client = MongoClient()

app = get_app()
req_topic = get_topic("read-request")


@app.agent(req_topic)
async def read_request(requests):
    async for request in requests:
        db = client.test
        coll = db[request["collection"]]
        result = coll.find(request["query"])
        df = pd.DataFrame(result)
        response = {
            "id": request["id"],
            "data": list(df.loc[:, df.columns != "_id"].to_dict(orient="records")),
        }
        print(response) # debug <1>
        yield response

Модельные определения:

# models.py

import faust as f


class ReadRequest(f.Record):
    id: int
    collection: str
    query: dict

Контрольная работа agent.ask()

# test.py

import asyncio
from reader import read_request
from models import ReadRequest


async def run():
    result = await read_request.ask(ReadRequest(id=1, collection="test", query={}))
    print(result) # debug <2>


if __name__ == "__main__":
    asyncio.run(run())

Итак, у меня есть Zookeeper, Kafka Server, Mongodb и Faust работник reader Бег. Все использует из коробки конфиги.

Когда я бегу python3 test.py понятно debug <1> вывод на печать как положено, но debug <2> никогда не уходит, и исполнение висит там.

Faust Docs говорят

Поэтому я предполагаю, что все делаю правильно. У кого-нибудь есть какие-нибудь подсказки?

0 ответов

Другие вопросы по тегам