|
來源:https://www.cnblogs.com/guanzhyan/p/8974143.html 為了演示集群的效果,這里準備一臺虛擬機(window 7),在虛擬機中搭建了單IP多節(jié)點的zookeeper集群(多IP節(jié)點的也是同理的),并且在本機(win 7)和虛擬機中都安裝了kafka。 前期準備說明:1.三臺zookeeper服務器,本機安裝一個作為server1,虛擬機安裝兩個(單IP) 2.三臺kafka服務器,本機安裝一個作為server1,虛擬機安裝兩個。 備注:當然你可以直接在虛擬機上安裝三個服務器分別為server1、server2、server3 。
1.將虛擬機的網(wǎng)絡模式調(diào)整為橋接模式,將虛擬機的防火墻功能關閉; 2.將主機的防火墻也關閉。 3.互相ping,測試是否能正常連接.。
從Kafka官網(wǎng)http://kafka./downloads下載Kafka安裝包。(要下載Binary downloads這個類型,不要下載源文件,方便使用)
從zookeeper官網(wǎng)http://zookeeper./releases.html下載zookeeper安裝包。
從官網(wǎng)http://www./download/下載(建議下載Oracle官方發(fā)布的Java),,配置環(huán)境變量。 zookeeper集群環(huán)境搭建本機配置修改本機sever1的zoo.cfg文件配置:#存儲內(nèi)存中數(shù)據(jù)庫快照的位置,如果不設置參數(shù),更新事務日志將被存儲到默認位置。 #dataDir=/tmp/zookeeper dataDir=D:/bigData/zookeeper-3.4.10/data #日志文件的位置 dataLogDir=D:/bigData/zookeeper-3.4.10/zlog #監(jiān)聽端口 clientPort=2181 #集群服務器配置 server.1=192.168.1.130:9000:7000 server.2=192.168.1.103:9001:7001 server.3=192.168.1.103:9002:7002 格式: server.A = B:C:D A:是一個數(shù)字,表示第幾號服務器 B:服務器IP地址 C:是一個端口號,用來集群成員的信息交換,表示這個服務器與集群中的leader服務器交換信息的端口 D:是在leader掛掉時專門用來進行選舉leader所用的端口 完整配置文件如下: ![]() # The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. #dataDir=/tmp/zookeeper dataDir=D:/bigData/zookeeper-3.4.10/data dataLogDir=D:/bigData/zookeeper-3.4.10/zlog # the port at which the clients will connect clientPort=2181 # the maximum number of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper./doc/current/zookeeperAdmin.html#sc_maintenance# # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1 server.1=192.168.1.130:9000:7000 server.2=192.168.1.101:9001:7001 server.3=192.168.1.101:9002:7002 創(chuàng)建serverID在zoo.cfg配置文件中dataDir目錄中新建一個沒有后綴的myid文件,里邊寫1。
虛擬機配置將本機的zookeeper安裝包拷貝到虛擬機上,然后在盤中新建兩個目錄,server2,server3目錄。
修改server2的zoo.cfg配置文件:#存儲內(nèi)存中數(shù)據(jù)庫快照的位置,如果不設置參數(shù),更新事務日志將被存儲到默認位置。 #dataDir=/tmp/zookeeper dataDir=E:/bigData/server2/zookeeper-3.4.10/data #日志文件的位置 dataLogDir=E:/bigData/server2/zookeeper-3.4.10/zlog #監(jiān)聽端口 clientPort=2182 #集群服務器配置 server.1=192.168.1.130:9000:7000 server.2=192.168.1.103:9001:7001 server.3=192.168.1.103:9002:7002 創(chuàng)建SeverID在zoo.cfg配置文件中dataDir目錄中新建一個沒有后綴的myid文件,里邊寫2。 同理配置server3中的zookeeper,這樣zookeeper集群的配置工作已經(jīng)完成。 下面我們啟動zookeeper集群:一個一個啟動每一個zookeeper服務器:
zookeeper集群環(huán)境搭建遇到問題總結:
解決方法: a. 關閉所有服務器防火墻 b. 將本服務器的IP修改成0.0.0.0 例如: Kafka集群環(huán)境搭建:下載kafka后解壓到某個目錄,然后修改server.properties配置。 server.properties修改說明:broker.id=0
· 在kafka這個集群中的唯一標識,且只能是正整數(shù)
port=9091
· 該服務監(jiān)聽的端口
host.name=192.168.1.130
· broker 綁定的主機名稱(IP) 如果不設置將綁定所有的接口。
advertised.host.name=192.168.1.130
· broker服務將通知消費者和生產(chǎn)者 換言之,就是消費者和生產(chǎn)者就是通過這個主機(IP)來進行通信的。如果沒有設置就默認采用host.name。
num.network.threads=2
· broker處理消息的最大線程數(shù),一般情況是CPU的核數(shù)
num.io.threads=8
· broker處理IO的線程數(shù) 一般是num.network.threads的兩倍
socket.send.buffer.bytes=1048576
· socket發(fā)送的緩沖區(qū)。socket調(diào)優(yōu)參數(shù)SO_SNDBUFF
socket.receive.buffer.bytes=1048576
· socket接收的緩沖區(qū) socket的調(diào)優(yōu)參數(shù)SO_RCVBUF
socket.request.max.bytes=104857600
· socket請求的最大數(shù)量,防止serverOOM。
log.dirs=\logs
· kafka數(shù)據(jù)的存放地址,多個地址的話用逗號隔開。多個目錄分布在不同的磁盤上可以提高讀寫性能
num.partitions=2
· 每個tipic的默認分區(qū)個數(shù),在創(chuàng)建topic時可以重新制定
log.retention.hours=168
· 數(shù)據(jù)文件的保留時間 log.retention.minutes也是一個道理。
log.segment.bytes=536870912
· topic中的最大文件的大小 -1表示沒有文件大小限制 log.segment.bytes 和log.retention.minutes 任意一個
達到要求 都會刪除該文件 在創(chuàng)建topic時可以重新制定。若沒有.則選取該默認值
log.retention.check.interval.ms=60000
· 文件大小檢查的周期時間,是否處罰 log.cleanup.policy中設置的策略
log.cleaner.enable=false· 是否開啟日志清理
zookeeper.connect=192.168.1.130:num1,192.168.1.130:num2,192.168.1.130:num3
· 上面我們的Zookeeper集群
zookeeper.connection.timeout.ms=1000000
· 進群鏈接時間超時本機配置修改本機server1的server.properties配置:#在kafka這個集群中的唯一標識,且只能是正整數(shù)
broker.id=0
# kafka集群的地址
broker.list=192.168.0.130:9092,192.168.0.103:9093,192.168.0.103:9094
#listener and port
port=9092
#broker 綁定的主機名稱(IP) 如果不設置將綁定所有的接口。
host.name=192.168.1.130
#kafka數(shù)據(jù)的存放地址,多個地址的話用逗號隔開。多個目錄分布在不同的磁盤上可以提高讀寫性能
# A comma separated list of directories under which to store log files
#log.dirs=/tmp/kafka-logs
log.dirs=D:/bigData/kafka_2.11-1.1.0/kafka-logs
#每個tipic的默認分區(qū)個數(shù),在創(chuàng)建topic時可以重新制定
num.partitions=3
# root directory for all kafka znodes.
zookeeper.connect=192.168.1.130:2181,192.168.1.103:2182,192.168.1.103:2183虛擬機配置將本機的kafka安裝包拷貝到虛擬機上,存放在server2,server3目錄。
注意:若本機拷貝到虛擬機中kafka之前有使用過,生成過topics,直接拷貝搭建集群,啟動kafka時會報如下錯誤:
原因:在單點環(huán)境下創(chuàng)建的topic中,kafka服務器的broker.id為0,在搭建集群時修改了borker.id,先前創(chuàng)建的topic就無法找到對應的broker.id。 解決辦法:刪除原來的topic信息。 修改server2目錄中kafka配置:#在kafka這個集群中的唯一標識,且只能是正整數(shù)
broker.id=1
# kafka集群的地址
broker.list=192.168.0.130:9092,192.168.0.103:9093,192.168.0.103:9094
#listener and port
port=9093
#broker 綁定的主機名稱(IP) 如果不設置將綁定所有的接口。
host.name=192.168.1.103
#kafka數(shù)據(jù)的存放地址,多個地址的話用逗號隔開。多個目錄分布在不同的磁盤上可以提高讀寫性能
# A comma separated list of directories under which to store log files
#log.dirs=/tmp/kafka-logs
log.dirs=D:/bigData/server2/kafka_2.11-1.1.0/kafka-logs
#每個tipic的默認分區(qū)個數(shù),在創(chuàng)建topic時可以重新制定
num.partitions=3
# root directory for all kafka znodes.
zookeeper.connect=192.168.1.130:2181,192.168.1.103:2182,192.168.1.103:2183同理修改server3中的kafka配置,配置好之后,kafka集成環(huán)境已經(jīng)配置完成,下面我們來驗證下集成環(huán)境是否OK。 驗證集群環(huán)境依次先啟動zookeeper服務器進入zookeeper的bin目錄,執(zhí)行zkServer.cmd
依次啟動kafka服務器進入kafka安裝目錄D:\bigData\kafka_2.11-1.1.0,按下shift+鼠標右鍵,選擇"在此處打開命令窗口",打開命令行,在命令行中輸入:.\bin\windows\kafka-server-start.bat .\config\server.properties回車。正常啟動界面如圖:
在主機kafka服務器創(chuàng)建topic主要參數(shù)說明:partitions分區(qū)數(shù)(1). partitions :分區(qū)數(shù),控制topic將分片成多少個log。可以顯示指定,如果不指定則會使用broker(server.properties)中的num.partitions配置的數(shù)量 replication-factor副本1. replication factor 控制消息保存在幾個broker(服務器)上,一般情況下等于broker的個數(shù)。 創(chuàng)建Topic1. 創(chuàng)建主題,命名為"test20180430",replicationfactor=3(因為這里有三個kafka服務器在運行)??筛鶕?jù)集群中kafka服務器個數(shù)來修改replicationfactor的數(shù)量,以便提高系統(tǒng)容錯性等。
查看Topickafka-topics.bat --zookeeper 192.168.1.130:2181 --describe --topic test20180430
結果說明: 創(chuàng)建Producer消息提供者1. 在D:\bigData\kafka_2.11-1.1.0\bin\windows目錄下打開新的命令行,輸入命令:kafka-console-producer.bat --broker-list 192.168.1.130:9092,192.168.1.101:9093,192.168.1.101:9094 --topic test20180430 回車。(該窗口不要關閉)
創(chuàng)建Customer消息消費者1. 在D:\bigData\kafka_2.11-1.1.0\bin\windows目錄下打開新的命令行,輸入命令:kafka-console-consumer.bat --zookeeper 192.168.1.130:2181 --topic test20180430回車。
以同樣的方式,將虛擬機上server2和server3的消息者customer創(chuàng)建,然后通過主機上的producer進行消息發(fā)送操作。
至此,集群環(huán)境已經(jīng)驗證OK。 集群容錯性1. 首先查看topic的信息
可以看到此時選舉的leader是0,即就是虛擬機中的kafka服務器,現(xiàn)在把虛擬機的kafka服務器給干掉。此時leader為變?yōu)?,消費者能繼續(xù)消費。
注意:zk的部署個數(shù)最好為基數(shù),ZK集群的機制是只要超過半數(shù)的節(jié)點OK,集群就能正常提供服務。只有ZK節(jié)點掛得太多,只剩一半或不到一半節(jié)點能工作,集群才失效。 |
|
|
來自: jackeyqing > 《消息隊列》