最近在鋒爺?shù)慕ㄗh下開始讀rabbitmq的源碼,鋒爺說這個項目已經(jīng)很成熟,并且代碼也很有借鑒和學習的意義,在自己寫erlang代碼之前看看別人是怎么寫的,可以少走彎路,避免養(yǎng)成一些不好的習慣,學習一些最佳實踐。讀了一個星期,這個項目果然非常棒,代碼也寫的非常清晰易懂,一些細節(jié)的處理上非常巧妙,比如我這里想分享的網(wǎng)絡層一節(jié)。
Rabbitmq是一個MQ系統(tǒng),也就是消息中間件,它實現(xiàn)了AMQP 0.8規(guī)范,簡單來說就是一個TCP的廣播服務器。AMQP協(xié)議,你可以類比JMS,不過JMS僅僅是java領域內(nèi)的API規(guī)范,而AMQP比JMS更進一步,它有自己的wire-level protocol,有一套可編程的協(xié)議,中立于語言。簡單介紹了Rabbitmq之后,進入正題。
Rabbitmq充分利用了Erlang的分布式、高可靠性、并發(fā)等特性,首先看它的一個結構圖:
這張圖展現(xiàn)了Rabbitmq的主要組件和組件之間的關系,具體到監(jiān)控樹的結構,我畫了一張圖:
頂層是rabbit_sup
supervisor,它至少有兩個子進程,一個是rabbit_tcp_client_sup,用來監(jiān)控每個connection的處理進程
rabbit_reader的supervisor;rabbit_tcp_listener_sup是監(jiān)控tcp_listener和
tcp_acceptor_sup的supervisor,tcp_listener里啟動tcp服務器,監(jiān)聽端口,并且通過tcp_acceptor_sup啟動N個tcp_accetpor,tcp_acceptor發(fā)起accept請求,等待客戶端連接;tcp_acceptor_sup負責監(jiān)控這些acceptor。這張圖已經(jīng)能給你一個大體的印象。
講完大概,進入細節(jié),說說幾個我覺的值的注意的地方:
1、
tcp_accepto.erl,r對于accept采用的是異步方式,利用
prim_inet:async_accept/2方
法,此模塊沒有被文檔化,是otp庫內(nèi)部使用,通常來說沒必要使用這一模塊,gen_tcp:accept/1已經(jīng)足夠,不過rabbitmq是廣播程
序,因此采用了異步方式。使用async_accept,需要打patch,以使得socket好像我們從gen_tcp:accept/1得到的一樣:
handle_info({inet_async, LSock, Ref, {ok, Sock}},
State = #state{callback={M,F,A}, sock=LSock, ref=Ref}) ->
%%這里做了patch
%% patch up the socket so it looks like one we got from
%% gen_tcp:accept/1
{ok, Mod} = inet_db:lookup_socket(LSock),
inet_db:register_socket(Sock, Mod),
try
%% report
{Address, Port} = inet_op(fun () -> inet:sockname(LSock) end),
{PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end),
error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
[inet_parse:ntoa(Address), Port,
inet_parse:ntoa(PeerAddress), PeerPort]),
%% 調(diào)用回調(diào)模塊,將Sock作為附加參數(shù)
apply(M, F, A ++ [Sock])
catch {inet_error, Reason} ->
gen_tcp:close(Sock),
error_logger:error_msg("unable to accept TCP connection: ~p~n",
[Reason])
end,
%% 繼續(xù)發(fā)起異步調(diào)用
case prim_inet:async_accept(LSock, -1) of
{ok, NRef} -> {noreply, State#state{ref=NRef}};
Error -> {stop, {cannot_accept, Error}, none}
end;
%%處理錯誤情況
handle_info({inet_async, LSock, Ref, {error, closed}},
State=#state{sock=LSock, ref=Ref}) ->
%% It would be wrong to attempt to restart the acceptor when we
%% know this will fail.
{stop, normal, State};
2、
rabbitmq內(nèi)部是使用了多個并發(fā)acceptor,這在高并發(fā)下、大量連接情況下有效率優(yōu)勢,
類似java現(xiàn)在的nio框架采用多個reactor類似,查看tcp_listener.erl:
init({IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, AcceptorSup,
{M,F,A} = OnStartup, OnShutdown, Label}) ->
process_flag(trap_exit, true),
case gen_tcp:listen(Port, SocketOpts ++ [{ip, IPAddress},
{active, false}]) of
{ok, LSock} ->
%%創(chuàng)建ConcurrentAcceptorCount個并發(fā)acceptor
lists:foreach(fun (_) ->
{ok, _APid} = supervisor:start_child(
AcceptorSup, [LSock])
end,
lists:duplicate(ConcurrentAcceptorCount, dummy)),
{ok, {LIPAddress, LPort}} = inet:sockname(LSock),
error_logger:info_msg("started ~s on ~s:~p~n",
[Label, inet_parse:ntoa(LIPAddress), LPort]),
%%調(diào)用初始化回調(diào)函數(shù)
apply(M, F, A ++ [IPAddress, Port]),
{ok, #state{sock = LSock,
on_startup = OnStartup, on_shutdown = OnShutdown,
label = Label}};
{error, Reason} ->
error_logger:error_msg(
"failed to start ~s on ~s:~p - ~p~n",
[Label, inet_parse:ntoa(IPAddress), Port, Reason]),
{stop, {cannot_listen, IPAddress, Port, Reason}}
end.
這里有一個技巧,如果要循環(huán)N次執(zhí)行某個函數(shù)F,可以通過lists:foreach結合lists:duplicate(N,dummy)來處理。
lists:foreach(fun(_)-> F() end,lists:duplicate(N,dummy)).
3、
simple_one_for_one策略的使用,可以看到對于tcp_client_sup和tcp_acceptor_sup都采用了simple_one_for_one策略,而非普通的one_fo_one,這是為什么呢?
這牽扯到simple_one_for_one的幾個特點:
1)simple_one_for_one內(nèi)部保存child是使用dict,而其他策略是使用list,因此simple_one_for_one更適合child頻繁創(chuàng)建銷毀、需要大量child進程的情況,具體來說例如網(wǎng)絡連接的頻繁接入斷開。
2)使用了simple_one_for_one后,無法調(diào)用terminate_child/2 delete_child/2 restart_child/2
3)start_child/2
對于simple_one_for_one來說,不必傳入完整的child
spect,傳入?yún)?shù)list,會自動進行
參數(shù)合并。
在一個地方定義好child
spec之后,其他地方只要start_child傳入?yún)?shù)即可啟動child進程,簡化child都是同一類型進程情況下的編程。
在
rabbitmq中,tcp_acceptor_sup的子進程都是tcp_acceptor進程,在tcp_listener中是啟動了
ConcurrentAcceptorCount個tcp_acceptor子進程,通過supervisor:start_child/2方法:
%%創(chuàng)建ConcurrentAcceptorCount個并發(fā)acceptor
lists:foreach(fun (_) ->
{ok, _APid} =
supervisor:start_child(
AcceptorSup, [LSock])
end,
lists:duplicate(ConcurrentAcceptorCount, dummy)),
注意到,這里調(diào)用的start_child只傳入了
LSock一個參數(shù),另一個參數(shù)CallBack是在定義child spec的時候傳入的,參見tcp_acceptor_sup.erl:
init(Callback) ->
{ok, {{simple_one_for_one, 10, 10},
[{tcp_acceptor, {tcp_acceptor, start_link, [
Callback]},
transient, brutal_kill, worker, [tcp_acceptor]}]}}.
Erlang內(nèi)部自動為simple_one_for_one做了
參數(shù)合并,最后調(diào)用的是tcp_acceptor的init/2:
init({
Callback,
LSock}) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} -> {ok, #state{callback=Callback, sock=LSock, ref=Ref}};
Error -> {stop, {cannot_accept, Error}}
end.
對于tcp_client_sup的情況類似,tcp_client_sup監(jiān)控的子進程都是rabbit_reader類型,在
rabbit_networking.erl中啟動tcp_listenner傳入的處理connect事件的回調(diào)方法是是
rabbit_networking:start_client/1:
start_tcp_listener(Host, Port) ->
start_listener(Host, Port, "TCP Listener",
%回調(diào)的MFA
{
?MODULE, start_client, []}).
start_client(Sock) ->
{ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []),
ok = rabbit_net:controlling_process(Sock, Child),
Child ! {go, Sock},
Child.
start_client調(diào)用了supervisor:start_child/2來動態(tài)啟動rabbit_reader進程。
4、
協(xié)議的解析,消息的讀取這部分也非常巧妙,這一部分主要在rabbit_reader.erl中,對于協(xié)議的解析沒有采用gen_fsm,而是實現(xiàn)了一個巧妙的狀態(tài)機機制,核心代碼在mainloop/4中:
%啟動一個連接
start_connection(Parent, Deb, ClientSock) ->
process_flag(trap_exit, true),
{PeerAddressS, PeerPort} = peername(ClientSock),
ProfilingValue = setup_profiling(),
try
rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
[self(), PeerAddressS, PeerPort]),
%延時發(fā)送握手協(xié)議
Erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
%進入主循環(huán),更換callback模塊,魔法就在這個switch_callback
mainloop(Parent, Deb, switch_callback(
#v1{sock = ClientSock,
connection = #connection{
user = none,
timeout_sec = ?HANDSHAKE_TIMEOUT,
frame_max = ?FRAME_MIN_SIZE,
vhost = none},
callback = uninitialized_callback,
recv_ref = none,
connection_state = pre_init},
%%注意到這里,handshake就是我們的回調(diào)模塊,8就是希望接收的數(shù)據(jù)長度,AMQP協(xié)議頭的八個字節(jié)。
handshake, 8))
魔法就在switch_callback這個方法上:
switch_callback(OldState, NewCallback, Length) ->
%發(fā)起一個異步recv請求,請求Length字節(jié)的數(shù)據(jù)
Ref = inet_op(fun () -> rabbit_net:async_recv(
OldState#v1.sock, Length, infinity) end),
%更新狀態(tài),替換ref和處理模塊
OldState#v1{callback = NewCallback,
recv_ref = Ref}.
異步接收Length個數(shù)據(jù),如果有,erlang會通知你處理。處理模塊是什么概念呢?其實就是一個狀態(tài)的概念,表示當前協(xié)議解析進行到哪一步,起一個label的作用,看看mainloop/4中的應用:
mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
%%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]),
receive
%%接收到數(shù)據(jù),交給handle_input處理,注意handle_input的第一個參數(shù)就是callback
{inet_async, Sock, Ref, {ok, Data}} ->
%handle_input處理
{State1, Callback1, Length1} =
handle_input(State#v1.callback, Data,
State#v1{recv_ref = none}),
%更新回調(diào)模塊,再次發(fā)起異步請求,并進入主循環(huán)
mainloop(Parent, Deb,
switch_callback(State1, Callback1, Length1));
handle_input有多個分支,每個分支都對應一個處理模塊,例如我們剛才提到的握手協(xié)議:
%handshake模塊,注意到第一個參數(shù),第二個參數(shù)就是我們得到的數(shù)據(jù)
handle_input(
handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
State = #v1{sock = Sock, connection = Connection}) ->
%檢測協(xié)議是否兼容
case check_version({ProtocolMajor, ProtocolMinor},
{?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of
true ->
{ok, Product} = application:get_key(id),
{ok, Version} = application:get_key(vsn),
%兼容的話,進入connections start,協(xié)商參數(shù)
ok = send_on_channel0(
Sock,
#'connection.start'{
version_major = ?PROTOCOL_VERSION_MAJOR,
version_minor = ?PROTOCOL_VERSION_MINOR,
server_properties =
[{list_to_binary(K), longstr, list_to_binary(V)} ||
{K, V} <-
[{"product", Product},
{"version", Version},
{"platform", "
{"copyright", ?COPYRIGHT_MESSAGE},
{"information", ?INFORMATION_MESSAGE}]],
mechanisms = <<"PLAIN AMQPLAIN">>,
locales = <<"en_US">> }),
{State#v1{connection = Connection#connection{
timeout_sec = ?NORMAL_TIMEOUT},
connection_state = starting},
frame_header, 7};
%否則,斷開連接,返回可以接受的協(xié)議
false ->
throw({bad_version, ProtocolMajor, ProtocolMinor})
end;
其他協(xié)議的處理也是類似,通過動態(tài)替換callback的方式來模擬狀態(tài)機做協(xié)議的解析和數(shù)據(jù)的接收,真的很巧妙!讓我們體會到Erlang的魅力,F(xiàn)P的魅力。
5、序列圖:
1)tcp server的啟動過程:
2)一個client連接上來的處理過程:
小結:從上面的分析可以看出,rabbitmq的網(wǎng)絡層是非常健壯和高效的,通過層層監(jiān)控,對每個可能出現(xiàn)的風險點都做了考慮,并且利用了prim_net模塊做異步IO處理。分層也是很清晰,將業(yè)務處理模塊隔離到client_sup監(jiān)控下的子進程,將網(wǎng)絡處理細節(jié)和業(yè)務邏輯分離。在協(xié)議的解析和業(yè)務處理上雖然沒有采用gen_fsm,但是也實現(xiàn)了一套類似的狀態(tài)機機制,通過動態(tài)替換Callback來模擬狀態(tài)的變遷,非常巧妙。如果你要實現(xiàn)一個tcp server,強烈推薦從rabbitmq中扣出這個網(wǎng)絡層,你只需要實現(xiàn)自己的業(yè)務處理模塊即可擁有一個高效、健壯、分層清晰的TCP服務器。