小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

探討kafka的分區(qū)數(shù)與多線程消費(fèi)

 陳永正的圖書館 2016-11-18

若要轉(zhuǎn)載,請標(biāo)明出處,謝謝!

=========================================================

首先,容我吐一口老血。。。。。。叫喊

kafka算是很麻煩的一件事兒,起因是最近需要采集大量的數(shù)據(jù),原先是只用了典型的high-level Consumer的API,最經(jīng)典的不過如下:

 

 Properties props = new Properties();    
        props.put("zookeeper.connect", "xxxx:2181");    
        props.put("zookeeper.connectiontimeout.ms", "1000000");    
        props.put("group.id", "test_group");  
        props.put("zookeeper.session.timeout.ms", "40000");  
        props.put("zookeeper.sync.time.ms", "200");  
        props.put("auto.commit.interval.ms", "1000");  
    
        ConsumerConfig consumerConfig = new ConsumerConfig(props);    
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);    
    
     
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();    
        topicCountMap.put("test", new Integer(1));    
        //key--topic    
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =   
                consumerConnector.createMessageStreams(topicCountMap);  
        KafkaStream<byte[], byte[]> stream = consumerMap.get("test").get(0);  
        ConsumerIterator<byte[], byte[]> it = stream.iterator();  
        StringBuffer sb = new StringBuffer();  
        while(it.hasNext()) {  
            try {  
                String msg = new String(it.next().message(), "utf-8").trim();  
                System.out.println("receive:" + msg);  
            } catch (UnsupportedEncodingException e) {  
                e.printStackTrace();  
            }  
        }  

 這是典型的kafka消費(fèi)端消費(fèi)數(shù)據(jù)的代碼,但可以看出這是十分典型的單線程消費(fèi)。在本地玩玩熟悉kafka還行,(就跟入門java學(xué)會(huì)寫main方法打印hello world一樣~~~~),問題是學(xué)的東西必須真正應(yīng)用到實(shí)際中,你不可能只在單線程采集里原地打轉(zhuǎn)吧。。so,多線程采集迫在眉急?。。?/p>

 

 

本人研究卡夫卡多線程消費(fèi)還是耗了一段時(shí)間的,希望把過程盡可能完整地記錄下來,以便各位同行有需要可以參考。。

 

首先,最好理解kafka的基本原理和一些基本概念,閱讀官網(wǎng)文檔很有必要,這樣才會(huì)有一個(gè)比較清晰的概念,而不是跟無頭蒼蠅一樣亂撞——出了錯(cuò)去網(wǎng)上查是灰常痛苦滴??!

http://kafka./documentation.html

 

好了,大概說下卡夫卡的“分區(qū)·”的概念吧:



 這張圖比較清晰地描述了“分區(qū)”的概念,對(duì)于某一個(gè)topic的消息來說,我們可以把這組消息發(fā)送給若干個(gè)分區(qū),就相當(dāng)于一組消息分發(fā)一樣。

分區(qū)、Offset、消費(fèi)線程、group.id的關(guān)系

1)一組(類)消息通常由某個(gè)topic來歸類,我們可以把這組消息“分發(fā)”給若干個(gè)分區(qū)(partition),每個(gè)分區(qū)的消息各不相同;

2)每個(gè)分區(qū)都維護(hù)著他自己的偏移量(Offset),記錄著該分區(qū)的消息此時(shí)被消費(fèi)的位置;

3)一個(gè)消費(fèi)線程可以對(duì)應(yīng)若干個(gè)分區(qū),但一個(gè)分區(qū)只能被具體某一個(gè)消費(fèi)線程消費(fèi);

4)group.id用于標(biāo)記某一個(gè)消費(fèi)組,每一個(gè)消費(fèi)組都會(huì)被記錄他在某一個(gè)分區(qū)的Offset,即不同consumer group針對(duì)同一個(gè)分區(qū),都有“各自”的偏移量。

 

說完概念,必須要注意的一點(diǎn)是,必須確認(rèn)卡夫卡的server.properties里面的一個(gè)屬性num.partitions必須被設(shè)置成大于1的值,否則消費(fèi)端再怎么折騰,也用不了多線程哦。我這里的環(huán)境下,該屬性值被設(shè)置成10了。

 

重構(gòu)一下上述經(jīng)典的消費(fèi)端代碼:

 

public class KafakConsumer implements Runnable {
	
	private ConsumerConfig consumerConfig;
	private static String topic="blog";
	Properties props;
	final int a_numThreads = 6;
	
	public KafakConsumer() {
		props = new Properties();    
        props.put("zookeeper.connect", "xxx:2181,yyy:2181,zzz:2181");  
//		props.put("zookeeper.connect", "localhost:2181");    
//        props.put("zookeeper.connectiontimeout.ms", "30000");    
        props.put("group.id", "blog");  
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000"); 
        props.put("auto.offset.reset", "smallest");
        consumerConfig = new ConsumerConfig(props);
	}
	

	@Override
	public void run() {
		
	    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
	    topicCountMap.put(topic, new Integer(a_numThreads));
	    ConsumerConfig consumerConfig = new ConsumerConfig(props);
		ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
	    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
	    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
	    ExecutorService executor = Executors.newFixedThreadPool(a_numThreads);
	    for (final KafkaStream stream : streams) {
	        executor.submit(new KafkaConsumerThread(stream));
	    }

	}
	

	public static void main(String[] args) {
		System.out.println(topic);
		Thread t = new Thread(new KafakConsumer());
		t.start();
	}
}

 從這段重構(gòu)的代碼可以看出,KafkaStream<byte[], byte[]> stream = consumerMap.get("test").get(0); 這行代碼已被廢棄,得到List<KafkaStream<byte[], byte[]>>之后不再是得到他的頭元素get(0)就ok了,而且topicCountMap.put(topic, new Integer(a_numThreads));的第二個(gè)參數(shù)也不再是1.

 

 其中,具體消費(fèi)線程KafkaConsumerThread代碼為:

public class KafkaConsumerThread implements Runnable {
	
	private KafkaStream<byte[], byte[]> stream;
	
	public KafkaConsumerThread(KafkaStream<byte[], byte[]> stream) {
		this.stream = stream;
	}

	@Override
	public void run() {
		ConsumerIterator<byte[], byte[]> it = stream.iterator();
		while (it.hasNext()) {
			MessageAndMetadata<byte[], byte[]> mam = it.next();
			System.out.println(Thread.currentThread().getName() + ": partition[" + mam.partition() + "]," 
			+ "offset[" + mam.offset() + "], " + new String(mam.message()));

		}
	}

}

 

 

編寫生產(chǎn)端(Producer)的代碼:

 

public class KafkaProducer implements Runnable {
	
	private Producer<String, String> producer = null;
	
	private ProducerConfig config = null;
	
	
	public KafkaProducer() {
		Properties props = new Properties();
		props.put("zookeeper.connect", "*****:2181,****:2181,****:2181");
		
//		props.put("zookeeper.connect", "localhost:2181");
		
		// 指定序列化處理類,默認(rèn)為kafka.serializer.DefaultEncoder,即byte[]
		props.put("serializer.class", "kafka.serializer.StringEncoder");

		// 同步還是異步,默認(rèn)2表同步,1表異步。異步可以提高發(fā)送吞吐量,但是也可能導(dǎo)致丟失未發(fā)送過去的消息
		props.put("producer.type", "sync");

		// 是否壓縮,默認(rèn)0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。壓縮后消息中會(huì)有頭來指明消息壓縮類型,故在消費(fèi)者端消息解壓是透明的無需指定。
		props.put("compression.codec", "1");

		// 指定kafka節(jié)點(diǎn)列表,用于獲取metadata(元數(shù)據(jù)),不必全部指定
		props.put("broker.list", "****:6667,***:6667,****:6667");
		
		config = new ProducerConfig(props);
	}

	@Override
	public void run() {
		producer = new Producer<String, String>(config);
//		for(int i=0; i<10; i++) {
//			String sLine = "I'm number " + i;
//			KeyedMessage<String, String> msg = new KeyedMessage<String, String>("group1", sLine);
//			producer.send(msg);
//		}
		for(int i = 1; i <= 6; i++){ //往6個(gè)分區(qū)發(fā)數(shù)據(jù) 
		      List<KeyedMessage<String, String>> messageList = new ArrayList<KeyedMessage<String, String>>();  
		      for(int j = 0; j < 6; j++){ //每個(gè)分區(qū)6條訊息 
		    	  messageList.add(new KeyedMessage<String, String>
		    	  //String topic, String partition, String message
		    	  	("blog", "partition[" + i + "]", "message[The " + i + " message]"));
		      }  
		      producer.send(messageList);  
		    }  
 	
	}
	
	public static void main(String[] args) {
		Thread t = new Thread(new KafkaProducer());
		t.start();
	}
}

 上述生產(chǎn)端代碼相對(duì)傳統(tǒng)的發(fā)送端代碼也做了改進(jìn),首先是用了批量發(fā)送(源碼):

 

 

public void send(List<KeyedMessage<K, V>> messages)
  {
    underlying().send(JavaConversions..MODULE$.asScalaBuffer(messages).toSeq());
  }

 而不是:

 

 

public void send(KeyedMessage<K, V> message)
  {
    underlying().send(Predef..MODULE$.wrapRefArray((Object[])new KeyedMessage[] { message }));
  }

 第二,KeyedMessage用的構(gòu)造函數(shù):

 

 

public KeyedMessage(String topic, K key, V message) { this(topic, key, key, message); }

第二個(gè)參數(shù)表示分區(qū)的key。 

 

而非:

 

public KeyedMessage(String topic, V message) {
    this(topic, null, null, message);
  }

 

 

分別run一下生產(chǎn)和消費(fèi)的代碼,可以看到消費(fèi)端打印結(jié)果:

pool-2-thread-5: partition[5],offset[0], message[The 5 message]

pool-2-thread-1: partition[2],offset[0], message[The 2 message]

pool-2-thread-2: partition[1],offset[0], message[The 1 message]

pool-2-thread-5: partition[4],offset[0], message[The 4 message]

pool-2-thread-1: partition[3],offset[0], message[The 3 message]

pool-2-thread-4: partition[6],offset[0], message[The 6 message]

 

可以看到,6個(gè)分區(qū)的數(shù)據(jù)全部被消費(fèi)了,但是不妨看下消費(fèi)線程:pool-2-thread-1線程同時(shí)消費(fèi)了partition[2]和partition[3]的數(shù)據(jù);pool-2-thread-2消費(fèi)了partiton[1]的數(shù)據(jù);pool-2-thread-4消費(fèi)了partiton[6]的數(shù)據(jù);而pool-2-thread-5則消費(fèi)了partitoin[4]和partition[5]的數(shù)據(jù)。

 

從上述消費(fèi)情況來看,驗(yàn)證了消費(fèi)線程和分區(qū)的對(duì)應(yīng)情況——即:一個(gè)分區(qū)只能被一個(gè)線程消費(fèi),但一個(gè)消費(fèi)線程可以消費(fèi)多個(gè)分區(qū)的數(shù)據(jù)!雖然我指定了線程池的線程數(shù)為6,但并不是所有的線程都去消費(fèi)了,這當(dāng)然跟線程池的調(diào)度有關(guān)系了。并不是一個(gè)消費(fèi)線程對(duì)應(yīng)地去消費(fèi)一個(gè)分區(qū)的數(shù)據(jù)。

 

我們不妨仔細(xì)看下消費(fèi)端啟動(dòng)日志部分,這對(duì)我們理解kafka的啟動(dòng)生成和消費(fèi)的原理有益:

【限于篇幅,啟動(dòng)日志略,只分析關(guān)鍵部分】

 

消費(fèi)端的啟動(dòng)日志表明:

1)Consumer happy_Connor-PC-1445916157267-b9cce79d rebalancing the following partitions: ArrayBuffer(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) for topic blog with consumers: List(happy_Connor-PC-1445916157267-b9cce79d-0, happy_Connor-PC-1445916157267-b9cce79d-1, happy_Connor-PC-1445916157267-b9cce79d-2, happy_Connor-PC-1445916157267-b9cce79d-3, happy_Connor-PC-1445916157267-b9cce79d-4, happy_Connor-PC-1445916157267-b9cce79d-5)

happy_Connor-PC-1445916157267-b9cce79d表示一個(gè)消費(fèi)組,該topic可以使用10個(gè)分區(qū):the following partitions: ArrayBuffer(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) for topic blog。然后定義了6個(gè)消費(fèi)線程,List(happy_Connor-PC-1445916157267-b9cce79d-0, happy_Connor-PC-1445916157267-b9cce79d-1, happy_Connor-PC-1445916157267-b9cce79d-2, happy_Connor-PC-1445916157267-b9cce79d-3, happy_Connor-PC-1445916157267-b9cce79d-4, happy_Connor-PC-1445916157267-b9cce79d-5)。消費(fèi)線程的個(gè)數(shù)由topicCountMap.put(String topic, Integer count)的第二個(gè)參數(shù)決定。但真正去消費(fèi)的線程還是由線程池的調(diào)度機(jī)制來決定;

2)線程由zookeeper來聲明它擁有1個(gè)或多個(gè)分區(qū);

3)真正有數(shù)據(jù)存在的分區(qū)是由生產(chǎn)發(fā)送端來決定,即使你的kafka設(shè)置了10個(gè)分區(qū),消費(fèi)端在消費(fèi)的時(shí)候,消費(fèi)線程雖然會(huì)根據(jù)zookeeper的某種機(jī)制來聲明它所消費(fèi)的分區(qū),但實(shí)際消費(fèi)過程中,還是會(huì)消費(fèi)真正存在數(shù)據(jù)的分區(qū)。(本例中,你只往6個(gè)分區(qū)push了數(shù)據(jù),所以即使你聲明了10個(gè)分區(qū),你也只能消費(fèi)6個(gè)分區(qū)的數(shù)據(jù))。

 

如果把topicCountMap的第二個(gè)參數(shù)Integer值改成1,發(fā)送端改成往7個(gè)分區(qū)發(fā)數(shù)據(jù)再測試,可得到消費(fèi)端的打印結(jié)果:

pool-2-thread-1: partition[6],offset[0], message[The 6 message]
pool-2-thread-1: partition[3],offset[0], message[The 3 message]
pool-2-thread-1: partition[2],offset[0], message[The 2 message]
pool-2-thread-1: partition[5],offset[0], message[The 5 message]
pool-2-thread-1: partition[4],offset[0], message[The 4 message]
pool-2-thread-1: partition[7],offset[0], message[The 7 message]
pool-2-thread-1: partition[1],offset[0], message[The 1 message]

 可以看出,如果你topicCountMap的值改成1,而 List<KafkaStream<byte[], byte[]>>的size由Integer值決定,此時(shí)為1,可以看出,線程池中只能使用一個(gè)線程來發(fā)送,還是單線程的效果。若要用多線程消費(fèi),Integer的值必須大于1.

 

下面再來模擬一些狀況:

狀況一:往大于實(shí)際分區(qū)數(shù)的分區(qū)發(fā)數(shù)據(jù),比如發(fā)送端的第一層循環(huán)設(shè)為11:

可看到消費(fèi)端此時(shí)雖能正常的完全消費(fèi)這10個(gè)分區(qū)的數(shù)據(jù),但生產(chǎn)端會(huì)報(bào)異常:

No partition metadata for topic blog4 due to kafka.common.LeaderNotAvailableException}] for topic [blog4]: class kafka.common.LeaderNotAvailableException 

這說明,你往partition11發(fā)送失敗,因?yàn)榭ǚ蚩ㄒ呀?jīng)設(shè)置了10個(gè)分區(qū),你再往不存在的分區(qū)數(shù)發(fā)當(dāng)然會(huì)報(bào)錯(cuò)了。

 

狀況二:發(fā)送端用傳統(tǒng)的發(fā)送方法,即KeyedMessage的構(gòu)造函數(shù)只有topic和Message

 

//針對(duì)topic創(chuàng)建一個(gè)分區(qū)并發(fā)送數(shù)據(jù)
		List<KeyedMessage<String, String>> messageList = new ArrayList<KeyedMessage<String, String>>();
		for(int i = 1; i <= 10; i++){
			messageList.add(new KeyedMessage<String, String>("blog6",  "我是發(fā)送的內(nèi)容message"+i));
		} 
		producer.send(messageList); 

 

消費(fèi)端打印結(jié)果:

pool-2-thread-1: partition[7],offset[0], 我是發(fā)送的內(nèi)容message1

pool-2-thread-1: partition[7],offset[1], 我是發(fā)送的內(nèi)容message2

pool-2-thread-1: partition[7],offset[2], 我是發(fā)送的內(nèi)容message3

pool-2-thread-1: partition[7],offset[3], 我是發(fā)送的內(nèi)容message4

pool-2-thread-1: partition[7],offset[4], 我是發(fā)送的內(nèi)容message5

pool-2-thread-1: partition[7],offset[5], 我是發(fā)送的內(nèi)容message6

pool-2-thread-1: partition[7],offset[6], 我是發(fā)送的內(nèi)容message7

pool-2-thread-1: partition[7],offset[7], 我是發(fā)送的內(nèi)容message8

pool-2-thread-1: partition[7],offset[8], 我是發(fā)送的內(nèi)容message9

pool-2-thread-1: partition[7],offset[9], 我是發(fā)送的內(nèi)容message10

 

這表明,只用了1個(gè)消費(fèi)線程消費(fèi)1個(gè)分區(qū)的數(shù)據(jù)。這說明,如果發(fā)送端發(fā)送數(shù)據(jù)沒有指定分區(qū),即用的是

public KeyedMessage(String topic,V message) { this(topic, key, key, message); }
sendMessage(KeyedMessage(String topic,V message))

的話,同樣達(dá)不到多線程消費(fèi)的效果!

 

狀況三:將線程池的大小設(shè)置成比topicCountMap的value值?。?/p>

  topicCountMap.put(topic, new Integer(7));
  //......................
  ExecutorService executor = Executors.newFixedThreadPool(5);

 發(fā)送端往9個(gè)分區(qū)發(fā)送數(shù)據(jù),run一下,會(huì)發(fā)現(xiàn)消費(fèi)端打印結(jié)果:

pool-2-thread-3: partition[7],offset[0], message[The 7 message]

pool-2-thread-5: partition[1],offset[0], message[The 1 message]

pool-2-thread-4: partition[4],offset[0], message[The 4 message]

pool-2-thread-2: partition[3],offset[0], message[The 3 message]

pool-2-thread-4: partition[5],offset[0], message[The 5 message]

pool-2-thread-1: partition[8],offset[0], message[The 8 message]

pool-2-thread-2: partition[2],offset[0], message[The 2 message]

你會(huì)發(fā)現(xiàn):雖然我生產(chǎn)發(fā)送端往9個(gè)分區(qū)發(fā)送了數(shù)據(jù),但實(shí)際上只消費(fèi)掉了7個(gè)分區(qū)的數(shù)據(jù)。(如果你再跑一邊,可能又是6個(gè)分區(qū)的數(shù)據(jù))——這說明,有的分區(qū)的數(shù)據(jù)沒有被消費(fèi),原因只可能是線程不夠。so,當(dāng)線程池中的大小小于分區(qū)數(shù)時(shí),會(huì)出現(xiàn)有的分區(qū)沒有被采集的情況。建議設(shè)置:實(shí)際發(fā)送分區(qū)數(shù)(一般就等于設(shè)置的分區(qū)數(shù))= topicCountMap的value = 線程池大小  否則極易出現(xiàn)reblance的異常?。?!

 

好了,折騰這么久。我們可以看出,卡夫卡如果想要多線程消費(fèi)提高效率的話,就可以從分區(qū)數(shù)上下手,分區(qū)數(shù)就是用來做并行消費(fèi)的而且生產(chǎn)端的發(fā)送代碼也很有講究。(這只是針對(duì)某一個(gè)topic而言,當(dāng)然實(shí)際情況中,你可以一個(gè)topic一個(gè)線程,同樣達(dá)到多線程效果,當(dāng)然這是后話了)

 

========================The end=====================

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請遵守用戶 評(píng)論公約

    類似文章 更多