<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    莊周夢(mèng)蝶

    生活、程序、未來(lái)
       :: 首頁(yè) ::  ::  :: 聚合  :: 管理

    Rabbitmq的網(wǎng)絡(luò)層淺析

    Posted on 2009-11-29 12:00 dennis 閱讀(11894) 評(píng)論(8)  編輯  收藏 所屬分類: erlang源碼解讀
        最近在鋒爺?shù)慕ㄗh下開(kāi)始讀rabbitmq的源碼,鋒爺說(shuō)這個(gè)項(xiàng)目已經(jīng)很成熟,并且代碼也很有借鑒和學(xué)習(xí)的意義,在自己寫erlang代碼之前看看別人是怎么寫的,可以少走彎路,避免養(yǎng)成一些不好的習(xí)慣,學(xué)習(xí)一些最佳實(shí)踐。讀了一個(gè)星期,這個(gè)項(xiàng)目果然非常棒,代碼也寫的非常清晰易懂,一些細(xì)節(jié)的處理上非常巧妙,比如我這里想分享的網(wǎng)絡(luò)層一節(jié)。
        Rabbitmq是一個(gè)MQ系統(tǒng),也就是消息中間件,它實(shí)現(xiàn)了AMQP 0.8規(guī)范,簡(jiǎn)單來(lái)說(shuō)就是一個(gè)TCP的廣播服務(wù)器。AMQP協(xié)議,你可以類比JMS,不過(guò)JMS僅僅是java領(lǐng)域內(nèi)的API規(guī)范,而AMQP比JMS更進(jìn)一步,它有自己的wire-level protocol,有一套可編程的協(xié)議,中立于語(yǔ)言。簡(jiǎn)單介紹了Rabbitmq之后,進(jìn)入正題。
        Rabbitmq充分利用了Erlang的分布式、高可靠性、并發(fā)等特性,首先看它的一個(gè)結(jié)構(gòu)圖:


    這張圖展現(xiàn)了Rabbitmq的主要組件和組件之間的關(guān)系,具體到監(jiān)控樹(shù)的結(jié)構(gòu),我畫了一張圖:







        頂層是rabbit_sup supervisor,它至少有兩個(gè)子進(jìn)程,一個(gè)是rabbit_tcp_client_sup,用來(lái)監(jiān)控每個(gè)connection的處理進(jìn)程 rabbit_reader的supervisor;rabbit_tcp_listener_sup是監(jiān)控tcp_listener和 tcp_acceptor_sup的supervisor,tcp_listener里啟動(dòng)tcp服務(wù)器,監(jiān)聽(tīng)端口,并且通過(guò)tcp_acceptor_sup啟動(dòng)N個(gè)tcp_accetpor,tcp_acceptor發(fā)起accept請(qǐng)求,等待客戶端連接;tcp_acceptor_sup負(fù)責(zé)監(jiān)控這些acceptor。這張圖已經(jīng)能給你一個(gè)大體的印象。
       
        講完大概,進(jìn)入細(xì)節(jié),說(shuō)說(shuō)幾個(gè)我覺(jué)的值的注意的地方:
    1、tcp_accepto.erl,r對(duì)于accept采用的是異步方式,利用prim_inet:async_accept/2方 法,此模塊沒(méi)有被文檔化,是otp庫(kù)內(nèi)部使用,通常來(lái)說(shuō)沒(méi)必要使用這一模塊,gen_tcp:accept/1已經(jīng)足夠,不過(guò)rabbitmq是廣播程 序,因此采用了異步方式。使用async_accept,需要打patch,以使得socket好像我們從gen_tcp:accept/1得到的一樣:

    handle_info({inet_async, LSock, Ref, {ok, Sock}},
                State = #state{callback={M,F,A}, sock=LSock, ref=Ref}) ->
        %%這里做了patch
        %% patch up the socket so it looks like one we got from
        %% gen_tcp:accept/1
        {ok, Mod} = inet_db:lookup_socket(LSock),
        inet_db:register_socket(Sock, Mod),

        try
            %% report
            {Address, Port}         = inet_op(fun () -> inet:sockname(LSock) end),
            {PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end),
            error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
                                  [inet_parse:ntoa(Address), Port,
                                   inet_parse:ntoa(PeerAddress), PeerPort]),
            %% 調(diào)用回調(diào)模塊,將Sock作為附加參數(shù)
            apply(M, F, A ++ [Sock])
        catch {inet_error, Reason} ->
                gen_tcp:close(Sock),
                error_logger:error_msg("unable to accept TCP connection: ~p~n",
                                       [Reason])
        end,

        %% 繼續(xù)發(fā)起異步調(diào)用
        case prim_inet:async_accept(LSock, -1) of
            {ok, NRef} -> {noreply, State#state{ref=NRef}};
            Error -> {stop, {cannot_accept, Error}, none}
        end;
    %%處理錯(cuò)誤情況
    handle_info({inet_async, LSock, Ref, {error, closed}},
                State=#state{sock=LSock, ref=Ref}) ->
        %% It would be wrong to attempt to restart the acceptor when we
        %% know this will fail.
        {stop, normal, State};

    2、rabbitmq內(nèi)部是使用了多個(gè)并發(fā)acceptor,這在高并發(fā)下、大量連接情況下有效率優(yōu)勢(shì),類似java現(xiàn)在的nio框架采用多個(gè)reactor類似,查看tcp_listener.erl:

    init({IPAddress, Port, SocketOpts,
          ConcurrentAcceptorCount, AcceptorSup,
          {M,F,A} = OnStartup, OnShutdown, Label}) ->
        process_flag(trap_exit, true),
        case gen_tcp:listen(Port, SocketOpts ++ [{ip, IPAddress},
                                                 {active, false}]) of
            {ok, LSock} ->
                 %%創(chuàng)建ConcurrentAcceptorCount個(gè)并發(fā)acceptor
                lists:foreach(fun (_) ->
                                      {ok, _APid} = supervisor:start_child(
                                                      AcceptorSup, [LSock])
                              end,
                              lists:duplicate(ConcurrentAcceptorCount, dummy)),

                {ok, {LIPAddress, LPort}} = inet:sockname(LSock),
                error_logger:info_msg("started ~s on ~s:~p~n",
                                      [Label, inet_parse:ntoa(LIPAddress), LPort]),
                %%調(diào)用初始化回調(diào)函數(shù)
                apply(M, F, A ++ [IPAddress, Port]),
                {ok, #state{sock = LSock,
                            on_startup = OnStartup, on_shutdown = OnShutdown,
                            label = Label}};
            {error, Reason} ->
                error_logger:error_msg(
                  "failed to start ~s on ~s:~p - ~p~n",
                  [Label, inet_parse:ntoa(IPAddress), Port, Reason]),
                {stop, {cannot_listen, IPAddress, Port, Reason}}
        end.

    這里有一個(gè)技巧,如果要循環(huán)N次執(zhí)行某個(gè)函數(shù)F,可以通過(guò)lists:foreach結(jié)合lists:duplicate(N,dummy)來(lái)處理。

    lists:foreach(fun(_)-> F() end,lists:duplicate(N,dummy)).

    3、simple_one_for_one策略的使用,可以看到對(duì)于tcp_client_sup和tcp_acceptor_sup都采用了simple_one_for_one策略,而非普通的one_fo_one,這是為什么呢?
    這牽扯到simple_one_for_one的幾個(gè)特點(diǎn):
    1)simple_one_for_one內(nèi)部保存child是使用dict,而其他策略是使用list,因此simple_one_for_one更適合child頻繁創(chuàng)建銷毀、需要大量child進(jìn)程的情況,具體來(lái)說(shuō)例如網(wǎng)絡(luò)連接的頻繁接入斷開(kāi)。
    2)使用了simple_one_for_one后,無(wú)法調(diào)用terminate_child/2 delete_child/2 restart_child/2

    3)start_child/2 對(duì)于simple_one_for_one來(lái)說(shuō),不必傳入完整的child spect,傳入?yún)?shù)list,會(huì)自動(dòng)進(jìn)行參數(shù)合并在一個(gè)地方定義好child spec之后,其他地方只要start_child傳入?yún)?shù)即可啟動(dòng)child進(jìn)程,簡(jiǎn)化child都是同一類型進(jìn)程情況下的編程

    在 rabbitmq中,tcp_acceptor_sup的子進(jìn)程都是tcp_acceptor進(jìn)程,在tcp_listener中是啟動(dòng)了 ConcurrentAcceptorCount個(gè)tcp_acceptor子進(jìn)程,通過(guò)supervisor:start_child/2方法:

    %%創(chuàng)建ConcurrentAcceptorCount個(gè)并發(fā)acceptor
                lists:foreach(fun (_) ->
                                      {ok, _APid} = supervisor:start_child(
                                                      AcceptorSup, [
    LSock])
                              end,
                              lists:duplicate(ConcurrentAcceptorCount, dummy)),

    注意到,這里調(diào)用的start_child只傳入了LSock一個(gè)參數(shù),另一個(gè)參數(shù)CallBack是在定義child spec的時(shí)候傳入的,參見(jiàn)tcp_acceptor_sup.erl:
    init(Callback) ->
        {ok, {{simple_one_for_one, 10, 10},
              [{tcp_acceptor, {tcp_acceptor, start_link, [Callback]},
                transient, brutal_kill, worker, [tcp_acceptor]}]}}.

    Erlang內(nèi)部自動(dòng)為simple_one_for_one做了參數(shù)合并,最后調(diào)用的是tcp_acceptor的init/2:

    init({Callback, LSock}) ->
        case prim_inet:async_accept(LSock, -1) of
            {ok, Ref} -> {ok, #state{callback=Callback, sock=LSock, ref=Ref}};
            Error -> {stop, {cannot_accept, Error}}
        end.

    對(duì)于tcp_client_sup的情況類似,tcp_client_sup監(jiān)控的子進(jìn)程都是rabbit_reader類型,在 rabbit_networking.erl中啟動(dòng)tcp_listenner傳入的處理connect事件的回調(diào)方法是是 rabbit_networking:start_client/1:

    start_tcp_listener(Host, Port) ->
        start_listener(Host, Port, "TCP Listener",
                       %回調(diào)的MFA
                       {?MODULE, start_client, []}).

    start_client(Sock) ->
        {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []),
        ok = rabbit_net:controlling_process(Sock, Child),
        Child ! {go, Sock},
        Child.

    start_client調(diào)用了supervisor:start_child/2來(lái)動(dòng)態(tài)啟動(dòng)rabbit_reader進(jìn)程。

    4、協(xié)議的解析,消息的讀取這部分也非常巧妙,這一部分主要在rabbit_reader.erl中,對(duì)于協(xié)議的解析沒(méi)有采用gen_fsm,而是實(shí)現(xiàn)了一個(gè)巧妙的狀態(tài)機(jī)機(jī)制,核心代碼在mainloop/4中:
    %啟動(dòng)一個(gè)連接
    start_connection(Parent, Deb, ClientSock) ->
        process_flag(trap_exit, true),
        {PeerAddressS, PeerPort} = peername(ClientSock),
        ProfilingValue = setup_profiling(),
        try
            rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
                            [self(), PeerAddressS, PeerPort]),
             %延時(shí)發(fā)送握手協(xié)議
            Erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
                              handshake_timeout),
            %進(jìn)入主循環(huán),更換callback模塊,魔法就在這個(gè)switch_callback
            mainloop(Parent, Deb, switch_callback(
                                    #v1{sock = ClientSock,
                                        connection = #connection{
                                          user = none,
                                          timeout_sec = ?HANDSHAKE_TIMEOUT,
                                          frame_max = ?FRAME_MIN_SIZE,
                                          vhost = none},
                                        callback = uninitialized_callback,
                                        recv_ref = none,
                                        connection_state = pre_init},
                                    %%注意到這里,handshake就是我們的回調(diào)模塊,8就是希望接收的數(shù)據(jù)長(zhǎng)度,AMQP協(xié)議頭的八個(gè)字節(jié)。
                                    handshake, 8))

    魔法就在switch_callback這個(gè)方法上:
    switch_callback(OldState, NewCallback, Length) ->
        %發(fā)起一個(gè)異步recv請(qǐng)求,請(qǐng)求Length字節(jié)的數(shù)據(jù)
        Ref = inet_op(fun () -> rabbit_net:async_recv(
                                  OldState#v1.sock, Length, infinity) end),
        %更新?tīng)顟B(tài),替換ref和處理模塊
        OldState#v1{callback = NewCallback,
                    recv_ref = Ref}.


    異步接收Length個(gè)數(shù)據(jù),如果有,erlang會(huì)通知你處理。處理模塊是什么概念呢?其實(shí)就是一個(gè)狀態(tài)的概念,表示當(dāng)前協(xié)議解析進(jìn)行到哪一步,起一個(gè)label的作用,看看mainloop/4中的應(yīng)用:

    mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
        %%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]),
        receive
            %%接收到數(shù)據(jù),交給handle_input處理,注意handle_input的第一個(gè)參數(shù)就是callback
            {inet_async, Sock, Ref, {ok, Data}} ->
                %handle_input處理
                {State1, Callback1, Length1} =
                    handle_input(State#v1.callback, Data,
                                 State#v1{recv_ref = none}),

                %更新回調(diào)模塊,再次發(fā)起異步請(qǐng)求,并進(jìn)入主循環(huán)
                mainloop(Parent, Deb,
                         switch_callback(State1, Callback1, Length1));


    handle_input有多個(gè)分支,每個(gè)分支都對(duì)應(yīng)一個(gè)處理模塊,例如我們剛才提到的握手協(xié)議:

    %handshake模塊,注意到第一個(gè)參數(shù),第二個(gè)參數(shù)就是我們得到的數(shù)據(jù)
    handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
                 State = #v1{sock = Sock, connection = Connection}) ->
         %檢測(cè)協(xié)議是否兼容
        case check_version({ProtocolMajor, ProtocolMinor},
                           {?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of
            true ->
                {ok, Product} = application:get_key(id),
                {ok, Version} = application:get_key(vsn),
                %兼容的話,進(jìn)入connections start,協(xié)商參數(shù)
                ok = send_on_channel0(
                       Sock,
                       #'connection.start'{
                         version_major = ?PROTOCOL_VERSION_MAJOR,
                         version_minor = ?PROTOCOL_VERSION_MINOR,
                         server_properties =
                         [{list_to_binary(K), longstr, list_to_binary(V)} ||
                             {K, V} <-
                                 [{"product",     Product},
                                  {"version",     Version},
                                  {"platform",    "                               {"copyright",   ?COPYRIGHT_MESSAGE},
                                  {"information", ?INFORMATION_MESSAGE}]],
                         mechanisms = <<"PLAIN AMQPLAIN">>,
                         locales = <<"en_US">> }),
                {State#v1{connection = Connection#connection{
                                         timeout_sec = ?NORMAL_TIMEOUT},
                          connection_state = starting},
                 frame_header, 7};
             %否則,斷開(kāi)連接,返回可以接受的協(xié)議
            false ->
                throw({bad_version, ProtocolMajor, ProtocolMinor})
        end;

        其他協(xié)議的處理也是類似,通過(guò)動(dòng)態(tài)替換callback的方式來(lái)模擬狀態(tài)機(jī)做協(xié)議的解析和數(shù)據(jù)的接收,真的很巧妙!讓我們體會(huì)到Erlang的魅力,F(xiàn)P的魅力。

    5、序列圖:
    1)tcp server的啟動(dòng)過(guò)程:

    2)一個(gè)client連接上來(lái)的處理過(guò)程:


        小結(jié):從上面的分析可以看出,rabbitmq的網(wǎng)絡(luò)層是非常健壯和高效的,通過(guò)層層監(jiān)控,對(duì)每個(gè)可能出現(xiàn)的風(fēng)險(xiǎn)點(diǎn)都做了考慮,并且利用了prim_net模塊做異步IO處理。分層也是很清晰,將業(yè)務(wù)處理模塊隔離到client_sup監(jiān)控下的子進(jìn)程,將網(wǎng)絡(luò)處理細(xì)節(jié)和業(yè)務(wù)邏輯分離。在協(xié)議的解析和業(yè)務(wù)處理上雖然沒(méi)有采用gen_fsm,但是也實(shí)現(xiàn)了一套類似的狀態(tài)機(jī)機(jī)制,通過(guò)動(dòng)態(tài)替換Callback來(lái)模擬狀態(tài)的變遷,非常巧妙。如果你要實(shí)現(xiàn)一個(gè)tcp server,強(qiáng)烈推薦從rabbitmq中扣出這個(gè)網(wǎng)絡(luò)層,你只需要實(shí)現(xiàn)自己的業(yè)務(wù)處理模塊即可擁有一個(gè)高效、健壯、分層清晰的TCP服務(wù)器。

    評(píng)論

    # re: Rabbitmq的網(wǎng)絡(luò)層淺析[未登錄](méi)  回復(fù)  更多評(píng)論   

    2009-11-29 12:48 by M
    很有用,最近也在看,謝謝

    # re: Rabbitmq的網(wǎng)絡(luò)層淺析[未登錄](méi)  回復(fù)  更多評(píng)論   

    2009-11-29 13:03 by M
    prim_inet確實(shí)沒(méi)有文檔,我都沒(méi)看明白,trapexit里有人問(wèn)過(guò)這個(gè)問(wèn)題,也有人(好像是erlang otp項(xiàng)目的人)說(shuō):
    prim_inet is not documented because it is not intended to be a
    supported/stable API which we will keep backwards compatible.

    見(jiàn):
    http://www.trapexit.org/forum/viewtopic.php?p=29157

    我有個(gè)疑問(wèn),按照simple_one_for_one的文檔,supervisor:start_child每次將使用定義supervisor時(shí)init方法返回的的child spec,那么是不是說(shuō)這種模式下每次只能有一個(gè)child,因?yàn)槎xsupervsor時(shí),已指定了child ID,希望不吝賜教,謝謝。

    # re: Rabbitmq的網(wǎng)絡(luò)層淺析  回復(fù)  更多評(píng)論   

    2009-11-29 13:21 by dennis

    prim_inet,按照余鋒老大的說(shuō)法是可以用的,基本上接口不會(huì)有大的變更,gen_tcp其實(shí)是基于prim_net實(shí)現(xiàn)的。

    使用simple_one_for_one,可以有多個(gè)child的,只不過(guò)這些child的是同一種類型的,看supervisor.erl的源碼就知道,內(nèi)部是動(dòng)態(tài)保存在一個(gè)dict結(jié)構(gòu)里dynamics = ?DICT:new(),因此是可保存多個(gè):

    {ok, Pid} ->
    NState = State#state{dynamics =
    ?DICT:store(Pid, Args, State#state.dynamics)},
    {reply, {ok, Pid}, NState};

    這跟其他類型不一樣:
    %先判斷是否存在name的child
    case get_child(Child#child.name, State) of
    false ->
    case do_start_child(State#state.name, Child) of
    {ok, Pid} ->
    Children = State#state.children,
    {{ok, Pid},
    %加入到list
    State#state{children =
    [Child#child{pid = Pid}|Children]}};

    # re: Rabbitmq的網(wǎng)絡(luò)層淺析  回復(fù)  更多評(píng)論   

    2009-11-29 13:22 by dennis
    簡(jiǎn)單一句話,simple_one_for_one是是依據(jù)pid來(lái)保存的,而其他策略是依據(jù)child.name來(lái)保存的。

    # re: Rabbitmq的網(wǎng)絡(luò)層淺析[未登錄](méi)  回復(fù)  更多評(píng)論   

    2009-11-29 17:23 by M
    明白了,太感謝了!

    # re: Rabbitmq的網(wǎng)絡(luò)層淺析[未登錄](méi)  回復(fù)  更多評(píng)論   

    2014-03-03 11:35 by 菲戈
    更正一個(gè)小錯(cuò)誤哈:
    3.2中介紹說(shuō):使用了simple_one_for_one后,無(wú)法調(diào)用terminate_child/2 delete_child/2 restart_child/2

    我看文檔上說(shuō)了,simple_one_for_one是可以使用 terminate_child/2的,但參數(shù)必須是進(jìn)程的Pid。

    -----------------------------------
    不過(guò)這篇文章寫得真的很詳細(xì)啊,我解開(kāi)了而我好多疑惑,贊一個(gè)!

    # re: Rabbitmq的網(wǎng)絡(luò)層淺析  回復(fù)  更多評(píng)論   

    2014-09-16 17:21 by jj
    看不到圖片

    # re: Rabbitmq的網(wǎng)絡(luò)層淺析  回復(fù)  更多評(píng)論   

    2015-03-20 02:09 by noboby
    圖掛了,分析得不錯(cuò),但是圖顯示不了
    主站蜘蛛池模板: 久久久久国色AV免费看图片| 亚洲最大的成人网站| 四虎永久免费地址在线网站| 131美女爱做免费毛片| 91av免费在线视频| 99亚洲精品卡2卡三卡4卡2卡| 亚洲精品第五页中文字幕| 国产精品久久久亚洲| 亚洲国产成人久久笫一页| 好男人看视频免费2019中文| 亚洲一区二区三区免费在线观看| 免费国产污网站在线观看| 特级毛片在线大全免费播放| 亚洲AV无码之国产精品| 亚洲人精品亚洲人成在线| 亚洲第一页在线观看| 色噜噜综合亚洲av中文无码| 亚洲成A人片777777| 亚洲综合日韩久久成人AV| 国产精品亚洲mnbav网站 | 成人免费无码视频在线网站| 59pao成国产成视频永久免费 | 在线a亚洲v天堂网2018| 国产高清在线精品免费软件 | 亚洲色欲色欱wwW在线| 亚洲制服丝袜一区二区三区| 亚洲经典在线中文字幕| 久久夜色精品国产噜噜亚洲AV| 亚洲国产精品一区二区久久| 亚洲2022国产成人精品无码区| 亚洲av无码不卡| 亚洲成在人天堂一区二区| 亚洲图片在线观看| 亚洲视频在线观看地址| 亚洲成人网在线播放| 亚洲一本之道高清乱码| 亚洲一级特黄特黄的大片| 亚洲人成网站免费播放| 亚洲欧美在线x视频| 色婷婷综合缴情综免费观看 | 亚洲综合精品网站|