GenStage: плохое возвращаемое значение
Я пытаюсь реализовать чрезвычайно простой GenStage с целью убедиться, что я его понимаю и могу заставить его работать, прежде чем я начну знакомить с логикой своего конкретного приложения. Я получаю ошибку при возврате данных от производителя. Данные выглядят правильно для меня. Я читал, что второе значение в возвращаемом кортеже должно быть списком, и, похоже, оно одно.
Вот моя реализация GenStage:
defmodule CoachActivity.Event.Producer do
use GenStage
def start_link(page) do
GenStage.start_link(CoachActivity.Event.Producer, page)
end
def init(page) do
{:producer, page}
end
def handle_demand(demand, page) when demand > 0 do
# in here we need to run the ecto query for the next X items?
events = get_fake(page)
IO.puts "PRODUCER: preparing to return events"
IO.puts "PRODUCER: page will be: #{page + 1}"
{:no_reply, events, page + 1}
end
defp get_fake(page) do
IO.puts "PRODUCER: generating events for page #{page}"
[
"Event One",
"Event Two",
"Event Three",
"Event Four",
"Event Five"
]
end
end
defmodule CoachActivity.Event.ProducerConsumer do
use GenStage
def start_link(page) do
GenStage.start_link(CoachActivity.Event.ProducerConsumer, page)
end
def init(number) do
{:producer_consumer, number}
end
def handle_events(events, _from, number) do
# in here we will do the transformation of events
IO.puts "PRODUCER_CONSUMER: importing a bunch of events"
events = Enum.map(events, fn(event) ->
CoachActivity.Event.ImportEvent.import(event)
end)
{:noreply, events, number}
end
end
defmodule CoachActivity.Event.Consumer do
use GenStage
def start_link() do
GenStage.start_link(CoachActivity.Event.Consumer, :ok)
end
def init(:ok) do
{:consumer, :the_state_does_not_matter}
end
def handle_events(events, _from, state) do
# Wait for a second.
Process.sleep(1000)
# Inspect the events.
IO.inspect(events)
# We are a consumer, so we would never emit items.
{:noreply, [], state}
end
end
defmodule CoachActivity.Event.ImportEvent do
def import(event) do
IO.puts "Converting an event: #{event}"
end
end
defmodule KickStart do
def start do
{:ok, producer} = CoachActivity.Event.Producer.start_link(0)
{:ok, producer_consumer} = CoachActivity.Event.ProducerConsumer.start_link(2)
{:ok, consumer} = CoachActivity.Event.Consumer.start_link()
GenStage.sync_subscribe(consumer, to: producer_consumer)
GenStage.sync_subscribe(producer_consumer, to: producer)
end
end
Когда я делаю
KickStart.start
Я получаю этот вывод сообщения об ошибке:
iex(4)> KickStart.start
PRODUCER: generating events for page 0
{:ok, #Reference<0.2077982655.942931969.179180>}
PRODUCER: preparing to return events
PRODUCER: page will be: 1
iex(5)> [error] GenServer #PID<0.1066.0> terminating
** (stop) bad return value: {:no_reply, ["Event One", "Event Two", "Event Three", "Event Four", "Event Five"], 1}
Last message: {:"$gen_producer", {#PID<0.1067.0>, #Reference<0.2077982655.942931969.179180>}, {:ask, 1000}}
State: 0
** (EXIT from #PID<0.1002.0>) evaluator process exited with reason: bad return value: {:no_reply, ["Event One", "Event Two", "Event Three", "Event Four", "Event Five"], 1}
Я борюсь с тем, чтобы: а) понять, что означает это сообщение об ошибке, и, конечно, почему оно проявляется.