Почему мой потребитель в GenStage не срабатывает?
Когда я бегу iex
все данные потребляются DataConsumer. Теперь, если я изменю порядок DataConsumerLevel2
с DataConsumer
тогда все данные потребляются DataConsumerLevel2
,
Это почему? Должен ли Диспетчер отправлять данные потребителям на основе их фильтра?
Вот код:
Это в моем приложении:
defmodule Mail.Application do
@moduledoc false
use Application
use Supervisor
def start(_type, _args) do
children = [
%{ id: DataProducer, start: {DataProducer, :start_link, [[]]} },
%{ id: DataConsumer, start: {DataConsumer, :start_link, [[]]} },
%{ id: DataConsumerLevel2, start: {DataConsumerLevel2, :start_link, [[]]} },
]
opts = [strategy: :one_for_one, name: Mail.Supervisor]
Supervisor.start_link(children, opts)
end
end
Потребительский:
require Logger;
defmodule DataConsumer do
use GenStage
def start_link([]) do
GenStage.start_link(__MODULE__, :any_state)
end
def init(state) do
{:consumer, state, subscribe_to: [{DataProducer, selector: fn n -> n > 100 && n < 200 end}]}
end
def handle_events(events, _from, state) do
for event <- events do
:timer.sleep(250)
Logger.error inspect( {self(), event, state} )
end
{:noreply, [], state}
end
end
Потребитель 2
require Logger;
defmodule DataConsumerLevel2 do
use GenStage
def start_link([]) do
GenStage.start_link(__MODULE__, :any_state)
end
def init(state) do
{:consumer, state, subscribe_to: [{DataProducer, selector: fn n -> n > 50 && n < 100 end}]}
end
def handle_events(events, _from, state) do
for event <- events do
:timer.sleep(500)
Logger.warn inspect( {self(), event, state} )
end
{:noreply, [], state}
end
end
Режиссер
require Logger;
defmodule DataProducer do
use GenStage
def start_link([]) do
GenStage.start_link(__MODULE__, 0, name: __MODULE__)
end
# {:queue.new, demand, size}
def init(counter) do
{:producer, counter, dispatcher: GenStage.BroadcastDispatcher}
end
def handle_demand(demand, state) do
events = Enum.to_list(state..state + demand + 1)
# Logger.info "demand is: #{inspect(demand)}, state is #{inspect(state)}"
{:noreply, events, (state + demand)}
end
end