Keep learning, keep living...

0%

ejabberd消息处理流程分析

ejabberd的程序入口为ejabberd_app.erl中的start/2,简化后逻辑如下:

1
2
3
4
5
6
7
8
9
start(normal, _Args) ->
...
Sup = ejabberd_sup:start_link(),
...
ejabberd_listener:start_listeners(),
?INFO_MSG("ejabberd ~s is started in the node ~p", [?VERSION, node()]),
Sup;
start(_, _) ->
{error, badarg}.

ejabberd_sup是一个supervisor,调用start_link时,它会生成各功能组件进程,如下图所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
------------------------------------------------

+------------+
|ejabberd_sup|
+-----+------+
|
+---------------------+------------------------+
| |
| +-----+ +-------+ |
| |hooks| |c2s_sup| |
| +-----+ +-------+ |
| |
| +-----------+ +---------+ |
| |node_groups| |s2sin_sup| |
| +-----------+ +---------+ |
| |
| +--------------+ +----------+ |
| |system_monitor| |s2sout_sup| |
| +--------------+ +----------+ |
| |
| +------+ +-----------+ |
| |router| |service_sup| |
| +------+ +-----------+ |
| |
| +--+ +--------+ |
| |sm| |http_sup| |
| +--+ +--------+ |
| |
| +---+ +-------------+ |
| |s2s| |http_poll_sup| |
| +---+ +-------------+ |
| |
| +-----+ +-------------------+ |
| |local| |frontend_socket_sup| |
| +-----+ +-------------------+ |
| |
| +-------+ +------+ |
| |captcha| |iq_sup| |
| +-------+ +------+ |
| |
| +--------+ +--------+ |
| |listener| |stun_sup| |
| +--------+ +--------+ |
| |
| +------------+ +-------------+ |
| |receiver_sup| |cache_tab_sup| |
| +------------+ +-------------+ |
| |
+----------------------------------------------+

部分关键组件功能:

  • hooks: 执行各模块注册的HOOK函数
  • router: 分发用户发送的消息
  • sm: session manager, 处理用户到用户的消息分发
  • local: 处理发送给Server本身的消息。
  • listener: supervisor进程,创建子进程监听网络端口
  • receiver_sup: supervisor进程, 它为每一个TCP连接创建一个进程来接收该TCP连接的数据
  • c2s_sup: supervisor进程, 它为每一个TCP连接创建一个进程来处理协议状态,并负责向TCP连接发送数据

ejabberd_listener:start_listerners/0会从配置文件(ejabberd.cfg)中获取“listen”选项。
listen配置的一般形式为:

1
2
3
4
5
6
7
8
9
10
{listen,
[
{5222, ejabberd_c2s, [
{access, c2s},
{shaper, c2s_shaper},
{max_stanza_size, 65536}
]},
...
]
}

每一个端口需要指定一个处理模块。

start_listeners/0遍历listen配置中指定的所有端口,为每个端口调用start_listener/3。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
start_listeners() ->
case ejabberd_config:get_local_option(listen) of
undefined ->
ignore;
Ls ->
Ls2 = lists:map(
fun({Port, Module, Opts}) ->
case start_listener(Port, Module, Opts) of
{ok, _Pid} = R -> R;
{error, Error} ->
throw(Error)
end
end, Ls),
report_duplicated_portips(Ls),
{ok, {{one_for_one, 10, 1}, Ls2}}
end.

start_listerner最终会调用start_listener_sup/3。它通过supervisor:start_child令ejabberd_listeners进程创建子进程执行ejabberd_listener:start/3。

1
2
3
4
5
6
7
8
start_listener_sup(Port, Module, Opts) ->
ChildSpec = {Port,
{?MODULE, start, [Port, Module, Opts]},
transient,
brutal_kill,
worker,
[?MODULE]},
supervisor:start_child(ejabberd_listeners, ChildSpec).

ejabberd_listener:start/3依据该端口处理模块的socket_type/0函数的返回值进行相应处理。如果返回值为independent时,则表示该处理模块自行处理监听端口a。否则,调用start_dependent/3。

1
2
3
4
5
6
7
start(Port, Module, Opts) ->
%% Check if the module is an ejabberd listener or an independent listener
ModuleRaw = strip_frontend(Module),
case ModuleRaw:socket_type() of
independent -> ModuleRaw:start_listener(Port, Opts);
_ -> start_dependent(Port, Module, Opts)
end.

start_dependent/3中会创建子进程执行init/3, init/3函数根据配置参数调用init_tcp/6或init_udp/6。
init_tcp/6开始监听相应端口,然后调用accept/0等待TCP连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
init_tcp(PortIP, Module, Opts, SockOpts, Port, IPS) ->
...
Res = gen_tcp:listen(Port, [binary,
{packet, 0},
{active, false},
{reuseaddr, true},
{nodelay, true},
{send_timeout, ?TCP_SEND_TIMEOUT},
{keepalive, true} |
SockOpts2]),
case Res of
{ok, ListenSocket} ->
%% Inform my parent that this port was opened succesfully
proc_lib:init_ack({ok, self()}),
%% And now start accepting connection attempts
accept(ListenSocket, Module, Opts);
{error, Reason} ->
socket_error(Reason, PortIP, Module, SockOpts, Port, IPS)
end.

如果监听UDP端口,则init/3调用init_udp/6。init/6打开一个UDP端口,调用udp_recv/3等待接收UDP数据。

1
2
3
4
5
6
7
8
9
10
11
12
init_udp(PortIP, Module, Opts, SockOpts, Port, IPS) ->
case gen_udp:open(Port, [binary,
{active, false},
{reuseaddr, true} |
SockOpts]) of
{ok, Socket} ->
%% Inform my parent that this port was opened succesfully
proc_lib:init_ack({ok, self()}),
udp_recv(Socket, Module, Opts);
{error, Reason} ->
socket_error(Reason, PortIP, Module, SockOpts, Port, IPS)
end.

至此ejabberd启动完成。

当UDP数据到来后,udp_recv/3调用该端口处理模块的udp_recv/5处理,接着递归调用udp_recv继续等待接收UDP数据。处理模块的udp_recv/5函数中需要处理返回UDP响应等逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
udp_recv(Socket, Module, Opts) ->
case gen_udp:recv(Socket, 0) of
{ok, {Addr, Port, Packet}} ->
case catch Module:udp_recv(Socket, Addr, Port, Packet, Opts) of
{'EXIT', Reason} ->
?ERROR_MSG("failed to process UDP packet:~n"
"** Source: {~p, ~p}~n"
"** Reason: ~p~n** Packet: ~p",
[Addr, Port, Reason, Packet]);
_ ->
ok
end,
udp_recv(Socket, Module, Opts);
{error, Reason} ->
?ERROR_MSG("unexpected UDP error: ~s", [format_error(Reason)]),
throw({error, Reason})
end.

当有TCP连接成功后,gen_tcp:accept/1返回,accept/3根据配置中处理模块是否指定了frontend, 选择调用ejabberd_frontend_socket:start/4或ejabberd_socket:start/4, 然后递归调用accept再次等待TCP连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
accept(ListenSocket, Module, Opts) ->
case gen_tcp:accept(ListenSocket) of
{ok, Socket} ->
case {inet:sockname(Socket), inet:peername(Socket)} of
{{ok, Addr}, {ok, PAddr}} ->
?INFO_MSG("(~w) Accepted connection ~w -> ~w",
[Socket, PAddr, Addr]);
_ ->
ok
end,
CallMod = case is_frontend(Module) of
true -> ejabberd_frontend_socket;
false -> ejabberd_socket
end,
CallMod:start(strip_frontend(Module), gen_tcp, Socket, Opts),
accept(ListenSocket, Module, Opts);
{error, Reason} ->
?INFO_MSG("(~w) Failed TCP accept: ~w",
[ListenSocket, Reason]),
accept(ListenSocket, Module, Opts)
end.

ejabberd_socket:start/4函数根据处理模块的socket_type/0的返回值进行不同处理。5222端口的处理模块是ejabberd_c2s,该模块socket_type/0返回xml_stream。

ejabberd_socket:start/4对于xml_stream的处理的简化逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
RecPid = ejabberd_receiver:start(
Socket, SockMod, none, MaxStanzaSize),

{ReceiverMod, Receiver, RecRef} =
{ejabberd_receiver, RecPid, RecPid}

SocketData = #socket_state{sockmod = SockMod,
socket = Socket,
receiver = RecRef},

case Module:start({?MODULE, SocketData}, Opts) of
{ok, Pid} ->
case SockMod:controlling_process(Socket, Receiver) of
ok ->
ok;
{error, _Reason} ->
SockMod:close(Socket)
end,
ReceiverMod:become_controller(Receiver, Pid);
{error, _Reason} ->
SockMod:close(Socket)
end;

首先,它调用ejabberd_receiver:start/4, 它创建一个receiver进程,接着调用处理模块的start/2创建一个处理模块子进程。然后将新的receiver进程设为该TCP Socket的控制进程,以后从该Socket收到数据将以消息形式发送给该receiver进程。最后调用ejabberd_receiver:become_controller/2, 向该receiver进程发送become_controller消息。receiver进程处理该消息,生成一个XML解析状态并将处理进程Pid保存在状态中。

1
2
3
4
5
6
7
handle_call({become_controller, C2SPid}, _From, State) ->
XMLStreamState = xml_stream:new(C2SPid, State#state.max_stanza_size),
NewState = State#state{c2s_pid = C2SPid,
xml_stream_state = XMLStreamState},
activate_socket(NewState),
Reply = ok,
{reply, Reply, NewState, ?HIBERNATE_TIMEOUT};

当该Socket有数据可读时,receiver进程将收到TCP消息,receiver进程调用process_data/1函数来处理收到的数据。它调用xml_stream:parse进行解析,当解析出XML元素后,它会通过gen_fsm:send_event向该TCP的处理进程发送消息。处理进程根据消息进行协议状态的转换。

当XMPP协商完成后,处理进程状态为session_established。此时收到XMPP消息,处理进程解析出From和To属性, 调用ejabberd_router:route/3分发消息。ejabberd_router:route/3调用do_route进行分发。它查询route表中是否已经注册了JID所在域名。ejabberd_local进程启动时会在route表中注册配置中所添加的域名。如果已经注册,该消息则应该由当前服务器处理,否则路由至其他Server。

处理本地域名时,do_route首先获取ejabbred_local进程的Pid,如果只有一个进程,并且该进程位于当前节点,则直接调用ejabberd_local:route/3进行处理,否则发送route消息至相应的Pid。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
LDstDomain = To#jid.lserver,
case mnesia:dirty_read(route, LDstDomain) of
[] ->
ejabberd_s2s:route(From, To, Packet);
[R] ->
Pid = R#route.pid,
if
node(Pid) == node() ->
case R#route.local_hint of
{apply, Module, Function} ->
Module:Function(From, To, Packet);
_ ->
Pid ! {route, From, To, Packet}
end;
is_pid(Pid) ->
Pid ! {route, From, To, Packet};
true ->
drop
end;
...
end

这两种方式都会调用ejabberd_local:do_route来处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
do_route(From, To, Packet) ->
?DEBUG("local route~n\tfrom ~p~n\tto ~p~n\tpacket ~P~n",
[From, To, Packet, 8]),
if
To#jid.luser /= "" ->
ejabberd_sm:route(From, To, Packet);
To#jid.lresource == "" ->
{xmlelement, Name, _Attrs, _Els} = Packet,
case Name of
"iq" ->
process_iq(From, To, Packet);
"message" ->
ok;
"presence" ->
ok;
_ ->
ok
end;
true ->
{xmlelement, _Name, Attrs, _Els} = Packet,
case xml:get_attr_s("type", Attrs) of
"error" -> ok;
"result" -> ok;
_ ->
ejabberd_hooks:run(local_send_to_resource_hook,
To#jid.lserver,
[From, To, Packet])
end
end.
end

如果”To”属性的JID指定了user, 则该消息应该分发至用户,调用ejabberd_sm:route/3进行分发。ejabberd根据JID的Resource是否为空进行了不同处理。JID的Resource为空的情况本文略过。JID的Resource不为空时,ejabberd_sm:route/3的处理逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
USR = {LUser, LServer, LResource},
case mnesia:dirty_index_read(session, USR, #session.usr) of
[] ->
case Name of
"message" ->
route_message(From, To, Packet);
"iq" ->
case xml:get_attr_s("type", Attrs) of
"error" -> ok;
"result" -> ok;
_ ->
Err =
jlib:make_error_reply(
Packet, ?ERR_SERVICE_UNAVAILABLE),
ejabberd_router:route(To, From, Err)
end;
_ ->
?DEBUG("packet droped~n", [])
end;
Ss ->
Session = lists:max(Ss),
Pid = element(2, Session#session.sid),
?DEBUG("sending to process ~p~n", [Pid]),
Pid ! {route, From, To, Packet}
end

它从session表中读取出该用户的信息,这些信息是用户连接上由该TCP的处理进程添加到session表中的。从中获得为该用户TCP连接的处理进程,向该进程发送route消息。处理进程处理该route消息,向该用户Socket发送消息。这便完成了一个用户到另一个用户的消息传递。

本文中ejabberd代码版本为2.1.3。