RocketMQ源码阅读
环境 
NameServer 架构设计 
Producer发送某一个主题到消息服务器,消息服务器将消息持久化存储,Consumer订阅该兴趣的主题,消息服务器根据订阅信息(路由信息)将消息推送到消费者(Push模式)或者消费者主动向消息服务器拉取(Pull模式),同时部署多台消息服务器共同承担消息的存储 
Producer如何知道消息要发送到哪台消息服务器?一个Broker宕机后,Producer如何在不重启服务下发送消息给新的Broker?——NameServer
Broker启动时向所有NameServer注册,Producer发送消息前从NameServer获取Broker地址列表,根据负载均衡算法从列表选择一台Broker发送 
NameServer与每台Broker保持长连接,间隔30S检测Broker是否存活,如果Broker宕机则从路由注册表中删除(路由变化不会马上通知Producer——降低NameServer实现的复杂度,消息发送端提供容错机制保证消息发送的可用性) 
通过部署多台NameServer实现高可用,但彼此之间不通讯——在某一个时刻Nameservers并不完全相同,但并不影响消息发送 
 
 
 
启动流程 
启动类:org.apache.rocketmq.namesrv.NamesrvStartup 
 
步骤一 
解析配置文件,填充NameServerConfig、NettyServerConfig属性值,创建NamesrvController 
 
代码:NamesrvController#createNamesrvController 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 final  NamesrvConfig namesrvConfig = new  NamesrvConfig();final  NettyServerConfig nettyServerConfig = new  NettyServerConfig();nettyServerConfig.setListenPort(9876 ); if  (commandLine.hasOption('c' )) {    String file = commandLine.getOptionValue('c' );     if  (file != null ) {         InputStream in = new  BufferedInputStream(new  FileInputStream(file));         properties = new  Properties();         properties.load(in);         MixAll.properties2Object(properties, namesrvConfig);         MixAll.properties2Object(properties, nettyServerConfig);         namesrvConfig.setConfigStorePath(file);         System.out.printf("load config properties file OK, %s%n" , file);         in.close();     } } if  (commandLine.hasOption('p' )) {    InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);     MixAll.printObjectProperties(console, namesrvConfig);     MixAll.printObjectProperties(console, nettyServerConfig);     System.exit(0 ); } MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig); final  NamesrvController controller = new  NamesrvController(namesrvConfig, nettyServerConfig);
NamesrvConfig属性 
1 2 3 4 5 6 private  String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));  private  String kvConfigPath = System.getProperty("user.home" ) + File.separator + "namesrv"  + File.separator + "kvConfig.json" ;  private  String configStorePath = System.getProperty("user.home" ) + File.separator + "namesrv"  + File.separator + "namesrv.properties" ;  private  String productEnvName = "center" ;private  boolean  clusterTest = false ;private  boolean  orderMessageEnable = false ;  
NettyServerConfig属性 
1 2 3 4 5 6 7 8 9 10 11 private  int  listenPort = 8888 ;  private  int  serverWorkerThreads = 8 ;  private  int  serverCallbackExecutorThreads = 0 ;  private  int  serverSelectorThreads = 3 ;  private  int  serverOnewaySemaphoreValue = 256 ;  private  int  serverAsyncSemaphoreValue = 64 ;  private  int  serverChannelMaxIdleTimeSeconds = 120 ;  private  int  serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;  private  int  serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;  private  boolean  serverPooledByteBufAllocatorEnable = true ;  private  boolean  useEpollNativeSelector = false ;  
步骤二 
根据启动属性创建NamesrvController实例,并初始化该实例(NameServer核心控制器) 
 
代码:NamesrvController#initialize 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public  boolean  initialize ()  	     this .kvConfigManager.load(); 	     this .remotingServer = new  NettyRemotingServer(this .nettyServerConfig, this .brokerHousekeepingService); 	     this .remotingExecutor =         Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new  ThreadFactoryImpl("RemotingExecutorThread_" ));     this .registerProcessor();     this .scheduledExecutorService.scheduleAtFixedRate(new  Runnable() {         @Override          public  void  run ()               NamesrvController.this .routeInfoManager.scanNotActiveBroker();         }     }, 5 , 10 , TimeUnit.SECONDS); 	 	this .scheduledExecutorService.scheduleAtFixedRate(new  Runnable() {         @Override          public  void  run ()               NamesrvController.this .kvConfigManager.printAllPeriodically();         }     }, 1 , 10 , TimeUnit.MINUTES);          return  true ; } 
步骤三 
代码:NamesrvStartup#start 
1 2 3 4 5 6 7 8 9 Runtime.getRuntime().addShutdownHook(new  ShutdownHookThread(log, new  Callable<Void>() {     @Override      public  Void call ()  throws  Exception                   controller.shutdown();         return  null ;     } })); 
路由管理 
NameServer为Producer、Consumer提供关于Topic的路由信息,需要存储路由的基础信息并还要管理Broker节点——路由注册、路由删除等 
路由注册通过Broker与NameServer的心跳功能实现。Broker启动时向集群中所有NameServer发送心跳,每隔30s向集群中所有NameServer 发送心跳 
NameServer收到心跳更新brokerLiveTable中BrokerLiveInfo的lastUpdataTimeStamp 
NameServer每隔10s扫描brokerLiveTable,连续120S没有收到心跳包则移除Broker的路由信息同时关闭Socket连接 
心跳包含BrokerId,Broker地址,Broker名称,Broker所属集群名称、Broker关联的FilterServer列表 
 
路由元信息 代码:RouteInfoManager 
1 2 3 4 5 private  final  HashMap<String, List<QueueData>> topicQueueTable;  private  final  HashMap<String, BrokerData> brokerAddrTable;  private  final  HashMap<String, Set<String>> clusterAddrTable;  private  final  HashMap<String, BrokerLiveInfo> brokerLiveTable;  private  final  HashMap<String, List<String>> filterServerTable;  
一个Topic拥有多个消息队列,一个Broker为每一个Topic创建4个读队列和4个写队列 
多个Broker组成一个集群,集群由相同的多台Broker组成Master-Slave,brokerId为0代表Master,大于0为Slave 
BrokerLiveInfo中的lastUpdateTimestamp存储上次收到Broker心跳包的时间 
 
路由注册 1)发送心跳包 代码:BrokerController#start 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 this .registerBrokerAll(true , false , true );this .scheduledExecutorService.scheduleAtFixedRate(new  Runnable() {    @Override      public  void  run ()           try  {             BrokerController.this .registerBrokerAll(true , false , brokerConfig.isForceRegister());         } catch  (Throwable e) {             log.error("registerBrokerAll Exception" , e);         }     } }, 1000  * 10 , Math.max(10000 , Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000 )),                                                    TimeUnit.MILLISECONDS); 
代码:BrokerOuterAPI#registerBrokerAll 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 List<String> nameServerAddressList = this .remotingClient.getNameServerAddressList(); if  (nameServerAddressList != null  && nameServerAddressList.size() > 0 ) {         final  RegisterBrokerRequestHeader requestHeader = new  RegisterBrokerRequestHeader();     requestHeader.setBrokerAddr(brokerAddr);     requestHeader.setBrokerId(brokerId);     requestHeader.setBrokerName(brokerName);     requestHeader.setClusterName(clusterName);     requestHeader.setHaServerAddr(haServerAddr);     requestHeader.setCompressed(compressed); 	     RegisterBrokerBody requestBody = new  RegisterBrokerBody();     requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);     requestBody.setFilterServerList(filterServerList);     final  byte [] body = requestBody.encode(compressed);     final  int  bodyCrc32 = UtilAll.crc32(body);     requestHeader.setBodyCrc32(bodyCrc32);     final  CountDownLatch countDownLatch = new  CountDownLatch(nameServerAddressList.size());     for  (final  String namesrvAddr : nameServerAddressList) {         brokerOuterExecutor.execute(new  Runnable() {             @Override              public  void  run ()                   try  {                                          RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);                     if  (result != null ) {                         registerBrokerResultList.add(result);                     }                     log.info("register broker[{}]to name server {} OK" , brokerId, namesrvAddr);                 } catch  (Exception e) {                     log.warn("registerBroker Exception, {}" , namesrvAddr, e);                 } finally  {                     countDownLatch.countDown();                 }             }         });     }     try  {         countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);     } catch  (InterruptedException e) {     } } 
代码:BrokerOutAPI#registerBroker 
1 2 3 4 5 6 7 8 9 if  (oneway) {    try  {         this .remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);     } catch  (RemotingTooMuchRequestException e) {              }     return  null ; } RemotingCommand response = this .remotingClient.invokeSync(namesrvAddr, request, timeoutMills); 
2)处理心跳包 
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor处理类解析请求类型,如果请求类型是为**REGISTER_BROKER** ,则将请求转发到RouteInfoManager#regiesterBroker 
代码:DefaultRequestProcessor#processRequest 
1 2 3 4 5 6 7 8 9 case  RequestCode.REGISTER_BROKER:	Version brokerVersion = MQVersion.value2Version(request.getVersion()); 	if  (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { 	    return  this .registerBrokerWithFilterServer(ctx, request); 	} else  {          	    return  this .registerBroker(ctx, request); 	} 
代码:DefaultRequestProcessor#registerBroker 
1 2 3 4 5 6 7 8 9 10 RegisterBrokerResult result = this .namesrvController.getRouteInfoManager().registerBroker(     requestHeader.getClusterName(),     requestHeader.getBrokerAddr(),     requestHeader.getBrokerName(),     requestHeader.getBrokerId(),     requestHeader.getHaServerAddr(),     topicConfigWrapper,     null ,     ctx.channel() ); 
代码:RouteInfoManager#registerBroker 
维护路由信息
1 2 3 4 5 6 7 8 9 this .lock.writeLock().lockInterruptibly();Set<String> brokerNames = this .clusterAddrTable.get(clusterName); if  (null  == brokerNames) {    brokerNames = new  HashSet<String>();     this .clusterAddrTable.put(clusterName, brokerNames); } brokerNames.add(brokerName); 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 BrokerData brokerData = this .brokerAddrTable.get(brokerName); if  (null  == brokerData) {    registerFirst = true ;     brokerData = new  BrokerData(clusterName, brokerName, new  HashMap<Long, String>());     this .brokerAddrTable.put(brokerName, brokerData); } Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs(); Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator(); while  (it.hasNext()) {    Entry<Long, String> item = it.next();     if  (null  != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {         it.remove();     } } String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null  == oldAddr); 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 if  (null  != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {    if  (this .isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) ||          registerFirst) {         ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();         if  (tcTable != null ) {             for  (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {                 this .createAndUpdateQueueData(brokerName, entry.getValue());             }         }     } } ***代码:RouteInfoManager#createAndUpdateQueueData*** private  void  createAndUpdateQueueData (final  String brokerName, final  TopicConfig topicConfig)       	QueueData queueData = new  QueueData(); 	queueData.setBrokerName(brokerName); 	queueData.setWriteQueueNums(topicConfig.getWriteQueueNums()); 	queueData.setReadQueueNums(topicConfig.getReadQueueNums()); 	queueData.setPerm(topicConfig.getPerm()); 	queueData.setTopicSynFlag(topicConfig.getTopicSysFlag()); 	 	List<QueueData> queueDataList = this .topicQueueTable.get(topicConfig.getTopicName());      	if  (null  == queueDataList) { 	    queueDataList = new  LinkedList<QueueData>(); 	    queueDataList.add(queueData); 	    this .topicQueueTable.put(topicConfig.getTopicName(), queueDataList); 	    log.info("new topic registered, {} {}" , topicConfig.getTopicName(), queueData); 	} else  {          	    boolean  addNewOne = true ; 	    Iterator<QueueData> it = queueDataList.iterator(); 	    while  (it.hasNext()) { 	        QueueData qd = it.next();              	        if  (qd.getBrokerName().equals(brokerName)) { 	            if  (qd.equals(queueData)) { 	                addNewOne = false ; 	        } else  { 	                    log.info("topic changed, {} OLD: {} NEW: {}" , topicConfig.getTopicName(), qd, 	                        queueData); 	                    it.remove(); 	                } 	            } 	        } 		         if  (addNewOne) {             queueDataList.add(queueData);         }     } } 
1 2 3 4 5 6 BrokerLiveInfo prevBrokerLiveInfo = this .brokerLiveTable.put(brokerAddr,new  BrokerLiveInfo(     System.currentTimeMillis(),     topicConfigWrapper.getDataVersion(),     channel,     haServerAddr)); 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 if  (filterServerList != null ) {    if  (filterServerList.isEmpty()) {         this .filterServerTable.remove(brokerAddr);     } else  {         this .filterServerTable.put(brokerAddr, filterServerList);     } } if  (MixAll.MASTER_ID != brokerId) {    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);     if  (masterAddr != null ) {         BrokerLiveInfo brokerLiveInfo = this .brokerLiveTable.get(masterAddr);         if  (brokerLiveInfo != null ) {             result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());             result.setMasterAddr(masterAddr);         }     } } 
路由删除 
删除的场景:
NameServer定期扫描brokerLiveTable,检测上次心跳包与当前系统的时间差,如果时间超过120s,则移除broker 
Broker在正常关闭的情况下,会执行unregisterBroker指令 
 
 
二者都是从相关路由表中删除与该broker相关的信息 
 
代码:NamesrvController#initialize 
1 2 3 4 5 6 7 8 this .scheduledExecutorService.scheduleAtFixedRate(new  Runnable() {    @Override      public  void  run ()           NamesrvController.this .routeInfoManager.scanNotActiveBroker();     } }, 5 , 10 , TimeUnit.SECONDS); 
代码:RouteInfoManager#scanNotActiveBroker 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public  void  scanNotActiveBroker ()           Iterator<Entry<String, BrokerLiveInfo>> it = this .brokerLiveTable.entrySet().iterator();          while  (it.hasNext()) {         Entry<String, BrokerLiveInfo> next = it.next();         long  last = next.getValue().getLastUpdateTimestamp();                  if  ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {                          RemotingUtil.closeChannel(next.getValue().getChannel());                          it.remove();                          this .onChannelDestroy(next.getKey(), next.getValue().getChannel());         }     } } 
代码:RouteInfoManager#onChannelDestroy 
1 2 3 4 this .lock.writeLock().lockInterruptibly();this .brokerLiveTable.remove(brokerAddrFound);this .filterServerTable.remove(brokerAddrFound);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 String brokerNameFound = null ; boolean  removeBrokerName = false ;Iterator<Entry<String, BrokerData>> itBrokerAddrTable =this .brokerAddrTable.entrySet().iterator(); while  (itBrokerAddrTable.hasNext() && (null  == brokerNameFound)) {    BrokerData brokerData = itBrokerAddrTable.next().getValue();          Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();     while  (it.hasNext()) {         Entry<Long, String> entry = it.next();         Long brokerId = entry.getKey();         String brokerAddr = entry.getValue();                  if  (brokerAddr.equals(brokerAddrFound)) {             brokerNameFound = brokerData.getBrokerName();             it.remove();             log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed" ,                 brokerId, brokerAddr);             break ;         }     } 	     if  (brokerData.getBrokerAddrs().isEmpty()) {         removeBrokerName = true ;         itBrokerAddrTable.remove();         log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed" ,             brokerData.getBrokerName());     } } 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 if  (brokerNameFound != null  && removeBrokerName) {    Iterator<Entry<String, Set<String>>> it = this .clusterAddrTable.entrySet().iterator();          while  (it.hasNext()) {         Entry<String, Set<String>> entry = it.next();                  String clusterName = entry.getKey();                  Set<String> brokerNames = entry.getValue();                  boolean  removed = brokerNames.remove(brokerNameFound);         if  (removed) {             log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed" ,                 brokerNameFound, clusterName);             if  (brokerNames.isEmpty()) {                 log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster" ,                     clusterName);                                  it.remove();             }             break ;         }     } } 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 if  (removeBrokerName) {         Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =         this .topicQueueTable.entrySet().iterator();     while  (itTopicQueueTable.hasNext()) {         Entry<String, List<QueueData>> entry = itTopicQueueTable.next();                  String topic = entry.getKey();                  List<QueueData> queueDataList = entry.getValue(); 		         Iterator<QueueData> itQueueData = queueDataList.iterator();         while  (itQueueData.hasNext()) {                          QueueData queueData = itQueueData.next();             if  (queueData.getBrokerName().equals(brokerNameFound)) {                 itQueueData.remove();                 log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed" ,                     topic, queueData);             }         } 		         if  (queueDataList.isEmpty()) {             itTopicQueueTable.remove();             log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed" ,                 topic);         }     } } 
1 2 3 4 finally  {    this .lock.writeLock().unlock(); } 
路由发现 
路由发现是非实时的,Topic路由变化后NameServer不会主动推送给客户端(Producer),由客户端定时拉取Topic最新的路由 
 
代码:DefaultRequestProcessor#getRouteInfoByTopic 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public  RemotingCommand getRouteInfoByTopic (ChannelHandlerContext ctx,     RemotingCommand request)  throws  RemotingCommandException     final  RemotingCommand response = RemotingCommand.createResponseCommand(null );     final  GetRouteInfoRequestHeader requestHeader =         (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class); 	     TopicRouteData topicRouteData = this .namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic()); 	     if  (topicRouteData != null ) {         if  (this .namesrvController.getNamesrvConfig().isOrderMessageEnable()) {             String orderTopicConf =                 this .namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,                     requestHeader.getTopic());             topicRouteData.setOrderTopicConf(orderTopicConf);         }         byte [] content = topicRouteData.encode();         response.setBody(content);         response.setCode(ResponseCode.SUCCESS);         response.setRemark(null );         return  response;     }     response.setCode(ResponseCode.TOPIC_NOT_EXIST);     response.setRemark("No topic route info in name server for the topic: "  + requestHeader.getTopic()         + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));     return  response; } 
Producer 
方法和属性 接口方法 
MQAdmin
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void  createTopic (final  String key, final  String newTopic, final  int  queueNum)  throws  MQClientExceptionlong  searchOffset (final  MessageQueue mq, final  long  timestamp) long  maxOffset (final  MessageQueue mq)  throws  MQClientExceptionlong  minOffset (final  MessageQueue mq)  MessageExt viewMessage (final  String offsetMsgId)  throws  RemotingException, MQBrokerException, InterruptedException, MQClientException ;  QueryResult queryMessage (final  String topic, final  String key, final  int  maxNum, final  long  begin,  final  long  end)throws  MQClientException, InterruptedExceptionMessageExt viewMessage (String topic,String msgId)  throws  RemotingException, MQBrokerException, InterruptedException, MQClientException ;
MQProducer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 void  start ()  throws  MQClientExceptionvoid  shutdown () List<MessageQueue> fetchPublishMessageQueues (final  String topic)  throws  MQClientException ;SendResult send (final  Message msg)  throws  MQClientException, RemotingException, MQBrokerException,         InterruptedException ;SendResult send (final  Message msg, final  long  timeout)  throws  MQClientException,         RemotingException, MQBrokerException, InterruptedException ;void  send (final  Message msg, final  SendCallback sendCallback)  throws  MQClientException,        RemotingException, InterruptedException ;void  send (final  Message msg, final  SendCallback sendCallback, final  long  timeout)     throws  MQClientException, RemotingException, InterruptedException ;void  sendOneway (final  Message msg)  throws  MQClientException, RemotingException,    InterruptedException ;SendResult send (final  Message msg, final  MessageQueue mq)  throws  MQClientException,     RemotingException, MQBrokerException, InterruptedException ;void  send (final  Message msg, final  MessageQueue mq, final  SendCallback sendCallback)     throws  MQClientException, RemotingException, InterruptedException ;void  sendOneway (final  Message msg, final  MessageQueue mq)  throws  MQClientException,    RemotingException, InterruptedException ;SendResult send (final  Collection<Message> msgs)  throws  MQClientException, RemotingException, MQBrokerException,InterruptedException ;
 
类属性 1 2 3 4 5 6 7 8 9 producerGroup:生产者所属组 createTopicKey:默认Topic defaultTopicQueueNums:默认主题在每一个Broker队列数量 sendMsgTimeout:发送消息默认超时时间,默认3 s compressMsgBodyOverHowmuch:消息体超过该值则启用压缩,默认4 k retryTimesWhenSendFailed:同步方式发送消息重试次数,默认为2 ,总共执行3 次 retryTimesWhenSendAsyncFailed:异步方法发送消息重试次数,默认为2  retryAnotherBrokerWhenNotStoreOK:消息重试时选择另外一个Broker时,是否不等待存储结果就返回,默认为false  maxMessageSize:允许发送的最大消息长度,默认为4 M 
启动流程 
JVM中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表
ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String,MQClientInstance>();
一个clientId只会创建一个MQClientInstance,其中封装RocketMQ网络处理API,是Producer、Consumer、NameServer、Broker之间的网络通道
 
代码:DefaultMQProducerImpl#start 
1 2 3 4 5 6 7 8 this .checkConfig();if  (!this .defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {    this .defaultMQProducer.changeInstanceNameToPID(); } this .mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this .defaultMQProducer, rpcHook);
代码:MQClientManager#getAndCreateMQClientInstance 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public  MQClientInstance getAndCreateMQClientInstance (final  ClientConfig clientConfig,                                                       RPCHook rpcHook)           String clientId = clientConfig.buildMQClientId();          MQClientInstance instance = this .factoryTable.get(clientId);          if  (null  == instance) {         instance =             new  MQClientInstance(clientConfig.cloneClientConfig(),                 this .factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);         MQClientInstance prev = this .factoryTable.putIfAbsent(clientId, instance);         if  (prev != null ) {             instance = prev;             log.warn("Returned Previous MQClientInstance for clientId:[{}]" , clientId);         } else  {             log.info("Created new MQClientInstance for clientId:[{}]" , clientId);         }     }     return  instance; } 
代码:DefaultMQProducerImpl#start 
1 2 3 4 5 6 7 8 9 10 11 12 boolean  registerOK = mQClientFactory.registerProducer(this .defaultMQProducer.getProducerGroup(), this );if  (!registerOK) {    this .serviceState = ServiceState.CREATE_JUST;     throw  new  MQClientException("The producer group["  + this .defaultMQProducer.getProducerGroup()         + "] has been created before, specify another name please."  + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),         null ); } if  (startFactory) {    mQClientFactory.start(); } 
消息发送 代码:DefaultMQProducerImpl#send(Message msg) 
1 2 3 4 public  SendResult send (Message msg)      return  send(msg, this .defaultMQProducer.getSendMsgTimeout()); } 
代码:DefaultMQProducerImpl#send(Message msg,long timeout) 
1 2 3 4 public  SendResult send (Message msg,long  timeout)     return  this .sendDefaultImpl(msg, CommunicationMode.SYNC, null , timeout); } 
代码:DefaultMQProducerImpl#sendDefaultImpl 
1 2 Validators.checkMessage(msg, this .defaultMQProducer); 
1)校验消息 代码:Validators#checkMessage 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public  static  void  checkMessage (Message msg, DefaultMQProducer defaultMQProducer)     throws  MQClientException  {         if  (null  == msg) {         throw  new  MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null" );     }          Validators.checkTopic(msg.getTopic()); 		          if  (null  == msg.getBody()) {         throw  new  MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null" );     }     if  (0  == msg.getBody().length) {         throw  new  MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero" );     }     if  (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {         throw  new  MQClientException(ResponseCode.MESSAGE_ILLEGAL,             "the message body size over max value, MAX: "  + defaultMQProducer.getMaxMessageSize());     } } 
2)查找路由 代码:DefaultMQProducerImpl#tryToFindTopicPublishInfo 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private  TopicPublishInfo tryToFindTopicPublishInfo (final  String topic)           TopicPublishInfo topicPublishInfo = this .topicPublishInfoTable.get(topic);          if  (null  == topicPublishInfo || !topicPublishInfo.ok()) {         this .topicPublishInfoTable.putIfAbsent(topic, new  TopicPublishInfo());         this .mQClientFactory.updateTopicRouteInfoFromNameServer(topic);         topicPublishInfo = this .topicPublishInfoTable.get(topic);     }     if  (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {         return  topicPublishInfo;     } else  {                  this .mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true , this .defaultMQProducer);         topicPublishInfo = this .topicPublishInfoTable.get(topic);         return  topicPublishInfo;     } } 
代码:TopicPublishInfo 
1 2 3 4 5 6 7 public  class  TopicPublishInfo      private  boolean  orderTopic = false ;	     private  boolean  haveTopicRouterInfo = false ;      private  List<MessageQueue> messageQueueList = new  ArrayList<MessageQueue>();	     private  volatile  ThreadLocalIndex sendWhichQueue = new  ThreadLocalIndex();     private  TopicRouteData topicRouteData; } 
代码:MQClientInstance#updateTopicRouteInfoFromNameServer 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 TopicRouteData topicRouteData; if  (isDefault && defaultMQProducer != null ) {    topicRouteData = this .mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),         1000  * 3 );     if  (topicRouteData != null ) {         for  (QueueData data : topicRouteData.getQueueDatas()) {             int  queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());             data.setReadQueueNums(queueNums);             data.setWriteQueueNums(queueNums);         }     } } else  {          topicRouteData = this .mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000  * 3 ); } 
代码:MQClientInstance#updateTopicRouteInfoFromNameServer 
1 2 3 4 5 6 7 8 TopicRouteData old = this .topicRouteTable.get(topic); boolean  changed = topicRouteDataIsChange(old, topicRouteData);if  (!changed) {    changed = this .isNeedUpdateTopicRouteInfo(topic); } else  {     log.info("the topic[{}] route info changed, old[{}] ,new[{}]" , topic, old, topicRouteData); } 
代码:MQClientInstance#updateTopicRouteInfoFromNameServer 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 if  (changed) {         TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);     publishInfo.setHaveTopicRouterInfo(true );          Iterator<Entry<String, MQProducerInner>> it = this .producerTable.entrySet().iterator();     while  (it.hasNext()) {         Entry<String, MQProducerInner> entry = it.next();         MQProducerInner impl = entry.getValue();         if  (impl != null ) {                          impl.updateTopicPublishInfo(topic, publishInfo);         }     } } 
代码:MQClientInstance#topicRouteData2TopicPublishInfo 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 public  static  TopicPublishInfo topicRouteData2TopicPublishInfo (final  String topic, final  TopicRouteData route)      	         TopicPublishInfo info = new  TopicPublishInfo();     	         info.setTopicRouteData(route);     	         if  (route.getOrderTopicConf() != null  && route.getOrderTopicConf().length() > 0 ) {             String[] brokers = route.getOrderTopicConf().split(";" );             for  (String broker : brokers) {                 String[] item = broker.split(":" );                 int  nums = Integer.parseInt(item[1 ]);                 for  (int  i = 0 ; i < nums; i++) {                     MessageQueue mq = new  MessageQueue(topic, item[0 ], i);                     info.getMessageQueueList().add(mq);                 }             }             info.setOrderTopic(true );         } else  {                          List<QueueData> qds = route.getQueueDatas();             Collections.sort(qds);                          for  (QueueData qd : qds) {                                  if  (PermName.isWriteable(qd.getPerm())) {                     BrokerData brokerData = null ;                                          for  (BrokerData bd : route.getBrokerDatas()) {                                                  if  (bd.getBrokerName().equals(qd.getBrokerName())) {                         brokerData = bd;                         break ;                     }                 }                 if  (null  == brokerData) {                     continue ;                 }                 if  (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {                     continue ;                 } 				                 for  (int  i = 0 ; i < qd.getWriteQueueNums(); i++) {                     MessageQueue mq = new  MessageQueue(topic, qd.getBrokerName(), i);                     info.getMessageQueueList().add(mq);                 }             }         }         info.setOrderTopic(false );     } 	     return  info; } 
3)选择队列 
4)发送消息 
消息发送API核心入口:**DefaultMQProducerImpl#sendKernelImpl**  
 
1 2 3 4 5 6 7 8 private  SendResult sendKernelImpl (     final  Message msg,	//待发送消息     final  MessageQueue mq,	//消息发送队列     final  CommunicationMode communicationMode,		//消息发送内模式     final  SendCallback sendCallback,	pp	//异步消息回调函数     final  TopicPublishInfo topicPublishInfo,	//主题路由信息     final  long  timeout	//超时时间     ) 
代码:DefaultMQProducerImpl#sendKernelImpl 
1 2 3 4 5 6 7 String brokerAddr = this .mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if  (null  == brokerAddr) {         tryToFindTopicPublishInfo(mq.getTopic());     brokerAddr = this .mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); } 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 if  (!(msg instanceof  MessageBatch)) {    MessageClientIDSetter.setUniqID(msg); } boolean  topicWithNamespace = false ;if  (null  != this .mQClientFactory.getClientConfig().getNamespace()) {    msg.setInstanceId(this .mQClientFactory.getClientConfig().getNamespace());     topicWithNamespace = true ; } int  sysFlag = 0 ;boolean  msgBodyCompressed = false ;if  (this .tryToCompressMessage(msg)) {    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;     msgBodyCompressed = true ; } final  String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if  (tranMsg != null  && Boolean.parseBoolean(tranMsg)) {    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; } 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 if  (this .hasSendMessageHook()) {    context = new  SendMessageContext();     context.setProducer(this );     context.setProducerGroup(this .defaultMQProducer.getProducerGroup());     context.setCommunicationMode(communicationMode);     context.setBornHost(this .defaultMQProducer.getClientIP());     context.setBrokerAddr(brokerAddr);     context.setMessage(msg);     context.setMq(mq);     context.setNamespace(this .defaultMQProducer.getNamespace());     String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);     if  (isTrans != null  && isTrans.equals("true" )) {         context.setMsgType(MessageType.Trans_Msg_Half);     }     if  (msg.getProperty("__STARTDELIVERTIME" ) != null  || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null ) {         context.setMsgType(MessageType.Delay_Msg);     }     this .executeSendMessageHookBefore(context); } 
代码:SendMessageHook 
1 2 3 4 5 6 7 public  interface  SendMessageHook      String hookName ()  ;     void  sendMessageBefore (final  SendMessageContext context)      void  sendMessageAfter (final  SendMessageContext context)  } 
代码:DefaultMQProducerImpl#sendKernelImpl 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 SendMessageRequestHeader requestHeader = new  SendMessageRequestHeader(); requestHeader.setProducerGroup(this .defaultMQProducer.getProducerGroup()); requestHeader.setTopic(msg.getTopic()); requestHeader.setDefaultTopic(this .defaultMQProducer.getCreateTopicKey()); requestHeader.setDefaultTopicQueueNums(this .defaultMQProducer.getDefaultTopicQueueNums()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setSysFlag(sysFlag); requestHeader.setBornTimestamp(System.currentTimeMillis()); requestHeader.setFlag(msg.getFlag()); requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setReconsumeTimes(0 ); requestHeader.setUnitMode(this .isUnitMode()); requestHeader.setBatch(msg instanceof  MessageBatch); if  (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);     if  (reconsumeTimes != null ) {         requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));         MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);     }     String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);     if  (maxReconsumeTimes != null ) {         requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));         MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);     } } 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 case  ASYNC:		    Message tmpMessage = msg;     boolean  messageCloned = false ;     if  (msgBodyCompressed) {                                    tmpMessage = MessageAccessor.cloneMessage(msg);         messageCloned = true ;         msg.setBody(prevBody);     }     if  (topicWithNamespace) {         if  (!messageCloned) {             tmpMessage = MessageAccessor.cloneMessage(msg);             messageCloned = true ;         }         msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(),                                                      this .defaultMQProducer.getNamespace()));     } 		long  costTimeAsync = System.currentTimeMillis() - beginStartTime; 		if  (timeout < costTimeAsync) { 		    throw  new  RemotingTooMuchRequestException("sendKernelImpl call timeout" ); 		} 		sendResult = this .mQClientFactory.getMQClientAPIImpl().sendMessage(         			brokerAddr,         			mq.getBrokerName(),         			tmpMessage,         			requestHeader,         			timeout - costTimeAsync,         			communicationMode,         			sendCallback,         			topicPublishInfo,         			this .mQClientFactory,         			this .defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),         			context,         			this );     	break ; case  ONEWAY:case  SYNC:		    long  costTimeSync = System.currentTimeMillis() - beginStartTime;         if  (timeout < costTimeSync) {             throw  new  RemotingTooMuchRequestException("sendKernelImpl call timeout" );         }         sendResult = this .mQClientFactory.getMQClientAPIImpl().sendMessage(             brokerAddr,             mq.getBrokerName(),             msg,             requestHeader,             timeout - costTimeSync,             communicationMode,             context,             this );         break ;     default :         assert  false ;         break ; } 
1 2 3 4 5 if  (this .hasSendMessageHook()) {    context.setSendResult(sendResult);     this .executeSendMessageHookAfter(context); } 
批量消息发送 
同一个主题的多条消息一起打包发送到消息服务端 
如果单条消息内容比较长,则打包多条消息发送会影响其他线程发送消息的响应时间 
单批次消息总长度不能超过DefaultMQProducer#maxMessageSize 
如何将这些消息编码以便服务端能够正确解码出每条消息的消息内容? 
 
代码:DefaultMQProducer#send 
1 2 3 4 5 public  SendResult send (Collection<Message> msgs)      throws  MQClientException, RemotingException, MQBrokerException, InterruptedException  {         return  this .defaultMQProducerImpl.send(batch(msgs)); } 
代码:DefaultMQProducer#batch 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private  MessageBatch batch (Collection<Message> msgs)  throws  MQClientException     MessageBatch msgBatch;     try  {                  msgBatch = MessageBatch.generateFromList(msgs);                  for  (Message message : msgBatch) {             Validators.checkMessage(message, this );             MessageClientIDSetter.setUniqID(message);             message.setTopic(withNamespace(message.getTopic()));         }                  msgBatch.setBody(msgBatch.encode());     } catch  (Exception e) {         throw  new  MQClientException("Failed to initiate the MessageBatch" , e);     }          msgBatch.setTopic(withNamespace(msgBatch.getTopic()));     return  msgBatch; } 
消息存储 消息存储核心类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private  final  MessageStoreConfig messageStoreConfig;	private  final  CommitLog commitLog;		private  final  ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> consumeQueueTable;	private  final  FlushConsumeQueueService flushConsumeQueueService;	private  final  CleanCommitLogService cleanCommitLogService;	private  final  CleanConsumeQueueService cleanConsumeQueueService;	private  final  IndexService indexService;	private  final  AllocateMappedFileService allocateMappedFileService;	private  final  ReputMessageService reputMessageService;private  final  HAService haService;	private  final  ScheduleMessageService scheduleMessageService;	private  final  StoreStatsService storeStatsService;	private  final  TransientStorePool transientStorePool;	private  final  BrokerStatsManager brokerStatsManager;	private  final  MessageArrivingListener messageArrivingListener;	private  final  BrokerConfig brokerConfig;	private  StoreCheckpoint storeCheckpoint;	private  final  LinkedList<CommitLogDispatcher> dispatcherList;	
消息存储流程 消息存储入口:DefaultMessageStore#putMessage 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 if  (BrokerRole.SLAVE == this .messageStoreConfig.getBrokerRole()) {        long  value = this .printTimes.getAndIncrement();         if  ((value % 50000 ) == 0 ) {             log.warn("message store is slave mode, so putMessage is forbidden " );         }     return  new  PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null ); } if  (!this .runningFlags.isWriteable()) {        long  value = this .printTimes.getAndIncrement();     	return  new  PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null ); } else  {     this .printTimes.set(0 ); } if  (msg.getTopic().length() > Byte.MAX_VALUE) {    log.warn("putMessage message topic length too long "  + msg.getTopic().length());     return  new  PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null ); } if  (msg.getPropertiesString() != null  && msg.getPropertiesString().length() > Short.MAX_VALUE) {    log.warn("putMessage message properties length too long "  + msg.getPropertiesString().length());     return  new  PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null ); } if  (this .isOSPageCacheBusy()) {    return  new  PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null ); } PutMessageResult result = this .commitLog.putMessage(msg); 
代码:CommitLog#putMessage 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 msg.setStoreTimestamp(beginLockTimestamp); if  (null  == mappedFile || mappedFile.isFull()) {    mappedFile = this .mappedFileQueue.getLastMappedFile(0 );  } if  (null  == mappedFile) {    log.error("create mapped file1 error, topic: "  + msg.getTopic() + " clientAddr: "  + msg.getBornHostString());     beginTimeInLock = 0 ;     return  new  PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null ); } result = mappedFile.appendMessage(msg, this .appendMessageCallback); 
代码:MappedFile#appendMessagesInner 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 int  currentPos = this .wrotePosition.get();if  (currentPos < this .fileSize) {         ByteBuffer byteBuffer = writeBuffer != null  ? writeBuffer.slice() : this .mappedByteBuffer.slice();     byteBuffer.position(currentPos);     AppendMessageResult result = null ;     if  (messageExt instanceof  MessageExtBrokerInner) {        	         result = cb.doAppend(this .getFileFromOffset(), byteBuffer, this .fileSize - currentPos, (MessageExtBrokerInner) messageExt);     } else  if  (messageExt instanceof  MessageExtBatch) {         result = cb.doAppend(this .getFileFromOffset(), byteBuffer, this .fileSize - currentPos, (MessageExtBatch) messageExt);     } else  {         return  new  AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);     }     this .wrotePosition.addAndGet(result.getWroteBytes());     this .storeTimestamp = result.getStoreTimestamp();     return  result; } 
代码:CommitLog#doAppend 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 long  wroteOffset = fileFromOffset + byteBuffer.position();this .resetByteBuffer(hostHolder, 8 );String msgId = MessageDecoder.createMessageId(this .msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset); keyBuilder.setLength(0 ); keyBuilder.append(msgInner.getTopic()); keyBuilder.append('-' ); keyBuilder.append(msgInner.getQueueId()); String key = keyBuilder.toString(); Long queueOffset = CommitLog.this .topicQueueTable.get(key); if  (null  == queueOffset) {    queueOffset = 0L ;     CommitLog.this .topicQueueTable.put(key, queueOffset); } final  byte [] propertiesData =msgInner.getPropertiesString() == null  ? null  : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);final  int  propertiesLength = propertiesData == null  ? 0  : propertiesData.length;if  (propertiesLength > Short.MAX_VALUE) {    log.warn("putMessage message properties length too long. length={}" , propertiesData.length);     return  new  AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED); } final  byte [] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);final  int  topicLength = topicData.length;final  int  bodyLength = msgInner.getBody() == null  ? 0  : msgInner.getBody().length;final  int  msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
代码:CommitLog#calMsgLength 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 protected  static  int  calMsgLength (int  bodyLength, int  topicLength, int  propertiesLength)      final  int  msgLen = 4           + 4           + 4           + 4           + 4           + 8           + 8           + 4           + 8           + 8           + 8           + 8           + 4           + 8           + 4  + (bodyLength > 0  ? bodyLength : 0 )          + 1  + topicLength          + 2  + (propertiesLength > 0  ? propertiesLength : 0 )          + 0 ;     return  msgLen; } 
代码:CommitLog#doAppend 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 if  (msgLen > this .maxMessageSize) {    CommitLog.log.warn("message size exceeded, msg total size: "  + msgLen + ", msg body size: "  + bodyLength         + ", maxMessageSize: "  + this .maxMessageSize);     return  new  AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED); } if  ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {    this .resetByteBuffer(this .msgStoreItemMemory, maxBlank);          this .msgStoreItemMemory.putInt(maxBlank);          this .msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);               final  long  beginTimeMills = CommitLog.this .defaultMessageStore.now();     byteBuffer.put(this .msgStoreItemMemory.array(), 0 , maxBlank);     return  new  AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),         queueOffset, CommitLog.this .defaultMessageStore.now() - beginTimeMills); } final  long  beginTimeMills = CommitLog.this .defaultMessageStore.now();byteBuffer.put(this .msgStoreItemMemory.array(), 0 , msgLen); AppendMessageResult result = new  AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset,                                                       msgLen, msgId,msgInner.getStoreTimestamp(),                                                       queueOffset,                                                       CommitLog.this .defaultMessageStore.now()                                                       -beginTimeMills); switch  (tranType) {    case  MessageSysFlag.TRANSACTION_PREPARED_TYPE:     case  MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:         break ;     case  MessageSysFlag.TRANSACTION_NOT_TYPE:     case  MessageSysFlag.TRANSACTION_COMMIT_TYPE:                  CommitLog.this .topicQueueTable.put(key, ++queueOffset);         break ;     default :         break ; } 
代码:CommitLog#putMessage 
1 2 3 4 5 6 putMessageLock.unlock(); handleDiskFlush(result, putMessageResult, msg); handleHA(result, putMessageResult, msg); 
存储文件 
commitLog:消息存储目录 
config:运行期间一些配置信息 
consumerqueue:消息消费队列存储目录 
index:消息索引文件存储目录 
abort:如果存在,表明该Broker非正常关闭 
checkpoint:存储CommitLog文件最后一次刷盘时间戳、consumerqueue最后一次刷盘时间,index索引文件最后一次刷盘时间戳 
 
存储文件内存映射 
RocketMQ通过内存映射文件提高IO性能,CommitLog、ConsumerQueue、IndexFile的单个文件都为固定长度,一个文件写满后创建的新文件,其文件名为该文件第一条消息对应的全局物理偏移量 
 
1)MappedFileQueue 1 2 3 4 5 6 String storePath;	 int  mappedFileSize;	CopyOnWriteArrayList<MappedFile> mappedFiles;	 AllocateMappedFileService allocateMappedFileService;	 long  flushedWhere = 0 ;		long  committedWhere = 0 ;	
2)MappedFile 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 int  OS_PAGE_SIZE = 1024  * 4 ;		AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new  AtomicLong(0 );	 AtomicInteger TOTAL_MAPPED_FILES = new  AtomicInteger(0 );	 AtomicInteger wrotePosition = new  AtomicInteger(0 );	 AtomicInteger committedPosition = new  AtomicInteger(0 );	 AtomicInteger flushedPosition = new  AtomicInteger(0 );	 int  fileSize;	FileChannel fileChannel;	 ByteBuffer writeBuffer = null ;	 TransientStorePool transientStorePool = null ;	 String fileName;	 long  fileFromOffset;	File file;	 MappedByteBuffer mappedByteBuffer;	 volatile  long  storeTimestamp = 0 ;	boolean  firstCreateInQueue = false ;	
3)TransientStorePool 
短暂的存储池 
RocketMQ单独创建一个MappedByteBuffer内存缓存池,用来临时存储数据 
数据先写入该内存映射中,然后由commit线程定时将数据从该内存复制到与目标物理文件对应的内存映射中。RocketMQ引入该机制是为了提供一种内存锁定,将当前堆外内存一直锁定在堆内内存中,避免被进程将内存交换到磁盘 
 
1 2 3 private  final  int  poolSize;		private  final  int  fileSize;		private  final  Deque<ByteBuffer> availableBuffers;	
初始化 
1 2 3 4 5 6 7 8 9 10 11 12 public  void  init ()           for  (int  i = 0 ; i < poolSize; i++) {         ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);         final  long  address = ((DirectBuffer) byteBuffer).address();         Pointer pointer = new  Pointer(address);                  LibC.INSTANCE.mlock(pointer, new  NativeLong(fileSize));         availableBuffers.offer(byteBuffer);     } } 
实时更新消息消费队列与索引文件 
消息消费队列文件、消息属性索引文件是基于CommitLog文件构建的,Producer提交的消息存储在CommitLog后,ConsumerQueue、IndexFile需要及时更新,否则消息无法及时被消费,根据消息属性查找消息也会出现较大延迟 
RocketMQ通过开启一个线程ReputMessageService来准实时转发CommitLog文件更新事件,相应的任务处理器根据转发的消息更新ConsumerQueue、IndexFile文件 
 
代码:DefaultMessageStore:start 
1 2 3 4 this .reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);this .reputMessageService.start();
代码:DefaultMessageStore:run 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public  void  run ()      DefaultMessageStore.log.info(this .getServiceName() + " service started" ); 	     while  (!this .isStopped()) {         try  {             Thread.sleep(1 );             this .doReput();         } catch  (Exception e) {             DefaultMessageStore.log.warn(this .getServiceName() + " service has exception. " , e);         }     }     DefaultMessageStore.log.info(this .getServiceName() + " service end" ); } 
代码:DefaultMessageStore:deReput 
1 2 3 4 5 6 7 8 9 10 11 for  (int  readSize = 0 ; readSize < result.getSize() && doNext; ) {	DispatchRequest dispatchRequest =                               DefaultMessageStore.this .commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false , false ); 	int  size = dispatchRequest.getBufferSize() == -1  ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize(); 	if  (dispatchRequest.isSuccess()) { 	    if  (size > 0 ) { 	        DefaultMessageStore.this .doDispatch(dispatchRequest); 	    }     } } 
DispatchRequest 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 String topic;  int  queueId;  long  commitLogOffset;	int  msgSize;	long  tagsCode;	long  storeTimestamp;	long  consumeQueueOffset;	String keys;	 boolean  success;	String uniqKey;	 int  sysFlag;	long  preparedTransactionOffset;	Map<String, String> propertiesMap;	 byte [] bitMap;	
1)转发到ConsumerQueue 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class  CommitLogDispatcherBuildConsumeQueue  implements  CommitLogDispatcher      @Override      public  void  dispatch (DispatchRequest request)           final  int  tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());         switch  (tranType) {             case  MessageSysFlag.TRANSACTION_NOT_TYPE:             case  MessageSysFlag.TRANSACTION_COMMIT_TYPE:                                  DefaultMessageStore.this .putMessagePositionInfo(request);                 break ;             case  MessageSysFlag.TRANSACTION_PREPARED_TYPE:             case  MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:                 break ;         }     } } 
代码:DefaultMessageStore#putMessagePositionInfo 
1 2 3 4 5 6 public  void  putMessagePositionInfo (DispatchRequest dispatchRequest)           ConsumeQueue cq = this .findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());          cq.putMessagePositionInfoWrapper(dispatchRequest); } 
代码:DefaultMessageStore#putMessagePositionInfo 
1 2 3 4 5 6 7 8 9 10 11 12 this .byteBufferIndex.flip();this .byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);this .byteBufferIndex.putLong(offset);this .byteBufferIndex.putInt(size);this .byteBufferIndex.putLong(tagsCode);MappedFile mappedFile = this .mappedFileQueue.getLastMappedFile(expectLogicOffset); if  (mappedFile != null ) {         return  mappedFile.appendMessage(this .byteBufferIndex.array()); } 
2)转发到Index 1 2 3 4 5 6 7 8 9 class  CommitLogDispatcherBuildIndex  implements  CommitLogDispatcher      @Override      public  void  dispatch (DispatchRequest request)           if  (DefaultMessageStore.this .messageStoreConfig.isMessageIndexEnable()) {             DefaultMessageStore.this .indexService.buildIndex(request);         }     } } 
代码:DefaultMessageStore#buildIndex 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public  void  buildIndex (DispatchRequest req)           IndexFile indexFile = retryGetAndCreateIndexFile();     if  (indexFile != null ) {                  long  endPhyOffset = indexFile.getEndPhyOffset();         DispatchRequest msg = req;         String topic = msg.getTopic();         String keys = msg.getKeys();                  if  (msg.getCommitLogOffset() < endPhyOffset) {             return ;         }         final  int  tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());         switch  (tranType) {             case  MessageSysFlag.TRANSACTION_NOT_TYPE:             case  MessageSysFlag.TRANSACTION_PREPARED_TYPE:             case  MessageSysFlag.TRANSACTION_COMMIT_TYPE:                 break ;             case  MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:                 return ;         } 		                  if  (req.getUniqKey() != null ) {             indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));             if  (indexFile == null ) {                 return ;             }         } 		         if  (keys != null  && keys.length() > 0 ) {             String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);             for  (int  i = 0 ; i < keyset.length; i++) {                 String key = keyset[i];                 if  (key.length() > 0 ) {                     indexFile = putKey(indexFile, msg, buildKey(topic, key));                     if  (indexFile == null ) {                         return ;                     }                 }             }         }     } else  {         log.error("build index error, stop building index" );     } } 
消息队列和索引文件恢复 
RocketMQ首先将消息全量存储在CommitLog文件,然后异步生成转发任务更新ConsumerQueue和Index文件 
如果消息成功存储到CommitLog文件,转发任务未成功执行,且Broker由于某个愿意宕机,会导致CommitLog、ConsumerQueue、IndexFile文件数据不一致 
 
1)存储文件加载 代码:DefaultMessageStore#load 
判断上一次是否异常退出 
Broker在启动时创建abort文件,在退出时通过JVM钩子函数删除abort文件。如果下次启动时存在abort文件,说明Broker异常退出,CommitLog与ConsumerQueue数据有可能不一致,需要修复 
 
1 2 3 4 5 6 7 8 9 boolean  lastExitOK = !this .isTempFileExist();private  boolean  isTempFileExist ()      String fileName = StorePathConfigHelper         .getAbortFile(this .messageStoreConfig.getStorePathRootDir());     File file = new  File(fileName);     return  file.exists(); } 
代码:DefaultMessageStore#load 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 if  (null  != scheduleMessageService) {    result = result && this .scheduleMessageService.load(); } result = result && this .commitLog.load(); result = result && this .loadConsumeQueue(); if  (result) {	     this .storeCheckpoint =new  StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this .messageStoreConfig.getStorePathRootDir())); 	     this .indexService.load(lastExitOK); 	     this .recover(lastExitOK); } 
代码:MappedFileQueue#load 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 File dir = new  File(this .storePath); File[] files = dir.listFiles(); if  (files != null ) {         Arrays.sort(files);          for  (File file : files) { 		         if  (file.length() != this .mappedFileSize) {                          return  false ;         }         try  {                          MappedFile mappedFile = new  MappedFile(file.getPath(), mappedFileSize);             mappedFile.setWrotePosition(this .mappedFileSize);             mappedFile.setFlushedPosition(this .mappedFileSize);             mappedFile.setCommittedPosition(this .mappedFileSize);                          this .mappedFiles.add(mappedFile);             log.info("load "  + file.getPath() + " OK" );         } catch  (IOException e) {             log.error("load file "  + file + " error" , e);             return  false ;         }     } } return  true ;
代码:DefaultMessageStore#loadConsumeQueue 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 File dirLogic = new  File(StorePathConfigHelper.getStorePathConsumeQueue(this .messageStoreConfig.getStorePathRootDir())); File[] fileTopicList = dirLogic.listFiles(); if  (fileTopicList != null ) {    for  (File fileTopic : fileTopicList) {                  String topic = fileTopic.getName(); 		         File[] fileQueueIdList = fileTopic.listFiles();         if  (fileQueueIdList != null ) {                          for  (File fileQueueId : fileQueueIdList) {                                  int  queueId;                 try  {                     queueId = Integer.parseInt(fileQueueId.getName());                 } catch  (NumberFormatException e) {                     continue ;                 }                                  ConsumeQueue logic = new  ConsumeQueue(                     topic,                     queueId,                     StorePathConfigHelper.getStorePathConsumeQueue(this .messageStoreConfig.getStorePathRootDir()),             this .getMessageStoreConfig().getMapedFileSizeConsumeQueue(),                     this );                 this .putConsumeQueue(topic, queueId, logic);                 if  (!logic.load()) {                     return  false ;                 }             }         }     } } log.info("load logics queue all over, OK" ); return  true ;
代码:IndexService#load 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public  boolean  load (final  boolean  lastExitOK)           File dir = new  File(this .storePath);          File[] files = dir.listFiles();     if  (files != null ) {                  Arrays.sort(files);                  for  (File file : files) {             try  {                                  IndexFile f = new  IndexFile(file.getPath(), this .hashSlotNum, this .indexNum, 0 , 0 );                 f.load();                 if  (!lastExitOK) {                                          if  (f.getEndTimestamp() > this .defaultMessageStore.getStoreCheckpoint()                         .getIndexMsgTimestamp()) {                         f.destroy(0 );                         continue ;                     }                 } 				                 log.info("load index file OK, "  + f.getFileName());                 this .indexFileList.add(f);             } catch  (IOException e) {                 log.error("load file {} error" , file, e);                 return  false ;             } catch  (NumberFormatException e) {                 log.error("load file {} error" , file, e);             }         }     }     return  true ; } 
代码:DefaultMessageStore#recover 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private  void  recover (final  boolean  lastExitOK)           long  maxPhyOffsetOfConsumeQueue = this .recoverConsumeQueue();     if  (lastExitOK) {                  this .commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);     } else  {                  this .commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);     } 	     this .recoverTopicQueueTable(); } 
代码:DefaultMessageStore#recoverTopicQueueTable 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public  void  recoverTopicQueueTable ()      HashMap<String, Long> table = new  HashMap<String, Long>(1024 );          long  minPhyOffset = this .commitLog.getMinOffset();          for  (ConcurrentMap<Integer, ConsumeQueue> maps : this .consumeQueueTable.values()) {         for  (ConsumeQueue logic : maps.values()) {             String key = logic.getTopic() + "-"  + logic.getQueueId();             table.put(key, logic.getMaxOffsetInQueue());             logic.correctMinOffset(minPhyOffset);         }     }     this .commitLog.setTopicQueueTable(table); } 
2)正常恢复 代码:CommitLog#recoverNormally 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 public  void  recoverNormally (long  maxPhyOffsetOfConsumeQueue)  	     final  List<MappedFile> mappedFiles = this .mappedFileQueue.getMappedFiles();     if  (!mappedFiles.isEmpty()) {                   int  index = mappedFiles.size() - 3 ;         if  (index < 0 )             index = 0 ;         MappedFile mappedFile = mappedFiles.get(index);         ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();         long  processOffset = mappedFile.getFileFromOffset();                  long  mappedFileOffset = 0 ;         while  (true ) {                          DispatchRequest dispatchRequest = this .checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);                          int  size = dispatchRequest.getMsgSize();            	             if  (dispatchRequest.isSuccess() && size > 0 ) {                 mappedFileOffset += size;             } 			             else  if  (dispatchRequest.isSuccess() && size == 0 ) {               index++;               if  (index >= mappedFiles.size()) {                                      break ;               } else  {                                      mappedFile = mappedFiles.get(index);                   byteBuffer = mappedFile.sliceByteBuffer();                   processOffset = mappedFile.getFileFromOffset();                   mappedFileOffset = 0 ;                              		}             }                          else  if  (!dispatchRequest.isSuccess()) {                 log.info("recover physics file end, "  + mappedFile.getFileName());                 break ;             }         } 		         processOffset += mappedFileOffset;         this .mappedFileQueue.setFlushedWhere(processOffset);         this .mappedFileQueue.setCommittedWhere(processOffset);                  this .mappedFileQueue.truncateDirtyFiles(processOffset);                  if  (maxPhyOffsetOfConsumeQueue >= processOffset) {             this .defaultMessageStore.truncateDirtyLogicFiles(processOffset);         }     } else  {         this .mappedFileQueue.setFlushedWhere(0 );         this .mappedFileQueue.setCommittedWhere(0 );         this .defaultMessageStore.destroyLogics();     } } 
代码:MappedFileQueue#truncateDirtyFiles 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public  void  truncateDirtyFiles (long  offset)      List<MappedFile> willRemoveFiles = new  ArrayList<MappedFile>(); 	     for  (MappedFile file : this .mappedFiles) {                  long  fileTailOffset = file.getFileFromOffset() + this .mappedFileSize;                  if  (fileTailOffset > offset) {                          if  (offset >= file.getFileFromOffset()) {                                  file.setWrotePosition((int ) (offset % this .mappedFileSize));                 file.setCommittedPosition((int ) (offset % this .mappedFileSize));                 file.setFlushedPosition((int ) (offset % this .mappedFileSize));             } else  {                                  file.destroy(1000 );                 willRemoveFiles.add(file);             }         }     }     this .deleteExpiredFile(willRemoveFiles); } 
3)异常恢复 
Broker异常停止文件恢复的实现为CommitLog#recoverAbnormally 
异常文件恢复步骤与正常停止文件恢复流程基本相同,差异在于:
正常停止默认从倒数第三个文件开始进行恢复,异常停止需要从最后一个文件往前找第一个消息存储正常的文件 
如果CommitLog目录没有消息文件,而消息消费队列目录下存在文件,则需要销毁 
 
 
 
代码:CommitLog#recoverAbnormally 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 if  (!mappedFiles.isEmpty()) {         int  index = mappedFiles.size() - 1 ;     MappedFile mappedFile = null ;     for  (; index >= 0 ; index--) {         mappedFile = mappedFiles.get(index);                  if  (this .isMappedFileMatchedRecover(mappedFile)) {             log.info("recover from this mapped file "  + mappedFile.getFileName());             break ;         }     } 	     if  (index < 0 ) {         index = 0 ;         mappedFile = mappedFiles.get(index);     }              }else {          this .mappedFileQueue.setFlushedWhere(0 );     this .mappedFileQueue.setCommittedWhere(0 );     this .defaultMessageStore.destroyLogics(); } 
刷盘机制 
RocketMQ的存储基于JDK NIO的内存映射机制(MappedByteBuffer),消息存储时首先将消息追加到内存,再根据配置的刷盘策略在不同时间刷写磁盘  
同步刷盘 
代码:CommitLog#handleDiskFlush 
1 2 3 4 5 6 7 8 9 10 11 12 final  GroupCommitService service = (GroupCommitService) this .flushCommitLogService;if  (messageExt.isWaitStoreMsgOK()) {         GroupCommitRequest request = new  GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());          service.putRequest(request);          boolean  flushOK = request.waitForFlush(this .defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());     if  (!flushOK) {         putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);     } 
GroupCommitRequest 
1 2 3 long  nextOffset;	CountDownLatch countDownLatch = new  CountDownLatch(1 );	 volatile  boolean  flushOK = false ;	
代码:GroupCommitService#run 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public  void  run ()      CommitLog.log.info(this .getServiceName() + " service started" );     while  (!this .isStopped()) {         try  {                          this .waitForRunning(10 );                          this .doCommit();         } catch  (Exception e) {             CommitLog.log.warn(this .getServiceName() + " service has exception. " , e);         }     } 	... } 
代码:GroupCommitService#doCommit 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private  void  doCommit ()           synchronized  (this .requestsRead) {         if  (!this .requestsRead.isEmpty()) {                          for  (GroupCommitRequest req : this .requestsRead) {                                                   boolean  flushOK = false ;                 for  (int  i = 0 ; i < 2  && !flushOK; i++) {                     flushOK = CommitLog.this .mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); 					                     if  (!flushOK) {                         CommitLog.this .mappedFileQueue.flush(0 );                     }                 } 				                 req.wakeupCustomer(flushOK);             } 			                          long  storeTimestamp = CommitLog.this .mappedFileQueue.getStoreTimestamp();             if  (storeTimestamp > 0 ) {               CommitLog.this .defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);             } 			             this .requestsRead.clear();         } else  {                                       CommitLog.this .mappedFileQueue.flush(0 );         }     } } 
异步刷盘 
消息追加到内存后,立即响应 
如果开启transientStorePoolEnable,RocketMQ单独申请一个与目标物理文件(commitLog)同样大小的堆外内存,使用内存锁定确保其不会被置换到虚拟内存。消息追加到堆外内存,然后提交到物理文件的内存映射中,刷写磁盘
将消息直接追加到ByteBuffer(堆外内存) 
CommitRealTimeService线程每隔200ms将ByteBuffer新追加内容提交到MappedByteBuffer 
MappedByteBuffer在内存中追加提交的内容,wrotePosition指针向后移动 
commit操作成功返回,将committedPosition位置恢复 
FlushRealTimeService线程默认每500ms将MappedByteBuffer中新追加的内存刷写到磁盘 
 
 
 
如果未开启transientStorePoolEnable,消息直接追加到物理文件直接映射文件中,刷写到磁盘 
 
代码:CommitLog$CommitRealTimeService#run 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 int  interval = CommitLog.this .defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();int  commitDataLeastPages = CommitLog.this .defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();int  commitDataThoroughInterval =CommitLog.this .defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); long  begin = System.currentTimeMillis();if  (begin >= (this .lastCommitTimestamp + commitDataThoroughInterval)) {    this .lastCommitTimestamp = begin;     commitDataLeastPages = 0 ; } boolean  result = CommitLog.this .mappedFileQueue.commit(commitDataLeastPages);long  end = System.currentTimeMillis();if  (!result) {    this .lastCommitTimestamp = end;                flushCommitLogService.wakeup(); } if  (end - begin > 500 ) {    log.info("Commit data to file costs {} ms" , end - begin); } this .waitForRunning(interval);
代码:CommitLog$FlushRealTimeService#run 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 boolean  flushCommitLogTimed = CommitLog.this .defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();int  interval = CommitLog.this .defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();int  flushPhysicQueueLeastPages = CommitLog.this .defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();int  flushPhysicQueueThoroughInterval =CommitLog.this .defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval(); ... long  currentTimeMillis = System.currentTimeMillis();if  (currentTimeMillis >= (this .lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {    this .lastFlushTimestamp = currentTimeMillis;     flushPhysicQueueLeastPages = 0 ;     printFlushProgress = (printTimes++ % 10 ) == 0 ; } ... if  (flushCommitLogTimed) {    Thread.sleep(interval); } else  {     this .waitForRunning(interval); } ... long  begin = System.currentTimeMillis();CommitLog.this .mappedFileQueue.flush(flushPhysicQueueLeastPages); long  storeTimestamp = CommitLog.this .mappedFileQueue.getStoreTimestamp();if  (storeTimestamp > 0 ) {CommitLog.this .defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); 
过期文件删除机制 
RocketMQ操作CommitLog、ConsumerQueue文件基于内存映射机制,并在启动的时候加载CommitLog、ConsumerQueue目录下的所有文件 
为避免内存与磁盘的浪费,引入一种机制删除已过期的文件 
RocketMQ顺序写CommitLog、ConsumerQueue文件,写操作全部落在最后一个CommitLog或者ConsumerQueue文件,之前的文件在下一个文件创建后将不会再被更新 
如果当前文件在一定时间间隔内没有再次被消费,则认为是过期文件,可以被删除 
RocketMQ不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为72小时(通过在Broker配置文件中设置fileReservedTime,单位为小时) 
 
代码:DefaultMessageStore#addScheduleTask 
1 2 3 4 5 6 7 8 9 10 private  void  addScheduleTask ()  	     this .scheduledExecutorService.scheduleAtFixedRate(new  Runnable() {         @Override          public  void  run ()               DefaultMessageStore.this .cleanFilesPeriodically();         }     }, 1000  * 60 , this .messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS); 	... } 
代码:DefaultMessageStore#cleanFilesPeriodically 
1 2 3 4 5 6 private  void  cleanFilesPeriodically ()           this .cleanCommitLogService.run();          this .cleanConsumeQueueService.run(); } 
代码:DefaultMessageStore#deleteExpiredFiles 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private  void  deleteExpiredFiles ()           int  deleteCount = 0 ;          long  fileReservedTime = DefaultMessageStore.this .getMessageStoreConfig().getFileReservedTime();          int  deletePhysicFilesInterval = DefaultMessageStore.this .getMessageStoreConfig().getDeleteCommitLogFilesInterval();          int  destroyMapedFileIntervalForcibly = DefaultMessageStore.this .getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); boolean  timeup = this .isTimeToDelete();boolean  spacefull = this .isSpaceToDelete();boolean  manualDelete = this .manualDeleteFileSeveralTimes > 0 ;if  (timeup || spacefull || manualDelete) {	...执行删除逻辑 }else {     ...无作为 } 
代码:CleanCommitLogService#isSpaceToDelete 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 private  boolean  isSpaceToDelete ()           double  ratio = DefaultMessageStore.this .getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0 ; 	     cleanImmediately = false ;     {         String storePathPhysic = DefaultMessageStore.this .getMessageStoreConfig().getStorePathCommitLog();                  double  physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);                  if  (physicRatio > diskSpaceWarningLevelRatio) {             boolean  diskok = DefaultMessageStore.this .runningFlags.getAndMakeDiskFull();             if  (diskok) {                 DefaultMessageStore.log.error("physic disk maybe full soon "  + physicRatio + ", so mark disk full" );             } 			             cleanImmediately = true ;         } else  if  (physicRatio > diskSpaceCleanForciblyRatio) {             cleanImmediately = true ;         } else  {             boolean  diskok = DefaultMessageStore.this .runningFlags.getAndMakeDiskOK();             if  (!diskok) {             DefaultMessageStore.log.info("physic disk space OK "  + physicRatio + ", so mark disk ok" );         }     }     if  (physicRatio < 0  || physicRatio > ratio) {         DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, "  + physicRatio);         return  true ;     } } 
代码:MappedFileQueue#deleteExpiredFileByTime 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 for  (int  i = 0 ; i < mfsLength; i++) {         MappedFile mappedFile = (MappedFile) mfs[i];          long  liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;          if  (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {         if  (mappedFile.destroy(intervalForcibly)) {             files.add(mappedFile);             deleteCount++;             if  (files.size() >= DELETE_FILES_BATCH_MAX) {                 break ;             }             if  (deleteFilesInterval > 0  && (i + 1 ) < mfsLength) {                 try  {                     Thread.sleep(deleteFilesInterval);                 } catch  (InterruptedException e) {                 }             }         } else  {             break ;         }     } else  {                  break ;     } } 
小结 
RocketMQ的存储文件包括消息文件(Commitlog)、消息消费队列文件(ConsumerQueue)、Hash索引文件(IndexFile)、监测点文件(checkPoint)、abort(关闭异常文件)
单个消息存储文件、消息消费队列文件、Hash索引文件长度固定,以便使用内存映射机制进行文件的读写操作
RocketMQ组织文件以文件的起始偏移量来命名文件
RocketMQ基于内存映射文件机制提供同步刷盘和异步刷盘,后者先追加消息到内存映射文件,然后启动专门的刷盘线程定时将内存中的文件数据刷写到磁盘
RocketMQ为了保证消息发送的高吞吐量,采用单一文件存储所有主题消息(Commitlog),保证消息存储是完全的顺序写,但这样给文件读取带来了不便,为了方便消费,构建了消息消费队列文件,基于主题与队列进行组织。同时RocketMQ为消息实现了Hash索引,可以为消息设置索引键,快速从CommitLog文件中检索消息
当消息达到CommitLog后,通过ReputMessageService线程接近实时地将消息转发给消息消费队列文件与索引文件
为了安全,RocketMQ引入abort文件,记录Broker的停机是否是正常。重启Broker时为了保证CommitLog文件、ConsumerQueue文件与Hash索引文件的正确性,分别采用不同策略来恢复文件
删除策略:启动文件过期机制并在磁盘空间不足或者默认凌晨4点删除过期文件,文件保存72小时并且在删除文件时并不会判断该消息文件上的消息是否被消费
 
Consumer 消息消费概述 
消息推送模式 
消息消费重要方法 
1 2 3 4 5 6 7 8 void  sendMessageBack (final  MessageExt msg, final  int  delayLevel, final  String brokerName) :发送消息确认Set<MessageQueue> fetchSubscribeMessageQueues (final  String topic)  :获取消费者对主题分配了那些消息队列 void  registerMessageListener (final  MessageListenerConcurrently messageListener) :注册并发事件监听器void  registerMessageListener (final  MessageListenerOrderly messageListener) :注册顺序消息事件监听器void  subscribe (final  String topic, final  String subExpression) :基于主题订阅消息,消息过滤使用表达式void  subscribe (final  String topic, final  String fullClassName,final  String filterClassSource) :基于主题订阅消息,消息过滤使用类模式void  subscribe (final  String topic, final  MessageSelector selector)  :订阅消息,并指定队列选择器void  unsubscribe (final  String topic) :取消消息订阅
DefaultMQPushConsumer 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 private  String consumerGroup;	private  MessageModel messageModel = MessageModel.CLUSTERING;	private  ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;private  AllocateMessageQueueStrategy allocateMessageQueueStrategy;private  Map<String , String > subscription = new  HashMap<String, String>();private  MessageListener messageListener;private  OffsetStore offsetStore;private  int  consumeThreadMin = 20 ;private  int  consumeThreadMax = 20 ;private  int  consumeConcurrentlyMaxSpan = 2000 ;private  int  pullThresholdForQueue = 1000 ;private  long  pullInterval = 0 ;private  int  pullBatchSize = 32 ;private  int  consumeMessageBatchMaxSize = 1 ;private  boolean  postSubscriptionWhenPull = false ;private  int  maxReconsumeTimes = -1 ;private  long  consumeTimeout = 15 ;
消费者启动流程 代码:DefaultMQPushConsumerImpl#start 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 public  synchronized  void  start ()  throws  MQClientException     switch  (this .serviceState) {         case  CREATE_JUST:                              this .defaultMQPushConsumer.getMessageModel(), this .defaultMQPushConsumer.isUnitMode());             this .serviceState = ServiceState.START_FAILED; 			             this .checkConfig(); 			             this .copySubscription(); 			             if  (this .defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {                 this .defaultMQPushConsumer.changeInstanceNameToPID();             } 			             this .mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this .defaultMQPushConsumer, this .rpcHook); 			             this .rebalanceImpl.setConsumerGroup(this .defaultMQPushConsumer.getConsumerGroup());             this .rebalanceImpl.setMessageModel(this .defaultMQPushConsumer.getMessageModel());             this .rebalanceImpl.setAllocateMessageQueueStrategy(this .defaultMQPushConsumer.getAllocateMessageQueueStrategy());             this .rebalanceImpl.setmQClientFactory(this .mQClientFactor             this .pullAPIWrapper = new  PullAPIWrapper(                 mQClientFactory,                 this .defaultMQPushConsumer.getConsumerGroup(), isUnitMode());             this .pullAPIWrapper.registerFilterMessageHook(filterMessageHookLis             if  (this .defaultMQPushConsumer.getOffsetStore() != null ) {                 this .offsetStore = this .defaultMQPushConsumer.getOffsetStore();             } else  {            		switch  (this .defaultMQPushConsumer.getMessageModel()) {                            	    case  BROADCASTING:	             	        this .offsetStore = new  LocalFileOffsetStore(this .mQClientFactory, this .defaultMQPushConsumer.getConsumerGroup());            	            break ;            	        case  CLUSTERING:	            	            this .offsetStore = new  RemoteBrokerOffsetStore(this .mQClientFactory, this .defaultMQPushConsumer.getConsumerGroup());            	            break ;            	        default :            	            break ;            	    }            	    this .defaultMQPushConsumer.setOffsetStore(this .offsetStore);            	}             this .offsetStore.load                          if  (this .getMessageListenerInner() instanceof  MessageListenerOrderly) {                 this .consumeOrderly = true ;                 this .consumeMessageService =                     new  ConsumeMessageOrderlyService(this , (MessageListenerOrderly) this .getMessageListenerInner());                              } else  if  (this .getMessageListenerInner() instanceof  MessageListenerConcurrently) {                 this .consumeOrderly = false ;                 this .consumeMessageService =                     new  ConsumeMessageConcurrentlyService(this , (MessageListenerConcurrently) this .getMessageListenerInner());             }                          this .consumeMessageService.start();                          boolean  registerOK = mQClientFactory.registerConsumer(this .defaultMQPushConsumer.getConsumerGroup(), this );                          if  (!registerOK) {                 this .serviceState = ServiceState.CREATE_JUST;                 this .consumeMessageService.shutdown();                 throw  new  MQClientException("The consumer group["  + this .defaultMQPushConsumer.getConsumerGroup()                     + "] has been created before, specify another name please."  + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),                     null );                          mQClientFactory.start();             log.info("the consumer [{}] start OK." , this .defaultMQPushConsumer.getConsumerGroup());             this .serviceState = ServiceState.RUNNING;             break ;             case  RUNNING:             case  START_FAILED:         case  SHUTDOWN_ALREADY:             throw  new  MQClientException("The PushConsumer service state not OK, maybe started once, "                  + this .serviceState                 + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),                 null );         default :             break ;     }     this .updateTopicSubscribeInfoWhenSubscriptionChanged();     this .mQClientFactory.checkClientInBroker();     this .mQClientFactory.sendHeartbeatToAllBrokerWithLock();     this .mQClientFactory.rebalanceImmediately(); } 
消息拉取 1)PullMessageService实现机制 
RocketMQ使用一个单独的线程PullMessageService,负责消息的拉取 
 
代码:PullMessageService#run 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public  void  run ()      log.info(this .getServiceName() + " service started" ); 	     while  (!this .isStopped()) {         try  {                          PullRequest pullRequest = this .pullRequestQueue.take();                          this .pullMessage(pullRequest);         } catch  (InterruptedException ignored) {         } catch  (Exception e) {             log.error("Pull Message Service Run Method exception" , e);         }     }     log.info(this .getServiceName() + " service end" ); } 
PullRequest 
1 2 3 4 5 private  String consumerGroup;	private  MessageQueue messageQueue;	private  ProcessQueue processQueue;	private  long  nextOffset;	private  boolean  lockedFirst = false ;	
代码:PullMessageService#pullMessage 
1 2 3 4 5 6 7 8 9 10 11 12 private  void  pullMessage (final  PullRequest pullRequest)           final  MQConsumerInner consumer = this .mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());     if  (consumer != null ) {                  DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;                  impl.pullMessage(pullRequest);     } else  {         log.warn("No matched consumer for the PullRequest {}, drop it" , pullRequest);     } } 
2)ProcessQueue实现机制 
ProcessQueue是MessageQueue在消费端的重现、快照 
从Broker默认每次拉取32条消息,按照消息的队列偏移量顺序存放在ProcessQueue,然后将消息提交到Consumer消费线程池,消息成功消费后从ProcessQueue中移除 
 
属性 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private  final  TreeMap<Long, MessageExt> msgTreeMap = new  TreeMap<Long, MessageExt>();private  final  ReadWriteLock lockTreeMap = new  ReentrantReadWriteLock();private  final  AtomicLong msgCount = new  AtomicLong();private  volatile  long  queueOffsetMax = 0L ;private  volatile  boolean  dropped = false ;private  volatile  long  lastPullTimestamp = System.currentTimeMillis();private  volatile  long  lastConsumeTimestamp = System.currentTimeMillis();
方法 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public  void  cleanExpiredMsg (DefaultMQPushConsumer pushConsumer) public  boolean  putMessage (final  List<MessageExt> msgs) public  long  getMaxSpan () public  long  removeMessage (final  List<MessageExt> msgs) public  void  rollback ()  public  long  commit () public  void  makeMessageToCosumeAgain (List<MessageExt> msgs)  public  List<MessageExt> takeMessags (final  int  batchSize) 
3)消息拉取基本流程 1.客户端发起拉取请求 代码:DefaultMQPushConsumerImpl#pullMessage 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 public  void  pullMessage (final  PullRequest pullRequest)           final  ProcessQueue processQueue = pullRequest.getProcessQueue();          if  (processQueue.isDropped()) {         log.info("the pull request[{}] is dropped." , pullRequest.toString());         return ;     } 	     pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());     try  {         this .makeSureStateOK();     } catch  (MQClientException e) {         log.warn("pullMessage exception, consumer state not ok" , e);         this .executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);         return ;     } 	     if  (this .isPause()) {         log.warn("consumer was paused, execute pull request later. instanceName={}, group={}" , this .defaultMQPushConsumer.getInstanceName(), this .defaultMQPushConsumer.getConsumerGroup());         this .executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);         return ;     } 	 	long  cachedMessageCount = processQueue.getMsgCount().get();      	long  cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024  * 1024 ); 	 	if  (cachedMessageCount > this .defaultMQPushConsumer.getPullThresholdForQueue()) { 	    this .executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); 	    if  ((queueFlowControlTimes++ % 1000 ) == 0 ) { 	        log.warn( 	            "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}" , 	            this .defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); 	    } 	    return ; 	} 	 	if  (cachedMessageSizeInMiB > this .defaultMQPushConsumer.getPullThresholdSizeForQueue()) { 	    this .executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); 	    if  ((queueFlowControlTimes++ % 1000 ) == 0 ) { 	        log.warn( 	            "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}" , 	            this .defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); 	    } 	    return ;     }     	 		 final  SubscriptionData subscriptionData = this .rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());     	if  (null  == subscriptionData) {     	    this .executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);     	    log.warn("find the consumer's subscription failed, {}" , pullRequest);     	    return ; 		 	    this .pullAPIWrapper.pullKernelImpl( 	    pullRequest.getMessageQueue(), 	    subExpression, 	    subscriptionData.getExpressionType(), 	    subscriptionData.getSubVersion(), 	    pullRequest.getNextOffset(), 	    this .defaultMQPushConsumer.getPullBatchSize(), 	    sysFlag, 	    commitOffsetValue, 	    BROKER_SUSPEND_MAX_TIME_MILLIS, 	    CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, 	    CommunicationMode.ASYNC, 	    pullCallback 	);              } 
2.消息服务端Broker组装消息 代码:PullMessageProcessor#processRequest 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 MessageFilter messageFilter; if  (this .brokerController.getBrokerConfig().isFilterSupportRetry()) {    messageFilter = new  ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,         this .brokerController.getConsumerFilterManager()); } else  {     messageFilter = new  ExpressionMessageFilter(subscriptionData, consumerFilterData,         this .brokerController.getConsumerFilterManager()); } final  GetMessageResult getMessageResult =    this .brokerController.getMessageStore().getMessage(     				requestHeader.getConsumerGroup(),      				requestHeader.getTopic(),	         			requestHeader.getQueueId(),      				requestHeader.getQueueOffset(), 	     				requestHeader.getMaxMsgNums(), 	     				messageFilter	     		); 
代码:DefaultMessageStore#getMessage 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; long  nextBeginOffset = offset;	long  minOffset = 0 ;		long  maxOffset = 0 ;		GetMessageResult getResult = new  GetMessageResult(); final  long  maxOffsetPy = this .commitLog.getMaxOffset();	ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); ... minOffset = consumeQueue.getMinOffsetInQueue(); maxOffset = consumeQueue.getMaxOffsetInQueue(); if  (maxOffset == 0 ) {	    status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;     nextBeginOffset = nextOffsetCorrection(offset, 0 ); } else  if  (offset < minOffset) {	     status = GetMessageStatus.OFFSET_TOO_SMALL;     nextBeginOffset = nextOffsetCorrection(offset, minOffset); } else  if  (offset == maxOffset) {	     status = GetMessageStatus.OFFSET_OVERFLOW_ONE;     nextBeginOffset = nextOffsetCorrection(offset, offset); } else  if  (offset > maxOffset) {	     status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;     if  (0  == minOffset) {         nextBeginOffset = nextOffsetCorrection(offset, minOffset);     } else  {         nextBeginOffset = nextOffsetCorrection(offset, maxOffset);     } } ... SelectMappedBufferResult selectResult = this .commitLog.getMessage(offsetPy, sizePy); 
代码:PullMessageProcessor#processRequest 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 response.setRemark(getMessageResult.getStatus().name()); responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset()); responseHeader.setMinOffset(getMessageResult.getMinOffset()); responseHeader.setMaxOffset(getMessageResult.getMaxOffset()); switch  (this .brokerController.getMessageStoreConfig().getBrokerRole()) {    case  ASYNC_MASTER:     case  SYNC_MASTER:         break ;     case  SLAVE:         if  (!this .brokerController.getBrokerConfig().isSlaveReadEnable()) {             response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);             responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);         }         break ; } ... switch  (getMessageResult.getStatus()) {    case  FOUND:			         response.setCode(ResponseCode.SUCCESS);         break ;     case  MESSAGE_WAS_REMOVING:	         response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);	         break ;     case  NO_MATCHED_LOGIC_QUEUE:	     case  NO_MESSAGE_IN_QUEUE:	         if  (0  != requestHeader.getQueueOffset()) {             response.setCode(ResponseCode.PULL_OFFSET_MOVED);             requestHeader.getQueueOffset(),             getMessageResult.getNextBeginOffset(),             requestHeader.getTopic(),             requestHeader.getQueueId(),             requestHeader.getConsumerGroup()             );         } else  {             response.setCode(ResponseCode.PULL_NOT_FOUND);         }         break ;     case  NO_MATCHED_MESSAGE:	         response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);         break ;     case  OFFSET_FOUND_NULL:	         response.setCode(ResponseCode.PULL_NOT_FOUND);         break ;     case  OFFSET_OVERFLOW_BADLY:	         response.setCode(ResponseCode.PULL_OFFSET_MOVED);                  log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}" ,                 requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());         break ;     case  OFFSET_OVERFLOW_ONE:	         response.setCode(ResponseCode.PULL_NOT_FOUND);         break ;     case  OFFSET_TOO_SMALL:	         response.setCode(ResponseCode.PULL_OFFSET_MOVED);         requestHeader.getConsumerGroup(),          requestHeader.getTopic(),          requestHeader.getQueueOffset(),         getMessageResult.getMinOffset(), channel.remoteAddress());         break ;     default :         assert  false ;         break ; } ... boolean  storeOffsetEnable = brokerAllowSuspend;storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag; storeOffsetEnable = storeOffsetEnable     && this .brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE; if  (storeOffsetEnable) {    this .brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),         requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset()); } 
3.消息拉取客户端处理消息 代码:MQClientAPIImpl#processPullResponse 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private  PullResult processPullResponse (     final  RemotingCommand response)  throws  MQBrokerException, RemotingCommandException     PullStatus pullStatus = PullStatus.NO_NEW_MSG;    	     switch  (response.getCode()) {         case  ResponseCode.SUCCESS:             pullStatus = PullStatus.FOUND;             break ;         case  ResponseCode.PULL_NOT_FOUND:             pullStatus = PullStatus.NO_NEW_MSG;             break ;         case  ResponseCode.PULL_RETRY_IMMEDIATELY:             pullStatus = PullStatus.NO_MATCHED_MSG;             break ;         case  ResponseCode.PULL_OFFSET_MOVED:             pullStatus = PullStatus.OFFSET_ILLEGAL;             break ;         default :             throw  new  MQBrokerException(response.getCode(), response.getRemark());     } 	     PullMessageResponseHeader responseHeader =         (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class); 	     return  new  PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),         responseHeader.getMaxOffset(), null , responseHeader.getSuggestWhichBrokerId(), response.getBody()); } 
PullResult类 
1 2 3 4 5 private  final  PullStatus pullStatus;	private  final  long  nextBeginOffset;	private  final  long  minOffset;	private  final  long  maxOffset;	private  List<MessageExt> msgFoundList;	
代码:DefaultMQPushConsumerImpl$PullCallback#OnSuccess 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 boolean  dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());DefaultMQPushConsumerImpl.this .consumeMessageService.submitConsumeRequest(     pullResult.getMsgFoundList(),     processQueue,     pullRequest.getMessageQueue(),     dispatchToConsume); if  (DefaultMQPushConsumerImpl.this .defaultMQPushConsumer.getPullInterval() > 0 ) {    DefaultMQPushConsumerImpl.this .executePullRequestLater(pullRequest,         DefaultMQPushConsumerImpl.this .defaultMQPushConsumer.getPullInterval()); } else  {     DefaultMQPushConsumerImpl.this .executePullRequestImmediately(pullRequest); } 
4.消息拉取总结 4)消息拉取长轮询机制分析 
RocketMQ的Push模式是循环向消息服务端发起消息拉取请求 
如果不启用长轮询机制,拉取未到达消费队列的消息时,会等待shortPollingTimeMills时间后(挂起)再去判断消息是否已经到达指定消息队列,如果消息仍未到达则提示拉取Consumer PULL—NOT—FOUND(消息不存在) 
如果开启长轮询模式,Broker会每隔5s轮询检查一次消息是否可达,有消息达到后立马通知挂起线程再次验证消息是否是自己感兴趣的消息
是则从CommitLog文件提取消息返回给Consumer 
否则挂起直到超时,才给Consumer响应。超时时间由Consumer封装在请求参数中,PUSH模式为15s,PULL模式通过DefaultMQPullConsumer#setBrokerSuspendMaxTimeMillis设置 
 
 
RocketMQ通过在Broker客户端配置longPollingEnable为true开启长轮询模式 
 
代码:PullMessageProcessor#processRequest 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 case  ResponseCode.PULL_NOT_FOUND:    if  (brokerAllowSuspend && hasSuspendFlag) {         long  pollingTimeMills = suspendTimeoutMillisLong;         if  (!this .brokerController.getBrokerConfig().isLongPollingEnable()) {             pollingTimeMills = this .brokerController.getBrokerConfig().getShortPollingTimeMills();         }         String topic = requestHeader.getTopic();         long  offset = requestHeader.getQueueOffset();         int  queueId = requestHeader.getQueueId();                  PullRequest pullRequest = new  PullRequest(request, channel, pollingTimeMills,             this .brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);                  this .brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);         response = null ;         break ;     } 
PullRequestHoldService方式实现长轮询 
代码:PullRequestHoldService#suspendPullRequest 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public  void  suspendPullRequest (final  String topic, final  int  queueId, final  PullRequest pullRequest)      String key = this .buildKey(topic, queueId);     ManyPullRequest mpr = this .pullRequestTable.get(key);     if  (null  == mpr) {         mpr = new  ManyPullRequest();         ManyPullRequest prev = this .pullRequestTable.putIfAbsent(key, mpr);         if  (prev != null ) {             mpr = prev;         }     }     mpr.addPullRequest(pullRequest); } 
代码:PullRequestHoldService#run 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public  void  run ()      log.info("{} service started" , this .getServiceName());     while  (!this .isStopped()) {         try  {                          if  (this .brokerController.getBrokerConfig().isLongPollingEnable()) {                 this .waitForRunning(5  * 1000 );             } else  {                                this .waitForRunning(this .brokerController.getBrokerConfig().getShortPollingTimeMills());             }             long  beginLockTimestamp = this .systemClock.now();             this .checkHoldRequest();             long  costTime = this .systemClock.now() - beginLockTimestamp;             if  (costTime > 5  * 1000 ) {                 log.info("[NOTIFYME] check hold request cost {} ms." , costTime);             }         } catch  (Throwable e) {             log.warn(this .getServiceName() + " service has exception. " , e);         }     }     log.info("{} service end" , this .getServiceName()); } 
代码:PullRequestHoldService#checkHoldRequest 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private  void  checkHoldRequest ()      for  (String key : this .pullRequestTable.keySet()) {         String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);         if  (2  == kArray.length) {             String topic = kArray[0 ];             int  queueId = Integer.parseInt(kArray[1 ]);                          final  long  offset = this .brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);             try  {                                  this .notifyMessageArriving(topic, queueId, offset);             } catch  (Throwable e) {                 log.error("check hold request failed. topic={}, queueId={}" , topic, queueId, e);             }         }     } } 
代码:PullRequestHoldService#notifyMessageArriving 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 if  (newestOffset > request.getPullFromThisOffset()) {    boolean  match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,         new  ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));          if  (match && properties != null ) {         match = request.getMessageFilter().isMatchedByCommitLog(null , properties);     }     if  (match) {         try  {             this .brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),                 request.getRequestCommand());         } catch  (Throwable e) {             log.error("execute request when wakeup failed." , e);         }         continue ;     } } if  (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {    try  {         this .brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),             request.getRequestCommand());     } catch  (Throwable e) {         log.error("execute request when wakeup failed." , e);     }     continue ; } 
**DefaultMessageStore$ReputMessageService机制 **(无长轮询)
代码:DefaultMessageStore#start 
1 2 3 this .reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);this .reputMessageService.start();
代码:DefaultMessageStore$ReputMessageService#run 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public  void  run ()      DefaultMessageStore.log.info(this .getServiceName() + " service started" );     while  (!this .isStopped()) {         try  {             Thread.sleep(1 );                          this .doReput();         } catch  (Exception e) {             DefaultMessageStore.log.warn(this .getServiceName() + " service has exception. " , e);         }     }     DefaultMessageStore.log.info(this .getServiceName() + " service end" ); } 
代码:DefaultMessageStore$ReputMessageService#deReput 
1 2 3 4 5 6 7 8 if  (BrokerRole.SLAVE != DefaultMessageStore.this .getMessageStoreConfig().getBrokerRole()    && DefaultMessageStore.this .brokerConfig.isLongPollingEnable()) {     DefaultMessageStore.this .messageArrivingListener.arriving(dispatchRequest.getTopic(),         dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1 ,         dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),         dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); } 
代码:NotifyMessageArrivingListener#arriving 
1 2 3 4 5 public  void  arriving (String topic, int  queueId, long  logicOffset, long  tagsCode,     long  msgStoreTime, byte [] filterBitMap, Map<String, String> properties)      this .pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,         msgStoreTime, filterBitMap, properties); } 
消息队列负载与重新分布机制 
RocketMQ消息队列重新分配由RebalanceService线程实现 
一个MQClientInstance持有一个RebalanceService,随着MQClientInstance的启动而启动 
 
代码:RebalanceService#run 
1 2 3 4 5 6 7 8 9 10 public  void  run ()      log.info(this .getServiceName() + " service started" ); 	     while  (!this .isStopped()) {         this .waitForRunning(waitInterval);         this .mqClientFactory.doRebalance();     }     log.info(this .getServiceName() + " service end" ); } 
代码:MQClientInstance#doRebalance 
1 2 3 4 5 6 7 8 9 10 11 12 13 public  void  doRebalance ()           for  (Map.Entry<String, MQConsumerInner> entry : this .consumerTable.entrySet()) {         MQConsumerInner impl = entry.getValue();         if  (impl != null ) {             try  {                 impl.doRebalance();             } catch  (Throwable e) {                 log.error("doRebalance exception" , e);             }         }     } } 
代码:RebalanceImpl#doRebalance 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public  void  doRebalance (final  boolean  isOrder)      Map<String, SubscriptionData> subTable = this .getSubscriptionInner();     if  (subTable != null ) {         for  (final  Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {             final  String topic = entry.getKey();             try  {                 this .rebalanceByTopic(topic, isOrder);             } catch  (Throwable e) {                 if  (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {                     log.warn("rebalanceByTopic Exception" , e);                 }             }         }     }     this .truncateMessageQueueNotMyTopic(); } 
代码:RebalanceImpl#rebalanceByTopic 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 Set<MessageQueue> mqSet = this .topicSubscribeInfoTable.get(topic); List<String> cidAll = this .mQClientFactory.findConsumerIdList(topic, consumerGroup); if  (mqSet != null  && cidAll != null ) {    List<MessageQueue> mqAll = new  ArrayList<MessageQueue>();     mqAll.addAll(mqSet);     Collections.sort(mqAll);     Collections.sort(cidAll);     AllocateMessageQueueStrategy strategy = this .allocateMessageQueueStrategy;     List<MessageQueue> allocateResult = null ;     try  {         allocateResult = strategy.allocate(             this .consumerGroup,             this .mQClientFactory.getClientId(),             mqAll,             cidAll);     } catch  (Throwable e) {         log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}" , strategy.getName(),             e);         return ;     } 
RocketMQ默认提供5种负载均衡分配算法——但是,一个Consumer可以分配到多个队列,但同一个消息队列只会分配给一个Consumer,如果Consumer个数大于消息队列数量,则有些Consumer无法消费消息 
 
1 2 3 4 5 6 7 8 9 10 11 12 AllocateMessageQueueAveragely:平均分配 举例:8 个队列q1,q2,q3,q4,q5,a6,q7,q8,消费者3 个:c1,c2,c3 分配如下: c1:q1,q2,q3 c2:q4,q5,a6 c3:q7,q8 AllocateMessageQueueAveragelyByCircle:平均轮询分配 举例:8 个队列q1,q2,q3,q4,q5,a6,q7,q8,消费者3 个:c1,c2,c3 分配如下: c1:q1,q4,q7 c2:q2,q5,a8 c3:q3,q6 
消息消费过程 
PullMessageService负责拉取消息队列,从远端Broker拉取消息后将消息存储ProcessQueue(消息队列处理队列) 
调用ConsumeMessageService#submitConsumeRequest进行消息消费——使用线程池消费消息,确保消息拉取与消息消费的解耦 
ConsumeMessageService支持顺序消息和并发消息,核心类图如下: 
 
并发消息消费 
代码:ConsumeMessageConcurrentlyService#submitConsumeRequest 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 final  int  consumeBatchSize = this .defaultMQPushConsumer.getConsumeMessageBatchMaxSize();if  (msgs.size() <= consumeBatchSize) {    ConsumeRequest consumeRequest = new  ConsumeRequest(msgs, processQueue, messageQueue);     try  {         this .consumeExecutor.submit(consumeRequest);     } catch  (RejectedExecutionException e) {         this .submitConsumeRequestLater(consumeRequest);     } }else {	        for  (int  total = 0 ; total < msgs.size(); ) {    		    List<MessageExt> msgThis = new  ArrayList<MessageExt>(consumeBatchSize);    		    for  (int  i = 0 ; i < consumeBatchSize; i++, total++) {    		        if  (total < msgs.size()) {    		            msgThis.add(msgs.get(total));    		        } else  {    		            break ;    		        }    		    		    ConsumeRequest consumeRequest = new  ConsumeRequest(msgThis, processQueue, messageQueue);    		    try  {    		        this .consumeExecutor.submit(consumeRequest);    		    } catch  (RejectedExecutionException e) {    		        for  (; total < msgs.size(); total++) {    		            msgThis.add(msgs.get(total));    		     		        this .submitConsumeRequestLater(consumeRequest);    		    }    		} } 
代码:ConsumeMessageConcurrentlyService$ConsumeRequest#run 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 if  (this .processQueue.isDropped()) {    log.info("the message queue not be able to consume, because it's dropped. group={} {}" , ConsumeMessageConcurrentlyService.this .consumerGroup, this .messageQueue);     return ; } ... if  (ConsumeMessageConcurrentlyService.this .defaultMQPushConsumerImpl.hasHook()) {    consumeMessageContext = new  ConsumeMessageContext();     consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());     consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());     consumeMessageContext.setProps(new  HashMap<String, String>());     consumeMessageContext.setMq(messageQueue);     consumeMessageContext.setMsgList(msgs);     consumeMessageContext.setSuccess(false );     ConsumeMessageConcurrentlyService.this .defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); } ... status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); if  (ConsumeMessageConcurrentlyService.this .defaultMQPushConsumerImpl.hasHook()) {    consumeMessageContext.setStatus(status.toString());     consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);     ConsumeMessageConcurrentlyService.this .defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); } 
定时消息机制 
消息发送到Broker后,不立即被Consumer消费,等到特定的时间后才能被消费 
RocketMQ不支持任意的时间精度——要支持任意时间精度定时调度,需要在Broker层做消息排序,再加上持久化,将带来巨大的性能消耗 
消息延迟级别在Broker端通过messageDelayLevel配置,默认为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,delayLevel=1表示延迟消息1s 
定时消息实现类为ScheduleMessageService,该类在DefaultMessageStore中创建 
通过在DefaultMessageStore中调用load方法加载该类并调用start方法启动 
 
代码:ScheduleMessageService#load 
1 2 3 4 5 6 public  boolean  load ()      boolean  result = super .load();     result = result && this .parseDelayLevel();     return  result; } 
代码:ScheduleMessageService#start 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 for  (Map.Entry<Integer, Long> entry : this .delayLevelTable.entrySet()) {    Integer level = entry.getKey();     Long timeDelay = entry.getValue();     Long offset = this .offsetTable.get(level);     if  (null  == offset) {         offset = 0L ;     }     if  (timeDelay != null ) {         this .timer.schedule(new  DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);     } } this .timer.scheduleAtFixedRate(new  TimerTask() {    @Override      public  void  run ()           try  {             if  (started.get()) ScheduleMessageService.this .persist();         } catch  (Throwable e) {             log.error("scheduleAtFixedRate flush exception" , e);         }     } }, 10000 , this .defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); 
调度机制 
ScheduleMessageService的start方法启动后,为每一个延迟级别创建一个调度任务,每一个延迟级别对应SCHEDULE_TOPIC_XXXX主题下的一个消息消费队列 
定时调度任务的实现类为DeliverDelayedMessageTimerTask,核心实现方法为executeOnTimeup 
 
代码:ScheduleMessageService$DeliverDelayedMessageTimerTask#executeOnTimeup 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 ConsumeQueue cq =     ScheduleMessageService.this .defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,         delayLevel2QueueId(delayLevel)); ... SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this .offset); ... for  (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {    long  offsetPy = bufferCQ.getByteBuffer().getLong();     int  sizePy = bufferCQ.getByteBuffer().getInt();     long  tagsCode = bufferCQ.getByteBuffer().getLong();     if  (cq.isExtAddr(tagsCode)) {         if  (cq.getExt(tagsCode, cqExtUnit)) {             tagsCode = cqExtUnit.getTagsCode();         } else  {                          log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}" ,                 tagsCode, offsetPy, sizePy);             long  msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);             tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);         }     }     long  now = System.currentTimeMillis();     long  deliverTimestamp = this .correctDeliverTimestamp(now, tagsCode);          ...        	MessageExt msgExt =    ScheduleMessageService.this .defaultMessageStore.lookMessageByOffset(        offsetPy, sizePy); }  
顺序消息 
实现类是org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService 
 
代码:ConsumeMessageOrderlyService#start 
1 2 3 4 5 6 7 8 9 10 11 public  void  start ()           if  (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this .defaultMQPushConsumerImpl.messageModel())) {         this .scheduledExecutorService.scheduleAtFixedRate(new  Runnable() {             @Override              public  void  run ()                   ConsumeMessageOrderlyService.this .lockMQPeriodically();             }         }, 1000  * 1 , ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);     } } 
代码:ConsumeMessageOrderlyService#submitConsumeRequest 
1 2 3 4 5 6 7 8 9 10 11 public  void  submitConsumeRequest (     final  List<MessageExt> msgs,     final  ProcessQueue processQueue,     final  MessageQueue messageQueue,     final  boolean  dispathToConsume)      if  (dispathToConsume) {         ConsumeRequest consumeRequest = new  ConsumeRequest(processQueue, messageQueue);         this .consumeExecutor.submit(consumeRequest);     } } 
代码:ConsumeMessageOrderlyService$ConsumeRequest#run 
1 2 3 4 5 6 7 8 9 10 if  (this .processQueue.isDropped()) {    log.warn("run, the message queue not be able to consume, because it's dropped. {}" , this .messageQueue);     return ; } final  Object objLock = messageQueueLock.fetchLockObject(this .messageQueue);synchronized  (objLock) {	... } 
小结 
RocketMQ消息消费分别为集群模式、广播模式
消息队列负载:由RebalanceService线程默认每隔20s进行一次消息队列负载,根据当前消费者组内Consumer个数与Topic队列数量,按照某一种负载算法进行队列分配
消息拉取:由PullMessageService线程根据RebalanceService线程创建的拉取任务进行拉取,默认每次拉取32条消息,提交给Consumer消费线程后继续下一次消息拉取。如果消息消费过慢产生消息堆积会触发消息消费拉取流控
并发消息消费:消费线程池中的线程可以并发对同一个消息队列的消息进行消费,消费成功后,取出消息队列中最小的消息偏移量作为消费进度偏移量存储于消费进度存储文件中
集群模式:消息消费进度存储在Broker(消息服务器) 
广播模式:消息消费进度存储在Consumer 
 
RocketMQ不支持任意精度的定时调度消息,只支持自定义的消息延迟级别
顺序消息一般使用集群模式,Consumer内的线程池中的线程对消息消费队列只能串行消费,和并发消息消费最本质的区别是消息消费时必须锁定消息消费队列 ,Broker会存储消息消费队列的锁占用情况