|
HBase數(shù)據(jù)遷移(1)-使用HBase的API中的Put方法
使用HBase的API中的Put是最直接的方法,用法也很容易學(xué)習(xí)。但針對(duì)大部分情況,它并非都是最高效的方式。當(dāng)需要將海量數(shù)據(jù)在規(guī)定時(shí)間內(nèi)載
入HBase中時(shí),效率問(wèn)題體現(xiàn)得尤為明顯。待處理的數(shù)據(jù)量一般都是巨大的,這也許是為何我們選擇了HBase而不是其他數(shù)據(jù)庫(kù)的原因。在項(xiàng)目開始之前,
你就該思考如何將所有能夠很好的將數(shù)據(jù)轉(zhuǎn)移進(jìn)HBase,否則之后可能面臨嚴(yán)重的性能問(wèn)題。
HBase有一個(gè)名為 bulk load的功能支持將海量數(shù)據(jù)高效地裝載入HBase中。Bulk load是通過(guò)一個(gè)MapReduce
Job來(lái)實(shí)現(xiàn)的,通過(guò)Job直接生成一個(gè)HBase的內(nèi)部HFile格式文件來(lái)形成一個(gè)特殊的HBase數(shù)據(jù)表,然后直接將數(shù)據(jù)文件加載到運(yùn)行的集群中。
使用bulk
load功能最簡(jiǎn)單的方式就是使用importtsv 工具。importtsv
是從TSV文件直接加載內(nèi)容至HBase的一個(gè)內(nèi)置工具。它通過(guò)運(yùn)行一個(gè)MapReduce
Job,將數(shù)據(jù)從TSV文件中直接寫入HBase的表或者寫入一個(gè)HBase的自有格式數(shù)據(jù)文件。
盡管importtsv
工具在需要將文本數(shù)據(jù)導(dǎo)入HBase的時(shí)候十分有用,但是有一些情況,比如導(dǎo)入其他格式的數(shù)據(jù),你會(huì)希望使用編程來(lái)生成數(shù)據(jù),而MapReduce是處理
海量數(shù)據(jù)最有效的方式。這可能也是HBase中加載海量數(shù)據(jù)唯一最可行的方法了。當(dāng)然我們可以使用MapReduce向HBase導(dǎo)入數(shù)據(jù),但海量的數(shù)據(jù)
集會(huì)使得MapReduce
Job也變得很繁重。若處理不當(dāng),則可能使得MapReduce的job運(yùn)行時(shí)的吞吐量很小。
在HBase中數(shù)據(jù)合并是一項(xiàng)頻繁執(zhí)行寫操作任務(wù),除非我們能夠生成HBase的內(nèi)部數(shù)據(jù)文件,并且直接加載。這樣盡管HBase的寫入速度一直很
快,但是若合并過(guò)程沒有合適的配置,也有可能造成寫操作時(shí)常被阻塞。寫操作很重的任務(wù)可能引起的另一個(gè)問(wèn)題就是將數(shù)據(jù)寫入了相同的族群服務(wù)器
(region
server),這種情況常出現(xiàn)在將海量數(shù)據(jù)導(dǎo)入到一個(gè)新建的HBase中。一旦數(shù)據(jù)集中在相同的服務(wù)器,整個(gè)集群就變得不平衡,并且寫速度會(huì)顯著的降
低。我們將會(huì)在本文中致力于解決這些問(wèn)題。我們將從一個(gè)簡(jiǎn)單的任務(wù)開始,使用API中的Put方法將MySQL中的數(shù)據(jù)導(dǎo)入HBase。接著我們會(huì)描述如
何使用
importtsv 和 bulk
load將TSV數(shù)據(jù)文件導(dǎo)入HBase。我們也會(huì)有一個(gè)MapReduce樣例展示如何使用其他數(shù)據(jù)文件格式來(lái)導(dǎo)入數(shù)據(jù)。上述方式都包括將數(shù)據(jù)直接寫入
HBase中,以及在HDFS中直接寫入HFile類型文件。本文中最后一節(jié)解釋在向HBase導(dǎo)入數(shù)據(jù)之前如何構(gòu)建好集群。本文代碼均是以Java編
寫,我們假設(shè)您具有基本Java知識(shí),所以我們將略過(guò)如何編譯與打包文中的Java示例代碼,但我們會(huì)在示例源碼中進(jìn)行注釋。
通過(guò)單個(gè)客戶端導(dǎo)入MySQL數(shù)據(jù)
數(shù)據(jù)合并最常見的應(yīng)用場(chǎng)景就是從已經(jīng)存在的關(guān)系型數(shù)據(jù)庫(kù)將數(shù)據(jù)導(dǎo)入到HBase中。對(duì)于此類型任務(wù),最簡(jiǎn)單直接的方式就是從一個(gè)單獨(dú)的客戶端獲取數(shù)據(jù),然后通過(guò)HBase的API中Put方法將數(shù)據(jù)存入HBase中。這種方式適合處理數(shù)據(jù)不是太多的情況。
本節(jié)描述的是使用Put方法將MySQL數(shù)據(jù)導(dǎo)入HBase中的方式。所有的操作均是在一個(gè)單獨(dú)的客戶端執(zhí)行,并且不會(huì)使用到MapReduce。本節(jié)將會(huì)帶領(lǐng)你通過(guò)HBase
Shell創(chuàng)建HBase表格,通過(guò)Java來(lái)連接集群,并將數(shù)據(jù)導(dǎo)入HBase。
準(zhǔn)備
公共數(shù)據(jù)集合是個(gè)練習(xí)HBase數(shù)據(jù)合并的很好數(shù)據(jù)源?;ヂ?lián)網(wǎng)上有很多公共數(shù)據(jù)集合。我們?cè)诒疚闹歇?jiǎng)使用 “美國(guó)國(guó)家海洋和大氣管理局
1981-2010氣候平均值”的公共數(shù)據(jù)集合。訪問(wèn)http://www1.ncdc./pub/data/normals
/1981-2010/下載。
這些氣候報(bào)表數(shù)據(jù)是由美國(guó)國(guó)家海洋和大氣管理局(NOAA)生成的。在本文中,我們使用在目錄 products | hourly 下的小時(shí)溫度數(shù)據(jù)(可以在上述鏈接頁(yè)面中找到)。下載hly-temp-normal.txt文件。
需要一個(gè)MySQL實(shí)例,在MySQL數(shù)據(jù)庫(kù)中創(chuàng)建hly_temp_normal表格,使用如下的SQL命令:
- create table hly_temp_normal (
- id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
- stnid CHAR(11),
- month TINYINT,
- day TINYINT,
- value1 VARCHAR(5),
- value2 VARCHAR(5),
- value3 VARCHAR(5),
- value4 VARCHAR(5),
- value5 VARCHAR(5),
- value6 VARCHAR(5),
- value7 VARCHAR(5),
- value8 VARCHAR(5),
- value9 VARCHAR(5),
- value10 VARCHAR(5),
- value11 VARCHAR(5),
- value12 VARCHAR(5),
- value13 VARCHAR(5),
- value14 VARCHAR(5),
- value15 VARCHAR(5),
- value16 VARCHAR(5),
- value17 VARCHAR(5),
- value18 VARCHAR(5),
- value19 VARCHAR(5),
- value20 VARCHAR(5),
- value21 VARCHAR(5),
- value22 VARCHAR(5),
- value23 VARCHAR(5),
- value24 VARCHAR(5)
- );
本文提供了一些腳本將txt中的數(shù)據(jù)導(dǎo)入到MySQL表中。你可以使用 insert_hly.py 來(lái)加載每小時(shí)的NOAA數(shù)據(jù)。只需要修改腳本中的主機(jī)(host),用戶(user),密碼(password)以及數(shù)據(jù)名稱(database
name)。完成修改后就能夠?qū)⑾螺d的hly-temp-normal.txt數(shù)據(jù)導(dǎo)入到mysql的hly_temp_normal 表中,使用命令如下:
$ python insert_hly.py -f hly-temp-normal.txt -t hly_temp_normal
譯者注:此處給出python腳本下載地址(https://github.com/uprush/hac-book/blob/master/2-data-migration/script/insert_hly.py)
(譯者注:由于對(duì)于python的了解有限以及環(huán)境限制,所以單獨(dú)另寫了一段Java的代碼,可以直接使用的:
- import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader;
- import java.io.Reader; import java.sql.Connection; import java.sql.DriverManager;
- import java.sql.PreparedStatement; import java.sql.SQLException; import
- java.util.ArrayList; import java.util.List; public class InsertHly { static
- String user="root"; static String pwd="root123"; static String driver="com.mysql.jdbc.Driver";
- static String url="jdbc:mysql://127.0.0.1:3306/htom?useUnicode=true&characterEncoding=UTF-8";
- public static void main(String[] args) throws SQLException { Connection
- baseCon = null; String sqlStr="insert into hly_temp_normal (stnid,month,day,value1,value2,value3,value4,value5,value6,value7,value8,value9,value10,value11,value12,value13,value14,value15,value16,value17,value18,value19,value20,value21,value22,value23,value24)
- values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; List parasValues=new
- ArrayList(); try { baseCon = DriverManager.getConnection(url, user, pwd);
- } catch (SQLException e) {
- }
- "gbk"); String[] rows=allRowsStr.split("\n"); for(String row : rows){ parasValues.add(row.split("\\s+"));
- } PreparedStatement basePsm = null; try { baseCon.setAutoCommit(false);
- basePsm = baseCon.prepareStatement(sqlStr); for (int i = 0; i < parasValues.size();
- i++) { Object[] parasValue = parasValues.get(i); for (int j = 0; j <
- parasValue.length; j++) { basePsm.setObject(j + 1, parasValue[j]); } basePsm.addBatch();
- } basePsm.executeBatch(); baseCon.commit(); } catch (SQLException e) {
- baseCon.rollback(); throw e; } finally { if (basePsm != null) { basePsm.close();
- basePsm = null; } if (baseCon != null) { baseCon.close(); } } } public
- static String readFileByChars(String fileName, String enc) { StringBuffer
- content=new StringBuffer(); Reader reader = null; try {
- tempchars = new char[30]; int charread = 0; reader = new InputStreamReader(new
- FileInputStream(fileName),enc);
- ((charread = reader.read(tempchars)) != -1) {
- == tempchars.length) && (tempchars[tempchars.length - 1] != '\r'))
- { content.append(tempchars); } else { for (int i = 0; i < charread;
- i++) { if (tempchars[i] == '\r') { continue; } else { content.append(tempchars[i]);
- } } } } return content.toString(); } catch (Exception e1) { e1.printStackTrace();
- } finally { if (reader != null) { try { reader.close(); } catch (IOException
- e1) { } } } return null; } } )
為使得下一節(jié)中的Java源碼能夠編譯,你需要下列庫(kù)支持:
hadoop-core-1.0.2.jar
hbase-0.92.1.jar
mysql-connector-java-5.1.18.jar
你可以將他們手動(dòng)加入classpath中,或者使用本文中的可用的示例代碼。
在導(dǎo)入數(shù)據(jù)之前,確認(rèn)HDFS, ZooKeeper,和HBase集群均正常運(yùn)行。在HBase的客戶端節(jié)點(diǎn)記錄日志。
如何實(shí)施
通過(guò)單節(jié)點(diǎn)客戶端將數(shù)據(jù)從MySQL導(dǎo)入HBase:
1.從HBase的客戶端服務(wù)器從過(guò)HBase的Shell命令行,連接到HBase的集群。
hadoop$ $HBASE_HOME/bin/hbase shell
2.在HBase中創(chuàng)建 hly_temp 表
hbase> create ‘hly_temp’, {NAME => ‘n’, VERSIONS => 1}
3.寫一個(gè)Java程序?qū)?shù)據(jù)從MySQL中導(dǎo)入HBase,并將其打包成jar。在Java中按照下列步驟導(dǎo)入數(shù)據(jù):
i. 使用Java創(chuàng)建一個(gè)connectHBase() 方法來(lái)連接到指定的HBase表:
$ vi Recipe1.java
- private static HTable connectHBase(String tablename) \
- throws IOException {
- HTable table = null;
- Configuration conf = HBaseConfiguration.create();
- table = new HTable(conf, tablename);
- return table;
- }
ii. 使用Java創(chuàng)建一個(gè) connectDB() 方法來(lái) MySQL :
$ vi Recipe1.java
- private static Connection connectDB() \
- throws Exception {
- String userName = "db_user";
- String password = "db_password";
- String url = "jdbc:mysql://db_host/database";
- Class.forName("com.mysql.jdbc.Driver").newInstance();
- Connection conn = DriverManager.getConnection(url,
- userName, password);
- return conn;
- }
此處是Java類中的main() 方法,在其中我們從MySQL獲取數(shù)據(jù)并存入HBase中:
$ vi Recipe1.java
- public class Recipe1 {
- public static void main(String[] args) {
- Connection dbConn = null;
- HTable htable = null;
- Statement stmt = null;
- String query = "select * from hly_temp_normal";
- try {
- dbConn = connectDB();
- htable = connectHBase("hly_temp");
- byte[] family = Bytes.toBytes("n");
- stmt = dbConn.createStatement();
- ResultSet rs = stmt.executeQuery(query);
-
-
- long ts = System.currentTimeMillis();
- while (rs.next()) {
- String stationid = rs.getString("stnid");
- int month = rs.getInt("month");
- int day = rs.getInt("day");
- String rowkey = stationid + Common.lpad(String.
- valueOf(month), 2,
- '0') + Common.lpad(String.valueOf(day), 2, '0');
- Put p = new Put(Bytes.toBytes(rowkey));
-
-
- for (int i = 5; i < 29; i++) {
- String columnI = "v" + Common.lpad
- (String.valueOf(i - 4), 2, '0');
- String valueI = rs.getString(i);
- p.add(family, Bytes.toBytes(columnI), ts,
- Bytes.toBytes(valueI));
- }
- htable.put(p);
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- if (stmt != null) {
- stmt.close();
- }
- if (dbConn != null) {
- dbConn.close();
- }
- if (htable != null) {
- htable.close();
- }
- } catch (Exception e) {
-
- }
- }
- }
- }
4.運(yùn)行導(dǎo)入任務(wù),下面的腳本就是用于執(zhí)行JAR文件:
- #/bin/bash
- bin=`dirname $0`
- bin=`cd $bin;pwd`
- cp=$HBASE_HOME/conf:$HBASE_HOME/hbase-0.92.1.jar:$bin/build/hac-
- chapter2.jar
- for jar in $bin/lib/*.jar
- do
- cp=$cp:$jar
- done
- for jar in $HBASE_HOME/lib/*.jar
- do
- cp=$cp:$jar
- done
$JAVA_HOME/bin/java -classpath $cp “hac.chapter2.Recipe1″
5.驗(yàn)證HBase中導(dǎo)入的數(shù)據(jù),通過(guò)HBase的Shell連接至HBase:
hadoop$ $HBASE_HOME/bin/hbase shell
6.驗(yàn)證數(shù)據(jù)已經(jīng)被導(dǎo)入了HBase的對(duì)應(yīng)表中:
hbase> count ‘hly_temp’
95630 row(s) in 8.9850 seconds
hbase> scan ‘hly_temp’, {LIMIT => 10}
…
AQW000617050110 column=n:v23,
timestamp=1322958813521, value=814S
AQW000617050110 column=n:v24,
timestamp=1322958813521, value=811C
10 row(s) in 0.6730 seconds
運(yùn)行原理
在步驟1和2中,我們?cè)贖Base中創(chuàng)建了目標(biāo)表用于插入數(shù)據(jù)。目標(biāo)表名稱為hly_temp,且只有單個(gè)列族(column
family)
n。我們將列族名稱設(shè)計(jì)為一個(gè)字母的原因,是因?yàn)榱凶迕Q會(huì)存儲(chǔ)在HBase的每個(gè)鍵值對(duì)中。使用短名能夠讓數(shù)據(jù)的存儲(chǔ)和緩存更有效率。我們只需要保留一
個(gè)版本的數(shù)據(jù),所以為列族指定VERSION屬性。
在Java代碼中,為了連接到HBase,我們首先創(chuàng)建一個(gè)配置(Configuration
)對(duì)象,使用該對(duì)象創(chuàng)建一個(gè)HTable實(shí)例。這個(gè)HTable對(duì)象用于處理所有的客戶端API調(diào)用。如你所見,我們?cè)诖a沒有設(shè)置任何
ZooKeeper或HBase的連接配置。所以程序該如何連接到運(yùn)行的HBase集群呢?這或許是因?yàn)槲覀冊(cè)诓襟E4中將
$HBase/conf目錄添加到classpath中了。通過(guò)上述設(shè)置,HBase的客戶端API會(huì)classpath中的hbase-
site.xml加載配置信息。連接配置信息在hbase-site.xml中設(shè)置。
在使用JDBC中MySQL中獲取數(shù)據(jù)之后,我們循環(huán)讀取結(jié)果集,將MySQL中的一行映射為HBase表中的一行。此處我們使用stationid,月份和日期欄位來(lái)生成HBase數(shù)據(jù)的row
key。我們?cè)谠路莺腿掌谧筮呉蔡畛?,補(bǔ)足2位數(shù)。這樣做很重要,因?yàn)镠Base的row key是按照字典排序的,意味著12將排序在2之前,這樣可能會(huì)導(dǎo)致一些意外的情況發(fā)生。
我們創(chuàng)建了Put對(duì)象,利用row key添加一行數(shù)據(jù)。每小時(shí)的數(shù)據(jù)的添加需要調(diào)用Put.add()方法,傳入?yún)?shù)包括列族(column family),
限定符(qualifier),時(shí)間戳( timestamp), and 值(value)。再次聲明,我們使用很短的列族名稱能夠讓存儲(chǔ)數(shù)據(jù)更高效。所有的數(shù)據(jù)都被添加之后,我們調(diào)用HTable.put()
方法會(huì)將數(shù)據(jù)保存進(jìn)HBase的table中。
最后,所有打開的資源都需要手動(dòng)關(guān)閉。我們?cè)诖a中的final塊中結(jié)束了MySQL和HBase的連接,這樣確保即時(shí)導(dǎo)入動(dòng)作中拋出異常仍然會(huì)被調(diào)用到。
你能夠通過(guò)對(duì)比MySQL和HBase的數(shù)據(jù)行數(shù)來(lái)驗(yàn)證導(dǎo)入是否正確。你可以在掃描(scan)結(jié)果集中發(fā)現(xiàn)數(shù)據(jù)都準(zhǔn)確的導(dǎo)入了HBase。
|