Erlang - проблема с функцией завершения
Я создаю модуль с Erlang, и у меня есть три варианта, которые добавить, редактировать и удалять.
В журналах видно, что функция add вызывается в методе init, но я не смог найти ничего, связанного с удалением сообщений. Я предполагаю, что это потому, что "метод завершения" не вызывается, но я не уверен, правильна ли моя функция или я вызываю функции редактирования и удаления в нужном месте.
это мой код:
-module(mod_msgschedule).
-behaviour(gen_server).
-behaviour(gen_mod).
-include("ejabberd.hrl").
-include("jlib.hrl").
%% gen_mod handlers
-export([start/2,stop/1]).
%% gen_server handlers
-export([init/1,handle_info/2, handle_call/3, handle_cast/2, terminate/2, code_change/3]).
%% Hook handlers
-export([
remove_delayed_message/2,
add_delayed_message/7,
edit_delayed_message/7,
search_delayed_messages/2,
check_packet/1,
process_sm_iq/3]).
-export([start_link/2]).
-define(INTERVAL, timer:minutes(5)).
-define(PROCNAME, ?MODULE).
-define(NS_DELAYMSG, "delayed-msg").
-record(state,{host :: binary()}).
-record(delayed_msg, {id,from,to,server,scheduledTimestamp,packet,relativeId}).
%%--------------------------------------------------------------------
%% gen_mod callbacks
%%--------------------------------------------------------------------
start(VHost, Opts) ->
ejabberd_loglevel:set_custom(?MODULE, 5),
?DEBUG("Start Module", []),
Proc = gen_mod:get_module_proc(VHost,?PROCNAME),
ChildSpec = {Proc, {?MODULE, start_link, [VHost,Opts]},
transient, 1000, worker, [?MODULE]},
supervisor:start_child(ejabberd_sup, ChildSpec).
stop(VHost) ->
Proc = gen_mod:get_module_proc(VHost,?PROCNAME),
supervisor:terminate_child(ejabberd_sup,Proc),
supervisor:delete_child(ejabberd_sup,Proc).
start_link(VHost, Opts) ->
Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
gen_server:start_link({local, Proc}, ?MODULE, [VHost, Opts],[]).
init([VHost, Opts]) ->
?DEBUG("Start Timer", []),
process_flag(trap_exit, true),
ejabberd_hooks:add(filter_local_packet, VHost, ?MODULE, check_packet, 10),
timer:send_interval(?INTERVAL, self(), tick),
IQDisc = gen_mod:get_opt(iqdisc, Opts, one_queue),
gen_iq_handler:add_iq_handler(ejabberd_sm, VHost, ?NS_DELAYMSG,?MODULE, process_sm_iq, IQDisc),
%%gen_iq_handler:add_iq_handler(ejabberd_sm, VHost, ?NS_VCARD,
%% ?MODULE,process_sm_iq, IQDisc),
%%gen_iq_handler:add_iq_handler(ejabberd_local, VHost, ?NS_VCARD,
%% ?MODULE,process_local_iq, IQDisc),
%%DirectoryHost = gen_mod:get_opt_host(VHost, Opts, "vjud.@HOST@"),
%%Search = gen_mod:get_opt(search, Opts, true),
%%case Search of
%% true ->
%% ejabberd_router:register_route(DirectoryHost);
%% _ ->
%% ok
%%end,
{ok,#state{host=VHost}}.
terminate(_Reason, State) ->
VHost = State#state.host,
%%case State#state.search of
%% true ->
%% ejabberd_router:unregister_route(State#state.directory_host);
%% _ ->
%% ok
%%end,
ejabberd_hooks:delete(filter_local_packet, VHost,?MODULE, check_packet, 10),
gen_iq_handler:remove_iq_handler(ejabberd_local, VHost, ?NS_DELAYMSG).
%%gen_iq_handler:remove_iq_handler(ejabberd_local, VHost, ?NS_VCARD),
%%gen_iq_handler:remove_iq_handler(ejabberd_sm, VHost, ?NS_VCARD),
%%ejabberd_hooks:delete(host_config_update, VHost, ?MODULE, config_change, 50),
%%ejabberd_hooks:delete(disco_local_features, VHost, ?MODULE, get_local_features, 50).
handle_call(get_state, _From, State) ->
{reply, {ok, State}, State};
handle_call(stop,_From,State) ->
{stop, normal, ok, State};
handle_call(_Request, _From,State) ->
{reply, bad_request, State}.
%% this function is called whenever gen_server receives a 'tick' message
handle_info(tick, State) ->
State2 = send_pending_delayed_messages(State),
{noreply, State2};
handle_info(_Info, State) ->
{noreply, State}.
handle_cast(_Request, State) ->
{noreply, State}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% this function is called by handle_info/2 when tick message is received
send_pending_delayed_messages(State) ->
LServer = jlib:nameprep(State#state.host),
Now = erlang:now(),
CurrentTimestamp = now_to_microseconds(Now),
?DEBUG("Timer Triggered!! ~p", [CurrentTimestamp]),
case search_delayed_messages(LServer,CurrentTimestamp) of
{ok, DelayedMessages} ->
lists:foreach(fun(DelayedMessage) ->
route_scheduled_message(LServer, DelayedMessage),
remove_delayed_message(LServer, DelayedMessage)
end, DelayedMessages);
{error, Reason} ->
?DEBUG("Select command: ~p", [{error, Reason}])
end,
State.
route_scheduled_message(Server, #delayed_msg{from=From, to=To, packet=Packet} = DelayedMessage) ->
NewPacket = resend_scheduled_message_packet(Server,DelayedMessage),
ejabberd_router:route(From, To, NewPacket).
resend_scheduled_message_packet(Server,
#delayed_msg{scheduledTimestamp=TimeStamp, packet = Packet}) ->
add_timestamp(TimeStamp, Server, Packet).
add_timestamp(undefined, _Server, Packet) ->
Packet;
add_timestamp({_,_,Micro} = TimeStamp, Server, Packet) ->
{D,{H,M,S}} = calendar:now_to_universal_time(TimeStamp),
Time = {D,{H,M,S, Micro}},
TimeStampXML = timestamp_xml(Server, Time),
xml:append_subtags(Packet, [TimeStampXML]).
timestamp_xml(Server, Time) ->
FromJID = jlib:make_jid(<<>>, Server, <<>>),
jlib:timestamp_to_xml(Time, utc, FromJID, <<"Offline Storage">>).
%%--------------------------------------------------------------------
%% Hook handler
%%--------------------------------------------------------------------
check_packet({From, To, XML} = Packet) ->
#jid{luser = LUser, lserver = LServer} = From,
case XML#xmlel.name of
<<"message">> ->
Type = xml:get_tag_attr_s(list_to_binary("type"), XML),
case Type of
<<"chat">> ->
DeltaTimeStampS = binary_to_list(xml:get_path_s(XML, [{elem, list_to_binary("scheduled_time")}, cdata])),
case DeltaTimeStampS of
"" ->
Packet;
_ ->
RelativeId = binary_to_list(xml:get_path_s(XML, [{elem, list_to_binary("delayed_msg_id")}, cdata])),
{DeltaTimeStampI, _Rest} = string:to_integer(DeltaTimeStampS),
case _Rest of
[] ->
Action = binary_to_list(xml:get_path_s(XML, [{elem, list_to_binary("delayed_msg_action")}, cdata])),
ScheduledTimestamp = from_now_to_microseconds(erlang:now(),DeltaTimeStampI),
NewChildren = lists:delete(lists:keyfind(<<"scheduled_time">>, 2, XML#xmlel.children),XML#xmlel.children),
NewXML = XML#xmlel{ children = NewChildren },
case Action of
"edit" ->
edit_delayed_message(LServer, binary_to_list(From#jid.luser), binary_to_list(To#jid.luser), binary_to_list(To#jid.lserver), ScheduledTimestamp, NewXML,RelativeId);
"delete" ->
remove_delayed_message(LServer,#delayed_msg{from = binary_to_list(From#jid.luser), relativeId = RelativeId});
_ ->
add_delayed_message(LServer, binary_to_list(From#jid.luser), binary_to_list(To#jid.luser), binary_to_list(To#jid.lserver), ScheduledTimestamp, NewXML,RelativeId)
end
end,
drop
end;
<<"groupchat">> ->
DeltaTimeStampS = binary_to_list(xml:get_path_s(XML, [{elem, list_to_binary("scheduled_time")}, cdata])),
case DeltaTimeStampS of
"" ->
Packet;
_ ->
RelativeId = binary_to_list(xml:get_path_s(XML, [{elem, list_to_binary("delayed_msg_id")}, cdata])),
{DeltaTimeStampI, _Rest} = string:to_integer(DeltaTimeStampS),
case _Rest of
[] ->
Action = binary_to_list(xml:get_path_s(XML, [{elem, list_to_binary("delayed_msg_action")}, cdata])),
ScheduledTimestamp = from_now_to_microseconds(erlang:now(),DeltaTimeStampI),
NewChildren = lists:delete(lists:keyfind(<<"scheduled_time">>, 2, XML#xmlel.children),XML#xmlel.children),
NewXML = XML#xmlel{ children = NewChildren },
case Action of
"edit" ->
edit_delayed_message(LServer, binary_to_list(From#jid.luser), binary_to_list(To#jid.luser), binary_to_list(To#jid.lserver), ScheduledTimestamp, NewXML,RelativeId);
"delete" ->
remove_delayed_message(LServer,#delayed_msg{from = binary_to_list(From#jid.luser), relativeId = RelativeId});
_ ->
add_delayed_message(LServer, binary_to_list(From#jid.luser), binary_to_list(To#jid.luser), binary_to_list(To#jid.lserver), ScheduledTimestamp, NewXML,RelativeId)
end
end,
drop
end;
_ -> Packet
end;
_ -> Packet
end.
process_sm_iq(_From, _To, #iq{type = get, xmlns = ?NS_DELAYMSG} = IQ) ->
?INFO_MSG("Processing IQ Get query:~n ~p", [IQ]),
IQ#iq{type = result, sub_el = [{xmlelement, "value", [], [{xmlcdata, "Hello World of Testing."}]}]};
process_sm_iq(_From, _To, #iq{type = set} = IQ) ->
?INFO_MSG("Processing IQ Set: it does nothing", []),
IQ#iq{type = result, sub_el = []};
process_sm_iq(_From, _To, #iq{sub_el = SubEl} = IQ) ->
?INFO_MSG("Processing IQ other type: it does nothing", []),
IQ#iq{type = error, sub_el = [SubEl, ?ERR_FEATURE_NOT_IMPLEMENTED]}.
%%--------------------------------------------------------------------
%% ODBC Functions
%%--------------------------------------------------------------------
remove_delayed_message(LServer, #delayed_msg{from=FromUserName, relativeId = RelativeId}) ->
QR = ejabberd_odbc:sql_query(
LServer,
[<<"delete from delayed_message "
"where from_jid = '">>, ejabberd_odbc:escape(FromUserName#jid.luser),<<"' and relative_id = '">>,ejabberd_odbc:escape(list_to_binary(RelativeId)),<<"';">>]),
?DEBUG("DELETE ~p", [QR]).
prepare_delayed_message(SFromUserName, SToUsername, SServer, SScheduledTimestamp, SPacket,SRelativeId) ->
[<<"('">>, ejabberd_odbc:escape(list_to_binary(SFromUserName)),
<<"', '">>, ejabberd_odbc:escape(list_to_binary(SToUsername)),
<<"', '">>, ejabberd_odbc:escape(list_to_binary(SServer)),
<<"', ">>, integer_to_list(SScheduledTimestamp),
<<", '">>, ejabberd_odbc:escape(xml:element_to_binary(SPacket)),
<<"', '">>, ejabberd_odbc:escape(list_to_binary(SRelativeId)),
<<"')">>].
add_delayed_message(LServer, SFromUserName, SToUsername, SServer, SScheduledTimestamp, SPacket, SRelativeId) ->
Rows = prepare_delayed_message(SFromUserName, SToUsername, SServer, SScheduledTimestamp, SPacket,SRelativeId),
QR = ejabberd_odbc:sql_query(
LServer,
[<<"insert into delayed_message(from_jid,to_jid,server,scheduled_time,packet,relative_id) "
"values ">>, join(Rows, "")]),
?DEBUG("Delayed message inserted? ~p", [QR]).
edit_delayed_message(LServer,SFromUserName, SToUsername, SServer, SScheduledTimestamp, SPacket, SRelativeId) ->
ejabberd_odbc:sql_query(
LServer,
[<<"update delayed_message set to_jid='">>,ejabberd_odbc:escape(list_to_binary(SToUsername)),
<<"' , server='">>,ejabberd_odbc:escape(list_to_binary(SServer)),
<<"' , scheduled_time=">>,integer_to_list(SScheduledTimestamp),
<<", packet='">>,ejabberd_odbc:escape(xml:element_to_binary(SPacket)),
<<"' where from_jid='">>,ejabberd_odbc:escape(list_to_binary(SFromUserName)),
<<"' AND relative_id = '">>, ejabberd_odbc:escape(list_to_binary(SRelativeId)),<<"';">>]).
search_delayed_messages(LServer, SScheduledTimestamp) ->
ScheduledTimestamp = encode_timestamp(SScheduledTimestamp),
Query = [<<"select id,from_jid,to_jid,server,scheduled_time,packet,relative_id from delayed_message where ">>,
<<"(scheduled_time < ">>,ScheduledTimestamp,<<" OR ">>,ScheduledTimestamp,<<" = 0);">>],
case ejabberd_odbc:sql_query(LServer,Query) of
{selected, [<<"id">>,<<"from_jid">>,<<"to_jid">>,<<"server">>,<<"scheduled_time">>,<<"packet">>,<<"relative_id">>], Rows} ->
{ok, rows_to_records(Rows)};
{aborted, Reason} ->
{error, Reason};
{error, Reason} ->
{error, Reason}
end.
rows_to_records(Rows) ->
[row_to_record(Row) || Row <- Rows].
row_to_record({SId, SFromUserName, SToUsername, SServer, SScheduledTimestamp, SPacket,SRelativeId}) ->
Id = list_to_integer(binary_to_list(SId)),
Server = binary_to_list(SServer),
From = jlib:make_jid(SFromUserName,SServer,<<"fb">>),
To = jlib:make_jid(SToUsername,SServer,<<"fb">>),
ScheduledTimestamp = microseconds_to_now(list_to_integer(binary_to_list(SScheduledTimestamp))),
Packet = xml_stream:parse_element(SPacket),
RelativeId = binary_to_list(SRelativeId),
#delayed_msg{id = Id,
from = From,
to = To,
server = Server,
scheduledTimestamp = ScheduledTimestamp,
packet = Packet,
relativeId = RelativeId}.
%% ------------------------------------------------------------------
%% Helpers
choose_strategy(true, true, get) -> get;
choose_strategy(true, true, set) -> set;
choose_strategy(false, _, _ ) -> not_allowed;
choose_strategy(_, _, _ ) -> forbidden.
compare_bare_jids(#jid{luser = LUser, lserver = LServer},
#jid{luser = LUser, lserver = LServer}) -> true;
compare_bare_jids(_, _) -> false.
element_to_namespace(#xmlel{attrs = Attrs}) ->
xml:get_attr_s(<<"xmlns">>, Attrs);
element_to_namespace(_) ->
<<>>.
%% Skip invalid elements.
to_map(Elems) ->
[{NS, Elem} || Elem <- Elems, is_valid_namespace(NS = element_to_namespace(Elem))].
is_valid_namespace(Namespace) -> Namespace =/= <<>>.
error_iq(IQ=#iq{sub_el=SubElem}, ErrorStanza) ->
IQ#iq{type = error, sub_el = [SubElem, ErrorStanza]}.
from_now_to_microseconds({Mega, Secs, Micro}, FromNow) ->
Mega*1000*1000*1000*1000 + Secs * 1000 * 1000 + Micro + FromNow.
now_to_microseconds({Mega, Secs, Micro}) ->
Mega*1000*1000*1000*1000 + Secs * 1000 * 1000 + Micro.
encode_timestamp(TimeStamp) ->
integer_to_list(TimeStamp).
maybe_encode_timestamp(never) ->
"null";
maybe_encode_timestamp(TimeStamp) ->
encode_timestamp(TimeStamp).
microseconds_to_now(MicroSeconds) when is_integer(MicroSeconds) ->
Seconds = MicroSeconds div 1000000,
{Seconds div 1000000, Seconds rem 1000000, MicroSeconds rem 1000000}.
join([], _Sep) ->
[];
join([H|T], Sep) ->
[H, [[Sep, X] || X <- T]].
3 ответа
В общем, не следует использовать terminate
если нет необходимости писать собственные процессы или поведения, которые должны подключаться к иерархии надзора.
Я только что просмотрел код, поэтому, пожалуйста, поправьте меня, если я ошибаюсь в любом из моих предположений.
Здесь вы имеете дело с двумя вещами: модулем ejabberd и процессом, поддерживающим некоторые его функции, и вы слишком сильно переплетаете их инициализацию.
Под модулем ejabberd я подразумеваю компонент сервера (что не обязательно означает, что любой раздел, исходящий от клиента, будет проходить границы процесса при обработке сервером при вызове кода из компонента).
Вы вводите процесс, чтобы можно было измерить тики времени, и это нормально. Тем не менее, вы также помещаете некоторую инициализацию модуля в свою функцию инициализации процесса (init/1
).
Я вижу, что ни один из ваших обработчиков IQ/hook фактически не вызывает ваш процесс в своем коде - это хорошо, так как это означает, что они на самом деле не зависят от того, существует ли процесс или нет (например, когда он перезапускается супервизором).
Я бы предложил настроить ваш модуль в start/2
(зарегистрируйте обработчики ловушек, обработчики IQ, ...), разложите их в stop/1
и предположим, что супервизор перезапустит процесс в случае ошибок времени выполнения - не связывайте настройку / демонтаж модуля и время жизни процесса, поместив регистрацию (де) обработки в init
/terminate
, Если супервизор должен перезапустить ваш процесс, он должен быть почти незамедлительным - и даже тогда ваши обработчики IQ/hook не зависят от присутствующего модуля - зачем связывать их init
/terminate
?
Есть еще одна вещь - если вы хотите, чтобы модуль запускался только после запуска процесса (что иногда может потребоваться, хотя здесь это и не обязательно), помните, что supervisor:start_child
является синхронным и блокирующим - он вернется только после успешного запуска нового потомка. Получив эту гарантию (т. Е. Вызов вернул правильное возвращаемое значение), вы можете безопасно продолжить настройку модуля (настройка обработчика перехвата / IQ).
Если вам требуется, чтобы ваш код хорошо масштабировался, тогда timer
это не лучший выбор - используйте erlang:send_after/3
или Timeout
часть возвращаемого значения ожидается от handle_*
Обратные вызовы.
Спасибо за ответ. Поскольку я очень новичок в Erlang, я не знал, что должен скомпилировать файл erl. Я его скомпилировал и сейчас работает.
Это инструкция для компиляции файлов erl:
erlc -I / path / to / include / files / module_name.erl
Я никогда не использовал ejabberd, поэтому мои комментарии очень общие.
Вы используете как поведение gen_mod (обязательно для всех модулей ejjaberd), так и поведение gen_server из OTP. Я понимаю, что вы сделали это, чтобы использовать тиковое сообщение, но вы могли бы использовать функцию timer:apply_interval(Time,Module,Function,Args)
чтобы получить тот же результат. Таким образом, вы можете удалить все поведение gen_server и его call_back, и он останется только для обратных вызовов start и stop gen_mod.
В функции init/1 вы вызываете process_flag(trap_exit, true),
Я думаю, что это, в общем-то, плохая идея, особенно если у вас нет никакого управления сообщениями об ошибках (в вашем коде это должно обрабатываться одним предложением handle_info, но здесь это handle_info(_Info, State) -> {noreply, State}.
который будет соответствовать).