本文主要分析RocketMQ是如何保证消息有序的。
RocketMQ的版本号为:4.2.0 release。
一.时序图
还是老规矩,先把分析过程的时序图摆出来:
1.Producer发送顺序消息
2.Consumer接收顺序消息(一)
3.Consumer接收顺序消息(二)
二.源码分析 – Producer发送顺序消息
1 DefaultMQProducer#send:发送消息,入参中有自定义的消息队列选择器(MessageQueueSelector)。
// DefaultMQProducer#send public SendResult send(Message msg, MessageQueueSelector selector, Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg, selector, arg); }
1.1 DefaultMQProducerImpl#makeSureStateOK:确保Producer的状态是运行状态-ServiceState.RUNNING。
// DefaultMQProducerImpl#makeSureStateOK private void makeSureStateOK() throws MQClientException { if (this.serviceState != ServiceState.RUNNING) { throw new MQClientException(\"The producer service state not OK, \" + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); } }
1.2 DefaultMQProducerImpl#tryToFindTopicPublishInfo:根据Topic获取发布该Topic用到的路由信息。
// DefaultMQProducerImpl#tryToFindTopicPublishInfo private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);// 为空则从 NameServer升级获得,false,不传到 defaultMQProducer topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {// 拥有路由器信息内容并且情况OK,则回到 return topicPublishInfo; } else { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
1.3 调用自定义消息队列选择器的select方法。
// DefaultMQProducerImpl#sendSelectImpl MessageQueue mq = null; try { mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg); } catch (Throwable e) { throw new MQClientException(\"select message queue throwed exception.\", e); } // Producer#main SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId);
1.4 DefaultMQProducerImpl#sendKernelImpl:发送消息的核心实现方法。
// DefaultMQProducerImpl#sendKernelImpl ...... switch (communicationMode) { case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeSync) { throw new RemotingTooMuchRequestException(\"sendKernelImpl call timeout\"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this); break; ......
1.4.1 MQClientAPIImpl#sendMessage:发送消息。
// MQClientAPIImpl#sendMessage ...... switch (communicationMode) {// 依据推送消息的方式(同歩/多线程)挑选差异的方法,默认设置是同歩 case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTimeSync) { throw new RemotingTooMuchRequestException(\"sendMessage call timeout\"); } return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request); ......
1.4.1.1 MQClientAPIImpl#sendMessageSync:发送同步消息。
// MQClientAPIImpl#sendMessageSync private SendResult sendMessageSync( final String addr, final String brokerName, final Message msg, final long timeoutMillis, final RemotingCommand request ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null; return this.processSendResponse(brokerName, msg, response); }
1.4.1.1.1 NettyRemotingClient#invokeSync:构造RemotingCommand,调用的方式是同步。
// NettyRemotingClient#invokeSync RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); if (this.rpcHook != null) { this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response); } return response;
三.源码分析 – Consumer接收顺序消息(一)
1 DefaultMQPushConsumer#registerMessageListener:把Consumer传入的消息监听器保存到messageListener中。
// DefaultMQPushConsumer#registerMessageListener public void registerMessageListener(MessageListenerOrderly messageListener) { this.messageListener = messageListener; this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); }
1.1 DefaultMQPushConsumerImpl#registerMessageListener:把Consumer传入的消息监听器保存到messageListenerInner中。
// DefaultMQPushConsumerImpl#registerMessageListener public void registerMessageListener(MessageListener messageListener) { this.messageListenerInner = messageListener; }
2 DefaultMQPushConsumer#start:启动Consumer。
// DefaultMQPushConsumer#start public void start() throws MQClientException { this.defaultMQPushConsumerImpl.start(); }
2.1 DefaultMQPushConsumerImpl#start:启动ConsumerImpl。
// DefaultMQPushConsumerImpl#start switch (this.serviceState) { case CREATE_JUST:// 刚建立 ...... if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {// 井然有序信息服务项目 this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {// 高并发混乱信息服务项目 this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } ...... this.consumeMessageService.start();// 运行信息服务项目 ...... mQClientFactory.start();// 运行MQClientInstance ......
2.1.1 new
ConsumeMessageOrderlyService():构造顺序消息服务。
// ConsumeMessageOrderlyService#ConsumeMessageOrderlyService public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerOrderly messageListener) { this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; this.messageListener = messageListener; this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer(); this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup(); this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>(); this.consumeExecutor = new ThreadPoolExecutor(// 主信息交易线程池,正常的实行接到的ConsumeRequest。线程同步 this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl(\"ConsumeMessageThread_\")); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(\"ConsumeMessageScheduledThread_\")); }
2.1.2
ConsumeMessageOrderlyService#start:启动顺序消息服务(集群模式下启动定时任务,定时向broker锁定消息队列)。
// DefaultMQPushConsumerImpl#start this.consumeMessageService.start(); // ConsumeMessageOrderlyService#start public void start() { if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { ConsumeMessageOrderlyService.this.lockMQPeriodically();// 按时向broker推送大批量锁定现阶段已经交易的序列结合的信息 } }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); } }
2.1.2.1
ConsumeMessageOrderlyService#lockMQPeriodically:定时向broker发送批量锁定当前正在消费的队列集合的消息。
2.1.2.1.1 RebalanceImpl#lockAll:锁定所有正在消费的队列。
// ConsumeMessageOrderlyService#lockMQPeriodically if (!this.stopped) { this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll(); } // RebalanceImpl#lockAll HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();// 依据brokerName从processQueueTable获得已经交易的序列结合 ...... Set<MessageQueue> lockOKMQSet = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);// 向Broker推送锁定线程池的命令 for (MessageQueue mq : lockOKMQSet) { ProcessQueue processQueue = this.processQueueTable.get(mq); if (processQueue != null) { if (!processQueue.isLocked()) { log.info(\"the message queue locked OK, Group: {} {}\", this.consumerGroup, mq); } processQueue.setLocked(true); processQueue.setLastLockTimestamp(System.currentTimeMillis()); } } ......
2.1.3 MQClientInstance#start:启动MQClientInstance。过程较复杂,放到大标题四中分析。
// DefaultMQPushConsumerImpl#start mQClientFactory.start();
四.源码分析 – Consumer接收顺序消息(二)
1 MQClientInstance#start:启动客户端实例MQClientInstance。
// MQClientInstance#start synchronized (this) { switch (this.serviceState) { case CREATE_JUST: ...... // Start pull service 运行获取信息服务项目 this.pullMessageService.start(); // Start rebalance service 启动消费端web服务服务项目 this.rebalanceService.start(); ......
1.1 PullMessageService#run:启动拉取消息服务。实际调用的是DefaultMQPushConsumerImpl的pullMessage方法。
// PullMessageService#run public void run() { log.info(this.getServiceName() + \" service started\"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error(\"Pull Message Service Run Method exception\", e); } } log.info(this.getServiceName() + \" service end\"); } // PullMessageService#pullMessage private void pullMessage(final PullRequest pullRequest) { final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) { DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; impl.pullMessage(pullRequest);// 调用DefaultMQPushConsumerImpl的pullMessage } else { log.warn(\"No matched consumer for the PullRequest {}, drop it\", pullRequest); } }
1.1.1.1 DefaultMQPushConsumerImpl#pullMessage:拉取消息。拉取到的消息会提交到
ConsumeMessageOrderlyService的线程池consumeExecutor中执行。
// DefaultMQPushConsumerImpl#pullMessage ...... PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { switch (pullResult.getPullStatus()) { case FOUND: long prevRequestOffset = pullRequest.getNextOffset(); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); long pullRT = System.currentTimeMillis() - beginTimestamp; ...... DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); ......
1.1.1.1.1.1.1 ConsumeRequest#run:处理消息消费的线程。
// ConsumeMessageOrderlyService.ConsumeRequest#run List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize); ...... long beginTimestamp = System.currentTimeMillis(); ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; boolean hasException = false; try { this.processQueue.getLockConsume().lock(); if (this.processQueue.isDropped()) { log.warn(\"consumeMessage, the message queue not be able to consume, because it\'s dropped. {}\",this.messageQueue); break; } status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);// 具体消费消息的地区,调整消息窃听器的consumeMessage方式 } catch (Throwable e) { log.warn(\"consumeMessage exception: {} Group: {} Msgs: {} MQ: {}\", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageOrderlyService.this.consumerGroup, msgs,messageQueue); hasException = true; } finally { this.processQueue.getLockConsume().unlock(); } ......
1.2 RebalanceService#run:启动消费端负载均衡服务。
// RebalanceService#run public void run() { log.info(this.getServiceName() + \" service started\"); while (!this.isStopped()) { this.waitForRunning(waitInterval); this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + \" service end\"); } // MQClientInstance#doRebalance public void doRebalance() { for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { try { impl.doRebalance(); } catch (Throwable e) { log.error(\"doRebalance exception\", e); } } } } // DefaultMQPushConsumerImpl#doRebalance public void doRebalance() { if (!this.pause) { this.rebalanceImpl.doRebalance(this.isConsumeOrderly()); } }
1.2.1.1.1 RebalanceImpl#doRebalance:负载均衡服务类处理。
// RebalanceImpl#doRebalance public void doRebalance(final boolean isOrder) { Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); if (subTable != null) { for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { final String topic = entry.getKey(); try { this.rebalanceByTopic(topic, isOrder); } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn(\"rebalanceByTopic Exception\", e); } } } } this.truncateMessageQueueNotMyTopic(); } // RebalanceImpl#rebalanceByTopic switch (messageModel) { case BROADCASTING: { Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); if (mqSet != null) { boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);// 依据Toipc除去queue if (changed) { this.messageQueueChanged(topic, mqSet, mqSet); log.info(\"messageQueueChanged {} {} {} {}\", consumerGroup, topic, mqSet, mqSet); } } else { ...... // RebalanceImpl#updateProcessQueueTableInRebalance this.dispatchPullRequest(pullRequestList);// RebalancePushImpl派发消息
1.2.1.1.1.1.1.1 RebalancePushImpl#dispatchPullRequest:RebalancePushImpl分发。
// RebalancePushImpl#dispatchPullRequest public void dispatchPullRequest(List<PullRequest> pullRequestList) { for (PullRequest pullRequest : pullRequestList) { this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest); log.info(\"doRebalance, {}, add a new pull request {}\", consumerGroup, pullRequest); } }
五.总结
相比Producer的发送流程,Consumer的接收流程稍微复杂一点。通过上面的源码分析,可以知道RocketMQ是怎样保证消息有序的:
1.通过RebalanceImpl的lockAll方法,每隔一段时间定时锁定当前消费端正在消费的队列,并设置本地队列ProcessQueue的locked属性为true,保证broker中的每个消息队列只对应一个消费端;
2.另外,消费端也是通过锁,保证每个ProcessQueue同一时刻只有一个线程消费。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。