Elixir GenStage ConsumerSupervisor Перезагрузите ребенка
Из того, что я читаю здесь: https://hexdocs.pm/gen_stage/ConsumerSupervisor.html
все реализации ConsumerSupervisor запускают только дочерний элемент (модуль принтера по ссылке выше) для каждой единицы работы. Есть ли способ перезапустить ребенка, если он умрет?
Для меня с именем "ConsumerSupervisor" у него была бы возможность перезапустить детей, если ребенок обычно не выключается. Кто-нибудь делал это раньше?
В моей реализации у потребителя есть супервизор, который запускает ребенка, который на самом деле является GenServer, для выполнения работы, а затем выключает себя. Если он аварийно завершает работу, я хочу, чтобы он был перезапущен.
Мысли... Я думал о том, чтобы просто внедрить потребителя, который обращается к динамическому руководителю, а затем заводить детей, но это не учитывает обратное сохранение...
Вот как я это реализовал, но хочу, чтобы дети перезапустились, если они потерпят крах:
defmodule Client.Strategy.EventPushing.SendingConsumer do
use ConsumerSupervisor
def start_link() do
ConsumerSupervisor.start_link(__MODULE__, :ok)
end
# Callbacks
def init(:ok) do
children = [
worker(Client.DynamicClient, [], restart: :temporary)
]
{:ok, children, strategy: :one_for_one, subscribe_to: [{Client.Strategy.EventPushing.SendingProducerConsumer, max_demand: 10}]}
end
end
И дети, которые раскручиваются, выглядят так:
defmodule Client.DynamicClient do
use GenServer
require Logger
# Public API's
def start_link(event) do
Client.Statix.timing("sending", 1)
GenServer.start_link(
__MODULE__,
event,
name: String.to_atom(event.sequence_number)
)
end
def schedule_work() do
send(self(), :start_work)
end
# Call Backs
def init(event) do
schedule_work()
{:ok, event}
end
def handle_info(
:start_work,
%{
:filter => filter,
:sequence_number => sequence_number,
:message => message,
:ip_address => ip,
:port => port
} = state
) do
opts = [:binary, active: true]
case :gen_tcp.connect(ip, port, opts) do
{:ok, socket} ->
:gen_tcp.send(socket, filter)
:gen_tcp.send(socket, message)
{:error, reason} ->
Logger.error("Error")
Process.exit(self(), :kill)
end
{:noreply,state}
end
def handle_info({:tcp, socket, msg}, state) do
# :inet.setopts(socket, active: :once)
:gen_tcp.close(socket)
Process.exit(self(), :kill)
{:noreply, state}
end
def handle_info({:tcp_closed, _socket,}, state) do
Client.SendingSupervisor.stop_child(self())
end
end
Спасибо
1 ответ
Проблема здесь: worker(Client.DynamicClient, [], restart: :temporary)
Если вы хотите, чтобы ваш DynamicClient перезапускался при сбоях, вы должны использовать restart: :permanent