GenStage: плохое возвращаемое значение

Я пытаюсь реализовать чрезвычайно простой GenStage с целью убедиться, что я его понимаю и могу заставить его работать, прежде чем я начну знакомить с логикой своего конкретного приложения. Я получаю ошибку при возврате данных от производителя. Данные выглядят правильно для меня. Я читал, что второе значение в возвращаемом кортеже должно быть списком, и, похоже, оно одно.

Вот моя реализация GenStage:

defmodule CoachActivity.Event.Producer do
  use GenStage

  def start_link(page) do
    GenStage.start_link(CoachActivity.Event.Producer, page)
  end

  def init(page) do
    {:producer, page}
  end

  def handle_demand(demand, page) when demand > 0 do
    # in here we need to run the ecto query for the next X items?
    events = get_fake(page)

    IO.puts "PRODUCER: preparing to return events"
    IO.puts "PRODUCER: page will be: #{page + 1}"

    {:no_reply, events, page + 1}
  end

  defp get_fake(page) do
    IO.puts "PRODUCER: generating events for page #{page}"
    [
      "Event One",
      "Event Two",
      "Event Three",
      "Event Four",
      "Event Five"
    ]
  end
end

defmodule CoachActivity.Event.ProducerConsumer do
  use GenStage

  def start_link(page) do
    GenStage.start_link(CoachActivity.Event.ProducerConsumer, page)
  end

  def init(number) do
    {:producer_consumer, number}
  end

  def handle_events(events, _from, number) do
    # in here we will do the transformation of events
    IO.puts "PRODUCER_CONSUMER: importing a bunch of events"

    events = Enum.map(events, fn(event) ->
      CoachActivity.Event.ImportEvent.import(event)
    end)

    {:noreply, events, number}
  end
end

defmodule CoachActivity.Event.Consumer do
  use GenStage

  def start_link() do
    GenStage.start_link(CoachActivity.Event.Consumer, :ok)
  end

  def init(:ok) do
    {:consumer, :the_state_does_not_matter}
  end

  def handle_events(events, _from, state) do
    # Wait for a second.
    Process.sleep(1000)

    # Inspect the events.
    IO.inspect(events)

    # We are a consumer, so we would never emit items.
    {:noreply, [], state}
  end
end

defmodule CoachActivity.Event.ImportEvent do
  def import(event) do
    IO.puts "Converting an event: #{event}"
  end
end

defmodule KickStart do
  def start do
    {:ok, producer} = CoachActivity.Event.Producer.start_link(0)
    {:ok, producer_consumer} = CoachActivity.Event.ProducerConsumer.start_link(2)
    {:ok, consumer} = CoachActivity.Event.Consumer.start_link()

    GenStage.sync_subscribe(consumer, to: producer_consumer)
    GenStage.sync_subscribe(producer_consumer, to: producer)
  end
end

Когда я делаю

KickStart.start

Я получаю этот вывод сообщения об ошибке:

iex(4)> KickStart.start
PRODUCER: generating events for page 0
{:ok, #Reference<0.2077982655.942931969.179180>}
PRODUCER: preparing to return events
PRODUCER: page will be: 1
iex(5)> [error] GenServer #PID<0.1066.0> terminating
** (stop) bad return value: {:no_reply, ["Event One", "Event Two", "Event Three", "Event Four", "Event Five"], 1}
Last message: {:"$gen_producer", {#PID<0.1067.0>, #Reference<0.2077982655.942931969.179180>}, {:ask, 1000}}
State: 0
** (EXIT from #PID<0.1002.0>) evaluator process exited with reason: bad return value: {:no_reply, ["Event One", "Event Two", "Event Three", "Event Four", "Event Five"], 1}

Я борюсь с тем, чтобы: а) понять, что означает это сообщение об ошибке, и, конечно, почему оно проявляется.

0 ответов

Другие вопросы по тегам