Почему мой потребитель в GenStage не срабатывает?

Когда я бегу iex все данные потребляются DataConsumer. Теперь, если я изменю порядок DataConsumerLevel2 с DataConsumer тогда все данные потребляются DataConsumerLevel2,

Это почему? Должен ли Диспетчер отправлять данные потребителям на основе их фильтра?

Вот код:

Это в моем приложении:

defmodule Mail.Application do
  @moduledoc false

  use Application
  use Supervisor

  def start(_type, _args) do
    children = [
      %{ id: DataProducer, start: {DataProducer, :start_link, [[]]} },
      %{ id: DataConsumer, start: {DataConsumer, :start_link, [[]]} },
      %{ id: DataConsumerLevel2, start: {DataConsumerLevel2, :start_link, [[]]} },
    ]

    opts = [strategy: :one_for_one, name: Mail.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Потребительский:

require Logger;
defmodule DataConsumer do
  use GenStage

  def start_link([]) do
    GenStage.start_link(__MODULE__, :any_state)
  end

  def init(state) do
    {:consumer, state, subscribe_to: [{DataProducer, selector: fn n -> n > 100 && n < 200 end}]}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      :timer.sleep(250)
      Logger.error inspect( {self(), event, state} )
    end
    {:noreply, [], state}
  end
end

Потребитель 2

require Logger;
defmodule DataConsumerLevel2 do
  use GenStage

  def start_link([]) do
    GenStage.start_link(__MODULE__, :any_state)
  end

  def init(state) do
    {:consumer, state, subscribe_to: [{DataProducer, selector: fn n -> n > 50 && n < 100 end}]}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      :timer.sleep(500)
      Logger.warn inspect( {self(), event, state} )
    end
    {:noreply, [], state}
  end

end

Режиссер

require Logger;
defmodule DataProducer do
  use GenStage

  def start_link([]) do
    GenStage.start_link(__MODULE__, 0, name: __MODULE__)
  end

  # {:queue.new, demand, size}
  def init(counter) do
    {:producer, counter, dispatcher: GenStage.BroadcastDispatcher}
  end

  def handle_demand(demand, state) do 
    events = Enum.to_list(state..state + demand + 1)
    # Logger.info "demand is: #{inspect(demand)}, state is #{inspect(state)}"
    {:noreply, events, (state + demand)}
  end

end

0 ответов

Другие вопросы по тегам