<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    posts - 28, comments - 37, trackbacks - 0, articles - 0

    hadoop中mapreduce部分執(zhí)行流程

    Posted on 2011-01-14 09:05 俞靈 閱讀(8374) 評論(7)  編輯  收藏

    最近看了hadoop的mapreduce部分代碼,看了之后總結(jié)了一下,算是成果吧。以下是程序執(zhí)行的主要流程,其中參考了網(wǎng)上的一些文章。


    概括

    Hadoop包括hdfsmapreduce兩部分,在試用期期間我主要看了mapreduce部分,即hadoop執(zhí)行作業(yè)的部分。

    1. mapreduce中幾個(gè)主要的概念

           mapreduce整體上可以分為這么幾條執(zhí)行的線索,jobclientJobTrackerTaskTracker

      1. JobClient

                   每一個(gè)job都會在客戶端通過JobClient類將應(yīng)用程序以及配置參數(shù)打包成jar文件存儲在HDFS,然后向JobTracker提交作業(yè),JobTracker創(chuàng)建Task(即MapTaskReduceTask)并將它們分發(fā)到各個(gè)TaskTracker服務(wù)中去執(zhí)行。


      1. JobTracker

                    JobTracker是一個(gè)master服務(wù),hadoop服務(wù)端啟動之后JobTracker接收job,負(fù)責(zé)調(diào)度job的每一個(gè)子任務(wù)task運(yùn)行于TaskTracker上,并監(jiān)控它們,如果發(fā)現(xiàn)有失敗的task就重新運(yùn)行它。一般情況應(yīng)該把JobTracker部署在單獨(dú)的機(jī)器上。


      1. TaskTracker

                   TaskTracker是運(yùn)行于多個(gè)節(jié)點(diǎn)上的slaver服務(wù)。TaskTracker主動與JobTracker通信,接收作業(yè),并負(fù)責(zé)直接執(zhí)行每一個(gè)任務(wù)。

    下圖簡單的描述了三者之間的關(guān)系:(上傳不了圖片,抱歉!)


    1. 數(shù)據(jù)結(jié)構(gòu)

    2.1 JobInProgress

    JobClient提交job后,JobTracker會創(chuàng)建一個(gè)JobInProgress來跟蹤和調(diào)度這個(gè)job,并把它添加到job隊(duì)列里。JobInProgress會根據(jù)提交的job jar中定義的輸入數(shù)據(jù)集(已分解成FileSplit)創(chuàng)建對應(yīng)的一批TaskInProgress用于監(jiān)控和調(diào)度MapTask,同時(shí)在創(chuàng)建指定數(shù)目的TaskInProgress用于監(jiān)控和調(diào)度ReduceTask,缺省為1個(gè)ReduceTask


    2.2 TaskInProgress

    JobTracker啟動任務(wù)時(shí)通過每一個(gè)TaskInProgress來launchTask,這時(shí)會把Task對象(即MapTaskReduceTask)序列化寫入相應(yīng)的TaskTracker服務(wù)中,TaskTracker收到后會創(chuàng)建對應(yīng)的TaskInProgress(此TaskInProgress實(shí)現(xiàn)非JobTracker中使用的TaskInProgress,作用類似)用于監(jiān)控和調(diào)度該Task。啟動具體的Task進(jìn)程是通過TaskInProgress管理的TaskRunner對象來運(yùn)行的。TaskRunner會自動裝載job jar,并設(shè)置好環(huán)境變量后啟動一個(gè)獨(dú)立的java child進(jìn)程來執(zhí)行Task,即MapTask或者ReduceTask,但它們不一定運(yùn)行在同一個(gè)TaskTracker中。


    2.3 MapTaskReduceTask

    一個(gè)完整的job會自動依次執(zhí)行MapperCombiner(在JobConf指定了Combiner時(shí)執(zhí)行)和Reducer,其中MapperCombiner是由MapTask調(diào)用執(zhí)行,Reducer則由ReduceTask調(diào)用,Combiner實(shí)際也是Reducer接口類的實(shí)現(xiàn)。Mapper會根據(jù)job jar中定義的輸入數(shù)據(jù)集按<key1,value1>對讀入,處理完成生成臨時(shí)的<key2,value2>對,如果定義了CombinerMapTask會在Mapper完成調(diào)用該Combiner將相同key的值做合并處理,以減少輸出結(jié)果集。MapTask的任務(wù)全完成即交給ReduceTask進(jìn)程調(diào)用Reducer處理,生成最終結(jié)果<key3,value3>對。

     

    1. 整體流程

    一道MapRedcue作業(yè)是通過JobClient.rubJob(job)master節(jié)點(diǎn)的JobTracker提交的, JobTracker接到JobClient的請求后把其加入作業(yè)隊(duì)列中。JobTracker一直在等待JobClient通過RPC提交作業(yè),TaskTracker一直通過RPCJobTracker發(fā)送心跳heartbeat詢問有沒有任務(wù)可做,如果有,讓其派發(fā)任務(wù)給它執(zhí)行。如果JobTracker的作業(yè)隊(duì)列不為空, TaskTracker發(fā)送的心跳將會獲得JobTracker給它派發(fā)的任務(wù)。這是一道pull過程。slave節(jié)點(diǎn)的TaskTracker接到任務(wù)后在其本地發(fā)起Task,執(zhí)行任務(wù)。以下是簡略示意圖:



    下圖比較詳細(xì)的解釋了程序的流程:



     

    1. Jobclient

    在編寫MapReduce程序時(shí)通常是上是這樣寫的:

    Configuration conf = new Configuration(); // 讀取hadoop配置

    Job job = new Job(conf, "作業(yè)名稱"); // 實(shí)例化一道作業(yè)

    job.setMapperClass(Mapper類型);

    job.setCombinerClass(Combiner類型);

    job.setReducerClass(Reducer類型);

    job.setOutputKeyClass(輸出Key的類型);

    job.setOutputValueClass(輸出Value的類型);

    FileInputFormat.addInputPath(job, new Path(輸入hdfs路徑));

    FileOutputFormat.setOutputPath(job, new Path(輸出hdfs路徑));

    // 其它初始化配置

    JobClient.runJob(job);

    4.1 配置Job

    JobConf是用戶描述一個(gè)job的接口。下面的信息是MapReduce過程中一些較關(guān)鍵的定制信息:


    4.2 JobClient.runJob():運(yùn)行Job并分解輸入數(shù)據(jù)集


    runJob()提交作業(yè),如何等待返回的狀態(tài),根據(jù)狀態(tài)返回不同的結(jié)構(gòu)給客戶端。

    其中runJob()使用submitJob(job)方法向 master提交作業(yè)。

    submitJob(Job)方法的流程



     

    一個(gè)MapReduceJob會通過JobClient類根據(jù)用戶在JobConf類中定義的InputFormat實(shí)現(xiàn)類來將輸入的數(shù)據(jù)集分解成一批小的數(shù)據(jù)集,每一個(gè)小數(shù)據(jù)集會對應(yīng)創(chuàng)建一個(gè)MapTask來處理。JobClient會使用缺省的FileInputFormat類調(diào)用FileInputFormat.getSplits()方法生成小數(shù)據(jù)集,如果判斷數(shù)據(jù)文件是isSplitable()的話,會將大的文件分解成小的FileSplit,當(dāng)然只是記錄文件在HDFS里的路徑及偏移量和Split大小。這些信息會統(tǒng)一打包到jobFilejar中。


    hadoop分布系統(tǒng)文件系統(tǒng)hdfs依次上傳三個(gè)文件: job.jar, job.splitjob.xml。 

    job.xml: 作業(yè)配置,例如Mapper, Combiner, Reducer的類型,輸入輸出格式的類型等。 

    job.jar: jar,里面包含了執(zhí)行此任務(wù)需要的各種類,比如 Mapper,Reducer等實(shí)現(xiàn)。 

    job.split: 文件分塊的相關(guān)信息,比如有數(shù)據(jù)分多少個(gè)塊,塊的大小(默認(rèn)64m)等。 

    這三個(gè)文件在hdfs上的路徑由hadoop-default.xml文件中的mapreduce系統(tǒng)路徑mapred.system.dir屬性 + jobid決定。mapred.system.dir屬性默認(rèn)是/tmp/hadoop-user_name/mapred/system。寫完這三個(gè)文 件之后, 此方法會通過RPC調(diào)用master節(jié)點(diǎn)上的JobTracker.submitJob(job)方法,等待返回狀態(tài),此時(shí)作業(yè)已經(jīng)提交完成。

    接下來轉(zhuǎn)到JobTracker上執(zhí)行。

    (事實(shí)上這里還涉及到一些相關(guān)的類與方法)

    4.3 提交Job

    jobFile的提交過程是通過RPC(遠(yuǎn)程進(jìn)程調(diào)用)模塊來實(shí)現(xiàn)的。大致過程是,JobClient類中通過RPC實(shí)現(xiàn)的Proxy接口調(diào)用JobTrackersubmitJob()方法,而JobTracker必須實(shí)現(xiàn)JobSubmissionProtocol接口。

    JobTracker創(chuàng)建job成功后會給JobClient傳回一個(gè)JobStatus對象用于記錄job的狀態(tài)信息,如執(zhí)行時(shí)間、MapReduce任務(wù)完成的比例等。JobClient會根據(jù)這個(gè)JobStatus對象創(chuàng)建一個(gè)NetworkedJobRunningJob對象,用于定時(shí)從JobTracker獲得執(zhí)行過程的統(tǒng)計(jì)數(shù)據(jù)來監(jiān)控并打印到用戶的控制臺。

    與創(chuàng)建Job過程相關(guān)的類和方法如下圖所示


     

    1. JobTracker

    5.1 JobTracker啟動

    JobTracker類中有一個(gè)main()函數(shù),在軟件啟動的時(shí)候執(zhí)行此main()函數(shù)啟動JobTracker進(jìn)程,main()中生成一個(gè)JobTracker的對象,然后通過tracker.offerService()語句啟動服務(wù),即啟動一些線程,下面是幾個(gè)主要的線程:

    taskScheduler:一個(gè)抽象類,被JobTracker用于安排執(zhí)行在TaskTrackers上的task任務(wù),它使用一個(gè)或多個(gè)JobInProgressListeners接收jobs的通知。另外一個(gè)任務(wù)是調(diào)用JobInProgress.initTask()job初始化tasks。啟動,提交作業(yè),設(shè)置配置參數(shù),終止等方法。


    completedJobsStoreThread對應(yīng)completedJobStatusStoreCompletedJobStatusStore類:把JobInProgress中的job信息存儲到DFS中;提供一些讀取狀態(tài)信息的方法;是一個(gè)守護(hù)進(jìn)程,用于刪除DFS中的保存時(shí)間超過規(guī)定時(shí)間的job status刪除,


    interTrackerServer,抽象類Server類型的實(shí)例。一個(gè)IPC (Inter-Process Communication,進(jìn)程間通信)服務(wù)器,IPC調(diào)用一個(gè)以一個(gè)參數(shù)的形式調(diào)用Writable,然后返回一個(gè)Writable作為返回值,在某個(gè)端口上運(yùn)行。提供了call,listener,responder,connection,handle類。包括start(),stop(),join(),getListenerAddress(),call()等方法。

    這些線程啟動之后,便可開始工作了。



    job是統(tǒng)一由JobTracker來調(diào)度的,把具體的Task分發(fā)給各個(gè)TaskTracker節(jié)點(diǎn)來執(zhí)行。下面來詳細(xì)解析執(zhí)行過程,首先先從JobTracker收到JobClient的提交請求開始。

      1. JobTracker初始化Job

    5.2.1 JobTracker.submitJob() 收到請求

    當(dāng)JobTracker接收到新的job請求(即submitJob()函數(shù)被調(diào)用)后,會創(chuàng)建一個(gè)JobInProgress對象并通過它來管理和調(diào)度任務(wù)。JobInProgress在創(chuàng)建的時(shí)候會初始化一系列與任務(wù)有關(guān)的參數(shù),調(diào)用到FileSystem,把在JobClient端上傳的所有任務(wù)文件下載到本地的文件系統(tǒng)中的臨時(shí)目錄里。這其中包括上傳的*.jar文件包、記錄配置信息的xml、記錄分割信息的文件。

    5.2 JobTracker.JobInitThread 通知初始化線程

    JobTracker 中的監(jiān)聽器類EagerTaskInitializationListener負(fù)責(zé)任務(wù)Task的初始化。JobTracker使用jobAdded(job)加入jobEagerTaskInitializationListener中一個(gè)專門管理需要初始化的隊(duì)列里,即一個(gè)list成員變量jobInitQueue里。resortInitQueue方法根據(jù)作業(yè)的優(yōu)先級排序。然后調(diào)用notifyAll()函數(shù),會喚起一個(gè)用于初始化job的線程JobInitThread來處理???JobInitThread收到信號后即取出最靠前的job,即優(yōu)先級別最高的job,調(diào)用TaskTrackerManagerinitJob最終調(diào)用JobInProgress.initTasks()執(zhí)行真正的初始化工作。

    5.3 JobInProgress.initTasks() 初始化TaskInProgress

    任務(wù)Task分兩種: MapTask reduceTask,它們的管理對象都是TaskInProgress

    首先JobInProgress會創(chuàng)建Map的監(jiān)控對象。在initTasks()函數(shù)里通過調(diào)用JobClientreadSplitFile()獲得已分解的輸入數(shù)據(jù)的RawSplit列表,然后根據(jù)這個(gè)列表創(chuàng)建對應(yīng)數(shù)目的Map執(zhí)行管理對象TaskInProgress。在這個(gè)過程中,還會記錄該RawSplit塊對應(yīng)的所有在HDFS里的blocks所在的DataNode節(jié)點(diǎn)的host,這個(gè)會在RawSplit創(chuàng)建時(shí)通過FileSplitgetLocations()函數(shù)獲取,該函數(shù)會調(diào)用DistributedFileSystem的getFileCacheHints()獲得。當(dāng)然如果是存儲在本地文件系統(tǒng)中,即使用LocalFileSystem時(shí)當(dāng)然只有一個(gè)location即“localhost”了。

    創(chuàng)建這些TaskInProgress對象完畢后,initTasks()方法會通 過createCache()方法為這些TaskInProgress對象產(chǎn)生一個(gè)未執(zhí)行任務(wù)的Map緩存nonRunningMapCacheslave端的 TaskTrackermaster發(fā)送心跳時(shí),就可以直接從這個(gè)cache中取任務(wù)去執(zhí)行。

    其次JobInProgress會創(chuàng)建Reduce的監(jiān)控對象,這個(gè)比較簡單,根據(jù)JobConf里指定的Reduce數(shù)目創(chuàng)建,缺省只創(chuàng)建1個(gè)Reduce任務(wù)。監(jiān)控和調(diào)度Reduce任務(wù)的是TaskInProgress類,不過構(gòu)造方法有所不同,TaskInProgress會根據(jù)不同參數(shù)分別創(chuàng)建具體的MapTask或者ReduceTask。同樣地,initTasks()也會通過createCache()方法產(chǎn)生nonRunningReduces成員。

    JobInProgress創(chuàng)建完TaskInProgress后,最后構(gòu)造JobStatus并記錄job正在執(zhí)行中,然后再調(diào)用JobHistory.JobInfo.logStarted()記錄job的執(zhí)行日志。到這里JobTracker里初始化job的過程全部結(jié)束。


    5.3.2 JobTracker調(diào)度Job

    hadoop默認(rèn)的調(diào)度器是FIFO策略的JobQueueTaskScheduler,它有兩個(gè)成員變量 jobQueueJobInProgressListener與上面說的eagerTaskInitializationListener。JobQueueJobInProgressListener是JobTracker的另一個(gè)監(jiān)聽器類,它包含了一個(gè)映射,用來管理和調(diào)度所有的JobInProgress。jobAdded(job)同時(shí)會加入job到JobQueueJobInProgressListener中的映射。

    JobQueueTaskScheduler最重要的方法是assignTasks ,他實(shí)現(xiàn)了工作調(diào)度。具體實(shí)現(xiàn):JobTracker 接到TaskTracker heartbeat() 調(diào)用后,首先會檢查上一個(gè)心跳響應(yīng)是否完成,是沒要求啟動或重啟任務(wù),如果一切正常,則會處理心跳。首先它會檢查 TaskTracker 端還可以做多少個(gè) map reduce 任務(wù),將要派發(fā)的任務(wù)數(shù)是否超出這個(gè)數(shù),是否超出集群的任務(wù)平均剩余可負(fù)載數(shù)。如果都沒超出,則為此 TaskTracker 分配一個(gè) MapTask ReduceTask 。產(chǎn)生 Map 任務(wù)使用 JobInProgress obtainNewMapTask() 方法,實(shí)質(zhì)上最后調(diào)用了 JobInProgress findNewMapTask() 訪問 nonRunningMapCache

    上面講解任務(wù)初始化時(shí)說過,createCache()方法會在網(wǎng)絡(luò)拓?fù)浣Y(jié)構(gòu)上掛上需要執(zhí)行的TaskInProgressfindNewMapTask()從近到遠(yuǎn)一層一層地尋找,首先是同一節(jié)點(diǎn),然后在尋找同一機(jī)柜上的節(jié)點(diǎn),接著尋找相同數(shù)據(jù)中心下的節(jié)點(diǎn),直到找了maxLevel層結(jié)束。這樣的話,在JobTrackerTaskTracker派發(fā)任務(wù)的時(shí)候,可以迅速找到最近的TaskTracker,讓它執(zhí)行任務(wù)。

    最終生成一個(gè)Task類對象,該對象被封裝在一個(gè)LanuchTaskAction 中,發(fā)回給TaskTracker,讓它去執(zhí)行任務(wù)。

    產(chǎn)生 Reduce 任務(wù)過程類似,使用 JobInProgress.obtainNewReduceTask() 方法,實(shí)質(zhì)上最后調(diào)用了 JobInProgress findNewReduceTask() 訪問 nonRunningReduces

    6. TaskTracker

    6.1 TaskTracker的啟動

    JobTracker一樣,里面包含一個(gè)main()方法,在hadoop啟動的時(shí)候啟動此進(jìn)程。

    Main()方法最主要的一句話

    TaskTracker(conf).run()

    TaskTracker(conf)獲取本機(jī)的一些配置信息,初始化服務(wù)器并啟動服務(wù)器(StatusHttpServer);然后調(diào)用initialize(),這個(gè)方法才是真正構(gòu)造TaskTracker的地方,把它作為一個(gè)單獨(dú)的方法便可以再次調(diào)用并可以在close()之后回收對象,就是初始化一些變量對象,最后啟動線程:

    taskMemoryManagerTaskMemoryManagerThread類的對象。管理本機(jī)上task運(yùn)行時(shí)內(nèi)存的使用,殺死任何溢出和超出內(nèi)存限制的task-trees

    mapLauncherreduceLauncher都是TaskLauncher類的對象,其作用是啟動maptaskreducetask任務(wù)線程。根據(jù)tasksToLaunch判斷是否需要新建任務(wù),其中的調(diào)用的關(guān)系為:run()startNewTask()localizeJob()launchTaskForJoblaunchTask()localizeTask


    run()方法中啟動TaskTracker服務(wù)器然后一直循環(huán)。循環(huán)會嘗試連接到的JobTracker。主要調(diào)用了兩個(gè)方法startCleanupThreads(),offerService()

    startCleanupThreads()啟動為守護(hù)進(jìn)程,可以用來刪除一個(gè)獨(dú)立線程的路徑。

    offerService()類似于JobTracker中的offerService()方法,即服務(wù)器執(zhí)行的主循環(huán)。規(guī)定的時(shí)間內(nèi)給JobTracker發(fā)送心跳信息,并處理返回的命令。

    下面具體介紹流程中的每一步。

    6.2 TaskTracker加載Task到子進(jìn)程

    Task的執(zhí)行實(shí)際是由TaskTracker發(fā)起的,TaskTracker會定期與JobTracker進(jìn)行一次通信,報(bào)告自己Task的執(zhí)行狀態(tài),接收JobTracker的指令等。如果發(fā)現(xiàn)有自己需要執(zhí)行的新任務(wù)也會在這時(shí)啟動,即是在TaskTracker調(diào)用JobTrackerheartbeat()方法時(shí)進(jìn)行,此調(diào)用底層是通過IPC層調(diào)用Proxy接口實(shí)現(xiàn)。

    6.2.1 TaskTracker.run() 連接JobTracker

    TaskTracker的啟動過程會初始化一系列參數(shù)和服務(wù),然后嘗試連接JobTracker(即必須實(shí)現(xiàn)InterTrackerProtocol接口),如果連接斷開,則會循環(huán)嘗試連接JobTracker,并重新初始化所有成員和參數(shù)。

    6.2.2 TaskTracker.offerService() 主循環(huán)

    如果連接JobTracker服務(wù)成功,TaskTracker就會調(diào)用offerService()函數(shù)進(jìn)入主執(zhí)行循環(huán)中。這個(gè)循環(huán)會每隔10秒與JobTracker通訊一次,調(diào)用transmitHeartBeat(),獲得HeartbeatResponse信息。然后調(diào)用HeartbeatResponsegetActions()函數(shù)獲得JobTracker傳過來的所有指令即一個(gè)TaskTrackerAction數(shù)組。再遍歷這個(gè)數(shù)組,如果是一個(gè)新任務(wù)指令即LaunchTaskAction則調(diào)用調(diào)用addToTaskQueue加入到待執(zhí)行

    隊(duì)列,否則加入到tasksToCleanup隊(duì)列,交給一個(gè)taskCleanupThread線程來處理,如執(zhí)行KillJobAction或者KillTaskAction等。

    6.2.3 TaskTracker.transmitHeartBeat() 獲取JobTracker指令

    transmitHeartBeat()函數(shù)處理中,TaskTracker會創(chuàng)建一個(gè)新的TaskTrackerStatus對象記錄目前任務(wù)的執(zhí)行狀況,檢查目前執(zhí)行的Task數(shù)目以及本地磁盤的空間使用情況等,如果可以接收新的Task則設(shè)置heartbeat()askForNewTask參數(shù)為true。然后通過IPC接口調(diào)用JobTrackerheartbeat()方法發(fā)送過去,heartbeat()返回值TaskTrackerAction數(shù)組。

    6.2.4 TaskTracker.addToTaskQueue,交給TaskLauncher處理

    TaskLauncher是用來處理新任務(wù)的線程類,包含了一個(gè)待運(yùn)行任務(wù)的隊(duì)列 tasksToLaunchTaskTracker.addToTaskQueue會調(diào)用TaskTrackerregisterTask,創(chuàng)建TaskInProgress對象來調(diào)度和監(jiān)控任務(wù),并把它加入到runningTasks隊(duì)列中。同時(shí)將這個(gè)TaskInProgress加到tasksToLaunch 中,并notifyAll()喚醒一個(gè)線程運(yùn)行,該線程從隊(duì)列tasksToLaunch取出一個(gè)待運(yùn)行任務(wù),調(diào)用TaskTrackerstartNewTask運(yùn)行任務(wù)。

    6.2.5 TaskTracker.startNewTask() 啟動新任務(wù)

    調(diào)用localizeJob()真正初始化Task并開始執(zhí)行。

    6.2.6 TaskTracker.localizeJob() 初始化job目錄等

    此函數(shù)主要任務(wù)是初始化工作目錄workDir,再將job jar包從HDFS復(fù)制到本地文件系統(tǒng)中,調(diào)用RunJar.unJar()將包解壓到工作目錄。然后創(chuàng)建一個(gè)RunningJob并調(diào)用addTaskToJob()函數(shù)將它添加到runningJobs監(jiān)控隊(duì)列中。addTaskToJob方法把一個(gè)任務(wù)加入到該任務(wù)屬于的runningJobtasks列表中。如果該任務(wù)屬于的runningJob不存在,先新建,加到runningJobs中。完成后即調(diào)用launchTaskForJob()開始執(zhí)行Task

    6.2.7 TaskTracker.launchTaskForJob() 執(zhí)行任務(wù)

    啟動Task的工作實(shí)際是調(diào)用TaskTracker$TaskInProgresslaunchTask()函數(shù)來執(zhí)行的。

    6.2.8 TaskTracker$TaskInProgress.launchTask() 執(zhí)行任務(wù)

    執(zhí)行任務(wù)前先調(diào)用localizeTask()更新一下jobConf文件并寫入到本地目錄中。然后通過調(diào)用TaskcreateRunner()方法創(chuàng)建TaskRunner對象并調(diào)用其start()方法最后啟動Task獨(dú)立的java執(zhí)行子進(jìn)程。

    6.2.9 Task.createRunner() 創(chuàng)建啟動Runner對象

    Task有兩個(gè)實(shí)現(xiàn)版本,即MapTaskReduceTask,它們分別用于創(chuàng)建MapReduce任務(wù)。MapTask會創(chuàng)建MapTaskRunner來啟動Task子進(jìn)程,而ReduceTask則創(chuàng)建ReduceTaskRunner來啟動。

    6.2.10 TaskRunner.start() 啟動子進(jìn)程

    TaskRunner負(fù)責(zé)將一個(gè)任務(wù)放到一個(gè)進(jìn)程里面來執(zhí)行。它會調(diào)用run()函數(shù)來處理,主要的工作就是初始化啟動java子進(jìn)程的一系列環(huán)境變量,包括設(shè)定工作目錄workDir,設(shè)置CLASSPATH環(huán)境變量等。然后裝載job jar包。JvmManager用于管理該TaskTracker上所有運(yùn)行的Task子進(jìn)程。每一個(gè)進(jìn)程都是由JvmRunner來管理的,它也是位于單獨(dú)線程中的。JvmManagerlaunchJvm方法,根據(jù)任務(wù)是map還是reduce,生成對應(yīng)的JvmRunner并放到對應(yīng)JvmManagerForType的進(jìn)程容器中進(jìn)行管理。JvmManagerForTypereapJvm()

    分配一個(gè)新的JVM進(jìn)程。如果JvmManagerForType槽滿,就尋找idle的進(jìn)程,如果是同Job的直接放進(jìn)去,否則殺死這個(gè)進(jìn)程,用一個(gè)新的進(jìn)程代替。如果槽沒有滿,那么就啟動新的子進(jìn)程。生成新的進(jìn)程使用spawnNewJvm方法。spawnNewJvm使用JvmRunner線程的run方法,run方法用于生成一個(gè)新的進(jìn)程并運(yùn)行它,具體實(shí)現(xiàn)是調(diào)用runChild

    6.3 子進(jìn)程執(zhí)行MapTask

    真實(shí)的執(zhí)行載體,是Child,它包含一個(gè) main函數(shù),進(jìn)程執(zhí)行,會將相關(guān)參數(shù)傳進(jìn)來,它會拆解這些參數(shù),通過getTask(jvmId)向父進(jìn)程索取任務(wù),并且構(gòu)造出相關(guān)的Task實(shí)例,然后使用Taskrun()啟動任務(wù)。

    6.3.1 run

    方法相當(dāng)簡單,配置完系統(tǒng)的TaskReporter后,就根據(jù)情況執(zhí)行runJobCleanupTaskrunJobSetupTaskrunTaskCleanupTask或執(zhí)行map

    6.3.2 mapper

    首先構(gòu)造Mapper的輸出,是通過MapOutputCollector進(jìn)行的,也分兩種情況,如果沒有Reducer,那么,用DirectMapOutputCollector,否則,用MapOutputBuffer。然后構(gòu)造Mapper處理的InputSplit,然后就開始創(chuàng)建MapperRecordReader,最終得到map的輸入。構(gòu)造完Mapper的輸入輸出,通過構(gòu)造配置文件中配置的MapRunnable,就可以執(zhí)行Mapper了。目前系統(tǒng)有兩個(gè)MapRunnableMapRunnerMultithreadedMapRunnerMapRunner是單線程執(zhí)行器,比較簡單,他會使用反射機(jī)制生成用戶定義的Mapper接口實(shí)現(xiàn)類,作為他的一個(gè)成員。

    6.3.3 MapRunnerrun方法

    會先創(chuàng)建對應(yīng)的keyvalue對象,然后,對InputSplit的每一對<keyvalue>,調(diào)用用戶實(shí)現(xiàn)的Mapper接口實(shí)現(xiàn)類的map方法,每處理一個(gè)數(shù)據(jù)對,就要使用OutputCollector收集每次處理kv對后得到的新的kv對,把他們spill到文件或者放到內(nèi)存,以做進(jìn)一步的處理,比如排序,combine等。

    6.3.4 OutputCollector

    OutputCollector的作用是收集每次調(diào)用map后得到的新的kv對,并把他們spill到文件或者放到內(nèi)存,以做進(jìn)一步的處理,比如排序,combine等。

    MapOutputCollector 有兩個(gè)子類:MapOutputBufferDirectMapOutputCollectorDirectMapOutputCollector用在不需要Reduce階段的時(shí)候。如果Mapper后續(xù)有reduce任務(wù),系統(tǒng)會使用MapOutputBuffer做為輸出, MapOutputBuffer使用了一個(gè)緩沖區(qū)對map的處理結(jié)果進(jìn)行緩存,放在內(nèi)存中,又使用幾個(gè)數(shù)組對這個(gè)緩沖區(qū)進(jìn)行管理。



    在適當(dāng)?shù)臅r(shí)機(jī),緩沖區(qū)中的數(shù)據(jù)會被spill到硬盤中。



    向硬盤中寫數(shù)據(jù)的時(shí)機(jī):

    1)當(dāng)內(nèi)存緩沖區(qū)不能容下一個(gè)太大的k v對時(shí)。spillSingleRecord方法。

    2)內(nèi)存緩沖區(qū)已滿時(shí)。SpillThread線程。

    3Mapper的結(jié)果都已經(jīng)collect了,需要對緩沖區(qū)做最后的清理。Flush方法。

    2.5 spillThread線程:將緩沖區(qū)中的數(shù)據(jù)spill到硬盤中。

    1)需要spill時(shí)調(diào)用函數(shù)sortAndSpill,按照partitionkey做排序。默認(rèn)使用的是快速排序QuickSort

    2)如果沒有combiner,則直接輸出記錄,否則,調(diào)用CombinerRunnercombine,先做combin然后輸出。

    6.4 子進(jìn)程執(zhí)行ReduceTask

    ReduceTask.run方法開始和MapTask類似,包括initialize()初始化 ,runJobCleanupTask()runJobSetupTask()runTaskCleanupTask()。之后進(jìn)入正式的工作,主要有這么三個(gè)步驟:CopySortReduce

    6.4.1 Copy

    就是從執(zhí)行各個(gè)Map任務(wù)的服務(wù)器那里,收羅到map的輸出文件。拷貝的任務(wù),是由ReduceTask.ReduceCopier 類來負(fù)責(zé)。

    6.4.1.1 類圖:



    6.4.1.2 流程: 使用ReduceCopier.fetchOutputs開始

    1)索取任務(wù)。使用GetMapEventsThread線程。該線程的run方法不停的調(diào)用getMapCompletionEvents方法,該方法又使用RPC調(diào)用TaskUmbilicalProtocol協(xié)議的getMapCompletionEvents,方法使用所屬的jobID向其父TaskTracker詢問此作業(yè)個(gè)Map任務(wù)的完成狀況(TaskTracker要向JobTracker詢問后再轉(zhuǎn)告給它...)。返回一個(gè)數(shù)組TaskCompletionEvent events[]TaskCompletionEvent包含taskidip地址之類的信息。

    2)當(dāng)獲取到相關(guān)Map任務(wù)執(zhí)行服務(wù)器的信息后,有一個(gè)線程MapOutputCopier開啟,做具體的拷貝工作。 它會在一個(gè)單獨(dú)的線程內(nèi),負(fù)責(zé)某個(gè)Map任務(wù)服務(wù)器上文件的拷貝工作。MapOutputCopierrun循環(huán)調(diào)用copyOutputcopyOutput又調(diào)用getMapOutput,使用HTTP遠(yuǎn)程拷貝。

    3getMapOutput遠(yuǎn)程拷貝過來的內(nèi)容(當(dāng)然也可以是本地了...),作為MapOutput對象存在,它可以在內(nèi)存中也可以序列化在磁盤上,這個(gè)根據(jù)內(nèi)存使用狀況來自動調(diào)節(jié)。

    4) 同時(shí),還有一個(gè)內(nèi)存Merger線程InMemFSMergeThread和一個(gè)文件Merger線程LocalFSMerger在同步工作,它們將下載過來的文件(可能在內(nèi)存中,簡單的統(tǒng)稱為文件...),做著歸并排序,以此,節(jié)約時(shí)間,降低輸入文件的數(shù)量,為后續(xù)的排序工作減 負(fù)。InMemFSMergeThreadrun循環(huán)調(diào)用doInMemMerge, 該方法使用工具類Merger實(shí)現(xiàn)歸并,如果需要combine,則combinerRunner.combine

    6.4.2 Sort

    排序工作,就相當(dāng)于上述排序工作的一個(gè)延續(xù)。它會在所有的文件都拷貝完畢后進(jìn)行。使用工具類Merger歸并所有的文件。經(jīng)過這一個(gè)流程,一個(gè)合并了所有所需Map任務(wù)輸出文件的新文件產(chǎn)生了。而那些從其他各個(gè)服務(wù)器網(wǎng)羅過來的 Map任務(wù)輸出文件,全部刪除了。

    6.4.3 Reduce

    Reduce任務(wù)的最后一個(gè)階段。他會準(zhǔn)備好 keyClass"mapred.output.key.class""mapred.mapoutput.key.class", valueClass("mapred.mapoutput.value.class""mapred.output.value.class")Comparator(“mapred.output.value.groupfn.class”或 “mapred.output.key.comparator.class”)。最后調(diào)用runOldReducer方法。(也是兩套API,我們分析runOldReducer

    6.4.4 runReducer

    1)輸出方面。它會準(zhǔn)備一個(gè)OutputCollector收集輸出,與MapTask不同,這個(gè)OutputCollector更為簡單,僅僅是打開一個(gè)RecordWritercollect一次,write一次。最大的不同在于,這次傳入RecordWriter的文件系統(tǒng),基本都是分布式文件系統(tǒng), 或者說是HDFS

    2)輸入方面,ReduceTask會用準(zhǔn)備好的KeyClassValueClassKeyComparator等等之類的自定義類,構(gòu)造出Reducer所需的鍵類型, 和值的迭代類型Iterator(一個(gè)鍵到了這里一般是對應(yīng)一組值)。

    3)有了輸入,有了輸出,不斷循環(huán)調(diào)用自定義的Reducer,最終,Reduce階段完成。



     

     

    Feedback

    # re: hadoop中mapreduce部分執(zhí)行流程  回復(fù)  更多評論   

    2011-06-14 16:00 by laowuuser
    寫的很好,學(xué)習(xí)了

    # re: hadoop中mapreduce部分執(zhí)行流程  回復(fù)  更多評論   

    2011-08-04 09:02 by wlu
    寫的太棒了

    # re: hadoop中mapreduce部分執(zhí)行流程  回復(fù)  更多評論   

    2011-08-31 17:42 by chemcial
    JobClient.runJob(org.apache.hadoop.mapreduce.Job)調(diào)用錯(cuò)了

    # re: hadoop中mapreduce部分執(zhí)行流程  回復(fù)  更多評論   

    2011-09-05 19:50 by 俞靈
    @chemcial
    沒看懂你的意思
    JobClient.runJob(org.apache.hadoop.mapred.JobConf) ?

    # re: hadoop中mapreduce部分執(zhí)行流程[未登錄]  回復(fù)  更多評論   

    2013-03-06 21:02 by owen
    寫的很好,支持

    # re: hadoop中mapreduce部分執(zhí)行流程  回復(fù)  更多評論   

    2013-08-09 10:40 by itzzq
    樓主寫得很好,可是圖掛了,能不能重新放一下?

    # re: hadoop中mapreduce部分執(zhí)行流程[未登錄]  回復(fù)  更多評論   

    2013-11-13 16:03 by test
    寫的很不錯(cuò),有用。學(xué)習(xí)了

    只有注冊用戶登錄后才能發(fā)表評論。


    網(wǎng)站導(dǎo)航:
     
    主站蜘蛛池模板: 日日噜噜噜噜夜夜爽亚洲精品| 理论片在线观看免费| 亚洲一区二区精品视频| 又粗又大又黑又长的免费视频| 男女一边摸一边做爽的免费视频| 亚洲一区二区久久| 亚洲综合婷婷久久| 国产亚洲综合成人91精品| 免费萌白酱国产一区二区| 在线视频观看免费视频18| 久久综合国产乱子伦精品免费| 免费人成网上在线观看| 亚洲乱理伦片在线观看中字| 1区1区3区4区产品亚洲| 亚洲精品国产精品乱码不99| 亚洲第一成人影院| 国产免费人视频在线观看免费| 99热在线精品免费全部my| 亚洲精品免费在线观看| 欧洲人成在线免费| 怡红院免费全部视频在线视频| 一本久久免费视频| eeuss免费影院| 亚洲免费无码在线| 一级做a爰性色毛片免费| 有色视频在线观看免费高清在线直播| 亚洲AV成人无码网站| 亚洲AV色无码乱码在线观看| 亚洲欧美日韩一区二区三区 | 免费看一级毛片在线观看精品视频| 亚洲最大的成人网| 亚洲日本成本人观看| 亚洲最大的成人网站| 亚洲国产成人精品无码区二本| 亚洲精品无码国产片| 亚洲AV色欲色欲WWW| 免费无码AV一区二区| 日亚毛片免费乱码不卡一区| 一本岛v免费不卡一二三区| 在线观看免费视频一区| 免费在线看污视频|