Faust пример публикации в теме кафки
Мне любопытно, как вы должны выразить, что хотите, чтобы сообщение было доставлено на тему Кафки в faust. Пример в их readme, похоже, не пишет в тему:
import faust
class Greeting(faust.Record):
from_name: str
to_name: str
app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)
@app.agent(topic)
async def hello(greetings):
async for greeting in greetings:
print(f'Hello from {greeting.from_name} to {greeting.to_name}')
@app.timer(interval=1.0)
async def example_sender(app):
await hello.send(
value=Greeting(from_name='Faust', to_name='you'),
)
if __name__ == '__main__':
app.main()
Я бы ожидал hello.send
в приведенном выше коде, чтобы опубликовать сообщение в теме, но оно не отображается.
Есть много примеров чтения из тем, и много примеров использования клика для отправки специального сообщения. После просмотра документов я не вижу четких примеров публикации тем в коде. Я просто схожу с ума и приведенный выше код должен работать?
6 ответов
В send()
Функция является правильным для вызова для записи в темы. Вы даже можете указать конкретный раздел, как эквивалентный вызов Java API.
Вот ссылка на send()
метод:
https://faust.readthedocs.io/en/latest/reference/faust.topics.html
Вы можете использовать sink
сказать Фаусту, куда доставить результаты работы агента. Вы также можете использовать сразу несколько тем в качестве раковин, если хотите.
@app.agent(topic_to_read_from, sink=[destination_topic])
async def fetch(records):
async for record in records:
result = do_something(record)
yield result
Если вам нужен только производитель Faust (не объединенный с потребителем / приемником), исходный вопрос действительно имеет правильный фрагмент кода, вот полностью функциональный скрипт, который публикует сообщения в теме Kafka 'faust_test', которая потребляется любым Kafka/ Потребитель Фауста.
Запустите приведенный ниже код следующим образом:
python faust_producer.py worker
"""Simple Faust Producer"""
import faust
if __name__ == '__main__':
"""Simple Faust Producer"""
# Create the Faust App
app = faust.App('faust_test_app', broker='localhost:9092')
topic = app.topic('faust_test')
# Send messages
@app.timer(interval=1.0)
async def send_message(message):
await topic.send(value='my message')
# Start the Faust App
app.main()
Итак, мы просто столкнулись с необходимостью отправить сообщение в тему, отличную от
sink
темы.
Самый простой способ, который мы нашли:
foo = await my_topic.send_soon(value="wtfm8")
.
Вы также можете использовать
send
непосредственно, как показано ниже, с помощью цикла событий asyncio.
loop = asyncio.get_event_loop()
foo = await ttopic.send(value="wtfm8??")
loop.run_until_complete(foo)
Не знаю, насколько это актуально, но я столкнулся с этой проблемой, когда пытался выучить Фауста. Из того, что я прочитал, вот что происходит:
topic = app.topic('hello-topic', value_type=Greeting)
Заблуждение здесь в том, что созданная вами тема - это тема, которую вы пытаетесь использовать / прочитать. Созданная вами тема ничего не делает.
await hello.send(
value=Greeting(from_name='Faust', to_name='you'),
)
по сути, это создает промежуточный kstream, который отправляет значения вашей функции hello (приветствия). def hello(...) будет вызываться при появлении нового сообщения в потоке и обработает отправляемое сообщение.
@app.agent(topic)
async def hello(greetings):
async for greeting in greetings:
print(f'Hello from {greeting.from_name} to {greeting.to_name}')
Он получает поток kafka от hello.send (...) и просто выводит его на консоль (нет вывода в созданную «тему»). Здесь вы можете отправить сообщение в новую тему. поэтому вместо печати вы можете:
topic.send(value = "my message!")
Альтернативно:
Вот что вы делаете:
- example_sender() отправляет сообщение hello(...) (через промежуточный kstream)
- hello(...) забирает сообщение и печатает его ВНИМАНИЕ: сообщения не отправляются в правильную тему
Вот что можно сделать:
example_sender() отправляет сообщение hello(...) (через промежуточный kstream)
привет (...) берет сообщение и печатает
hello(...) ТАКЖЕ отправляет новое сообщение в созданную тему (при условии, что вы пытаетесь преобразовать исходные данные)
app = faust.App('hello-app', broker='kafka://localhost') topic = app.topic('hello-topic', value_type=Greeting) output_topic = app.topic('test_output_faust', value_type=str) @app.agent(topic) async def hello(greetings): async for greeting in greetings: new_message = f'Hello from {greeting.from_name} to {greeting.to_name}' print(new_message) await output_topic.send(value=new_message)
Я нашел решение, как отправлять данные в темы кафки с помощью Faust, но я не очень понимаю, как это работает.
В «Фаусте» для этого есть несколько способов:send(), cast(), ask_nowait(), ask()
. В документации они называются операциями RPC.
После создания задачи отправки необходимо запустить приложение Faust в режиме Client-Only Mode. (start_client(), maybe_start_client()
)
Следующий код (функция product()) демонстрирует их применение (обратите внимание на комментарии):
import asyncio
import faust
class Greeting(faust.Record):
from_name: str
to_name: str
app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)
result_topic = app.topic('result-topic', value_type=str)
@app.agent(topic)
async def hello(greetings):
async for greeting in greetings:
s = f'Hello from {greeting.from_name} to {greeting.to_name}'
print(s)
yield s
async def produce(to_name):
# send - universal method for sending data to a topic
await hello.send(value=Greeting(from_name='SEND', to_name=to_name), force=True)
await app.maybe_start_client()
print('SEND')
# cast - allows you to send data without waiting for a response from the agent
await hello.cast(value=Greeting(from_name='CAST', to_name=to_name))
await app.maybe_start_client()
print('CAST')
# ask_nowait - it seems to be similar to cast
p = await hello.ask_nowait(
value=Greeting(from_name='ASK_NOWAIT', to_name=to_name),
force=True,
reply_to=result_topic
)
# without this line, ask_nowait will not work; taken from the ask implementation
await app._reply_consumer.add(p.correlation_id, p)
await app.maybe_start_client()
print(f'ASK_NOWAIT: {p.correlation_id}')
# blocks the execution flow
# p = await hello.ask(value=Greeting(from_name='ASK', to_name=to_name), reply_to=result_topic)
# print(f'ASK: {p.correlation_id}')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(produce('Faust'))
Запускаем Fast worker командойfaust -A <example> worker
Затем мы можем запустить клиентскую часть приложения и проверить, что все работает:python <example.py>
Вывод <example.py>:
SEND
CAST
ASK_NOWAIT: bbbe6795-5a99-40e5-a7ad-a9af544efd55
Стоит отметить, что вы также увидите трассировку какой-то ошибки, возникшей после доставки, которая не мешает работе программы (так кажется)
Выход рабочего Faust:
[2022-07-19 12:06:27,959] [1140] [WARNING] Hello from SEND to Faust
[2022-07-19 12:06:27,960] [1140] [WARNING] Hello from CAST to Faust
[2022-07-19 12:06:27,962] [1140] [WARNING] Hello from ASK_NOWAIT to Faust
Я не понимаю, почему это так работает, почему это так сложно и почему в документации об этом очень мало написано.