Faust пример публикации в теме кафки

Мне любопытно, как вы должны выразить, что хотите, чтобы сообщение было доставлено на тему Кафки в faust. Пример в их readme, похоже, не пишет в тему:

import faust

class Greeting(faust.Record):
    from_name: str
    to_name: str

app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)

@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')

@app.timer(interval=1.0)
async def example_sender(app):
    await hello.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )

if __name__ == '__main__':
    app.main()

Я бы ожидал hello.send в приведенном выше коде, чтобы опубликовать сообщение в теме, но оно не отображается.

Есть много примеров чтения из тем, и много примеров использования клика для отправки специального сообщения. После просмотра документов я не вижу четких примеров публикации тем в коде. Я просто схожу с ума и приведенный выше код должен работать?

6 ответов

В send()Функция является правильным для вызова для записи в темы. Вы даже можете указать конкретный раздел, как эквивалентный вызов Java API.

Вот ссылка на send() метод:

https://faust.readthedocs.io/en/latest/reference/faust.topics.html

Вы можете использовать sinkсказать Фаусту, куда доставить результаты работы агента. Вы также можете использовать сразу несколько тем в качестве раковин, если хотите.

@app.agent(topic_to_read_from, sink=[destination_topic])
async def fetch(records):
    async for record in records:
        result = do_something(record)
        yield result

Если вам нужен только производитель Faust (не объединенный с потребителем / приемником), исходный вопрос действительно имеет правильный фрагмент кода, вот полностью функциональный скрипт, который публикует сообщения в теме Kafka 'faust_test', которая потребляется любым Kafka/ Потребитель Фауста.

Запустите приведенный ниже код следующим образом: python faust_producer.py worker

"""Simple Faust Producer"""
import faust

if __name__ == '__main__':
    """Simple Faust Producer"""

    # Create the Faust App
    app = faust.App('faust_test_app', broker='localhost:9092')
    topic = app.topic('faust_test')

    # Send messages
    @app.timer(interval=1.0)
    async def send_message(message):
        await topic.send(value='my message')

    # Start the Faust App
    app.main()

Итак, мы просто столкнулись с необходимостью отправить сообщение в тему, отличную от sink темы.

Самый простой способ, который мы нашли: foo = await my_topic.send_soon(value="wtfm8").

Вы также можете использовать send непосредственно, как показано ниже, с помощью цикла событий asyncio.

loop = asyncio.get_event_loop()
foo = await ttopic.send(value="wtfm8??")
loop.run_until_complete(foo)

Не знаю, насколько это актуально, но я столкнулся с этой проблемой, когда пытался выучить Фауста. Из того, что я прочитал, вот что происходит:

      topic = app.topic('hello-topic', value_type=Greeting)

Заблуждение здесь в том, что созданная вами тема - это тема, которую вы пытаетесь использовать / прочитать. Созданная вами тема ничего не делает.

      await hello.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )

по сути, это создает промежуточный kstream, который отправляет значения вашей функции hello (приветствия). def hello(...) будет вызываться при появлении нового сообщения в потоке и обработает отправляемое сообщение.

      @app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')

Он получает поток kafka от hello.send (...) и просто выводит его на консоль (нет вывода в созданную «тему»). Здесь вы можете отправить сообщение в новую тему. поэтому вместо печати вы можете:

      topic.send(value = "my message!")

Альтернативно:

Вот что вы делаете:

  1. example_sender() отправляет сообщение hello(...) (через промежуточный kstream)
  2. hello(...) забирает сообщение и печатает его ВНИМАНИЕ: сообщения не отправляются в правильную тему

Вот что можно сделать:

  1. example_sender() отправляет сообщение hello(...) (через промежуточный kstream)

  2. привет (...) берет сообщение и печатает

  3. hello(...) ТАКЖЕ отправляет новое сообщение в созданную тему (при условии, что вы пытаетесь преобразовать исходные данные)

             app = faust.App('hello-app', broker='kafka://localhost')
     topic = app.topic('hello-topic', value_type=Greeting)
     output_topic = app.topic('test_output_faust', value_type=str)
    
     @app.agent(topic)
     async def hello(greetings):
         async for greeting in greetings:
             new_message = f'Hello from {greeting.from_name} to {greeting.to_name}'
             print(new_message)
             await output_topic.send(value=new_message)
    

Я нашел решение, как отправлять данные в темы кафки с помощью Faust, но я не очень понимаю, как это работает.

В «Фаусте» для этого есть несколько способов:send(), cast(), ask_nowait(), ask(). В документации они называются операциями RPC.

После создания задачи отправки необходимо запустить приложение Faust в режиме Client-Only Mode. (start_client(), maybe_start_client())

Следующий код (функция product()) демонстрирует их применение (обратите внимание на комментарии):

      import asyncio

import faust


class Greeting(faust.Record):
    from_name: str
    to_name: str


app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)
result_topic = app.topic('result-topic', value_type=str)


@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        s = f'Hello from {greeting.from_name} to {greeting.to_name}'
        print(s)
        yield s


async def produce(to_name):
    # send - universal method for sending data to a topic
    await hello.send(value=Greeting(from_name='SEND', to_name=to_name), force=True)
    await app.maybe_start_client()
    print('SEND')

    # cast - allows you to send data without waiting for a response from the agent
    await hello.cast(value=Greeting(from_name='CAST', to_name=to_name))
    await app.maybe_start_client()
    print('CAST')

    # ask_nowait - it seems to be similar to cast
    p = await hello.ask_nowait(
        value=Greeting(from_name='ASK_NOWAIT', to_name=to_name),
        force=True,
        reply_to=result_topic
    )
    # without this line, ask_nowait will not work; taken from the ask implementation
    await app._reply_consumer.add(p.correlation_id, p)
    await app.maybe_start_client()
    print(f'ASK_NOWAIT: {p.correlation_id}')

    # blocks the execution flow
    # p = await hello.ask(value=Greeting(from_name='ASK', to_name=to_name), reply_to=result_topic)
    # print(f'ASK: {p.correlation_id}')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(produce('Faust'))

Запускаем Fast worker командойfaust -A <example> worker

Затем мы можем запустить клиентскую часть приложения и проверить, что все работает:python <example.py>

Вывод <example.py>:

      SEND
CAST
ASK_NOWAIT: bbbe6795-5a99-40e5-a7ad-a9af544efd55

Стоит отметить, что вы также увидите трассировку какой-то ошибки, возникшей после доставки, которая не мешает работе программы (так кажется)

Выход рабочего Faust:

      [2022-07-19 12:06:27,959] [1140] [WARNING] Hello from SEND to Faust 
[2022-07-19 12:06:27,960] [1140] [WARNING] Hello from CAST to Faust 
[2022-07-19 12:06:27,962] [1140] [WARNING] Hello from ASK_NOWAIT to Faust 

Я не понимаю, почему это так работает, почему это так сложно и почему в документации об этом очень мало написано.

Другие вопросы по тегам