Эликсир: Genserver.call не инициирует handle_call
Я реализую Gossip Algorithm
в котором несколько действующих лиц одновременно распространяют сплетни. Система останавливается, когда каждый из Актеров прослушивает Сплетни 10 раз.
Теперь у меня есть сценарий, в котором я проверяю количество прослушиваний актера-получателя, прежде чем отправить ему сплетню. Если количество прослушиваний уже равно 10, сплетни не будут отправлены получателю. Я делаю это с помощью синхронного вызова, чтобы получить количество прослушиваний.
def get_message(server, msg) do
GenServer.call(server, {:get_message, msg})
end
def handle_call({:get_message, msg}, _from, state) do
listen_count = hd(state)
{:reply, listen_count, state}
end
Программа хорошо запускается в начале, но через некоторое время Genserver.call
останавливается с ошибкой тайм-аута, как показано ниже. После некоторой отладки я понял, что Genserver.call
становится бездействующим и не может инициировать соответствующий handle_call
метод. Ожидается ли такое поведение при использовании синхронных вызовов? Поскольку все актеры независимы, не должны Genserver.call
методы работают независимо друг от друга, не ожидая ответа друг друга.
02:28:05.634 [error] GenServer #PID<0.81.0> terminating
** (stop) exited in: GenServer.call(#PID<0.79.0>, {:get_message, []}, 5000)
** (EXIT) time out
(elixir) lib/gen_server.ex:774: GenServer.call/3
Редактировать: следующий код может воспроизвести ошибку при запуске в оболочке iex.
defmodule RumourActor do
use GenServer
def start_link(opts) do
{:ok, pid} = GenServer.start_link(__MODULE__,opts)
{pid}
end
def set_message(server, msg, recipient) do
GenServer.cast(server, {:set_message, msg, server, recipient})
end
def get_message(server, msg) do
GenServer.call(server, :get_message)
end
def init(opts) do
state=opts
{:ok,state}
end
def handle_cast({:set_message, msg, server, recipient},state) do
:timer.sleep(5000)
c = RumourActor.get_message(recipient, [])
IO.inspect c
{:noreply,state}
end
def handle_call(:get_message, _from, state) do
count = tl(state)
{:reply, count, state}
end
end
Откройте оболочку iex и загрузите модуль выше. Запустите два процесса, используя:
a = RumourActor.start_link(["", 3])
b = RumourActor.start_link(["", 5])
Произведите ошибку, вызвав условие тупика, как упомянуто Догбертом в комментариях. Запустите следующий без большой разницы во времени.
cb = RumourActor.set_message(elem(a,0), [], elem(b,0))
ca = RumourActor.set_message(elem(b,0), [], elem(a,0))
Подождите 5 секунд. Ошибка появится.
2 ответа
Протокол сплетен - это способ работы с асинхронными, неизвестными, ненастроенными (случайными) сетями, которые могут испытывать периодические перебои и перерывы в работе, и где отсутствует лидер или структура по умолчанию. (Обратите внимание, что эта ситуация несколько необычна в реальном мире, и внешнее управление всегда каким-либо образом навязывается системам.)
Имея это в виду, давайте изменим это, чтобы быть асинхронной системой (используя cast
) так что мы следуем духу концепции болтливого общения в стиле сплетен.
Нам нужен дайджест сообщений, который подсчитывает, сколько раз было получено данное сообщение, дайджест сообщений, которые были получены и уже превысили магический номер (поэтому мы не пересылаем одно, если это слишком поздно), и список процессов, зарегистрированных в нашей системе, чтобы мы знали, кому мы вещаем:
(Следующий пример написан на Erlang, потому что я перебираю синтаксис Elixir с тех пор, как перестал его использовать...)
-module(rumor).
-record(s,
{peers = [] :: [pid()],
digest = #{} :: #{message_id(), non_neg_integer()},
dead = sets:new() :: sets:set(message_id())}).
-type message_id() :: zuuid:uuid().
Здесь я использую UUID, но это может быть что угодно. Ссылка на Erlang подойдет для тестового примера, но поскольку сплетни бесполезны в кластере Erlang, а ссылки за пределами исходной системы небезопасны, я просто пытаюсь предположить, что это для сетевой системы.
Нам понадобится интерфейсная функция, которая позволит нам сообщить процессу о введении нового сообщения в систему. Нам также понадобится интерфейсная функция, которая отправляет сообщение между двумя процессами, когда оно уже находится в системе. Тогда нам понадобится внутренняя функция, которая передает сообщения всем известным (подписанным) одноранговым узлам. Ах, это означает, что нам нужен интерфейс приветствия, чтобы одноранговые процессы могли уведомлять друг друга об их присутствии.
Мы также хотим, чтобы процесс говорил себе о продолжении вещания во времени. Как долго установить интервал повторной передачи, на самом деле не является простым решением - оно имеет отношение к топологии сети, задержке, изменчивости и т. Д. (На самом деле вы, вероятно, время от времени пингуете одноранговые узлы и разрабатываете некоторую эвристику на основе задержки, отбрасывает одноранговые узлы, которые казаться безразличным и т. д. - но мы не собираемся входить в это безумие здесь). Здесь я просто собираюсь установить его на 1 секунду, потому что это простой для понимания интервал для людей, наблюдающих за системой.
Обратите внимание, что все ниже - асинхронно.
Интерфейсы...
insert(Pid, Message) ->
gen_server:cast(Pid, {insert, Message}).
relay(Pid, ID, Message) ->
gen_server:cast(Pid, {relay, ID, Message}).
greet(Pid) ->
gen_server:cast(Pid, {greet, self()}).
make_introduction(Pid, PeerPid) ->
gen_server:cast(Pid, {make_introduction, PeerPid}).
Эта последняя функция станет для нас, как тестеров системы, вызовом одного из процессов greet/1
на некотором целевом Pid, чтобы они начали строить одноранговую сеть. В реальном мире обычно происходит что-то немного другое.
Внутри нашего обратного вызова gen_server для получения приведения мы получим:
handle_cast({insert, Message}, State) ->
NewState = do_insert(Message, State);
{noreply, NewState};
handle_cast({relay, ID, Message}, State) ->
NewState = do_relay(ID, Message, State),
{noreply, NewState};
handle_cast({greet, Peer}, State) ->
NewState = do_greet(Peer, State),
{noreply, NewState};
handle_cast({make_introduction, Peer}, State) ->
NewState = do_make_introduction(Peer, State),
{noreply, NewState}.
Довольно простые вещи.
Выше я упомянул, что нам нужен способ, чтобы эта вещь заставила себя повторно отправиться после задержки. Чтобы сделать это, мы собираемся отправить нам "голое сообщение" в "redo_relay" после задержки, используя erlang:send_after/3
поэтому нам понадобится handle_info/2 для его решения:
handle_info({redo_relay, ID, Message}, State) ->
NewState = do_relay(ID, Message, State),
{noreply, NewState}.
Реализация битов сообщения - это интересная часть, но все это не так сложно. Прости do_relay/3
ниже - это может быть более кратким, но я пишу это в браузере на макушке, так что...
do_insert(Message, State = #s{peers = Peers, digest = Digest}) ->
MessageID = zuuid:v1(),
NewDigest = maps:put(MessageID, 1, Digest),
ok = broadcast(Message, Peers),
ok = schedule_resend(MessageID, Message),
State#s{digest = NewDigest}.
do_relay(ID,
Message,
State = #s{peers = Peers, digest = Digest, dead = Dead}) ->
case maps:find(ID, Digest) of
{ok, Count} when Count >= 10 ->
NewDigest = maps:remove(ID, Digest),
NewDead = sets:add_element(ID, Dead),
ok = broadcast(Message, Peers),
State#s{digest = NewDigest, dead = NewDead};
{ok, Count} ->
NewDigest = maps:put(ID, Count + 1),
ok = broadcast(ID, Message, Peers),
ok = schedule_resend(ID, Message),
State#s{digest = NewDigest};
error ->
case set:is_element(ID, Dead) of
true ->
State;
false ->
NewDigest = maps:put(ID, 1),
ok = broadcast(Message, Peers),
ok = schedule_resend(ID, Message),
State#s{digest = NewDigest}
end
end.
broadcast(ID, Message, Peers) ->
Forward = fun(P) -> relay(P, ID, Message),
lists:foreach(Forward, Peers).
schedule_resend(ID, Message) ->
_ = erlang:send_after(1000, self(), {redo_relay, ID, Message}),
ok.
И теперь нам нужны социальные биты...
do_greet(Peer, State = #s{peers = Peers}) ->
case lists:member(Peer, Peers) of
false -> State#s{peers = [Peer | Peers]};
true -> State
end.
do_make_introduction(Peer, State = #s{peers = Peers}) ->
ok = greet(Peer),
do_greet(Peer, State).
Так что же делали все эти ужасно нетипичные вещи?
Это позволило избежать любой возможности тупика. Причина взаимных блокировок настолько, что смертельно опасна в одноранговых системах, состоит в том, что всякий раз, когда у вас есть два идентичных процесса (или актера, или что-то еще), взаимодействующих синхронно, вы создаете случай учебного пособия потенциального тупика.
В любой момент A
имеет синхронное сообщение, направленное к B
а также B
имеет синхронное сообщение, направленное к A
в то же время у вас теперь тупик. Невозможно создать идентичные процессы, которые синхронно вызывают друг друга, не создавая потенциальную тупиковую ситуацию. В массово параллельных системах все, что может произойти почти наверняка, в конечном итоге произойдет, поэтому рано или поздно вы столкнетесь с этим.
Сплетня должна быть асинхронной по определенной причине: это небрежный, ненадежный, неэффективный способ справиться с небрежной, ненадежной, неэффективной топологией сети. Попытка совершать звонки вместо приведения не только отрицает цель ретрансляции сообщений в стиле сплетни, но и толкает вас в невозможную тупиковую зону, связанную с изменением природы протокола с асинхронной на синхронизацию.
Genser.call
по умолчанию имеет время ожидания5000 миллисекунд. Так что, вероятно, происходит то, что очередь сообщений субъекта заполнена миллионами сообщений, и к тому времени, когда она достигаетcall
Время ожиданиявызывающего актера истекло.
Вы можете справиться с таймаутом, используяtry...catch
:
try do
c = RumourActor.get_message(recipient, [])
catch
:exit, reason ->
# handle timeout
Теперь вызываемый актер наконец доберется доcall
сообщение и ответ, который придет как неожиданное сообщение для первого процесса. Это вам нужно обрабатывать с помощьюhandle_info
, Так что одним из способов является игнорирование ошибки в catch
заблокировать и отправить слух handle_info
,
Кроме того, это значительно ухудшит производительность, если многие процессы ожидают тайм-аут в течение 5 секунд, прежде чем двигаться дальше. Можно намеренно сократить время ожидания и обработать ответ в handle_info
, Это приведет к использованию cast
и обработка ответа от другого процесса.
Ваш блокирующий вызов должен быть разбит на два неблокирующих вызова. Поэтому, если A делает блокирующий вызов B, вместо ожидания ответа, A может попросить B отправить свое состояние на заданный адрес (адрес A) и двигаться дальше. Затем А обработает это сообщение отдельно и при необходимости ответит.
A.fun1():
body of A before blocking call
result = blockingcall()
do things based on result
нужно разделить на:
A.send():
body of A before blocking call
nonblockingcall(A.receive) #A.receive is where B should send results
do other things
A.receive(result):
do things based on result