根据XEP-0199, XMPP客户端和服务器都可以在XML流上发送应用层PING请求。因为XMPP依赖底层的TCP连接,有可能TCP连接意外中断,而上层的XMPP并不知晓,从而影响消息传递。通过发送应用层PING请求可以来确认对端的连接可用性。
以服务器发给客户端为例,协议如下:
发送的PING请求:
1 2 3
| <iq from='capulet.lit' to='juliet@capulet.lit/balcony' id='s2c1' type='get'> <ping xmlns='urn:xmpp:ping'/> </iq>
|
如果对端支持PING请求,则返回对应的”PONG”回应。
1
| <iq from='juliet@capulet.lit/balcony' to='capulet.lit' id='s2c1' type='result'/>
|
如果对端不支持则返回错误。
1 2 3 4 5 6
| <iq from='juliet@capulet.lit/balcony' to='capulet.lit' id='s2c1' type='error'> <ping xmlns='urn:xmpp:ping'/> <error type='cancel'> <service-unavailable xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/> </error> </iq>
|
ejabberd中PING功能实现位于mod_ping.erl。它主要支持3个配置:
如果这个选项设置为true, 当客户端在给定时间间隔内没有活动,则向客户端发送一个ping请求。
设置上述send_pings选项中客户端没有活动的时间间隔。
- timeout_action: none|kill
表示当PING请求发出32秒后,ejabberd依然没有收到PING响应,服务端如何处理。none表示什么也不做,kill表示关闭客户端连接。
当ejabberd启动时会调用mod_ping:start/2。
1 2 3 4 5
| start(Host, Opts) -> Proc = gen_mod:get_module_proc(Host, ?MODULE), PingSpec = {Proc, {?MODULE, start_link, [Host, Opts]}, transient, 2000, worker, [?MODULE]}, supervisor:start_child(?SUPERVISOR, PingSpec).
|
start函数调用supervisor:start_child/2为每个支持的host创建一个负责该host的worker进程。
进程树模型如下:
token data1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| +------------+ |ejabberd_sup| +-----+------+ | | +------------------+ +--->|Other processes...| | +------------------+ | | +------------------------+ +--->|ping(im.just4coding.com)| | +------------------------+ | | +------------------------+ +--->|ping(localhost) | | +------------------------+ | | +------------------------+ +--->|ping(Other host) | +------------------------+
|
每个worker是一个gen_server进程,进程调用init函数进行初始化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| init([Host, Opts]) -> SendPings = gen_mod:get_opt(send_pings, Opts, ?DEFAULT_SEND_PINGS), PingInterval = gen_mod:get_opt(ping_interval, Opts, ?DEFAULT_PING_INTERVAL), TimeoutAction = gen_mod:get_opt(timeout_action, Opts, none), IQDisc = gen_mod:get_opt(iqdisc, Opts, no_queue), mod_disco:register_feature(Host, ?NS_PING), gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_PING, ?MODULE, iq_ping, IQDisc), gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_PING, ?MODULE, iq_ping, IQDisc), case SendPings of true -> ejabberd_hooks:add(sm_register_connection_hook, Host, ?MODULE, user_online, 100), ejabberd_hooks:add(sm_remove_connection_hook, Host, ?MODULE, user_offline, 100), ejabberd_hooks:add(user_send_packet, Host, ?MODULE, user_send, 100); _ -> ok end, {ok, #state{host = Host, send_pings = SendPings, ping_interval = PingInterval, timeout_action = TimeoutAction, timers = ?DICT:new()}}.
|
ServiceDiscovery请求:
1 2 3 4 5 6
| <iq type='get' from='juliet@capulet.lit/balcony' to='capulet.lit' id='disco1'> <query xmlns='http://jabber.org/protocol/disco#info'/> </iq>
|
ServiceDiscovery响应:
1 2 3 4 5 6 7 8 9 10
| <iq type='result' from='capulet.lit' to='juliet@capulet.lit/balcony' id='disco1'> <query xmlns='http://jabber.org/protocol/disco#info'> ... <feature var='urn:xmpp:ping'/> ... </query> </iq>
|
ServiceDiscovery相关信息参考XEP-0030。
接下来,注册IQ处理器,令XMLNS为”urn:xmpp:ping”的IQ请求由函数iq_ping处理。iq_ping简单地返回相应响应或者错误。
1 2 3 4 5 6 7
| iq_ping(_From, _To, #iq{type = Type, sub_el = SubEl} = IQ) -> case {Type, SubEl} of {get, {xmlelement, "ping", _, _}} -> IQ#iq{type = result, sub_el = []}; _ -> IQ#iq{type = error, sub_el = [SubEl, ?ERR_FEATURE_NOT_IMPLEMENTED]} end.
|
如果send_pings配置为true, mod_ping在ejabberd中注册n以下3个hook函数:
sm_register_connection_hook
: 它在客户端完成登录验证,建立session信息时调用。
1 2 3 4 5 6 7 8
| open_session(SID, User, Server, Resource, Info) -> set_session(SID, User, Server, Resource, undefined, Info), mnesia:dirty_update_counter(session_counter, jlib:nameprep(Server), 1), check_for_sessions_to_replace(User, Server, Resource), JID = jlib:make_jid(User, Server, Resource), ejabberd_hooks:run(sm_register_connection_hook, JID#jid.lserver, [SID, JID, Info]).
|
sm_remove_connection_hook
: 在用户退出,关闭session时调用。1 2 3 4 5 6 7 8 9 10 11 12 13 14
| close_session(SID, User, Server, Resource) -> Info = case mnesia:dirty_read({session, SID}) of [] -> []; [#session{info=I}] -> I end, F = fun() -> mnesia:delete({session, SID}), mnesia:dirty_update_counter(session_counter, jlib:nameprep(Server), -1) end, mnesia:sync_dirty(F), JID = jlib:make_jid(User, Server, Resource), ejabberd_hooks:run(sm_remove_connection_hook, JID#jid.lserver, [SID, JID, Info]).
|
user_send_packet
: 在C2S进程收到客户端发送的消息时被调用。
sm_register_connection_hook
的hook函数user_online
和user_send_packet
的hook函数user_send
都会调用start_ping函数。
1 2 3
| start_ping(Host, JID) -> Proc = gen_mod:get_module_proc(Host, ?MODULE), gen_server:cast(Proc, {start_ping, JID}).
|
start_ping向该HOST的worker进程发送一个{start_ping, JID}消息。worker进程调用handle_cast进行处理:
1 2 3
| handle_cast({start_ping, JID}, State) -> Timers = add_timer(JID, State#state.ping_interval, State#state.timers), {noreply, State#state{timers = Timers}};
|
handle_cast调用add_timer为该客户端创建一个timer。
1 2 3 4 5 6 7 8 9 10 11
| add_timer(JID, Interval, Timers) -> LJID = jlib:jid_tolower(JID), NewTimers = case ?DICT:find(LJID, Timers) of {ok, OldTRef} -> cancel_timer(OldTRef), ?DICT:erase(LJID, Timers); _ -> Timers end, TRef = erlang:start_timer(Interval * 1000, self(), {ping, JID}), ?DICT:store(LJID, TRef, NewTimers).
|
由于用户每次发送消息时都会调用add_timer函数,因而add_timer中需要检查之前是否已经存在timer。如果存在,则先取消旧的timer, 再创建新的Timer。
当timer超时后,即客户若干时间内没有活动,进程收到{ping, JID}消息,此时ejabberd应向客户端发送PING消息。进程调用handle_info处理。
1 2 3 4 5 6 7 8 9 10 11
| handle_info({timeout, _TRef, {ping, JID}}, State) -> IQ = #iq{type = get, sub_el = [{xmlelement, "ping", [{"xmlns", ?NS_PING}], []}]}, Pid = self(), F = fun(Response) -> gen_server:cast(Pid, {iq_pong, JID, Response}) end, From = jlib:make_jid("", State#state.host, ""), ejabberd_local:route_iq(From, JID, IQ, F), Timers = add_timer(JID, State#state.ping_interval, State#state.timers), {noreply, State#state{timers = Timers}};
|
handle_info创建IQ消息后,设置回调函数F,调用ejabberd_local:route_iq/4消息IQ消息发送给客户端。当收到该IQ消息的响应或者超过32秒依然没有收到客户端的响应,回调函数F将会被调用。如果响应超时,Response为timeout,F将向进程发送{iq_pong, JID, timeout}消息。进程调用handle_cast处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| handle_cast({iq_pong, JID, timeout}, State) -> Timers = del_timer(JID, State#state.timers), ejabberd_hooks:run(user_ping_timeout, State#state.host, [JID]), case State#state.timeout_action of kill -> #jid{user = User, server = Server, resource = Resource} = JID, case ejabberd_sm:get_session_pid(User, Server, Resource) of Pid when is_pid(Pid) -> ejabberd_c2s:stop(Pid); _ -> ok end; _ -> ok end, {noreply, State#state{timers = Timers}};
|
如果timeout_action设置为kill, 则调用ejabberd_c2s:stop关闭相应的客户端连接。
因为在sm_remove_connection_hook
注册了hook函数user_offline
, 当用户退出时会调用stop_ping函数,向worker进程发送{stop_ping, JID}消息。
1 2 3
| stop_ping(Host, JID) -> Proc = gen_mod:get_module_proc(Host, ?MODULE), gen_server:cast(Proc, {stop_ping, JID}).
|
worker进程调用del_timer函数将该客户端的timer删除。
1 2 3
| handle_cast({stop_ping, JID}, State) -> Timers = del_timer(JID, State#state.timers), {noreply, State#state{timers = Timers}};
|
1 2 3 4 5 6 7 8 9
| del_timer(JID, Timers) -> LJID = jlib:jid_tolower(JID), case ?DICT:find(LJID, Timers) of {ok, TRef} -> cancel_timer(TRef), ?DICT:erase(LJID, Timers); _ -> Timers end.
|
模块及进程停止的逻辑与模块和进程初始化的逻辑相反,本文略过。