小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

Canal安裝以及使用代碼

 BIGDATA云 2019-03-22

一、Canal介紹

  Canal的原理就是它自己偽裝成slave, 向mysql發(fā)送dump協(xié)議,MySQL master接收到dump請(qǐng)求之后推送binlog文件給slave, 也就是canal。  

二、Canal安裝

  1. 下載Canal 

   wget https://github.com/alibaba/canal/releases/download/canal-1.0.24/canal.deployer-1.0.24.tar.gz

  2. 解壓到/opt/softwares/canal目錄, 解壓完之后如下圖所示:

  3. 配置instance

  4. 修改canal.properties

三、Mysql 安裝

  1、mysql 安裝

    yum install mysql

    yum install mysql-server 

  2、啟動(dòng)mysql

    /etc/init.d/mysqld start 或者sevice mysqld start

  3、設(shè)置root用戶密碼

    mysqladmin -u root password '123456' 

  4、登錄msyql

    mysql -uroot -p123456

  5、檢查并開(kāi)啟binlog復(fù)制功能及binlog模式是否為ROW模式

    參考: binlog詳解

四、Canal抽取binlog

  Canal只是偽裝成slave抽取binlog,Canal拿到binlog之后還需要交給業(yè)務(wù)方去做響應(yīng)的處理,那么怎么去交給業(yè)務(wù)方呢?一般都是Canal獲取到binlog之后寫(xiě)到kafka里,業(yè)務(wù)方訂閱kafka topic消費(fèi)binlog,完成業(yè)務(wù)邏輯處理。

  但是Canal不能直接寫(xiě)Kafka, 所以還需要有個(gè)client連接Canal,Canal獲取binlog之后交給Client, Client在往Kafka里寫(xiě)binlog消息,Client代碼如下:

import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.protocol.CanalEntry;

import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;

import java.util.List;

public class CanalClientExample {

    public static void main(String[] args) {

        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.41.254", 11111), "example", "canal", "canal");

        try {

            int batchSize = 1000;

            connector.connect();

            connector.subscribe("zhengxinv6\\..*");

            connector.rollback();

            while (true) {

                // 獲取指定數(shù)量的數(shù)據(jù)

                Message message = connector.getWithoutAck(batchSize);

                long batchId = message.getId();

                int size = message.getEntries().size();

                if (batchId == -1 || size == 0) {

                    try {

                        Thread.sleep(1000);

                    } catch (InterruptedException e) {

                        e.printStackTrace();

                    }

                    continue;

                }

                System.out.println("batchId = [" + batchId + "]");

                printEntry(message.getEntries());

                connector.ack(batchId); //提交確認(rèn)

                //connector.rollback(batchId);

            }

        } finally {

            connector.disconnect();

        }

    }

    private static void printEntry(List<CanalEntry.Entry> entrys) {

        for (CanalEntry.Entry entry : entrys) {

            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN

                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {

                continue;

            }

            CanalEntry.RowChange rowChange = null;

            try {

                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());

            } catch (Exception e) {

                throw new RuntimeException(

                        "ERROR ## parser of eromanga-event has an error,data:"

                                + entry.toString(), e);

            }

            CanalEntry.EventType eventType = rowChange.getEventType();

            System.out.println(String.format("================> binlog[%s:%s] ,name[%s,%s] , eventType : %s",

                    entry.getHeader().getLogfileName(),

                    entry.getHeader().getLogfileOffset()+"",

                    entry.getHeader().getSchemaName(),

                    entry.getHeader().getTableName(),

                    eventType));

            for (CanalEntry.RowData rowData: rowChange.getRowDatasList()) {

                if (eventType == CanalEntry.EventType.DELETE) {

                    printColumn(rowData.getBeforeColumnsList());

                } else if (eventType == CanalEntry.EventType.INSERT) {

                    printColumn(rowData.getAfterColumnsList());

                } else {

                    System.out.println("-------> before");

                    printColumn(rowData.getBeforeColumnsList());

                    System.out.println("-------> after");

                    printColumn(rowData.getAfterColumnsList());

                }

            }

        }

    }

    private static void printColumn(List<CanalEntry.Column> columns) {

        for (CanalEntry.Column column : columns) {

            System.out.println(column.getName() + " : " + column.getValue()

                    + "    update=" + column.getUpdated());

        }

    }

}

五、Canal使用過(guò)程出現(xiàn)的問(wèn)題及解決方法

  參考:canal報(bào)錯(cuò)解決方法

參考:https://www.jianshu.com/p/6299048fad66

    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多