Невозможно пропустить поток через пользовательские этапы с "Flow.through_specs/3"

Я пытаюсь включить GenStage в конвейер потока. Однако это приводит к исключению. Если я правильно понимаю, Flow.through_specs/3 будет порождать несколько этапов и разделять входящие элементы соответственно. Я что-то пропустил? Я использую Flow v0.14.2.

Исключение:

{:down, {%ArgumentError{message: "the :partition option is required when subscribing to a producer with partition dispatcher"}, [{GenStage.PartitionDispatcher, :subscribe, 3, [file: 'lib/gen_stage/dispatchers/partition_dispatcher.ex', line: 143]}, {GenStage, :dispatcher_callback, 3, [file: 'lib/gen_stage.ex', line: 2160]}, {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 637]}, {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 711]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 249]}]}}
Last message: {:DOWN, #Reference<0.1791475268.948436996.112315>, :process, #PID<0.183.0>, {:down, {%ArgumentError{message: "the :partition option is required when subscribing to a producer with partition dispatcher"}, [{GenStage.PartitionDispatcher, :subscribe, 3, [file: 'lib/gen_stage/dispatchers/partition_dispatcher.ex', line: 143]}, {GenStage, :dispatcher_callback, 3, [file: 'lib/gen_stage.ex', line: 2160]}, {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 637]}, {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 711]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 249]}]}}}
State: {%{}, %{done?: true, producers: %{}, trigger: #Function<2.13930487/3 in Flow.Window.Global.materialize/5>}, {1, 6}, [], #Function<42.60253262/4 in Flow.Materialize.mapper_ops/1>}

Вот конвейер:

specs = [
  {
    {PBFParser.Decompressor, []},
    []
  }
]

PBFParser.Reader.stream("test.osm.pbf")
    |> Stream.drop(1)
    |> Stream.take(1_000)
    |> Flow.from_enumerable(max_demand: 100)
    |> Flow.through_specs(specs, max_demand: 5, stages: 6)
    |> Flow.partition(max_demand: 5, stages: 12)
    |> Flow.map(&PBFParser.Decoder.decode_block/1)
    |> Flow.partition(window: Flow.Window.count(20))
    |> Flow.reduce(fn -> [] end, fn batch, total ->
      [batch | total]
    end)
    |> Flow.emit(:state)
    |> Flow.partition(max_demand: 20, stages: 2)
    |> Flow.each(fn item -> IO.inspect(length(item)) end)
    |> Flow.run()

GenStage:

defmodule PBFParser.Decompressor do
  use GenStage

  def start_link(opts \\ []) do
    GenStage.start_link(__MODULE__, opts)
  end

  def init(_) do
    {:producer_consumer, :zlib.open()}
  end

  def handle_events(events, _from, z) do
    ...
    {:noreply, result, z}
  end
end

1 ответ

Ваш потребитель подписывается на GenStage.PartitionDispatcher это по умолчанию для Flow.partition/2,

Вы объявляете два раздела (с stages: 2) вариант.

Следовательно, ваш потребитель должен иметь partition: ... параметры переданы как показано здесь.

Другие вопросы по тегам