Elixir GenStage ConsumerSupervisor Перезагрузите ребенка

Из того, что я читаю здесь: https://hexdocs.pm/gen_stage/ConsumerSupervisor.html

все реализации ConsumerSupervisor запускают только дочерний элемент (модуль принтера по ссылке выше) для каждой единицы работы. Есть ли способ перезапустить ребенка, если он умрет?

Для меня с именем "ConsumerSupervisor" у него была бы возможность перезапустить детей, если ребенок обычно не выключается. Кто-нибудь делал это раньше?

В моей реализации у потребителя есть супервизор, который запускает ребенка, который на самом деле является GenServer, для выполнения работы, а затем выключает себя. Если он аварийно завершает работу, я хочу, чтобы он был перезапущен.

Мысли... Я думал о том, чтобы просто внедрить потребителя, который обращается к динамическому руководителю, а затем заводить детей, но это не учитывает обратное сохранение...

Вот как я это реализовал, но хочу, чтобы дети перезапустились, если они потерпят крах:

 defmodule Client.Strategy.EventPushing.SendingConsumer do
  use ConsumerSupervisor

  def start_link() do
    ConsumerSupervisor.start_link(__MODULE__, :ok)
  end

  # Callbacks

  def init(:ok) do
    children = [
      worker(Client.DynamicClient, [], restart: :temporary)
    ]

    {:ok, children, strategy: :one_for_one, subscribe_to: [{Client.Strategy.EventPushing.SendingProducerConsumer, max_demand: 10}]}
  end
end

И дети, которые раскручиваются, выглядят так:

defmodule Client.DynamicClient do
  use GenServer
  require Logger

  # Public API's
  def start_link(event) do
    Client.Statix.timing("sending", 1)
    GenServer.start_link(
      __MODULE__,
      event,
      name: String.to_atom(event.sequence_number)
    )
  end

  def schedule_work() do
    send(self(), :start_work)
  end

  # Call Backs
  def init(event) do
    schedule_work()
    {:ok, event}
  end

  def handle_info(
        :start_work,
        %{
          :filter => filter,
          :sequence_number => sequence_number,
          :message => message,
          :ip_address => ip,
          :port => port
        } = state
      ) do

    opts = [:binary, active: true]

    case :gen_tcp.connect(ip, port, opts) do
      {:ok, socket} ->
        :gen_tcp.send(socket, filter)
        :gen_tcp.send(socket, message)

      {:error, reason} ->
        Logger.error("Error")
        Process.exit(self(), :kill)
    end

    {:noreply,state}
  end

  def handle_info({:tcp, socket, msg}, state) do
    # :inet.setopts(socket, active: :once)

    :gen_tcp.close(socket)
     Process.exit(self(), :kill)
    {:noreply, state}
  end

  def handle_info({:tcp_closed, _socket,}, state) do
    Client.SendingSupervisor.stop_child(self())
  end
end

Спасибо

1 ответ

Проблема здесь: worker(Client.DynamicClient, [], restart: :temporary)Если вы хотите, чтобы ваш DynamicClient перезапускался при сбоях, вы должны использовать restart: :permanent

Другие вопросы по тегам