Elixir: настройка GenStages для удовлетворения динамических требований
У меня есть GenStage Producer <- Consumer для чтения и обработки чего-либо с сообщением из моей очереди Amazon SQS, что означает, что мой Consumer запрашивает спрос, когда ему нечего делать, а производитель просто получает и пытается извлечь события из амазонка. И сейчас он отлично работает для моего спроса, так как максимальное количество событий, обрабатываемых каждым потребителем, равно 1. Но если подумать о масштабируемости, я бы хотел установить max_demand выше для каждой стадии.
Мой первый подход состоял в том, чтобы увеличить max_demand до 10. Но потом пришло предостережение, согласно документации:
При реализации потребителей мы часто устанавливаем:max_demand и:min_demand в подписке. Параметр:max_demand указывает максимальное количество событий, которые должны быть в потоке, в то время как параметр:min_demand указывает минимальное пороговое значение, которое необходимо инициировать для увеличения спроса. Например, если:max_demand равно 1000 и:min_demand равно 750, потребитель первоначально запросит 1000 событий и запросит больше только после того, как получит не менее 250.
Это означает, что если в очереди только 1 событие, а другие не появляются или занимают много времени, мне придется подождать, пока не будет достигнуто 10, чтобы обработать это 1 событие. Это очень плохо для нашего бизнеса и, на мой взгляд, немного странно.
Поэтому мой вопрос:
Как я могу обойти это и установить максимум спроса выше 1, но заставить его обрабатывать любой номер входящего запроса?
(бонусный вопрос): Почему GenStage был разработан таким образом? В чем выгода?
Вот абстракция, которую я сделал для использования из событий производителя... Я думаю, что код производителя более прост, поскольку он просто предоставляет данные при запросе, но если кто-то сочтет это необходимым, я могу добавить сюда:
defmodule MobileApi.GenstageWorkers.AwsSqsConsumer do
use GenStage
require Logger
alias ExAws.SQS
@ex_aws_sqs Application.get_env(
:mobile_api,
:ex_aws_sqs,
ExAws
)
def start_link(init_args, opts \\ []) do
GenStage.start_link(__MODULE__, init_args, opts)
end
def init(%{
producer_id: producer,
queue_name: queue_name,
processor: processor_function,
min_demand: min_demand,
max_demand: max_demand
}) do
state = %{
producer: producer,
subscription: nil,
queue: queue_name,
processor: processor_function
}
GenStage.async_subscribe(
self(),
to: state.producer,
min_demand: min_demand,
max_demand: max_demand
)
{:consumer, state}
end
def handle_subscribe(:producer, _opts, from, state),
do: {:automatic, Map.put(state, :subscription, from)}
def handle_info(:init_ask, %{subscription: subscription} = state) do
GenStage.ask(subscription, state.max_demand)
{:noreply, [], state}
end
def handle_info(_, state), do: {:noreply, [], state}
def handle_events(messages, _from, state) when is_nil(messages), do: {:noreply, [], state}
def handle_events(messages, _from, state) do
handle_messages(messages, state)
{:noreply, [], state}
end
defp handle_messages(messages, state) do
messages
|> Enum.reduce([], &parse_message/2)
|> process_message_batch(state.queue, state.processor)
end
defp parse_message(%{body: body, message_id: message_id, receipt_handle: receipt_handle}, acc) do
case Poison.decode(body) do
{:ok, decoded_body} ->
[{decoded_body, %{id: message_id, receipt_handle: receipt_handle}} | acc]
{:error, error_message} ->
Logger.error(
"An error has ocurred reading from queue, error message: #{inspect(error_message)} message body: #{
inspect(body)
}"
)
acc
end
end
defp process_message_batch([], _, _), do: nil
defp process_message_batch(messages_batch, queue_name, processor) do
{bodies, metadatas} = Enum.unzip(messages_batch)
Enum.map(bodies, fn body -> Task.start(fn -> processor.(body) end) end)
SQS.delete_message_batch(queue_name, metadatas)
|> @ex_aws_sqs.request
end
end