gen_server закрытие сокета прослушивания
Я пытаюсь сделать так, чтобы процесс gen_server принял нового клиента и немедленно породил нового потомка для обработки следующего. Проблема, с которой я сталкиваюсь, заключается в том, что когда сокет завершается и последовательно завершает работу, он также закрывает сокет прослушивания, и я не могу понять, почему, даже если он больше не ссылается на него.
Есть идеи, что я делаю не так?
gen_server:
-module(simple_tcp).
-behaviour(gen_server).
%% API
-export([start_link/1, stop/0, start/0, start/1]).
%% gen-server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-define(DEFAULT_PORT, 1055).
-record(state, {port, lsock}).
start_link({port, Port}) ->
gen_server:start_link(?MODULE, [{port, Port}], []);
start_link({socket, Socket}) ->
gen_server:start_link(?MODULE, [{socket, Socket}], []).
start({port, Port}) ->
simple_tcp_sup:start_child({port, Port});
start({socket, Socket}) ->
simple_tcp_sup:start_child({socket, Socket}).
start() ->
start({port, ?DEFAULT_PORT}).
stop() ->
gen_server:cast(?SERVER, stop).
% Callback functions
init([{port, Port}]) ->
{ok, LSock} = gen_tcp:listen(Port, [{active, true},{reuseaddr, true}]),
init([{socket, LSock}]);
init([{socket, Socket}]) ->
io:fwrite("Starting server with socket: ~p~n", [self()]),
{ok, Port} = inet:port(Socket),
{ok, #state{port=Port, lsock=Socket}, 0}.
handle_call(_Msg, _From, State) ->
{noreply, State}.
handle_cast(stop, State) ->
{stop, ok, State}.
handle_info({tcp, Socket, RawData}, State) ->
gen_tcp:send(Socket, io_lib:fwrite("Received raw data: ~p~n", [RawData])),
{noreply, State};
handle_info({tcp_error, _Socket, Reason}, State) ->
io:fwrite("Error: ~p~n", [Reason]),
{stop, normal, State};
handle_info(timeout, #state{lsock = LSock} = State) ->
case gen_tcp:accept(LSock) of
{ok, Sock} ->
io:fwrite("Accepting connection...~p~n", [self()]),
start({socket, LSock}),
{noreply, #state{lsock=Sock}};
{error, Reason} ->
io:fwrite("Error: ~p, ~p~n", [Reason, self()]),
{stop, normal, State}
end;
handle_info({tcp_closed, _Port}, State) ->
io:fwrite("Socket closed: ~p~n", [self()]),
simple_tcp_sup:kill_child(self()),
{stop, normal, State}.
terminate(_Reason, _State) ->
io:fwrite("Shutting down server: ~p~n", [self()]),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
руководитель:
-module(simple_tcp_sup).
-behaviour(supervisor).
-export([start_link/0,
start_child/1
]).
-export([init/1]).
-define(SERVER, ?MODULE).
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
start_child({socket, Socket}) ->
io:fwrite("Spawning child with socket...~n"),
supervisor:start_child(?SERVER, [{socket, Socket}]);
start_child({port, Port}) ->
io:fwrite("Spawning child with port...~n"),
supervisor:start_child(?SERVER, [{port, Port}]).
init([]) ->
Element = {simple_tcp, {simple_tcp, start_link, []},
temporary, brutal_kill, worker, [simple_tcp]},
Children = [Element],
RestartStrategy = {simple_one_for_one, 0, 1},
{ok, {RestartStrategy, Children}}.
2 ответа
Твой третий handle_info
меняет роль Sock
а также LSock
, Должно пройти Sock
для дочернего процесса и оставить свое собственное состояние без изменений.
Кстати, это плохая карма, чтобы восстановить State
с нуля (#state{lsock=Sock}
) вы всегда должны выводить новое State
от текущего State
(State#state{lsock=Sock}
), на случай, если позже вы добавите больше переменных состояния. На самом деле, в этом есть ошибка (хотя и незначительная), поскольку вы выбрасываете номер порта.
Что ж, я предлагаю вам разрешить обработку Socket отдельными процессами, которые асинхронно взаимодействуют с gen_server и linked
с этим. У меня есть пример кода, который покажет вам, как это можно сделать. Gen_server запускается и порождает прослушиватель TCP, который после успешного получения сокета прослушивания сообщает нашему gen_server об изменении его внутреннего состояния. Я расположил код сверху вниз. Все соответствующие функции были показаны. Сосредоточьтесь на процессах обработки сокетов и на том, как они взаимодействуют с gen_server
-define (PEER_CLIENT_TIMEOUT, таймер: секунды (20)). -Определить (PORT_RANGE,{10245,10265}). -Определить (ОТЛАДКА (X,Y),error_logger:info_msg(Х,Y)). -Определить (ОШИБКА (L),error_logger:error_report(L)). -define(SOCKET_OPTS(IP),[инет, двоичный, {непереданных,100},{пакет,0}, {Reuseaddr, истинно}, {активен, истинно}, {Ф, ИП}]). %% ------------------------------------------------ ---- %% gen_server начинается здесь.... начать (Одноранговое)-> gen_server:start_link({локальная, МОДУЛЬ?},? МОДУЛЬ, Одноранговое, []). %%% ------------------------------------------- Функция %% Gen_server init/1 INIT (Одноранговое)-> process_flag(trap_exit, правда), %% начиная всю цепочку сокетов ниже.. start_link_listener(), %% Сокет запущен, gen_server теперь может ожидать асинхронного %% Сообщения {Хорошо,[]}. %%% ---- Функции обработки сокетов --------- %% Функция: start_link_listener/0 %% Цель: запуск всей цепочки прослушивания %% и ждет подключения. выполненный %% непосредственно процессом gen_server, но %% порождает отдельный процесс, чтобы сделать все остальное start_link_listener () -> Ip_address = get_myaddr (), spawn_link (fun () -> listener (? SOCKET_OPTS (Ip_address)) end). %%% ---------------------------------------------- %% Функция: get_myaddr/0 %% Цель: выбрать активный IP-адрес на моей машине, чтобы %% прослушать get_myaddr()-> DEBUG("Сервер> Попытка извлечь мой локальный IP-адрес....",[]), {ok,Name} = inet:gethostname(), {ok,IP} = inet:getaddr(Имя, inet), DEBUG("Сервер> Найден живой локальный IP-адрес: ~p..... ~ n",[IP]), IP. %%% ----------------------------------------------- --- %% Функция: слушатель /1, выполняется в отдельном процессе %% Цель: Пытается получить заданный PORT_RANGE с заданными опциями сокета %% Как только он получает ListenSocket, он разыгрывает gen_server! слушатель (SocketOpts)-> process_flag(trap_exit, правда), Порты = списки:seq(элемент (1,?PORT_RANGE), элемент (2,?PORT_RANGE)), case try_listening(SocketOpts,Ports) из {Нормально, порт,LSocket}-> PP = proplists:get_value(ip,SocketOpts),? МОДУЛЬ:started_listener(порт,PP,LSocket), accept_connection(LSocket); {ошибка, ошибка} -> {ошибка, ошибка,SocketOpts} конец. try_listening (_Opts, []) -> {ошибка, ошибка}; try_listening(ОПТС, [Порт | Остальные])-> case gen_tcp:listen(Port,Opts) из {хорошо,Listen_Socket} -> {хорошо, порт,Listen_Socket}; {error,_} -> try_listening(Opts,Rest) конец. %%%--------------------------------------------------------- %% вспомогательные функции для преобразования IP-адреса из кортежа %% к строке и наоборот str(X) когда is_integer (X) -> integer_to_list (X). formalise_ipaddress ({А, В, С,D})-> str(A) ++ "." ++ str(B) ++ "." ++ str(C) ++ "." ++ стр (D). unformalise_address (String) -> [A, B, C, D] = строка: токены (строка,"."), {List_to_integer (А), list_to_integer (В), list_to_integer (С), list_to_integer (D)}. %%% ----------------------------------------------- --- %% Функция: get_source_connection/1 %% Назначение: получение IP и порта на другом %% конец соединения get_source_connection(Socket)-> попробуй инет:peername(сокет) из {ok,{IP_Address, Port}} -> [{IPaddress, formalise_ipaddress (IP_Address)}, {порт, порт}]; _ -> failed_to_retrieve_address ловить _:_ -> failed_to_retrieve_address конец. %%% ----------------------------------------------- ------ %% Функция: accept_connection/1 %% Назначение: ожидает подключения и повторно использует %% ListenSocket, порождая другой поток %% взять и послушать тоже. Это кастует gen_server %% при каждом подключении и предоставляет подробности об этом. accept_connection (ListenSocket) -> case gen_tcp: accept (ListenSocket, infinity) из {хорошо, Сокет}-> %% повторно использовать ListenSocket ниже..... spawn_link(fun() -> accept_connection(ListenSocket) end), OtherEnd = get_source_connection(Socket),? МОДУЛЬ:accepted_connection(OtherEnd), петля (гнездо,OtherEnd); {error,_} = Причина ->? ОШИБКА (["Слушателю не удалось принять соединение", {Слушатель, самостоятельно ()},{причина, причина}]) конец. %%% ----------------------------------------------- -------------------------- %% Функция: петля /2 %% Назначение: петля приема TCP, она вызывает gen_server %%, как только он что-то получит. gen_server %% отвечает за генерацию ответа %% OtherEnd::= [{ipAddress,StringIPAddress},{Порт, Порт}] или 'failed_to_retrieve_address' петля (гнездо,OtherEnd)-> Получать {tcp, Socket, Data} -> DEBUG("Acceptor: ~p получил двоичное сообщение от: ~p~n",[self(), OtherEnd]), Ответ =? МОДУЛЬ: входящий_бинарный_месяц (данные, другие конец), gen_tcp: отправить (Socket, ответ), gen_tcp: близко (гнездо), Выход (нормальный); {tcp_closed, Socket} -> DEBUG("Acceptor: ~p. Сокет закрыт другим концом: ~p~n",[self(), OtherEnd]),? МОДУЛЬ: socket_closed (OtherEnd), Выход (нормальный); Any ->?DEBUG("Acceptor: ~p получил сообщение: ~p~n",[self(), Any]) конец. %%% ---------------------------------------------- %% Gen_server Асинхронные API accept_connection (failed_to_retrieve_address) -> хорошо; accepted_connection ([{IPaddress, StringIPAddress}, {порт, порт}]) -> gen_server: литая (? МОДУЛЬ, {связно, StringIPAddress, порт}). socket_closed (failed_to_retrieve_address) -> хорошо; socket_closed ([{IPaddress, StringIPAddress}, {порт, порт}]) -> gen_server: литая (? МОДУЛЬ, {socket_closed, StringIPAddress, порт}). входящий_бинарный_месяц (данные, _OtherEnd) -> %%, ожидающий двоичный ответ case analyse_protocol (Data) из неправильный -> term_to_binary ("нарушение протокола!"); Val -> gen_server: call (?MODULE, {request, Val}, бесконечность) конец. %%% -------------------- дескриптор приведения ------------------------- ----------------- handle_cast ({listener_starts, _Port, _MyTupleIP, _LSocket} = Объект, Состояние) -> NewState = do_something_with_the_listen_report (Object), {Noreply, NewState}; handle_cast ({подключено, _StringIPAddress, _Port} = объект, состояние) -> NewState = do_something_with_the_connection_report (Object), {Noreply, NewState}; handle_cast ({socket_closed, _StringIPAddress, _Port} = Объект, Состояние) -> NewState = do_something_with_the_closed_connection_report (Object), {Noreply, NewState}; handle_cast (Любой, State) -> DEBUG("Сервер> Я получил какое-то неизвестное сообщение: ~p ~ n",[Любое]), {Noreply, State}. %%%% ---------------------- вызов вызова -------------- handle_call({запрос,Val},_,State)-> {NewState,Reply} = req(Val,State), {Ответ, ответ,NewState}; handle_call(_,_,State)-> {reply,[],State}. REQ (Val,State)-> %% изменить состояние gen_server и %% построить ответ {NewState,Reply} = modify_state_and_get_reply(State,Val), {NewState, Ответить}. %% ------------------- прекратить /2 -------------------- прекратить (_Reason,_State)-> хорошо. %% ----------------- code_change / 3 ------------------ code_change (_, State, _) -> {ok, State}.
С асинхронной возможностью gen_server мы можем обрабатывать детали сокета из отдельных связанных процессов. Затем эти процессы будут связываться с gen_server через cast
и не блокируя gen_server от его параллельной природы.