小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

[轉(zhuǎn)載]Hadoop基本流程以及簡(jiǎn)單應(yīng)用的開(kāi)發(fā)-周園春的博客-科學(xué)網(wǎng)

 wlscut 2011-01-13
轉(zhuǎn)載]Hadoop基本流程以及簡(jiǎn)單應(yīng)用的開(kāi)發(fā)
基本流程






圖太大了,只好分割成為上面兩部分。
根據(jù)流程圖來(lái)說(shuō)一下具體的一個(gè)任務(wù)執(zhí)行的情況。

 

1.  分布式環(huán)境中客戶(hù)端創(chuàng)建任務(wù)并提交。

2.  InputFormatMap前的預(yù)處理,主要負(fù)責(zé)以下工作:

a)         驗(yàn)證輸入的格式是否符合JobConfig的輸入定義,這個(gè)在實(shí)現(xiàn)Map和構(gòu)建Conf的時(shí)候就會(huì)知道,不定義可以是Writable的任意子類(lèi)。

b)        input的文件split為邏輯上的輸入InputSplit,其實(shí)這就是在上面提到的在分布式文件系統(tǒng)中blocksize是有大小限制的,因此大文件會(huì)被劃分為多個(gè)block。

c)         通過(guò)RecordReader來(lái)再次處理inputsplit為一組records,輸出給Map。(inputsplit只是邏輯切分的第一步,但是如何根據(jù)文件中的信息來(lái)切分還需要RecordReader來(lái)實(shí)現(xiàn),例如最簡(jiǎn)單的默認(rèn)方式就是回車(chē)換行的切分)

 

3.  RecordReader處理后的結(jié)果作為Map的輸入,Map執(zhí)行定義的Map邏輯,輸出處理后的key,value對(duì)到臨時(shí)中間文件。

4.  Combiner可選擇配置,主要作用是在每一個(gè)Map執(zhí)行完分析以后,在本地優(yōu)先作Reduce的工作,減少在Reduce過(guò)程中的數(shù)據(jù)傳輸量。

5.  Partitioner可選擇配置,主要作用是在多個(gè)Reduce的情況下,指定Map的結(jié)果由某一個(gè)Reduce處理,每一個(gè)Reduce都會(huì)有單獨(dú)的輸出文件。(后面的代碼實(shí)例中有介紹使用場(chǎng)景)

6.  Reduce執(zhí)行具體的業(yè)務(wù)邏輯,并且將處理結(jié)果輸出給OutputFormat。

7.  OutputFormat的職責(zé)是,驗(yàn)證輸出目錄是否已經(jīng)存在,同時(shí)驗(yàn)證輸出結(jié)果類(lèi)型是否如Config中配置,最后輸出Reduce匯總后的結(jié)果。

代碼范例:

業(yè)務(wù)場(chǎng)景描述:

           可設(shè)定輸入和輸出路徑(操作系統(tǒng)的路徑非HDFS路徑),根據(jù)訪(fǎng)問(wèn)日志分析某一個(gè)應(yīng)用訪(fǎng)問(wèn)某一個(gè)API的總次數(shù)和總流量,統(tǒng)計(jì)后分別輸出到兩個(gè)文件中。

 

僅僅為了測(cè)試,因此沒(méi)有去細(xì)分很多類(lèi),將所有的類(lèi)都?xì)w并于一個(gè)類(lèi)便于說(shuō)明問(wèn)題。



測(cè)試代碼類(lèi)圖
LogAnalysiser就是主類(lèi),主要負(fù)責(zé)創(chuàng)建,提交任務(wù),并且輸出部分信息。內(nèi)部的幾個(gè)子類(lèi)用途可以參看流程中提到的角色職責(zé)。具體的看看幾個(gè)類(lèi)和方法的代碼片斷:

 

LogAnalysiser::MapClass

         public static class MapClass extends MapReduceBase

       implements Mapper<LongWritable, Text, Text, LongWritable>

         {

                   public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter)

                                     throws IOException

                   {       

                            String line = value.toString();//沒(méi)有配置RecordReader,所以默認(rèn)采用line的實(shí)現(xiàn),key就是行號(hào),value就是行內(nèi)容

                            if (line == null || line.equals(""))

                                     return;

                            String[] words = line.split(",");

                            if (words == null || words.length < 8)

                                     return;

                            String appid = words[1];

                            String apiName = words[2];

                            LongWritable recbytes = new LongWritable(Long.parseLong(words[7]));

                            Text record = new Text();

                            record.set(new StringBuffer("flow::").append(appid)

                                                                 .append("::").append(apiName).toString());

                            reporter.progress();

                            output.collect(record, recbytes);//輸出流量的統(tǒng)計(jì)結(jié)果,通過(guò)flow::作為前綴來(lái)標(biāo)示。

                            record.clear();

                            record.set(new StringBuffer("count::").append(appid).append("::").append(apiName).toString());

                            output.collect(record, new LongWritable(1));//輸出次數(shù)的統(tǒng)計(jì)結(jié)果,通過(guò)count::作為前綴來(lái)標(biāo)示

                   }       

         }

 

LogAnalysiser:: PartitionerClass

    public static class PartitionerClass implements Partitioner<Text, LongWritable>

      {

           public int getPartition(Text key, LongWritable value, int numPartitions)

           {

                 if (numPartitions >= 2)//Reduce 個(gè)數(shù),判斷流量還是次數(shù)的統(tǒng)計(jì)分配到不同的Reduce

                      if (key.toString().startsWith("flow::"))

                            return 0;

                      else

                            return 1;

                 else

                      return 0;

           }

           public void configure(JobConf job){}  

}

LogAnalysiser:: CombinerClass

參看ReduceClass,通常兩者可以使用一個(gè),不過(guò)這里有些不同的處理就分成了兩個(gè)。在ReduceClass中藍(lán)色的行表示在CombinerClass中不存在。

 

LogAnalysiser:: ReduceClass

    public static class ReduceClass extends MapReduceBase

           implements Reducer<Text, LongWritable,Text, LongWritable>

      {

           public void reduce(Text key, Iterator<LongWritable> values,

                      OutputCollector<Text, LongWritable> output, Reporter reporter)throws IOException

           {

                 Text newkey = new Text();

                 newkey.set(key.toString().substring(key.toString().indexOf("::")+2));

                 LongWritable result = new LongWritable();

                 long tmp = 0;

                 int counter = 0;

                 while(values.hasNext())//累加同一個(gè)key的統(tǒng)計(jì)結(jié)果

                 {

                      tmp = tmp + values.next().get();

                     

                      counter = counter +1;//擔(dān)心處理太久,JobTracker長(zhǎng)時(shí)間沒(méi)有收到報(bào)告會(huì)認(rèn)為TaskTracker已經(jīng)失效,因此定時(shí)報(bào)告一下

                      if (counter == 1000)

                      {

                            counter = 0;

                            reporter.progress();

                      }

                 }

                 result.set(tmp);

                 output.collect(newkey, result);//輸出最后的匯總結(jié)果

           }    

      }

 

LogAnalysiser

      public static void main(String[] args)

      {

           try

           {

                 run(args);

           } catch (Exception e)

           {

                 e.printStackTrace();

           }

      }

    public static void run(String[] args) throws Exception

      {

           if (args == null || args.length <2)

           {

                 System.out.println("need inputpath and outputpath");

                 return;

           }

           String inputpath = args[0];

           String outputpath = args[1];

           String shortin = args[0];

           String shortout = args[1];

           if (shortin.indexOf(File.separator) >= 0)

                 shortin = shortin.substring(shortin.lastIndexOf(File.separator));

           if (shortout.indexOf(File.separator) >= 0)

                 shortout = shortout.substring(shortout.lastIndexOf(File.separator));

           SimpleDateFormat formater = new SimpleDateFormat("yyyy.MM.dd");

           shortout = new StringBuffer(shortout).append("-")

                 .append(formater.format(new Date())).toString();

          

          

           if (!shortin.startsWith("/"))

                 shortin = "/" + shortin;

           if (!shortout.startsWith("/"))

                 shortout = "/" + shortout;

           shortin = "/user/root" + shortin;

           shortout = "/user/root" + shortout;              

           File inputdir = new File(inputpath);

           File outputdir = new File(outputpath);

           if (!inputdir.exists() || !inputdir.isDirectory())

           {

                 System.out.println("inputpath not exist or isn't dir!");

                 return;

           }

           if (!outputdir.exists())

           {

                 new File(outputpath).mkdirs();

           }

          

           JobConf conf = new JobConf(new Configuration(),LogAnalysiser.class);//構(gòu)建Config

           FileSystem fileSys = FileSystem.get(conf);

           fileSys.copyFromLocalFile(new Path(inputpath), new Path(shortin));//將本地文件系統(tǒng)的文件拷貝到HDFS

 

           conf.setJobName("analysisjob");

           conf.setOutputKeyClass(Text.class);//輸出的key類(lèi)型,在OutputFormat會(huì)檢查

           conf.setOutputValueClass(LongWritable.class); //輸出的value類(lèi)型,在OutputFormat會(huì)檢查

           conf.setMapperClass(MapClass.class);

           conf.setCombinerClass(CombinerClass.class);

           conf.setReducerClass(ReduceClass.class);

           conf.setPartitionerClass(PartitionerClass.class);

           conf.set("mapred.reduce.tasks", "2");//強(qiáng)制需要有兩個(gè)Reduce來(lái)分別處理流量和次數(shù)的統(tǒng)計(jì)

           FileInputFormat.setInputPaths(conf, shortin);//hdfs中的輸入路徑

           FileOutputFormat.setOutputPath(conf, new Path(shortout));//hdfs中輸出路徑

          

           Date startTime = new Date();

            System.out.println("Job started: " + startTime);

            JobClient.runJob(conf);

            Date end_time = new Date();

            System.out.println("Job ended: " + end_time);

            System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");

            //刪除輸入和輸出的臨時(shí)文件

           fileSys.copyToLocalFile(new Path(shortout),new Path(outputpath));

           fileSys.delete(new Path(shortin),true);

           fileSys.delete(new Path(shortout),true);

      }

以上的代碼就完成了所有的邏輯性代碼,然后還需要一個(gè)注冊(cè)驅(qū)動(dòng)類(lèi)來(lái)注冊(cè)業(yè)務(wù)Class為一個(gè)可標(biāo)示的命令,讓hadoop jar可以執(zhí)行。

public class ExampleDriver {

  public static void main(String argv[]){

    ProgramDriver pgd = new ProgramDriver();

    try {

      pgd.addClass("analysislog", LogAnalysiser.class, "A map/reduce program that analysis log .");

      pgd.driver(argv);

    }

    catch(Throwable e){

      e.printStackTrace();

    }

  }

}

將代碼打成jar,并且設(shè)置jarmainClassExampleDriver這個(gè)類(lèi)。

 

在分布式環(huán)境啟動(dòng)以后執(zhí)行如下語(yǔ)句:

hadoop jar analysiser.jar analysislog /home/wenchu/test-in /home/wenchu/test-out

 

/home/wenchu/test-in中是需要分析的日志文件,執(zhí)行后就會(huì)看見(jiàn)整個(gè)執(zhí)行過(guò)程,包括了Map,Reduce的進(jìn)度。執(zhí)行完畢會(huì)在/home/wenchu/test-out下看到輸出的內(nèi)容。有兩個(gè)文件:part-00000part-00001分別記錄了統(tǒng)計(jì)后的結(jié)果。

       如果需要看執(zhí)行的具體情況,可以看在輸出目錄下的_logs/history/xxxx_analysisjob,里面羅列了所有的Map,Reduce的創(chuàng)建情況以及執(zhí)行情況。

 

在運(yùn)行期也可以通過(guò)瀏覽器來(lái)查看Map,Reduce的情況:

http://MasterIP:50030/jobtracker.jsp

 

Hadoop集群測(cè)試


首先這里使用上面的范例作為測(cè)試,也沒(méi)有做太多的優(yōu)化配置,這個(gè)測(cè)試結(jié)果只是為了看看集群的效果,以及一些參數(shù)配置的影響。

 

文件復(fù)制數(shù)為1,blocksize 5M

Slave數(shù)

處理記錄數(shù)(萬(wàn)條)

執(zhí)行時(shí)間(秒)

2

95

38

2

950

337

4

95

24

4

950

178

6

95

21

6

950

114

 

Blocksize 5M

Slave數(shù)

處理記錄數(shù)(萬(wàn)條)

執(zhí)行時(shí)間(秒)

2(文件復(fù)制數(shù)為1

950

337

2(文件復(fù)制數(shù)為3

950

339

6(文件復(fù)制數(shù)為1

950

114

6(文件復(fù)制數(shù)為3

950

117

 

文件復(fù)制數(shù)為1

 

Slave數(shù)

處理記錄數(shù)(萬(wàn)條)

執(zhí)行時(shí)間(秒)

6(blocksize 5M)

95

21

6(blocksize 77M)

95

26

4(blocksize 5M)

950

178

4(blocksize 50M)

950

54

6(blocksize 5M)

950

114

6(blocksize 50M)

950

44

6(blocksize 77M)

950

74

 

測(cè)試的數(shù)據(jù)結(jié)果很穩(wěn)定,基本測(cè)幾次同樣條件下都是一樣。

測(cè)試結(jié)果可以看出一下幾點(diǎn):

1.       機(jī)器數(shù)對(duì)于性能還是有幫助的(等于沒(méi)說(shuō)^_^)。

2.       文件復(fù)制數(shù)的增加只對(duì)安全性有幫助,但是對(duì)于性能沒(méi)有太多幫助。而且現(xiàn)在采取的是將操作系統(tǒng)文件拷貝到HDFS中,所以備份多了,準(zhǔn)備的時(shí)間很長(zhǎng)。

3.       blocksize對(duì)于性能影響很大,首先如果將block劃分的太小,那么將會(huì)增加job的數(shù)量,同時(shí)也增加了協(xié)作的代價(jià),降低了性能,但是配置的太大也會(huì)讓job不能最大化并行處理。所以這個(gè)值的配置需要根據(jù)數(shù)據(jù)處理的量來(lái)考慮。

4.       最后就是除了這個(gè)表里面列出來(lái)的結(jié)果,應(yīng)該去仔細(xì)看輸出目錄中的_logs/history中的xxx_analysisjob這個(gè)文件,里面記錄了全部的執(zhí)行過(guò)程以及讀寫(xiě)情況。這個(gè)可以更加清楚地了解哪里可能會(huì)更加耗時(shí)。


隨想

       “云計(jì)算”熱的燙手,就和SAASWeb2,SNS等等一樣,往往都是在搞概念,只有真正踏踏實(shí)實(shí)的那些大型的互聯(lián)網(wǎng)公司,才會(huì)投入人力物力去研究符合自己的分布式計(jì)算。其實(shí)當(dāng)你的數(shù)據(jù)量沒(méi)有那么大的時(shí)候,這種分布式計(jì)算也就僅僅只是一個(gè)玩具而已,真正只有解決問(wèn)題的過(guò)程中,它深層次的問(wèn)題才會(huì)被挖掘出來(lái)。

       這篇文章僅僅是為了給對(duì)分布式計(jì)算有興趣的朋友拋個(gè)磚,要想真的掘到金子,那么就踏踏實(shí)實(shí)的去用,去想,去分析。后續(xù)自己也會(huì)更進(jìn)一步的去研究框架中的實(shí)現(xiàn)機(jī)制,在解決自己?jiǎn)栴}的同時(shí),也能夠貢獻(xiàn)一些什么。

       前幾日看到有人跪求成為架構(gòu)師的方式,看了有些可悲,有些可笑,其實(shí)有多少架構(gòu)師知道什么叫做架構(gòu),架構(gòu)師的職責(zé)是什么,與其追求這么一個(gè)名號(hào),還不如踏踏實(shí)實(shí)的作塊石頭沉到水底,積累和沉淀的過(guò)程就是一種成長(zhǎng)。

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶(hù)發(fā)布,不代表本站觀(guān)點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買(mǎi)等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶(hù) 評(píng)論公約

    類(lèi)似文章 更多