<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    posts - 28, comments - 37, trackbacks - 0, articles - 0

    hadoop中mapreduce部分執行流程

    Posted on 2011-01-14 09:05 俞靈 閱讀(8373) 評論(7)  編輯  收藏

    最近看了hadoop的mapreduce部分代碼,看了之后總結了一下,算是成果吧。以下是程序執行的主要流程,其中參考了網上的一些文章。


    概括

    Hadoop包括hdfsmapreduce兩部分,在試用期期間我主要看了mapreduce部分,即hadoop執行作業的部分。

    1. mapreduce中幾個主要的概念

           mapreduce整體上可以分為這么幾條執行的線索,jobclientJobTrackerTaskTracker

      1. JobClient

                   每一個job都會在客戶端通過JobClient類將應用程序以及配置參數打包成jar文件存儲在HDFS,然后向JobTracker提交作業,JobTracker創建Task(即MapTaskReduceTask)并將它們分發到各個TaskTracker服務中去執行。


      1. JobTracker

                    JobTracker是一個master服務,hadoop服務端啟動之后JobTracker接收job,負責調度job的每一個子任務task運行于TaskTracker上,并監控它們,如果發現有失敗的task就重新運行它。一般情況應該把JobTracker部署在單獨的機器上。


      1. TaskTracker

                   TaskTracker是運行于多個節點上的slaver服務。TaskTracker主動與JobTracker通信,接收作業,并負責直接執行每一個任務。

    下圖簡單的描述了三者之間的關系:(上傳不了圖片,抱歉!)


    1. 數據結構

    2.1 JobInProgress

    JobClient提交job后,JobTracker會創建一個JobInProgress來跟蹤和調度這個job,并把它添加到job隊列里。JobInProgress會根據提交的job jar中定義的輸入數據集(已分解成FileSplit)創建對應的一批TaskInProgress用于監控和調度MapTask,同時在創建指定數目的TaskInProgress用于監控和調度ReduceTask,缺省為1ReduceTask


    2.2 TaskInProgress

    JobTracker啟動任務時通過每一個TaskInProgress來launchTask,這時會把Task對象(即MapTaskReduceTask)序列化寫入相應的TaskTracker服務中,TaskTracker收到后會創建對應的TaskInProgress(此TaskInProgress實現非JobTracker中使用的TaskInProgress,作用類似)用于監控和調度該Task。啟動具體的Task進程是通過TaskInProgress管理的TaskRunner對象來運行的。TaskRunner會自動裝載job jar,并設置好環境變量后啟動一個獨立的java child進程來執行Task,即MapTask或者ReduceTask,但它們不一定運行在同一個TaskTracker中。


    2.3 MapTaskReduceTask

    一個完整的job會自動依次執行MapperCombiner(在JobConf指定了Combiner時執行)和Reducer,其中MapperCombiner是由MapTask調用執行,Reducer則由ReduceTask調用,Combiner實際也是Reducer接口類的實現。Mapper會根據job jar中定義的輸入數據集按<key1,value1>對讀入,處理完成生成臨時的<key2,value2>對,如果定義了CombinerMapTask會在Mapper完成調用該Combiner將相同key的值做合并處理,以減少輸出結果集。MapTask的任務全完成即交給ReduceTask進程調用Reducer處理,生成最終結果<key3,value3>對。

     

    1. 整體流程

    一道MapRedcue作業是通過JobClient.rubJob(job)master節點的JobTracker提交的, JobTracker接到JobClient的請求后把其加入作業隊列中。JobTracker一直在等待JobClient通過RPC提交作業,TaskTracker一直通過RPCJobTracker發送心跳heartbeat詢問有沒有任務可做,如果有,讓其派發任務給它執行。如果JobTracker的作業隊列不為空, TaskTracker發送的心跳將會獲得JobTracker給它派發的任務。這是一道pull過程。slave節點的TaskTracker接到任務后在其本地發起Task,執行任務。以下是簡略示意圖:



    下圖比較詳細的解釋了程序的流程:



     

    1. Jobclient

    在編寫MapReduce程序時通常是上是這樣寫的:

    Configuration conf = new Configuration(); // 讀取hadoop配置

    Job job = new Job(conf, "作業名稱"); // 實例化一道作業

    job.setMapperClass(Mapper類型);

    job.setCombinerClass(Combiner類型);

    job.setReducerClass(Reducer類型);

    job.setOutputKeyClass(輸出Key的類型);

    job.setOutputValueClass(輸出Value的類型);

    FileInputFormat.addInputPath(job, new Path(輸入hdfs路徑));

    FileOutputFormat.setOutputPath(job, new Path(輸出hdfs路徑));

    // 其它初始化配置

    JobClient.runJob(job);

    4.1 配置Job

    JobConf是用戶描述一個job的接口。下面的信息是MapReduce過程中一些較關鍵的定制信息:


    4.2 JobClient.runJob():運行Job并分解輸入數據集


    runJob()提交作業,如何等待返回的狀態,根據狀態返回不同的結構給客戶端。

    其中runJob()使用submitJob(job)方法向 master提交作業。

    submitJob(Job)方法的流程



     

    一個MapReduceJob會通過JobClient類根據用戶在JobConf類中定義的InputFormat實現類來將輸入的數據集分解成一批小的數據集,每一個小數據集會對應創建一個MapTask來處理。JobClient會使用缺省的FileInputFormat類調用FileInputFormat.getSplits()方法生成小數據集,如果判斷數據文件是isSplitable()的話,會將大的文件分解成小的FileSplit,當然只是記錄文件在HDFS里的路徑及偏移量和Split大小。這些信息會統一打包到jobFilejar中。


    hadoop分布系統文件系統hdfs依次上傳三個文件: job.jar, job.splitjob.xml。 

    job.xml: 作業配置,例如Mapper, Combiner, Reducer的類型,輸入輸出格式的類型等。 

    job.jar: jar,里面包含了執行此任務需要的各種類,比如 Mapper,Reducer等實現。 

    job.split: 文件分塊的相關信息,比如有數據分多少個塊,塊的大小(默認64m)等。 

    這三個文件在hdfs上的路徑由hadoop-default.xml文件中的mapreduce系統路徑mapred.system.dir屬性 + jobid決定。mapred.system.dir屬性默認是/tmp/hadoop-user_name/mapred/system。寫完這三個文 件之后, 此方法會通過RPC調用master節點上的JobTracker.submitJob(job)方法,等待返回狀態,此時作業已經提交完成。

    接下來轉到JobTracker上執行。

    (事實上這里還涉及到一些相關的類與方法)

    4.3 提交Job

    jobFile的提交過程是通過RPC(遠程進程調用)模塊來實現的。大致過程是,JobClient類中通過RPC實現的Proxy接口調用JobTrackersubmitJob()方法,而JobTracker必須實現JobSubmissionProtocol接口。

    JobTracker創建job成功后會給JobClient傳回一個JobStatus對象用于記錄job的狀態信息,如執行時間、MapReduce任務完成的比例等。JobClient會根據這個JobStatus對象創建一個NetworkedJobRunningJob對象,用于定時從JobTracker獲得執行過程的統計數據來監控并打印到用戶的控制臺。

    與創建Job過程相關的類和方法如下圖所示


     

    1. JobTracker

    5.1 JobTracker啟動

    JobTracker類中有一個main()函數,在軟件啟動的時候執行此main()函數啟動JobTracker進程,main()中生成一個JobTracker的對象,然后通過tracker.offerService()語句啟動服務,即啟動一些線程,下面是幾個主要的線程:

    taskScheduler:一個抽象類,被JobTracker用于安排執行在TaskTrackers上的task任務,它使用一個或多個JobInProgressListeners接收jobs的通知。另外一個任務是調用JobInProgress.initTask()job初始化tasks。啟動,提交作業,設置配置參數,終止等方法。


    completedJobsStoreThread對應completedJobStatusStoreCompletedJobStatusStore類:把JobInProgress中的job信息存儲到DFS中;提供一些讀取狀態信息的方法;是一個守護進程,用于刪除DFS中的保存時間超過規定時間的job status刪除,


    interTrackerServer,抽象類Server類型的實例。一個IPC (Inter-Process Communication,進程間通信)服務器,IPC調用一個以一個參數的形式調用Writable,然后返回一個Writable作為返回值,在某個端口上運行。提供了call,listener,responder,connection,handle類。包括start(),stop(),join(),getListenerAddress(),call()等方法。

    這些線程啟動之后,便可開始工作了。



    job是統一由JobTracker來調度的,把具體的Task分發給各個TaskTracker節點來執行。下面來詳細解析執行過程,首先先從JobTracker收到JobClient的提交請求開始。

      1. JobTracker初始化Job

    5.2.1 JobTracker.submitJob() 收到請求

    JobTracker接收到新的job請求(即submitJob()函數被調用)后,會創建一個JobInProgress對象并通過它來管理和調度任務。JobInProgress在創建的時候會初始化一系列與任務有關的參數,調用到FileSystem,把在JobClient端上傳的所有任務文件下載到本地的文件系統中的臨時目錄里。這其中包括上傳的*.jar文件包、記錄配置信息的xml、記錄分割信息的文件。

    5.2 JobTracker.JobInitThread 通知初始化線程

    JobTracker 中的監聽器類EagerTaskInitializationListener負責任務Task的初始化。JobTracker使用jobAdded(job)加入jobEagerTaskInitializationListener中一個專門管理需要初始化的隊列里,即一個list成員變量jobInitQueue里。resortInitQueue方法根據作業的優先級排序。然后調用notifyAll()函數,會喚起一個用于初始化job的線程JobInitThread來處理???JobInitThread收到信號后即取出最靠前的job,即優先級別最高的job,調用TaskTrackerManagerinitJob最終調用JobInProgress.initTasks()執行真正的初始化工作。

    5.3 JobInProgress.initTasks() 初始化TaskInProgress

    任務Task分兩種: MapTask reduceTask,它們的管理對象都是TaskInProgress

    首先JobInProgress會創建Map的監控對象。在initTasks()函數里通過調用JobClientreadSplitFile()獲得已分解的輸入數據的RawSplit列表,然后根據這個列表創建對應數目的Map執行管理對象TaskInProgress。在這個過程中,還會記錄該RawSplit塊對應的所有在HDFS里的blocks所在的DataNode節點的host,這個會在RawSplit創建時通過FileSplitgetLocations()函數獲取,該函數會調用DistributedFileSystem的getFileCacheHints()獲得。當然如果是存儲在本地文件系統中,即使用LocalFileSystem時當然只有一個location即“localhost”了。

    創建這些TaskInProgress對象完畢后,initTasks()方法會通 過createCache()方法為這些TaskInProgress對象產生一個未執行任務的Map緩存nonRunningMapCacheslave端的 TaskTrackermaster發送心跳時,就可以直接從這個cache中取任務去執行。

    其次JobInProgress會創建Reduce的監控對象,這個比較簡單,根據JobConf里指定的Reduce數目創建,缺省只創建1Reduce任務。監控和調度Reduce任務的是TaskInProgress類,不過構造方法有所不同,TaskInProgress會根據不同參數分別創建具體的MapTask或者ReduceTask。同樣地,initTasks()也會通過createCache()方法產生nonRunningReduces成員。

    JobInProgress創建完TaskInProgress后,最后構造JobStatus并記錄job正在執行中,然后再調用JobHistory.JobInfo.logStarted()記錄job的執行日志。到這里JobTracker里初始化job的過程全部結束。


    5.3.2 JobTracker調度Job

    hadoop默認的調度器是FIFO策略的JobQueueTaskScheduler,它有兩個成員變量 jobQueueJobInProgressListener與上面說的eagerTaskInitializationListener。JobQueueJobInProgressListener是JobTracker的另一個監聽器類,它包含了一個映射,用來管理和調度所有的JobInProgress。jobAdded(job)同時會加入job到JobQueueJobInProgressListener中的映射。

    JobQueueTaskScheduler最重要的方法是assignTasks ,他實現了工作調度。具體實現:JobTracker 接到TaskTracker heartbeat() 調用后,首先會檢查上一個心跳響應是否完成,是沒要求啟動或重啟任務,如果一切正常,則會處理心跳。首先它會檢查 TaskTracker 端還可以做多少個 map reduce 任務,將要派發的任務數是否超出這個數,是否超出集群的任務平均剩余可負載數。如果都沒超出,則為此 TaskTracker 分配一個 MapTask ReduceTask 。產生 Map 任務使用 JobInProgress obtainNewMapTask() 方法,實質上最后調用了 JobInProgress findNewMapTask() 訪問 nonRunningMapCache

    上面講解任務初始化時說過,createCache()方法會在網絡拓撲結構上掛上需要執行的TaskInProgressfindNewMapTask()從近到遠一層一層地尋找,首先是同一節點,然后在尋找同一機柜上的節點,接著尋找相同數據中心下的節點,直到找了maxLevel層結束。這樣的話,在JobTrackerTaskTracker派發任務的時候,可以迅速找到最近的TaskTracker,讓它執行任務。

    最終生成一個Task類對象,該對象被封裝在一個LanuchTaskAction 中,發回給TaskTracker,讓它去執行任務。

    產生 Reduce 任務過程類似,使用 JobInProgress.obtainNewReduceTask() 方法,實質上最后調用了 JobInProgress findNewReduceTask() 訪問 nonRunningReduces

    6. TaskTracker

    6.1 TaskTracker的啟動

    JobTracker一樣,里面包含一個main()方法,在hadoop啟動的時候啟動此進程。

    Main()方法最主要的一句話

    TaskTracker(conf).run()

    TaskTracker(conf)獲取本機的一些配置信息,初始化服務器并啟動服務器(StatusHttpServer);然后調用initialize(),這個方法才是真正構造TaskTracker的地方,把它作為一個單獨的方法便可以再次調用并可以在close()之后回收對象,就是初始化一些變量對象,最后啟動線程:

    taskMemoryManagerTaskMemoryManagerThread類的對象。管理本機上task運行時內存的使用,殺死任何溢出和超出內存限制的task-trees

    mapLauncherreduceLauncher都是TaskLauncher類的對象,其作用是啟動maptaskreducetask任務線程。根據tasksToLaunch判斷是否需要新建任務,其中的調用的關系為:run()startNewTask()localizeJob()launchTaskForJoblaunchTask()localizeTask


    run()方法中啟動TaskTracker服務器然后一直循環。循環會嘗試連接到的JobTracker。主要調用了兩個方法startCleanupThreads(),offerService()

    startCleanupThreads()啟動為守護進程,可以用來刪除一個獨立線程的路徑。

    offerService()類似于JobTracker中的offerService()方法,即服務器執行的主循環。規定的時間內給JobTracker發送心跳信息,并處理返回的命令。

    下面具體介紹流程中的每一步。

    6.2 TaskTracker加載Task到子進程

    Task的執行實際是由TaskTracker發起的,TaskTracker會定期與JobTracker進行一次通信,報告自己Task的執行狀態,接收JobTracker的指令等。如果發現有自己需要執行的新任務也會在這時啟動,即是在TaskTracker調用JobTrackerheartbeat()方法時進行,此調用底層是通過IPC層調用Proxy接口實現。

    6.2.1 TaskTracker.run() 連接JobTracker

    TaskTracker的啟動過程會初始化一系列參數和服務,然后嘗試連接JobTracker(即必須實現InterTrackerProtocol接口),如果連接斷開,則會循環嘗試連接JobTracker,并重新初始化所有成員和參數。

    6.2.2 TaskTracker.offerService() 主循環

    如果連接JobTracker服務成功,TaskTracker就會調用offerService()函數進入主執行循環中。這個循環會每隔10秒與JobTracker通訊一次,調用transmitHeartBeat(),獲得HeartbeatResponse信息。然后調用HeartbeatResponsegetActions()函數獲得JobTracker傳過來的所有指令即一個TaskTrackerAction數組。再遍歷這個數組,如果是一個新任務指令即LaunchTaskAction則調用調用addToTaskQueue加入到待執行

    隊列,否則加入到tasksToCleanup隊列,交給一個taskCleanupThread線程來處理,如執行KillJobAction或者KillTaskAction等。

    6.2.3 TaskTracker.transmitHeartBeat() 獲取JobTracker指令

    transmitHeartBeat()函數處理中,TaskTracker會創建一個新的TaskTrackerStatus對象記錄目前任務的執行狀況,檢查目前執行的Task數目以及本地磁盤的空間使用情況等,如果可以接收新的Task則設置heartbeat()askForNewTask參數為true。然后通過IPC接口調用JobTrackerheartbeat()方法發送過去,heartbeat()返回值TaskTrackerAction數組。

    6.2.4 TaskTracker.addToTaskQueue,交給TaskLauncher處理

    TaskLauncher是用來處理新任務的線程類,包含了一個待運行任務的隊列 tasksToLaunchTaskTracker.addToTaskQueue會調用TaskTrackerregisterTask,創建TaskInProgress對象來調度和監控任務,并把它加入到runningTasks隊列中。同時將這個TaskInProgress加到tasksToLaunch 中,并notifyAll()喚醒一個線程運行,該線程從隊列tasksToLaunch取出一個待運行任務,調用TaskTrackerstartNewTask運行任務。

    6.2.5 TaskTracker.startNewTask() 啟動新任務

    調用localizeJob()真正初始化Task并開始執行。

    6.2.6 TaskTracker.localizeJob() 初始化job目錄等

    此函數主要任務是初始化工作目錄workDir,再將job jar包從HDFS復制到本地文件系統中,調用RunJar.unJar()將包解壓到工作目錄。然后創建一個RunningJob并調用addTaskToJob()函數將它添加到runningJobs監控隊列中。addTaskToJob方法把一個任務加入到該任務屬于的runningJobtasks列表中。如果該任務屬于的runningJob不存在,先新建,加到runningJobs中。完成后即調用launchTaskForJob()開始執行Task

    6.2.7 TaskTracker.launchTaskForJob() 執行任務

    啟動Task的工作實際是調用TaskTracker$TaskInProgresslaunchTask()函數來執行的。

    6.2.8 TaskTracker$TaskInProgress.launchTask() 執行任務

    執行任務前先調用localizeTask()更新一下jobConf文件并寫入到本地目錄中。然后通過調用TaskcreateRunner()方法創建TaskRunner對象并調用其start()方法最后啟動Task獨立的java執行子進程。

    6.2.9 Task.createRunner() 創建啟動Runner對象

    Task有兩個實現版本,即MapTaskReduceTask,它們分別用于創建MapReduce任務。MapTask會創建MapTaskRunner來啟動Task子進程,而ReduceTask則創建ReduceTaskRunner來啟動。

    6.2.10 TaskRunner.start() 啟動子進程

    TaskRunner負責將一個任務放到一個進程里面來執行。它會調用run()函數來處理,主要的工作就是初始化啟動java子進程的一系列環境變量,包括設定工作目錄workDir,設置CLASSPATH環境變量等。然后裝載job jar包。JvmManager用于管理該TaskTracker上所有運行的Task子進程。每一個進程都是由JvmRunner來管理的,它也是位于單獨線程中的。JvmManagerlaunchJvm方法,根據任務是map還是reduce,生成對應的JvmRunner并放到對應JvmManagerForType的進程容器中進行管理。JvmManagerForTypereapJvm()

    分配一個新的JVM進程。如果JvmManagerForType槽滿,就尋找idle的進程,如果是同Job的直接放進去,否則殺死這個進程,用一個新的進程代替。如果槽沒有滿,那么就啟動新的子進程。生成新的進程使用spawnNewJvm方法。spawnNewJvm使用JvmRunner線程的run方法,run方法用于生成一個新的進程并運行它,具體實現是調用runChild

    6.3 子進程執行MapTask

    真實的執行載體,是Child,它包含一個 main函數,進程執行,會將相關參數傳進來,它會拆解這些參數,通過getTask(jvmId)向父進程索取任務,并且構造出相關的Task實例,然后使用Taskrun()啟動任務。

    6.3.1 run

    方法相當簡單,配置完系統的TaskReporter后,就根據情況執行runJobCleanupTaskrunJobSetupTaskrunTaskCleanupTask或執行map

    6.3.2 mapper

    首先構造Mapper的輸出,是通過MapOutputCollector進行的,也分兩種情況,如果沒有Reducer,那么,用DirectMapOutputCollector,否則,用MapOutputBuffer。然后構造Mapper處理的InputSplit,然后就開始創建MapperRecordReader,最終得到map的輸入。構造完Mapper的輸入輸出,通過構造配置文件中配置的MapRunnable,就可以執行Mapper了。目前系統有兩個MapRunnableMapRunnerMultithreadedMapRunnerMapRunner是單線程執行器,比較簡單,他會使用反射機制生成用戶定義的Mapper接口實現類,作為他的一個成員。

    6.3.3 MapRunnerrun方法

    會先創建對應的keyvalue對象,然后,對InputSplit的每一對<keyvalue>,調用用戶實現的Mapper接口實現類的map方法,每處理一個數據對,就要使用OutputCollector收集每次處理kv對后得到的新的kv對,把他們spill到文件或者放到內存,以做進一步的處理,比如排序,combine等。

    6.3.4 OutputCollector

    OutputCollector的作用是收集每次調用map后得到的新的kv對,并把他們spill到文件或者放到內存,以做進一步的處理,比如排序,combine等。

    MapOutputCollector 有兩個子類:MapOutputBufferDirectMapOutputCollectorDirectMapOutputCollector用在不需要Reduce階段的時候。如果Mapper后續有reduce任務,系統會使用MapOutputBuffer做為輸出, MapOutputBuffer使用了一個緩沖區對map的處理結果進行緩存,放在內存中,又使用幾個數組對這個緩沖區進行管理。



    在適當的時機,緩沖區中的數據會被spill到硬盤中。



    向硬盤中寫數據的時機:

    1)當內存緩沖區不能容下一個太大的k v對時。spillSingleRecord方法。

    2)內存緩沖區已滿時。SpillThread線程。

    3Mapper的結果都已經collect了,需要對緩沖區做最后的清理。Flush方法。

    2.5 spillThread線程:將緩沖區中的數據spill到硬盤中。

    1)需要spill時調用函數sortAndSpill,按照partitionkey做排序。默認使用的是快速排序QuickSort

    2)如果沒有combiner,則直接輸出記錄,否則,調用CombinerRunnercombine,先做combin然后輸出。

    6.4 子進程執行ReduceTask

    ReduceTask.run方法開始和MapTask類似,包括initialize()初始化 ,runJobCleanupTask()runJobSetupTask()runTaskCleanupTask()。之后進入正式的工作,主要有這么三個步驟:CopySortReduce

    6.4.1 Copy

    就是從執行各個Map任務的服務器那里,收羅到map的輸出文件。拷貝的任務,是由ReduceTask.ReduceCopier 類來負責。

    6.4.1.1 類圖:



    6.4.1.2 流程: 使用ReduceCopier.fetchOutputs開始

    1)索取任務。使用GetMapEventsThread線程。該線程的run方法不停的調用getMapCompletionEvents方法,該方法又使用RPC調用TaskUmbilicalProtocol協議的getMapCompletionEvents,方法使用所屬的jobID向其父TaskTracker詢問此作業個Map任務的完成狀況(TaskTracker要向JobTracker詢問后再轉告給它...)。返回一個數組TaskCompletionEvent events[]TaskCompletionEvent包含taskidip地址之類的信息。

    2)當獲取到相關Map任務執行服務器的信息后,有一個線程MapOutputCopier開啟,做具體的拷貝工作。 它會在一個單獨的線程內,負責某個Map任務服務器上文件的拷貝工作。MapOutputCopierrun循環調用copyOutputcopyOutput又調用getMapOutput,使用HTTP遠程拷貝。

    3getMapOutput遠程拷貝過來的內容(當然也可以是本地了...),作為MapOutput對象存在,它可以在內存中也可以序列化在磁盤上,這個根據內存使用狀況來自動調節。

    4) 同時,還有一個內存Merger線程InMemFSMergeThread和一個文件Merger線程LocalFSMerger在同步工作,它們將下載過來的文件(可能在內存中,簡單的統稱為文件...),做著歸并排序,以此,節約時間,降低輸入文件的數量,為后續的排序工作減 負。InMemFSMergeThreadrun循環調用doInMemMerge, 該方法使用工具類Merger實現歸并,如果需要combine,則combinerRunner.combine

    6.4.2 Sort

    排序工作,就相當于上述排序工作的一個延續。它會在所有的文件都拷貝完畢后進行。使用工具類Merger歸并所有的文件。經過這一個流程,一個合并了所有所需Map任務輸出文件的新文件產生了。而那些從其他各個服務器網羅過來的 Map任務輸出文件,全部刪除了。

    6.4.3 Reduce

    Reduce任務的最后一個階段。他會準備好 keyClass"mapred.output.key.class""mapred.mapoutput.key.class", valueClass("mapred.mapoutput.value.class""mapred.output.value.class")Comparator(“mapred.output.value.groupfn.class”或 “mapred.output.key.comparator.class”)。最后調用runOldReducer方法。(也是兩套API,我們分析runOldReducer

    6.4.4 runReducer

    1)輸出方面。它會準備一個OutputCollector收集輸出,與MapTask不同,這個OutputCollector更為簡單,僅僅是打開一個RecordWritercollect一次,write一次。最大的不同在于,這次傳入RecordWriter的文件系統,基本都是分布式文件系統, 或者說是HDFS

    2)輸入方面,ReduceTask會用準備好的KeyClassValueClassKeyComparator等等之類的自定義類,構造出Reducer所需的鍵類型, 和值的迭代類型Iterator(一個鍵到了這里一般是對應一組值)。

    3)有了輸入,有了輸出,不斷循環調用自定義的Reducer,最終,Reduce階段完成。



     

     

    Feedback

    # re: hadoop中mapreduce部分執行流程  回復  更多評論   

    2011-06-14 16:00 by laowuuser
    寫的很好,學習了

    # re: hadoop中mapreduce部分執行流程  回復  更多評論   

    2011-08-04 09:02 by wlu
    寫的太棒了

    # re: hadoop中mapreduce部分執行流程  回復  更多評論   

    2011-08-31 17:42 by chemcial
    JobClient.runJob(org.apache.hadoop.mapreduce.Job)調用錯了

    # re: hadoop中mapreduce部分執行流程  回復  更多評論   

    2011-09-05 19:50 by 俞靈
    @chemcial
    沒看懂你的意思
    JobClient.runJob(org.apache.hadoop.mapred.JobConf) ?

    # re: hadoop中mapreduce部分執行流程[未登錄]  回復  更多評論   

    2013-03-06 21:02 by owen
    寫的很好,支持

    # re: hadoop中mapreduce部分執行流程  回復  更多評論   

    2013-08-09 10:40 by itzzq
    樓主寫得很好,可是圖掛了,能不能重新放一下?

    # re: hadoop中mapreduce部分執行流程[未登錄]  回復  更多評論   

    2013-11-13 16:03 by test
    寫的很不錯,有用。學習了

    只有注冊用戶登錄后才能發表評論。


    網站導航:
     
    主站蜘蛛池模板: 国产美女被遭强高潮免费网站| 免费看搞黄视频网站| 免费精品人在线二线三线区别| 精品日韩亚洲AV无码一区二区三区 | 国产亚洲免费的视频看| 亚洲熟妇av一区二区三区漫画| 国产免费久久精品丫丫| 亚洲中文字幕第一页在线| 中文在线观看免费网站| 亚洲国产精品无码专区在线观看| 怡红院免费全部视频在线视频| 亚洲va中文字幕无码久久| 久久成人免费电影| 亚洲最大在线视频| 男人的好免费观看在线视频| 亚洲熟女精品中文字幕| 亚洲国产电影av在线网址| 91在线免费视频| 亚洲国产精品免费在线观看| 18禁超污无遮挡无码免费网站国产| 亚洲成a∧人片在线观看无码| 亚洲国产精品综合久久网络 | 全部在线播放免费毛片| 久久精品国产亚洲AV不卡| 免费av一区二区三区| 亚洲人成人77777在线播放 | 亚洲av无码成人精品区在线播放| 一个人看的www视频免费在线观看 一个人看的免费观看日本视频www | 国产成人在线观看免费网站| 一级做a爱片特黄在线观看免费看| 国产精品亚洲A∨天堂不卡| xxxxwww免费| 视频一区在线免费观看| 亚洲av无码乱码国产精品| 97性无码区免费| 香蕉免费看一区二区三区| 亚洲国产日韩在线成人蜜芽 | 亚洲熟妇无码一区二区三区 | 久久久久成人片免费观看蜜芽| 亚洲一区中文字幕在线电影网| 免费少妇a级毛片|