🍊 Java學(xué)習(xí):Java從入門(mén)到精通總結(jié)
🍊 深入淺出RocketMQ設(shè)計(jì)思想:深入淺出RocketMQ設(shè)計(jì)思想
🍊 絕對(duì)不一樣的職場(chǎng)干貨:大廠(chǎng)最佳實(shí)踐經(jīng)驗(yàn)指南
📆 最近更新:2022年10月24日
🍊 個(gè)人簡(jiǎn)介:通信工程本碩💪、Java程序員🌕。做過(guò)科研paper,發(fā)過(guò)專(zhuān)利,優(yōu)秀的程序員不應(yīng)該只是CRUD
🍊 點(diǎn)贊 👍 收藏 ?留言 📝 都是我最大的動(dòng)力!
異常消息處理

使用Consumer時(shí)會(huì)注冊(cè)MessageListener,消費(fèi)消息的接口會(huì)返回處理狀態(tài):
ConsumeConcurrentlyStatus.CONSUME_SUCCESS:消費(fèi)成功ConsumeConcurrentlyStatus.REConsume_LATER:等待一段時(shí)間后再消費(fèi)
MessageListener在CnsumeMessageConcurrentlyService中被調(diào)用的,上面兩個(gè)狀態(tài)會(huì)映射到CMResult定義的枚舉值:
CMResult.CR_SUCCESS:消費(fèi)成功CMResult.CR_LATER:等待一段時(shí)間后再消費(fèi)CMResult.CR_ROLLBACK:事務(wù)回滾CMResult.CR_COMMIT:事務(wù)提交CMResult.CR_THROW_EXCEPTION:消費(fèi)異常CMResult.CR_RETURN_NULL:消費(fèi)結(jié)果為null
針對(duì)CMResult.CR_LATER狀態(tài)的處理策略為:將該消息發(fā)揮Broker,繼續(xù)等待后續(xù)消息。發(fā)送回的消息會(huì)設(shè)置重試的Topic,命名規(guī)則為:“%RETRY%” + Consumer組名,消息原本的Topic會(huì)暫存到消息體中,并且會(huì)額外設(shè)置delayLevel及reconsumeTimes
消息消費(fèi)的結(jié)果會(huì)在CnsumeMessageConcurrentlyService.processConsumeResult中處理
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();
// 消息為空,直接返回
if (consumeRequest.getMsgs().isEmpty())
return;
// 計(jì)算從consumerequest.msg[0]到consumerequest.msgs[ackIndex]的消息消費(fèi)成功的數(shù)量
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
// 統(tǒng)計(jì)成功/失敗數(shù)量
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
ackIndex = -1;
// 統(tǒng)計(jì)失敗數(shù)量
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
// 處理消息失敗的消息
switch (this.defaultMQPushConsumer.getMessageModel()) {
// 如果是廣播模式,無(wú)論是否消費(fèi)失敗,都不回發(fā)消息給Broker,只打印Log
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
// 發(fā)回失敗消息到Broker
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
// 回發(fā)給Broker的的具體方法
boolean result = this.sendMessageBack(msg, context);
if (!result) {
// 重復(fù)消費(fèi)次數(shù) + 1
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
// 如果回發(fā)給Broker也失敗的話(huà),則提交延遲消費(fèi)請(qǐng)求(稍后在客戶(hù)端重新消費(fèi))
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
// 移除消費(fèi)成功消息,并返回消費(fèi)的最新進(jìn)度
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
// 更新最新消費(fèi)進(jìn)度,進(jìn)度更新只能增長(zhǎng)
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
Consumer消費(fèi)的時(shí)候可以設(shè)置consumeMessageBatchMaxSize來(lái)控制傳入MessageListener的消息數(shù)量。RocketMQ認(rèn)為只要有一條消息消費(fèi)失敗,這一批消息都會(huì)發(fā)回給Broker,所以設(shè)置consumeMessageBatchMaxSize這個(gè)值時(shí)應(yīng)當(dāng)注意避免出現(xiàn)消息重復(fù)消費(fèi)的問(wèn)題。
Broker處理流程
Broker端對(duì)應(yīng)的處理邏輯在SendMessageProcessor.consumerSendMsgBack里,對(duì)于Consumer發(fā)送失敗返的消息,Broker會(huì)將其放入重試Topic中
/**
* 消費(fèi)者將消息發(fā)回給Broker,可以指定多久后重新消費(fèi)該消息
*
* @param ctx
* @param request
* @return
* @throws RemotingCommandException
*/
private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
// 初始化響應(yīng)
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ConsumerSendMsgBackRequestHeader requestHeader =
(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());
if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
ConsumeMessageContext context = buildConsumeMessageContext(namespace, requestHeader, request);
this.executeConsumeMessageHookAfter(context);
}
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return CompletableFuture.completedFuture(response);
}
// 檢查Broker是否有寫(xiě)入權(quán)限
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
return CompletableFuture.completedFuture(response);
}
// 檢查重試隊(duì)列個(gè)數(shù)是否大于0
if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return CompletableFuture.completedFuture(response);
}
// 計(jì)算retry Topic = "%RETRY% + consumeGroup"
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
// 計(jì)算隊(duì)列編號(hào)
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
// 獲取topicConfig,如果獲取不到,則在response里進(jìn)行相應(yīng)設(shè)置
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return CompletableFuture.completedFuture(response);
}
// 不允許寫(xiě)入
if (!PermName.isWriteable(topicConfig.getPerm())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
return CompletableFuture.completedFuture(response);
}
// 根據(jù)消息的commitLog Offset查詢(xún)實(shí)際的MessageExt(消費(fèi)失敗的實(shí)際消息)
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed, " + requestHeader.getOffset());
return CompletableFuture.completedFuture(response);
}
// 設(shè)置 PROPERTY_RETRY_TOPIC = 原始消息的Topic,msgInner通過(guò)setProperties()方法將原始消息的Properties拷貝過(guò)去
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
// 設(shè)置消息不等待存儲(chǔ)完成
msgExt.setWaitStoreMsgOK(false);
int delayLevel = requestHeader.getDelayLevel();
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
// 3.4.9版本之后可以支持自定義消息的最大消費(fèi)次數(shù),若未指定,默認(rèn)為16
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
Integer times = requestHeader.getMaxReconsumeTimes();
if (times != null) {
maxReconsumeTimes = times;
}
}
// 如果超過(guò)最大消費(fèi)次數(shù)或delayLevel < 0,則加入私信隊(duì)列
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return CompletableFuture.completedFuture(response);
}
} else {
if (0 == delayLevel) {
// 設(shè)置延遲級(jí)別為重試消費(fèi)次數(shù) + 3
delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
}
// 創(chuàng)建MessageExtBrokerInner,除了Topic、QueueId不同外,其他的都是拷貝原始消息的數(shù)據(jù)
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(msgExt.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
// 發(fā)送消息
CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
return putMessageResult.thenApply((r) -> {
if (r != null) {
switch (r.getPutMessageStatus()) {
case PUT_OK:
String backTopic = msgExt.getTopic();
String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (correctTopic != null) {
backTopic = correctTopic;
}
this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
default:
break;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(r.getPutMessageStatus().name());
return response;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("putMessageResult is null");
return response;
});
}
重試消息的重新投遞邏輯與延遲消息一致,等待DelayLevel對(duì)應(yīng)的延時(shí)之后,Broker會(huì)嘗試重新進(jìn)行消息投遞。
關(guān)于延遲級(jí)別的的配置在MessageStoreConfig.messageDelay里,默認(rèn)配置如下:
this.messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
可以主動(dòng)調(diào)整每一個(gè)延遲級(jí)別對(duì)應(yīng)的時(shí)間,但仍然有一些缺陷:
- 時(shí)間精度不夠細(xì),最小粒度是1s
- 延遲級(jí)別的個(gè)數(shù)是固定的,無(wú)法調(diào)整
死信隊(duì)列
RocketMQ里的消息不能無(wú)限次重復(fù)消費(fèi),當(dāng)重試次數(shù)超過(guò)所有延遲級(jí)別的個(gè)數(shù)之后,消息就會(huì)進(jìn)入到死信隊(duì)列里,死信的Topic命名規(guī)則為:"%DLQ% " + Consumer組名
// 如果超過(guò)最大消費(fèi)次數(shù)或delayLevel < 0,則加入私信隊(duì)列
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0);
進(jìn)入到死信隊(duì)列后的消息就不會(huì)再被投遞了,可以通過(guò)接口來(lái)查詢(xún)當(dāng)前RocketMQ中的死信隊(duì)列的消息。