The job of an alpakka-kafka consumer is simple to describe: subscribe to some topics on Kafka, then hand the messages it reads to akka-streams for business processing. At the kafka-consumer implementation level, to achieve high availability and high throughput, a topic can be divided into multiple partitions, and the partitions are distributed across the broker nodes of the Kafka cluster. Since one topic may have several partitions, the topic may correspondingly have several consumers, which form a consumer group sharing the same group id. Within a group, a partition can be assigned to only one consumer, while one consumer may read messages from several partitions or even several topics. Kafka assigns a given partition to a given consumer according to the actual situation; this is partition assignment. So in general we tie topic subscription to a consumer group. This is reflected in a typical ConsumerSettings:

val system = ActorSystem("kafka-sys")
val config = system.settings.config.getConfig("akka.kafka.consumer")
val bootstrapServers = "localhost:9092"
val consumerSettings =
  ConsumerSettings(config, new StringDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
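Subscription itself comes in several flavors. Besides group-managed subscription to whole topics, where Kafka assigns the partitions to the members of the group, alpakka-kafka also lets us subscribe by topic pattern, or pin a consumer to specific partitions ourselves, bypassing group assignment. A quick sketch (the topic names here are placeholders):

import akka.kafka.Subscriptions
import org.apache.kafka.common.TopicPartition

// group-managed: Kafka assigns the partitions of these topics across the group
val byTopics = Subscriptions.topics("topic1", "topic2")
// group-managed: subscribe to every topic matching the regular expression
val byPattern = Subscriptions.topicPattern("topic.*")
// self-managed: read exactly this partition, no group assignment involved
val byAssignment = Subscriptions.assignment(new TopicPartition("topic1", 0))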
Let's first use a simple consumer, plainSource, to read back the messages that the producer in the previous article wrote to Kafka:

import akka.actor.ActorSystem
import akka.kafka._
import akka.kafka.scaladsl._
import akka.stream.{RestartSettings, SystemMaterializer}
import akka.stream.scaladsl.{Keep, RestartSource, Sink}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.concurrent._
import scala.concurrent.duration._
object plain_source extends App {
  val system = ActorSystem("kafka-sys")
  val config = system.settings.config.getConfig("akka.kafka.consumer")
  implicit val mat = SystemMaterializer(system).materializer
  implicit val ec: ExecutionContext = mat.executionContext
  val bootstrapServers = "localhost:9092"
  val consumerSettings =
    ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers(bootstrapServers)
      .withGroupId("group1")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  val subscription = Subscriptions.topics("greatings")
  Consumer.plainSource(consumerSettings, subscription)
    .runWith(Sink.foreach(msg => println(msg.value())))
  scala.io.StdIn.readLine()
  system.terminate()
}
Above we did no business processing on the messages we read; we simply printed them. Note that every run reads everything back from the beginning, because we set .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), i.e. the kafka-consumer property auto.offset.reset = "earliest". If the messages we read were driving real business processing, every restart of the application would repeat that processing from the start. So we need some mechanism to mark the messages already read, in other words to remember the current read position, the offset. Consumer.plainSource emits the ConsumerRecord type:

public ConsumerRecord(String topic,
                      int partition,
                      long offset,
                      K key,
                      V value) {
    this(topic, partition, offset, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
         NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, key, value);
}
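So besides the message value, each record tells us exactly where it came from. As a small sketch (reusing the consumerSettings and subscription from the example above), a plainSource stage can read that position information straight off every record:

Consumer.plainSource(consumerSettings, subscription)
  .runWith(Sink.foreach { record =>
    // topic, partition and offset pinpoint the read position of this message
    println(s"${record.topic()}/${record.partition()} @${record.offset()}: ${record.value()}")
  })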
This ConsumerRecord type includes the offset, and users can commit this position value themselves, which means users may choose to store the offset inside Kafka or in some other database. Speaking of commit-offset: offset management should count as a key technique in any kafka-consumer business application. The business flow on the consumer side can be summarized as: read business instructions from Kafka, execute the instructions and update the business state, then read the next batch of instructions from Kafka. To keep the business state accurate, neither missing some instructions nor executing some instructions twice can be tolerated. So the read position must be marked precisely after every fetch from Kafka: commit-offset. However, if we commit the offset immediately after reading, and the system fails while the business instructions are being executed, then the next read, starting from the marked position, will skip over a batch of instructions. This situation is called at-most-once: an instruction may be executed once, but is never repeated. Conversely, if we commit the offset only after the business state has been updated successfully, then an exception during execution will prevent the commit, the next read will start from the previously marked position, and the business state may be changed twice. This is called at-least-once: the instruction is guaranteed to be executed, but possibly more than once. Where money or inventory is involved, neither is acceptable, and only the exactly-once mode will do. Many businesses are not that strict, though: a website counting clicks, say, only needs a rough number, so either at-least-once or at-most-once is fine.

The kafka-consumer offset is a Long value that can be stored inside Kafka or in an external database. If we choose to store it inside Kafka, the configuration can be set to mark the position automatically at a fixed interval, writing the current offset into Kafka. Once we enable auto-commit in the ConsumerSettings of the example above, rerunning the application no longer rereads the same data:

val consumerSettings =
  ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId("group1")
    .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") // enable auto-commit
    .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000") // auto-commit interval
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
alpakka-kafka provides the Committer type, an akka-streams Sink or Flow component responsible for writing offsets back to Kafka. Using a Committer Sink or Flow we can control exactly when commit-offset happens, as in the following sample code:
val committerSettings = CommitterSettings(system)
val control: DrainingControl[Done] =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("greatings"))
    .mapAsync(10) { msg =>
      BusinessLogic.runBusiness(msg.record.key, msg.record.value)
        .map(_ => msg.committableOffset)
    }
    .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
    .run()
scala.io.StdIn.readLine()
control.drainAndShutdown()
system.terminate()

object BusinessLogic {
  def runBusiness(key: String, value: String): Future[Done] = Future.successful(Done)
}
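Committer is also available as a Flow, which is handy when the stream should continue past the commit point instead of ending in a sink; how many offsets get batched into one commit can be tuned through CommitterSettings. A sketch under the same assumptions as the example above (consumerSettings and BusinessLogic as already defined):

// batch up to 100 offsets, or whatever arrives within 1 second, into one commit
val tunedCommitterSettings =
  CommitterSettings(system)
    .withMaxBatch(100)
    .withMaxInterval(1.second)

val flowControl: DrainingControl[Done] =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("greatings"))
    .mapAsync(10) { msg =>
      BusinessLogic.runBusiness(msg.record.key, msg.record.value)
        .map(_ => msg.committableOffset)
    }
    .via(Committer.flow(tunedCommitterSettings)) // commits, then emits Done downstream
    .toMat(Sink.ignore)(DrainingControl.apply)
    .run()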
In the Committer.sink example above, BusinessLogic.runBusiness() simulates a piece of business-processing code; in other words, only after the business processing completes is the offset committed via Committer.sink. This is an at-least-once mode: runBusiness may fail with an exception, so repeated processing is possible. Consumer.committableSource emits CommittableMessage:

def committableSource[K, V](settings: ConsumerSettings[K, V],
                            subscription: Subscription): Source[CommittableMessage[K, V], Control] =
  Source.fromGraph(new CommittableSource[K, V](settings, subscription))

final case class CommittableMessage[K, V](
    record: ConsumerRecord[K, V],
    committableOffset: CommittableOffset
)

@DoNotInherit sealed trait CommittableOffset extends Committable {
  def partitionOffset: PartitionOffset
}
Committer.sink accepts Committable as input and writes it to Kafka; the upstream CommittableOffset extends Committable. Also note the DrainingControl type: it combines the Control type with the stream's completion signal, so the whole consumer stream can be shut down safely. alpakka-kafka also has an atMostOnceSource. This Source component commits the offset automatically and immediately for every message it reads:

def atMostOnceSource[K, V](settings: ConsumerSettings[K, V],
                           subscription: Subscription): Source[ConsumerRecord[K, V], Control] =
  committableSource[K, V](settings, subscription).mapAsync(1) { m =>
    m.committableOffset.commitInternal().map(_ => m.record)(ExecutionContexts.sameThreadExecutionContext)
  }
As we can see, atMostOnceSource commits the offset before emitting the ConsumerRecord. A concrete usage example of atMostOnceSource looks like this:

import scala.collection.immutable
val control: DrainingControl[immutable.Seq[Done]] =
  Consumer
    .atMostOnceSource(consumerSettings, Subscriptions.topics("greatings"))
    .mapAsync(1)(record => BusinessLogic.runBusiness(record.key, record.value))
    .toMat(Sink.seq)(DrainingControl.apply)
    .run()
scala.io.StdIn.readLine()
control.drainAndShutdown()
system.terminate()
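Since drainAndShutdown returns a Future of the sink's materialized value, the two final steps above can also be sequenced explicitly, so the actor system terminates only after the stream has fully drained; a sketch assuming the control value and implicit ExecutionContext from the example above:

control
  .drainAndShutdown()               // Future[immutable.Seq[Done]]: completes once drained
  .flatMap(_ => system.terminate()) // terminate the actor system only afterwards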
So with atMostOnceSource no Committer is needed for committing offsets at all. Note, though, that atMostOnceSource marks the position for every single message, so throughput inevitably suffers; if the requirements are not that strict, enabling auto-commit is the better choice. For any kind of transactional business system, neither at-least-once nor at-most-once is acceptable; only exactly-once is suitable. One way to implement exactly-once is to store the offset together with the business data in the same external database. If the database's transaction-processing mechanism can put the business-state update and the commit-offset into a single transactional unit that advances or rolls back as one, we get the exactly-once mode. The following sample is adapted from the official documentation:

val db = new MongoDb
val control = db.loadOffset().map { fromOffset =>
  Consumer
    .plainSource(
      consumerSettings,
      Subscriptions.assignmentWithOffset(
        new TopicPartition(topic, /* partition = */ 0) -> fromOffset
      )
    )
    .mapAsync(1)(db.businessLogicAndStoreOffset)
    .toMat(Sink.seq)(DrainingControl.apply)
    .run()
}
class MongoDb {
  def businessLogicAndStoreOffset(record: ConsumerRecord[String, String]): Future[Done] = // ...
  def loadOffset(): Future[Long] = // ...
}
In the code above, db.loadOffset() fetches the previous read position from MongoDB and returns a Future[Long]; Subscriptions.assignmentWithOffset then places this offset in a tuple (TopicPartition, Long). TopicPartition is defined as:

public TopicPartition(String topic, int partition) {
    this.partition = partition;
    this.topic = topic;
}
Consumer.plainSource can then start reading from that offset. As shown earlier, plainSource emits ConsumerRecord values, which carry the current offset in addition to the business instruction in value. That is already enough for businessLogicAndStoreOffset() to run a single combined business+offset transaction.
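What businessLogicAndStoreOffset() actually does depends on the chosen database. Purely as an illustration (swapping MongoDB for plain JDBC, with a made-up connection URL, hypothetical state and consumer_offset tables, and an implicit ExecutionContext in scope as in the earlier examples), the business-state update and the offset update could share one transaction like this:

import java.sql.{Connection, DriverManager}
import scala.concurrent.Future
import akka.Done
import org.apache.kafka.clients.consumer.ConsumerRecord

def businessLogicAndStoreOffset(record: ConsumerRecord[String, String]): Future[Done] = Future {
  // illustrative only: URL, tables and columns are made up for this sketch
  val conn: Connection = DriverManager.getConnection("jdbc:h2:mem:demo")
  try {
    conn.setAutoCommit(false) // business update + offset update form one transactional unit
    val updState = conn.prepareStatement("update state set value = ? where key = ?")
    updState.setString(1, record.value())
    updState.setString(2, record.key())
    updState.executeUpdate()
    val updOffset = conn.prepareStatement("update consumer_offset set last_offset = ?")
    updOffset.setLong(1, record.offset())
    updOffset.executeUpdate()
    conn.commit() // business state and offset advance together ...
    Done
  } catch {
    case e: Throwable =>
      conn.rollback() // ... or roll back together
      throw e
  } finally conn.close()
}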