ejabberd的程序入口为ejabberd_app.erl中的start/2,简化后逻辑如下:
1 2 3 4 5 6 7 8 9 start (normal, _Args) -> ... Sup = ejabberd_sup:start_link(), ... ejabberd_listener:start_listeners(), ?INFO_MSG("ejabberd ~s is started in the node ~p" , [?VERSION, node()]), Sup; start (_, _) -> {error, badarg}.
ejabberd_sup是一个supervisor,调用start_link时,它会生成各功能组件进程,如下图所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 ------------------------------------------------ +------------+ |ejabberd_sup| +-----+------+ | +---------------------+------------------------+ | | | +-----+ +-------+ | | |hooks| |c2s_sup| | | +-----+ +-------+ | | | | +-----------+ +---------+ | | |node_groups| |s2sin_sup| | | +-----------+ +---------+ | | | | +--------------+ +----------+ | | |system_monitor| |s2sout_sup| | | +--------------+ +----------+ | | | | +------+ +-----------+ | | |router| |service_sup| | | +------+ +-----------+ | | | | +--+ +--------+ | | |sm| |http_sup| | | +--+ +--------+ | | | | +---+ +-------------+ | | |s2s| |http_poll_sup| | | +---+ +-------------+ | | | | +-----+ +-------------------+ | | |local| |frontend_socket_sup| | | +-----+ +-------------------+ | | | | +-------+ +------+ | | |captcha| |iq_sup| | | +-------+ +------+ | | | | +--------+ +--------+ | | |listener| |stun_sup| | | +--------+ +--------+ | | | | +------------+ +-------------+ | | |receiver_sup| |cache_tab_sup| | | +------------+ +-------------+ | | | +----------------------------------------------+
部分关键组件功能:
hooks: 执行各模块注册的HOOK函数
router: 分发用户发送的消息
sm: session manager, 处理用户到用户的消息分发
local: 处理发送给Server本身的消息。
listener: supervisor进程,创建子进程监听网络端口
receiver_sup: supervisor进程, 它为每一个TCP连接创建一个进程来接收该TCP连接的数据
c2s_sup: supervisor进程, 它为每一个TCP连接创建一个进程来处理协议状态,并负责向TCP连接发送数据
ejabberd_listener:start_listerners/0会从配置文件(ejabberd.cfg)中获取“listen”选项。 listen配置的一般形式为:
1 2 3 4 5 6 7 8 9 10 {listen, [ {5222, ejabberd_c2s, [ {access, c2s}, {shaper, c2s_shaper}, {max_stanza_size, 65536} ]}, ... ] }
每一个端口需要指定一个处理模块。
start_listeners/0遍历listen配置中指定的所有端口,为每个端口调用start_listener/3。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 start_listeners () -> case ejabberd_config:get_local_option(listen) of undefined -> ignore; Ls -> Ls2 = lists:map( fun ({Port, Module, Opts}) -> case start_listener(Port, Module, Opts) of {ok, _Pid} = R -> R; {error, Error} -> throw(Error) end end , Ls), report_duplicated_portips(Ls), {ok, {{one_for_one, 10 , 1 }, Ls2}} end .
start_listerner最终会调用start_listener_sup/3。它通过supervisor:start_child令ejabberd_listeners进程创建子进程执行ejabberd_listener:start/3。
1 2 3 4 5 6 7 8 start_listener_sup (Port, Module, Opts) -> ChildSpec = {Port, {?MODULE, start, [Port, Module, Opts]}, transient, brutal_kill, worker, [?MODULE]}, supervisor:start_child(ejabberd_listeners, ChildSpec).
ejabberd_listener:start/3依据该端口处理模块的socket_type/0函数的返回值进行相应处理。如果返回值为independent时,则表示该处理模块自行处理监听端口a。否则,调用start_dependent/3。
1 2 3 4 5 6 7 start (Port, Module, Opts) -> ModuleRaw = strip_frontend(Module), case ModuleRaw:socket_type() of independent -> ModuleRaw:start_listener(Port, Opts); _ -> start_dependent(Port, Module, Opts) end .
start_dependent/3中会创建子进程执行init/3, init/3函数根据配置参数调用init_tcp/6或init_udp/6。 init_tcp/6开始监听相应端口,然后调用accept/0等待TCP连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 init_tcp (PortIP, Module, Opts, SockOpts, Port, IPS) -> ... Res = gen_tcp:listen(Port, [binary, {packet, 0 }, {active, false}, {reuseaddr, true}, {nodelay, true}, {send_timeout, ?TCP_SEND_TIMEOUT}, {keepalive, true} | SockOpts2]), case Res of {ok, ListenSocket} -> proc_lib:init_ack({ok, self()}), accept(ListenSocket, Module, Opts); {error, Reason} -> socket_error(Reason, PortIP, Module, SockOpts, Port, IPS) end .
如果监听UDP端口,则init/3调用init_udp/6。init/6打开一个UDP端口,调用udp_recv/3等待接收UDP数据。
1 2 3 4 5 6 7 8 9 10 11 12 init_udp (PortIP, Module, Opts, SockOpts, Port, IPS) -> case gen_udp:open(Port, [binary, {active, false}, {reuseaddr, true} | SockOpts]) of {ok, Socket} -> proc_lib:init_ack({ok, self()}), udp_recv(Socket, Module, Opts); {error, Reason} -> socket_error(Reason, PortIP, Module, SockOpts, Port, IPS) end .
至此ejabberd启动完成。
当UDP数据到来后,udp_recv/3调用该端口处理模块的udp_recv/5处理,接着递归调用udp_recv继续等待接收UDP数据。处理模块的udp_recv/5函数中需要处理返回UDP响应等逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 udp_recv (Socket, Module, Opts) -> case gen_udp:recv(Socket, 0 ) of {ok, {Addr, Port, Packet}} -> case catch Module:udp_recv(Socket, Addr, Port, Packet, Opts) of {'EXIT', Reason} -> ?ERROR_MSG("failed to process UDP packet:~n" "** Source: {~p, ~p}~n" "** Reason: ~p~n** Packet: ~p" , [Addr, Port, Reason, Packet]); _ -> ok end , udp_recv(Socket, Module, Opts); {error, Reason} -> ?ERROR_MSG("unexpected UDP error: ~s" , [format_error(Reason)]), throw({error, Reason}) end .
当有TCP连接成功后,gen_tcp:accept/1返回,accept/3根据配置中处理模块是否指定了frontend, 选择调用ejabberd_frontend_socket:start/4或ejabberd_socket:start/4, 然后递归调用accept再次等待TCP连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 accept (ListenSocket, Module, Opts) -> case gen_tcp:accept(ListenSocket) of {ok, Socket} -> case {inet:sockname(Socket), inet:peername(Socket)} of {{ok, Addr}, {ok, PAddr}} -> ?INFO_MSG("(~w) Accepted connection ~w -> ~w" , [Socket, PAddr, Addr]); _ -> ok end , CallMod = case is_frontend(Module) of true -> ejabberd_frontend_socket; false -> ejabberd_socket end , CallMod:start(strip_frontend(Module), gen_tcp, Socket, Opts), accept(ListenSocket, Module, Opts); {error, Reason} -> ?INFO_MSG("(~w) Failed TCP accept: ~w" , [ListenSocket, Reason]), accept(ListenSocket, Module, Opts) end .
ejabberd_socket:start/4函数根据处理模块的socket_type/0的返回值进行不同处理。5222端口的处理模块是ejabberd_c2s,该模块socket_type/0返回xml_stream。
ejabberd_socket:start/4对于xml_stream的处理的简化逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 RecPid = ejabberd_receiver:start( Socket, SockMod, none, MaxStanzaSize), {ReceiverMod, Receiver, RecRef} = {ejabberd_receiver, RecPid, RecPid} SocketData = #socket_state{sockmod = SockMod, socket = Socket, receiver = RecRef}, case Module:start({?MODULE, SocketData}, Opts) of {ok, Pid} -> case SockMod:controlling_process(Socket, Receiver) of ok -> ok; {error, _Reason} -> SockMod:close(Socket) end , ReceiverMod:become_controller(Receiver, Pid); {error, _Reason} -> SockMod:close(Socket) end ;
首先,它调用ejabberd_receiver:start/4, 它创建一个receiver进程,接着调用处理模块的start/2创建一个处理模块子进程。然后将新的receiver进程设为该TCP Socket的控制进程,以后从该Socket收到数据将以消息形式发送给该receiver进程。最后调用ejabberd_receiver:become_controller/2, 向该receiver进程发送become_controller消息。receiver进程处理该消息,生成一个XML解析状态并将处理进程Pid保存在状态中。
1 2 3 4 5 6 7 handle_call ({become_controller, C2SPid}, _From, State) -> XMLStreamState = xml_stream:new(C2SPid, State#state.max_stanza_size), NewState = State#state{c2s_pid = C2SPid, xml_stream_state = XMLStreamState}, activate_socket(NewState), Reply = ok, {reply, Reply, NewState, ?HIBERNATE_TIMEOUT};
当该Socket有数据可读时,receiver进程将收到TCP消息,receiver进程调用process_data/1函数来处理收到的数据。它调用xml_stream:parse进行解析,当解析出XML元素后,它会通过gen_fsm:send_event向该TCP的处理进程发送消息。处理进程根据消息进行协议状态的转换。
当XMPP协商完成后,处理进程状态为session_established。此时收到XMPP消息,处理进程解析出From和To属性, 调用ejabberd_router:route/3分发消息。ejabberd_router:route/3调用do_route进行分发。它查询route表中是否已经注册了JID所在域名。ejabberd_local进程启动时会在route表中注册配置中所添加的域名。如果已经注册,该消息则应该由当前服务器处理,否则路由至其他Server。
处理本地域名时,do_route首先获取ejabbred_local进程的Pid,如果只有一个进程,并且该进程位于当前节点,则直接调用ejabberd_local:route/3进行处理,否则发送route消息至相应的Pid。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 LDstDomain = To#jid.lserver, case mnesia:dirty_read(route, LDstDomain) of [] -> ejabberd_s2s:route(From, To, Packet); [R] -> Pid = R#route.pid, if node(Pid) == node() -> case R#route.local_hint of {apply, Module, Function} -> Module:Function(From, To, Packet); _ -> Pid ! {route, From, To, Packet} end ; is_pid(Pid) -> Pid ! {route, From, To, Packet}; true -> drop end ; ... end
这两种方式都会调用ejabberd_local:do_route来处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 do_route (From, To, Packet) -> ?DEBUG("local route~n\tfrom ~p~n\tto ~p~n\tpacket ~P~n" , [From, To, Packet, 8 ]), if To#jid.luser /= "" -> ejabberd_sm:route(From, To, Packet); To#jid.lresource == "" -> {xmlelement, Name, _Attrs, _Els} = Packet, case Name of "iq" -> process_iq(From, To, Packet); "message" -> ok; "presence" -> ok; _ -> ok end ; true -> {xmlelement, _Name, Attrs, _Els} = Packet, case xml:get_attr_s("type" , Attrs) of "error" -> ok; "result" -> ok; _ -> ejabberd_hooks:run(local_send_to_resource_hook, To#jid.lserver, [From, To, Packet]) end end . end
如果”To”属性的JID指定了user, 则该消息应该分发至用户,调用ejabberd_sm:route/3进行分发。ejabberd根据JID的Resource是否为空进行了不同处理。JID的Resource为空的情况本文略过。JID的Resource不为空时,ejabberd_sm:route/3的处理逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 USR = {LUser, LServer, LResource}, case mnesia:dirty_index_read(session, USR, #session.usr) of [] -> case Name of "message" -> route_message(From, To, Packet); "iq" -> case xml:get_attr_s("type" , Attrs) of "error" -> ok; "result" -> ok; _ -> Err = jlib:make_error_reply( Packet, ?ERR_SERVICE_UNAVAILABLE), ejabberd_router:route(To, From, Err) end ; _ -> ?DEBUG("packet droped~n" , []) end ; Ss -> Session = lists:max(Ss), Pid = element(2 , Session#session.sid), ?DEBUG("sending to process ~p~n" , [Pid]), Pid ! {route, From, To, Packet} end
它从session表中读取出该用户的信息,这些信息是用户连接上由该TCP的处理进程添加到session表中的。从中获得为该用户TCP连接的处理进程,向该进程发送route消息。处理进程处理该route消息,向该用户Socket发送消息。这便完成了一个用户到另一个用户的消息传递。
本文中ejabberd代码版本为2.1.3。