|
翻譯的一篇國外的關于hadoop mapreduce的文章,文章比較長,先翻譯第一部分吧
翻譯者:pconlin900
博客:http://pconline900. Hadoop是apache的一個開源的map-reduce框架,MapReduce是一個并行計算模型,用來處理海量數(shù)據(jù)。模型思想來源于google的Jeffrey Dean 和 Sanjay Ghemawat,包括map() reduce()兩個主要的功能。
這是一個很簡單的類似于Hadoop的MapReduce應用例子,應用了mapreduce的基本思想,可以幫助理解hadoop的處理思想和技術,但注意,它沒有使用hadoop框架。
例子的功能是創(chuàng)建一些字符串,然后統(tǒng)計這些字符串里面每個字符出現(xiàn)的次數(shù),最后匯總得到總的字符出現(xiàn)次數(shù)。 Listing 1. 主程序
public class Main { public static void main(String[] args)
{ MyMapReduce my = new MyMapReduce();
my.init(); }
} Listing 2. MyMapReduce.java import java.util.*;
public class MyMapReduce
{ List buckets = new ArrayList(); List intermediateresults = new ArrayList(); List values = new ArrayList(); public void init()
{ for(int i = 1; i<=30; i++) { values.add("http://pconline900." + new Integer(i).toString()); } System.out.println("**STEP 1 START**-> Running Conversion into Buckets**");
System.out.println(); List b = step1ConvertIntoBuckets(values,5); System.out.println("************STEP 1 COMPLETE*************"); System.out.println(); System.out.println(); System.out.println("**STEP 2 START**->Running **Map Function** concurrently for all Buckets");
System.out.println(); List res = step2RunMapFunctionForAllBuckets(b); System.out.println("************STEP 2 COMPLETE*************"); System.out.println();
System.out.println(); System.out.println("**STEP 3 START**->Running **Reduce Function** for collating Intermediate Results and Printing Results"); System.out.println(); step3RunReduceFunctionForAllBuckets(res); System.out.println("************STEP 3 COMPLETE*************"); System.out.println("************pconline900 翻譯*************"); System.out.println("***********博客:http://pconline900.*************"); } public List step1ConvertIntoBuckets(List list,int numberofbuckets) { int n = list.size(); int m = n / numberofbuckets; int rem = n% numberofbuckets; int count = 0;
System.out.println("BUCKETS"); for(int j =1; j<= numberofbuckets; j++) { List temp = new ArrayList(); for(int i=1; i<= m; i++) { temp.add((String)values.get(count));
count++; } buckets.add(temp); temp = new ArrayList(); } if(rem != 0) { List temp = new ArrayList(); for(int i =1; i<=rem;i++) { temp.add((String)values.get(count));
count++; } buckets.add(temp); } System.out.println(); System.out.println(buckets); System.out.println(); return buckets; }
public List step2RunMapFunctionForAllBuckets(List list)
{ for(int i=0; i< list.size(); i++) { List elementList = (ArrayList)list.get(i); new StartThread(elementList).start(); } try
{ Thread.currentThread().sleep(1000); }catch(Exception e) { } return intermediateresults; } public void step3RunReduceFunctionForAllBuckets(List list)
{ int sum =0; for(int i=0; i< list.size(); i++) { //you can do some processing here, like finding max of all results etc int t = Integer.parseInt((String)list.get(i)); sum += t; } System.out.println(); System.out.println("Total Count is "+ sum); System.out.println(); }
class StartThread extends Thread
{ private List tempList = new ArrayList(); public StartThread(List list) { tempList = list; } public void run() { for(int i=0; i< tempList.size();i++)
{ String str = (String)tempList.get(i); synchronized(this)
{ intermediateresults.add(new Integer(str.length()).toString()); } } } }
}
init()方法創(chuàng)建了一些測試數(shù)據(jù),作為測試數(shù)據(jù)。實際應用中會是海量數(shù)據(jù)處理。 step1ConvertIntoBuckets()方法將測試數(shù)據(jù)拆分到5個 bucket中,每個bucket是一個ArrayList(包含6個String數(shù)據(jù))。bucket可以保存在內(nèi)存,磁盤,或者集群中的其他節(jié)點;
step2RunMapFunctionForAllBuckets()方法創(chuàng)建了5個線程(每個bucket一個),每個線程StartThread處理每個bucket并把處理結果放在intermediateresults這個arraylist中。
如果bucket分配給不同的節(jié)點處理,必須有一個master主控節(jié)點監(jiān)控各個節(jié)點的計算,匯總各個節(jié)點的處理結果,若有節(jié)點失敗,master必須能夠分配計算任務給其他節(jié)點計算。\
step3RunReduceFunctionForAllBuckets()方法加載intermediateresults中間處理結果,并進行匯總處理,最后得到最終的計算結果。
本文來自CSDN博客,轉載請標明出處:http://blog.csdn.net/joliny/archive/2008/11/24/3360731.aspx
|
|
|