數(shù)據(jù)分布傾斜性指的是數(shù)據(jù)分布過度集中于數(shù)據(jù)空間的某端,造成“頭重腳輕”或者“比薩斜塔”等不均勻的分布特點(diǎn)。數(shù)據(jù)分布傾斜性將造成運(yùn)算效率上的“瓶頸”和數(shù)據(jù)分析結(jié)果的“以偏概全”。
效率上的“瓶頸”
假如在大型商場(chǎng)中,共有A,B1,B2…..B9十家店鋪,其中A店鋪中有99W商品,B1,B2….B9這九家店鋪分別有1W商品。我們要統(tǒng)計(jì)商場(chǎng)中商品總數(shù),計(jì)算初,采用HASHMAP作為存儲(chǔ)結(jié)構(gòu),其中Key:店鋪 Value:商品。我們的計(jì)算過程是先統(tǒng)計(jì)每個(gè)店鋪的商品總數(shù),最后將結(jié)果累加。可以發(fā)現(xiàn),由于A有99W商品,按照1+1的累積方式(假如1+1耗時(shí)1秒),我們要加99W個(gè)1才能得到A店鋪的商品總數(shù)(總耗時(shí)99W秒),而B1,B2….B9只需分別累加1W個(gè)1(分別耗時(shí)1W秒),而為了得到商場(chǎng)中的商品總數(shù),我們必須等待所有店鋪都分別累計(jì)結(jié)束才能處理總和,顯而易見,此時(shí)運(yùn)算瓶頸便集中在A店鋪的商品累計(jì)上。
這類狀況經(jīng)常發(fā)生在分布式運(yùn)算過程中,比如Hadoop Job計(jì)算,因?yàn)?/span>map/reduce 過程中是以Key-value形式來處理數(shù)據(jù),假如某key下的數(shù)據(jù)量太大,會(huì)導(dǎo)致整個(gè)計(jì)算過程中move/shuffle/sort的耗時(shí)遠(yuǎn)遠(yuǎn)高于其他key,因此該Key變成為效率“瓶頸”。一般解決辦法是,自定義partitioner,對(duì)所有的Value進(jìn)行自定義分組,使得每組的量較平均,從而解決時(shí)間瓶頸問題。
數(shù)據(jù)分析結(jié)果的“以偏概全”
同樣使用上述的“商場(chǎng)”案例,并且在此基礎(chǔ)上我們假設(shè)A店鋪,B9店鋪是賣低端商品,而B1,B2…..B8是賣高端商品,銷量較小。如果我們要根據(jù)商品銷售狀況分析店鋪在買家當(dāng)中的受歡迎程度。由于A店鋪本身商品量大,而且定位的銷售價(jià)位是屬于薄利多銷,如果只從銷售量的考慮,我們會(huì)以為A店鋪在商場(chǎng)中是最受買家歡迎的,造成“片面”的分析結(jié)果。
其實(shí),遇到這種情況,我們首先的分析賣家性質(zhì)和買家性質(zhì),并且使用相對(duì)量來作為評(píng)估值,比如A店鋪賣低端商品,日銷售量1W商品,1W/99W<1%, 而B9店鋪賣低端商品,日銷售量5K商品,5K/1W=50%,所以在低端買家中,低端商品店鋪B9應(yīng)該是最受歡迎的。
沒想到Hadoop在解析XML時(shí)如此糾結(jié),以至于新版api的mapreduce竟然放棄了XML格式的format以及reader,在老版(hadoop-0.19.*)的streaming模塊提供了這樣的api,由于我用的hadoop-0.20.2 3U1版本,因此需要把處理XML的幾個(gè)類移植過來使用。
移植所帶來的問題是各處依賴包,和各種api不兼容。沒關(guān)系,我可以看一下源碼,然后自己寫一個(gè)。細(xì)看了一下reader的代碼,發(fā)現(xiàn)mapreduce使用了BufferedInputStream的mark,reset來尋找XML的tag,這個(gè)tag就是我們?cè)谔峤蛔鳂I(yè)所設(shè)置的,比如<log>,</log>這樣的標(biāo)簽。Java中stream流的mark和reset,允許指針回讀,即在找到<log>時(shí),mark一下指針,然后再找到</log>標(biāo)簽,最后通過reset方法,返回到mark的位置,把<log></log>內(nèi)的數(shù)據(jù)讀取出來。但在匹配的過程中,我發(fā)現(xiàn)mapred使用了BufferedInputStream 的 read(); 方法,該方法返回下一個(gè)可讀的字節(jié)。那么整個(gè)處理過程就是讀一個(gè)字節(jié),比較一個(gè)字節(jié),我沒有在mapreduce中用這樣的算法,但我測(cè)試過,向緩沖區(qū)(BufferedInputStream)中一個(gè)字節(jié)一個(gè)字節(jié)的讀,性能嚴(yán)重不足,read(); 方法平均返回時(shí)間在331納秒,處理一個(gè)170M的xml文檔(tag比較多),竟然花了200+秒。(streaming模塊還寫了一個(gè)faster*方法,哎,慢死了)
周敏同學(xué)提供了pig中處理xml的reader,但pig那邊的代碼我還沒細(xì)看,也不知道hadoop的jira中有沒有新的feature來解決現(xiàn)有xml的問題。如果有的話,不防可以告訴我一下下。呵呵。
現(xiàn)在有一個(gè)構(gòu)思,即主要思想仍然圍繞字節(jié)比較,因?yàn)樽址ヅ湫矢停硗馑惴ㄔ从赟tring.indexOf(“”),即找到<log>這個(gè)后,記住位置,然后再找</log>,這樣算完全匹配,中間的內(nèi)容用system.arraycopy來復(fù)制到新的字節(jié)數(shù)組,目前這算法我實(shí)現(xiàn)了一半,即找到<log>和</log>后,把這兩個(gè)簽標(biāo)全部替換掉,170M文檔,用時(shí)2.2秒(最快1.3秒)。
算法及問題:
首先提供一個(gè)BufferedInputStream,默認(rèn)大小8k,在程序中建一個(gè)字節(jié)數(shù)組,大小為4k,即每次向BufferedInputStream讀4k,這個(gè)效率是很不錯(cuò)的,然后去尋找<log>.toArray這樣的字節(jié)數(shù)組,這一步速度是很驚人的。但這里有一個(gè)小的問題,即每次讀4k的大小去處理,那很有可能<log></log>位于兩次讀取的一尾一頭,那么我的想法是做一個(gè)半循環(huán)的字節(jié)數(shù)組,即如果在4k的字節(jié)數(shù)組中的最后找到<log>,那么就把前面未匹配的仍掉,然后把<log>標(biāo)簽移到字節(jié)數(shù)組最前端,然后另用這個(gè)字節(jié)數(shù)組再向BufferedInputStream中去讀4k-5長度的內(nèi)容(5是<log>的字節(jié)長度)。關(guān)于4k這個(gè)大小,首先要對(duì)XML數(shù)據(jù)進(jìn)行sampling,即確定<log></log>當(dāng)中的內(nèi)容長度,然后再定這個(gè)緩沖buf的大小。