學習目標: 至少掌握五點: 1. 深入理解HTable,掌握如何結合業(yè)務涉及高性能的HTable。 2. 掌握與HBase的交互,通過HBase Shell命令及Java API進行數(shù)據(jù)的增刪改查。 3. 掌握如何用MapReduce分析HBase里的數(shù)據(jù) 4. 掌握如何測試HBase MapReduce。 HBase簡介: HBase在產(chǎn)品中還包含了Jetty,在HBase啟動時采用嵌入式的方式來啟動Jetty,因此可以通過web界面對HBase進行管理和查看當前運行的一些狀態(tài),非常輕巧。 簡單來說,你在HBase中的表創(chuàng)建的可以看做是一張很大的表,而這個表的屬性可以根據(jù)需求去動態(tài)增加,在HBase中沒有表與表之間關聯(lián)查詢。 列存儲
HStore存儲是HBase存儲的核心了,其中由兩部分組成,一部分是MemStore,一部分是StoreFiles。MemStore是Sorted Memory Buffer,用戶寫入的數(shù)據(jù)首先會放入MemStore,當MemStore滿了以后會Flush成一個StoreFile(底層實現(xiàn)是HFile),當StoreFile文件數(shù)量增長到一定閾值,會觸發(fā)Compact合并操作,將多個StoreFiles合并成一個StoreFile,合并過程中會進行版本合并和數(shù)據(jù)刪除,因此可以看出HBase其實只有增加數(shù)據(jù),所有的更新和刪除操作都是在后續(xù)的compact過程中進行的,這使得用戶的寫操作只要進入內(nèi)存中就可以立即返回,保證了HBase I/O的高性能。當StoreFilesCompact后,會逐步形成越來越大的StoreFile,當單個StoreFile大小超過一定閾值后,會觸發(fā)Split操作,同時把當前Region Split成2個Region,父Region會下線,新Split出的2個孩子Region會被HMaster分配到相應的HRegionServer上,使得原先1個Region的壓力得以分流到2個Region上。 HFile里面的每個KeyValue對就是一個簡單的byte數(shù)組。 HRegion是Hbase中分布式存儲和負載均衡的最小單元。最小單元就表示不同的Hregion可以分布在不同的HRegion server上。但一個Hregion是不會拆分到多個server上的。 HFile格式:
一、HBase基本概念:2個主要概念: a) Rowkey: Hbase 中的記錄是按照rowkey來排序的; b) Column family:(列族)是在系統(tǒng)啟動之前預先定義好的; c) Hbase優(yōu)缺點: 1.不支持條件查詢以及orderby等查詢; 2.列可以動態(tài)增加,列為空則不存儲數(shù)據(jù),節(jié)省存儲空間; 3.會自動切分數(shù)據(jù);4.可以提供高并發(fā)讀寫操作的支持; 訪問方式: 訪問hbasetable中的行,只有三種方式: 1 通過單個row key訪問 2 通過row key的range 3 全表掃描 Row key:行鍵 (Row key)可以是任意字符串(最大長度是 64KB,實際應用中長度一般為 10-100bytes),在hbase內(nèi)部,row key保存為字節(jié)數(shù)組。 存儲: HBase以表的形式存儲數(shù)據(jù)。表有行和列組成,存儲時,數(shù)據(jù)按照Row key的字典序(byte order)排序存儲。設計key時,要充分排序存儲這個特性,將經(jīng)常一起讀取的行存儲放到一起。(位置相關性) 存儲類型: TableName 是字符串 RowKey 和 ColumnName 是二進制值(Java 類型 byte[]) Timestamp 是一個 64 位整數(shù)(Java 類型 long) value 是一個字節(jié)數(shù)組(Java類型 byte[]) 存儲結構:即HTable按Row key自動排序,每個Row包含任意數(shù)量個Columns,Columns之間按Columnkey自動排序,每個Column包含任意數(shù)量個Values。理解該存儲結構將有助于查詢結果的迭代。 (RowKey,List(SortedMap(column,List(value,TimeStamp)))) 列簇:hbase表中的每個列,都歸屬與某個列族。列名都以列族作為前綴。 HBase中的列可以動態(tài)新增。 存儲單元:HBase中通過row和columns確定的為一個存貯單元稱為cell。 每個cell都保存著同一份數(shù)據(jù)的多個版本。版本通過時間戳來索引。 HBase為null的Column不會被存儲,這樣既節(jié)省了空間又提高了讀性能 cell中的數(shù)據(jù)是沒有類型的,全部是字節(jié)碼形式存貯 兩種數(shù)據(jù)版本回收方式:一是保存數(shù)據(jù)的最后n個版本 二是保存最近一段時間內(nèi)的版本(比如最近七天) 用戶可以針對每個列族進行設置值value:每個值由4個鍵唯一索引 tableName+RowKey+ColumnKey+Timestamp=>value
二、搭建HBase環(huán)境:
http://hbase./book/quickstart.html和http://hbase./book/notsoquick.html。 如果你在windows環(huán)境下配置cygwin及ssh遇到問題可以參考http://qa.taobao.com/?p=10633 1. 創(chuàng)建一個Maven工程。 mvn archetype:generate-DgroupId=com.alibaba.webx -DartifactId=tutorial1 -Dversion=1.0-SNAPSHOT -Dpackage=com.alibaba.webx.tutorial1-DarchetypeArtifactId=archetype-webx-quickstart-DarchetypeGroupId=com.alibaba.citrus.sample-DarchetypeVersion=1.0-SNAPSHOT -DinteractiveMode=falsecmd進入剛才建立的項目,運行:mvn jetty:run 在瀏覽器中打開:localhost:8081就可以看到我們新建的webx項目了。具體里面是怎么運行的,可以查看webx用戶手冊。 2. 加入Hadoop、HBase依賴:
3. 在src/test/resources目錄下新建文件conf/hbase-site.xml 文件具體配置為:
[java] view plaincopy
配置你工程要使用的zookeeper客戶端端口號和zookeeper的地址,這個地址可以向開發(fā)索要。
4. 寫測試文件: [java] view plaincopy
如果不報錯,表示鏈接已經(jīng)通過,接下來就可以創(chuàng)建表以及對表的增刪改查了。
三、基礎知識:
1. 通過HBase shell 與HBase交互: 進入控制臺:bin/hbase shell 創(chuàng)建表:create ‘表名’,’列簇名’,’列簇名’ 增加記錄:put ‘表名’,’Row Key’,’列簇名:列名’,’value’ 查詢:get ‘表名’,’Row Key’ 刪除:delete‘表名’,’Row Key’,’列簇名:列名’ (只能刪除一列) delete‘表名’,’Row Key’ (刪除RowKey的所有列) 刪除表:>disable ‘表名’ >drop ‘表名’ 2. 通過Java 的API與HBase交互: 步驟一: 創(chuàng)建一個Maven工程加入依賴:
[java] view plaincopy
如果你的Maven庫里還沒有hbase,還需要配置下repository:
[java] view plaincopy
步驟二: 確保HBase環(huán)境已啟動且能連接到,將HBase環(huán)境的hbase-site.xml文件拷貝到上述工程的src/test/resources目錄 加載配置->創(chuàng)建表->增加記錄->根據(jù)RowKey查詢->遍歷查詢與迭代->刪除記錄->刪除表
具體操作可以參考:http://qa.taobao.com/?p=13894 http://www.cnblogs.com/panfeng412/archive/2011/08/14/2137984.html
四、深入理解HBase:
思考:HBase服務器內(nèi)部由那些主要部件構成? HBase的內(nèi)部工作原理是什么?
1. HBase的工作原理: 首先HBase Client端會連接Zookeeper Qurom(從下面的代碼也能看出來,例如:HBASE_CONFIG.set("hbase.zookeeper.quorum","192.168.50.216") )。通過Zookeeper組件Client能獲知哪個Server管理-ROOT-Region。那么Client就去訪問管理-ROOT-的Server,在META中記錄了HBase中所有表信息,(你可以使用 scan '.META.' 命令列出你創(chuàng)建的所有表的詳細信息),從而獲取Region分布的信息。一旦Client獲取了這一行的位置信息,比如這一行屬于哪個Region,Client將會緩存這個信息并直接訪問HRegionServer。久而久之Client緩存的信息漸漸增多,即使不訪問.META.表也能知道去訪問哪個HRegionServer。HBase中包含兩種基本類型的文件,一種用于存儲WAL的log,另一種用于存儲具體的數(shù)據(jù),這些數(shù)據(jù)都通過DFS Client和分布式的文件系統(tǒng)HDFS進行交互實現(xiàn)存儲。 2. Client訪問數(shù)據(jù)過程: Client訪問用戶數(shù)據(jù)之前需要首先訪問zookeeper,然后訪問-ROOT-表,接著訪問.META.表,最后才能找到用戶數(shù)據(jù)的位置去訪問,中間需要多次網(wǎng)絡操作,不過client端會做cache緩存。 -ROOT-表、.META都是存放在哪里?? client訪問hbase上數(shù)據(jù)的過程并不需要master參與(尋址訪問zookeeper和region server,數(shù)據(jù)讀寫訪問region server),master僅僅維護者table和region的元數(shù)據(jù)信息,負載很低。
3. 在HBase上進行MapReduce操作:
4. HBase系統(tǒng)架構: HBase Client使用HBase的RPC機制與HMaster和HRegionServer進行通信,對于管理類操作,Client與HMaster進行RPC;對于數(shù)據(jù)讀寫類操作,Client與HRegionServer進行RPC
5. Zookeeper: Zookeeper簡單說就是協(xié)調(diào)和服務于分布式應用程序的服務。 Zookeeper Quorum中除了存儲了-ROOT-表的地址和HMaster的地址,HRegionServer也會把自己以Ephemeral方式注冊到Zookeeper中,使得HMaster可以隨時感知到各個HRegionServer的健康狀態(tài)。此外,Zookeeper也避免了HMaster的單點問題。 1 保證任何時候,集群中只有一個master 2存貯所有Region的尋址入口。 3 實時監(jiān)控RegionServer的狀態(tài),將Region server的上線和下線信息實時通知給Master 4 存儲Hbase的schema,包括有哪些table,每個table有哪些column family
Zookeeper到底為我們干了什么? 1. 集中配置:可以APP1的配置配置到/APP1 znode下的所有機器。 2. 集群管理:同步:維護活機列表(讓集群所有機器得到實時更新), 組服務:從集群中選擇Master。 3. ….. 參考:http://hi.baidu.com/surendaxiao/blog/item/cb1b42f86b03084e252df233.html 6. HMaster: HMaster沒有單點問題,HBase中可以啟動多個HMaster,通過Zookeeper的MasterElection機制保證總有一個Master運行,HMaster在功能上主要負責Table和Region的管理工作: 1. 管理用戶對Table的增、刪、改、查操作 2. 管理HRegionServer的負載均衡,調(diào)整Region分布 3. 在Region Split后,負責新Region的分配 4. 在HRegionServer停機后,負責失效HRegionServer 上的Regions遷移 7. HRegionServer: HRegionServer主要負責響應用戶I/O請求,向HDFS文件系統(tǒng)中讀寫數(shù)據(jù),是HBase中最核心的模塊。 HRegionServer內(nèi)部管理了一系列HRegion對象,每個HRegion對應了Table中的一個Region,HRegion中由多個HStore組成。每個HStore對應了Table中的一個ColumnFamily的存儲,可以看出每個Column Family其實就是一個集中的存儲單元,因此最好將具備共同IO特性的column放在一個ColumnFamily中,這樣最高效。
思考:
8. Hadoop+HBase+Zookeeper三者關系:
1.經(jīng)過Map、Reduce運算后產(chǎn)生的結果看上去是被寫入到HBase了,但是其實HBase中HLog和StoreFile中的文件在進行flush to disk操作時,這兩個文件存儲到了HDFS的DataNode中,HDFS才是永久存儲。 2.ZooKeeper跟HadoopCore、HBase有什么關系呢?ZooKeeper都提供了哪些服務呢?主要有:管理Hadoop集群中的NameNode,HBase中HBaseMaster的選舉,Servers之間狀態(tài)同步等。具體一點,細一點說,單只HBase中ZooKeeper實例負責的工作就有:存儲HBase的Schema,實時監(jiān)控HRegionServer,存儲所有Region的尋址入口,當然還有最常見的功能就是保證HBase集群中只有一個Master。
Hadoop、ZooKeeper和HBase之間應該按照順序啟動和關閉:啟動Hadoop—>啟動ZooKeeper集群—>啟動HBase—>停止HBase—>停止ZooKeeper集群—>停止Hadoop。
五:理解Hadoop:
Hadoop學習網(wǎng)址:http://book.51cto.com/art/201106/269616.htm
1. MapReduce (1) MapReduce基礎: Mapper接口:是一個泛型,有4個形式的參數(shù)類型,分別指定map函數(shù)的輸入鍵,輸入值,輸出鍵,輸出值。 數(shù)據(jù)類型:Hadoop規(guī)定了自己的一套可用于網(wǎng)絡序列優(yōu)化的基本類型,而不是使用內(nèi)置的java類型,這些都在org.apache.hadoop.io包中定義,上面使用的Text類型相當于java的String類型,IntWritable類型相當于java的Integer類型。 Maper 和Reducer 可以理解為分久必合,合久必分! Maper是將任務切分成很多個小任務,分配給不同的工作者去完成 Reducer是將哪些工作者做完的工作結果收集起來加以整理匯總成最后結果。 總結:job的配置有著復雜的屬性參數(shù),如文件分割策略、排序策略、map輸出內(nèi)存緩沖區(qū)的大小、工作線程數(shù)量等,深入理解掌握這些參數(shù)才能使自己的MapReduce程序在集群環(huán)境中運行的最優(yōu)。 (2)深入理解MapReduce: (1)在map進行之前,需要對輸入文件在客戶端先進行“分片”,然后將分片信息上傳到HDFS。 (2)分片上傳結束后,jobtracker拿到分片信息,來分配map,reduct task;map對每條記錄的輸出以<key,value> 的形式輸出。 (3)如果定義了combiner,則在本地會對map處理的結果進行處理:對相同key的聚合,對key的排序,value的迭代。combiner完成類似于本地reduce的功能。 (4)在進入reduce階段之前,系統(tǒng)會完成一些列操作(merge,sort):將list中key相同的數(shù)據(jù)進行合并、排序,最后形成<k1`,list<v1`>>的數(shù)據(jù); 然后發(fā)往一個reduce (5)進入一個reduce,相同的key的map輸出會到達同一個reduce,reduce對key相同的多個value進行“reduce操作”;
> 沒有combiner的處理過程: > 添加combiner的處理過程: ?為什么我買的map函數(shù)和reduce函數(shù)一般使用靜態(tài)類? 答:task內(nèi)部可以共享靜態(tài)類屬性,每個task可能會多次調(diào)用map或reduce函數(shù),但每個key只對應某個節(jié)點上的某個task的reduce函數(shù)的一次執(zhí)行。 多個task之間不能共享靜態(tài)類屬性,即使是在同一臺機器上,因為是以進程的方式在運行。
1. Map類:(繼承TableMapper或者Mapper) Map原理: 在map階段,使用job.setInputFormatClass定義的InputFormat將輸入的數(shù)據(jù)集分割成小數(shù)據(jù)塊splites,同時InputFormat提供一個RecordReder的實現(xiàn)。本例子中使用的是 TextInputFormat,他提供的RecordReder會將文本的一行的行號作為key,這一行的文本作為value。這就是自定義Map的輸入是<LongWritable,Text>的原因。然后調(diào)用自定義Map的map方法,將一個個<LongWritable, Text>對輸入給Map的map方法。注意輸出應該符合自定義Map中定義的輸出<IntPair, IntWritable>。最終是生成一個List<IntPair,IntWritable>。在map階段的最后,會先調(diào)用job.setPartitionerClass對這個List進行分區(qū),每個分區(qū)映射到一個reducer。每個分區(qū)內(nèi)又調(diào)用job.setSortComparatorClass設置的key比較函數(shù)類排序??梢钥吹?,這本身就是一個二次排序。如果沒有通過job.setSortComparatorClass設置key比較函數(shù)類,則使用key的實現(xiàn)的compareTo方法。在第一個例子中,使用了IntPair實現(xiàn)的compareTo方法,而在下一個例子中,專門定義了key比較函數(shù)類。
Q: map的結果發(fā)給那個reduce?誰來管理這一切? A: Partitioner用于劃分鍵值空間(key space)。 Partitioner負責控制map輸出結果key的分割。Key(或者一個key子集)被用 于產(chǎn)生分區(qū),通常使用的是Hash函數(shù)。分區(qū)的數(shù)目與一個作業(yè)的reduce任務的數(shù)目是一樣的。因此,它控制將中間過程的key(也就是這條記錄)應該發(fā)送給m個reduce任務中的哪一個來進行reduce操作。
2. Reduce類:(繼承TableReducer或者Reducer) Reduce的原理:在reduce階段,reducer接收到所有映射到這個reducer的map輸出后,也是會調(diào)用job.setSortComparatorClass設置的key比較函數(shù)類對所有數(shù)據(jù)對排序。然后開始構造一個key對應的value迭代器。這時就要用到分組,使用jobjob.setGroupingComparatorClass設置的分組函數(shù)類。只要這個比較器比較的兩個key相同,他們就屬于同一個組,它們的value放在一個value迭代器,而這個迭代器的key使用屬于同一個組的所有key的第一個key。最后就是進入Reducer的reduce方法,reduce方法的輸入是所有的(key和它的value迭代器)。同樣注意輸入與輸出的類型必須與自定義的Reducer中聲明的一致。 reduce的輸出是沒有排序的。
Q: Reduce的數(shù)目應該設置多少? A:Reduce的數(shù)目建議是0.95或1.75乘以 ( *mapred.tasktracker.reduce.tasks.maximum)。用0.95,所有reduce可以在maps一完成時就立刻啟動,開始傳輸map的輸出結 果。用1.75,速度快的節(jié)點可以在完成第一輪reduce任務后,可以開始第二輪,這樣可以得到比較好的負載均衡的效果。上述比例因子比整體數(shù)目稍小一些是為了給框 架中的推測性任務(speculative-tasks) 或失敗的任務預留一些reduce的資源。 Q: Reduce的三個階段都干了什么? A: Reducer有3個主要階段:shuffle、sort和reduce。 Shuffle :Reducer的輸入就是Mapper已經(jīng)排好序的輸出。在這個階段,框架通過HTTP為每個Reducer獲得所有Mapper輸出中與之相關的分塊。(其實就是copy的過程) Sort :這個階段,框架將按照key的值對Reducer的輸入進行分組 (因為不同mapper的輸出中可能會有相同的key,combain保證了同一臺機器相同key的合并,但是不同機器也可能有相同的key)。 map的輸出是一邊被取回一邊被合并的。
3. Job 的配置: (1). 使用job.setInputFormatClass定義的InputFormat將輸入的數(shù)據(jù)集分割成小數(shù)據(jù)塊splites,HadoopMap/Reduce框架為每一個Split產(chǎn)生一個map任務. Map的數(shù)目通常是由輸入數(shù)據(jù)的大小決定的,一般就是所有輸入文件的總塊(block)數(shù)。如果你輸入10TB的數(shù)據(jù),每個塊(block)的大小是 128MB,你將需要大約82,000個map來完成任務,除非使用setNumMapTasks(int)將這個數(shù)值設置得更高。 (2).如果需要中間過程對key的分組規(guī)則和reduce前對key的分組規(guī)則不同,那么可以通過 JobConf.setOutputValueGroupingComparator(Class)來指定一個Comparator。再加上 JobConf.setOutputKeyComparatorClass(Class)可用于控制中間過程的key如何被分組,所以結合兩者可以實現(xiàn)按值的二次排序 (3).一些作業(yè)的參數(shù)可以被直截了當?shù)剡M行設置(例如: setNumReduceTasks(int)),而另一些參數(shù)則與框架或者作業(yè)的其他參數(shù)之間微妙地相互影響,并且設置起來比較復雜(例如: setNumMapTasks(int)) (4).Mapper和Reducer的實現(xiàn)可以利用Reporter 來報告進度,或者僅是表明自己運行正常。我們從界面上看到的圖形就是利用Reporter來進行進度的展示。
(2) MapReduce基本編程: 創(chuàng)建一個Maven工程 加入hadoop依賴 編寫Map類 編寫reduce類 定義job (3) 進行Mapreduce測試: l 用MRUnit做單元測試: 加入mrunit依賴
單獨測試Map 單獨測試Reduce 測試MapReduce 參考:葉渡:Hadoop學習筆記_yedu.pdf
疑惑:1. 使用MRUnit,測試代碼在.run下通過,在.runTest()失敗,原因是什么?兩者有什么區(qū)別?
l 運行MapReduce Job進行集成測試
流程:預設置(準備輸入文件、啟動hadoop進程等)->運行作業(yè)->輸出結果跟預期結果的對比->報告導致失敗的原因
l 精簡HBaseMapReduce測試: 使用Hadoop/HBaseMini Cluster (iTest-hadoop) 參考文檔:http://qa.taobao.com/?p=13939 (不安裝Hadoop、HBase環(huán)境,只要有JDK搞定MapReduce的Job測試)
2. 本地搭建單機版hadoop環(huán)境(win): Hadoop主要是在Linux 平臺下運行的,如果想在 Windows 平臺下運行,你需要安裝 Cygwin 才能運行, Hadoop 腳本。 按照“在Windows上安裝Hadoop教程.pdf“執(zhí)行完成。 安裝還可以參考文檔:http://blog.csdn.net/savechina/article/details/5656937 按照“在Windows上安裝Hadoop教程.pdf“的說明進行到最后一步時,在啟動./start-all.sh之前,需要 格式化一個新的分布式文件,./hadoopnamenode –format .這樣就會啟動JobTracker.
瀏覽NameNode 和JobTracker 的網(wǎng)絡接口,他們的地址默認為: NameNode – http://localhost:50070/ JobTracker – http://localhost:50030/ Node數(shù)為0,如何配置NameNode 和Datanode?? 運行hadoop自帶jar文件: 運行hadoop自帶的jar文件,理解MapReduce的過程:hadoop-0.20.2-examples.jar 跑通自己第一個Job程序: 首先開啟hadoop服務: ./start-all.sh 1. 根據(jù)文檔示例編寫wordCout程序。 2. 將編寫的代碼打包成HadoopTest.jar放到本地某一個目錄下, (打包的時候要選擇mainclass) 或者直接運行hadoop自帶文件中的示例jar包(hadoop-0.20.2-examples.jar)。 3. 將要分析的數(shù)據(jù)傳到hdfs上去 在dfs上創(chuàng)建測試輸入目錄:./hadoop dfs –mkdir test-in 然后將本地文件copy到test-in中: ./hadoop dfs –copyFromLocal [本地文件目錄] test-in 驗證文件是否復制成功: ./hadoop dfs –ls test-in 注:這里的test-in其實是HDFS路徑下的目錄,七絕對路徑為 “http://localhost:50070/user/XXXXX/test-in”
4. 開始執(zhí)行 ./bin/hadoopjar hadoop-0.20.2-examples.jar wordCount test-in test-out 當遇到文件已存在異常的時候,只要將test-out改一個名字即可。 5. 遇到問題:
拋出文件不存在的異常,原因是找不到tmp目錄。開如圖上的目錄看到并不存在那樣的目錄結構,說明根本就沒有創(chuàng)建相應的目錄結構,可能是連tmp都沒有找到,所以查找配置文件發(fā)現(xiàn),conf下的mapred-site.xml中中默認配置是 ./tmp,所以修改成自己的相應目錄就可以了。 6. 運行成功:
運行自己編寫的文件: (1).邏輯性代碼: 1. 編寫自己的mapper函數(shù):繼承Mapper基類,實現(xiàn)map方法 2. 編寫自己的reducer函數(shù):繼承reducer基類,實現(xiàn)reduce方法 3. 編寫自己的主函數(shù):創(chuàng)建job,配置map、combiner、reducer類型,設置輸入輸出路徑, 設置輸出鍵/值格式,提交任務 (2).驅(qū)動性代碼:驅(qū)動類來注冊業(yè)務的class為一個可標示的命令,讓hadoop jar可以執(zhí)行。 如: (3). 最后一步:將自己的項目導出成jar格式,注意:在選擇main class時,是選擇我們創(chuàng)建的驅(qū)動類,而不是邏輯主類。
3. 一個Job的請求過程: 用戶通過界面提交一個Job,服務器把Job請求發(fā)送給gateway,gateway接收請求后按照一定的邏輯拼裝成MR需要的請求文件。 Gateway:我把可以把gateway理解為跳板機,我們的機器不能直接訪問集群,需要一個入口,這個入口就是Gateway。 思考:這個跳板機是單獨拿出來的一臺機器專門做Job的入口的呢,還是只是機群中的普通機器?
JobTracker: TaskTracker:
我們提交一個JOB(一般通過JobClient,這個類有三種策略來提交一個JOB,1、job完成后才返回狀態(tài)2、job提交后,返回一個持有狀態(tài)的Handler,3、提交job,但是不返回狀態(tài)) 首先會從JobTracker(hadoop中運用了master/slaver機制,他是master服務,那么slaver在這里就是tasktracker)中的得到一個job的definition Id, 其實這個id也就是JobTracker管理job的個數(shù) jobClient會從Configuration找到hadoop系統(tǒng)目錄("mapred.system.dir",默認值"/tmp/hadoop/mapred/system")在這里jobClient做了一件比較重要的事情,他把input的數(shù)據(jù)做split操作(相當于將大數(shù)據(jù)量切分成若干塊,具體切分成多大,這個通過一個公式來計算的:FileInputFormat的策略max(minimumSize,min(maximumSize, blockSize)) 其中minSize表示一個map切分的最小容量,maxSize即最大容量,blockSize表示HDFS中的block容量)[1],從而決定了Map的個數(shù)(其實就是MapTask的數(shù)量)。jobClient還將一部分資源文件放到jobtracker的FS中(jar、file、archives、split[2]) 正式提交 JT(jobtracker)會根據(jù)這個job創(chuàng)建一個JobInProgress對象,這個對象記錄著這個job所有信息。最后JT會將這個job注冊到JobInProgressListener中(以下簡稱JIPL),讓JIPL監(jiān)聽這些job。JIPL是在JT啟動的時候啟動的監(jiān)聽器(由TaskScheduler注入,作用參照step 7)。一個EagerTaskInitializationListener:它是一個生命周期和JT一樣的監(jiān)聽線程,主要功能就是初始化這個Job,并且創(chuàng)建相應的TaskInProgress(TIP,包括M個MapTask,N個ReduceTask,2個CleanTask,2個SetupTask)。另一個就是JobQueueJobInProgressListener,這個listener是處理job隊列的,也就是job提交的先后順序跟它有關系,默認的是FIFO。 在初始化job的時候(其實是初始化MapTask),會將之前的input數(shù)據(jù)split的信息回流回來,初始化maptask 到這里,JT初始化job工作完畢。 每臺slaver機器啟動的時候,都會啟動一個tasktracker的線程,這個線程主要負責和JT去通信,也就是發(fā)送心跳(通過RPC通信協(xié)議)。當發(fā)送心跳的時候,TT會將自己現(xiàn)有的狀態(tài)(是否是剛剛啟動、是否剛初始化,自己狀態(tài)是否可以申請新的task,如果JT中沒有這個TT的引用,那么需要保存下來)JT首先會獲取Setup和CleanUp的Task(默認每個job都會有兩個setuptask和兩個cleanup task),如果沒有了以上兩種類型的task,那么剩下的就是MapTask和ReduceTask此時,JT會去向TaskScheduler這個調(diào)度類去申請Task。在hadoop中,默認的TaskScheduler是JobQueueTaskScheduler,他持有JPL的引用。當TT發(fā)送一個心跳表明自己空閑需要執(zhí)行Task時,這時候,JT會調(diào)用Schedule的assignTask方法去獲得一個Task。(這里Hadoop找MapTask的時候,首先node-local,然后rack-local,最后才是不同機架,具體怎么找,還未仔細看)
TT發(fā)送心跳后,JT返回給TT一個HeartbeatResponse對象的引用,這里面包含著需要執(zhí)行Tasks的action數(shù)組(如果action的類型LaunchTaskAction:執(zhí)行一個新的Task, 如果CommitTaskAction:加入commitResponses列表,由Task在適當?shù)臅r候提交給JobTracker),同時JT還會更新TTS的內(nèi)容。所以TT根據(jù)這兩點,就可以很好的判斷自己Task在JT那邊的狀態(tài)。 執(zhí)行任務前先調(diào)用localizeTask()更新一下jobConf文件并寫入到本地目錄中。然后通過調(diào)用Task的createRunner()方法創(chuàng)建TaskRunner對象并調(diào)用其start()方法,值得注意點是,TaskRunner會去新開一個JVM去執(zhí)行Task(如果考慮掉Task開銷小而且多,可以將jvm reuse)。
兩種啟動Job方式: A:Submit() submit函數(shù)會把Job提交給對應的Cluster,然后不等待Job執(zhí)行結束就立刻返回。同時會把Job實例的狀態(tài)設置為JobState.RUNNING,從而來表示Job正在進行中。然后在Job運行過程中,可以調(diào)用getJobState()來獲取Job的運行狀態(tài)。 B:waitForCompletion(boolean) waitForCompletion函數(shù)會提交Job到對應的Cluster,并等待Job執(zhí)行結束。函數(shù)的boolean參數(shù)表示是否打印Job執(zhí)行的相關信息。返回的結果是一個boolean變量,用來標識Job的執(zhí)行結果。 執(zhí)行Job的內(nèi)部流程: 1).Inputformat會從job的INPUT_DIR目錄下讀入待處理的文件,檢查輸入的有效性并將文件切分成InputSplit列表。Job實例可以通過setInputFormatClass(Class<? extends InputFormat>)函數(shù)來設置所需的inputformat。 2).當Inputformat對輸入文件分片后,會對每個分片構建一個MapperTask實例(MapTask(String, TaskAttemptID, int, TaskSplitIndex, int))。其實整個Mapper類的調(diào)度過程,都是由MapperTask來實現(xiàn)的。MapperTask的run(JobConf,TaskUmbilicalProtocol)方法實現(xiàn)了對于Mapper task調(diào)度的整個過程。 2.1) RecordReader會解析InputSplit,生成對應的key/value pair。Inputformat中有一個除了用于分片的getSplits(JobContext)方法外,還有一個方法createRecordReader(InputSplit,TaskAttemptContext),該方法用于給每一個分片創(chuàng)建一個RecordReader。重寫這個方法,可以添加自己的RecordReader。 2.2)Mapper類會對屬于一個InputSplit所有key/value pair調(diào)用一次map函數(shù)。關于Mapper類的作用,在Javadoc中描述如下:“Mapper maps input key/value pairs to a set ofintermediate key/value pairs”。 Job實例可以通過setMapperClass(Class<? extends Mapper>)函數(shù)來設置自己的Mapper類。 2.3)可以通過Job實例的setSortComparatorClass(Class<?extends RawComparator>)方法來為Mapper設定一個Comparator class,用來對Mapper的結果根據(jù)key進行排序。 2.4)可以通過Job實例的setPartitionerClass(Class<? extends Partitioner>)方法來為Mapper設定一個Partitioner Class,用來對Mapper的結果根據(jù)Reducer進行分片。 2.5)可以通過Job實例的setCombinerClass(Class<? extends Reducer>)方法為Mapper設定一個Combiner Class,用來在本地進行聚集操作,從而減少從Mapper到Reducer的數(shù)據(jù)傳輸量。 3).Mapper執(zhí)行結束之后,ReducerTask類會被用來進行整個Reducer操作的調(diào)度 3.1)Shuffle類會被調(diào)用從而來獲取在Mapper輸出中屬于本Reducer的分片,并將多個分片combine成一個。 3.2)Shuffle類會使用MergeManager根據(jù)Job實例的setSortComparatorClass(Class<?extends RawComparator>)所設定的Comparator class對key/value pair進行排序 3.3)在shuffle操作執(zhí)行結束之后,如果對于Reducer的input數(shù)據(jù),有使用特殊分組的需求的話,可以通過Job實例的setGroupingComparatorClass(Class<?extends RawComparator>)方法來實現(xiàn)定制的分組策略,否則,則使用setSortComparatorClass(Class<?extends RawComparator>)的比較方式。 3.4)在分組后的結果中,針對每一個<key, (list of values)> pair 調(diào)用Reduce的reduce(K2, Iterator<V2>, OutputCollector<K3, V3>,Reporter)方法??梢酝ㄟ^Job實例的setReducerClass(Class<?extends Reducer>)方法類設置相應的Reduce實現(xiàn)。 4).Reduce的結果將由OutputCollector.collect(WritableComparable, Writable)寫入文件系統(tǒng) 參考文檔:http://blog.csdn.net/derekjiang/article/details/6851625 思考:
3. 啟動Hadoop過程: (1) 啟動NameNode: 啟動NameNode節(jié)點; 初始化操作(如在name目錄下創(chuàng)建文件); 記錄HDFS狀態(tài)(如安全模式狀態(tài)); 本機FS注冊,啟動HDFS容器,并初始化; (2) 啟動DataNode: (3) 啟動SecondaryNameNode: (4) 啟動JobTracker: (5) 啟動TaskTracker: 4. 運行Map,Reduce過程: 1. 在分布式環(huán)境中客戶端創(chuàng)建任務并提交。 2. InputFormat做Map前的預處理,主要負責以下工作: 1. 驗證輸入的格式是否符合JobConfig的輸入定義,這個在實現(xiàn)Map和構建Conf的時候就會知道,不定義可以是Writable的任意子類。 2. 將input的文件切分為邏輯上的輸入InputSplit,其實這就是在上面提到的在分布式文件系統(tǒng)中blocksize是有大小限制的,因此大文件會被劃分為多個block。 3. 通過RecordReader來再次處理inputsplit為一組records,輸出給Map。(inputsplit只是邏輯切分的第一步,但是如何根據(jù)文件中的信息來切分還需要RecordReader來實現(xiàn),例如最簡單的默認方式就是回車換行的切分) 3. RecordReader處理后的結果作為Map的輸入,Map執(zhí)行定義的Map邏輯,輸出處理后的key和value對應到臨時中間文件。 4. Combiner可選擇配置,主要作用是在每一個Map執(zhí)行完分析以后,在本地優(yōu)先作Reduce的工作,減少在Reduce過程中的數(shù)據(jù)傳輸量。 5. Partitioner可選擇配置,主要作用是在多個Reduce的情況下,指定Map的結果由某一個Reduce處理,每一個Reduce都會有單獨的輸出文件。(后面的代碼實例中有介紹使用場景) 6. Reduce執(zhí)行具體的業(yè)務邏輯,并且將處理結果輸出給OutputFormat。 7. OutputFormat的職責是,驗證輸出目錄是否已經(jīng)存在,同時驗證輸出結果類型是否如Config中配置,最后輸出Reduce匯總后的結果。 5. MapReduce 中如何處理HBase中的數(shù)據(jù)?如何讀取HBase數(shù)據(jù)給Map?如何將結果存儲到HBase中? Mapper類:包括一個內(nèi)部類(Context)和四個方法(setup,map,cleanup,run); setup,cleanup用于管理Mapper生命周期中的資源。setup -> map -> cleanup, run方法執(zhí)行了這個過程; map方法用于對一次輸入的key/value對進行map動作,對應HBase操作也就是一行的處理;
job的配置: 5.1TableInputFormat完成了什么功能? (1)通過設置conf.set(TableInputFormat.INPUT_TABLE,"udc_sell");設定HBase的輸入表; 設置conf.set(TableInputFormat.SCAN,TableMRUtil.convertScanToString(scan));設定對HBase輸入表的scan方式; (2)通過TableInputFormat.setConf(Configration conf)方法初始化scan對象; scan對象是從job中設置的對象,以字符串的形式傳給TableInputFormat,在TableInputFormat內(nèi)部將scan字符創(chuàng)轉(zhuǎn)換為scan對象 * TableMapReduceUtily有兩個方法:convertScanToString和convertStringToScan作用? 將scan實例轉(zhuǎn)換為Base64字符串 和將Base64字符串還原為scan實例; Q:為什么不直接穿Scan對象而是費盡周折地轉(zhuǎn)換來轉(zhuǎn)換去呢? A: (3)TableInputFormat繼承了TableInputFormatBase實現(xiàn)了InputFormat抽象類的兩個抽象方法: getSplits()和createRecordReader()方法: l getSplits()斷定輸入對象的切分原則:對于TableInputFormatBase,會遍歷HBase相應表的所有HRegion,每一個HRegion都會被分成一個split,所以切分的塊數(shù)是與表中HRegion的數(shù)目是相同的; InputSplitsplit =newTableSplit(table.getTableName(),splitStart, splitStop, regionLocation);在split中只會記載HRegion的其實rowkey和終止rowkey,具體的去讀取這篇區(qū)域的數(shù)據(jù)是createRecordReader()實現(xiàn)的。 計算出來的每一個分塊都將被作為一個map Task的輸入; Q:但是分出的塊分給那臺機器的那個task去執(zhí)行Map,即jobTracker如何調(diào)度任務給taskTracker? A: 需要進一步了解Map的本地化運行機制和jobTracker的調(diào)度算法;(可能是就近原則). 對于一個map任務,jobtracker會考慮tasktracker的網(wǎng)絡位置,并選取一個距離其輸入分片文件最近的tasktracker。在最理想的情況下,任務是數(shù)據(jù)本地化的(data-local),也就是任務運行在輸入分片所在的節(jié)點上。同樣,任務也可能是機器本地化的:任務和輸入分片在同一個機架,但不在同一個節(jié)點上。reduce任務,jobtracker簡單滴從待運行的reduce任務列表中選取下一個來運行,用不著考慮數(shù)據(jù)段餓本地化。 l createRecordReader()按照必然格式讀取響應數(shù)據(jù): 接收split塊,返回讀取記錄的結果; public RecordReader<ImmutableBytesWritable,Result> createRecordReader(InputSplit split, TaskAttemptContext context){ } trr.init()返回的是這個分塊的起始rowkey的記錄; RecordReader將一個split解析成<key,value>對的形式提供給map函數(shù),key就是rowkey,value就是對應的一行數(shù)據(jù); RecordReader用于在劃分中讀取<Key,Value>對。RecordReader有五個虛方法,分別是: initialize:初始化,輸入?yún)?shù)包括該Reader工作的數(shù)據(jù)劃分InputSplit和Job的上下文context;nextKey:得到輸入的下一個Key,如果數(shù)據(jù)劃分已經(jīng)沒有新的記錄,返回空; nextValue:得到Key對應的Value,必須在調(diào)用nextKey后調(diào)用;getProgress:得到現(xiàn)在的進度; close:來自java.io的Closeable接口,用于清理RecordReader。 5.2 job.setInputFormatClass(TableInputFormat.class);
5.3 TableMapReduceUtil.initTableReducerJob("daily_result",DailyReduce.class, job); 使用了該方法就不需要再單獨定義 initTableReducerJob()方法完成了一系列操作: (1). job.setOutputFormatClass(TableOutputFormat.class); 設置輸出格式; (2). conf.set(TableOutputFormat.OUTPUT_TABLE, table);設置輸出表; (3).初始化partition;
六:HBase測試點:
前提:自己維護HBase集群,否則無需關注HBase本身。 1. 功能測試: (1) Row Key的校驗(重點): rowkey的長度、rowkey的排序、rowkey是否有遺失 (2) Value的校驗: (3) Table schema: TTL(生存周期): 壓縮方式:Value值的壓縮是否出錯。 (4) Family名稱正確性的校驗: (5) 破壞性校驗: 由于HBase的數(shù)據(jù)都是在集群中有備份的,所以才去人工宕機,查看數(shù)據(jù)是否能夠正常取出。
2. 性能測試: (1) 對HBase性能測試的工具:YCSB YCSB(Yahoo!Cloud Serving Benchmark)是雅虎開源的一款通用的性能測試工具。 通過這個工具我們可以對各類NoSQL產(chǎn)品進行相關的性能測試。 參考文檔:http://www.cnblogs.com/gpcuster/archive/2011/08/16/2141430.html
參考:http://www./(趨勢科技)
七:Hadoop測試點:
1. Job任務請求: job需要解析一個request的請求文件,這里需要考慮到文件編碼格式的問題。 2. MR數(shù)據(jù)處理: (1) MR異常:
3. 程序的穩(wěn)定和優(yōu)化:
Hadoop測試參考:HADOOP測試常見問題和測試方法.docx
八:附
1. RPC通信協(xié)議: RPC(RemoteProcedure Call Protocol)——遠程過程調(diào)用協(xié)議,它是一種通過網(wǎng)絡從遠程計算機程序上請求服務,而不需要了解底層網(wǎng)絡技術的協(xié)議。RPC協(xié)議假定某些傳輸協(xié)議的存在,如TCP或UDP,為通信程序之間攜帶信息數(shù)據(jù)。在OSI網(wǎng)絡通信模型中,RPC跨越了傳輸層和應用層。RPC使得開發(fā)包括網(wǎng)絡分布式多程序在內(nèi)的應用程序更加容易。
九:隨想:
1. Hadoop 的分布式并行運算有一個作為主控的JobTracker,用于調(diào)度和管理其它的 TaskTracker, JobTracker 可以運行于集群中任一臺計算機上。TaskTracker負責執(zhí)行任務,必須運行于 DataNode 上,即 DataNode 既是數(shù)據(jù)存儲結點,也是計算結點。
思考: JobTracker是如何從閑置的機器中選擇出來的?是不是任何一臺集群中的機器都可能有成為JobTracker的可能?所以機器都同事裝了JobTracker和TaskTracker嗎? 是誰在管理著JobTracker的分配和TaskTracker的運行? 2. 隨筆記錄: 1.Zookeeper中記錄了-ROOT-表的location,我們的程序會通過我們配置的zookeeper地址找到zookeeper,然后根據(jù)zookeeper中存儲的-ROOT-表的location,去到相應的機器上訪問-ROOT-表,根據(jù)-ROOT-表中描述的.META表找到相應的Ration信息。 -ROOT-表只有一個區(qū)域,而.META可以有多個區(qū)域。
|
|
|