Clojure
的并發(fā)(一) Ref和STM
Clojure
的并發(fā)(二)Write Skew分析
Clojure
的并發(fā)(三)Atom、緩存和性能
Clojure
的并發(fā)(四)Agent深入分析和Actor
Clojure
的并發(fā)(五)binding和let
Clojure的并發(fā)(六)Agent可以改進(jìn)的地方
Clojure的并發(fā)(七)pmap、pvalues和pcalls
Clojure的并發(fā)(八)future、promise和線程
七、并發(fā)函數(shù)pmap、pvalues和pcalls
1、pmap是map的進(jìn)化版本,map將function依次作用于集合的每個元素,pmap也是這樣,但是它對于每個集合中的元素都是提交給一個線程去執(zhí)行function,也就是并行地對集合里的元素執(zhí)行指定的函數(shù)。通過一個例子來解釋下。我們先定義一個make-heavy函數(shù)用于延時執(zhí)行某個函數(shù):
(defn make-heavy [f]
(fn [& args]
(Thread/sleep 1000)
(apply f args)))
make-heavy接受一個函數(shù)f作為參數(shù),返回一個新的函數(shù),它延時一秒才實際執(zhí)行f。我們利用make-heavy包裝inc,然后執(zhí)行下map:
user=> (time (doall (map (make-heavy inc) [1 2 3 4 5])))
"Elapsed time: 5005.115601 msecs"
(2 3 4 5 6)
可以看到總共執(zhí)行了5秒,這是因為map依次將包裝后的inc作用在每個元素上,每次調(diào)用都延時一秒,總共5個元素,因此延時了5秒左右。這里使用doall,是為了強(qiáng)制map返回的lazy-seq馬上執(zhí)行。
如果我們使用pmap替代map的話:
user=> (time (doall (pmap (make-heavy inc) [1 2 3 4 5])))
"Elapsed time: 1001.146444 msecs"
(2 3 4 5 6)
果然快了很多,只用了1秒多,顯然pmap并行地將make-heavy包裝后的inc作用在集合的5個元素上,總耗時就接近于于單個調(diào)用的耗時,也就是一秒。
2、pvalues和pcalls是在pmap之上的封裝,pvalues是并行地執(zhí)行多個表達(dá)式并返回執(zhí)行結(jié)果組成的LazySeq,pcalls則是并行地調(diào)用多個無參數(shù)的函數(shù)并返回調(diào)用結(jié)果組成的LazySeq。
user=> (pvalues (+ 1 2) (- 1 2) (* 1 2) (/ 1 2))
(3 -1 2 1/2)
user=> (pcalls #(println "hello") #(println "world"))
hello
world
(nil nil)
3、pmap的并行,從實現(xiàn)上來說,是集合有多少個元素就使用多少個線程:
1 (defn pmap
2 {:added "1.0"}
3 ([f coll]
4 (let [n (+ 2 (.. Runtime getRuntime availableProcessors))
5 rets (map #(future (f %)) coll)
6 step (fn step [[x & xs :as vs] fs]
7 (lazy-seq
8 (if-let [s (seq fs)]
9 (cons (deref x) (step xs (rest s)))
10 (map deref vs))))]
11 (step rets (drop n rets))))
12 ([f coll & colls]
13 (let [step (fn step [cs]
14 (lazy-seq
15 (let [ss (map seq cs)]
16 (when (every? identity ss)
17 (cons (map first ss) (step (map rest ss)))))))]
18 (pmap #(apply f %) (step (cons coll colls))))))
在第5行,利用map和future將函數(shù)f作用在集合的每個元素上,future是將函數(shù)f(實現(xiàn)callable接口)提交給Agent的CachedThreadPool處理,
跟agent的send-off共用線程池。
但是由于有chunked-sequence的存在,
實際上調(diào)用的線程數(shù)不會超過chunked的大小,也就是32。事實上,pmap啟動多少個線程取決于集合的類型,對于chunked-sequence,是以32個元素為單位來批量執(zhí)行,通過下面的測試可以看出來,range返回的是一個chunked-sequence,clojure 1.1引入了chunked-sequence,目前那些返回LazySeq的函數(shù)如map、filter、keep等都是返回chunked-sequence:
user=> (time (doall (pmap (make-heavy inc) (range 0 32))))
"Elapsed time: 1003.372366 msecs"
(1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32)
user=> (time (doall (pmap (make-heavy inc) (range 0 64))))
"Elapsed time: 2008.153617 msecs"
(1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64)
可以看到,對于32個元素,執(zhí)行(make-heavy inc)耗費(fèi)了一秒左右;對于64個元素,總耗時是2秒,這可以證明64個元素是分為兩個批次并行執(zhí)行,一批32個元素,啟動32個線程(可以通過jstack查看)。
并且pmap的執(zhí)行是半延時的(semi-lazy),前面的總數(shù)-(cpus+2)個元素是一個一個deref(future通過deref來阻塞獲取結(jié)果),后cpus+2個元素則是一次性調(diào)用map執(zhí)行deref。
4、pmap的適用場景取決于將集合分解并提交給線程池并行執(zhí)行的代價是否低于函數(shù)f執(zhí)行的代價,如果函數(shù)f的執(zhí)行代價很低,那么將集合分解并提交線程的代價可能超過了帶來的好處,pmap就不一定能帶來性能的提升。pmap只適合那些計算密集型的函數(shù)f,計算的耗時超過了協(xié)調(diào)的代價。
5、關(guān)于chunked-sequence可以看看
這篇報道,也可以參考
Rich Hickey的PPT。chunk sequence的思路類似批量處理來提高系統(tǒng)的吞吐量。