LogAnalysiser就是主類(lèi),主要負(fù)責(zé)創(chuàng)建,提交任務(wù),并且輸出部分信息。內(nèi)部的幾個(gè)子類(lèi)用途可以參看流程中提到的角色職責(zé)。具體的看看幾個(gè)類(lèi)和方法的代碼片斷:
LogAnalysiser::MapClass
public static class MapClass extends MapReduceBase
implements Mapper<LongWritable, Text, Text, LongWritable>
{
public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException
{
String line = value.toString();//沒(méi)有配置RecordReader,所以默認(rèn)采用line的實(shí)現(xiàn),key就是行號(hào),value就是行內(nèi)容
if (line == null || line.equals(""))
return;
String[] words = line.split(",");
if (words == null || words.length < 8)
return;
String appid = words[1];
String apiName = words[2];
LongWritable recbytes = new LongWritable(Long.parseLong(words[7]));
Text record = new Text();
record.set(new StringBuffer("flow::").append(appid)
.append("::").append(apiName).toString());
reporter.progress();
output.collect(record, recbytes);//輸出流量的統(tǒng)計(jì)結(jié)果,通過(guò)flow::作為前綴來(lái)標(biāo)示。
record.clear();
record.set(new StringBuffer("count::").append(appid).append("::").append(apiName).toString());
output.collect(record, new LongWritable(1));//輸出次數(shù)的統(tǒng)計(jì)結(jié)果,通過(guò)count::作為前綴來(lái)標(biāo)示
}
}
LogAnalysiser:: PartitionerClass
public static class PartitionerClass implements Partitioner<Text, LongWritable>
{
public int getPartition(Text key, LongWritable value, int numPartitions)
{
if (numPartitions >= 2)//Reduce 個(gè)數(shù),判斷流量還是次數(shù)的統(tǒng)計(jì)分配到不同的Reduce
if (key.toString().startsWith("flow::"))
return 0;
else
return 1;
else
return 0;
}
public void configure(JobConf job){}
}
LogAnalysiser:: CombinerClass
參看ReduceClass,通常兩者可以使用一個(gè),不過(guò)這里有些不同的處理就分成了兩個(gè)。在ReduceClass中藍(lán)色的行表示在CombinerClass中不存在。
LogAnalysiser:: ReduceClass
public static class ReduceClass extends MapReduceBase
implements Reducer<Text, LongWritable,Text, LongWritable>
{
public void reduce(Text key, Iterator<LongWritable> values,
OutputCollector<Text, LongWritable> output, Reporter reporter)throws IOException
{
Text newkey = new Text();
newkey.set(key.toString().substring(key.toString().indexOf("::")+2));
LongWritable result = new LongWritable();
long tmp = 0;
int counter = 0;
while(values.hasNext())//累加同一個(gè)key的統(tǒng)計(jì)結(jié)果
{
tmp = tmp + values.next().get();
counter = counter +1;//擔(dān)心處理太久,JobTracker長(zhǎng)時(shí)間沒(méi)有收到報(bào)告會(huì)認(rèn)為TaskTracker已經(jīng)失效,因此定時(shí)報(bào)告一下
if (counter == 1000)
{
counter = 0;
reporter.progress();
}
}
result.set(tmp);
output.collect(newkey, result);//輸出最后的匯總結(jié)果
}
}
LogAnalysiser
public static void main(String[] args)
{
try
{
run(args);
} catch (Exception e)
{
e.printStackTrace();
}
}
public static void run(String[] args) throws Exception
{
if (args == null || args.length <2)
{
System.out.println("need inputpath and outputpath");
return;
}
String inputpath = args[0];
String outputpath = args[1];
String shortin = args[0];
String shortout = args[1];
if (shortin.indexOf(File.separator) >= 0)
shortin = shortin.substring(shortin.lastIndexOf(File.separator));
if (shortout.indexOf(File.separator) >= 0)
shortout = shortout.substring(shortout.lastIndexOf(File.separator));
SimpleDateFormat formater = new SimpleDateFormat("yyyy.MM.dd");
shortout = new StringBuffer(shortout).append("-")
.append(formater.format(new Date())).toString();
if (!shortin.startsWith("/"))
shortin = "/" + shortin;
if (!shortout.startsWith("/"))
shortout = "/" + shortout;
shortin = "/user/root" + shortin;
shortout = "/user/root" + shortout;
File inputdir = new File(inputpath);
File outputdir = new File(outputpath);
if (!inputdir.exists() || !inputdir.isDirectory())
{
System.out.println("inputpath not exist or isn't dir!");
return;
}
if (!outputdir.exists())
{
new File(outputpath).mkdirs();
}
JobConf conf = new JobConf(new Configuration(),LogAnalysiser.class);//構(gòu)建Config
FileSystem fileSys = FileSystem.get(conf);
fileSys.copyFromLocalFile(new Path(inputpath), new Path(shortin));//將本地文件系統(tǒng)的文件拷貝到HDFS中
conf.setJobName("analysisjob");
conf.setOutputKeyClass(Text.class);//輸出的key類(lèi)型,在OutputFormat會(huì)檢查
conf.setOutputValueClass(LongWritable.class); //輸出的value類(lèi)型,在OutputFormat會(huì)檢查
conf.setMapperClass(MapClass.class);
conf.setCombinerClass(CombinerClass.class);
conf.setReducerClass(ReduceClass.class);
conf.setPartitionerClass(PartitionerClass.class);
conf.set("mapred.reduce.tasks", "2");//強(qiáng)制需要有兩個(gè)Reduce來(lái)分別處理流量和次數(shù)的統(tǒng)計(jì)
FileInputFormat.setInputPaths(conf, shortin);//hdfs中的輸入路徑
FileOutputFormat.setOutputPath(conf, new Path(shortout));//hdfs中輸出路徑
Date startTime = new Date();
System.out.println("Job started: " + startTime);
JobClient.runJob(conf);
Date end_time = new Date();
System.out.println("Job ended: " + end_time);
System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
//刪除輸入和輸出的臨時(shí)文件
fileSys.copyToLocalFile(new Path(shortout),new Path(outputpath));
fileSys.delete(new Path(shortin),true);
fileSys.delete(new Path(shortout),true);
}
以上的代碼就完成了所有的邏輯性代碼,然后還需要一個(gè)注冊(cè)驅(qū)動(dòng)類(lèi)來(lái)注冊(cè)業(yè)務(wù)Class為一個(gè)可標(biāo)示的命令,讓hadoop jar可以執(zhí)行。
public class ExampleDriver {
public static void main(String argv[]){
ProgramDriver pgd = new ProgramDriver();
try {
pgd.addClass("analysislog", LogAnalysiser.class, "A map/reduce program that analysis log .");
pgd.driver(argv);
}
catch(Throwable e){
e.printStackTrace();
}
}
}
將代碼打成jar,并且設(shè)置jar的mainClass為ExampleDriver這個(gè)類(lèi)。
在分布式環(huán)境啟動(dòng)以后執(zhí)行如下語(yǔ)句:
hadoop jar analysiser.jar analysislog /home/wenchu/test-in /home/wenchu/test-out
在/home/wenchu/test-in中是需要分析的日志文件,執(zhí)行后就會(huì)看見(jiàn)整個(gè)執(zhí)行過(guò)程,包括了Map,Reduce的進(jìn)度。執(zhí)行完畢會(huì)在/home/wenchu/test-out下看到輸出的內(nèi)容。有兩個(gè)文件:part-00000和part-00001分別記錄了統(tǒng)計(jì)后的結(jié)果。
如果需要看執(zhí)行的具體情況,可以看在輸出目錄下的_logs/history/xxxx_analysisjob,里面羅列了所有的Map,Reduce的創(chuàng)建情況以及執(zhí)行情況。
在運(yùn)行期也可以通過(guò)瀏覽器來(lái)查看Map,Reduce的情況:
http://MasterIP:50030/jobtracker.jsp
首先這里使用上面的范例作為測(cè)試,也沒(méi)有做太多的優(yōu)化配置,這個(gè)測(cè)試結(jié)果只是為了看看集群的效果,以及一些參數(shù)配置的影響。
文件復(fù)制數(shù)為1,blocksize 5M
|
Slave數(shù)
|
處理記錄數(shù)(萬(wàn)條)
|
執(zhí)行時(shí)間(秒)
|
|
2
|
95
|
38
|
|
2
|
950
|
337
|
|
4
|
95
|
24
|
|
4
|
950
|
178
|
|
6
|
95
|
21
|
|
6
|
950
|
114
|
Blocksize 5M
|
Slave數(shù)
|
處理記錄數(shù)(萬(wàn)條)
|
執(zhí)行時(shí)間(秒)
|
|
2(文件復(fù)制數(shù)為1)
|
950
|
337
|
|
2(文件復(fù)制數(shù)為3)
|
950
|
339
|
|
6(文件復(fù)制數(shù)為1)
|
950
|
114
|
|
6(文件復(fù)制數(shù)為3)
|
950
|
117
|
文件復(fù)制數(shù)為1
|
Slave數(shù)
|
處理記錄數(shù)(萬(wàn)條)
|
執(zhí)行時(shí)間(秒)
|
|
6(blocksize 5M)
|
95
|
21
|
|
6(blocksize 77M)
|
95
|
26
|
|
4(blocksize 5M)
|
950
|
178
|
|
4(blocksize 50M)
|
950
|
54
|
|
6(blocksize 5M)
|
950
|
114
|
|
6(blocksize 50M)
|
950
|
44
|
|
6(blocksize 77M)
|
950
|
74
|
測(cè)試的數(shù)據(jù)結(jié)果很穩(wěn)定,基本測(cè)幾次同樣條件下都是一樣。
測(cè)試結(jié)果可以看出一下幾點(diǎn):
1. 機(jī)器數(shù)對(duì)于性能還是有幫助的(等于沒(méi)說(shuō)^_^)。
2. 文件復(fù)制數(shù)的增加只對(duì)安全性有幫助,但是對(duì)于性能沒(méi)有太多幫助。而且現(xiàn)在采取的是將操作系統(tǒng)文件拷貝到HDFS中,所以備份多了,準(zhǔn)備的時(shí)間很長(zhǎng)。
3. blocksize對(duì)于性能影響很大,首先如果將block劃分的太小,那么將會(huì)增加job的數(shù)量,同時(shí)也增加了協(xié)作的代價(jià),降低了性能,但是配置的太大也會(huì)讓job不能最大化并行處理。所以這個(gè)值的配置需要根據(jù)數(shù)據(jù)處理的量來(lái)考慮。
4. 最后就是除了這個(gè)表里面列出來(lái)的結(jié)果,應(yīng)該去仔細(xì)看輸出目錄中的_logs/history中的xxx_analysisjob這個(gè)文件,里面記錄了全部的執(zhí)行過(guò)程以及讀寫(xiě)情況。這個(gè)可以更加清楚地了解哪里可能會(huì)更加耗時(shí)。
“云計(jì)算”熱的燙手,就和SAAS,Web2,SNS等等一樣,往往都是在搞概念,只有真正踏踏實(shí)實(shí)的那些大型的互聯(lián)網(wǎng)公司,才會(huì)投入人力物力去研究符合自己的分布式計(jì)算。其實(shí)當(dāng)你的數(shù)據(jù)量沒(méi)有那么大的時(shí)候,這種分布式計(jì)算也就僅僅只是一個(gè)玩具而已,真正只有解決問(wèn)題的過(guò)程中,它深層次的問(wèn)題才會(huì)被挖掘出來(lái)。
這篇文章僅僅是為了給對(duì)分布式計(jì)算有興趣的朋友拋個(gè)磚,要想真的掘到金子,那么就踏踏實(shí)實(shí)的去用,去想,去分析。后續(xù)自己也會(huì)更進(jìn)一步的去研究框架中的實(shí)現(xiàn)機(jī)制,在解決自己?jiǎn)栴}的同時(shí),也能夠貢獻(xiàn)一些什么。
前幾日看到有人跪求成為架構(gòu)師的方式,看了有些可悲,有些可笑,其實(shí)有多少架構(gòu)師知道什么叫做架構(gòu),架構(gòu)師的職責(zé)是什么,與其追求這么一個(gè)名號(hào),還不如踏踏實(shí)實(shí)的作塊石頭沉到水底,積累和沉淀的過(guò)程就是一種成長(zhǎng)。