小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

數(shù)據(jù)導(dǎo)入HBase常用三種方式(一):通過(guò)單個(gè)客戶端導(dǎo)入MySQL

 SparkStreaming 2014-11-20

HBase數(shù)據(jù)遷移(1)-使用HBase的API中的Put方法

使用HBase的API中的Put是最直接的方法,用法也很容易學(xué)習(xí)。但針對(duì)大部分情況,它并非都是最高效的方式。當(dāng)需要將海量數(shù)據(jù)在規(guī)定時(shí)間內(nèi)載 入HBase中時(shí),效率問(wèn)題體現(xiàn)得尤為明顯。待處理的數(shù)據(jù)量一般都是巨大的,這也許是為何我們選擇了HBase而不是其他數(shù)據(jù)庫(kù)的原因。在項(xiàng)目開始之前, 你就該思考如何將所有能夠很好的將數(shù)據(jù)轉(zhuǎn)移進(jìn)HBase,否則之后可能面臨嚴(yán)重的性能問(wèn)題。

HBase有一個(gè)名為 bulk load的功能支持將海量數(shù)據(jù)高效地裝載入HBase中。Bulk load是通過(guò)一個(gè)MapReduce Job來(lái)實(shí)現(xiàn)的,通過(guò)Job直接生成一個(gè)HBase的內(nèi)部HFile格式文件來(lái)形成一個(gè)特殊的HBase數(shù)據(jù)表,然后直接將數(shù)據(jù)文件加載到運(yùn)行的集群中。 使用bulk load功能最簡(jiǎn)單的方式就是使用importtsv 工具。importtsv 是從TSV文件直接加載內(nèi)容至HBase的一個(gè)內(nèi)置工具。它通過(guò)運(yùn)行一個(gè)MapReduce Job,將數(shù)據(jù)從TSV文件中直接寫入HBase的表或者寫入一個(gè)HBase的自有格式數(shù)據(jù)文件。

盡管importtsv 工具在需要將文本數(shù)據(jù)導(dǎo)入HBase的時(shí)候十分有用,但是有一些情況,比如導(dǎo)入其他格式的數(shù)據(jù),你會(huì)希望使用編程來(lái)生成數(shù)據(jù),而MapReduce是處理 海量數(shù)據(jù)最有效的方式。這可能也是HBase中加載海量數(shù)據(jù)唯一最可行的方法了。當(dāng)然我們可以使用MapReduce向HBase導(dǎo)入數(shù)據(jù),但海量的數(shù)據(jù) 集會(huì)使得MapReduce Job也變得很繁重。若處理不當(dāng),則可能使得MapReduce的job運(yùn)行時(shí)的吞吐量很小。

在HBase中數(shù)據(jù)合并是一項(xiàng)頻繁執(zhí)行寫操作任務(wù),除非我們能夠生成HBase的內(nèi)部數(shù)據(jù)文件,并且直接加載。這樣盡管HBase的寫入速度一直很 快,但是若合并過(guò)程沒有合適的配置,也有可能造成寫操作時(shí)常被阻塞。寫操作很重的任務(wù)可能引起的另一個(gè)問(wèn)題就是將數(shù)據(jù)寫入了相同的族群服務(wù)器 (region server),這種情況常出現(xiàn)在將海量數(shù)據(jù)導(dǎo)入到一個(gè)新建的HBase中。一旦數(shù)據(jù)集中在相同的服務(wù)器,整個(gè)集群就變得不平衡,并且寫速度會(huì)顯著的降 低。我們將會(huì)在本文中致力于解決這些問(wèn)題。我們將從一個(gè)簡(jiǎn)單的任務(wù)開始,使用API中的Put方法將MySQL中的數(shù)據(jù)導(dǎo)入HBase。接著我們會(huì)描述如 何使用 importtsv 和 bulk load將TSV數(shù)據(jù)文件導(dǎo)入HBase。我們也會(huì)有一個(gè)MapReduce樣例展示如何使用其他數(shù)據(jù)文件格式來(lái)導(dǎo)入數(shù)據(jù)。上述方式都包括將數(shù)據(jù)直接寫入 HBase中,以及在HDFS中直接寫入HFile類型文件。本文中最后一節(jié)解釋在向HBase導(dǎo)入數(shù)據(jù)之前如何構(gòu)建好集群。本文代碼均是以Java編 寫,我們假設(shè)您具有基本Java知識(shí),所以我們將略過(guò)如何編譯與打包文中的Java示例代碼,但我們會(huì)在示例源碼中進(jìn)行注釋。

通過(guò)單個(gè)客戶端導(dǎo)入MySQL數(shù)據(jù)

數(shù)據(jù)合并最常見的應(yīng)用場(chǎng)景就是從已經(jīng)存在的關(guān)系型數(shù)據(jù)庫(kù)將數(shù)據(jù)導(dǎo)入到HBase中。對(duì)于此類型任務(wù),最簡(jiǎn)單直接的方式就是從一個(gè)單獨(dú)的客戶端獲取數(shù)據(jù),然后通過(guò)HBase的API中Put方法將數(shù)據(jù)存入HBase中。這種方式適合處理數(shù)據(jù)不是太多的情況。

本節(jié)描述的是使用Put方法將MySQL數(shù)據(jù)導(dǎo)入HBase中的方式。所有的操作均是在一個(gè)單獨(dú)的客戶端執(zhí)行,并且不會(huì)使用到MapReduce。本節(jié)將會(huì)帶領(lǐng)你通過(guò)HBase Shell創(chuàng)建HBase表格,通過(guò)Java來(lái)連接集群,并將數(shù)據(jù)導(dǎo)入HBase。

準(zhǔn)備

公共數(shù)據(jù)集合是個(gè)練習(xí)HBase數(shù)據(jù)合并的很好數(shù)據(jù)源?;ヂ?lián)網(wǎng)上有很多公共數(shù)據(jù)集合。我們?cè)诒疚闹歇?jiǎng)使用 “美國(guó)國(guó)家海洋和大氣管理局 1981-2010氣候平均值”的公共數(shù)據(jù)集合。訪問(wèn)http://www1.ncdc./pub/data/normals /1981-2010/下載。

這些氣候報(bào)表數(shù)據(jù)是由美國(guó)國(guó)家海洋和大氣管理局(NOAA)生成的。在本文中,我們使用在目錄 products | hourly 下的小時(shí)溫度數(shù)據(jù)(可以在上述鏈接頁(yè)面中找到)。下載hly-temp-normal.txt文件。
需要一個(gè)MySQL實(shí)例,在MySQL數(shù)據(jù)庫(kù)中創(chuàng)建hly_temp_normal表格,使用如下的SQL命令:

  1. create table hly_temp_normal (  
  2. id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,  
  3. stnid CHAR(11),  
  4. month TINYINT,  
  5. day TINYINT,  
  6. value1 VARCHAR(5),  
  7. value2 VARCHAR(5),  
  8. value3 VARCHAR(5),  
  9. value4 VARCHAR(5),  
  10. value5 VARCHAR(5),  
  11. value6 VARCHAR(5),  
  12. value7 VARCHAR(5),  
  13. value8 VARCHAR(5),  
  14. value9 VARCHAR(5),  
  15. value10 VARCHAR(5),  
  16. value11 VARCHAR(5),  
  17. value12 VARCHAR(5),  
  18. value13 VARCHAR(5),  
  19. value14 VARCHAR(5),  
  20. value15 VARCHAR(5),  
  21. value16 VARCHAR(5),  
  22. value17 VARCHAR(5),  
  23. value18 VARCHAR(5),  
  24. value19 VARCHAR(5),  
  25. value20 VARCHAR(5),  
  26. value21 VARCHAR(5),  
  27. value22 VARCHAR(5),  
  28. value23 VARCHAR(5),  
  29. value24 VARCHAR(5)  
  30. );  

本文提供了一些腳本將txt中的數(shù)據(jù)導(dǎo)入到MySQL表中。你可以使用 insert_hly.py 來(lái)加載每小時(shí)的NOAA數(shù)據(jù)。只需要修改腳本中的主機(jī)(host),用戶(user),密碼(password)以及數(shù)據(jù)名稱(database name)。完成修改后就能夠?qū)⑾螺d的hly-temp-normal.txt數(shù)據(jù)導(dǎo)入到mysql的hly_temp_normal 表中,使用命令如下:
$ python insert_hly.py -f hly-temp-normal.txt -t hly_temp_normal

譯者注:此處給出python腳本下載地址(https://github.com/uprush/hac-book/blob/master/2-data-migration/script/insert_hly.py)

譯者注:由于對(duì)于python的了解有限以及環(huán)境限制,所以單獨(dú)另寫了一段Java的代碼,可以直接使用的:

  1. import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader;  
  2.     import java.io.Reader; import java.sql.Connection; import java.sql.DriverManager;  
  3.     import java.sql.PreparedStatement; import java.sql.SQLException; import  
  4.     java.util.ArrayList; import java.util.List; public class InsertHly { static  
  5.     String user="root"static String pwd="root123"static String driver="com.mysql.jdbc.Driver";  
  6.     static String url="jdbc:mysql://127.0.0.1:3306/htom?useUnicode=true&characterEncoding=UTF-8";  
  7.     public static void main(String[] args) throws SQLException { Connection  
  8.     baseCon = null; String sqlStr="insert into hly_temp_normal (stnid,month,day,value1,value2,value3,value4,value5,value6,value7,value8,value9,value10,value11,value12,value13,value14,value15,value16,value17,value18,value19,value20,value21,value22,value23,value24)  
  9.     values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; List parasValues=new  
  10.     ArrayList(); try { baseCon = DriverManager.getConnection(url, user, pwd);  
  11.     } catch (SQLException e) { // TODO Auto-generated catch block e.printStackTrace();  
  12.     } // 替換為文件地址 String allRowsStr=readFileByChars("d:\\TestZone\\hly-temp-normal.txt",  
  13.     "gbk"); String[] rows=allRowsStr.split("\n"); for(String row : rows){ parasValues.add(row.split("\\s+"));  
  14.     } PreparedStatement basePsm = nulltry { baseCon.setAutoCommit(false);  
  15.     basePsm = baseCon.prepareStatement(sqlStr); for (int i = 0; i < parasValues.size();  
  16.     i++) { Object[] parasValue = parasValues.get(i); for (int j = 0; j <  
  17.     parasValue.length; j++) { basePsm.setObject(j + 1, parasValue[j]); } basePsm.addBatch();  
  18.     } basePsm.executeBatch(); baseCon.commit(); } catch (SQLException e) {  
  19.     baseCon.rollback(); throw e; } finally { if (basePsm != null) { basePsm.close();  
  20.     basePsm = null; } if (baseCon != null) { baseCon.close(); } } } public  
  21.     static String readFileByChars(String fileName, String enc) { StringBuffer  
  22.     content=new StringBuffer(); Reader reader = nulltry { // 一次讀多個(gè)字符 char[]  
  23.     tempchars = new char[30]; int charread = 0; reader = new InputStreamReader(new  
  24.     FileInputStream(fileName),enc); // 讀入多個(gè)字符到字符數(shù)組中,charread為一次讀取字符數(shù) while  
  25.     ((charread = reader.read(tempchars)) != -1) { // 同樣屏蔽掉\r不顯示 if ((charread  
  26.     == tempchars.length) && (tempchars[tempchars.length - 1] != '\r'))  
  27.     { content.append(tempchars); } else { for (int i = 0; i < charread;  
  28.     i++) { if (tempchars[i] == '\r') { continue; } else { content.append(tempchars[i]);  
  29.     } } } } return content.toString(); } catch (Exception e1) { e1.printStackTrace();  
  30.     } finally { if (reader != null) { try { reader.close(); } catch (IOException  
  31.     e1) { } } } return null; } } )  

為使得下一節(jié)中的Java源碼能夠編譯,你需要下列庫(kù)支持:
hadoop-core-1.0.2.jar
hbase-0.92.1.jar
mysql-connector-java-5.1.18.jar

你可以將他們手動(dòng)加入classpath中,或者使用本文中的可用的示例代碼。

在導(dǎo)入數(shù)據(jù)之前,確認(rèn)HDFS, ZooKeeper,和HBase集群均正常運(yùn)行。在HBase的客戶端節(jié)點(diǎn)記錄日志。

如何實(shí)施

通過(guò)單節(jié)點(diǎn)客戶端將數(shù)據(jù)從MySQL導(dǎo)入HBase:
1.從HBase的客戶端服務(wù)器從過(guò)HBase的Shell命令行,連接到HBase的集群。
hadoop$ $HBASE_HOME/bin/hbase shell
2.在HBase中創(chuàng)建 hly_temp 表
hbase> create ‘hly_temp’, {NAME => ‘n’, VERSIONS => 1}
3.寫一個(gè)Java程序?qū)?shù)據(jù)從MySQL中導(dǎo)入HBase,并將其打包成jar。在Java中按照下列步驟導(dǎo)入數(shù)據(jù):
i. 使用Java創(chuàng)建一個(gè)connectHBase() 方法來(lái)連接到指定的HBase表:
$ vi Recipe1.java

  1. private static HTable connectHBase(String tablename) \  
  2. throws IOException {  
  3. HTable table = null;  
  4. Configuration conf = HBaseConfiguration.create();  
  5. table = new HTable(conf, tablename);  
  6. return table;  
  7. }  

ii. 使用Java創(chuàng)建一個(gè) connectDB() 方法來(lái) MySQL :
$ vi Recipe1.java

  1. private static Connection connectDB() \  
  2. throws Exception {  
  3. String userName = "db_user";  
  4. String password = "db_password";  
  5. String url = "jdbc:mysql://db_host/database";  
  6. Class.forName("com.mysql.jdbc.Driver").newInstance();  
  7. Connection conn = DriverManager.getConnection(url,  
  8. userName, password);  
  9. return conn;  
  10. }  

此處是Java類中的main() 方法,在其中我們從MySQL獲取數(shù)據(jù)并存入HBase中:
$ vi Recipe1.java

  1. public class Recipe1 {  
  2.  public static void main(String[] args) {  
  3.    Connection dbConn = null;  
  4.    HTable htable = null;  
  5.    Statement stmt = null;  
  6.    String query = "select * from hly_temp_normal";  
  7.    try {  
  8.      dbConn = connectDB();  
  9.      htable = connectHBase("hly_temp");  
  10.      byte[] family = Bytes.toBytes("n");  
  11.      stmt = dbConn.createStatement();  
  12.      ResultSet rs = stmt.executeQuery(query);  
  13.      // time stamp for all inserted rows  
  14.      // 所有插入數(shù)據(jù)的時(shí)間戳  
  15.      long ts = System.currentTimeMillis();  
  16.      while (rs.next()) {  
  17.        String stationid = rs.getString("stnid");  
  18.        int month = rs.getInt("month");  
  19.        int day = rs.getInt("day");  
  20.        String rowkey = stationid + Common.lpad(String.   
  21.        valueOf(month), 2,    
  22.        '0') + Common.lpad(String.valueOf(day), 2'0');  
  23.        Put p = new Put(Bytes.toBytes(rowkey));  
  24.        // get hourly data from MySQL and put into hbase  
  25.        //從MySQL中獲取小時(shí)數(shù)據(jù)并存入HBase  
  26.        for (int i = 5; i < 29; i++) {  
  27.          String columnI = "v" + Common.lpad   
  28.          (String.valueOf(i - 4), 2'0');  
  29.          String valueI = rs.getString(i);  
  30.          p.add(family, Bytes.toBytes(columnI), ts,    
  31.          Bytes.toBytes(valueI));  
  32.        }  
  33.        htable.put(p);  
  34.      }  
  35.    } catch (Exception e) {  
  36.      e.printStackTrace();  
  37.    } finally {  
  38.      try {  
  39.        if (stmt != null) {  
  40.          stmt.close();  
  41.        }  
  42.        if (dbConn != null) {  
  43.          dbConn.close();  
  44.        }  
  45.        if (htable != null) {  
  46.          htable.close();  
  47.        }  
  48.      } catch (Exception e) {  
  49.        // ignore  
  50.      }  
  51.    }  
  52.  }  
  53. }  

4.運(yùn)行導(dǎo)入任務(wù),下面的腳本就是用于執(zhí)行JAR文件:

  1. #/bin/bash  
  2. bin=`dirname $0`  
  3. bin=`cd $bin;pwd`  
  4. cp=$HBASE_HOME/conf:$HBASE_HOME/hbase-0.92.1.jar:$bin/build/hac-  
  5. chapter2.jar  
  6. for jar in $bin/lib/*.jar  
  7. do  
  8.    cp=$cp:$jar  
  9. done  
  10. for jar in $HBASE_HOME/lib/*.jar  
  11. do  
  12.    cp=$cp:$jar  
  13. done  

$JAVA_HOME/bin/java -classpath $cp “hac.chapter2.Recipe1″

5.驗(yàn)證HBase中導(dǎo)入的數(shù)據(jù),通過(guò)HBase的Shell連接至HBase:
hadoop$ $HBASE_HOME/bin/hbase shell

6.驗(yàn)證數(shù)據(jù)已經(jīng)被導(dǎo)入了HBase的對(duì)應(yīng)表中:
hbase> count ‘hly_temp’
95630 row(s) in 8.9850 seconds
hbase> scan ‘hly_temp’, {LIMIT => 10}

AQW000617050110 column=n:v23,
timestamp=1322958813521, value=814S
AQW000617050110 column=n:v24,
timestamp=1322958813521, value=811C
10 row(s) in 0.6730 seconds

運(yùn)行原理

在步驟1和2中,我們?cè)贖Base中創(chuàng)建了目標(biāo)表用于插入數(shù)據(jù)。目標(biāo)表名稱為hly_temp,且只有單個(gè)列族(column family) n。我們將列族名稱設(shè)計(jì)為一個(gè)字母的原因,是因?yàn)榱凶迕Q會(huì)存儲(chǔ)在HBase的每個(gè)鍵值對(duì)中。使用短名能夠讓數(shù)據(jù)的存儲(chǔ)和緩存更有效率。我們只需要保留一 個(gè)版本的數(shù)據(jù),所以為列族指定VERSION屬性。

在Java代碼中,為了連接到HBase,我們首先創(chuàng)建一個(gè)配置(Configuration )對(duì)象,使用該對(duì)象創(chuàng)建一個(gè)HTable實(shí)例。這個(gè)HTable對(duì)象用于處理所有的客戶端API調(diào)用。如你所見,我們?cè)诖a沒有設(shè)置任何 ZooKeeper或HBase的連接配置。所以程序該如何連接到運(yùn)行的HBase集群呢?這或許是因?yàn)槲覀冊(cè)诓襟E4中將 $HBase/conf目錄添加到classpath中了。通過(guò)上述設(shè)置,HBase的客戶端API會(huì)classpath中的hbase- site.xml加載配置信息。連接配置信息在hbase-site.xml中設(shè)置。

在使用JDBC中MySQL中獲取數(shù)據(jù)之后,我們循環(huán)讀取結(jié)果集,將MySQL中的一行映射為HBase表中的一行。此處我們使用stationid,月份和日期欄位來(lái)生成HBase數(shù)據(jù)的row key。我們?cè)谠路莺腿掌谧筮呉蔡畛?,補(bǔ)足2位數(shù)。這樣做很重要,因?yàn)镠Base的row key是按照字典排序的,意味著12將排序在2之前,這樣可能會(huì)導(dǎo)致一些意外的情況發(fā)生。

我們創(chuàng)建了Put對(duì)象,利用row key添加一行數(shù)據(jù)。每小時(shí)的數(shù)據(jù)的添加需要調(diào)用Put.add()方法,傳入?yún)?shù)包括列族(column family), 限定符(qualifier),時(shí)間戳( timestamp), and 值(value)。再次聲明,我們使用很短的列族名稱能夠讓存儲(chǔ)數(shù)據(jù)更高效。所有的數(shù)據(jù)都被添加之后,我們調(diào)用HTable.put() 方法會(huì)將數(shù)據(jù)保存進(jìn)HBase的table中。

最后,所有打開的資源都需要手動(dòng)關(guān)閉。我們?cè)诖a中的final塊中結(jié)束了MySQL和HBase的連接,這樣確保即時(shí)導(dǎo)入動(dòng)作中拋出異常仍然會(huì)被調(diào)用到。
你能夠通過(guò)對(duì)比MySQL和HBase的數(shù)據(jù)行數(shù)來(lái)驗(yàn)證導(dǎo)入是否正確。你可以在掃描(scan)結(jié)果集中發(fā)現(xiàn)數(shù)據(jù)都準(zhǔn)確的導(dǎo)入了HBase。

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多