GenStage: Как обрабатывать ситуации, когда производитель не может предоставить события?
Следующий сценарий: производитель GenStage обрабатывает поток Twitter (используя Stream API и ExTwitter) и предоставляет набор твитов (макс. Спрос, который запрашивает потребитель) потребителю GenStage. Затем потребитель просто печатает их.
Следующая проблема: я ищу конкретные твиты, поэтому не всегда доступны новые твиты. Если производитель GenStage возвращает пустой список событий, потребитель перестанет спрашивать. Смотрите этот вопрос, и Хосе Валимс ответит больше.
Я не уверен, как решить эту проблему. Любая помощь с благодарностью. Это то, что я до сих пор:
defmodule MyApp.TwitterProducer do
use GenStage
alias MyApp.TwitterStream
def start_link(:ok) do
GenStage.start_link(__MODULE__, :ok)
end
def init(:ok) do
# This creates a regular Elixir Stream
# I use this as the state so that not every
# time the consumer asks for new data
# a new stream is initiated
stream = TwitterStream.get_stream
{:producer, stream}
end
def handle_demand(demand, stream) do
# Take tweets from the stream and
# turn them into a list. Then return
# them to the consumer
chunk = Stream.take(stream, demand)
events = Enum.to_list(chunk)
{:noreply, events, stream}
end
def handle_info(_msg, state) do
# I as getting an "wrong message" error
# before I implemented this function myself
# It does nothing special to my case
{:noreply, [], state}
end
end
defmodule MyApp.TwitterConsumer do
use GenStage
def start_link() do
GenStage.start_link(__MODULE__, :ok)
end
def init(:ok) do
{:consumer, :the_state_does_not_matter}
end
def handle_events(events, _from, state) do
Process.sleep(3000)
IO.inspect(events)
# We are a consumer, so we would never emit items.
{:noreply, [], state}
end
end
# Let's fire this thing up
{:ok, p} = GenStage.start_link(MyApp.TwitterProducer, :ok, name: MyApp.TwitterProducer)
{:ok, c} = GenStage.start_link(MyApp.TwitterConsumer, :ok, name: MyApp.TwitterConsumer)
GenStage.sync_subscribe(c, to: p, max_demand: 3)
Что происходит: это работает некоторое время, а затем останавливается. Как я понимаю, как только появляется пустой список событий, возвращаемый производителем.
Редактировать: Достаточно интересно: если я установлю требование в 1, оно продолжит работать. Но это намного, намного медленнее, чем прямой запрос к API Twitter Stream. То есть я получаю в 10 раз меньше твитов. Моя теория заключается в том, что это связано с повторным Stream.take
звонки вместо того, чтобы просто звонить Enum.to_list
для всего потока. Но я нахожу это все еще очень запутанным. Есть идеи, что мне не хватает?
1 ответ
В документации о значительном (но, к сожалению, не выделенном жирным шрифтом) предложении есть GenStage.handle_demand/2
:
Производитель должен либо сохранить спрос, либо вернуть запрошенные события.
Тем не менее, вместо блока на Stream.take
можно было бы явно знать, что задача может быть блокировать и обрабатывать дело, собирая спрос в таком случае, используя Task.await/2
с разумным временем ожидания (возможно Task.yield/2
может быть полезен в более сложных проверках, но здесь это кажется излишним.)
Из документации:
Если вы не хотите, чтобы задача провалилась, вы должны изменить
heavy_fun/0
кодировать так же, как вы добились бы этого, если бы у вас не было асинхронного вызова. Например, чтобы либо вернуть{:ok, val} | :error
результаты или, в более крайних случаях, с помощьюtry/rescue
,
Однако в документации отсутствуют примеры. OTOH, здесь, вероятно, было бы проще просто вернуть пустой список и забыть о сборе спроса:
def handle_demand(demand, stream) do
try do
task = Task.async(fn ->
stream
|> Stream.take(demand)
|> Enum.to_list()
end)
Task.await(task, 1000) # one sec
catch
:exit, {:timeout, {Task, :await, [_, 1000]}} ->
{:noreply, [], stream}
else
events when is_list(events) ->
{:noreply, events, stream}
end
end