小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

Hadoop源碼之我見

 gerial 2011-11-23

為了不遺忘和可以速查源碼,準(zhǔn)備重新讀一遍Hadoop的MapReduce部分的源碼,記錄下來,盡量詳細(xì)點(diǎn)。如要轉(zhuǎn)載,請(qǐng)標(biāo)明出處。

 

寫MapReduce程序首先接觸的是Job類,Job類是管理一個(gè)集群作業(yè)的類,包含了一個(gè)作業(yè)的所有信息和向集群提交作業(yè)的方法。

 

 

如圖所示,它有以上一些方法,我們寫程序是調(diào)用waitForCompletion()方法,方法實(shí)現(xiàn)如下:

 

 

  1. public boolean waitForCompletion(boolean verbose  
  2.                                  ) throws IOException, InterruptedException,  
  3.                                           ClassNotFoundException {  
  4.   if (state == JobState.DEFINE) {  
  5.     submit();  
  6.   }  
  7.   if (verbose) {  
  8.     jobClient.monitorAndPrintJob(conf, info);  
  9.   } else {  
  10.     info.waitForCompletion();  
  11.   }  
  12.   return isSuccessful();  
  13. }  
 

 

 

它調(diào)用了submit向集群提交作業(yè),下面看下submit()方法:

 

 

  1. public void submit() throws IOException, InterruptedException,   
  2.                             ClassNotFoundException {  
  3.   ensureState(JobState.DEFINE);  
  4. 建立新的API,檢查兼容性   
  5.   setUseNewAPI();  
  6.   info = jobClient.submitJobInternal(conf);  
  7.   state = JobState.RUNNING;  
  8.  }  
 

 

jobClient是在初始化時(shí)候建立的。

 

  1. public Job(Configuration conf) throws IOException {  
  2.   super(conf, null);  
  3.   jobClient = new JobClient((JobConf) getConfiguration());  
  4. }  
 

 

JobClient類 建立了一個(gè)代理,用于連接JobTracker(集群上的master結(jié)點(diǎn)),

 

  1. public JobClient(JobConf conf) throws IOException {  
  2.   setConf(conf);  
  3.   init(conf);  
  4. }  
  5. /** 
  6.  * Connect to the default {@link JobTracker}. 
  7.  * @param conf the job configuration. 
  8.  * @throws IOException 
  9.  */  
  10. public void init(JobConf conf) throws IOException {  
  11.   String tracker = conf.get("mapred.job.tracker""local");  
  12.   if ("local".equals(tracker)) {  
  13.     this.jobSubmitClient = new LocalJobRunner(conf);  
  14.   } else {  
  15.     this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);  
  16.   }          
  17. }  
 

 

這個(gè)代理會(huì)檢查mapred.job.tracker 這個(gè)屬性有沒有建立,默認(rèn)值是local,如果建立了,則建立一個(gè)連接JobTracker的代理。這個(gè)代理負(fù)責(zé)上傳作業(yè)的配置和作業(yè)內(nèi)容到集群中。

 

  1. private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,  
  2.     Configuration conf) throws IOException {  
  3.   return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,  
  4.       JobSubmissionProtocol.versionID, addr, getUGI(conf), conf,  
  5.       NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));  
  6. }  
 

發(fā)現(xiàn)他實(shí)現(xiàn)了JobSubmissionProtocol接口的一個(gè)對(duì)象

  1. public static VersionedProtocol getProxy(Class<?> protocol,  
  2.     long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,  
  3.     Configuration conf, SocketFactory factory) throws IOException {      
  4.       
  5.   VersionedProtocol proxy =  
  6.       (VersionedProtocol) Proxy.newProxyInstance(  
  7.           protocol.getClassLoader(), new Class[] { protocol },  
  8.           new Invoker(addr, ticket, conf, factory));  
  9.   long serverVersion = proxy.getProtocolVersion(protocol.getName(),   
  10.                                                 clientVersion);  
  11.   if (serverVersion == clientVersion) {  
  12.     return proxy;  
  13.   } else {  
  14.     throw new VersionMismatch(protocol.getName(), clientVersion,   
  15.                               serverVersion);  
  16.   }  
  17. }  
 

 

 

總之,Job類使用了一個(gè)實(shí)現(xiàn)了JobSubmissionProtocol接口的一個(gè)代理,這個(gè)代理對(duì)象可以用來和集群通信,job類的一些方法也可以用來幫助我們對(duì)集群和任務(wù)的進(jìn)展情況進(jìn)行查看。

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多