小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

HBase權(quán)威指南(中文版)——第三章(第二部分)

 閑來看看 2013-08-14

第三章 客戶端API: 基礎(chǔ)篇(第三部分)

批量操作

前面介紹了如何逐條和批量地添加、讀取、刪除數(shù)據(jù)。在這一節(jié),我們將介紹如何一次執(zhí)行多種不同類型的操作處理多行記錄。

事實上,一些使用List結(jié)構(gòu)的批量操作,如delete(List<Delete>
deletes)
或者get(List(Get> gets)等,底層都是依靠batch實現(xiàn)的。封裝這些接口可以更好的提高友好性。

在下面介紹的批量操作中,您將會看到一個新的類型叫Row,前面將到的Put、GetDelete類都是從Row類的子類。

void
batch(List<Row> actions, Object[] results)

throws
IOException, InterruptedException

Object[] batch(List<Row>
actions)

throws
IOException, InterruptedException

由于Row的存在,以及它和Get、Put、Delete的繼承關(guān)系,決定了可以在一個列表中混合多種不同類型的操作。示例3-16給出了這種使用的例子。

值得注意地是,您不應(yīng)該將一個rowPutDelete操作混合在一起。因為List中多個操作在服務(wù)器端執(zhí)行的順序是無法保證的,這樣會得到一個無法預(yù)料到的結(jié)果。

示例3-16 批量操作

private final
static byte[] ROW1 = Bytes.toBytes(“row1″);

private final
static byte[] ROW2 = Bytes.toBytes(“row2″);

private final
static byte[] COLFAM1 = Bytes.toBytes(“colfam1″);

private final
static byte[] COLFAM2 = Bytes.toBytes(“colfam2″);

private final
static byte[] QUAL1 = Bytes.toBytes(“qual1″);

private final
static byte[] QUAL2 = Bytes.toBytes(“qual2″);

List<Row>
batch = new ArrayList<Row>();

Put put = new
Put(ROW2);

put.add(COLFAM2,
QUAL1, Bytes.toBytes(“val5″));

batch.add(put);

Get get1 = new
Get(ROW1);

get1.addColumn(COLFAM1,
QUAL1);

batch.add(get1);

Delete delete =
new Delete(ROW1);

delete.deleteColumns(COLFAM1,
QUAL2);

batch.add(delete);

Get get2 = new
Get(ROW2);

get2.addFamily(Bytes.toBytes(“BOGUS”));

batch.add(get2);

Object[] results =
new Object[batch.size()];

try {

table.batch(batch,
results);

} catch (Exception
e) {

System.err.println(“Error:
” + e);

}

for (int i = 0; i
< results.length; i++) {

System.out.println(“Result["
+ i + "]: ” + results[i]);

}

首先定義了一組指向row、column family、column
qualifier
的常量,方便重用。然后創(chuàng)建一個Row實例的列表。向其中分別加入一個Put、GetDelete實例,再添加一個指向不存在列的Put操作。創(chuàng)建一個Result數(shù)組,大小和batch的大小相同。然后執(zhí)行batch方法,返回的結(jié)果放在result中,最后打印出Result數(shù)組的值。

整個程序的輸出如下:

Before batch
call…

KV:
row1/colfam1:qual1/1/Put/vlen=4, Value: val1

KV:
row1/colfam1:qual2/2/Put/vlen=4, Value: val2

KV: row1/colfam1:qual3/3/Put/vlen=4,
Value: val3

Result[0]:
keyvalues=NONE

Result[1]:
keyvalues={row1/colfam1:qual1/1/Put/vlen=4}

Result[2]:
keyvalues=NONE

Result[3]:
org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException:

org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException:

Column family
BOGUS does not exist in …

After batch
call…

KV:
row1/colfam1:qual1/1/Put/vlen=4, Value: val1

KV:
row1/colfam1:qual3/3/Put/vlen=4, Value: val3

KV:
row2/colfam2:qual1/1308836506340/Put/vlen=4, Value: val5

Error:
org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException:

Failed 1 action:
NoSuchColumnFamilyException: 1 time,

servers with
issues: 10.0.0.43:60020,

在前面的例子中,事先插入了一些測試數(shù)據(jù)并打印出來,讓您方便地看出示例代碼做的事情。最后發(fā)現(xiàn),想刪除的記錄成功被刪除,想插入的新記錄也成功被插入了。

Get操作的結(jié)果,需要從Result數(shù)組中找到。Result數(shù)組的大小和請求操作的總數(shù)相等。第一個ResultPut操作的結(jié)果,為一個空的KeyValue結(jié)構(gòu);第二個Result的值是Get操作的結(jié)果,可以打印出它的值。第三個值為Delete操作的值,為一個空的KeyValue結(jié)構(gòu)。表3-7給出了ResultRow類型之前的對應(yīng)關(guān)系。

3-7 batch()調(diào)用可能返回的結(jié)果

Result

Description

null

The operation has failed to
communicate with the remote server.

Empty Result

Returned for successful Put
and Delete operations.

Result

Returned for successful Get
operations, but may also be empty when there was no matching row or column.

Throwable

In case the servers return an
exception for the operation it is returned to the client as-is. You can use
it to check what went wrong and maybe handle the problem automatically in
your code.

觀察示例3-16執(zhí)行的結(jié)果,您可以發(fā)現(xiàn)空的Result對象打印出了keyvalues=NONE Get操作打印出了對應(yīng)取到的值。對于錯誤列上的Put操作得到了一個異常。

值得注意地是,當您使用batch()方法時,里面的Put實例并不會使用客戶端緩存。batch()調(diào)用是同步的,同時直接向服務(wù)器發(fā)送請求。沒有時延和其它處理過程,這和批量put操作是不同的。因此,您需要選擇合適于您的。

batch調(diào)用有兩種形式,一種將Result數(shù)據(jù)放在參數(shù)中,另一種放在返回值中。

void
batch(List<Row> actions, Object[] results)

throws
IOException, InterruptedException

Object[]
batch(List<Row> actions)

throws
IOException, InterruptedException

它們之間比較大的區(qū)別在于,當拋出異常時,第一方法的result中被填充了部分結(jié)果。而第二個方法在異常時,將會返回null。

兩種方法都支持get、put、delete操作。如果執(zhí)行其中的任何一個請求時出錯,一個客戶端異常將會被拋出,報告出錯情況??蛻舳司彺娌粫皇褂玫?。void batch(actions,
results)
會得到所有成功操作的結(jié)果和其中失敗的服務(wù)端異常。Object[] batch( actions )只返回客戶端異常,沒有結(jié)果被填充到返回值中。

所有的操作都將在check之前執(zhí)行:當您發(fā)現(xiàn)一個action出現(xiàn)了錯誤,但其它的操作也將被執(zhí)行。在最差的場景下,所有的action都返回失敗。

另一方面,batch操作在意瞬時失敗,比如NotServingRegionException(比如一個Region Server被下線了),它會進行重試幾次。配置項hbase.client.retries.number會設(shè)定重試的次數(shù)(默認的重試次數(shù)為10)。

Row Locks(行鎖)

       更新操作,比如put()、delete()、checkAndPut()等等對于一個row來說是互斥執(zhí)行的,從而保證一個低層面的原子性。Region Server提供了一個row lock行級鎖來保證只有擁有鎖的客戶端才能夠?qū)υ撔羞M行修改。在實際中,客戶端并不提供這種顯示的鎖,而是依賴于一種將每個操作獨立開的機制。

       您應(yīng)該最大限度地避免使用row lock,很容易出現(xiàn)RDBMS中類似的死鎖現(xiàn)象。當Lock在等待超時的過程中,兩個被掛起的客戶端都持有一個句柄,這類句柄屬于稀缺資源。如果這個鎖被加在了一個訪問很頻繁的行上,那么很多客戶端都會被阻塞。

當您使用put使用將一個Put實例發(fā)送到服務(wù)器時,如果您使用了如下的構(gòu)造函數(shù):

Put(byte[] row)

在這個構(gòu)造函數(shù)里,并沒有出現(xiàn)RowLock參數(shù),服務(wù)器會根據(jù)您的行為,自動為您創(chuàng)建一個鎖。這個鎖對于客戶端是透明的,客戶端無法獲取到這個生命期很短的服務(wù)器端的鎖。相比于服務(wù)器自動創(chuàng)建的隱式鎖。客戶端也可以使用顯式鎖,并且在多個操作中使用到它??梢酝ㄟ^以下的方法:

RowLock
lockRow(byte[] row) throws IOException

void
unlockRow(RowLock rl) throws IOException

第一個方法lockRowrowkey為參數(shù),返回一個RockLock實例,這個實例可以傳遞到PutDelete的構(gòu)造函數(shù)中去。一旦您成功的獲取到一個鎖后,在使用完之后,您必須調(diào)用unlockRow方法釋放它。

您申請到的LockRow會鎖定整行,它通過rowkey來確定行,一旦擁有,別的客戶端將不能對該行進行更新操作。

當這個行鎖無論是被客戶端或者服務(wù)器占有時,另一個想申請一個該行的新行鎖的行為都會被掛起,直到這行原有的行鎖被釋放或者過期。后者是對出錯進程持有行鎖情況的一個兼容。

一個行鎖的默認超時時間是1分鐘,可以在hbase-site.xml進行配置:

<property>

<name>hbase.regionserver.lease.period</name>

<value>120000</value>

</property>

上述代碼將超時時間改為120秒。當然,這個值不能設(shè)的太大,否則,每個客戶端申請一個被別的進程占用的鎖的最大等待時間都會變?yōu)檫@個值。示例3-17給出一個由用戶生成的行鎖阻塞其它讀取客戶端的例子。

示例3-17 使用顯示行鎖

static class
UnlockedPut implements Runnable {

@Override

public void run()
{

try {

HTable
table = new HTable(conf, “testtable”);

Put
put = new Put(ROW1);

put.add(COLFAM1,
QUAL1, VAL3);

long
time = System.currentTimeMillis();

System.out.println(“Thread
trying to put same row now…”);

table.put(put);

System.out.println(“Wait
time: ” +

(System.currentTimeMillis()
- time) + “ms”);

} catch
(IOException e) {

System.err.println(“Thread
error: ” + e);

}

}

}

System.out.println(“Taking
out lock…”);

RowLock lock =
table.lockRow(ROW1);

System.out.println(“Lock
ID: ” + lock.getLockId());

Thread thread =
new Thread(new UnlockedPut());

thread.start();

try {

System.out.println(“Sleeping
5secs in main()…”);

Thread.sleep(5000);

} catch
(InterruptedException e) {

//
ignore

}

try {

Put put1 = new
Put(ROW1, lock);

put1.add(COLFAM1,
QUAL1, VAL1);

table.put(put1);

Put put2 = new
Put(ROW1, lock);

put2.add(COLFAM1, QUAL1,
VAL2);

table.put(put2);

} catch (Exception
e) {

System.err.println(“Error:
” + e);

} finally {

System.out.println(“Releasing
lock…”);

table.unlockRow(lock);

}

首先定義了一個的線程,它會不斷使用隱式鎖不斷地訪問同一行上的記錄,在取得鎖之前,put操作始終會被掛起,在線程內(nèi)部會打印出等待的時間。

主線程,先顯示的創(chuàng)建一個行鎖,然后啟動一個前面定義的線程實例。接著,主線程sleep 5秒之后,將這個鎖傳給Put對象,進行put操作之后,這個鎖會被釋放,從而前面那個無鎖的線程會繼續(xù)運行。

運行這段示例代碼,將會得到如下的輸出:

Taking out lock…

Lock ID:
4751274798057238718

Sleeping 5secs in
main()…

Thread trying to
put same row now…

Releasing lock…

Wait time: 5007ms

After thread
ended…

KV:
row1/colfam1:qual1/1300775520118/Put/vlen=4, Value: val2

KV:
row1/colfam1:qual1/1300775520113/Put/vlen=4, Value: val1

KV:
row1/colfam1:qual1/1300775515116/Put/vlen=4, Value: val3

可以看出無鎖的線程的確被阻塞了5秒鐘。直到主線程做完兩次put操作后釋放了行鎖,才繼續(xù)運行。我們可以看到一個有趣的現(xiàn)象,由無鎖對象最后插入的值,卻擁有最小的時間戳。這是因為,實際上無鎖線程的put操作是最早執(zhí)行的。一旦它被發(fā)送到Region Server服務(wù)器,Region Server便會給它打上一個時間戳,雖然此時它還無法獲取到行鎖而被阻塞,但此時的時間戳已經(jīng)生成了。主線程一共花了7ms向服務(wù)器提交了兩次Put命令,并釋放了行鎖。

當您使用一個先前申請到的行鎖,但是由于超時無效時,您將會收到服務(wù)器端生成的一個錯誤,以UnknownRowLockException異常的形式返回。它說明,服務(wù)器端已經(jīng)將這個鎖廢棄掉了。此時,您可以將它drop掉,然后重新申請一個新的鎖。

Scan(掃描)

前面我們討論了基本的CRUD類型的操作,現(xiàn)在輪到scan了,類似于數(shù)據(jù)庫系統(tǒng)中的游標(cursor)。它可以充分使用到HBase提供的順序性的、排序的存儲結(jié)構(gòu)。

基本介紹

scan操作與get操作非常相似。介紹它之前,必須先介紹另一個類Scan。由于scan更像是一個迭代器,因此,并沒有scan()的方法,取而代之的是getScanner方法,它會返回您想要遍歷訪問的scanner對象??梢岳萌缦碌姆椒ㄈ〉剿?/SPAN>

ResultScanner
getScanner(Scan scan) throws IOException

ResultScanner
getScanner(byte[] family) throws IOException

ResultScanner
getScanner(byte[] family, byte[] qualifier)

throws
IOException

后面兩個方法是為了加強友好性,會隱式地創(chuàng)建一個scan對象,然后再調(diào)用getScanner(Scan
scan)
方法取到ResultScanner

Scan類有如下的構(gòu)造函數(shù):

Scan()

Scan(byte[]
startRow, Filter filter)

Scan(byte[]
startRow)

Scan(byte[]
startRow, byte[] stopRow)

Get方法不同的是,您不再指定一個具體的rowkey,您現(xiàn)在可以選擇性的指定一個startRow參數(shù),這個參數(shù)定義HTable中,要讀取的rowkey的起始位置??蛇x的參數(shù)stopRow定義讀取rowkey的結(jié)束位置。

startRow是被包含的,而endRow是不被包含的,因此,可以用以表述式[startRow,
stopRow)
來描述這種關(guān)系。

Scan提供的一個特殊的功能就是您不必精確地指定rowkey。相反,scan會相匹配與指定的startKey相等或者大于它的所有的rowkey。如是沒有指定startKey,那么將會從表的起始位置開始遍歷。

如果指定了stopKey,那么只會遍歷rowkey小于stopKey的所有記錄。如果沒有指定stopKey,那么scan會遍歷到表的末尾。

還有另一個可選的參數(shù)Filter,這個參數(shù)指向一個Filter實例。Scan對象常常使用空的構(gòu)造函數(shù)來創(chuàng)建,另外的參數(shù)都可以通過gettersetter方法進行指定。

Scan實例被創(chuàng)建后,如果想加入更多的限制條件,您可以使用很多的方法來限制讀出的數(shù)據(jù)。當然,您也可以創(chuàng)建一個空的Scan,將整個表的所有的column familycolumn
qualifier
讀出來。

Scan
addFamily(byte [] family)

Scan
addColumn(byte[] family, byte[] qualifier)

Get類相似,您也可以使用addFamily來設(shè)置column
families
或者使用addColumn來設(shè)置column,從而限定讀出數(shù)據(jù)的條件。

如果您只需要數(shù)據(jù)的一部分,那么通過限制Scan的范圍,只是體現(xiàn)了HBase的強大之處,由于數(shù)據(jù)是以column family為物理單元分隔文件的,不在Scan范圍內(nèi)的column family對應(yīng)的文件根本不會被打開,這只是面向列存儲最大的優(yōu)勢所在。

Scan
setTimeRange(long minStamp, long maxStamp) throws IOException

Scan
setTimeStamp(long timestamp)

Scan
setMaxVersions()

Scan setMaxVersions(int
maxVersions)

您還可以限制Scantimestamp,設(shè)置timestamp的范圍,設(shè)置掃描的版本數(shù)。使用setStartRow(),setStopRow()setFilter(),可以達到構(gòu)造函數(shù)中的參數(shù)值同樣的效果。方法hasFilter()可以判斷一個Scan中是否添加了filter。表3-8列出了其它一些方法。

3-8 Scan的方法一覽

Result

Description

getStartRow()/getStopRow()

Can be used to retrieve the
currently assigned values.

getTimeRange()

Retrieves the associated timestamp or time range of the Get
instance. Note that there is no getTimeStamp() since the API converts a value
assigned with setTimeStamp() into a TimeRange instance internally, setting
the minimum and maximum values to the given timestamp.

getMaxVersions()

Returns the currently configured number of versions that should
be retrieved from the table for every column.

getFilter()

Special filter instances can be used to select certain columns
or cells, based on a wide variety of conditions. You can get the currently
assigned filter using this method. It may return null if none was previously
set.

setCacheBlocks()

/getCacheBlocks()

Each HBase region server has a block cache that efficiently
retains recently accessed data for subsequent reads of contiguous
information. In some events it is better to not engage the cache to avoid too
much churn when doing full table scans. These methods give you control over
this feature.

numFamilies()

Convenience method to retrieve the size of the family map,
containing the families added using the addFamily() or addColumn() calls.

hasFamilies()

Another helper to check if a familyor columnhas been added to the current
instance of the Scan class.

getFamilies()

/setFamilyMap()

/getFamilyMap()

These methods give you access to the column families and
specific columns, as added by the addFamily() and/or addColumn() calls. The
family map is a map where the key is the family name and the value is a list
of added column qualifiers for this particular family. The getFamilies()
returns an array of all stored families, i.e., containing only the family
names (as

byte[] arrays).

當您創(chuàng)建了Scan實例之后,您就需要調(diào)用HTablegetScanner方法,取得ResultScanner實例。在下一節(jié)中,我們將詳細介紹ResultScanner類。

ResultScanner

Scans不會將所有匹配到的行通過一個RPC調(diào)用發(fā)送到客戶端,而是分多次。這是因為,一行數(shù)據(jù)可能很大,放在一次傳輸會消耗掉很多資源、花費很長的時間。

ResultScannerscan轉(zhuǎn)化成一種類似于get的操作,將結(jié)果通過迭代器訪問出來。它具有自己的一些方法:

Result next()
throws IOException

Result[] next(int
nbRows) throws IOException

void close()

有兩種形式的next函數(shù)可以調(diào)用,close()操作用來釋放資源。

Next()調(diào)用返回一個單獨的Result實例,存放一個可用的row對象。同樣的,您也可以使用next(int
nbRows)
一次取回來很多行,該調(diào)用返回一個Result的數(shù)組對象,數(shù)組中的每一行都代表一個唯一的row。當然,取到的值可能小于nbRows,但這很少在未取完數(shù)據(jù)時發(fā)生??梢圆榭?,前面對Result實例的介紹,學習如何使用這個類。示例3-18給出了如何使用scan訪問一個表。

示例3-18 使用scan訪問數(shù)據(jù)

Scan scan1 = new
Scan();

ResultScanner
scanner1 = table.getScanner(scan1);

for (Result res :
scanner1) {

System.out.println(res);

}

scanner1.close();

Scan scan2 = new
Scan();

scan2.addFamily(Bytes.toBytes(“colfam1″));

ResultScanner
scanner2 = table.getScanner(scan2);

for (Result res :
scanner2) {

System.out.println(res);

}

scanner2.close();

Scan scan3 = new
Scan();

scan3.addColumn(Bytes.toBytes(“colfam1″),
Bytes.toBytes(“col-5″)).

addColumn(Bytes.toBytes(“colfam2″),
Bytes.toBytes(“col-33″)).

setStartRow(Bytes.toBytes(“row-10″)).

setStopRow(Bytes.toBytes(“row-20″));

ResultScanner
scanner3 = table.getScanner(scan3);

for (Result res :
scanner3) {

System.out.println(res);

}

scanner3.close();

首先創(chuàng)建一個空的Scan對象,使用這個對象對表進行遍歷,然后關(guān)閉這個scanner釋放相關(guān)資源。接著再創(chuàng)建一個只查詢colfam1下記錄的scanner,并打印相關(guān)的記錄。最后創(chuàng)建一個只掃描列colfam1:col-5colfam2:col-33,且rowkey范圍從row-10row-20的所有記錄。

為了測試上述示例,首先創(chuàng)建一個表,含有colfam1colfam2兩個column family。然后很這個表中插入100行記錄。我們不列出全表掃描的輸出結(jié)果,而僅列出scan2scan3的輸出結(jié)果:

Scanning table
#3…

keyvalues={row-10/colfam1:col-5/1300803775078/Put/vlen=8,

row-10/colfam2:col-33/1300803775099/Put/vlen=9}

keyvalues={row-100/colfam1:col-5/1300803780079/Put/vlen=9,

row-100/colfam2:col-33/1300803780095/Put/vlen=10}

keyvalues={row-11/colfam1:col-5/1300803775152/Put/vlen=8,

row-11/colfam2:col-33/1300803775170/Put/vlen=9}

keyvalues={row-12/colfam1:col-5/1300803775212/Put/vlen=8,

row-12/colfam2:col-33/1300803775246/Put/vlen=9}

keyvalues={row-13/colfam1:col-5/1300803775345/Put/vlen=8,

row-13/colfam2:col-33/1300803775376/Put/vlen=9}

keyvalues={row-14/colfam1:col-5/1300803775479/Put/vlen=8,

row-14/colfam2:col-33/1300803775498/Put/vlen=9}

keyvalues={row-15/colfam1:col-5/1300803775554/Put/vlen=8,

row-15/colfam2:col-33/1300803775582/Put/vlen=9}

keyvalues={row-16/colfam1:col-5/1300803775665/Put/vlen=8,

row-16/colfam2:col-33/1300803775687/Put/vlen=9}

keyvalues={row-17/colfam1:col-5/1300803775734/Put/vlen=8,

row-17/colfam2:col-33/1300803775748/Put/vlen=9}

keyvalues={row-18/colfam1:col-5/1300803775791/Put/vlen=8,

row-18/colfam2:col-33/1300803775805/Put/vlen=9}

keyvalues={row-19/colfam1:col-5/1300803775843/Put/vlen=8,

row-19/colfam2:col-33/1300803775859/Put/vlen=9}

keyvalues={row-2/colfam1:col-5/1300803774463/Put/vlen=7,

row-2/colfam2:col-33/1300803774485/Put/vlen=8}

再一次強調(diào)的時,輸出的結(jié)果與插入的順序無關(guān),有趣的是,rowkey的排列按照了字母序進行輸出。

Caching vs. Batching

以目前為止,每個next()操作都是一次RPC調(diào)用,即使當你使用next( int
nbRows)
時。因為這個next( int nbRows )只是一個簡單的next()調(diào)用的客戶端循環(huán)。顯然,這樣的性能是很難令人滿意的。因此,在一次RPC調(diào)用中返回多行便顯得意義重大。這個機制叫制scanner緩存(scanner
caching)
,它默認是關(guān)閉的。

您可以使用在兩個層面打開它:在表一級設(shè)定,將會對這個表的所有的scanner實例生效。在scan一級設(shè)定,將只會對這個scan生效。當然您也可以對通過對HTable實例進行設(shè)定,達到對所有的表生效。

void
setScannerCaching(int scannerCaching)

int
getScannerCaching()

當然您也可以在HBase安裝時,修改默認值??梢酝ㄟ^修改配置文件hbase-site.xml實現(xiàn):

<property>

<name>hbase.client.scanner.caching</name>

<value>10</value>

</property>

上述配置文件將cache的默認大小改為了10。當然,您還可以在代碼中繼續(xù)設(shè)定新的值。

在您通過getScanner方法,得到一個Scanner對象后,通過setScannerCaching()設(shè)置緩存大小,getScannerCaching()得到目前的緩存大小。API將會把設(shè)置的大小傳遞給scanner對象,除非您使用了Scan類的方法:

void
setCaching(int caching)

int getCaching()

scan直接設(shè)置cache大小擁有最高的優(yōu)先級。通過對緩存大小的設(shè)定,可以使一次RPC調(diào)用返回多行記錄。兩種next()都會使用到這個緩存。

您可能需要找到RPC操作的數(shù)據(jù)和內(nèi)存占用情況的一個折中,scanner的緩存大小越大,讀取的性能越好(當然值過大,也不好),但緩存的條目多了之后,一次傳輸消耗的時間越長,占用的堆空間大小也越大,還會引發(fā)OOM異常(OutOfMemoryException)

當從服務(wù)器到客戶端傳輸數(shù)據(jù)的時間或者客戶端處理數(shù)據(jù)的時間大于了scanner設(shè)置的超時時間,那么客戶端將會收到一個ScannerTimeoutException。

示例3-19 scanner超時的例子

Scan scan = new
Scan();

ResultScanner
scanner = table.getScanner(scan);

int scannerTimeout
= (int) conf.getLong(

HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
-1);

try {

Thread.sleep(scannerTimeout
+ 5000);

} catch
(InterruptedException e) {

//
ignore

}

while (true){

try {

Result
result = scanner.next();

if
(result == null) break;

System.out.println(result);

} catch (Exception
e) {

e.printStackTrace();

break;

}

}

scanner.close();

首先獲取當前scanner的超時時間,然后sleep一會兒,等待超時。接著嘗試打印出取到的結(jié)果集。將會得到如下的輸出:

Adding rows to
table…

Current (local)
lease period: 60000

Sleeping now for
65000ms…

Attempting to
iterate over scanner…

Exception in
thread “main” java.lang.RuntimeException:

org.apache.hadoop.hbase.client.ScannerTimeoutException:
65094ms passed

since
the last invocation, timeout is currently set to 60000

at
org.apache.hadoop.hbase.client.HTable$ClientScanner$1.hasNext

at
ScanTimeoutExample.main

Caused by:
org.apache.hadoop.hbase.client.ScannerTimeoutException: 65094ms

passed
since the last invocation, timeout is currently set to 60000

at
org.apache.hadoop.hbase.client.HTable$ClientScanner.next

at
org.apache.hadoop.hbase.client.HTable$ClientScanner$1.hasNext


1 more

Caused by:
org.apache.hadoop.hbase.UnknownScannerException:

org.apache.hadoop.hbase.UnknownScannerException:
Name: -315058406354472427

at
org.apache.hadoop.hbase.regionserver.HRegionServer.next

scanner超時之后,客戶端嘗試打印從服務(wù)器上取出的值時,將會將到一個異常。

您可能想在客戶端代碼中加入以下代碼來增大超時時間:

Configuration conf
= HBaseConfiguration.create()

conf.setLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
120000)

但由于scan的超時時間是配在Region Server上的,因此,上述配置并不會生效。如果您真的想修改這個值,您只有去Region Server上去修改hbase-site.xml,并重啟集群。

從輸出打印的堆棧可以看出,ScannerTimeoutException嵌套在了UnknownScannerException之中,這意味著next()調(diào)用使用了一個已經(jīng)過期的scanner ID,但這個ID已經(jīng)被刪除了。換句話說,客戶端存儲的scanner ID,Region Server已經(jīng)不認識了,從而拋出一個UnknownScannerException。

現(xiàn)在,您已經(jīng)學會了如何使用客戶端的scanner緩存來提高批量交互的性能。但有一點要注意的是,對于非常大的行,可能無法放入客戶端的內(nèi)存中。使用HBase客戶端API中的batching,可以處理這種情況:

void setBatch(int
batch)

int getBatch()

caching(處理層次為rows)相對應(yīng),batching處理的層次是columns。它控制一次next()調(diào)用傳輸多少個columns。通過ResultScannersetBatch()方法可以進行設(shè)置,setBatch(5)將使每個Result實例,返回5columns。

當一行含有非常多的列時,您可以使用setBatch方法,一次next()返回一行中的部分列。Result中返回的列也可能達不到batching的值,比較一行有17列,batching的值為5,那么前三次next()將得到5,5,5,最后一次調(diào)用只能返回2個列。

通過設(shè)置cachingbatch的大小,scanner可以在選擇rowkey范圍查詢時控制RPC的多少。示例3-20用兩個參數(shù)來對Result實例大小與請求次數(shù)進行調(diào)優(yōu)。

示例3-20 使用cachingbatch兩個參數(shù)

private static
void scan(int caching, int batch) throws IOException {

Logger log =
Logger.getLogger(“org.apache.hadoop”);

final int[]
counters = {0, 0};

Appender appender
= new AppenderSkeleton() {

@Override

protected void
append(LoggingEvent event) {

String msg =
event.getMessage().toString();

if (msg != null
&& msg.contains(“Call: next”)) {

counters[0]++;

}

}

@Override

public void
close() {}

@Override

public boolean
requiresLayout() {

return
false;

}

};

log.removeAllAppenders();

log.setAdditivity(false);

log.addAppender(appender);

log.setLevel(Level.DEBUG);

Scan scan = new
Scan();

scan.setCaching(caching);

scan.setBatch(batch);

ResultScanner
scanner = table.getScanner(scan);

for (Result result
: scanner) {

counters[1]++;

}

scanner.close();

System.out.println(“Caching:
” + caching + “, Batch: ” + batch +

“,
Results: ” + counters[1] + “, RPCs: ” + counters[0]);

}

public static void
main(String[] args) throws IOException {

scan(1, 1);

scan(200, 1);

scan(2000, 100);

scan(2, 100);

scan(2, 10);

scan(5, 100);

scan(5, 20);

scan(10, 10);

}

示例代碼首先設(shè)置cachingbatch的參數(shù),然后打印Result的大小和RPC的次數(shù)。對不同的cachingbatch大小進行了組合測試。

Caching: 1, Batch:
1, Results: 200, RPCs: 201

Caching: 200,
Batch: 1, Results: 200, RPCs: 2

Caching: 2000,
Batch: 100, Results: 10, RPCs: 1

Caching: 2, Batch:
100, Results: 10, RPCs: 6

Caching: 2, Batch:
10, Results: 20, RPCs: 11

Caching: 5, Batch:
100, Results: 10, RPCs: 3

Caching: 5, Batch:
20, Results: 10, RPCs: 3

Caching: 10,
Batch: 10, Results: 20, RPCs: 3

通過調(diào)整兩個參數(shù)的值,可以觀察它們對結(jié)果的影響。表3-9給出了一些組合的結(jié)果。為了運行示例3-20,首先創(chuàng)建了一個擁有兩個column family的表,添加了10行,每行中,每個column family下添加10column。這就意味著一共存在著200columns或者叫做cell,每個cell只有一個版本。

3-9 示例參數(shù)的影響

Caching

Batch

Results

RPCs

Description

1

1

200

201

Each column is returned as a
separate Result instance. One more RPC is needed to realize the scan is
complete.

200

1

200

2

Each column is a separate
Result, but they are all transferred in one RPC (plus the extra check).

2

10

20

11

The batch is half the row
width, so 200 divided by 10 is 20 Results needed. 10 RPCs (plus the check) to
transfer them.

5

100

10

3

The batch is too large for
each row, so all 20 columns are batched. This requires 10 Result instances.
Caching brings the number of RPCs down to two (plus the check).

5

20

10

3

This is the same as above, but
this time the batch matches the columns available. The outcome is the same.

10

10

20

3

This divides the table into
smaller Result instances, but larger caching also means only two RPCs are
needed.

為了計算RPC的次數(shù),您需要首先將行數(shù)與最行的column數(shù)相乘,然后用這個值除以batchcolumn數(shù)中的較小值。最后用這個值除以caching大小。用數(shù)學公式表示為:

RPCs = (Rows *
Cols per Row) / Min(Cols per Row, Batch Size) /Scanner Caching

還需要額外的RPC操作來打開和關(guān)閉scanner。因此,還scanner還需要兩次額外的RPC操作。

3-2描述了cachingbatching是如何起作用的。如圖所示,該表具有9行值,每行都有不定數(shù)目的column。設(shè)置scannercaching的大小為6,batch大小為3。您可以看到,需要3RPC操作來轉(zhuǎn)輸數(shù)據(jù)(虛線包圍的部分)。

3-2 通過cachingbatching控制scan操作RPC的次數(shù)

由于batch大小小于一行中column的數(shù)目,因此,服務(wù)器將3columns打成一個Result,一次RPC操作可以傳輸6個這樣的Result。如果batch大小不設(shè)置,而caching大小被設(shè)置時,每行記錄將包含一行中所有column,這樣一個Result實例中就是一個完整的行。只有當您設(shè)置了batch參數(shù),才有可能把一行拆成多個Result實例。

一開始您可能不需要考慮cachingbatching的大小設(shè)置,當您進行應(yīng)用程序的調(diào)優(yōu)時,您必須對這個原理非常清楚才能找到一個最好的平衡點。

輔助功能

在進一步了解客戶端的其它功能之前,我們有必要先了解HBase和它的客戶端提供的一些有用的輔助功能。

HTable方法集

客戶端API代表了一個HTable實例,它提供了訪問一個HBase表的一些方法。除了前面提到的一些對于訪問HBase表的主要方法,還有另外一些值得留意的方法:

void close()

該方法前面有所提及,但考慮到它的重要性,這樣有必要專門再次提及。在結(jié)束了對表的訪問之后,一定要調(diào)用close()接口。當close()被調(diào)用時,客戶端會調(diào)用flushCache()方法,將客戶端緩存區(qū)中緩存的數(shù)據(jù)提交到服務(wù)器。

byte[]
getTableName()

這個方法可以方例地取出表名。

HTableDescriptor
getTableDescriptor()

HBase中每個表都會使用一個HTableDescriptor的實例。您可以通過getTableDescriptor()獲取對表信息的訪問。

static Boolean
isTableEnabled(table)

HTable4個靜態(tài)方法,它們都需要一個配置對象,如沒有提供configurationHTable會在程序的classpath下使用一個默認的configuration。該函數(shù)檢查ZooKeepertable表是否為enable狀態(tài)。

byte[][]
getStartKeys()

byte[][]
getEndKeys()

pair<byte[][],
byte[][]> getStartEndKeys()

這幾個函數(shù)可以訪問表中當前的rowkey范圍,隨著數(shù)據(jù)的不斷增加,調(diào)用后也會得到不同的結(jié)果。這3個方法返回byte數(shù)組。您可以使用Bytes.toStringBinary()來打印出key值。

void
clearRegionCache()

HRegionLocation
getRegionLocation( row )

Map<HRegionInfo,
HServerAddress> getRegionInfo()

這幾個方法使您可以取出Region的信息,您可以使用第一個方法來清楚客戶端上的緩存,也可以使用第三個方法來取出所有Region信息。這些方法幫助一些高級使用者來利用Region信息,比較路由負載、計算靠近數(shù)據(jù)等。

void
prewarmRegionCache(Map<HRegionInfo, HServerAddress> regionMap)

static void
setRegionCachePrefetch(table, enable)

static Boolean
getRegionCachePrefetch(table)

這也是一組高級用法的API。這組API可以提前將tableRegion信息緩存到客戶端。使用上述API,您可以提供一個Region的列表來對Region信息進行預(yù)熱。

Bytes

該類用來將Java類型,比如String或者long轉(zhuǎn)化為raw、byte數(shù)組等HBase支持的類型。因此,再介紹這個類和它的函數(shù)是有意義的。

大多數(shù)方法都有這三種形式,比如:

static long
toLong(byte[] bytes)

static long
toLong(byte[] bytes, int offset)

static long
toLong(byte[] bytes, int offset, int length)

它們的輸出都是byte數(shù)組,偏移量、長度,后兩個可以缺省。它們的使用方式取決于您擁有的byte數(shù)組。如果您是使用Bytes.toBytes()方法得到的,那么您可以安全的使用第一個API,整個bytes數(shù)據(jù)存放著待轉(zhuǎn)化的值。

HBase內(nèi)部,將數(shù)據(jù)存放在一個大的字節(jié)數(shù)組中,使用如下的方法:

static int
putLong(byte[] bytes, int offset, long val)

這個方法允許您將一個Long對象寫入到一個給定的字節(jié)數(shù)組中的指定位置。如果您想從一個大數(shù)組中取出數(shù)據(jù),可以使用toLong方法。

Bytes類支持的Java類型包括:String、Boolean、short、int、long、doublefloat。除了這些,表3-10中還列出了一些有用的方法。

3-10 Bytes提供的一些方法

Result

Description

toStringBinary()

While working very similar to
toString(), this variant has an extra safeguard to convert nonprintable data
into their human-readable hexadecimal numbers. Whenever you are not sure what
a byte array contains you should use this method to print its content, for
example, to the console, or into a logfile.

compareTo()/equals()

These methods allow you to
compare two byte[], that is, byte arrays. The former gives you a comparison
result and the latter a boolean value, indicating whether the given arrays
are equal to each other.

add()/head()/tail()

You can use these to add two
byte arrays to each other, resulting in a new, concatenated array, or to get
the first, or last, few bytes of the given byte array.

binarySearch()

This performs a binary search
in the given array of values. It operates on byte arrays for the values and
the key you are searching for.

incrementBytes()

This increments a long value
in its byte array representation, as if you had used toBytes(long) to create
it. You can decrement using a negative amount parameter.

Bytes提供的一些方法與Java提供的ByteBuffer有一些復(fù)疊。區(qū)別是前者在處理的過程中不會生成新的實例,因此,它采用了一些優(yōu)化。對于HBase中,這種類型與字節(jié)之間的轉(zhuǎn)化操作被頻繁使用,因此,通過這種優(yōu)化,可以避免非常耗時的GC操作。


    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約