<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    莊周夢蝶

    生活、程序、未來
       :: 首頁 ::  ::  :: 聚合  :: 管理
       
        Ruby Fiber指南(一)基礎
        Ruby Fiber指南(二)參數傳遞
        Ruby Fiber指南(三)過濾器
        Ruby Fiber指南(四)迭代器
        Ruby Actor指南(五)實現Actor

        寫這個指南的時候,計劃是第五章寫一個Fiber的應用例子,但是一時沒有想到比較好的例子,模仿《Programming in Lua》中的多任務下載的例子也不合適,因為Ruby中的異步HttpClient跟lua還是很不一樣的,體現不了Fiber的優點。因此,這第五節一直拖著沒寫。
        恰巧最近在小組中做了一次Erlang的分享,有人問到Erlang調度器的實現問題,這塊我沒注意過,那時候就根據我對coroutine實現actor的想法做了下解釋,后來思考了下那個解釋是錯誤的,Erlang的調度器是搶占式的,而通過coroutine實現的actor調度卻是非搶占的,兩者還是截然不同的。我在《Actor、Coroutine和Continuation的概念澄清》中提到coroutine可以實現actor風格,actor跟coroutine并沒有必然的聯系,這篇文章的目的就在于證明這一點,使用Ruby Fiber實現一個簡單的actor風格的庫,整個代碼不到100行。后面還會談到這個實現的缺點,以及我對Erlang調度器實現的理解。

        首先是monkey patch,給Thread和Fiber類加上兩個方法,分別用于獲取當前線程的調度器和Fiber對應的actor:
    class Thread
      
    #得到當前線程的調度器
      def __scheduler__
        @internal_scheduler
    ||=FiberActor::Scheduler.new
      end
    end

    class Fiber
      
    #得到當前Fiber的actor
      def __actor__
        @internal_actor
      end
    end

         這里實現的actor仍然是Thread內的,一個Thread只跑一個調度器,每個actor關聯一個Fiber。
         讓我們來想想調度器該怎么實現,調度器顧名思義就是協調actor的運行,每次挑選適當的actor并執行,可以想象調度器內部應該維護一個等待調度的actor隊列,Scheduler每次從隊列里取出一個actor并執行,執行完之后取下一個actor執行,不斷循環持續這個過程;在沒有actor可以調度的時候,調度器應該讓出執行權。因此調度器本身也是一個Fiber,它內部有個queue,用于維護等待調度的actor:
    module FiberActor
      
    class Scheduler
        
    def initialize
          @queue
    =[]
          @running
    =false
        end

        
    def run
          
    return if @running
          @running
    =true
          
    while true
            
    #取出隊列中的actor并執行
            while actor=@queue.shift
              begin
                actor.fiber.resume
              rescue 
    => ex
                puts 
    "actor resume error,#{ex}"
              end
            end
            
    #沒有任務,讓出執行權
            Fiber.yield
          end
        end

        
    def reschedule
          
    if @running
            
    #已經啟動,只是被掛起,那么再次執行
            @fiber.resume
          
    else
            
    #將當前actor加入隊列
            self << Actor.current
          end
        end

        
    def running?
          @running
        end

        
    def <<(actor)
          
    #將actor加入等待隊列
          @queue << actor unless @queue.last == actor
          
    #啟動調度器
          unless @running
             @queue 
    << Actor.current
             @fiber
    =Fiber.new { run }
             @fiber.resume
          end
        end
      end
    end

        run方法是核心的調度方法,注釋說明了主要的工作流程。因為調度器可能讓出執行權,因此提供了reschedule方法重新resume啟動調度器。<<方法用于將等待被調度的actor加入等待隊列,如果調度器沒有啟動,那么就啟動調度Fiber。

        有了調度器,Actor的實現也很簡單,Actor跟Fiber是一對一的關系,Actor內部維護一個mailbox,用來存儲接收到的消息。最重要的是receive原語的實現,我們這里很簡單,不搞模式匹配,只是接收消息。receive的工作流程大概是這樣,判斷mailbox中有沒有消息,有消息的話,取出消息并調用block處理,沒有消息的話就yield讓出執行權。

    module FiberActor  
      
    class Actor
        attr_accessor :fiber
        
    #定義類方法
        class << self
          
    def scheduler
            Thread.current.
    __scheduler__
          end

          
    def current
            Fiber.current.
    __actor__
          end

          
    #啟動一個actor
          def spawn(*args,&block)
            fiber
    =Fiber.new do
               block.call(args)
            end
            actor
    =new(fiber)
            fiber.instance_variable_set :@internal_actor,actor
            scheduler 
    << actor
            actor
          end

          
    def receive(&block)
            current.receive(
    &block)
          end
        end

        
    def initialize(fiber)
           @mailbox
    =[]
           @fiber
    =fiber
        end

        
    #給actor發送消息
        def << (msg)
          @mailbox 
    << msg
          
    #加入調度隊列
          Actor.scheduler << self
        end

        
    def receive(&block)
          
    #沒有消息的時候,讓出執行權
          Fiber.yield while @mailbox.empty?
          msg
    =@mailbox.shift
          block.call(msg)
        end

        
    def alive?
          @fiber.alive?
        end
      end

    end

        Actor.spawn用于啟動一個actor,內部其實是創建了一個fiber并包裝成actor給用戶,每個actor一被創建就加入調度器的等待隊列。<<方法用于向actor傳遞消息,傳遞消息后,該actor也將加入等待隊列,等待被調度。

        我們的簡化版actor庫已經寫完了,可以嘗試寫幾個例子,最簡單的hello world:
    include FiberActor

    Actor.spawn { puts 
    "hello world!"}
         輸出:
    hello world!

        沒有問題,那么試試傳遞消息:
    actor=Actor.spawn{
       Actor.receive{ 
    |msg|  puts "receive #{msg}"}
    }
    actor 
    << :test_message
        輸出:
    receive test_message
        
        也成了,那么試試兩個actor互相傳遞消息,乒乓一下下:
    pong=Actor.spawn do
          Actor.receive do 
    |ping|
            
    #收到ping,返回pong
            ping << :pong
          end
        end
    ping
    =Actor.spawn do
          
    #ping一下,將ping作為消息傳遞
          pong << Actor.current
          Actor.receive do 
    |msg|
            
    #接收到pong
            puts "ping #{msg}"
          end
        end
    #resume調度器
    Actor.scheduler.reschedule

         輸出:
    ping pong
        
         都沒有問題,這個超級簡單actor基本完成了。可以看到,利用coroutine來實現actor是完全可行的,事實上我這里描述的實現基本上是revactor這個庫的實現原理。revactor是一個ruby的actor庫,它的實現就是基于Fiber,并且支持消息的模式匹配和thread之間的actor調度,有興趣地可以去玩下。更進一步,其實采用輕量級協程來模擬actor風格早就不是新鮮主意,比如在cn-erlounge的第四次會議上就有兩個topic是關于這個,一個是51.com利用基于ucontext的實現的類erlang進程模型,一個是許世偉的CERL。可以想見,他們的基本原理跟本文所描述不會有太大差別,那么面對的問題也是一樣。

         采用coroutine實現actor的主要缺點如下:
    1、因為是非搶占式,這就要求actor不能有阻塞操作,任何阻塞操作都需要異步化。IO可以使用異步IO,沒有os原生支持的就需要利用線程池,基本上是一個重復造輪子的過程。
    2、異常的隔離,某個actor的異常不能影響到調度器的運轉,簡單的try...catch是不夠的。
    3、多核的利用,調度器只能跑在一個線程上,無法充分利用多核優勢。
    4、效率因素,在actor數量劇增的情況下,簡單的FIFO的調度策略效率是個瓶頸,盡管coroutine的切換已經非常高效。

        當然,上面提到的這些問題并非無法解決,例如可以使用多線程多個調度器,類似erlang smp那樣來解決單個調度器的問題。但是如調度效率這樣的問題是很難解決的。相反,erlang的actor實現就不是通過coroutine,而是自己實現一套類似os的調度程序。
        首先明確一點,Erlang的process的調度是搶占式的,而非couroutine的協作式的。其次,Erlang早期版本是只有一個調度器,運行在一個線程上,隨著erts的發展,現在erlang的調度器已經支持smp,每個cpu關聯一個調度器,并且可以明確指定哪個調度器綁定到哪個cpu上。第三,Erlang的調度也是采用優先隊列+時間片輪詢的方式,每個調度器關聯一個ErtsRunQueueErtsRunQueue內部又分為三個ErtsRunPrioQueue隊列,分別對應high,max和normal,low的優先級,其中normal和low共用一個隊列;在Erlang中時間片是以reduction為單位,你可以將reduction理解成一次函數調用,每個被調度的process能執行的reduction次數是有限的。調度器每次都是從max隊列開始尋找等待調度的process并執行,當前調度的隊列如果為空或者執行的reductions超過限制,那么就降低優先級,調度下一個隊列。

       從上面的描述可以看出,Erlang優秀的地方不僅在于actor風格的輕量級process,另一個強悍的地方就是它的類os的調度器,再加上OTP庫的完美支持,這不是一般方案能山寨的。
        
        
      

    評論

    # re: Ruby Fiber指南(五): 實現Actor,兼談Erlang的process調度  回復  更多評論   

    2013-11-04 02:39 by 我傻逼我自豪
    協程=>流程控制
    actor=>對象抽象
    主站蜘蛛池模板: 国产精品永久免费视频| 7m凹凸精品分类大全免费| 日韩亚洲欧洲在线com91tv| 免费无码又爽又刺激高潮视频| 亚洲一区二区三区深夜天堂| 波多野结衣中文一区二区免费| 日本在线免费播放| 亚洲一区二区三区写真| 中文字幕一精品亚洲无线一区| 真人做人试看60分钟免费视频| 人妻巨大乳hd免费看| 久久亚洲精品成人AV| 四虎永久免费地址在线网站| 久久永久免费人妻精品| 国产成人精品日本亚洲语音| 亚洲电影中文字幕| 全黄a免费一级毛片人人爱| 24小时日本韩国高清免费| 最新亚洲人成无码网站| 亚洲精品自拍视频| 国产亚洲成人久久| 中文字幕无码成人免费视频| 爱丫爱丫影院在线观看免费| 亚洲AV电影天堂男人的天堂| 久久精品国产亚洲AV高清热| 亚洲日韩国产精品乱| 成人a免费α片在线视频网站| 无码人妻一区二区三区免费看 | 91麻豆精品国产自产在线观看亚洲| 日本XXX黄区免费看| 两个人看的www免费视频| 激情无码亚洲一区二区三区| 亚洲男人的天堂在线| 国产亚洲一区二区在线观看| 亚洲电影日韩精品| 免费无码又爽又高潮视频| 亚洲一级毛片免费看| 久久久久久国产精品免费免费男同 | 中文字幕亚洲综合精品一区| 精品国产香蕉伊思人在线在线亚洲一区二区 | 在线观看免费视频资源|