GenStage: Как обрабатывать ситуации, когда производитель не может предоставить события?

Следующий сценарий: производитель GenStage обрабатывает поток Twitter (используя Stream API и ExTwitter) и предоставляет набор твитов (макс. Спрос, который запрашивает потребитель) потребителю GenStage. Затем потребитель просто печатает их.

Следующая проблема: я ищу конкретные твиты, поэтому не всегда доступны новые твиты. Если производитель GenStage возвращает пустой список событий, потребитель перестанет спрашивать. Смотрите этот вопрос, и Хосе Валимс ответит больше.

Я не уверен, как решить эту проблему. Любая помощь с благодарностью. Это то, что я до сих пор:

defmodule MyApp.TwitterProducer do
  use GenStage
  alias MyApp.TwitterStream

  def start_link(:ok) do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    # This creates a regular Elixir Stream
    # I use this as the state so that not every
    # time the consumer asks for new data
    # a new stream is initiated
    stream = TwitterStream.get_stream
    {:producer, stream}
  end

  def handle_demand(demand, stream) do
    # Take tweets from the stream and 
    # turn them into a list. Then return 
    # them to the consumer
    chunk = Stream.take(stream, demand)
    events = Enum.to_list(chunk)
    {:noreply, events, stream}
  end


  def handle_info(_msg, state) do
    # I as getting an "wrong message" error 
    # before I implemented this function myself
    # It does nothing special to my case
    {:noreply, [], state}
  end

end

defmodule MyApp.TwitterConsumer do
  use GenStage

  def start_link() do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    {:consumer, :the_state_does_not_matter}
  end

  def handle_events(events, _from, state) do

    Process.sleep(3000)
    IO.inspect(events)

    # We are a consumer, so we would never emit items.
    {:noreply, [], state}
  end

end

# Let's fire this thing up
{:ok, p} = GenStage.start_link(MyApp.TwitterProducer, :ok, name: MyApp.TwitterProducer)
{:ok, c} = GenStage.start_link(MyApp.TwitterConsumer, :ok, name: MyApp.TwitterConsumer)
GenStage.sync_subscribe(c, to: p, max_demand: 3)

Что происходит: это работает некоторое время, а затем останавливается. Как я понимаю, как только появляется пустой список событий, возвращаемый производителем.

Редактировать: Достаточно интересно: если я установлю требование в 1, оно продолжит работать. Но это намного, намного медленнее, чем прямой запрос к API Twitter Stream. То есть я получаю в 10 раз меньше твитов. Моя теория заключается в том, что это связано с повторным Stream.take звонки вместо того, чтобы просто звонить Enum.to_list для всего потока. Но я нахожу это все еще очень запутанным. Есть идеи, что мне не хватает?

1 ответ

В документации о значительном (но, к сожалению, не выделенном жирным шрифтом) предложении есть GenStage.handle_demand/2:

Производитель должен либо сохранить спрос, либо вернуть запрошенные события.

Тем не менее, вместо блока на Stream.take можно было бы явно знать, что задача может быть блокировать и обрабатывать дело, собирая спрос в таком случае, используя Task.await/2 с разумным временем ожидания (возможно Task.yield/2 может быть полезен в более сложных проверках, но здесь это кажется излишним.)

Из документации:

Если вы не хотите, чтобы задача провалилась, вы должны изменить heavy_fun/0 кодировать так же, как вы добились бы этого, если бы у вас не было асинхронного вызова. Например, чтобы либо вернуть {:ok, val} | :error результаты или, в более крайних случаях, с помощью try/rescue,

Однако в документации отсутствуют примеры. OTOH, здесь, вероятно, было бы проще просто вернуть пустой список и забыть о сборе спроса:

def handle_demand(demand, stream) do
  try do
    task = Task.async(fn ->
      stream
      |> Stream.take(demand)
      |> Enum.to_list()
    end)
    Task.await(task, 1000) # one sec
  catch
    :exit, {:timeout, {Task, :await, [_, 1000]}} ->
      {:noreply, [], stream}
  else
    events when is_list(events) ->
      {:noreply, events, stream}
  end
end
Другие вопросы по тегам