ngx_http_limit_conn_module模块用来限制某个KEY的并发连接数。它的实现与ngx_http_limit_req_module模块类似,整体逻辑和实现更为简单。LimitConn模块也将某个KEY的信息存储在共享内存RBTREE中的节点中, 但不需要QUEUE结构。LimitConn模块只需要在节点中记录当前的连接数信息:
1 | typedef struct { |
Keep learning, keep living...
ngx_http_limit_conn_module模块用来限制某个KEY的并发连接数。它的实现与ngx_http_limit_req_module模块类似,整体逻辑和实现更为简单。LimitConn模块也将某个KEY的信息存储在共享内存RBTREE中的节点中, 但不需要QUEUE结构。LimitConn模块只需要在节点中记录当前的连接数信息:
1 | typedef struct { |
LVS包转发功能由内核模块IPVS实现。Keepalived的Check进程周期性地对后端RealServer进行健康检测,根据检测结果摘除或恢复。摘除和恢复RealServer等操作本质上为Keepalived这个用户态进程与IPVS内核模块的通信操作。
libipvs封装了用户态程序对内核模块IPVS可以进行的操作,如:
ngx_http_limit_req_module用于依据指定的KEY来限制请求处理速度。比如,用来限制单个IP的访问频率。
文档地址: http://nginx.org/en/docs/http/ngx_http_limit_req_module.html
下面来看具体实现:
NGINX模块从逻辑中主要有两部分:
ejbberd中多个模块或组件都允许管理员在配置文件中基于用户JID进行访问限制, 如在ejabberd_c2s模块中禁用某用户.
ejabberd实现了一套通用的ACL机制来满足各模块的需求.
配置方法如下:
添加acl规则, 如:
1 | {acl, blocked, {user, "test"}}. |
acl元组第2个元素为该条acl规则的名称, 第3个元素为JID的过滤规则. 示例中的过滤规则表示用户JID的User部分为”test”.
添加access规则, 如:
1 | {access, c2s, [{deny, blocked}, {allow, all}]}. |
access元组第2个元素为access规则的名称, 第3个元素中的每个元素为一个指定值和一个acl规则名字.
当JID满足某条acl规则时, 该条access规则的值则为该acl规则的对应值. 示例中, 当用户JID中的User部分为”test”时, access规则c2s值为deny
, 否则为allow
.
为模块或组件指定access规则(不同的模块或组件所用的配置指令可能不同) 如:
1 | {5222, ejabberd_c2s, [ |
按示例配置后, ejabberd_c2s会根据access规则c2s的值禁用相应用户, 如JID中User为”test”的用户全部被禁用.
PowerDNS中DNS解析由各类Backend模块处理。如果解析相关的数据存储于MySQL, Postgres等数据库,Backend需要在这些数据库中查询相应的记录是否存在。查询数据库的性能很低。因而PowerDNS中实现了PacketCache来提高性能。PowerDNS接收到请求后,先在PacketCache中查询是否已经有相应的DNS响应。如果有则直接返回该缓存。否则交由backend处理,处理后再添加到PacketCache中。
PacketCache实现主要位于packetcache.hh和packetcache.cc中。
flashcache是Facebook开源的用SSD来缓存机械磁盘数据的Linux内核模块。
简单来说,Linux的I/O栈层次结构可以表示为:
1 | +----------------------------+ |
文件系统I/O操作传递给块设备,抽象块设备最终传递给真实的设备驱动,完成I/O操作。Linux在块设备层实现了Device Mapper(DM)的映射机制。DM机制可以将一个或多个块设备再映射成一个虚拟块设备。具体的映射规则由mapping table来实现。到达虚拟块设备的I/O请求,DM机制会根据映射表找到正确的目标设备,将请求放到目标设备的请求队列中,目标设备根据目标类型进行处理。结构图如下:
1 | +---------------+ |
DM机制的映射目标类型可以通过内核模块进行扩展,flashcache就是通过定义一种名称为”flashcache”的映射目标类型来实现SSD做为机械磁盘的缓存。结构示意图如下:
1 | +---------------+ |
flashcache做为缓存层,提供了三种写入模式:
三种模式的区别如下图所示:
1 | writethrough writearound writeback |
下面分析flashcache模块的初始化。
flashcache内核模块的入口函数为flashcache_init。
1 | module_init(flashcache_init); |
flashcache_init首先会对job相关的变量及结构进行初始化:
1 | r = flashcache_jobs_init(); |
flashcache_jobs_init会分配job操作所需的内存池,简化逻辑如下:
1 | static int |
然后,flashcache_init调用dm_io_client_create创建一个dm_io_client结构,它为块设备I/O请求执行过程提供内存池。
1 | flashcache_io_client = dm_io_client_create(); |
接着,flashcache_init调用dm_kcopyd_client_create创建了一个dm_kcopyd_client结构。kcopyd是一个内核进程,它用于异步地copy一个块设备的区域到一个或多个其他的块设备。dm_kcopyd_client用于向kcopyd提交任务。
1 | flashcache_kcp_client = dm_kcopyd_client_create(NULL); |
然后,初始化了一个内核工作队列,执行do_work去处理各种job, 本文略过。
1 | INIT_WORK(&_kcached_wq, do_work); |
接着,注册flashcache的DM机制目标类型, 目标类型中具体回调本文略过。
1 | r = dm_register_target(&flashcache_target); |
再来看flashcache_init的最后部分:
1 | flashcache_module_procfs_init(); |
首先,调用flashcache_module_procfs_init创建”/proc/flashcache”目录及文件”/proc/flashcache/flashcache_version”。
flachcache_module_procfs_init简化逻辑如下:
1 | void |
最后,通过register_reboot_notifier注册函数flashcache_notify_reboot。这函数会在机器重启或关闭时被调用。
1 | static struct notifier_block flashcache_notifier = { |
至此,flashcache内核模块初始化完成,可以使用dmsetup指定flashcache目标类型创建虚拟块设备了。
ejabberd_config模块负责加载ejabberd配置文件,存储相应的配置选项,并提供添加和获取配置选项的API。
比如, ejabberd_app:start_modules函数会使用ejabber_config:get_local_option获取配置文件中的modules选项:
1 | %% Start all the modules in all the hosts |
下面分析ejbberd_config模块实现。
ejabberd启动时,ejabberd_app:start/0会调用ejabberd_config:start/0。
1 | start() -> |
start函数首先创建config和local_config两个mnesia表,接着调用get_ejabberd_config_path获取配置文件路径。
1 | get_ejabberd_config_path() -> |
get_ejabberd_config_path首先使用application:get_env从ejabberd.app的env或erlang命令行中的config选项中获取值:
如:
1 | {env, [{config, "/etc/ejabberd/ejabberd.cfg"]} |
或者:
1 | erl -config "/path/to/ejabberd.cfg" |
如果没有设置这两个选项,则尝试从系统环境变量”EJABBERD_CONFIG_PATH”读取文件路径。ejabberdctl会设置该环境变量。若也没有设置该环境变量,get_ejabberd_config_path则返回?CONFIG_PATH
, 这个宏被定义为”ejabberd.cfg”。
1 | -define(CONFIG_PATH, "ejabberd.cfg"). |
接下来, start函数调用load_file来加载配置文件:
1 | load_file(File) -> |
load_file首先调用get_plain_terms_file来获取所有配置选项的列表。
1 | get_plain_terms_file(File1) -> |
我们来看get_plain_terms_file实现。首先,调用get_absolute_path,期望得到配置文件的绝对路径。不过,get_absolute_path实现上存在BUG:
1 | get_absolute_path(File) -> |
当File为相对路径时,使用filename:absname_join不能得到绝对路径。我提了一个PATCH,使用当前目录来转成绝对路径。官方已接受。
PATCH地址:
https://github.com/processone/ejabberd/commit/62ccf1cf0e13954ee5207bc6288afbc669247d14
接着,get_plain_terms_file调用file:consult读取配置文件中的所有Erlang Terms到列表中,再调用include_config_files函数来处理include_config_file选项。
1 | include_config_files([{include_config_file, Filename, Options} | Terms], Res) -> |
include_config_file选项格式为:
1 | {include_config_file, [{disallow, foo}, {allow_only, bar}], "/path/to/included_config"}. |
include_config_files递归调用get_plain_terms_file获取被引用的配置文件中所有配置,接着检查include_config_file选项中是否有disallow选项。如果有,调用delete_disallowed将disallow指定的配置选项从被引用文件的配置列表中删除。接着检查其中是否存在allow_only选项,如果有,则调用keep_only_allowed只保留下allow_only中指定的配置,将其和外部配置合并,再递归调用include_config_files/2处理剩余的选项,最终返回所有配置文件中所有选项列表。
1 | include_config_files([], Res) -> |
load_file接着遍历配置列表调用search_host, 最终调用add_option来添加hosts选项。
1 | add_option(Opt, Val, State) -> |
add_option将指定选项以Key/Value形式添加进状态结构的opts域中。其中,hosts和language使用记录config, 其他选项使用local_config。这与最初创建的MNESIA表相对应。
状态结构如下:
1 | -record(state, {opts = [], |
接下来,load_file调用replace_macros来替换配置中的宏为相应的值。我们来看replace_macros实现。
1 | replace_macros(Terms) -> |
首先, 调用split_terms_macros将宏选项和其他选项分开。
1 | split_terms_macros(Terms) -> |
宏定义选项格式为:
1 | {define_macro, 'KEY', bar}. |
其中key必须为atom类型且必须全部为大写字母,得到的宏选项列表为{key, value}
格式的列表。
接着, replace_macros调用replace/2。
1 | replace([], _) -> |
replace通过递归对每个选项调用replace_term/2。
1 | replace_term(Key, Macros) when is_atom(Key) -> |
replace_term遍历选项中的所有子项,如果在宏列表中查找到相应的值,则替换该子项为找到的值。
另外,如果配置中某子项指定了{use_macro, Key, Value}这种格式的配置,在替换时优先从宏列表中查找相应的值,找不到再使用use_macro指定的Value来替换。
至此,获得了所有配置选项的列表,接下来对每个选项调用process_term。
选项存储主要分为3种类型, process_term分别进行不同处理:
override_global
选项:1 | override_global -> |
max_fsm_queue
选项:1 | {max_fsm_queue, N} -> |
domain_certfile
选项:1 | {domain_certfile, Domain, CertFile} -> |
1 | {host_config, Host, Terms} -> |
1 | {_Opt, _Val} -> |
1 | {dummy_config, [foo, bar]}. |
1 | ejabberd_config:get_local_option({dummy_config, Host}) |
此外,acl
, access
和shaper
三个选项比较特殊。acl
存储acl记录结构在状态结构的opts域中, 在set_opts中这些记录会被写入acl表中。access
和shaper
选项的KEY中除了选项名,HOST,还包括了规则名称。
至此,load_file将所有选项保存到了状态结构的opts域中,最后调用set_opts进行存储:
1 | set_opts(State) -> |
如果配置了override_global
, override_local
, override_acls
选项,set_opts首先会分别删除表config, local_config和acl中的所有内容。接着分别将状态结构opts域中的配置写入config, local_config和acl三个表中。
配置加载过程结束。
ejabberd_config模块提供了添加和查询选项的API:
1 | add_global_option(Opt, Val) -> |
add_global_option和add_local_option分别向config和local_config表中添加选项记录。get_global_option和get_local_option直接使用ets:lookup
查找相应配置。这是由于mnesia底层由ETS实现,直接使用ets:lookup
性能会更高。不过,我个人不太欣赏这种写法。
ejabberd中的hook机制是ejabberd XMPP模块的基础。XMPP模块需要根据需求在相应的hook点上注册自己的处理函数,在处理函数的逻辑中实现需求。ejabberd执行到hook点时,会按注册的顺序号由小到大来执行各模块所注册的处理函数。
下面来分析具体实现。
ejabberd启动时,ejabberd_sup:init/1会通过调用ejabberd_hooks:start_link/0启动名称为ejabberd_hooks的worker进程。
1 | init([]) -> |
ejabberd_hooks进程初始化时执行init/1函数创建了名为hooks的ETS表。这个表用来存储在各注册点和域名下注册的hook函数。
1 | init([]) -> |
模块一般使用ejabberd_hooks:add/5注册hook函数。
1 | add(Hook, Host, Module, Function, Seq) -> |
参数:
add函数发送add
消息给ejabberd_hooks进程。ejabberd_hooks进程调用handle_call处理消息。
1 | handle_call({add, Hook, Host, Module, Function, Seq}, _From, State) -> |
handle_call首先从hooks表中查找该hook点和域名下是否已经注册了函数。若不存在,则将顺序号、模块、函数名添加到表中。若已存在,再检查是否为重复添加。如果不是,则将顺序号、模块、函数名和之前的函数信息按顺序号排序后一并添加。
如果hook函数必须在集群内特定节点上执行,可以调用ejabberd_hooks:add_dist注册。它的处理逻辑与add函数类似,只是在hooks表中多存储了node信息,此处略过。
当需要删除hook函数时(一般是模块停止时),调用ejabberd_hooks:delete/5。
1 | delete(Hook, Host, Module, Function, Seq) -> |
delete函数发送delete
消息给ejabberd_hooks进程。进程执行handle_call处理。
1 | handle_call({delete, Hook, Host, Module, Function, Seq}, _From, State) -> |
handle_call从hooks表中获取注册在该hook点和域名上的所有函数,从中删除指定的函数,再将结果保存。
删除注册在特定节点上的函数要使用delete_dist,处理逻辑类似,略过。
ejabberd执行到hook点时会调用ejabberd_hooks:run/3或ejabberd_hooks:run_fold/4来执行注册的HOOK函数。如果这个hook点不关心各hook函数的返回结果,则调用run函数,否则调用run_fold函数。
首先看run函数:
1 | run(Hook, Host, Args) -> |
run函数从hooks表中查找注册在该hook点和域名上的所有函数,然后调用run1/3。
1 | run1([{_Seq, Module, Function} | Ls], Hook, Args) -> |
run1依次执行注册的hook函数,如果某个hook函数返回stop
, 则run1结束返回,之后的hook函数不再被执行。
如果需要执行的是注册在某个节点上的hook函数,则通过rpc:call在该节点上执行函数,其他逻辑类似。
1 | run1([{_Seq, Node, Module, Function} | Ls], Hook, Args) -> |
再来看run_fold函数。和run函数相比,run_fold还需要一个参数,表示默认的返回结果。
1 | run_fold(Hook, Host, Val, Args) -> |
run_fold首先找到注册在该hook点和域名上的所有函数,如果没有,则返回默认结果。否则调用run_fold1。
1 | run_fold1([{_Seq, Module, Function} | Ls], Hook, Val, Args) -> |
run_fold1会将传入的结果Val
(或者来自默认结果,或者来自前一hook函数的返回结果)和参数Args
组成新的lists做为参数传给hook函数,依次递归调用。若某hook函数返回stop
,结束递归调用,返回stopped
。若hook函数返回{stop, NewVal}
,则直接返回该hook函数的结果NewVal
。这两种情况下,其余的hook函数不再被执行。否则,返回结果做为Val参数再次递归调用run_fold1。
注册在特定节点上的函数处理逻辑类似,只是使用rpc:call在相应节点上执行,略过。
具体的hook点和hook函数原型可以参考官方文档:
https://www.process-one.net/en/wiki/ejabberd_events_and_hooks/
这个文档写地不是很详细。不确定的地方需要参考源码。
当这些内置hook点不能满足需求时,可以在ejabberd中合适位置调用ejabberd_hooks:run或ejabberd_hooks:run_fold添加hook点。
如:
1 | ejabberd_hooks:run(dummy_hook, []), |
另外,需要注意的有:
ejabberd执行某些hook点时,调用不同参数版本的run或run_fold。这种情况Host参数为global
。注册这种hook点时,Host参数也应该使用global
。
如:
1 | case ejabberd_hooks:run_fold(filter_packet, |
1 | ejabberd_hooks:add(filter_packet, global, ?MODULE, on_filter_packet, 120), |
注: 文中代码版本为:ejabberd-2.1.13。
TokyoCabinet(TC)提供了6种不同结构的数据库,包括:
每种数据库都有各自一套API来进行各种操作。
为了简化使用,TC还提供了一套通用的API来操作以上所有类型数据库,叫做Abstract Database API.
Abstract Database API通过数据库名称来区分各类型数据库:
不仅如此,TC更进一步进行了抽象,在Abstract Database中还提供了一种Skeleton Database。
通过实现Skeleton Database指定的API,可以使用自定义的数据库类型。
Skeleton Database API结构体如下:
1 | typedef struct { /* type of structure for a extra database skeleton */ |
各成员与其它类型API相应成员意义一致。在开发时,只需实现功能必需的相应函数,忽略其他成员。
使用示例:
1 | ADBSKEL skel; |
为了解决多进程共享访问和远程访问TC数据库的不便与繁琐,TC作者开发了一个网络访问层,叫做TokyoTyrant(TT)。它使用TC的Abstract Database API来访问TC数据库。因而内置支持skeleton database扩展。
TT提供了-skel命令行选项来指定skeleton database,启动时它会加载传入的Shared Object(SO)文件,使用SO中定制的数据库实现。
1 | ttserver -skel ttskelfoo.so |
我们可以根据需求实现特定的SO文件,就可以完整利用TT本身已经实现的各种特性,如主备同步,memcache协议支持,HTTP协议支持等。在性能满足需求的情况,这将大大减少开发量。SO文件必须导出一个名字为initialize的函数,TT启动时会从SO文件中查找该函数来初始化skeleton database。
该函数原型为:
1 | bool (*initfunc)(ADBSKEL *); |
该函数传入一个指向skeleton database的指针。initialize函数中需要将skeleton database定制的数据库操作的API实现赋值到相应函数指针。
由于initialize函数没有参数传递TT本身相关信息,如命令行选项,配置结构等,而TT将一些信息存储在全局变量g_serv指向的TTSERV结构体中,因而SO中可以声明g_serv外部变量来引用。
1 | extern TTSERV *g_serv; |
不过较为遗憾的是TTSERV中的信息较少,如有需要的话可以自行扩展。如果SO中逻辑需要依赖命令行选项,可以通过使用启动TT时传入的数据库名来做不同处理。skeleton database的open函数会传入该参数。
opentracker是一个开源P2P tracker服务器.之前我们系统中主要使用的是PHP+MYSQL实现的peertracker。随着业务增长,peertracker的性能已经不能满足系统需要。因而我们决定引入性能更好的opentracker。不过, opentracker在功能上并不能完全满足我们的需求,因而我对它进行了一些扩展。
opentracker本身支持cluster模式。cluster内各节点之间会同步数据。这样可以通过添加节点提高整体的集群性能。然而,opentracker原生通过UDP多播进行数据同步。我们不具备多播IP,因而开发了单播模式进行同步。实现非常简单,就是依次向cluster内其他节点发送数据。缺点是当节点数较多时,会影响性能。
opentracker将torrent和peer信息保存在内存中。当opentracker重启时,所有的torrent和peer信息就都丢失了。这会导致我们的系统一段时间内不能进行正常的P2P传输。因而我扩展了持久化功能。opentracker架构上通过多个不同的线程执行不同的任务。我添加了一个线程,周期性地将内存中的torrent和peer信息保存到磁盘文件中。这个线程很像Redis中进行数据dump的进程。磁盘文件格式定义为ODB(opentracker database),它主要借鉴自Redis的RDB格式。目前,没有处理IPV6格式,因而只支持IPV4。
我还提供了一个工具支持流式地对odb文件进行处理。
具体用法请参考: perldoc OdbParser
格式规范:
1 | ----------------------------------- # ODB is a binary format. There are no new lines or spaces in the file. |
由于opentracker响应内容是按BENCODE编码过的,调试时不太方便。因而扩展了一个返回human-readable内容的调试接口。