Elixir: настройка GenStages для удовлетворения динамических требований

У меня есть GenStage Producer <- Consumer для чтения и обработки чего-либо с сообщением из моей очереди Amazon SQS, что означает, что мой Consumer запрашивает спрос, когда ему нечего делать, а производитель просто получает и пытается извлечь события из амазонка. И сейчас он отлично работает для моего спроса, так как максимальное количество событий, обрабатываемых каждым потребителем, равно 1. Но если подумать о масштабируемости, я бы хотел установить max_demand выше для каждой стадии.

Мой первый подход состоял в том, чтобы увеличить max_demand до 10. Но потом пришло предостережение, согласно документации:

При реализации потребителей мы часто устанавливаем:max_demand и:min_demand в подписке. Параметр:max_demand указывает максимальное количество событий, которые должны быть в потоке, в то время как параметр:min_demand указывает минимальное пороговое значение, которое необходимо инициировать для увеличения спроса. Например, если:max_demand равно 1000 и:min_demand равно 750, потребитель первоначально запросит 1000 событий и запросит больше только после того, как получит не менее 250.

Это означает, что если в очереди только 1 событие, а другие не появляются или занимают много времени, мне придется подождать, пока не будет достигнуто 10, чтобы обработать это 1 событие. Это очень плохо для нашего бизнеса и, на мой взгляд, немного странно.

Поэтому мой вопрос:

Как я могу обойти это и установить максимум спроса выше 1, но заставить его обрабатывать любой номер входящего запроса?

(бонусный вопрос): Почему GenStage был разработан таким образом? В чем выгода?

Вот абстракция, которую я сделал для использования из событий производителя... Я думаю, что код производителя более прост, поскольку он просто предоставляет данные при запросе, но если кто-то сочтет это необходимым, я могу добавить сюда:

defmodule MobileApi.GenstageWorkers.AwsSqsConsumer do
  use GenStage
  require Logger
  alias ExAws.SQS

  @ex_aws_sqs Application.get_env(
                :mobile_api,
                :ex_aws_sqs,
                ExAws
              )
  def start_link(init_args, opts \\ []) do
    GenStage.start_link(__MODULE__, init_args, opts)
  end

  def init(%{
        producer_id: producer,
        queue_name: queue_name,
        processor: processor_function,
        min_demand: min_demand,
        max_demand: max_demand
      }) do
    state = %{
      producer: producer,
      subscription: nil,
      queue: queue_name,
      processor: processor_function
    }

    GenStage.async_subscribe(
      self(),
      to: state.producer,
      min_demand: min_demand,
      max_demand: max_demand
    )

    {:consumer, state}
  end

  def handle_subscribe(:producer, _opts, from, state),
    do: {:automatic, Map.put(state, :subscription, from)}

  def handle_info(:init_ask, %{subscription: subscription} = state) do
    GenStage.ask(subscription, state.max_demand)

    {:noreply, [], state}
  end

  def handle_info(_, state), do: {:noreply, [], state}

  def handle_events(messages, _from, state) when is_nil(messages), do: {:noreply, [], state}

  def handle_events(messages, _from, state) do
    handle_messages(messages, state)
    {:noreply, [], state}
  end

  defp handle_messages(messages, state) do
    messages
    |> Enum.reduce([], &parse_message/2)
    |> process_message_batch(state.queue, state.processor)
  end

  defp parse_message(%{body: body, message_id: message_id, receipt_handle: receipt_handle}, acc) do
    case Poison.decode(body) do
      {:ok, decoded_body} ->
        [{decoded_body, %{id: message_id, receipt_handle: receipt_handle}} | acc]

      {:error, error_message} ->
        Logger.error(
          "An error has ocurred reading from queue, error message: #{inspect(error_message)} message body: #{
            inspect(body)
          }"
        )

        acc
    end
  end

  defp process_message_batch([], _, _), do: nil

  defp process_message_batch(messages_batch, queue_name, processor) do
    {bodies, metadatas} = Enum.unzip(messages_batch)
    Enum.map(bodies, fn body -> Task.start(fn -> processor.(body) end) end)

    SQS.delete_message_batch(queue_name, metadatas)
    |> @ex_aws_sqs.request
  end
end

0 ответов

Другие вопросы по тегам