Keep learning, keep living...

0%

ejabberd_config模块负责加载ejabberd配置文件,存储相应的配置选项,并提供添加和获取配置选项的API。

比如, ejabberd_app:start_modules函数会使用ejabber_config:get_local_option获取配置文件中的modules选项:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
%% Start all the modules in all the hosts
start_modules() ->
lists:foreach(
fun(Host) ->
case ejabberd_config:get_local_option({modules, Host}) of
undefined ->
ok;
Modules ->
lists:foreach(
fun({Module, Args}) ->
gen_mod:start_module(Host, Module, Args)
end, Modules)
end
end, ?MYHOSTS).

下面分析ejbberd_config模块实现。

ejabberd启动时,ejabberd_app:start/0会调用ejabberd_config:start/0。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
start() ->
mnesia:create_table(config,
[{disc_copies, [node()]},
{attributes, record_info(fields, config)}]),
mnesia:add_table_copy(config, node(), ram_copies),
mnesia:create_table(local_config,
[{disc_copies, [node()]},
{local_content, true},
{attributes, record_info(fields, local_config)}]),
mnesia:add_table_copy(local_config, node(), ram_copies),
Config = get_ejabberd_config_path(),
load_file(Config),
%% This start time is used by mod_last:
add_local_option(node_start, now()),
ok.

start函数首先创建config和local_config两个mnesia表,接着调用get_ejabberd_config_path获取配置文件路径。

1
2
3
4
5
6
7
8
9
10
11
get_ejabberd_config_path() ->
case application:get_env(config) of
{ok, Path} -> Path;
undefined ->
case os:getenv("EJABBERD_CONFIG_PATH") of
false ->
?CONFIG_PATH;
Path ->
Path
end
end.

get_ejabberd_config_path首先使用application:get_env从ejabberd.app的env或erlang命令行中的config选项中获取值:
如:

1
{env, [{config, "/etc/ejabberd/ejabberd.cfg"]}

或者:

1
erl -config "/path/to/ejabberd.cfg"

如果没有设置这两个选项,则尝试从系统环境变量”EJABBERD_CONFIG_PATH”读取文件路径。ejabberdctl会设置该环境变量。若也没有设置该环境变量,get_ejabberd_config_path则返回?CONFIG_PATH, 这个宏被定义为”ejabberd.cfg”。

1
-define(CONFIG_PATH, "ejabberd.cfg").

接下来, start函数调用load_file来加载配置文件:

1
2
3
4
5
6
load_file(File) ->
Terms = get_plain_terms_file(File),
State = lists:foldl(fun search_hosts/2, #state{}, Terms),
Terms_macros = replace_macros(Terms),
Res = lists:foldl(fun process_term/2, State, Terms_macros),
set_opts(Res).

load_file首先调用get_plain_terms_file来获取所有配置选项的列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
get_plain_terms_file(File1) ->
File = get_absolute_path(File1),
case file:consult(File) of
{ok, Terms} ->
include_config_files(Terms);
{error, {LineNumber, erl_parse, _ParseMessage} = Reason} ->
ExitText = describe_config_problem(File, Reason, LineNumber),
?ERROR_MSG(ExitText, []),
exit_or_halt(ExitText);
{error, Reason} ->
ExitText = describe_config_problem(File, Reason),
?ERROR_MSG(ExitText, []),
exit_or_halt(ExitText)
end.

我们来看get_plain_terms_file实现。首先,调用get_absolute_path,期望得到配置文件的绝对路径。不过,get_absolute_path实现上存在BUG:

1
2
3
4
5
6
7
8
9
get_absolute_path(File) ->
case filename:pathtype(File) of
absolute ->
File;
relative ->
Config_path = get_ejabberd_config_path(),
Config_dir = filename:dirname(Config_path),
filename:absname_join(Config_dir, File)
end.

当File为相对路径时,使用filename:absname_join不能得到绝对路径。我提了一个PATCH,使用当前目录来转成绝对路径。官方已接受。
PATCH地址:
https://github.com/processone/ejabberd/commit/62ccf1cf0e13954ee5207bc6288afbc669247d14

接着,get_plain_terms_file调用file:consult读取配置文件中的所有Erlang Terms到列表中,再调用include_config_files函数来处理include_config_file选项。

1
2
3
4
5
6
7
include_config_files([{include_config_file, Filename, Options} | Terms], Res) ->
Included_terms = get_plain_terms_file(Filename),
Disallow = proplists:get_value(disallow, Options, []),
Included_terms2 = delete_disallowed(Disallow, Included_terms),
Allow_only = proplists:get_value(allow_only, Options, all),
Included_terms3 = keep_only_allowed(Allow_only, Included_terms2),
include_config_files(Terms, Res ++ Included_terms3);

include_config_file选项格式为:

1
{include_config_file, [{disallow, foo}, {allow_only, bar}], "/path/to/included_config"}.

include_config_files递归调用get_plain_terms_file获取被引用的配置文件中所有配置,接着检查include_config_file选项中是否有disallow选项。如果有,调用delete_disallowed将disallow指定的配置选项从被引用文件的配置列表中删除。接着检查其中是否存在allow_only选项,如果有,则调用keep_only_allowed只保留下allow_only中指定的配置,将其和外部配置合并,再递归调用include_config_files/2处理剩余的选项,最终返回所有配置文件中所有选项列表。

1
2
include_config_files([], Res) ->
Res;

load_file接着遍历配置列表调用search_host, 最终调用add_option来添加hosts选项。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
add_option(Opt, Val, State) ->
Table = case Opt of
hosts ->
config;
language ->
config;
_ ->
local_config
end,
case Table of
config ->
State#state{opts = [#config{key = Opt, value = Val} |
State#state.opts]};
local_config ->
case Opt of
{{add, OptName}, Host} ->
State#state{opts = compact({OptName, Host}, Val,
State#state.opts, [])};
_ ->
State#state{opts = [#local_config{key = Opt, value = Val} |
State#state.opts]}
end
end.

add_option将指定选项以Key/Value形式添加进状态结构的opts域中。其中,hosts和language使用记录config, 其他选项使用local_config。这与最初创建的MNESIA表相对应。
状态结构如下:

1
2
3
4
5
-record(state, {opts = [],
hosts = [],
override_local = false,
override_global = false,
override_acls = false}).

接下来,load_file调用replace_macros来替换配置中的宏为相应的值。我们来看replace_macros实现。

1
2
3
replace_macros(Terms) ->
{TermsOthers, Macros} = split_terms_macros(Terms),
replace(TermsOthers, Macros).

首先, 调用split_terms_macros将宏选项和其他选项分开。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
split_terms_macros(Terms) ->
lists:foldl(
fun(Term, {TOs, Ms}) ->
case Term of
{define_macro, Key, Value} ->
case is_atom(Key) and is_all_uppercase(Key) of
true ->
{TOs, Ms++[{Key, Value}]};
false ->
exit({macro_not_properly_defined, Term})
end;
Term ->
{TOs ++ [Term], Ms}
end
end,
{[], []},
Terms).

宏定义选项格式为:

1
{define_macro, 'KEY', bar}.

其中key必须为atom类型且必须全部为大写字母,得到的宏选项列表为{key, value}格式的列表。
接着, replace_macros调用replace/2。

1
2
3
4
replace([], _) ->
[];
replace([Term|Terms], Macros) ->
[replace_term(Term, Macros) | replace(Terms, Macros)].

replace通过递归对每个选项调用replace_term/2。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
replace_term(Key, Macros) when is_atom(Key) ->
case is_all_uppercase(Key) of
true ->
case proplists:get_value(Key, Macros) of
undefined -> exit({undefined_macro, Key});
Value -> Value
end;
false ->
Key
end;
replace_term({use_macro, Key, Value}, Macros) ->
proplists:get_value(Key, Macros, Value);
replace_term(Term, Macros) when is_list(Term) ->
replace(Term, Macros);
replace_term(Term, Macros) when is_tuple(Term) ->
List = tuple_to_list(Term),
List2 = replace(List, Macros),
list_to_tuple(List2);
replace_term(Term, _) ->
Term.

replace_term遍历选项中的所有子项,如果在宏列表中查找到相应的值,则替换该子项为找到的值。
另外,如果配置中某子项指定了{use_macro, Key, Value}这种格式的配置,在替换时优先从宏列表中查找相应的值,找不到再使用use_macro指定的Value来替换。

至此,获得了所有配置选项的列表,接下来对每个选项调用process_term。

选项存储主要分为3种类型, process_term分别进行不同处理:

  • 在状态结构中以独立域进行存储,如override_global选项:
    1
    2
    override_global ->
    State#state{override_global = true};
  • 以选项名做为key进行存储, 如max_fsm_queue选项:
    1
    2
    {max_fsm_queue, N} ->
    add_option(max_fsm_queue, N, State);
  • 以选项名和Host一起做为key进行存储,如domain_certfile选项:
    1
    2
    3
    4
    5
    6
    7
    8
    {domain_certfile, Domain, CertFile} ->
    case ejabberd_config:is_file_readable(CertFile) of
    true -> add_option({domain_certfile, Domain}, CertFile, State);
    false ->
    ErrorText = "There is a problem in the configuration: "
    "the specified file is not readable: ",
    throw({error, ErrorText ++ CertFile})
    end;
    其中由host_config指定的绝大多数配置选项都以这种方式存储:
    1
    2
    3
    {host_config, Host, Terms} ->
    lists:foldl(fun(T, S) -> process_host_term(T, Host, S) end,
    State, Terms);
    process_terms对于没有明确列出的选项,给配置的每个HOST都调用process_host_term添加了一个以选项名和HOST一起做为Key的配置选项。
    1
    2
    3
    {_Opt, _Val} ->
    lists:foldl(fun(Host, S) -> process_host_term(Term, Host, S) end,
    State, State#state.hosts)
    这样,如果我们自定义配置选项dummy_config:
    1
    {dummy_config, [foo, bar]}.
    查询时应使用如下提供Host参数的语句:
    1
    ejabberd_config:get_local_option({dummy_config, Host})

此外,acl, accessshaper三个选项比较特殊。
acl存储acl记录结构在状态结构的opts域中, 在set_opts中这些记录会被写入acl表中。
accessshaper选项的KEY中除了选项名,HOST,还包括了规则名称。

至此,load_file将所有选项保存到了状态结构的opts域中,最后调用set_opts进行存储:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
set_opts(State) ->
Opts = lists:reverse(State#state.opts),
F = fun() ->
if
State#state.override_global ->
Ksg = mnesia:all_keys(config),
lists:foreach(fun(K) ->
mnesia:delete({config, K})
end, Ksg);
true ->
ok
end,
if
State#state.override_local ->
Ksl = mnesia:all_keys(local_config),
lists:foreach(fun(K) ->
mnesia:delete({local_config, K})
end, Ksl);
true ->
ok
end,
if
State#state.override_acls ->
Ksa = mnesia:all_keys(acl),
lists:foreach(fun(K) ->
mnesia:delete({acl, K})
end, Ksa);
true ->
ok
end,
lists:foreach(fun(R) ->
mnesia:write(R)
end, Opts)
end,
case mnesia:transaction(F) of
...
end.

如果配置了override_global, override_local, override_acls选项,set_opts首先会分别删除表config, local_config和acl中的所有内容。接着分别将状态结构opts域中的配置写入config, local_config和acl三个表中。

配置加载过程结束。

ejabberd_config模块提供了添加和查询选项的API:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
add_global_option(Opt, Val) ->
mnesia:transaction(fun() ->
mnesia:write(#config{key = Opt,
value = Val})
end).

add_local_option(Opt, Val) ->
mnesia:transaction(fun() ->
mnesia:write(#local_config{key = Opt,
value = Val})
end).

get_global_option(Opt) ->
case ets:lookup(config, Opt) of
[#config{value = Val}] ->
Val;
_ ->
undefined
end.

get_local_option(Opt) ->
case ets:lookup(local_config, Opt) of
[#local_config{value = Val}] ->
Val;
_ ->
undefined
end.

add_global_option和add_local_option分别向config和local_config表中添加选项记录。get_global_option和get_local_option直接使用ets:lookup查找相应配置。这是由于mnesia底层由ETS实现,直接使用ets:lookup性能会更高。不过,我个人不太欣赏这种写法。

ejabberd中的hook机制是ejabberd XMPP模块的基础。XMPP模块需要根据需求在相应的hook点上注册自己的处理函数,在处理函数的逻辑中实现需求。ejabberd执行到hook点时,会按注册的顺序号由小到大来执行各模块所注册的处理函数。

下面来分析具体实现。

ejabberd启动时,ejabberd_sup:init/1会通过调用ejabberd_hooks:start_link/0启动名称为ejabberd_hooks的worker进程。

1
2
3
4
5
6
7
8
9
10
11
12
init([]) ->
Hooks =
{ejabberd_hooks,
{ejabberd_hooks, start_link, []},
permanent,
brutal_kill,
worker,
[ejabberd_hooks]},
...
{ok, {{one_for_one, 10, 1},
[Hooks,
...]}}.

ejabberd_hooks进程初始化时执行init/1函数创建了名为hooks的ETS表。这个表用来存储在各注册点和域名下注册的hook函数。

1
2
3
init([]) ->
ets:new(hooks, [named_table]),
{ok, #state{}}.

模块一般使用ejabberd_hooks:add/5注册hook函数。

1
2
add(Hook, Host, Module, Function, Seq) ->
gen_server:call(ejabberd_hooks, {add, Hook, Host, Module, Function, Seq}).

参数:

  • Hook: 注册的hook点位置
  • Host: 注册的域名
  • Module: hook函数所在模块
  • Function: hook函数名
  • Seq: hook函数顺序号,顺序号越小函数越早被执行

add函数发送add消息给ejabberd_hooks进程。ejabberd_hooks进程调用handle_call处理消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
handle_call({add, Hook, Host, Module, Function, Seq}, _From, State) ->
Reply = case ets:lookup(hooks, {Hook, Host}) of
[{_, Ls}] ->
El = {Seq, Module, Function},
case lists:member(El, Ls) of
true ->
ok;
false ->
NewLs = lists:merge(Ls, [El]),
ets:insert(hooks, {{Hook, Host}, NewLs}),
ok
end;
[] ->
NewLs = [{Seq, Module, Function}],
ets:insert(hooks, {{Hook, Host}, NewLs}),
ok
end,
{reply, Reply, State};

handle_call首先从hooks表中查找该hook点和域名下是否已经注册了函数。若不存在,则将顺序号、模块、函数名添加到表中。若已存在,再检查是否为重复添加。如果不是,则将顺序号、模块、函数名和之前的函数信息按顺序号排序后一并添加。

如果hook函数必须在集群内特定节点上执行,可以调用ejabberd_hooks:add_dist注册。它的处理逻辑与add函数类似,只是在hooks表中多存储了node信息,此处略过。

当需要删除hook函数时(一般是模块停止时),调用ejabberd_hooks:delete/5。

1
2
delete(Hook, Host, Module, Function, Seq) ->
gen_server:call(ejabberd_hooks, {delete, Hook, Host, Module, Function, Seq}).

delete函数发送delete消息给ejabberd_hooks进程。进程执行handle_call处理。

1
2
3
4
5
6
7
8
9
10
handle_call({delete, Hook, Host, Module, Function, Seq}, _From, State) ->
Reply = case ets:lookup(hooks, {Hook, Host}) of
[{_, Ls}] ->
NewLs = lists:delete({Seq, Module, Function}, Ls),
ets:insert(hooks, {{Hook, Host}, NewLs}),
ok;
[] ->
ok
end,
{reply, Reply, State};

handle_call从hooks表中获取注册在该hook点和域名上的所有函数,从中删除指定的函数,再将结果保存。
删除注册在特定节点上的函数要使用delete_dist,处理逻辑类似,略过。

ejabberd执行到hook点时会调用ejabberd_hooks:run/3或ejabberd_hooks:run_fold/4来执行注册的HOOK函数。如果这个hook点不关心各hook函数的返回结果,则调用run函数,否则调用run_fold函数。
首先看run函数:

1
2
3
4
5
6
7
run(Hook, Host, Args) ->
case ets:lookup(hooks, {Hook, Host}) of
[{_, Ls}] ->
run1(Ls, Hook, Args);
[] ->
ok
end.

run函数从hooks表中查找注册在该hook点和域名上的所有函数,然后调用run1/3。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
run1([{_Seq, Module, Function} | Ls], Hook, Args) ->
Res = if is_function(Function) ->
catch apply(Function, Args);
true ->
catch apply(Module, Function, Args)
end,
case Res of
{'EXIT', Reason} ->
?ERROR_MSG("~p~nrunning hook: ~p",
[Reason, {Hook, Args}]),
run1(Ls, Hook, Args);
stop ->
ok;
_ ->
run1(Ls, Hook, Args)
end.

run1依次执行注册的hook函数,如果某个hook函数返回stop, 则run1结束返回,之后的hook函数不再被执行。

如果需要执行的是注册在某个节点上的hook函数,则通过rpc:call在该节点上执行函数,其他逻辑类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
run1([{_Seq, Node, Module, Function} | Ls], Hook, Args) ->
case rpc:call(Node, Module, Function, Args, ?TIMEOUT_DISTRIBUTED_HOOK) of
timeout ->
?ERROR_MSG("Timeout on RPC to ~p~nrunning hook: ~p",
[Node, {Hook, Args}]),
run1(Ls, Hook, Args);
{badrpc, Reason} ->
?ERROR_MSG("Bad RPC error to ~p: ~p~nrunning hook: ~p",
[Node, Reason, {Hook, Args}]),
run1(Ls, Hook, Args);
stop ->
?INFO_MSG("~nThe process ~p in node ~p ran a hook in node ~p.~n"
"Stop.", [self(), node(), Node]), % debug code
ok;
Res ->
?INFO_MSG("~nThe process ~p in node ~p ran a hook in node ~p.~n"
"The response is:~n~s", [self(), node(), Node, Res]), % debug code
run1(Ls, Hook, Args)
end;

再来看run_fold函数。和run函数相比,run_fold还需要一个参数,表示默认的返回结果。

1
2
3
4
5
6
7
run_fold(Hook, Host, Val, Args) ->
case ets:lookup(hooks, {Hook, Host}) of
[{_, Ls}] ->
run_fold1(Ls, Hook, Val, Args);
[] ->
Val
end.

run_fold首先找到注册在该hook点和域名上的所有函数,如果没有,则返回默认结果。否则调用run_fold1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
run_fold1([{_Seq, Module, Function} | Ls], Hook, Val, Args) ->
Res = if is_function(Function) ->
catch apply(Function, [Val | Args]);
true ->
catch apply(Module, Function, [Val | Args])
end,
case Res of
{'EXIT', Reason} ->
?ERROR_MSG("~p~nrunning hook: ~p",
[Reason, {Hook, Args}]),
run_fold1(Ls, Hook, Val, Args);
stop ->
stopped;
{stop, NewVal} ->
NewVal;
NewVal ->
run_fold1(Ls, Hook, NewVal, Args)
end.

run_fold1会将传入的结果Val(或者来自默认结果,或者来自前一hook函数的返回结果)和参数Args组成新的lists做为参数传给hook函数,依次递归调用。若某hook函数返回stop,结束递归调用,返回stopped。若hook函数返回{stop, NewVal},则直接返回该hook函数的结果NewVal。这两种情况下,其余的hook函数不再被执行。否则,返回结果做为Val参数再次递归调用run_fold1。

注册在特定节点上的函数处理逻辑类似,只是使用rpc:call在相应节点上执行,略过。

具体的hook点和hook函数原型可以参考官方文档:

https://www.process-one.net/en/wiki/ejabberd_events_and_hooks/

这个文档写地不是很详细。不确定的地方需要参考源码。

当这些内置hook点不能满足需求时,可以在ejabberd中合适位置调用ejabberd_hooks:run或ejabberd_hooks:run_fold添加hook点。

如:

1
ejabberd_hooks:run(dummy_hook, []),

另外,需要注意的有:
ejabberd执行某些hook点时,调用不同参数版本的run或run_fold。这种情况Host参数为global。注册这种hook点时,Host参数也应该使用global
如:

1
2
case ejabberd_hooks:run_fold(filter_packet,
{OrigFrom, OrigTo, OrigPacket}, []) of
1
ejabberd_hooks:add(filter_packet, global, ?MODULE, on_filter_packet, 120),

注: 文中代码版本为:ejabberd-2.1.13。

TokyoCabinet(TC)提供了6种不同结构的数据库,包括:

  • (MDB) on-memory hash database
  • (NDB) on-memory tree database
  • (HDB) hash database
  • (BDB) B+ tree database
  • (FDB) fixed-length database
  • (TDB) table database

每种数据库都有各自一套API来进行各种操作。

为了简化使用,TC还提供了一套通用的API来操作以上所有类型数据库,叫做Abstract Database API.

Abstract Database API通过数据库名称来区分各类型数据库:

  • “*” on-memory hash database
  • “+” on-memory tree database
  • “.tch” hash database
  • “.tcb” B+ tree database
  • “.tcf” fixed-length database
  • “.tct” table database

不仅如此,TC更进一步进行了抽象,在Abstract Database中还提供了一种Skeleton Database。
通过实现Skeleton Database指定的API,可以使用自定义的数据库类型。
Skeleton Database API结构体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
typedef struct {                        /* type of structure for a extra database skeleton */
void *opq; /* opaque pointer */
void (*del)(void *); /* destructor */
bool (*open)(void *, const char *);
bool (*close)(void *);
bool (*put)(void *, const void *, int, const void *, int);
bool (*putkeep)(void *, const void *, int, const void *, int);
bool (*putcat)(void *, const void *, int, const void *, int);
bool (*out)(void *, const void *, int);
void *(*get)(void *, const void *, int, int *);
int (*vsiz)(void *, const void *, int);
bool (*iterinit)(void *);
void *(*iternext)(void *, int *);
TCLIST *(*fwmkeys)(void *, const void *, int, int);
int (*addint)(void *, const void *, int, int);
double (*adddouble)(void *, const void *, int, double);
bool (*sync)(void *);
bool (*optimize)(void *, const char *);
bool (*vanish)(void *);
bool (*copy)(void *, const char *);
bool (*tranbegin)(void *);
bool (*trancommit)(void *);
bool (*tranabort)(void *);
const char *(*path)(void *);
uint64_t (*rnum)(void *);
uint64_t (*size)(void *);
TCLIST *(*misc)(void *, const char *, const TCLIST *);
bool (*putproc)(void *, const void *, int, const void *, int, TCPDPROC, void *);
bool (*foreach)(void *, TCITER, void *);
} ADBSKEL;

各成员与其它类型API相应成员意义一致。在开发时,只需实现功能必需的相应函数,忽略其他成员。

使用示例:

1
2
3
4
5
6
7
8
9
10
ADBSKEL skel;
memset(0, &skel, sizeof(skel));
skel.opq = mydbnew();
skel.del = mydbdel;
skel.open = mydbopen;
skel.close = mydbclose;
...
TCADB *adb = tcadbnew();
tcadbsetskel(adb, &skel);
tcadbopen(adb, "foobarbaz");

为了解决多进程共享访问和远程访问TC数据库的不便与繁琐,TC作者开发了一个网络访问层,叫做TokyoTyrant(TT)。它使用TC的Abstract Database API来访问TC数据库。因而内置支持skeleton database扩展。
TT提供了-skel命令行选项来指定skeleton database,启动时它会加载传入的Shared Object(SO)文件,使用SO中定制的数据库实现。

1
ttserver -skel ttskelfoo.so

我们可以根据需求实现特定的SO文件,就可以完整利用TT本身已经实现的各种特性,如主备同步,memcache协议支持,HTTP协议支持等。在性能满足需求的情况,这将大大减少开发量。SO文件必须导出一个名字为initialize的函数,TT启动时会从SO文件中查找该函数来初始化skeleton database。
该函数原型为:

1
bool (*initfunc)(ADBSKEL *);

该函数传入一个指向skeleton database的指针。initialize函数中需要将skeleton database定制的数据库操作的API实现赋值到相应函数指针。
由于initialize函数没有参数传递TT本身相关信息,如命令行选项,配置结构等,而TT将一些信息存储在全局变量g_serv指向的TTSERV结构体中,因而SO中可以声明g_serv外部变量来引用。

1
extern TTSERV  *g_serv;

不过较为遗憾的是TTSERV中的信息较少,如有需要的话可以自行扩展。如果SO中逻辑需要依赖命令行选项,可以通过使用启动TT时传入的数据库名来做不同处理。skeleton database的open函数会传入该参数。

具体例子可以参考:
https://github.com/flygoast/ttskeliplist

opentracker是一个开源P2P tracker服务器.之前我们系统中主要使用的是PHP+MYSQL实现的peertracker。随着业务增长,peertracker的性能已经不能满足系统需要。因而我们决定引入性能更好的opentracker。不过, opentracker在功能上并不能完全满足我们的需求,因而我对它进行了一些扩展。

  • UDP单播同步数据

opentracker本身支持cluster模式。cluster内各节点之间会同步数据。这样可以通过添加节点提高整体的集群性能。然而,opentracker原生通过UDP多播进行数据同步。我们不具备多播IP,因而开发了单播模式进行同步。实现非常简单,就是依次向cluster内其他节点发送数据。缺点是当节点数较多时,会影响性能。

  • 持久化支持

opentracker将torrent和peer信息保存在内存中。当opentracker重启时,所有的torrent和peer信息就都丢失了。这会导致我们的系统一段时间内不能进行正常的P2P传输。因而我扩展了持久化功能。opentracker架构上通过多个不同的线程执行不同的任务。我添加了一个线程,周期性地将内存中的torrent和peer信息保存到磁盘文件中。这个线程很像Redis中进行数据dump的进程。磁盘文件格式定义为ODB(opentracker database),它主要借鉴自Redis的RDB格式。目前,没有处理IPV6格式,因而只支持IPV4。
我还提供了一个工具支持流式地对odb文件进行处理。

具体用法请参考: perldoc OdbParser

格式规范:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
----------------------------------- # ODB is a binary format. There are no new lines or spaces in the file.
4f 50 45 4e 54 52 41 43 4b 45 52 # Magic String "OPENTRACKER"
30 30 30 33 # ODB Version Number in ASCII characters. In this case, version = "0001" = 1
-----------------------------------
FE # FE = Opcode that indicates following is a torrent information.
----------------------------------- # Torrent information starts from here.
00 02 2e 33 01 c4 a1 df bb 82
1f 51 0f b5 b6 02 6f 93 6e 9f # 20 byte info_hash of torrent
-----------------------------------
e2 37 59 01 00 00 00 00 # Last access time in minutes since UNIX epoch, 8 bytes.
# At present, when loading ODB file, just use current clock, ignore this.
-----------------------------------
00 00 00 00 00 00 00 00 # Seeding peer count for current torrent. 8 bytes long integer in little endian.
# NOT used at present.
-----------------------------------
00 00 00 00 00 00 00 00 # Total peer count for current torrent. 8 bytes long integer in little endian.
# NOT used at present.
-----------------------------------
00 00 00 00 00 00 00 00 # Download times of files in current torrent. 8 bytes long integer in little endian.
# NOT used at present.
-----------------------------------
01 00 00 00 # Peer count in current peer, 4 bytes integer in little endian.
----------------------------------- # Peers information starts from here.
7f 00 00 01 # 4 bytes ip address in network byte order. In this case, 0x7f000001 = "127.0.0.1"
1b 31 # 2 bytes port in network byte order. In this case, 0x1b31 = 6961.
80 # Flag of peers. SEEDING = 0x80, COMPLETED = 0x40, STOPPED = 0x20, LEECHING = 0x00
00 # Reserved. Just set zero.
-----------------------------------
... # Other peers information.
-----------------------------------
FE
-----------------------------------
... # Other torrent information.
-----------------------------------
FF # EOF opcode.
  • HTTP debug接口

由于opentracker响应内容是按BENCODE编码过的,调试时不太方便。因而扩展了一个返回human-readable内容的调试接口。

具体参看: https://github.com/flygoast/opentracker

我们线上服务使用nginx+passenger-3.0.11来运行Rails程序。我们发现当执行若干次nginx -s reload或者kill -HUP cat /usr/local/nginx/logs/nginx.pid之后,再执行以上命令不再生效。

用pstack观察master进程,master进程阻塞在read()操作上。

使用gcore生成core文件,然后用gdb查看,查看栈帧所在文件及行号。

查看passenger源码。readExact()逻辑为一直读到size大小返回。readArrayMessage()代码逻辑为先从传入fd中读取两个字节做为长度,再从fd中读取相应长度的内容。Passenger::AgentsStarter::start函数的过程为创建一个socketpair,然后fork()子进程,子进程执行PassengerWatchdog。父子进程通过该socketpair进行通信。master就是卡在读取socketpair上。

从gdb中看到readExact()传入的size为12848,即从socketpair中读到的长度为12848,而实际读取的内容长度为621,并且读取的内容也很诡异。应该是某个地方向socketpair中写入了错误内容。

继续查看passenger代码中子进程的逻辑。关闭socketpair一端。然后将socketpair另一端通过dup2()调用复制到3上。然后关闭除了0-3以外的所有文件描述符。接着解除信号阻塞,重置信号处理函数。

通过strace追踪master进程及其子进程行为,发现子进程确实发送了错误内容。”20”的十六进制表示为0x3230,按网络字节序读出正好是12848。从调用位置看是在SIGCHLD的信号处理函数中。而从写入内容看像是nginx在写日志。

继续追查为什么会有SIGCHLD产生。最后发现getHighestFileDescriptor()函数中子进程会fork()出孙进程,获取当前打开的最大fd,子进程等待孙进程退出后返回。由于孙进程退出,子进程收到了SIGCHLD信号。但是由于子进程是由master进程fork()出来的,SIGCHLD信号是被阻塞的。当执行sigprocmask()时,被阻塞的SIGCHLD被处理了,而这时的信号处理函数是ngx_signal_handler(), 该函数会调用ngx_log_error()向error log写入日志。若error log的fd为3,就发生我们上述的情况。

为了避免这种情况,应该先重置信号处理函数,再解除信号阻塞。

patch:

https://github.com/flygoast/passenger/commit/0b62808943aba65432f0b492f4ef941499fad02c

异常现象:

在/etc/rc.local中添加/usr/local/nginx/sbin/nginx来开机自动启动NGINX时,PassengerHelperAgent进程不停反复重启,而从shell上手动启动NGINX时一切正常。

追查过程:

查阅异常时的error.log日志发现以下错误:

1
2
3
4
5
[ pid=3413 thr=140583025772288 file=ext/nginx/HelperAgent.cpp:963 time=2014-09-30 11:05:20.925 ]: Uncaught exception in PassengerServer client thread:
exception: Cannot accept new connection: Too many open files (24)
backtrace:
in 'Passenger::FileDescriptor Client::acceptConnection()' (HelperAgent.cpp:429)
in 'void Client::threadMain()' (HelperAgent.cpp:952)

根据日志错误信息,可以确定是PassengerHelperAgent进程文件描述符达到了上限。

找到ext/nginx/HelperAgent.cpp文件的963行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void threadMain() {
TRACE_POINT();
try {
while (true) {
UPDATE_TRACE_POINT();
inactivityTimer.start();
FileDescriptor fd(acceptConnection());
inactivityTimer.stop();
handleRequest(fd);
}
} catch (const boost::thread_interrupted &) {
P_TRACE(2, "Client thread " << this << " interrupted.");
} catch (const tracable_exception &e) {
P_ERROR("Uncaught exception in PassengerServer client thread:\n"
<< " exception: " << e.what() << "\n"
<< " backtrace:\n" << e.backtrace());
abort();
}
}

通过上下文可以确定是调用acceptConnection()出错,查看acceptConnection()代码,确定是由该函数抛出的异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
FileDescriptor acceptConnection() {
TRACE_POINT();
struct sockaddr_un addr;
socklen_t addrlen = sizeof(addr);
int fd = syscalls::accept(serverSocket,
(struct sockaddr *) &addr,
&addrlen);
if (fd == -1) {
throw SystemException("Cannot accept new connection", errno);
} else {
return FileDescriptor(fd);
}
}

syscalls::accept是对系统调用accept的简单封装。

1
2
3
4
5
6
7
8
syscalls::accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
int ret;
CHECK_INTERRUPTION(
ret == -1,
ret = ::accept(sockfd, addr, addrlen)
);
return ret;
}

错误原因就是accept由于进程文件描述符达到上限而出错返回了。

接下来追查为什么进程文件描述符数会达到上限。

首先看一下PassengerHelperAgent的整体代码逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
int
main(int argc, char *argv[]) {
TRACE_POINT();
VariantMap options = initializeAgent(argc, argv, "PassengerHelperAgent");
pid_t webServerPid = options.getPid("web_server_pid");
string tempDir = options.get("temp_dir");
bool userSwitching = options.getBool("user_switching");
string defaultUser = options.get("default_user");
string defaultGroup = options.get("default_group");
string passengerRoot = options.get("passenger_root");
string rubyCommand = options.get("ruby");
unsigned int generationNumber = options.getInt("generation_number");
unsigned int maxPoolSize = options.getInt("max_pool_size");
unsigned int maxInstancesPerApp = options.getInt("max_instances_per_app");
unsigned int poolIdleTime = options.getInt("pool_idle_time");

try {
UPDATE_TRACE_POINT();
Server server(FEEDBACK_FD, webServerPid, tempDir,
userSwitching, defaultUser, defaultGroup,
passengerRoot, rubyCommand, generationNumber,
maxPoolSize, maxInstancesPerApp, poolIdleTime,
options);
P_DEBUG("Passenger helper agent started on PID " << getpid());

UPDATE_TRACE_POINT();
server.mainLoop();
} catch (const tracable_exception &e) {
P_ERROR(e.what() << "\n" << e.backtrace());
return 1;
} catch (const std::exception &e) {
P_ERROR(e.what());
return 1;
}

P_TRACE(2, "Helper agent exited.");
return 0;
}

首先创建一个Server对象,然后调用Server对象的mainLoop成员函数。Server对象构造函数会调用成员函数startListening。

1
2
3
4
5
6
7
8
9
10
11
12
void startListening() {
this_thread::disable_syscall_interruption dsi;
requestSocket = createUnixServer(getRequestSocketFilename().c_str());

int ret;
do {
ret = chmod(getRequestSocketFilename().c_str(), S_ISVTX |
S_IRUSR | S_IWUSR | S_IXUSR |
S_IRGRP | S_IWGRP | S_IXGRP |
S_IROTH | S_IWOTH | S_IXOTH);
} while (ret == -1 && errno == EINTR);
}

createUnixServer函数会创建一个socket文件,然后监听这个文件。NGINX收到请求后,会由Passenger模块转发请求到该socket文件。
mainLoop会调用成员函数startClientHandlerThreads,它会创建numberOfThreads个Client对象。

1
2
3
4
5
6
7
8
void startClientHandlerThreads() {
for (unsigned int i = 0; i < numberOfThreads; i++) {
ClientPtr client(new Client(i + 1, pool, requestSocketPassword,
defaultUser, defaultGroup, requestSocket,
analyticsLogger));
clients.insert(client);
}
}

Client对象构造函数会启动一个线程执行threadMain。threadMain就是我们上面出错的函数。每个线程等待接收通过socket文件发来的请求,接收请求后调用handleRequest进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Client(unsigned int number, ApplicationPool::Ptr pool,
const string &password, const string &defaultUser,
const string &defaultGroup, int serverSocket,
const AnalyticsLoggerPtr &logger)
: inactivityTimer(false)
{
this->number = number;
this->pool = pool;
this->password = password;
this->defaultUser = defaultUser;
this->defaultGroup = defaultGroup;
this->serverSocket = serverSocket;
this->analyticsLogger = logger;

sbmh_init(&statusFinder.ctx, NULL, NULL, 0);
sbmh_init(&transferEncodingFinder.ctx, NULL, NULL, 0);

thr = new oxt::thread(
boost::bind(&Client::threadMain, this),
"Client thread " + toString(number),
CLIENT_THREAD_STACK_SIZE
);
}

问题出在线程调用accept等待接收请求时。我们所创建的线程数量numberOfThreads是在Server对象被创建时指定的。

1
numberOfThreads     = maxPoolSize * 4;

而maxPoolSize由passenger_max_pool_size配置项指定,我们指定的是256。256 × 4 = 1024,开机启动时PassengerHelperAgent进程的文件描述符上限就是1024。这个数字值得怀疑。因而我将配置修改为128,果然正常了。

1
passenger_max_pool_size 256;

可以确定每个线程中占用了文件描述符。然而从代码中并没有找到打开文件相关的逻辑。当accept成功返回时,会返回一个新的文件描述符。开始怀疑accept在还没有接收到请求时就预先占用了一个文件描述符。通过一个简单程序来验证。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/un.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <pthread.h>
#include <assert.h>

void *start_accept(void *arg) {
int fd = (int) arg;
int newfd;
struct sockaddr_un addr;
socklen_t addrlen;

newfd = accept(fd, (struct sockaddr *)&addr, &addrlen);

if (newfd < 0) {
fprintf(stderr, "accept failed: %u: %s\n",
pthread_self(), strerror(errno));
}
}

int main() {
int i, fd;
struct sockaddr_un addr;
socklen_t addrlen = sizeof(addr);
pthread_t pt;

assert((fd = socket(PF_LOCAL, SOCK_STREAM, 0)) > 0);

for (i = 0; i < 1020; i++) {
assert(open("emfile.c", O_RDONLY) > 0);
}

addr.sun_family = AF_LOCAL;
strncpy(addr.sun_path, "emfile.socket", sizeof("emfile.socket") - 1);
addr.sun_path[sizeof("emfile.socket") - 1] = '\0';

assert(bind(fd, (const struct sockaddr *)&addr, sizeof(addr)) == 0);

assert(listen(fd, 512) == 0);

pthread_create(&pt, NULL, start_accept, (void *) fd);

pthread_join(pt, NULL);

exit(0);
}

使用ulimit将shell文件打开数上限修改为1024:

1
$ ulimit -n 1024

编译验证程序,并执行

1
2
$ gcc emfile.c -lpthread
$ ./a.out

得到结果:

1
accept failed: 591611648: Too many open files

确实如些,那再来看一下accept的实现。accept系统调用的内核实现是sys_accept,而sys_accept是对sys_accept4的简单封装。

1
2
3
4
5
SYSCALL_DEFINE3(accept, int, fd, struct sockaddr __user *, upeer_sockaddr,
int __user *, upeer_addrlen)
{
return sys_accept4(fd, upeer_sockaddr, upeer_addrlen, 0);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
SYSCALL_DEFINE4(accept4, int, fd, struct sockaddr __user *, upeer_sockaddr,
int __user *, upeer_addrlen, int, flags)
{
...

sock = sockfd_lookup_light(fd, &err, &fput_needed);
if (!sock)
goto out;

err = -ENFILE;
if (!(newsock = sock_alloc()))
goto out_put;

newsock->type = sock->type;
newsock->ops = sock->ops;

...

newfd = sock_alloc_file(newsock, &newfile, flags);

...

err = sock->ops->accept(sock, newsock, sock->file->f_flags);
if (err < 0)
goto out_fd;

...

fd_install(newfd, newfile);
err = newfd;

out_put:
fput_light(sock->file, fput_needed);
out:
return err;
out_fd:
fput(newfile);
put_unused_fd(newfd);
goto out_put;
}

sys_accept4中在调用sock->ops->accept去接收网络请求前就调用sock_alloc_file来分配文件描述符。再来看sock_alloc_file这个函数:

1
2
3
4
5
6
7
8
9
10
11
12
static int sock_alloc_file(struct socket *sock, struct file **f, int flags)
{
struct qstr name = { .name = "" };
struct path path;
struct file *file;
int fd;

fd = get_unused_fd_flags(flags);
if (unlikely(fd < 0))
return fd;
...
}

sock_alloc_file会调用get_unused_fd_flags,这是一个宏,实际会调用函数alloc_fd, 而alloc_fd又会调用函数expand_files:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
int expand_files(struct files_struct *files, int nr)
{
struct fdtable *fdt;

fdt = files_fdtable(files);

/*
* N.B. For clone tasks sharing a files structure, this test
* will limit the total number of files that can be opened.
*/
if (nr >= current->signal->rlim[RLIMIT_NOFILE].rlim_cur)
return -EMFILE;

/* Do we need to expand? */
if (nr < fdt->max_fds)
return 0;

/* Can we expand? */
if (nr >= sysctl_nr_open)
return -EMFILE;

/* All good, so we try */
return expand_fdtable(files, nr);
}

可以看到expand_files进行文件描述符限制的检查,当超过限制时返回”EMFILE”。”EMFILE”错误的提示就是”Too many open files”。

1
#define EMFILE      24  /* Too many open files */

结合上面的测试程序,使用一个systemtap脚本可以捕获到上述调用路径。

1
2
3
4
5
6
7
8
9
10
11
12
probe  kernel.function("expand_files").return {
if ($return == -24) {
println("expand_files");
printf("%d\n", $nr);
print_backtrace();
exit();
}
}

probe begin {
println("start\n");
}

执行stap:

1
$sudo stap emfile.stap

捕获结果为:

1
2
3
4
5
6
7
8
9
10
start

expand_files
1024
Returning from: 0xffffffff811931d0 : expand_files+0x0/0x220 [kernel]
Returning to : 0xffffffff81193443 : alloc_fd+0x53/0x160 [kernel]
0xffffffff814187f3 : sock_alloc_file+0x43/0x150 [kernel]
0xffffffff8141b3dd : sys_accept4+0x11d/0x2b0 [kernel]
0xffffffff8141b580 : sys_accept+0x10/0x20 [kernel]
0xffffffff8100b0f2 : system_call_fastpath+0x16/0x1b [kernel]

最终确定异常原因:

开机自启时,进程的打开文件数限制为1024,而创建1024个线程执行accept()时。每个accept会占用一个文件描述符, 达到了进程的文件描述符上限而异常。而从shell启动时,我们shell进程的文件描述符限制是32768,因而不会出现问题。

解决方法:
创建一个启动脚本,在执行/usr/local/nginx/sbin/nginx前执行ulimit修改文件描述符限制。

1
2
ulimit -SHn 65535
/usr/local/nginx/sbin/nginx &

注意:

  • /etc/security/limits.conf中的设置只针对登录动作发生时才生效,因而对于开机自动启动进程这种情况,这种修改该文件的方式不生效。
  • Passenger版本为3.0.11
  • kernel版本为CentOS 6.2内核,kernel-2.6.32-220.4.2.el6

NGINX实现了一套变量机制,使得配置动态内容非常灵活。比如我们可以使用“proxy_cache_key”指令根据我们的需求灵活地设置Cache的KEY。变量机制也为各模块合作提供了一个桥梁,使得模块间配合完成功能更加方便。比如”proxy_pass”指令支持将回源地址设置为变量,我们可以在另一个模块中依据条件对该变量赋值,从而非常简便地完成基于各种策略选择上游地址的功能。

变量机制相关的内部数据结构主要有两种, ngx_http_variable_t和ngx_http_variable_value_t,分别代表变量本身和变量值。

ngx_http_variable_t结构如下:

1
2
3
4
5
6
7
8
struct ngx_http_variable_s {
ngx_str_t name; /* must be first to build the hash */
ngx_http_set_variable_pt set_handler;
ngx_http_get_variable_pt get_handler;
uintptr_t data;
ngx_uint_t flags;
ngx_uint_t index;
};

各成员意义如下:

  • name: 变量名称
  • set_handler: 赋值函数,主要用于”set”指令,处理请求时执行set指令时调用
  • get_handler: 取值函数,当读取该变量时调用该函数得到变量值
  • data: 传递给set_handler和get_handler的参数
  • flags: 变量属性标志
  • index: 变量在cmcf->variables数组中的索引

其中flags取值及意义如下:

  • NGX_HTTP_VAR_CHANGEABLE: 变量被添加时如果已有同名变量,则返回该变量,否则会报错认为变量名冲突。
  • NGX_HTTP_VAR_NOCACHEABLE: 变量的值不应该被缓存。变量被取值后,变量值的no_cacheable被置为1。
  • NGX_HTTP_VAR_INDEXED:表示变量被索引,存储在cmcf->variables数组,这样的变量可以通过索引值直接找到。
  • NGX_HTTP_VAR_NOHASH: 不会将该变量存储在cmcf->variables_hash哈希表。

ngx_http_variable_value_t结构如下:

1
typedef ngx_variable_value_t  ngx_http_variable_value_t;
1
2
3
4
5
6
7
8
9
10
typedef struct {
unsigned len:28;

unsigned valid:1;
unsigned no_cacheable:1;
unsigned not_found:1;
unsigned escape:1;

u_char *data;
} ngx_variable_value_t;

各成员意义如下:

  • len: 变量值数据长度
  • valid: 该变量值是否可用
  • no_cacheable: 该变量值是否不能缓存
  • not_found: 对应变量不存在
  • escape: 变量值内容中的特殊字符是否进行了转义
  • data:变量值的数据

因为NGINX所有变量存储在ngx_http_core_module的main级结构ngx_http_core_main_conf_t中(以下简写cmcf),所以变量的作用范围是整个http{}配置。在某个server{}中添加的变量,在另一个server{}同样可以使用。

1
2
3
4
5
6
7
8
typedef struct {
......
ngx_hash_t variables_hash;
ngx_array_t variables; /* ngx_http_variable_t */
......
ngx_hash_keys_arrays_t *variables_keys;
......
} ngx_http_core_main_conf_t;

NGINX变量有以下3种类型:

  • 模块内置变量
  • 根据配置动态添加的变量
  • 内置规则变量

模块内置变量主要在ngx_http_module_t的preconfiguration阶段中添加。如upstream模块的preconfiguration回调为ngx_http_upstream_add_variables().

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static ngx_int_t
ngx_http_upstream_add_variables(ngx_conf_t *cf)
{
ngx_http_variable_t *var, *v;

for (v = ngx_http_upstream_vars; v->name.len; v++) {
var = ngx_http_add_variable(cf, &v->name, v->flags);
if (var == NULL) {
return NGX_ERROR;
}

var->get_handler = v->get_handler;
var->data = v->data;
}

return NGX_OK;
}

根据配置动态添加的变量一般当解析相应指令时,在指令的解析函数中添加。比如rewrite模块中的set指令可以添加用户定义名称的变量。

1
2
3
4
v = ngx_http_add_variable(cf, &value[1], NGX_HTTP_VAR_CHANGEABLE);
if (v == NULL) {
return NGX_CONF_ERROR;
}

内置规则变量不需要添加,而是按特定规则来解析。如”http_”, “upstream_http_”, “arg_”, “cookie_”等等一系列变量。
前两种方式都是调用ngx_http_add_variable()来添加变量。ngx_http_add_variable()向cmcf->variable_keys数组中添加变量,并将该变量结构返回。如果指定了NGX_HTTP_VAR_CHANGEABLE标志,那么当检查到同名的变量时,则直接返回该变量。否则报错返回NULL。
如果某一指令需要用到一个变量,则一般在解析该指令配置时会调用ngx_http_get_variable_index(),并将该索引值保存,当处理请求时直接通过索引找到变量。如geo模块中geo指令的解析函数。

1
2
3
4
geo->index = ngx_http_get_variable_index(cf, &name);
if (geo->index == NGX_ERROR) {
return NGX_CONF_ERROR;
}

ngx_http_get_variable_index()会从cmcf->variables数组中查找变量,查找到则返回该变量的索引。否则在cmcf->variables添加该变量并返回数组索引。当解析完HTTP{}配置后,NGINX会将cmcf->variables_keys中的变量组织到cmcf->variables_hash这个HASH表中。如果变量指令了NGX_HTTP_VAR_NOHASH标志,则该变量不会被添加到cmcf->variables_hash中。如果一个变量既没有添加到cmcf->variables中,也没有添加到cmcf->variables_hash中,那么这个变量就不能被找到,因而会被认为不存在。

NGINX开始处理请求时会在请求结构体ngx_http_request_中创建被索引的变量值的缓存空间。

1
2
r->variables = ngx_pcalloc(r->pool, cmcf->variables.nelts
* sizeof(ngx_http_variable_value_t));

对于被索引的变量,可以使用ngx_http_get_indexed_variable()或者ngx_http_get_flushed_variable()来求值。这样省去了查找哈希表的消耗。ngx_http_get_indexed_variable()首先检查r->variables[index]变量缓存是否可用。可用则直接返回,否则调用v->get_handler对变量求值,并将结果存储在r->variables[index]中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
ngx_http_variable_value_t *
ngx_http_get_indexed_variable(ngx_http_request_t *r, ngx_uint_t index)
{
ngx_http_variable_t *v;
ngx_http_core_main_conf_t *cmcf;

cmcf = ngx_http_get_module_main_conf(r, ngx_http_core_module);

if (cmcf->variables.nelts <= index) {
ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
"unknown variable index: %d", index);
return NULL;
}

if (r->variables[index].not_found || r->variables[index].valid) {
return &r->variables[index];
}

v = cmcf->variables.elts;

if (v[index].get_handler(r, &r->variables[index], v[index].data)
== NGX_OK)
{
if (v[index].flags & NGX_HTTP_VAR_NOCACHEABLE) {
r->variables[index].no_cacheable = 1;
}

return &r->variables[index];
}

r->variables[index].valid = 0;
r->variables[index].not_found = 1;

return NULL;
}

ngx_http_get_flushed_variable()还会对变量值的no_cacheable标志进行检查。如果为0,表示变量值可以cache, 则直接返回已缓存的变量值。否则,将变量值置为不可用,调用ngx_http_get_indexed_variable()对变量求值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ngx_http_variable_value_t *
ngx_http_get_flushed_variable(ngx_http_request_t *r, ngx_uint_t index)
{
ngx_http_variable_value_t *v;

v = &r->variables[index];

if (v->valid || v->not_found) {
if (!v->no_cacheable) {
return v;
}

v->valid = 0;
v->not_found = 0;
}

return ngx_http_get_indexed_variable(r, index);
}

没有索引的变量,可以调用ngx_http_get_variable()完成取值。它会查找cmcf->variables_hash哈希表,找到变量,从相应变量缓存中取值或调用变量的get_handler。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
cmcf = ngx_http_get_module_main_conf(r, ngx_http_core_module);

v = ngx_hash_find(&cmcf->variables_hash, key, name->data, name->len);

if (v) {

if (v->flags & NGX_HTTP_VAR_INDEXED) {
return ngx_http_get_flushed_variable(r, v->index);

} else {

vv = ngx_palloc(r->pool, sizeof(ngx_http_variable_value_t));

if (vv && v->get_handler(r, vv, v->data) == NGX_OK) {
return vv;
}

return NULL;
}
}

NGINX中rewrite模块实现了set指令,可以给变量赋值。如果另一模块的变量也可以使用set来赋值,则多模块配合完成功能会更加灵活。
set指令的解析函数ngx_http_rewrite_set()代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
static char *
ngx_http_rewrite_set(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_http_rewrite_loc_conf_t *lcf = conf;

ngx_int_t index;
ngx_str_t *value;
ngx_http_variable_t *v;
ngx_http_script_var_code_t *vcode;
ngx_http_script_var_handler_code_t *vhcode;

value = cf->args->elts;

if (value[1].data[0] != '$') {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"invalid variable name "%V"", &value[1]);
return NGX_CONF_ERROR;
}

value[1].len--;
value[1].data++;

v = ngx_http_add_variable(cf, &value[1], NGX_HTTP_VAR_CHANGEABLE);
if (v == NULL) {
return NGX_CONF_ERROR;
}

index = ngx_http_get_variable_index(cf, &value[1]);
if (index == NGX_ERROR) {
return NGX_CONF_ERROR;
}

if (v->get_handler == NULL
&& ngx_strncasecmp(value[1].data, (u_char *) "http_", 5) != 0
&& ngx_strncasecmp(value[1].data, (u_char *) "sent_http_", 10) != 0
&& ngx_strncasecmp(value[1].data, (u_char *) "upstream_http_", 14) != 0)
{
v->get_handler = ngx_http_rewrite_var;
v->data = index;
}

if (ngx_http_rewrite_value(cf, lcf, &value[2]) != NGX_CONF_OK) {
return NGX_CONF_ERROR;
}

if (v->set_handler) {
vhcode = ngx_http_script_start_code(cf->pool, &lcf->codes,
sizeof(ngx_http_script_var_handler_code_t));
if (vhcode == NULL) {
return NGX_CONF_ERROR;
}

vhcode->code = ngx_http_script_var_set_handler_code;
vhcode->handler = v->set_handler;
vhcode->data = v->data;

return NGX_CONF_OK;
}

vcode = ngx_http_script_start_code(cf->pool, &lcf->codes,
sizeof(ngx_http_script_var_code_t));
if (vcode == NULL) {
return NGX_CONF_ERROR;
}

vcode->code = ngx_http_script_set_var_code;
vcode->index = (uintptr_t) index;

return NGX_CONF_OK;
}

它向lcf->codes函数引擎添加rewrite阶段需要执行的函数。如果检测到变量的set_handler存在,则添加ngx_http_script_var_set_handler_code函数,它会调用set_handler。而如果没有set_handler, 则添加ngx_http_script_set_var_code函数,它不会调用set_handler。由于内置变量添加一般是在preconfiguration中完成,因而解析set指令时,变量的set_handler存在,可以正常处理。而根据配置动态添加的变量如果解析出现在set指令后,set指令先被解析,此时变量的set_handler为空,此时添加的函数为ngx_http_script_set_var_code,v->set_handler不会得到调用。因此个人感觉添加函数引擎的逻辑应该放到postconfiguration中处理。此时和变量的各成员值都已被正常赋值。因而可以更方便地让根据配置动态添加的变量也可以和set指令轻松结合。

业务要求Cache服务器能够随时增删允许访问的HOST。而每个HOST有单独的配置,这些配置随时都可能更改。如果单纯采用静态配置文件(nginx.conf)的方式,每次修改都要reload NGINX。如果更改很频繁,会造成服务器上存在大量的NGINX进程,导致服务器负载很高。因而我们将需要随时更改的配置存储于一个独立的配置服务器中。请求处理时,先去配置服务器中获取该请求需要使用的配置,再根据这些配置进行相应的处理。因而,我们可以随时更改配置服务器中的相应内容。
其中一个配置就是文件缓存时间。NGINX中设置文件缓存时间有两种方法:

  • 设置proxy_cache_valid指令
  • 在上游响应中的添加”Cache-Control” header和”Expires” header

其中上游响应header的优先级更高。当不想使用上游响应header中所设置的缓存时间时,可以使用以下指令来禁用。

1
proxy_ignore_headers X-Accel-Expires;

这两种方法都无法满足我们根据动态配置来设置缓存时间的需求。因而我给NGINX添加了一个内置变量”cache_time”来支持灵活地设置缓存时间,并且该种方式具有最高的优先级。这样,可以非常方便地在ngx_lua等第三方模块中根据条件设置不同的缓存时间。

在ngx_http_request_t添加一个cache_time成员,在ngx_http_core_variables数组中添加内置变量”cache_time”,”cache_time”在被赋值时会将值存储在r->cache_time中。

1
2
3
4
{ ngx_string("cache_time"), ngx_http_variable_request_set_time,
ngx_http_variable_request_get_time,
offsetof(ngx_http_request_t, cache_time),
NGX_HTTP_VAR_CHANGEABLE|NGX_HTTP_VAR_NOCACHEABLE, 0 },
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static void
ngx_http_variable_request_set_time(ngx_http_request_t *r,
ngx_http_variable_value_t *v, uintptr_t data)
{
ngx_str_t val;
time_t valid, *vp;

val.len = v->len;
val.data = v->data;

valid = ngx_parse_time(&val, 1);
if (valid == (time_t) NGX_ERROR) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"invalid time value "%V"", &val);
return;
}

vp = (time_t *) ((char *) r + data);

*vp = valid;

return;
}

因为”cache_time”变量需要比上游响应header具有更高的优先级,因而要在上游header处理之后再处理”cache_time”变量。上游响应的header在ngx_http_upstream_process_headers()中进行处理。因而我在upstream模块中添加了一个hook, 该hook在调用完ngx_http_upstream_process_headers()后,开始处理body前被调用。

1
2
3
4
5
6
7
8
9
10
11
if (ngx_http_upstream_process_headers(r, u) != NGX_OK) {
return;
}

if (u->post_headers) {
rc = u->post_headers(r);
if (rc != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, rc);
return;
}
}

proxy模块在该hook上注册一个函数,这个函数执行时,首先检查上游响应的状态码判断是否需要处理”cache_time”变量。检查通过后,读取”cache_time”变量的值,依据值来进行各种操作。当值为0时,禁用cache.为正值,则将缓存时间修改为该值。当修改cache缓存时间后,将上游响应中的”Cache-Control”和”Expires” header去除,不再发送给下游。

1
u->post_headers = ngx_http_proxy_post_headers;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
static ngx_int_t
ngx_http_proxy_post_headers(ngx_http_request_t *r)
{
ngx_uint_t i;
ngx_table_elt_t **ph;

if (ngx_http_upstream_check_status(r->upstream->conf->cache_time_valid,
r->upstream->headers_in.status_n)
== NGX_DECLINED)
{
return NGX_OK;
}

if (r->cache_time == (time_t) -1) {
return NGX_OK;
}

if (r->cache_time == (time_t) 0) {
r->upstream->cacheable = 0;
return NGX_OK;
}

r->cache->valid_sec = ngx_time() + r->cache_time;

r->headers_out.expires->hash = 0;

ph = r->headers_out.cache_control.elts;
for (i = 0; i < r->headers_out.cache_control.nelts; i++) {
ph[i]->hash = 0;
}

return NGX_OK;
}

XEP-0114中定义了Jabber组件协议(Jabber Componet Protocol)。XMPP网络外的可信组件可以使用这个协议和XMPP网络内实体进行通信。

组件协议定义了两种模式:

  • accept:外部组件向XMPP服务器发起连接
  • connect:XMPP服务器向外部组件发起连接

其中, accept方式使用比较广泛,ejabberd中只实现了accept方式。

组件协议像XMPP一样,也是基于XML流,使用的XMLNS为jabber:componet:accept或者jabber:component:connect

accept方式的协议流程:

  • 外部组件建立到XMPP服务器的TCP连接,发送流头。

    1
    2
    3
    4
    <stream:stream
    xmlns='jabber:component:accept'
    xmlns:stream='http://etherx.jabber.org/streams'
    to='plays.shakespeare.lit'>
  • XMPP服务器回应,也发送流头,其中必须包括流ID属性:

    1
    2
    3
    4
    5
    <stream:stream
    xmlns:stream='http://etherx.jabber.org/streams'
    xmlns='jabber:component:accept'
    from='plays.shakespeare.lit'
    id='3BF96D32'>
  • 外部组件发送身份验证摘要信息。

    1
    <handshake>aaee83c26aeeafcbabeabfcbcd50df997e0a2a1e</handshake>

    组件协议身份验证不使用SASL,也不使用已废弃的XEP-0078。它使用双方共享密钥计算摘要信息来验证身份。计算方法如下:

    1. 将服务器流头中的流ID属性和共享密钥拼接成字符串
    2. 计算该字符串的SHA1哈希值,并转换成小写16进制字符串
  • XMPP服务器用同样方法计算进行校验。通过后,返回一个空的handshake元素。

    1
    <handshake/>

    至此,外部组件和XMPP服务器就可以交换XMPP消息了。

我们来看ejabberd中组件协议实现,位于ejabberd_service.erl模块中。

ejabberd中的ejabberd_service的默认配置为:

1
2
3
4
5
6
7
8
{8888, ejabberd_service, [
{access, all},
{shaper_rule, fast},
{ip, {127, 0, 0, 1}},
{hosts, ["icq.example.org", "sms.example.org"],
[{password, "secret"}]
}
]},

ejabberd_service是端口8888的处理模块。当有ejabberd接收端口上的TCP连接后,ejabberd_socket:start/4调用处理模块的socket_type/0, 根据返回值进行不同处理。ejabberd_service:socket_type/0返回xml_stream。它的处理流程和ejabberd_c2s模块相同。ejabberd为每个TCP连接分别创建一个receiver进程和一个处理进程(这里是ejabberd_service进程)。receiver进程接收消息并解析,然后发送相应的消息给处理进程。具体不再详述,请参考:ejabberd消息处理流程分析

service进程为gen_fsm进程,初始状态为wait_for_stream。receiver进程接收到XML流头后发送xmlstreamstart消息给service进程。service进程调用wait_for_stream函数进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) ->
case xml:get_attr_s("xmlns", Attrs) of
"jabber:component:accept" ->
%% Note: XEP-0114 requires to check that destination is a Jabber
%% component served by this Jabber server.
%% However several transports don't respect that,
%% so ejabberd doesn't check 'to' attribute (EJAB-717)
To = xml:get_attr_s("to", Attrs),
Header = io_lib:format(?STREAM_HEADER,
[StateData#state.streamid, xml:crypt(To)]),
send_text(StateData, Header),
{next_state, wait_for_handshake, StateData};
_ ->
send_text(StateData, ?INVALID_HEADER_ERR),
{stop, normal, StateData}
end;

wait_for_stream检测到XML流头中XMLNS为”jabber:component:accept”后,向组件发送流头,状态变更为wait_for_handshake。

receiver进程收到handshake消息后,发送xmlstreamelement消息给service进程,service调用wait_for_handshake处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
wait_for_handshake({xmlstreamelement, El}, StateData) ->
{xmlelement, Name, _Attrs, Els} = El,
case {Name, xml:get_cdata(Els)} of
{"handshake", Digest} ->
case sha:sha(StateData#state.streamid ++
StateData#state.password) of
Digest ->
send_text(StateData, "<handshake/>"),
lists:foreach(
fun(H) ->
ejabberd_router:register_route(H),
?INFO_MSG("Route registered for service ~p~n", [H])
end, StateData#state.hosts),
{next_state, stream_established, StateData};
_ ->
send_text(StateData, ?INVALID_HANDSHAKE_ERR),
{stop, normal, StateData}
end;
_ ->
{next_state, wait_for_handshake, StateData}
end;

wait_for_handshake使用XML流ID和密码计算身份验证摘要,和组件所发的摘要信息进行对比判断是否通过。检查通过后,发送空的handshake元素。然后调用ejabberd_router:register_route/1依次注册配置的所有service域名。这样,XMPP实体发往这些域名的消息都将被ejabberd_router路由给该service进程。service进程状态变更为stream_established。

至此,外部组件就可以和XMPP服务器交换XMPP消息了。

组件向XMPP服务器发送消息后,receiver进程解析后向service进程发送xmlstreamelement消息,service进程调用stream_established处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
stream_established({xmlstreamelement, El}, StateData) ->
NewEl = jlib:remove_attr("xmlns", El),
{xmlelement, Name, Attrs, _Els} = NewEl,
From = xml:get_attr_s("from", Attrs),
...
To = xml:get_attr_s("to", Attrs),
ToJID = case To of
"" -> error;
_ -> jlib:string_to_jid(To)
end,
if
((Name == "iq") or
(Name == "message") or
(Name == "presence")) and
(ToJID /= error) and (FromJID /= error) ->
ejabberd_router:route(FromJID, ToJID, NewEl);
true ->
Err = jlib:make_error_reply(NewEl, ?ERR_BAD_REQUEST),
send_element(StateData, Err),
error
end,
{next_state, stream_established, StateData};

stream_established进行一系列检查后,调用ejabberd_router:route转发消息。

XMPP实体发送给service域名的消息会由ejabberd_router以route消息的格式发给service进程。service进程调用handle_info处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
handle_info({route, From, To, Packet}, StateName, StateData) ->
case acl:match_rule(global, StateData#state.access, From) of
allow ->
{xmlelement, Name, Attrs, Els} = Packet,
Attrs2 = jlib:replace_from_to_attrs(jlib:jid_to_string(From),
jlib:jid_to_string(To),
Attrs),
Text = xml:element_to_binary({xmlelement, Name, Attrs2, Els}),
send_text(StateData, Text);
deny ->
Err = jlib:make_error_reply(Packet, ?ERR_NOT_ALLOWED),
ejabberd_router:route_error(To, From, Err, Packet)
end,
{next_state, StateName, StateData};

handle_info首先进行ACL检查,通过后,修改From和To属性,将消息发送给组件。

使用telnet演示简单登录过程:

1
2
3
4
5
6
7
8
9
10
[root@flygoast flygoast]# telnet 127.0.0.1 8888
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
<stream:stream
xmlns='jabber:component:accept'
xmlns:stream='http://etherx.jabber.org/streams'
to='sms.example.com'>
<?xml version='1.0'?><stream:stream xmlns:stream='http://etherx.jabber.org/streams' xmlns='jabber:component:accept' id='2744762983' from='sms.example.com'><handshake>cffc7fab4feae018a325ea834d2dca8c3b707a51</handshake>
<handshake/>

身份校验信息计算:

1
2
[root@flygoast flygoast]# echo -n "2744762983secret" | sha1sum
cffc7fab4feae018a325ea834d2dca8c3b707a51 -

根据XEP-0199, XMPP客户端和服务器都可以在XML流上发送应用层PING请求。因为XMPP依赖底层的TCP连接,有可能TCP连接意外中断,而上层的XMPP并不知晓,从而影响消息传递。通过发送应用层PING请求可以来确认对端的连接可用性。

以服务器发给客户端为例,协议如下:

发送的PING请求:

1
2
3
<iq from='capulet.lit' to='juliet@capulet.lit/balcony' id='s2c1' type='get'>
<ping xmlns='urn:xmpp:ping'/>
</iq>

如果对端支持PING请求,则返回对应的”PONG”回应。

1
<iq from='juliet@capulet.lit/balcony' to='capulet.lit' id='s2c1' type='result'/>

如果对端不支持则返回错误。

1
2
3
4
5
6
<iq from='juliet@capulet.lit/balcony' to='capulet.lit' id='s2c1' type='error'>
<ping xmlns='urn:xmpp:ping'/>
<error type='cancel'>
<service-unavailable xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/>
</error>
</iq>

ejabberd中PING功能实现位于mod_ping.erl。它主要支持3个配置:

  • send_pings: true|false

如果这个选项设置为true, 当客户端在给定时间间隔内没有活动,则向客户端发送一个ping请求。

  • ping_interval: Seconds

设置上述send_pings选项中客户端没有活动的时间间隔。

  • timeout_action: none|kill

表示当PING请求发出32秒后,ejabberd依然没有收到PING响应,服务端如何处理。none表示什么也不做,kill表示关闭客户端连接。

当ejabberd启动时会调用mod_ping:start/2。

1
2
3
4
5
start(Host, Opts) ->
Proc = gen_mod:get_module_proc(Host, ?MODULE),
PingSpec = {Proc, {?MODULE, start_link, [Host, Opts]},
transient, 2000, worker, [?MODULE]},
supervisor:start_child(?SUPERVISOR, PingSpec).

start函数调用supervisor:start_child/2为每个支持的host创建一个负责该host的worker进程。

进程树模型如下:

token data
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
+------------+
|ejabberd_sup|
+-----+------+
|
| +------------------+
+--->|Other processes...|
| +------------------+
|
| +------------------------+
+--->|ping(im.just4coding.com)|
| +------------------------+
|
| +------------------------+
+--->|ping(localhost) |
| +------------------------+
|
| +------------------------+
+--->|ping(Other host) |
+------------------------+

每个worker是一个gen_server进程,进程调用init函数进行初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
init([Host, Opts]) ->
SendPings = gen_mod:get_opt(send_pings, Opts, ?DEFAULT_SEND_PINGS),
PingInterval = gen_mod:get_opt(ping_interval, Opts, ?DEFAULT_PING_INTERVAL),
TimeoutAction = gen_mod:get_opt(timeout_action, Opts, none),
IQDisc = gen_mod:get_opt(iqdisc, Opts, no_queue),
mod_disco:register_feature(Host, ?NS_PING),
gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_PING,
?MODULE, iq_ping, IQDisc),
gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_PING,
?MODULE, iq_ping, IQDisc),
case SendPings of
true ->
%% Ping requests are sent to all entities, whether they
%% announce 'urn:xmpp:ping' in their caps or not
ejabberd_hooks:add(sm_register_connection_hook, Host,
?MODULE, user_online, 100),
ejabberd_hooks:add(sm_remove_connection_hook, Host,
?MODULE, user_offline, 100),
ejabberd_hooks:add(user_send_packet, Host,
?MODULE, user_send, 100);
_ ->
ok
end,
{ok, #state{host = Host,
send_pings = SendPings,
ping_interval = PingInterval,
timeout_action = TimeoutAction,
timers = ?DICT:new()}}.
  • 首先获取相关配置

  • 接着调用mod_disco:register_feature注册PING功能的XMLNS。这样当客户端请求”Service Discovery”信息时,ejabberd返回的特征中会包括”urn:xmpp:ping”。

ServiceDiscovery请求:

1
2
3
4
5
6
<iq type='get'
from='juliet@capulet.lit/balcony'
to='capulet.lit'
id='disco1'>
<query xmlns='http://jabber.org/protocol/disco#info'/>
</iq>

ServiceDiscovery响应:

1
2
3
4
5
6
7
8
9
10
<iq type='result'
from='capulet.lit'
to='juliet@capulet.lit/balcony'
id='disco1'>
<query xmlns='http://jabber.org/protocol/disco#info'>
...
<feature var='urn:xmpp:ping'/>
...
</query>
</iq>

ServiceDiscovery相关信息参考XEP-0030

  • 接下来,注册IQ处理器,令XMLNS为”urn:xmpp:ping”的IQ请求由函数iq_ping处理。iq_ping简单地返回相应响应或者错误。

    1
    2
    3
    4
    5
    6
    7
    iq_ping(_From, _To, #iq{type = Type, sub_el = SubEl} = IQ) ->
    case {Type, SubEl} of
    {get, {xmlelement, "ping", _, _}} ->
    IQ#iq{type = result, sub_el = []};
    _ ->
    IQ#iq{type = error, sub_el = [SubEl, ?ERR_FEATURE_NOT_IMPLEMENTED]}
    end.

    如果send_pings配置为true, mod_ping在ejabberd中注册n以下3个hook函数:

  • sm_register_connection_hook: 它在客户端完成登录验证,建立session信息时调用。

1
2
3
4
5
6
7
8
open_session(SID, User, Server, Resource, Info) ->
set_session(SID, User, Server, Resource, undefined, Info),
mnesia:dirty_update_counter(session_counter,
jlib:nameprep(Server), 1),
check_for_sessions_to_replace(User, Server, Resource),
JID = jlib:make_jid(User, Server, Resource),
ejabberd_hooks:run(sm_register_connection_hook, JID#jid.lserver,
[SID, JID, Info]).
  • sm_remove_connection_hook: 在用户退出,关闭session时调用。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    close_session(SID, User, Server, Resource) ->
    Info = case mnesia:dirty_read({session, SID}) of
    [] -> [];
    [#session{info=I}] -> I
    end,
    F = fun() ->
    mnesia:delete({session, SID}),
    mnesia:dirty_update_counter(session_counter,
    jlib:nameprep(Server), -1)
    end,
    mnesia:sync_dirty(F),
    JID = jlib:make_jid(User, Server, Resource),
    ejabberd_hooks:run(sm_remove_connection_hook, JID#jid.lserver,
    [SID, JID, Info]).
  • user_send_packet: 在C2S进程收到客户端发送的消息时被调用。

sm_register_connection_hook的hook函数user_onlineuser_send_packet的hook函数user_send都会调用start_ping函数。

1
2
3
start_ping(Host, JID) ->
Proc = gen_mod:get_module_proc(Host, ?MODULE),
gen_server:cast(Proc, {start_ping, JID}).

start_ping向该HOST的worker进程发送一个{start_ping, JID}消息。worker进程调用handle_cast进行处理:

1
2
3
handle_cast({start_ping, JID}, State) ->
Timers = add_timer(JID, State#state.ping_interval, State#state.timers),
{noreply, State#state{timers = Timers}};

handle_cast调用add_timer为该客户端创建一个timer。

1
2
3
4
5
6
7
8
9
10
11
add_timer(JID, Interval, Timers) ->
LJID = jlib:jid_tolower(JID),
NewTimers = case ?DICT:find(LJID, Timers) of
{ok, OldTRef} ->
cancel_timer(OldTRef),
?DICT:erase(LJID, Timers);
_ ->
Timers
end,
TRef = erlang:start_timer(Interval * 1000, self(), {ping, JID}),
?DICT:store(LJID, TRef, NewTimers).

由于用户每次发送消息时都会调用add_timer函数,因而add_timer中需要检查之前是否已经存在timer。如果存在,则先取消旧的timer, 再创建新的Timer。

当timer超时后,即客户若干时间内没有活动,进程收到{ping, JID}消息,此时ejabberd应向客户端发送PING消息。进程调用handle_info处理。

1
2
3
4
5
6
7
8
9
10
11
handle_info({timeout, _TRef, {ping, JID}}, State) ->
IQ = #iq{type = get,
sub_el = [{xmlelement, "ping", [{"xmlns", ?NS_PING}], []}]},
Pid = self(),
F = fun(Response) ->
gen_server:cast(Pid, {iq_pong, JID, Response})
end,
From = jlib:make_jid("", State#state.host, ""),
ejabberd_local:route_iq(From, JID, IQ, F),
Timers = add_timer(JID, State#state.ping_interval, State#state.timers),
{noreply, State#state{timers = Timers}};

handle_info创建IQ消息后,设置回调函数F,调用ejabberd_local:route_iq/4消息IQ消息发送给客户端。当收到该IQ消息的响应或者超过32秒依然没有收到客户端的响应,回调函数F将会被调用。如果响应超时,Response为timeout,F将向进程发送{iq_pong, JID, timeout}消息。进程调用handle_cast处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
handle_cast({iq_pong, JID, timeout}, State) ->
Timers = del_timer(JID, State#state.timers),
ejabberd_hooks:run(user_ping_timeout, State#state.host, [JID]),
case State#state.timeout_action of
kill ->
#jid{user = User, server = Server, resource = Resource} = JID,
case ejabberd_sm:get_session_pid(User, Server, Resource) of
Pid when is_pid(Pid) ->
ejabberd_c2s:stop(Pid);
_ ->
ok
end;
_ ->
ok
end,
{noreply, State#state{timers = Timers}};

如果timeout_action设置为kill, 则调用ejabberd_c2s:stop关闭相应的客户端连接。

因为在sm_remove_connection_hook注册了hook函数user_offline, 当用户退出时会调用stop_ping函数,向worker进程发送{stop_ping, JID}消息。

1
2
3
stop_ping(Host, JID) ->
Proc = gen_mod:get_module_proc(Host, ?MODULE),
gen_server:cast(Proc, {stop_ping, JID}).

worker进程调用del_timer函数将该客户端的timer删除。

1
2
3
handle_cast({stop_ping, JID}, State) ->
Timers = del_timer(JID, State#state.timers),
{noreply, State#state{timers = Timers}};
1
2
3
4
5
6
7
8
9
del_timer(JID, Timers) ->
LJID = jlib:jid_tolower(JID),
case ?DICT:find(LJID, Timers) of
{ok, TRef} ->
cancel_timer(TRef),
?DICT:erase(LJID, Timers);
_ ->
Timers
end.

模块及进程停止的逻辑与模块和进程初始化的逻辑相反,本文略过。