rocketmq源码解析(rocketmq源码部署)

本文主要分析RocketMQ中如何保证消息有序的。 RocketMQ的版本为:4.2.0release。 一.时序图 还是老规矩,先把分析过程的时序图摆出来: 1.Producer发送顺序消息 2.Consumer接收顺序消息(一) 3.Consumer接收顺序消息(二) 二.源码分析-Producer发送顺序消息 1DefaultMQProducer#send:发送消息,入参中有自定义的消息队列…

文中关键分析RocketMQ中怎么确保消息有序的。

RocketMQ的版本号为:4.2.0 release。

一.时序图

或是规矩,先把分析全过程的时序图摆出:

1.Producer推送顺序消息

RocketMQ源码:有序消息分析

2.Consumer接受顺序消息(一)

RocketMQ源码:有序消息分析

3.Consumer接收顺序消息(二)

RocketMQ源码:有序消息分析

二.源码分析 – Producer推送顺序消息

1 DefaultMQProducer#send:发送消息,入参中有自定的消息序列选择符。

 // DefaultMQProducer#send
 public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
 throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
 return this.defaultMQProducerImpl.send(msg, selector, arg);
 }

1.1 DefaultMQProducerImpl#makeSureStateOK:保证Producer的情况是运作情况-ServiceState.RUNNING。

 // DefaultMQProducerImpl#makeSureStateOK
 private void makeSureStateOK() throws MQClientException {
 if (this.serviceState != ServiceState.RUNNING) {
 throw new MQClientException(\"The producer service state not OK, \"  this.serviceState
   FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
 null);
 }
 }

1.2 DefaultMQProducerImpl#tryToFindTopicPublishInfo:依据Topic获得公布Topic使用的路由器信息内容。

 // DefaultMQProducerImpl#tryToFindTopicPublishInfo
 private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
 TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
 if (null == topicPublishInfo || !topicPublishInfo.ok()) {
 this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);// 为空则从 NameServer升级获得,false,不传到 defaultMQProducer
 topicPublishInfo = this.topicPublishInfoTable.get(topic);
 }
 if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {// 拥有路由器信息内容并且情况OK,则回到
 return topicPublishInfo;
 } else {
 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
 topicPublishInfo = this.topicPublishInfoTable.get(topic);
 return topicPublishInfo;
 }
 }

1.3 启用自定消息序列选择符的select方式。

 // DefaultMQProducerImpl#sendSelectImpl
 MessageQueue mq = null;
 try {
 mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
 } catch (Throwable e) {
 throw new MQClientException(\"select message queue throwed exception.\", e);
 }
 // Producer#main
 SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
 @Override
 public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
 Integer id = (Integer) arg;
 int index = id % mqs.size();
 return mqs.get(index);
 }
 }, orderId);

1.4 DefaultMQProducerImpl#sendKernelImpl:推送消息的关键完成方式。

 // DefaultMQProducerImpl#sendKernelImpl
 ......
 switch (communicationMode) {
 case SYNC:
 long costTimeSync = System.currentTimeMillis() - beginStartTime;
 if (timeout < costTimeSync) {
 throw new RemotingTooMuchRequestException(\"sendKernelImpl call timeout\");
 }
 sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
 brokerAddr,
 mq.getBrokerName(),
 msg,
 requestHeader,
 timeout - costTimeSync,
 communicationMode,
 context,
 this);
 break;
 ......

1.4.1 MQClientAPIImpl#sendMessage:推送消息。

 // MQClientAPIImpl#sendMessage
 ......
 switch (communicationMode) {// 依据推送消息的方式(同歩/多线程)挑选差异的方法,默认设置是同歩
 case SYNC:
 long costTimeSync = System.currentTimeMillis() - beginStartTime;
 if (timeoutMillis < costTimeSync) {
 throw new RemotingTooMuchRequestException(\"sendMessage call timeout\");
 }
 return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
 ......

1.4.1.1 MQClientAPIImpl#sendMessageSync:推送同歩消息。

 // MQClientAPIImpl#sendMessageSync
 private SendResult sendMessageSync(
 final String addr,
 final String brokerName,
 final Message msg,
 final long timeoutMillis,
 final RemotingCommand request
 ) throws RemotingException, MQBrokerException, InterruptedException {
 RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
 assert response != null;
 return this.processSendResponse(brokerName, msg, response);
 }

1.4.1.1.1 NettyRemotingClient#invokeSync:结构RemotingCommand,启用的形式是同歩。

 // NettyRemotingClient#invokeSync 
 RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
 if (this.rpcHook != null) {
 this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
 }
 return response;

三.源码分析 – Consumer接受次序信息(一)

1 DefaultMQPushConsumer#registerMessageListener:把Consumer传到的信息窃听器添加到messageListener中。

 // DefaultMQPushConsumer#registerMessageListener
 public void registerMessageListener(MessageListenerOrderly messageListener) {
 this.messageListener = messageListener;
 this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
 }

1.1 DefaultMQPushConsumerImpl#registerMessageListener:把Consumer传到的信息窃听器添加到messageListenerInner中。

 // DefaultMQPushConsumerImpl#registerMessageListener
 public void registerMessageListener(MessageListener messageListener) {
 this.messageListenerInner = messageListener;
 }

2 DefaultMQPushConsumer#start:运行Consumer。

 // DefaultMQPushConsumer#start
 public void start() throws MQClientException {
 this.defaultMQPushConsumerImpl.start();
 }

2.1 DefaultMQPushConsumerImpl#start:启动ConsumerImpl。

 // DefaultMQPushConsumerImpl#start
 switch (this.serviceState) {
 case CREATE_JUST:// 刚建立
 ......
 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {// 井然有序信息服务项目
 this.consumeOrderly = true;
 this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
 } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {// 高并发混乱信息服务项目
 this.consumeOrderly = false;
 this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
 }
 ......
 this.consumeMessageService.start();// 运行信息服务项目
 ......
 mQClientFactory.start();// 运行MQClientInstance
 ......

2.1.1 new
ConsumeMessageOrderlyService():结构次序信息服务项目。

 // ConsumeMessageOrderlyService#ConsumeMessageOrderlyService
 public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
 MessageListenerOrderly messageListener) {
 this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
 this.messageListener = messageListener;
 this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
 this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
 this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
 this.consumeExecutor = new ThreadPoolExecutor(// 主信息交易线程池,正常的实行接到的ConsumeRequest。线程同步
 this.defaultMQPushConsumer.getConsumeThreadMin(),
 this.defaultMQPushConsumer.getConsumeThreadMax(),
 1000 * 60,
 TimeUnit.MILLISECONDS,
 this.consumeRequestQueue,
 new ThreadFactoryImpl(\"ConsumeMessageThread_\"));
 this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(\"ConsumeMessageScheduledThread_\"));
 }

2.1.2
ConsumeMessageOrderlyService#start:运行线程池手机客户端案例。

 // DefaultMQPushConsumerImpl#start
 this.consumeMessageService.start();
 // ConsumeMessageOrderlyService#start
 public void start() {
 if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 @Override
 public void run() {
 ConsumeMessageOrderlyService.this.lockMQPeriodically();// 按时向broker推送大批量锁定现阶段已经交易的序列结合的信息
 }
 }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
 }
 }

2.1.2.1
ConsumeMessageOrderlyService#lockMQPeriodically:按时向broker推送大批量锁定现阶段已经交易的序列结合的信息。

2.1.2.1.1 RebalanceImpl#lockAll:锁定全部已经信息的序列。

 // ConsumeMessageOrderlyService#lockMQPeriodically
 if (!this.stopped) {
 this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
 }
 // RebalanceImpl#lockAll
 HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();// 依据brokerName从processQueueTable获得已经交易的序列结合
 ......
 Set<MessageQueue> lockOKMQSet = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);// 向Broker推送锁定线程池的命令
 for (MessageQueue mq : lockOKMQSet) {
 ProcessQueue processQueue = this.processQueueTable.get(mq);
 if (processQueue != null) {
 if (!processQueue.isLocked()) {
 log.info(\"the message queue locked OK, Group: {} {}\", this.consumerGroup, mq);
 }
 processQueue.setLocked(true);
 processQueue.setLastLockTimestamp(System.currentTimeMillis());
 }
 }
 ......

2.1.3 MQClientInstance#start:运行MQClientInstance。全过程较繁杂,放进大文章标题四中分析。

 // DefaultMQPushConsumerImpl#start
 mQClientFactory.start();

四.源码分析 – Consumer接受次序信息(二)

1 MQClientInstance#start:运行手机客户端案例MQClientInstance。

 // MQClientInstance#start
 synchronized (this) {
 switch (this.serviceState) {
 case CREATE_JUST:
 ......
 // Start pull service 运行获取信息服务项目
 this.pullMessageService.start();
 // Start rebalance service 启动消费端web服务服务项目
 this.rebalanceService.start();
 ......

1.1 PullMessageService#run:启动拉取消息服务项目。具体调用的是DefaultMQPushConsumerImpl的pullMessage方式。

 // PullMessageService#run
 public void run() {
 log.info(this.getServiceName()   \" service started\");
 while (!this.isStopped()) {
 try {
 PullRequest pullRequest = this.pullRequestQueue.take();
 this.pullMessage(pullRequest);
 } catch (InterruptedException ignored) {
 } catch (Exception e) {
 log.error(\"Pull Message Service Run Method exception\", e);
 }
 }
 log.info(this.getServiceName()   \" service end\");
 }
 // PullMessageService#pullMessage
 private void pullMessage(final PullRequest pullRequest) {
 final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
 if (consumer != null) {
 DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
 impl.pullMessage(pullRequest);// 调用DefaultMQPushConsumerImpl的pullMessage
 } else {
 log.warn(\"No matched consumer for the PullRequest {}, drop it\", pullRequest);
 }
 }

1.1.1.1 DefaultMQPushConsumerImpl#pullMessage:拉取消息。递交到
ConsumeMessageOrderlyService的线程池consumeExecutor中实行。

 // DefaultMQPushConsumerImpl#pullMessage
 ......
 PullCallback pullCallback = new PullCallback() {
 @Override
 public void onSuccess(PullResult pullResult) {
 switch (pullResult.getPullStatus()) {
 case FOUND:
 long prevRequestOffset = pullRequest.getNextOffset();
 pullRequest.setNextOffset(pullResult.getNextBeginOffset());
 long pullRT = System.currentTimeMillis() - beginTimestamp;
 ......
 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
 pullResult.getMsgFoundList(),
 processQueue,
 pullRequest.getMessageQueue(),
 dispatchToConsume);
 ......

1.1.1.1.1.1.1 ConsumeRequest#run:解决消息消费的进程。

 // ConsumeMessageOrderlyService.ConsumeRequest#run
 List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
 ......
 long beginTimestamp = System.currentTimeMillis();
 ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
 boolean hasException = false;
 try {
 this.processQueue.getLockConsume().lock();
 if (this.processQueue.isDropped()) {
 log.warn(\"consumeMessage, the message queue not be able to consume, because it\'s dropped. {}\",this.messageQueue);
 break;
 }
 status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);// 具体消费消息的地区,调整消息窃听器的consumeMessage方式
 } catch (Throwable e) {
 log.warn(\"consumeMessage exception: {} Group: {} Msgs: {} MQ: {}\",
 RemotingHelper.exceptionSimpleDesc(e),
 ConsumeMessageOrderlyService.this.consumerGroup,
 msgs,messageQueue);
 hasException = true;
 } finally {
 this.processQueue.getLockConsume().unlock();
 }
 ......

1.2 RebalanceService#run:启动消息端web服务服务项目。

 // RebalanceService#run
 public void run() {
 log.info(this.getServiceName()   \" service started\");
 while (!this.isStopped()) {
 this.waitForRunning(waitInterval);
 this.mqClientFactory.doRebalance();
 }
 log.info(this.getServiceName()   \" service end\");
 }
 // MQClientInstance#doRebalance
 public void doRebalance() {
 for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
 MQConsumerInner impl = entry.getValue();
 if (impl != null) {
 try {
 impl.doRebalance();
 } catch (Throwable e) {
 log.error(\"doRebalance exception\", e);
 }
 }
 }
 }
 // DefaultMQPushConsumerImpl#doRebalance
 public void doRebalance() {
 if (!this.pause) {
 this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
 }
 }

1.2.1.1.1 RebalanceImpl#doRebalance:web服务服务项目类解决。

 // RebalanceImpl#doRebalance
 public void doRebalance(final boolean isOrder) {
 Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
 if (subTable != null) {
 for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
 final String topic = entry.getKey();
 try {
 this.rebalanceByTopic(topic, isOrder);
 } catch (Throwable e) {
 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
 log.warn(\"rebalanceByTopic Exception\", e);
 }
 }
 }
 }
 this.truncateMessageQueueNotMyTopic();
 }
 // RebalanceImpl#rebalanceByTopic
 switch (messageModel) {
 case BROADCASTING: {
 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
 if (mqSet != null) {
 boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);// 依据Toipc除去queue
 if (changed) {
 this.messageQueueChanged(topic, mqSet, mqSet);
 log.info(\"messageQueueChanged {} {} {} {}\",
 consumerGroup,
 topic,
 mqSet,
 mqSet);
 }
 } else {
 ......
 // RebalanceImpl#updateProcessQueueTableInRebalance
 this.dispatchPullRequest(pullRequestList);// RebalancePushImpl派发消息

1.2.1.1.1.1.1.1 RebalancePushImpl#dispatchPullRequest:RebalancePushImpl分发。

 // RebalancePushImpl#dispatchPullRequest
 public void dispatchPullRequest(List<PullRequest> pullRequestList) {
 for (PullRequest pullRequest : pullRequestList) {
 this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
 log.info(\"doRebalance, {}, add a new pull request {}\", consumerGroup, pullRequest);
 }
 }

五.汇总

对比Producer的推送步骤,Consumer的接受步骤略微繁杂一点。根据以上的源代码剖析,可以了解RocketMQ是如何确保信息的井然有序的:

1.根据ReblanceImp的lockAll方式,每过一段时间按时锁定现阶段交易摆正在交易的序列。设定当地序列ProcessQueue的locked特性为true。确保broker中的每一个线程池只相应一个交易端;

2.此外,交易端也是根据锁,确保每一个ProcessQueue只有一个进程交易。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

(0)
上一篇 2022年5月9日 上午11:48
下一篇 2022年5月9日 上午11:50

相关推荐

  • 营销管理的实质是什么,简述市场营销管理过程步骤

    一、销售管理流程 销售管理流程包括六个:  第一,开单与发货流程。  第二,结算与返利流程。  第三,供货与收款流程。  第四,售后服务管理流程。 第五,网络开发与维护流程。  第六,价格与市场秩序管理流程。 二、市场管理流程 市场管理流程包括七个: 第一,营销资源的分配及效果监测;  第二,品牌规划与管理;  第三,主题营销策划和管理;  第四,产品组合策略;  第五,广告管理; 第六,促销策略…

    2022年6月17日
    710
  • 薄饼机怎么用(薄饼机使用教程及测评)

    作为手残党的一员,每次在烙饼的时候都是一塌糊涂,所以就直接放弃了这门手艺!不过,摩飞近期推出了一款轻食薄饼一体机,我在极果网看到了它的试用,马上就申请了!在收到摩飞轻食薄饼一体机之后,就被它简洁大方的外观吸引了。在这一段时间实际体验过程中也能感受到产品细节上的周到设计,给我的感觉就是很贴心,面面俱到。下面我就详细的给大家介绍一下这款烙饼神器! 外观萌萌的 外观设计上,摩飞轻食薄饼一体机发热盘采用了…

    2022年7月19日
    1970
  • 安卓怎么定位手机位置,一招教你精准定位

    智能手机现在基本上人手一部,而手机定位这个功能已经非常常见。 例如:打车软件的司机可以准确的知道你的位置,就可以规划出一条更好的路线。 这是很典型的定位服务应用,某些地图也是同样的。 那样,它们是怎么定位到我们的呢? 手机定位的原理 1、基站信号定位:一部手机能接打电话,就说明这部手机附近有基站进行交互信号,而通过这个基站跟手机的交互信号,就能很准确的知道手机的位置。 2、wifi信号定位:每个w…

    2022年9月12日
    760
  • 微信封号原因查询,微信限制登录一般多久

    hello又和大家见面啦~ 今天来安利微信技巧, 而且是很重要的那种。 微信是越来越重要了, 代表的是身份的象征。 如果被不法分子拿到了敏感信息, 问题就大了。 你可能莫名其妙就背了几笔网贷, 傻乎乎地做了某公司法人, 变成老赖什么的。 虽然微信一直在严厉打击, 但挡不住利益动人, 网上仍有很多买卖微信帐号的渠道。 其中很多, 都声称自己卖的微信号, 是那种已经通过实名认证的, 高级微信号。 有买…

    2022年5月30日
    850
  • 公司股权变更流程,变更公司股权需要什么手续

    企业股东变更流程: 第一步,申请人持相关材料向市政务服务中心工商局窗口提出申请,经受理审查员初审通过,开具受理通知书或者申请材料接收单;不符合受理条件的,在当场或者5个工作日内一次性告知申请人应当补正的全部材料(出具告知单)。 第二步,对申请人申请材料齐全、符合法定形式的,当场作出是否准予登记的决定并出具登记决定通知书;需要对申请材料的实质内容进行核实的,出具企业登记材料需要核实事项告知书,在10…

    2022年6月25日
    680
  • 做什么比较挣钱快,来钱最快的零基础赚钱方法

    实话实说,现在很多人都在寻找赚钱快,赚钱多的行业创业项目,事实上这种行业也一直存在,关键是和创业者自身条件以及资源是否匹配?如果自己不具备条件,看似赚钱快的行业,自己也无能为力。我这里推荐几个供参考。 小生意 我们暂且就把这些街头巷尾的小生意称之为一个行业。目前市场上的一些小生意离钱最近,赚钱也最快。譬如小吃铺,奶茶店,摆地摊,流动售卖点这些都是,赚钱快是肯定的,至于赚钱的多少就是完全取决于个人能…

    2022年9月12日
    550
  • 纪梵希散粉1号色上脸图(上脸效果不是一般的好)

    那天一直是间歇性得能刷出一两个来的状态,第一次下单刚输完地址就没货了,好在后来刷新出了少量补货成功get 订单记录也木有了,偷一个爆料图 网上这款散粉功课很多了,基本上化妆的妹子们都曾经或正在使用这款,口碑很好,可以算是便宜大碗了,而且颜值超高,1号色适合提亮。出于对纪梵希外观的钟爱和史低价,迅速入手了这款,如果没记错的话这应该是meidebi爆过的最低价了,我对此价格表示很满意~~~(某猫旗舰店…

    2022年9月25日
    520
  • 互联网房产中介模式是什么,互联网房产交易平台盘点

    (图片来演:全景视觉) 经济观察报记者张凤玲杨依依曾裹挟着百亿资本的互联网房产中介们,开始从两年前颠覆传统的热潮中调整。房多多砍掉金融部门、爱屋吉屋注销超过15家子公司、房天下重整业务结构等,都显示新一代的房产中介生态玩家正在经历低谷。 如今,资本杀入互联网房产中介领域的盛况不再,也很难看到新入局者高举颠覆行业的旗帜。 可玩味的是,在互联网房产中介颠覆传统的几年里,传统中介的代表公司之一链家,年交…

    2022年6月1日
    800
  • 安全管理思路和方法,提升安全管理的五步法则

    近年来,虽然国家出台了一系列的安全法律法规、管理办法、安全操作规程及安全培训等,我们国家的安全事故还是不断,还是无法根除。为什么呢? 从1:29:300“海因里希”安全法则来说,每发生一起死亡或重大安全事故必然发生了29起小的工伤伤害事故,29起小的工伤伤害事故里必然发生了300起未伤害到人的事故。这里说明了事故不是偶然的,是必然的,是长期的违章操作造成的。 从事故调查结果来说:80%的事故是人的…

    2022年10月8日
    360
  • 开什么店成本低又赚钱,这四个低成本创业项目适合白手起家

    年轻人干什么赚钱?给别人打工?自己创业?冲劲十足的年轻人大多数会选择后者吧,但是由于经验不足等原因,想创业的年轻人在选择创业项目时还是要十分谨慎,以免将风险扩大 。 那到底什么样的创业项目值得投资呢?我们又该如何去选择一个好的投资项目呢?这个是投资者首先需要关注的问题。俗话说,方向不对,努力白费。找到一个合适的项目就相当于找到一个风口。 一、百变泥匠 现在市场上的陶艺行业特别的火爆,很多家长朋友们…

    2022年8月22日
    650

发表回复

登录后才能评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信