gen_server закрытие сокета прослушивания

Я пытаюсь сделать так, чтобы процесс gen_server принял нового клиента и немедленно породил нового потомка для обработки следующего. Проблема, с которой я сталкиваюсь, заключается в том, что когда сокет завершается и последовательно завершает работу, он также закрывает сокет прослушивания, и я не могу понять, почему, даже если он больше не ссылается на него.

Есть идеи, что я делаю не так?

gen_server:

-module(simple_tcp).
-behaviour(gen_server).

%% API
-export([start_link/1, stop/0, start/0, start/1]).

%% gen-server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-define(SERVER, ?MODULE).
-define(DEFAULT_PORT, 1055).

-record(state, {port, lsock}).

start_link({port, Port}) ->
    gen_server:start_link(?MODULE, [{port, Port}], []);

start_link({socket, Socket}) ->
    gen_server:start_link(?MODULE, [{socket, Socket}], []).

start({port, Port}) ->
    simple_tcp_sup:start_child({port, Port});

start({socket, Socket}) ->
    simple_tcp_sup:start_child({socket, Socket}).

start() ->
    start({port, ?DEFAULT_PORT}).

stop() ->
    gen_server:cast(?SERVER, stop).

% Callback functions
init([{port, Port}]) ->
    {ok, LSock} = gen_tcp:listen(Port, [{active, true},{reuseaddr, true}]),
    init([{socket, LSock}]);

init([{socket, Socket}]) ->
    io:fwrite("Starting server with socket: ~p~n", [self()]),
    {ok, Port} = inet:port(Socket),
    {ok, #state{port=Port, lsock=Socket}, 0}. 

handle_call(_Msg, _From, State) ->
    {noreply, State}.

handle_cast(stop, State) ->
    {stop, ok, State}.

handle_info({tcp, Socket, RawData}, State) ->
    gen_tcp:send(Socket, io_lib:fwrite("Received raw data: ~p~n", [RawData])),
    {noreply, State};

handle_info({tcp_error, _Socket, Reason}, State) ->
    io:fwrite("Error: ~p~n", [Reason]),
    {stop, normal, State};

handle_info(timeout, #state{lsock = LSock} = State) ->
    case gen_tcp:accept(LSock) of
        {ok, Sock} ->
            io:fwrite("Accepting connection...~p~n", [self()]),
            start({socket, LSock}),
            {noreply, #state{lsock=Sock}};

        {error, Reason} ->
            io:fwrite("Error: ~p, ~p~n", [Reason, self()]),
            {stop, normal, State}
    end;

handle_info({tcp_closed, _Port}, State) ->
    io:fwrite("Socket closed: ~p~n", [self()]),
    simple_tcp_sup:kill_child(self()),
    {stop, normal, State}.

terminate(_Reason, _State) ->
    io:fwrite("Shutting down server: ~p~n", [self()]),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

руководитель:

-module(simple_tcp_sup).

-behaviour(supervisor).

-export([start_link/0,
         start_child/1
        ]). 

-export([init/1]).

-define(SERVER, ?MODULE).

start_link() ->
    supervisor:start_link({local, ?SERVER}, ?MODULE, []).

start_child({socket, Socket}) ->
    io:fwrite("Spawning child with socket...~n"),
    supervisor:start_child(?SERVER, [{socket, Socket}]);

start_child({port, Port}) ->
    io:fwrite("Spawning child with port...~n"),
    supervisor:start_child(?SERVER, [{port, Port}]).

init([]) ->
    Element = {simple_tcp, {simple_tcp, start_link, []},
               temporary, brutal_kill, worker, [simple_tcp]},
    Children = [Element],
    RestartStrategy = {simple_one_for_one, 0, 1}, 
    {ok, {RestartStrategy, Children}}.

2 ответа

Твой третий handle_info меняет роль Sock а также LSock, Должно пройти Sock для дочернего процесса и оставить свое собственное состояние без изменений.

Кстати, это плохая карма, чтобы восстановить State с нуля (#state{lsock=Sock}) вы всегда должны выводить новое State от текущего State (State#state{lsock=Sock}), на случай, если позже вы добавите больше переменных состояния. На самом деле, в этом есть ошибка (хотя и незначительная), поскольку вы выбрасываете номер порта.

Что ж, я предлагаю вам разрешить обработку Socket отдельными процессами, которые асинхронно взаимодействуют с gen_server и linked с этим. У меня есть пример кода, который покажет вам, как это можно сделать. Gen_server запускается и порождает прослушиватель TCP, который после успешного получения сокета прослушивания сообщает нашему gen_server об изменении его внутреннего состояния. Я расположил код сверху вниз. Все соответствующие функции были показаны. Сосредоточьтесь на процессах обработки сокетов и на том, как они взаимодействуют с gen_server

-define (PEER_CLIENT_TIMEOUT, таймер: секунды (20)).
-Определить (PORT_RANGE,{10245,10265}).
-Определить (ОТЛАДКА (X,Y),error_logger:info_msg(Х,Y)).
-Определить (ОШИБКА (L),error_logger:error_report(L)).
-define(SOCKET_OPTS(IP),[инет, двоичный, {непереданных,100},{пакет,0},
                            {Reuseaddr, истинно}, {активен, истинно},
                            {Ф, ИП}]).

%% ------------------------------------------------ ----
%% gen_server начинается здесь....

начать (Одноранговое)-> 
    gen_server:start_link({локальная, МОДУЛЬ?},? МОДУЛЬ, Одноранговое, []).

%%% -------------------------------------------
Функция %% Gen_server init/1

INIT (Одноранговое)->
    process_flag(trap_exit, правда),
    %% начиная всю цепочку сокетов ниже..
    start_link_listener(),
    %% Сокет запущен, gen_server теперь может ожидать асинхронного
    %% Сообщения
    {Хорошо,[]}.

%%% ---- Функции обработки сокетов ---------

%% Функция: start_link_listener/0
%% Цель: запуск всей цепочки прослушивания
%% и ждет подключения. выполненный
%% непосредственно процессом gen_server, но
%% порождает отдельный процесс, чтобы сделать все остальное

start_link_listener () -> 
    Ip_address = get_myaddr (),  
    spawn_link (fun () -> listener (? SOCKET_OPTS (Ip_address)) end).

%%% ----------------------------------------------   
%% Функция: get_myaddr/0
%% Цель: выбрать активный IP-адрес на моей машине, чтобы
%% прослушать

get_myaddr()-> 
    DEBUG("Сервер> Попытка извлечь мой локальный IP-адрес....",[]),
    {ok,Name} = inet:gethostname(),
    {ok,IP} = inet:getaddr(Имя, inet),
    DEBUG("Сервер> Найден живой локальный IP-адрес: ~p..... ~ n",[IP]),
    IP.

%%% ----------------------------------------------- ---
%% Функция: слушатель /1, выполняется в отдельном процессе
%% Цель: Пытается получить заданный PORT_RANGE с заданными опциями сокета
%% Как только он получает ListenSocket, он разыгрывает gen_server!

слушатель (SocketOpts)->
    process_flag(trap_exit, правда),
    Порты = списки:seq(элемент (1,?PORT_RANGE), элемент (2,?PORT_RANGE)),
    case try_listening(SocketOpts,Ports) из
        {Нормально, порт,LSocket}->              
                PP = proplists:get_value(ip,SocketOpts),? МОДУЛЬ:started_listener(порт,PP,LSocket),              
                accept_connection(LSocket);
        {ошибка, ошибка} -> {ошибка, ошибка,SocketOpts}
    конец.

try_listening (_Opts, []) -> {ошибка, ошибка};
try_listening(ОПТС, [Порт | Остальные])->
    case gen_tcp:listen(Port,Opts) из
        {хорошо,Listen_Socket} -> {хорошо, порт,Listen_Socket};
        {error,_} -> try_listening(Opts,Rest)
    конец.
%%%---------------------------------------------------------
%% вспомогательные функции для преобразования IP-адреса из кортежа
%% к строке и наоборот

str(X) когда is_integer (X) -> integer_to_list (X).

formalise_ipaddress ({А, В, С,D})-> 
    str(A) ++ "." ++ str(B) ++ "." ++ str(C) ++ "." ++ стр (D).

unformalise_address (String) -> 
    [A, B, C, D] = строка: токены (строка,"."),
    {List_to_integer (А), list_to_integer (В), list_to_integer (С), list_to_integer (D)}.

%%% ----------------------------------------------- ---
%% Функция: get_source_connection/1
%% Назначение: получение IP и порта на другом
%% конец соединения

get_source_connection(Socket)->
    попробуй инет:peername(сокет) из
        {ok,{IP_Address, Port}} -> 
            [{IPaddress, formalise_ipaddress (IP_Address)}, {порт, порт}];
        _ -> failed_to_retrieve_address
    ловить
        _:_ -> failed_to_retrieve_address
    конец.

%%% ----------------------------------------------- ------
%% Функция: accept_connection/1
%% Назначение: ожидает подключения и повторно использует 
%%          ListenSocket, порождая другой поток
%% взять и послушать тоже. Это кастует gen_server
%% при каждом подключении и предоставляет подробности об этом.

accept_connection (ListenSocket) ->    
    case gen_tcp: accept (ListenSocket, infinity) из
        {хорошо, Сокет}-> 
            %% повторно использовать ListenSocket ниже.....
            spawn_link(fun() -> accept_connection(ListenSocket) end),            
            OtherEnd = get_source_connection(Socket),? МОДУЛЬ:accepted_connection(OtherEnd),          
            петля (гнездо,OtherEnd);
        {error,_} = Причина ->? ОШИБКА (["Слушателю не удалось принять соединение",
                    {Слушатель, самостоятельно ()},{причина, причина}])
    конец.

%%% ----------------------------------------------- --------------------------
%% Функция: петля /2
%% Назначение: петля приема TCP, она вызывает gen_server
%%, как только он что-то получит. gen_server
%% отвечает за генерацию ответа
%% OtherEnd::= [{ipAddress,StringIPAddress},{Порт, Порт}] или 'failed_to_retrieve_address'

петля (гнездо,OtherEnd)-> 
    Получать
        {tcp, Socket, Data} -> 
            DEBUG("Acceptor: ~p получил двоичное сообщение от: ~p~n",[self(), OtherEnd]),
            Ответ =? МОДУЛЬ: входящий_бинарный_месяц (данные, другие конец),
            gen_tcp: отправить (Socket, ответ),         
            gen_tcp: близко (гнездо),
            Выход (нормальный);
        {tcp_closed, Socket} -> 
            DEBUG("Acceptor: ~p. Сокет закрыт другим концом: ~p~n",[self(), OtherEnd]),? МОДУЛЬ: socket_closed (OtherEnd),
            Выход (нормальный);
        Any ->?DEBUG("Acceptor: ~p получил сообщение: ~p~n",[self(), Any])
    конец.

%%% ----------------------------------------------
%% Gen_server Асинхронные API

accept_connection (failed_to_retrieve_address) -> хорошо;
accepted_connection ([{IPaddress, StringIPAddress}, {порт, порт}]) ->     
    gen_server: литая (? МОДУЛЬ, {связно, StringIPAddress, порт}).

socket_closed (failed_to_retrieve_address) -> хорошо;
socket_closed ([{IPaddress, StringIPAddress}, {порт, порт}]) ->
    gen_server: литая (? МОДУЛЬ, {socket_closed, StringIPAddress, порт}).

входящий_бинарный_месяц (данные, _OtherEnd) -> %%, ожидающий двоичный ответ
    case analyse_protocol (Data) из
        неправильный -> term_to_binary ("нарушение протокола!");
        Val -> gen_server: call (?MODULE, {request, Val}, бесконечность)
    конец.

%%% -------------------- дескриптор приведения ------------------------- -----------------

handle_cast ({listener_starts, _Port, _MyTupleIP, _LSocket} = Объект, Состояние) ->
    NewState = do_something_with_the_listen_report (Object),
    {Noreply, NewState};
handle_cast ({подключено, _StringIPAddress, _Port} = объект, состояние) ->
    NewState = do_something_with_the_connection_report (Object),
    {Noreply, NewState};
handle_cast ({socket_closed, _StringIPAddress, _Port} = Объект, Состояние) ->
    NewState = do_something_with_the_closed_connection_report (Object),
    {Noreply, NewState};
handle_cast (Любой, State) ->
    DEBUG("Сервер> Я получил какое-то неизвестное сообщение: ~p ~ n",[Любое]),
    {Noreply, State}.


%%%% ---------------------- вызов вызова --------------
handle_call({запрос,Val},_,State)->
    {NewState,Reply} = req(Val,State),
    {Ответ, ответ,NewState};
handle_call(_,_,State)-> {reply,[],State}.

REQ (Val,State)->
    %% изменить состояние gen_server и 
    %% построить ответ
    {NewState,Reply} = modify_state_and_get_reply(State,Val),
    {NewState, Ответить}.

%% ------------------- прекратить /2 --------------------

прекратить (_Reason,_State)-> хорошо.  

%% ----------------- code_change / 3 ------------------

code_change (_, State, _) -> {ok, State}.

С асинхронной возможностью gen_server мы можем обрабатывать детали сокета из отдельных связанных процессов. Затем эти процессы будут связываться с gen_server через cast и не блокируя gen_server от его параллельной природы.

Другие вопросы по тегам