Ruby Fiber指南(一)基礎
Ruby Fiber指南(二)參數傳遞
Ruby Fiber指南(三)過濾器
Ruby Fiber指南(四)迭代器
Ruby Actor指南(五)實現Actor
寫這個指南的時候,計劃是第五章寫一個Fiber的應用例子,但是一時沒有想到比較好的例子,模仿《Programming in Lua》中的多任務下載的例子也不合適,因為Ruby中的異步HttpClient跟lua還是很不一樣的,體現不了Fiber的優點。因此,這第五節一直拖著沒寫。
恰巧最近在小組中做了一次Erlang的分享,有人問到Erlang調度器的實現問題,這塊我沒注意過,那時候就根據我對coroutine實現actor的想法做了下解釋,后來思考了下那個解釋是錯誤的,Erlang的調度器是搶占式的,而通過coroutine實現的actor調度卻是非搶占的,兩者還是截然不同的。我在《
Actor、Coroutine和Continuation的概念澄清》中提到coroutine可以實現actor風格,actor跟coroutine并沒有必然的聯系,這篇文章的目的就在于證明這一點,使用Ruby Fiber實現一個簡單的actor風格的庫,整個代碼不到100行。后面還會談到這個實現的缺點,以及我對Erlang調度器實現的理解。
首先是monkey patch,給Thread和Fiber類加上兩個方法,分別用于獲取當前線程的調度器和Fiber對應的actor:
class Thread
#得到當前線程的調度器
def __scheduler__
@internal_scheduler||=FiberActor::Scheduler.new
end
end
class Fiber
#得到當前Fiber的actor
def __actor__
@internal_actor
end
end
這里實現的actor仍然是Thread內的,一個Thread只跑一個調度器,每個actor關聯一個Fiber。
讓我們來想想調度器該怎么實現,調度器顧名思義就是協調actor的運行,每次挑選適當的actor并執行,可以想象調度器內部應該維護一個等待調度的actor隊列,Scheduler每次從隊列里取出一個actor并執行,執行完之后取下一個actor執行,不斷循環持續這個過程;在沒有actor可以調度的時候,調度器應該讓出執行權。因此調度器本身也是一個Fiber,它內部有個queue,用于維護等待調度的actor:
module FiberActor
class Scheduler
def initialize
@queue=[]
@running=false
end
def run
return if @running
@running=true
while true
#取出隊列中的actor并執行
while actor=@queue.shift
begin
actor.fiber.resume
rescue => ex
puts "actor resume error,#{ex}"
end
end
#沒有任務,讓出執行權
Fiber.yield
end
end
def reschedule
if @running
#已經啟動,只是被掛起,那么再次執行
@fiber.resume
else
#將當前actor加入隊列
self << Actor.current
end
end
def running?
@running
end
def <<(actor)
#將actor加入等待隊列
@queue << actor unless @queue.last == actor
#啟動調度器
unless @running
@queue << Actor.current
@fiber=Fiber.new { run }
@fiber.resume
end
end
end
end
run方法是核心的調度方法,注釋說明了主要的工作流程。因為調度器可能讓出執行權,因此提供了reschedule方法重新resume啟動調度器。<<方法用于將等待被調度的actor加入等待隊列,如果調度器沒有啟動,那么就啟動調度Fiber。
有了調度器,Actor的實現也很簡單,Actor跟Fiber是一對一的關系,Actor內部維護一個mailbox,用來存儲接收到的消息。最重要的是receive原語的實現,我們這里很簡單,不搞模式匹配,只是接收消息。receive的工作流程大概是這樣,判斷mailbox中有沒有消息,有消息的話,取出消息并調用block處理,沒有消息的話就yield讓出執行權。
module FiberActor
class Actor
attr_accessor :fiber
#定義類方法
class << self
def scheduler
Thread.current.__scheduler__
end
def current
Fiber.current.__actor__
end
#啟動一個actor
def spawn(*args,&block)
fiber=Fiber.new do
block.call(args)
end
actor=new(fiber)
fiber.instance_variable_set :@internal_actor,actor
scheduler << actor
actor
end
def receive(&block)
current.receive(&block)
end
end
def initialize(fiber)
@mailbox=[]
@fiber=fiber
end
#給actor發送消息
def << (msg)
@mailbox << msg
#加入調度隊列
Actor.scheduler << self
end
def receive(&block)
#沒有消息的時候,讓出執行權
Fiber.yield while @mailbox.empty?
msg=@mailbox.shift
block.call(msg)
end
def alive?
@fiber.alive?
end
end
end
Actor.spawn用于啟動一個actor,內部其實是創建了一個fiber并包裝成actor給用戶,每個actor一被創建就加入調度器的等待隊列。<<方法用于向actor傳遞消息,傳遞消息后,該actor也將加入等待隊列,等待被調度。
我們的簡化版actor庫已經寫完了,可以嘗試寫幾個例子,最簡單的hello world:
include FiberActor
Actor.spawn { puts "hello world!"}
輸出:
hello world!
沒有問題,那么試試傳遞消息:
actor=Actor.spawn{
Actor.receive{ |msg| puts "receive #{msg}"}
}
actor << :test_message
輸出:
receive test_message
也成了,那么試試兩個actor互相傳遞消息,乒乓一下下:
pong=Actor.spawn do
Actor.receive do |ping|
#收到ping,返回pong
ping << :pong
end
end
ping=Actor.spawn do
#ping一下,將ping作為消息傳遞
pong << Actor.current
Actor.receive do |msg|
#接收到pong
puts "ping #{msg}"
end
end
#resume調度器
Actor.scheduler.reschedule
輸出:
ping pong
都沒有問題,這個超級簡單actor基本完成了。可以看到,利用coroutine來實現actor是完全可行的,事實上我這里描述的實現基本上是
revactor這個庫的實現原理。
revactor是一個ruby的actor庫,它的實現就是基于Fiber,并且支持消息的模式匹配和thread之間的actor調度,有興趣地可以去玩下。更進一步,其實采用輕量級協程來模擬actor風格早就不是新鮮主意,比如在
cn-erlounge的第四次會議上就有兩個topic是關于這個,一個是51.com利用基于ucontext的實現的類erlang進程模型,一個是許世偉的CERL。可以想見,他們的基本原理跟本文所描述不會有太大差別,那么面對的問題也是一樣。
采用coroutine實現actor的主要缺點如下:
1、因為是非搶占式,這就要求actor不能有阻塞操作,任何阻塞操作都需要異步化。IO可以使用異步IO,沒有os原生支持的就需要利用線程池,基本上是一個重復造輪子的過程。
2、異常的隔離,某個actor的異常不能影響到調度器的運轉,簡單的try...catch是不夠的。
3、多核的利用,調度器只能跑在一個線程上,無法充分利用多核優勢。
4、效率因素,在actor數量劇增的情況下,簡單的FIFO的調度策略效率是個瓶頸,盡管coroutine的切換已經非常高效。
當然,上面提到的這些問題并非無法解決,例如可以使用多線程多個調度器,類似erlang smp那樣來解決單個調度器的問題。但是如調度效率這樣的問題是很難解決的。相反,erlang的actor實現就不是通過coroutine,而是自己實現一套類似os的調度程序。
首先明確一點,Erlang的process的調度是搶占式的,而非couroutine的協作式的。其次,Erlang早期版本是只有一個調度器,運行在一個線程上,隨著erts的發展,現在erlang的調度器已經支持smp,每個cpu關聯一個調度器,并且可以明確指定哪個調度器綁定到哪個cpu上。第三,Erlang的調度也是采用優先隊列+時間片輪詢的方式,每個調度器關聯一個
ErtsRunQueue,ErtsRunQueue內部又分為三個ErtsRunPrioQueue隊列,分別對應high,max和normal,low的優先級,其中normal和low共用一個隊列;在Erlang中時間片是以reduction為單位,你可以將reduction理解成一次函數調用,每個被調度的process能執行的reduction次數是有限的。調度器每次都是從max隊列開始尋找等待調度的process并執行,當前調度的隊列如果為空或者執行的reductions超過限制,那么就降低優先級,調度下一個隊列。
從上面的描述可以看出,Erlang優秀的地方不僅在于actor風格的輕量級process,另一個強悍的地方就是它的類os的調度器,再加上OTP庫的完美支持,這不是一般方案能山寨的。