小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

使用Kettle實(shí)現(xiàn)數(shù)據(jù)實(shí)時(shí)增量同步

 愛(ài)吃魚(yú)的俊懶貓 2019-06-25

2018-09-28: 示例job已上傳至github,地址見(jiàn)文末

0. 前言

本文介紹了使用Kettle對(duì)一張業(yè)務(wù)表數(shù)據(jù)(500萬(wàn)條數(shù)據(jù)以上)進(jìn)行實(shí)時(shí)(10秒)同步,采用了時(shí)間戳增量回滾同步的方法。關(guān)于ETL和Kettle的入門(mén)知識(shí)大家可以閱讀相關(guān)的blog和文檔學(xué)習(xí)。

1. 時(shí)間戳增量回滾同步

假定在源數(shù)據(jù)表中有一個(gè)字段會(huì)記錄數(shù)據(jù)的新增或修改時(shí)間,可以通過(guò)它對(duì)數(shù)據(jù)在時(shí)間維度上進(jìn)行排序。通過(guò)中間表記錄每次更新的時(shí)間戳,在下一個(gè)同步周期時(shí),通過(guò)這個(gè)時(shí)間戳同步該時(shí)間戳以后的增量數(shù)據(jù)。這是時(shí)間戳增量同步。

但是時(shí)間戳增量同步不能對(duì)源數(shù)據(jù)庫(kù)中歷史數(shù)據(jù)的刪除操作進(jìn)行同步,我們可以通過(guò)在每次同步時(shí),把時(shí)間戳往前回滾一段時(shí)間,從而同步一定時(shí)間段內(nèi)的刪除操作。這就是時(shí)間戳增量回滾同步,這個(gè)名字是我自己給取得,意會(huì)即可,就是在時(shí)間戳增量同步的同時(shí)回滾一定的時(shí)間段。

說(shuō)明

  • 源數(shù)據(jù)表 需要被同步的數(shù)據(jù)表

  • 目標(biāo)數(shù)據(jù)表 同步至的數(shù)據(jù)表

  • 中間表 存儲(chǔ)時(shí)間戳的表

2. 前期準(zhǔn)備

在兩個(gè)數(shù)據(jù)庫(kù)中分別創(chuàng)建數(shù)據(jù)表,并通過(guò)腳本在源數(shù)據(jù)表中插入500萬(wàn)條數(shù)據(jù),完成后再以每秒一條的速度插入新數(shù)據(jù),模擬生產(chǎn)環(huán)境。

源數(shù)據(jù)表結(jié)構(gòu)如下:

CREATE TABLE `im_message` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `sender` varchar(45) COLLATE utf8_bin NOT NULL COMMENT '消息發(fā)送者:SYSTEM',
  `send_time` datetime(6) NOT NULL,
  `receiver` varchar(45) COLLATE utf8_bin NOT NULL COMMENT '消息接受者',
  `content` varchar(255) COLLATE utf8_bin NOT NULL COMMENT '消息內(nèi)容',
  `is_read` tinyint(4) NOT NULL COMMENT '消息是否被讀?。?-未讀;非0-已讀',
  `read_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `id_UNIQUE` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='消息表'

3. 作業(yè)流程

  1. 開(kāi)始組件

  2. 建時(shí)間戳中間表

  3. 獲取中間表的時(shí)間戳,并設(shè)置為全局變量

  4. 刪除目標(biāo)表中時(shí)間戳及時(shí)間戳以后的數(shù)據(jù)

  5. 抽取兩個(gè)數(shù)據(jù)表的時(shí)間戳及時(shí)間戳以后的數(shù)據(jù)進(jìn)行比對(duì),并根據(jù)比對(duì)結(jié)果進(jìn)行刪除、新增或修改操作

  6. 更新時(shí)間戳

4. 創(chuàng)建作業(yè)

作業(yè)的最終截圖如下:
作業(yè)截圖

4.1 創(chuàng)建作業(yè)和DB連接

打開(kāi)Spoon工具,新建作業(yè),然后在左側(cè)主對(duì)象樹(shù)DB連接中新建DB連接。創(chuàng)建連接并測(cè)試通過(guò)后可以在左側(cè)DB連接下右鍵共享出來(lái)。因?yàn)樵趩蝹€(gè)作業(yè)或者轉(zhuǎn)換中新建的DB連接都是局域數(shù)據(jù)源,在其他轉(zhuǎn)換和作業(yè)中是不能使用的,即使屬于同一個(gè)作業(yè)下的不同轉(zhuǎn)換,所以需要把他們共享,這樣DB連接就會(huì)成為全局?jǐn)?shù)據(jù)源,不用多次編輯。

4.2 建時(shí)間戳中間表

這一步是為了在目標(biāo)數(shù)據(jù)庫(kù)建中間表etl_temp,并插入初始的時(shí)間戳字段。因?yàn)樵撟鳂I(yè)在生產(chǎn)環(huán)境是循環(huán)調(diào)用的,該步驟在每一個(gè)同步周期中都會(huì)調(diào)用,所以在建表時(shí)需要判斷該表是否已經(jīng)存在,如果不存在才建表。

SQL代碼和組件配置截圖如下:

CREATE TABLE IF NOT EXISTS etl_temp(id int primary key,time_stamp timestamp);
INSERT IGNORE INTO etl_temp (id,time_stamp) VALUES (1,'2018-05-22 00:00:00');

中間配置截圖

我把該作業(yè)時(shí)間戳的ID設(shè)為1,在接下來(lái)的步驟中也是通過(guò)這個(gè)ID查詢(xún)我們想要的時(shí)間戳

4.2 獲取時(shí)間戳并設(shè)為變量

新建一個(gè)轉(zhuǎn)換,在轉(zhuǎn)換中使用表輸入和設(shè)置變量?jī)蓚€(gè)組件

表輸入

SQL代碼和組件配置截圖如下

Kettle中設(shè)置的變量都是字符串類(lèi)型,為了便于比較。我在SQL語(yǔ)句把查出的時(shí)間戳進(jìn)行了格式轉(zhuǎn)換

select date_format(time_stamp , '%Y-%m-%d %H:%i:%s') time_stamp from etl_temp where id='1'

這里寫(xiě)圖片描述

設(shè)置變量

變量活動(dòng)類(lèi)型可以為該變量設(shè)置四種有效活動(dòng)范圍,分別是JVM、該Job、父Job和祖父Job

這里寫(xiě)圖片描述

4.3 刪除目標(biāo)表中時(shí)間戳及時(shí)間戳以后的數(shù)據(jù)

這樣做有兩個(gè)好處:

  1. 避免在同步中重復(fù)或者遺漏數(shù)據(jù)。例如當(dāng)時(shí)間戳在源數(shù)據(jù)表中不是唯一的,上一次同步周期最后一條數(shù)據(jù)的時(shí)間戳是2018-05-25 18:12:12,那么上一次同步周期結(jié)束后中間表中的時(shí)間戳就會(huì)更新為2018-05-25 18:12:12。如果在下一個(gè)同步周期時(shí)源數(shù)據(jù)表中仍然有時(shí)間戳為2018-05-25 18:12:12的新數(shù)據(jù),那么同步就會(huì)出現(xiàn)數(shù)據(jù)不一致。采用大于時(shí)間戳的方式同步就會(huì)遺漏數(shù)據(jù),采用等于時(shí)間戳的方式同步就會(huì)重復(fù)同步數(shù)據(jù)。

  2. 增加健壯性 當(dāng)作業(yè)異常結(jié)束后,不用做任何多余的操作就可以重啟。因?yàn)闀?huì)刪除目標(biāo)表中時(shí)間戳及時(shí)間戳以后的數(shù)據(jù),所以不用擔(dān)心數(shù)據(jù)一致性問(wèn)題

2018-09-29:對(duì)增加健壯性進(jìn)行補(bǔ)充:在一次同步周期中腳本異常中斷,這時(shí)候中間表的時(shí)間戳沒(méi)有更新,但是目標(biāo)表已經(jīng)同步了部分?jǐn)?shù)據(jù),當(dāng)再次啟動(dòng)腳本就會(huì)出現(xiàn)數(shù)據(jù)重復(fù)的情況,而且在很多時(shí)候因?yàn)橹麈I的存在,腳本啟動(dòng)會(huì)報(bào)錯(cuò)

在組件中使用了上一步驟設(shè)置的變量,所以必須勾選使用變量替換

delete from test_kettle.im_message where send_time>='${TIME_STAMP}'

這里寫(xiě)圖片描述

4.4 抽取、比對(duì)和更新數(shù)據(jù)

這一步才是真正的數(shù)據(jù)同步步驟,完成了數(shù)據(jù)的抽取、比對(duì),并根據(jù)不同的比對(duì)結(jié)果刪除、更新、插入或不做任何操作。
正如前文所說(shuō),為了同步刪除操作,在原始表輸入和目標(biāo)表輸入步驟中回滾了一定時(shí)間段。其中回滾的時(shí)間段設(shè)置為了全局的參數(shù)。左右空白處右鍵即可設(shè)置參數(shù),該作業(yè)下的所有作業(yè)和轉(zhuǎn)換都能使用,設(shè)置如下圖

這里寫(xiě)圖片描述

轉(zhuǎn)換截圖如下

這里寫(xiě)圖片描述

原始表輸入

SELECT
  id
, sender
, send_time
, receiver
, content
, is_read
, read_time
FROM ueqcsd.im_message
where send_time>= date_sub(str_to_date('${TIME_STAMP}','%Y-%m-%d %H:%i:%s'), interval ${ROLL_BACK_DAYS} day);

這里寫(xiě)圖片描述

目標(biāo)表輸入

SELECT
  id
, sender
, send_time
, receiver
, content
, is_read
, read_time
FROM test_kettle.im_message
where send_time>= date_sub(str_to_date('${TIME_STAMP}','%Y-%m-%d %H:%i:%s'), interval ${ROLL_BACK_DAYS} day);

這里寫(xiě)圖片描述

注意兩個(gè)組件的數(shù)據(jù)庫(kù)鏈接是不同的,當(dāng)然它們也就這個(gè)和名字不同

比對(duì)記錄

對(duì)兩個(gè)表輸入查出的數(shù)據(jù)進(jìn)行比對(duì),并把比對(duì)的結(jié)果寫(xiě)進(jìn)輸入流,傳遞給后面的組件。
比對(duì)的結(jié)果有三種:

  • new

  • changed

  • deleted

標(biāo)注字段表示比對(duì)結(jié)果的字段名,后面有用。關(guān)鍵字段表示比對(duì)的字段,在這個(gè)作業(yè)中我們比較兩個(gè)的主鍵ID。

這里寫(xiě)圖片描述

Switch

該步驟對(duì)上一步驟產(chǎn)生的標(biāo)注字段進(jìn)行路由,不同的結(jié)果路由到不同的步驟。其中目標(biāo)步驟表示下一步驟的名字。

這里寫(xiě)圖片描述

插入

Kettle有一個(gè)插入/更新組件,但是據(jù)網(wǎng)友介紹這個(gè)組件性能低下,每秒最多只能同步幾百條數(shù)據(jù),所有我對(duì)插入和更新分別作了不同的處理。插入使用表輸出組件;更新使用更新組件。
為了進(jìn)一步提升同步效率,我在表輸出組件使用了多線程(右鍵>改變開(kāi)始復(fù)制的數(shù)量),使同步速度達(dá)到每秒12000條。Switch組件和表輸出組件中間的虛擬組件(空操作)也是為了使用多線程添加的。

這里寫(xiě)圖片描述
這里寫(xiě)圖片描述

勾選批量插入,可以極大提高同步速度

更新和刪除

這里寫(xiě)圖片描述

這里寫(xiě)圖片描述

4.5 更新時(shí)間戳

set @new_etl_start_time_stamp = (SELECT SEND_TIME FROM test_kettle.im_message ORDER BY SEND_TIME DESC LIMIT 1);
update etl_temp set time_stamp=@new_etl_start_time_stamp where id='1';

這里寫(xiě)圖片描述

4.6 發(fā)送郵箱

關(guān)于發(fā)送郵件組件網(wǎng)上有很多資料,就不多做介紹。特別強(qiáng)調(diào)一點(diǎn),郵箱密碼是 單獨(dú)的授權(quán)碼,而不是郵箱登錄密碼。

運(yùn)行

在開(kāi)發(fā)環(huán)境點(diǎn)擊Spoon界面左上角三角符號(hào)運(yùn)行作業(yè)即可。

在第一次運(yùn)行時(shí),為了提高同步效率,可以先不創(chuàng)建目標(biāo)表的索引。在第一此同步完成后,再創(chuàng)建索引。然后在START組件中編輯調(diào)度邏輯,再次啟動(dòng)。

如下圖所示

這里寫(xiě)圖片描述

運(yùn)行日志如下圖

這里寫(xiě)圖片描述

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶(hù)發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買(mǎi)等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶(hù) 評(píng)論公約

    類(lèi)似文章 更多