Ruby Fiber指南(一)基礎
Ruby Fiber指南(二)參數傳遞
Ruby Fiber指南(三)過濾器
Ruby Fiber指南(四)迭代器
Ruby Actor指南(五)實現Actor
在學習了Fiber的基礎知識之后,可以嘗試用Fiber去做一些比較有趣的事情。這一節將講述如何使用Fiber來實現類似unix系統中的管道功能。在unix系統中,可以通過管道將多個命令組合起來做一些強大的功能,最常用的例如查找所有的java進程:
ps aux|grep java
通過組合ps和grep命令來實現,ps的輸出作為grep的輸入,如果有更多的命令就形成了一條過濾鏈。過濾器本質上還是生產者和消費者模型,前一個過濾器產生結果,后一個過濾器消費這個結果并產生新的結果給下一個過濾器消費。因此我們就從最簡單的生產者消費者模型實現說起。
我們要展示的這個例子場景是這樣:生產者從標準輸入讀入用戶輸入并發送給消費者,消費者打印這個輸入,整個程序是由消費者驅動的,消費者喚醒生存者去讀用戶輸入,生產者讀到輸入后讓出執行權給消費者去打印,整個過程通過生產者和消費者的協作完成。
生產者發送是通過yield返回用戶輸入給消費者(還記的上一節嗎?):
def send(x)
Fiber.yield(x)
end
而消費者的接收則是通過喚醒生產者去生產:
def receive(prod)
prod.resume
end
生產者是一個Fiber,它的任務就是等待用戶輸入并發送結果給消費者:
def producer()
Fiber.new do
while true
x=readline.chomp
send(x)
end
end
end
消費者負責驅動生產者,并且在接收到結果的時候打印,消費者是root fiber:
def consumer(producer)
while true
x=receive(producer)
break if x=='quit'
puts x
end
end
最終的調用如下:
consumer(producer())
完整的程序如下:
#生產者消費者
require 'fiber'
def send(x)
Fiber.yield(x)
end
def receive(prod)
prod.resume
end
def producer()
Fiber.new do
while true
x=readline.chomp
send(x)
end
end
end
def consumer(producer)
while true
x=receive(producer)
break if x=='quit'
puts x
end
end
if $0==__FILE__
consumer(producer())
end
讀者可以嘗試在ruby1.9下運行這個程序,每次程序都由消費者驅動生產者去等待用戶輸入,用戶輸入任何東西之后回車,生產者開始運行并將讀到的結果發送給消費者并讓出執行權(通過yield),消費者在接收到yield返回的結果后打印這個結果,因此整個交互過程是一個echo的例子。
最終的調用consumer(producer())已經有過濾器的影子在了,如果我們希望在producer和consumer之間插入其他過程對用戶的輸入做處理,也就是安插過濾器,那么新的過濾器也將作為fiber存在,新的fiber消費producer的輸出,并輸出新的結果給消費者,例如我們希望將用戶的輸入結果加上行號再打印,那么就插入一個稱為filter的fiber:
def filter(prod)
return Fiber.new do
line=1
while true
value=receive(prod)
value=sprintf("%5d %s",line,value)
send(value)
line=line.succ
end
end
end
最終組合的調用如下:
consumer(filter(producer()))
類似unix系統那樣,簡單的加入新的fiber組合起來就可以為打印結果添加行號。
類似consumer(filter(producer()))的調用方式盡管已經很直觀,但是我們還是希望能像unix系統那樣調用,也就是通過豎線作為管道操作符:
producer | filter | consumer
這樣的調用方式更將透明直觀,清楚地表明整個過濾器鏈的運行過程。幸運的是在Ruby中支持對|方法符的重載,因此要實現這樣的操作符并非難事,只要對Fiber做一層封裝即可,下面給出的代碼來自《Programming
ruby》的作者Dave Thomas的
blog:
class PipelineElement
attr_accessor :source
def initialize
@fiber_delegate = Fiber.new do
process
end
end
def |(other)
other.source = self
other
end
def resume
@fiber_delegate.resume
end
def process
while value = input
handle_value(value)
end
end
def handle_value(value)
output(value)
end
def input
@source.resume
end
def output(value)
Fiber.yield(value)
end
end
這段代碼非常巧妙,將Fiber和Ruby的功能展示的淋漓盡致。大致解說下,PipelineElement作為任何一個過濾器的父類,其中封裝了一個fiber,這個fiber默認執行process,在process方法中可以看到上面生產者和消費者例子的影子,input類似receive方法調用前置過濾器(source),output則將本過濾器處理的結果作為參數傳遞給yield并讓出執行權,讓這個過濾器的調用者(也就是后續過濾器)得到結果并繼續處理。PipelineElement實現了“|”方法,用于組合過濾器,將下一個過濾器的前置過濾器設置為本過濾器,并返回下一個過濾器。整個過濾鏈的驅動者是最后一個過濾器。
有了這個封裝,那么上面生產者消費者的例子可以改寫為:
class Producer < PipelineElement
def process
while true
value=readline.chomp
handle_value(value)
end
end
end
class Filter < PipelineElement
def initialize
@line=1
super()
end
def handle_value(value)
value=sprintf("%5d %s",@line,value)
output(value)
@line=@line.succ
end
end
class Consumer < PipelineElement
def handle_value(value)
puts value
end
end
現在的調用方式可以跟unix的管道很像了:
producer=Producer.new
filter=Filter.new
consumer=Consumer.new
pipeline = producer | filter | consumer
pipeline.resume
如果你打印pipeline對象,你將看到一條清晰的過濾鏈,
#<Consumer:0x8f08bf4 @fiber_delegate=#<Fiber:0x8f08a88>, @source=#<Filter:0x8f08db4 @line=1, @fiber_delegate=#<Fiber:0x8f08d60>, @source=#<Producer:0x8f09054 @fiber_delegate=#<Fiber:0x8f09038>>>>