小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

RocketMQ 消息失敗重試 解析——圖解、源碼級(jí)解析

 小王曾是少年 2022-10-25 發(fā)布于江蘇

🍊 Java學(xué)習(xí):Java從入門(mén)到精通總結(jié)

🍊 深入淺出RocketMQ設(shè)計(jì)思想:深入淺出RocketMQ設(shè)計(jì)思想

🍊 絕對(duì)不一樣的職場(chǎng)干貨:大廠(chǎng)最佳實(shí)踐經(jīng)驗(yàn)指南


📆 最近更新:2022年10月24日

🍊 個(gè)人簡(jiǎn)介:通信工程本碩💪、Java程序員🌕。做過(guò)科研paper,發(fā)過(guò)專(zhuān)利,優(yōu)秀的程序員不應(yīng)該只是CRUD

🍊 點(diǎn)贊 👍 收藏 ?留言 📝 都是我最大的動(dòng)力!


文章目錄

異常消息處理

在這里插入圖片描述

使用Consumer時(shí)會(huì)注冊(cè)MessageListener,消費(fèi)消息的接口會(huì)返回處理狀態(tài):

  • ConsumeConcurrentlyStatus.CONSUME_SUCCESS:消費(fèi)成功
  • ConsumeConcurrentlyStatus.REConsume_LATER:等待一段時(shí)間后再消費(fèi)

MessageListenerCnsumeMessageConcurrentlyService中被調(diào)用的,上面兩個(gè)狀態(tài)會(huì)映射到CMResult定義的枚舉值:

  • CMResult.CR_SUCCESS:消費(fèi)成功
  • CMResult.CR_LATER:等待一段時(shí)間后再消費(fèi)
  • CMResult.CR_ROLLBACK:事務(wù)回滾
  • CMResult.CR_COMMIT:事務(wù)提交
  • CMResult.CR_THROW_EXCEPTION:消費(fèi)異常
  • CMResult.CR_RETURN_NULL:消費(fèi)結(jié)果為null

針對(duì)CMResult.CR_LATER狀態(tài)的處理策略為:將該消息發(fā)揮Broker,繼續(xù)等待后續(xù)消息。發(fā)送回的消息會(huì)設(shè)置重試的Topic,命名規(guī)則為:“%RETRY%” + Consumer組名,消息原本的Topic會(huì)暫存到消息體中,并且會(huì)額外設(shè)置delayLevelreconsumeTimes

消息消費(fèi)的結(jié)果會(huì)在CnsumeMessageConcurrentlyService.processConsumeResult中處理

    public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        int ackIndex = context.getAckIndex();
		
		// 消息為空,直接返回
        if (consumeRequest.getMsgs().isEmpty())
            return;

		// 計(jì)算從consumerequest.msg[0]到consumerequest.msgs[ackIndex]的消息消費(fèi)成功的數(shù)量
        switch (status) {
            case CONSUME_SUCCESS:
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
               	// 統(tǒng)計(jì)成功/失敗數(shù)量
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            case RECONSUME_LATER:
                ackIndex = -1;
                // 統(tǒng)計(jì)失敗數(shù)量
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                    consumeRequest.getMsgs().size());
                break;
            default:
                break;
        }
		
		// 處理消息失敗的消息
        switch (this.defaultMQPushConsumer.getMessageModel()) {
        	// 如果是廣播模式,無(wú)論是否消費(fèi)失敗,都不回發(fā)消息給Broker,只打印Log
            case BROADCASTING:
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            // 發(fā)回失敗消息到Broker
            case CLUSTERING:
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    // 回發(fā)給Broker的的具體方法
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                    	// 重復(fù)消費(fèi)次數(shù) + 1
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }
				
				// 如果回發(fā)給Broker也失敗的話(huà),則提交延遲消費(fèi)請(qǐng)求(稍后在客戶(hù)端重新消費(fèi))
                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);

                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }
		
		// 移除消費(fèi)成功消息,并返回消費(fèi)的最新進(jìn)度
        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        // 更新最新消費(fèi)進(jìn)度,進(jìn)度更新只能增長(zhǎng) 
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }

Consumer消費(fèi)的時(shí)候可以設(shè)置consumeMessageBatchMaxSize來(lái)控制傳入MessageListener的消息數(shù)量。RocketMQ認(rèn)為只要有一條消息消費(fèi)失敗,這一批消息都會(huì)發(fā)回給Broker,所以設(shè)置consumeMessageBatchMaxSize這個(gè)值時(shí)應(yīng)當(dāng)注意避免出現(xiàn)消息重復(fù)消費(fèi)的問(wèn)題。

Broker處理流程

Broker端對(duì)應(yīng)的處理邏輯在SendMessageProcessor.consumerSendMsgBack里,對(duì)于Consumer發(fā)送失敗返的消息,Broker會(huì)將其放入重試Topic

/**
     * 消費(fèi)者將消息發(fā)回給Broker,可以指定多久后重新消費(fèi)該消息
     *
     * @param ctx
     * @param request
     * @return
     * @throws RemotingCommandException
     */
    private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx,
                                                                        RemotingCommand request) throws RemotingCommandException {
        // 初始化響應(yīng)
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final ConsumerSendMsgBackRequestHeader requestHeader =
                (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
        String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());
        if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
            ConsumeMessageContext context = buildConsumeMessageContext(namespace, requestHeader, request);
            this.executeConsumeMessageHookAfter(context);
        }
        SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
        if (null == subscriptionGroupConfig) {
            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
            response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
                    + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
            return CompletableFuture.completedFuture(response);
        }

        // 檢查Broker是否有寫(xiě)入權(quán)限
        if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
            return CompletableFuture.completedFuture(response);
        }

        // 檢查重試隊(duì)列個(gè)數(shù)是否大于0
        if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return CompletableFuture.completedFuture(response);
        }

        // 計(jì)算retry Topic = "%RETRY% + consumeGroup"
        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());

        // 計(jì)算隊(duì)列編號(hào)
        int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
        }

        // 獲取topicConfig,如果獲取不到,則在response里進(jìn)行相應(yīng)設(shè)置
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                newTopic,
                subscriptionGroupConfig.getRetryQueueNums(),
                PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        if (null == topicConfig) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("topic[" + newTopic + "] not exist");
            return CompletableFuture.completedFuture(response);
        }

        // 不允許寫(xiě)入
        if (!PermName.isWriteable(topicConfig.getPerm())) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
            return CompletableFuture.completedFuture(response);
        }

        // 根據(jù)消息的commitLog Offset查詢(xún)實(shí)際的MessageExt(消費(fèi)失敗的實(shí)際消息)
        MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
        if (null == msgExt) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("look message by offset failed, " + requestHeader.getOffset());
            return CompletableFuture.completedFuture(response);
        }

        // 設(shè)置 PROPERTY_RETRY_TOPIC = 原始消息的Topic,msgInner通過(guò)setProperties()方法將原始消息的Properties拷貝過(guò)去
        final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
        if (null == retryTopic) {
            MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
        }
        // 設(shè)置消息不等待存儲(chǔ)完成
        msgExt.setWaitStoreMsgOK(false);

        int delayLevel = requestHeader.getDelayLevel();

        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
        // 3.4.9版本之后可以支持自定義消息的最大消費(fèi)次數(shù),若未指定,默認(rèn)為16
        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
            Integer times = requestHeader.getMaxReconsumeTimes();
            if (times != null) {
                maxReconsumeTimes = times;
            }
        }

        // 如果超過(guò)最大消費(fèi)次數(shù)或delayLevel < 0,則加入私信隊(duì)列
        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
                || delayLevel < 0) {
            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                    DLQ_NUMS_PER_GROUP,
                    PermName.PERM_WRITE, 0);
            if (null == topicConfig) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("topic[" + newTopic + "] not exist");
                return CompletableFuture.completedFuture(response);
            }
        } else {
            if (0 == delayLevel) {
                // 設(shè)置延遲級(jí)別為重試消費(fèi)次數(shù) + 3
                delayLevel = 3 + msgExt.getReconsumeTimes();
            }
            msgExt.setDelayTimeLevel(delayLevel);
        }

        // 創(chuàng)建MessageExtBrokerInner,除了Topic、QueueId不同外,其他的都是拷貝原始消息的數(shù)據(jù)
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(newTopic);
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

        msgInner.setQueueId(queueIdInt);
        msgInner.setSysFlag(msgExt.getSysFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(msgExt.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

        String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
        MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        
        // 發(fā)送消息
        CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        return putMessageResult.thenApply((r) -> {
            if (r != null) {
                switch (r.getPutMessageStatus()) {
                    case PUT_OK:
                        String backTopic = msgExt.getTopic();
                        String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
                        if (correctTopic != null) {
                            backTopic = correctTopic;
                        }
                        this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
                        response.setCode(ResponseCode.SUCCESS);
                        response.setRemark(null);
                        return response;
                    default:
                        break;
                }
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark(r.getPutMessageStatus().name());
                return response;
            }
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("putMessageResult is null");
            return response;
        });
    }

重試消息的重新投遞邏輯與延遲消息一致,等待DelayLevel對(duì)應(yīng)的延時(shí)之后,Broker會(huì)嘗試重新進(jìn)行消息投遞。

關(guān)于延遲級(jí)別的的配置在MessageStoreConfig.messageDelay里,默認(rèn)配置如下:

this.messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

可以主動(dòng)調(diào)整每一個(gè)延遲級(jí)別對(duì)應(yīng)的時(shí)間,但仍然有一些缺陷:

  1. 時(shí)間精度不夠細(xì),最小粒度是1s
  2. 延遲級(jí)別的個(gè)數(shù)是固定的,無(wú)法調(diào)整

死信隊(duì)列

RocketMQ里的消息不能無(wú)限次重復(fù)消費(fèi),當(dāng)重試次數(shù)超過(guò)所有延遲級(jí)別的個(gè)數(shù)之后,消息就會(huì)進(jìn)入到死信隊(duì)列里,死信的Topic命名規(guī)則為:"%DLQ% " + Consumer組名

// 如果超過(guò)最大消費(fèi)次數(shù)或delayLevel < 0,則加入私信隊(duì)列
        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
                || delayLevel < 0) {
            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                    DLQ_NUMS_PER_GROUP,
                    PermName.PERM_WRITE, 0);

進(jìn)入到死信隊(duì)列后的消息就不會(huì)再被投遞了,可以通過(guò)接口來(lái)查詢(xún)當(dāng)前RocketMQ中的死信隊(duì)列的消息。

    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶(hù) 評(píng)論公約

    類(lèi)似文章 更多