讀者是沒有耐心的,我也沒有,所以先說結(jié)論:你可以不用編程序,只要鼠標點幾下拖動些圖標,改改參數(shù),就能完成過億數(shù)據(jù)的分布處理程序。
當(dāng)然,這么理想的目標現(xiàn)在還沒有達到,但路已經(jīng)明明白白的展現(xiàn)在面前了,至少我們已經(jīng)走了接近一半了。
首先說明, MapReduce算法本身就來自于函數(shù)式編程,因此用FP的思路來進行算法構(gòu)建是再合理不過的事情。之前的程序是用Haskell開發(fā)的,現(xiàn)在用Python重新寫了一個版本。
在做了一些MR的實際應(yīng)用后,發(fā)現(xiàn)很多問題都有基本的算法模式,而且?guī)讉€模式都很簡單。后續(xù)會總結(jié)出來,這里說個該要:(自己總結(jié)的,比較山寨)
MapReduce算法模式
1. 元模式: MR Chain
多個MapReduce過程可以串接起來,實現(xiàn)任意復(fù)雜的統(tǒng)計算法。
也可以稱為 Data Flow 模式
2. Map模式
包括 Field count , Field Join 兩個
3. Reduce 模式
KeyCount, Value Sum, NubCount, Value Join
核心思想
(借用Java界的說法)
1. 數(shù)據(jù)流編程:源數(shù)據(jù)從MR網(wǎng)一端流入,在一個處理鏈中依次處理,獲得最終結(jié)果,鏈可以有多個分支
2. 組合子編程:使用通用的 Mapper, Reducer 算子,組合起來實現(xiàn)復(fù)雜的功能,
這是一個相乘的過程,結(jié)合MR Chain,可以倍增處理的復(fù)雜度。
盡量保持每個算子的簡單性和原子性,功能正交。
3. 函數(shù)柯里化:組合子可以通過參數(shù)定制,生成用戶定義的函數(shù)
應(yīng)用實例
輸入數(shù)據(jù)
時間 ip 省份 用戶uuid
"03-09-2008 17:11:10" 1987636648 "四川" "0CE12C9121CA8E2484440B4459781BDB"
"03-09-2008 17:11:15" 1018955844 "浙江" "19173BB499F4B0A62F19AFEB5BA5017A"
"03-09-2008 17:11:18" 2030878566 "廣東" "B596B9655D2ACD4D449D5262C1B9D3BE"
"03-09-2008 17:11:19" 1947385333 "廣東" "9CF2210902BBF421E9DF1CB384B65CC7"
"03-09-2008 17:11:24" 1964392548 "陜西" "7EBE2805FBDFAB3C7B11395CB76364F4"
"03-09-2008 17:11:35" 3722701596 "江蘇" "CDA23CC1EBAC208168C8AF1C88D03E55"
"03-09-2008 17:11:09" 1034301425 "云南" "5573F458F859E35D7DDCA346FD1A35A8"
"03-09-2008 17:11:09" 1987636648 "四川" "0CE12C9121CA8E2484440B4459781BDB"
"03-09-2008 17:11:09" 1987636648 "四川" "0CE12C9121CA8E2484440B4459781BDB"
"03-09-2008 17:11:10" 1987636648 "四川" "0CE12C9121CA8E2484440B4459781BDB"
統(tǒng)計需求
各個省上報的uuid 的不重復(fù)數(shù)目,
每個uuid上報的次數(shù)
不同上報次數(shù)分別有多少人
處理過程
串接的兩個MR,第一個產(chǎn)生前兩個需求的結(jié)果,獲得的中間結(jié)果給第二個MR,得到第三個需求結(jié)果。
任務(wù)描述
test_tasks = {
'task1' : {'name' : 'task1',
'input' : 'userinfo.test',
'mrs' : [('province', ('', 'm_field_count(2)'), ['KeyCount', 'NubCount']),
('uuid', ('', 'm_field_count(3)'), ['KeyCount']),
],
'output' : 'task1.out',
'next' : ['task2']
},
'task2' : {'name' : 'task2',
'input' : 'task1.out',
'mrs' : [('uuid_count_nub', ('c_uuid', 'm_field_join(1, 0)'), ['NubCount'])
],
'output' : 'task2.out',
'next' : []
}
}
通過框架讀取任務(wù)描述,自動生成測試運行腳本,及4個程序:
run.sh
#!/bin/sh
cat userinfo.test | python task1_map.py | sort | python task1_reduce.py > task1.out
cat task1.out | python task2_map.py | sort | python task2_reduce.py > task2.out
task1_map.py, task1_reduce.py, task2_map.py, task2_reduce.py 是自動生成的。
執(zhí)行測試:
Task1:
$ head -n 10 userinfo.test | ./task1_map.py | sort | python ./task1_reduce.py
c_province_"云南" 1
nc_province_"云南" 1
c_province_"四川" 4
nc_province_"四川" 1
c_province_"廣東" 2
nc_province_"廣東" 1
c_province_"江蘇" 1
nc_province_"江蘇" 1
c_province_"浙江" 1
nc_province_"浙江" 1
c_province_"陜西" 1
nc_province_"陜西" 1
c_uuid_"0CE12C9121CA8E2484440B4459781BDB" 4
c_uuid_"19173BB499F4B0A62F19AFEB5BA5017A" 1
c_uuid_"5573F458F859E35D7DDCA346FD1A35A8" 1
c_uuid_"7EBE2805FBDFAB3C7B11395CB76364F4" 1
c_uuid_"9CF2210902BBF421E9DF1CB384B65CC7" 1
c_uuid_"B596B9655D2ACD4D449D5262C1B9D3BE" 1
c_uuid_"CDA23CC1EBAC208168C8AF1C88D03E55" 1
Task2:
$ head -n 10 userinfo.test | ./task1_map.py | sort | python ./task1_reduce.py | python task2_map.py | sort | python task2_reduce.py
nc_uuid_count_nub_1 6
nc_uuid_count_nub_4 1
實際運行,扔到Hadoop上跑,前面的文章中說過了。
整個過程中,只需要寫一個配置文件,描述出各個任務(wù),每個任務(wù)中的Map 和 Reduce 是什么即可。
后續(xù)工作
完善框架,自動生成程序等。
收集整理 Mapper, Reducer 算子。
基于web或者gui 的MR Chain 設(shè)計器。
轉(zhuǎn)自:MapReduce算法模式