RocketMQ Source Code Reading
Environment
NameServer Architecture Design
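A Producer sends messages for a given topic to the message server (Broker), which persists them. A Consumer subscribes to the topics it is interested in, and the Broker, based on the subscription (routing) information, either pushes messages to the consumer (Push mode) or lets the consumer pull them (Pull mode). Several Brokers are deployed so that they share the storage load.
How does a Producer know which Broker to send a message to? When a Broker goes down, how can the Producer send to another Broker without being restarted? The answer to both questions is the NameServer.
Each Broker registers with every NameServer at startup. Before sending a message, a Producer fetches the Broker address list from a NameServer and picks one Broker from that list according to a load-balancing algorithm.
The NameServer keeps a long-lived connection to every Broker and periodically checks whether each Broker is still alive (Brokers send a heartbeat every 30 s, and the NameServer scans its liveness table every 10 s). A Broker that is down is removed from the route registry. Route changes are not pushed to Producers immediately: this keeps the NameServer implementation simple, and the sending side relies on its own fault-tolerance mechanism to keep message sending available.
High availability comes from deploying several NameServers, which do not communicate with each other. At any given moment their data may therefore differ slightly, but this does not affect message sending.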
Startup flow
Startup class: org.apache.rocketmq.namesrv.NamesrvStartup
Step 1
Parse the configuration file, populate the NamesrvConfig and NettyServerConfig properties, and create the NamesrvController.
Code: NamesrvStartup#createNamesrvController

    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    // the NameServer listens on port 9876
    nettyServerConfig.setListenPort(9876);
    // -c <file>: load properties from a configuration file
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);

            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }
    // -p: print the effective configuration and exit
    if (commandLine.hasOption('p')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }
    // apply command-line options to namesrvConfig
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
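NamesrvConfig properties

    // RocketMQ home directory, read from a system property or an environment variable
    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
    // where the NameServer persists its KV configuration
    private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
    // default path of the NameServer configuration file
    private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
    private String productEnvName = "center";
    private boolean clusterTest = false;
    // whether ordered messages are supported; off by default
    private boolean orderMessageEnable = false;

NettyServerConfig properties

    private int listenPort = 8888;                     // listen port; the NameServer startup code sets it to 9876
    private int serverWorkerThreads = 8;               // size of the Netty business thread pool
    private int serverCallbackExecutorThreads = 0;     // size of the public callback thread pool
    private int serverSelectorThreads = 3;             // number of I/O (selector) threads
    private int serverOnewaySemaphoreValue = 256;      // max concurrent one-way requests
    private int serverAsyncSemaphoreValue = 64;        // max concurrent asynchronous requests
    private int serverChannelMaxIdleTimeSeconds = 120; // max idle time of a connection, in seconds
    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize; // socket send buffer size
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; // socket receive buffer size
    private boolean serverPooledByteBufAllocatorEnable = true; // whether to use pooled ByteBuf allocation
    private boolean useEpollNativeSelector = false;    // whether to use the native epoll selector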
Step 2
Create the NamesrvController instance from the startup properties and initialize it. NamesrvController is the NameServer's core controller.
Code: NamesrvController#initialize

    public boolean initialize() {
        // load the KV configuration
        this.kvConfigManager.load();
        // create the Netty server
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

        this.registerProcessor();

        // scheduled task 1: every 10 s, scan for Brokers that are no longer alive
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

        // scheduled task 2: every 10 min, print the KV configuration
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);
        return true;
    }
Step 3
Code: NamesrvStartup#start

    // register a JVM shutdown hook so that the controller is shut down cleanly
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            controller.shutdown();
            return null;
        }
    }));
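Route management
The NameServer provides Producers and Consumers with routing information about topics. It therefore has to store the basic routing data and also manage Broker nodes, which covers route registration, route deletion, and so on.
Route registration is implemented through the heartbeat between Broker and NameServer. A Broker sends a heartbeat to every NameServer in the cluster when it starts, and then again every 30 s.
On receiving a heartbeat, the NameServer updates lastUpdateTimestamp of the corresponding BrokerLiveInfo in brokerLiveTable.
The NameServer scans brokerLiveTable every 10 s. If no heartbeat has arrived from a Broker for 120 s, it removes that Broker's routing information and closes the socket connection.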
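A heartbeat carries the BrokerId, the Broker address, the Broker name, the name of the cluster the Broker belongs to, and the list of FilterServers associated with the Broker.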
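The heartbeat-and-scan scheme described above can be sketched in a few lines. This is a simplified illustration, not RocketMQ's implementation: the class `BrokerLivenessTable` and its methods are hypothetical, and timestamps are passed in explicitly so the behaviour is easy to follow. Only the 120 s expiry is taken from the text.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for brokerLiveTable: broker address -> time of last heartbeat.
class BrokerLivenessTable {
    static final long EXPIRY_MILLIS = 120_000L; // 120 s without a heartbeat => considered dead

    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    // Called whenever a heartbeat arrives from brokerAddr.
    void onHeartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
    }

    // Called periodically (every 10 s in the text); returns the addresses that were evicted.
    List<String> scan(long nowMillis) {
        List<String> evicted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + EXPIRY_MILLIS < nowMillis) {
                it.remove();
                evicted.add(entry.getKey());
            }
        }
        return evicted;
    }

    boolean isAlive(String brokerAddr) {
        return lastHeartbeat.containsKey(brokerAddr);
    }
}
```

A Broker that keeps sending heartbeats is never evicted, because each heartbeat moves its timestamp forward. A Broker that stops is removed by the first scan that runs more than 120 s after its last heartbeat.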
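Route metadata
Code: RouteInfoManager

    // topic -> queue metadata on each Broker; used for load balancing when sending
    private final HashMap<String, List<QueueData>> topicQueueTable;
    // brokerName -> cluster name, broker name, and master/slave addresses
    private final HashMap<String, BrokerData> brokerAddrTable;
    // clusterName -> names of all Brokers in the cluster
    private final HashMap<String, Set<String>> clusterAddrTable;
    // brokerAddr -> liveness information, refreshed on every heartbeat
    private final HashMap<String, BrokerLiveInfo> brokerLiveTable;
    // brokerAddr -> FilterServer list
    private final HashMap<String, List<String>> filterServerTable;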
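A topic has multiple message queues; a Broker creates 4 read queues and 4 write queues for each topic.
Multiple Brokers form a cluster. Brokers that share the same brokerName form a Master-Slave group: brokerId 0 denotes the Master, and any value greater than 0 denotes a Slave.
lastUpdateTimestamp in BrokerLiveInfo stores the time at which the last heartbeat from the Broker was received.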
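To make the shape of these tables concrete, here is a minimal sketch of brokerAddrTable and clusterAddrTable built from plain collections. It does not use RocketMQ's BrokerData class; the class `RouteTablesSketch`, its methods, and all addresses are invented for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified illustration of two route tables using plain collections.
class RouteTablesSketch {
    // brokerName -> (brokerId -> address); brokerId 0 is the Master
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
    // clusterName -> names of the Brokers in that cluster
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();

    void register(String clusterName, String brokerName, long brokerId, String addr) {
        clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>()).add(brokerName);
        brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
    }

    // Returns the Master address for brokerName, or null if none is registered.
    String masterAddr(String brokerName) {
        Map<Long, String> addrs = brokerAddrTable.get(brokerName);
        return addrs == null ? null : addrs.get(0L);
    }
}
```

Registering a Master (brokerId 0) and a Slave (brokerId 1) under the same brokerName yields one entry in clusterAddrTable and a two-address map in brokerAddrTable.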
Route registration
1) Sending the heartbeat
Code: BrokerController#start

    // register with all NameServers once at startup
    this.registerBrokerAll(true, false, true);
    // then re-register periodically; the period is clamped to the range [10 s, 60 s]
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
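The period argument in the snippet above is worth a closer look: `Math.max(10000, Math.min(period, 60000))` clamps whatever value is configured to the range 10 s to 60 s. The helper below isolates that expression; the class and method names are hypothetical.

```java
class HeartbeatPeriod {
    // Same expression as in BrokerController#start: clamp the configured period to [10 s, 60 s].
    static long effectivePeriodMillis(long configuredMillis) {
        return Math.max(10_000L, Math.min(configuredMillis, 60_000L));
    }
}
```

A configured period of 30 000 ms is used as is, 1 000 ms is raised to 10 000 ms, and 300 000 ms is capped at 60 000 ms.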
Code: BrokerOuterAPI#registerBrokerAll

    // get the NameServer address list
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
        // build the request header
        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setCompressed(compressed);
        // build the request body
        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        // register with every NameServer in parallel
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                        if (result != null) {
                            registerBrokerResultList.add(result);
                        }

                        log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                    } catch (Exception e) {
                        log.warn("registerBroker Exception, {}", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }

        // wait until all registrations finish or the timeout elapses
        try {
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
    }
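The fan-out pattern used by registerBrokerAll (one task per NameServer, a CountDownLatch to wait for all of them with a timeout) can be shown in isolation. The following is a minimal sketch under simplifying assumptions: `FanOutRegister` and `registerAll` are hypothetical names, the network call is replaced by a function passed in by the caller, and the thread pool size is arbitrary.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class FanOutRegister {
    // Runs one "register" call per NameServer in parallel and waits, with a timeout, for all of them.
    // registerCall is a stand-in for the real network request.
    static List<String> registerAll(List<String> nameServers,
                                    Function<String, String> registerCall,
                                    long timeoutMillis) {
        List<String> results = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(nameServers.size());
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            for (String addr : nameServers) {
                pool.execute(() -> {
                    try {
                        results.add(registerCall.apply(addr));
                    } catch (RuntimeException e) {
                        // one failing NameServer must not affect the others
                    } finally {
                        latch.countDown(); // always count down, on success or failure
                    }
                });
            }
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
        } finally {
            pool.shutdown();
        }
        return results;
    }
}
```

Counting down in `finally` is the important detail: if a failed call skipped the countdown, the caller would always wait for the full timeout whenever a single NameServer was unreachable.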
Code: BrokerOuterAPI#registerBroker

    // one-way mode: send the request without waiting for a response
    if (oneway) {
        try {
            this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
        } catch (RemotingTooMuchRequestException e) {
        }
        return null;
    }
    // otherwise send synchronously and wait for the response
    RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
2) Handling the heartbeat
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor
The processor inspects the request type; if it is **REGISTER_BROKER**, the request is forwarded to RouteInfoManager#registerBroker.
Code: DefaultRequestProcessor#processRequest

    // the request is a Broker registration
    case RequestCode.REGISTER_BROKER:
        Version brokerVersion = MQVersion.value2Version(request.getVersion());
        if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
            return this.registerBrokerWithFilterServer(ctx, request);
        } else {
            return this.registerBroker(ctx, request);
        }
Code: DefaultRequestProcessor#registerBroker

    RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
        requestHeader.getClusterName(),
        requestHeader.getBrokerAddr(),
        requestHeader.getBrokerName(),
        requestHeader.getBrokerId(),
        requestHeader.getHaServerAddr(),
        topicConfigWrapper,
        null,
        ctx.channel()
    );
Code: RouteInfoManager#registerBroker
Maintaining the routing information

    // take the write lock so that concurrent registrations cannot corrupt the tables
    this.lock.writeLock().lockInterruptibly();
    // maintain clusterAddrTable
    Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
    if (null == brokerNames) {
        brokerNames = new HashSet<String>();
        this.clusterAddrTable.put(clusterName, brokerNames);
    }
    brokerNames.add(brokerName);
    // maintain brokerAddrTable
    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
    // first registration: create the BrokerData and add it to the table
    if (null == brokerData) {
        registerFirst = true;
        brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
        this.brokerAddrTable.put(brokerName, brokerData);
    }
    Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
    // if this address is already registered under a different brokerId, remove the stale entry
    Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
    while (it.hasNext()) {
        Entry<Long, String> item = it.next();
        if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
            it.remove();
        }
    }
    // add (or overwrite) the address for this brokerId
    String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
    registerFirst = registerFirst || (null == oldAddr);
    // maintain topicQueueTable; only the Master's topic configuration is used
    if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
        if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) ||
            registerFirst) {
            ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
            if (tcTable != null) {
                for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                    this.createAndUpdateQueueData(brokerName, entry.getValue());
                }
            }
        }
    }

Code: RouteInfoManager#createAndUpdateQueueData

    private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
        // build the QueueData from the topic configuration
        QueueData queueData = new QueueData();
        queueData.setBrokerName(brokerName);
        queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
        queueData.setReadQueueNums(topicConfig.getReadQueueNums());
        queueData.setPerm(topicConfig.getPerm());
        queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());
        // look up the queue list registered for this topic
        List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
        if (null == queueDataList) {
            // new topic: create the list
            queueDataList = new LinkedList<QueueData>();
            queueDataList.add(queueData);
            this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
            log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
        } else {
            // existing topic: replace this Broker's entry if it has changed
            boolean addNewOne = true;

            Iterator<QueueData> it = queueDataList.iterator();
            while (it.hasNext()) {
                QueueData qd = it.next();
                if (qd.getBrokerName().equals(brokerName)) {
                    if (qd.equals(queueData)) {
                        addNewOne = false;
                    } else {
                        log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd, queueData);
                        it.remove();
                    }
                }
            }

            if (addNewOne) {
                queueDataList.add(queueData);
            }
        }
    }
    // maintain brokerLiveTable: record the time of this heartbeat
    BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, new BrokerLiveInfo(
        System.currentTimeMillis(),
        topicConfigWrapper.getDataVersion(),
        channel,
        haServerAddr));
    // maintain filterServerTable
    if (filterServerList != null) {
        if (filterServerList.isEmpty()) {
            this.filterServerTable.remove(brokerAddr);
        } else {
            this.filterServerTable.put(brokerAddr, filterServerList);
        }
    }

    // for a Slave, return the Master's address and HA server address in the result
    if (MixAll.MASTER_ID != brokerId) {
        String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
        if (masterAddr != null) {
            BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
            if (brokerLiveInfo != null) {
                result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                result.setMasterAddr(masterAddr);
            }
        }
    }
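Route deletion
Routes are deleted in two situations:
The NameServer periodically scans brokerLiveTable and compares the time of the last heartbeat with the current system time; if the gap exceeds 120 s, the Broker is removed.
When a Broker shuts down normally, it executes the unregisterBroker command.
In both cases, all information related to that Broker is removed from the relevant route tables.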
Code: NamesrvController#initialize

    // every 10 s, scan for Brokers that are no longer alive
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
Code: RouteInfoManager#scanNotActiveBroker

    public void scanNotActiveBroker() {
        // iterate over brokerLiveTable
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            // no heartbeat within the expiry time (120 s): treat the Broker as dead
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                // close the connection
                RemotingUtil.closeChannel(next.getValue().getChannel());
                // remove the Broker from brokerLiveTable
                it.remove();
                // clean up the other route tables
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }
Code: RouteInfoManager#onChannelDestroy

    // take the write lock, then remove the Broker from brokerLiveTable and filterServerTable
    this.lock.writeLock().lockInterruptibly();
    this.brokerLiveTable.remove(brokerAddrFound);
    this.filterServerTable.remove(brokerAddrFound);
    // maintain brokerAddrTable
    String brokerNameFound = null;
    boolean removeBrokerName = false;
    Iterator<Entry<String, BrokerData>> itBrokerAddrTable = this.brokerAddrTable.entrySet().iterator();
    while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
        BrokerData brokerData = itBrokerAddrTable.next().getValue();

        Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
        while (it.hasNext()) {
            Entry<Long, String> entry = it.next();
            Long brokerId = entry.getKey();
            String brokerAddr = entry.getValue();
            // found the address of the Broker being removed
            if (brokerAddr.equals(brokerAddrFound)) {
                brokerNameFound = brokerData.getBrokerName();
                it.remove();
                log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                    brokerId, brokerAddr);
                break;
            }
        }
        // no address left under this brokerName: remove the whole entry
        if (brokerData.getBrokerAddrs().isEmpty()) {
            removeBrokerName = true;
            itBrokerAddrTable.remove();
            log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                brokerData.getBrokerName());
        }
    }
    // maintain clusterAddrTable
    if (brokerNameFound != null && removeBrokerName) {
        Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, Set<String>> entry = it.next();
            String clusterName = entry.getKey();
            Set<String> brokerNames = entry.getValue();
            // remove the brokerName from its cluster
            boolean removed = brokerNames.remove(brokerNameFound);
            if (removed) {
                log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                    brokerNameFound, clusterName);
                // the cluster has no Broker left: remove the cluster as well
                if (brokerNames.isEmpty()) {
                    log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                        clusterName);
                    it.remove();
                }

                break;
            }
        }
    }
    // maintain topicQueueTable
    if (removeBrokerName) {
        Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
            this.topicQueueTable.entrySet().iterator();
        while (itTopicQueueTable.hasNext()) {
            Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
            String topic = entry.getKey();
            List<QueueData> queueDataList = entry.getValue();

            Iterator<QueueData> itQueueData = queueDataList.iterator();
            while (itQueueData.hasNext()) {
                QueueData queueData = itQueueData.next();
                // remove the queues that live on the removed Broker
                if (queueData.getBrokerName().equals(brokerNameFound)) {
                    itQueueData.remove();
                    log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                        topic, queueData);
                }
            }
            // the topic has no queue left: remove the topic as well
            if (queueDataList.isEmpty()) {
                itTopicQueueTable.remove();
                log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                    topic);
            }
        }
    }
    // release the write lock
    finally {
        this.lock.writeLock().unlock();
    }
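Route discovery
Route discovery is not real-time. When a topic's route changes, the NameServer does not push the change to clients (Producers); instead, clients periodically pull the latest route for the topic.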
Code: DefaultRequestProcessor#getRouteInfoByTopic

    public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final GetRouteInfoRequestHeader requestHeader =
            (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
        // assemble the TopicRouteData from the route tables
        TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
        if (topicRouteData != null) {
            // for ordered topics, attach the ordered-topic configuration from the KV store
            if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
                String orderTopicConf =
                    this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                        requestHeader.getTopic());
                topicRouteData.setOrderTopicConf(orderTopicConf);
            }

            byte[] content = topicRouteData.encode();
            response.setBody(content);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }

        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
            + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
        return response;
    }
Producer
Methods and properties
Interface methods
MQAdmin
    // create a topic
    void createTopic(final String key, final String newTopic, final int queueNum) throws MQClientException;
    // find the offset in a queue for a given timestamp
    long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
    // maximum offset of a queue
    long maxOffset(final MessageQueue mq) throws MQClientException;
    // minimum offset of a queue
    long minOffset(final MessageQueue mq) throws MQClientException;
    // look up a message by its offset message ID
    MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,
        InterruptedException, MQClientException;
    // query messages by key within a time range
    QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
        final long end) throws MQClientException, InterruptedException;
    // look up a message by topic and message ID
    MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
MQProducer
    // lifecycle
    void start() throws MQClientException;
    void shutdown();
    // message queues of a topic
    List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
    // synchronous send
    SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
        InterruptedException;
    // synchronous send with a timeout
    SendResult send(final Message msg, final long timeout) throws MQClientException,
        RemotingException, MQBrokerException, InterruptedException;
    // asynchronous send
    void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
        RemotingException, InterruptedException;
    // asynchronous send with a timeout
    void send(final Message msg, final SendCallback sendCallback, final long timeout)
        throws MQClientException, RemotingException, InterruptedException;
    // one-way send: no result is returned
    void sendOneway(final Message msg) throws MQClientException, RemotingException,
        InterruptedException;
    // synchronous send to a specific queue
    SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
        RemotingException, MQBrokerException, InterruptedException;
    // asynchronous send to a specific queue
    void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)
        throws MQClientException, RemotingException, InterruptedException;
    // one-way send to a specific queue
    void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException,
        RemotingException, InterruptedException;
    // batch send
    SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
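Class properties

    producerGroup: the group the producer belongs to
    createTopicKey: the default topic
    defaultTopicQueueNums: number of queues the default topic has on each Broker
    sendMsgTimeout: timeout for sending a message; 3 s by default
    compressMsgBodyOverHowmuch: message bodies larger than this are compressed; 4 KB by default
    retryTimesWhenSendFailed: number of retries for synchronous sends; 2 by default, so up to 3 attempts in total
    retryTimesWhenSendAsyncFailed: number of retries for asynchronous sends; 2 by default
    retryAnotherBrokerWhenNotStoreOK: whether to retry on another Broker when the send result is not SEND_OK; false by default
    maxMessageSize: maximum allowed message length; 4 MB by default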
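The relationship between retryTimesWhenSendFailed and the total number of attempts (2 retries give 3 attempts) can be illustrated with a small loop. This is only a sketch of the counting logic: `RetrySketch`, `sendWithRetry`, and the `attempt` callback are hypothetical and stand in for the real send path.

```java
import java.util.function.IntPredicate;

class RetrySketch {
    // Makes up to 1 + retryTimes attempts and returns how many were actually made.
    // attempt receives the 0-based attempt index and returns true on success.
    static int sendWithRetry(int retryTimes, IntPredicate attempt) {
        int timesTotal = 1 + retryTimes; // the first attempt plus the retries
        int made = 0;
        for (int i = 0; i < timesTotal; i++) {
            made++;
            if (attempt.test(i)) {
                break; // success: stop retrying
            }
        }
        return made;
    }
}
```

With `retryTimes = 2`, a send that always fails is attempted three times, while a send that succeeds on the second try stops after two attempts.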
Startup flow
A JVM holds a single MQClientManager instance, which maintains a cache table of MQClientInstance objects:
ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String,MQClientInstance>();
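Only one MQClientInstance is created per clientId. It wraps RocketMQ's network-handling API and is the network channel through which Producers and Consumers communicate with NameServers and Brokers.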
Code: DefaultMQProducerImpl#start

    // check the producer configuration
    this.checkConfig();
    // unless this is the internal producer group, change the instance name to the process ID
    if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
        this.defaultMQProducer.changeInstanceNameToPID();
    }
    // get or create the MQClientInstance
    this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
Code: MQClientManager#getAndCreateMQClientInstance

    public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        // build the client ID
        String clientId = clientConfig.buildMQClientId();
        // look up the cached instance for this client ID
        MQClientInstance instance = this.factoryTable.get(clientId);
        // none cached: create one and publish it with putIfAbsent
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                // another thread won the race: use its instance
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }

        return instance;
    }
Code: DefaultMQProducerImpl#start

    // register this producer with the MQClientInstance
    boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
    if (!registerOK) {
        this.serviceState = ServiceState.CREATE_JUST;
        throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
            null);
    }
    // start the MQClientInstance
    if (startFactory) {
        mQClientFactory.start();
    }
Sending messages
Code: DefaultMQProducerImpl#send(Message msg)

    // send with the default timeout
    public SendResult send(Message msg) {
        return send(msg, this.defaultMQProducer.getSendMsgTimeout());
    }
Code: DefaultMQProducerImpl#send(Message msg, long timeout)

    // synchronous send with an explicit timeout
    public SendResult send(Message msg, long timeout) {
        return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
    }
Code: DefaultMQProducerImpl#sendDefaultImpl

    // validate the message
    Validators.checkMessage(msg, this.defaultMQProducer);
1) Validating the message
Code: Validators#checkMessage

    public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
        throws MQClientException {
        // the message must not be null
        if (null == msg) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
        }
        // validate the topic
        Validators.checkTopic(msg.getTopic());

        // the body must not be null
        if (null == msg.getBody()) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
        }
        // the body must not be empty
        if (0 == msg.getBody().length) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
        }
        // the body must not exceed maxMessageSize
        if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
                "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
        }
    }
2) Finding the route
Code: DefaultMQProducerImpl#tryToFindTopicPublishInfo

    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        // try the local cache first
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        // not cached or not usable: query the NameServer for this topic
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            // still no route: fall back to the default topic's route
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }
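Code: TopicPublishInfo

    public class TopicPublishInfo {
        private boolean orderTopic = false;           // whether this is an ordered topic
        private boolean haveTopicRouterInfo = false;  // whether route information has been set
        private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); // message queues of the topic
        private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();   // index used when selecting a queue
        private TopicRouteData topicRouteData;        // route data returned by the NameServer
    }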
Code: MQClientInstance#updateTopicRouteInfoFromNameServer

    TopicRouteData topicRouteData;
    // use the default topic's route when asked to
    if (isDefault && defaultMQProducer != null) {
        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
            1000 * 3);
        if (topicRouteData != null) {
            // cap the queue counts at the producer's defaultTopicQueueNums
            for (QueueData data : topicRouteData.getQueueDatas()) {
                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                data.setReadQueueNums(queueNums);
                data.setWriteQueueNums(queueNums);
            }
        }
    } else {
        // otherwise query the route of the topic itself
        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
    }
Code: MQClientInstance#updateTopicRouteInfoFromNameServer

    // compare the cached route with the one just fetched
    TopicRouteData old = this.topicRouteTable.get(topic);
    boolean changed = topicRouteDataIsChange(old, topicRouteData);
    if (!changed) {
        changed = this.isNeedUpdateTopicRouteInfo(topic);
    } else {
        log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
    }
Code: MQClientInstance#updateTopicRouteInfoFromNameServer

    if (changed) {
        // convert the TopicRouteData into a TopicPublishInfo
        TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
        publishInfo.setHaveTopicRouterInfo(true);
        // update the publish info of every producer managed by this MQClientInstance
        Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQProducerInner> entry = it.next();
            MQProducerInner impl = entry.getValue();
            if (impl != null) {
                impl.updateTopicPublishInfo(topic, publishInfo);
            }
        }
    }
Code: MQClientInstance#topicRouteData2TopicPublishInfo

    public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
        TopicPublishInfo info = new TopicPublishInfo();
        info.setTopicRouteData(route);
        // ordered topic: build the queues from the ordered-topic configuration
        if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
            String[] brokers = route.getOrderTopicConf().split(";");
            for (String broker : brokers) {
                String[] item = broker.split(":");
                int nums = Integer.parseInt(item[1]);
                for (int i = 0; i < nums; i++) {
                    MessageQueue mq = new MessageQueue(topic, item[0], i);
                    info.getMessageQueueList().add(mq);
                }
            }

            info.setOrderTopic(true);
        } else {
            // normal topic: build the queues from the queue data of each Broker
            List<QueueData> qds = route.getQueueDatas();
            Collections.sort(qds);
            for (QueueData qd : qds) {
                // skip queues that are not writable
                if (PermName.isWriteable(qd.getPerm())) {
                    BrokerData brokerData = null;
                    for (BrokerData bd : route.getBrokerDatas()) {
                        if (bd.getBrokerName().equals(qd.getBrokerName())) {
                            brokerData = bd;
                            break;
                        }
                    }

                    if (null == brokerData) {
                        continue;
                    }
                    // skip Broker groups that currently have no Master
                    if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                        continue;
                    }
                    // one MessageQueue per write queue
                    for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                        MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                        info.getMessageQueueList().add(mq);
                    }
                }
            }

            info.setOrderTopic(false);
        }
        return info;
    }
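The ordered-topic branch above implies a configuration string of the form `brokerName:queueCount;brokerName:queueCount`. The sketch below replays the same `split(";")` and `split(":")` steps on a sample string. `OrderTopicConfSketch` and the `name#id` label format are invented for this example; the real code creates MessageQueue objects instead.

```java
import java.util.ArrayList;
import java.util.List;

class OrderTopicConfSketch {
    // Expands e.g. "broker-a:2;broker-b:3" into one "brokerName#queueId" label per queue.
    static List<String> expand(String orderTopicConf) {
        List<String> queues = new ArrayList<>();
        for (String broker : orderTopicConf.split(";")) {
            String[] item = broker.split(":");    // item[0] = broker name, item[1] = queue count
            int nums = Integer.parseInt(item[1]);
            for (int i = 0; i < nums; i++) {
                queues.add(item[0] + "#" + i);
            }
        }
        return queues;
    }
}
```

For the input `"broker-a:2;broker-b:3"` this produces five queues: two on broker-a and three on broker-b, each numbered from 0 within its Broker.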
3) Selecting a queue
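The notes give no code for this step, so the following is only a sketch under an assumption: that the default strategy walks the queue list in round-robin order (driven by a counter such as sendWhichQueue) and, on a retry, skips queues that live on the Broker that just failed. `QueueSelectorSketch` and its fields are hypothetical, and queues are represented only by the name of their Broker.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class QueueSelectorSketch {
    private final List<String> brokerNamePerQueue; // one entry per queue: the name of its Broker
    private final AtomicInteger counter = new AtomicInteger(0);

    QueueSelectorSketch(List<String> brokerNamePerQueue) {
        this.brokerNamePerQueue = brokerNamePerQueue;
    }

    // Returns the index of the next queue in round-robin order, skipping queues on
    // lastBrokerName (the Broker that just failed) when a queue on another Broker exists.
    int selectQueue(String lastBrokerName) {
        int size = brokerNamePerQueue.size();
        for (int i = 0; i < size; i++) {
            int pos = Math.floorMod(counter.getAndIncrement(), size);
            if (lastBrokerName == null || !brokerNamePerQueue.get(pos).equals(lastBrokerName)) {
                return pos;
            }
        }
        // every queue lives on the failed Broker: fall back to plain round-robin
        return Math.floorMod(counter.getAndIncrement(), size);
    }
}
```

`Math.floorMod` keeps the index non-negative even after the counter overflows. Skipping the failed Broker means a retry is not sent straight back to the node that just rejected the message.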
4) Sending the message
Core entry point of the message-sending API: **DefaultMQProducerImpl#sendKernelImpl**

    private SendResult sendKernelImpl(
        final Message msg,                           // message to send
        final MessageQueue mq,                       // queue the message is sent to
        final CommunicationMode communicationMode,   // send mode
        final SendCallback sendCallback,             // callback for asynchronous sends
        final TopicPublishInfo topicPublishInfo,     // topic routing information
        final long timeout                           // timeout
    )
Code: DefaultMQProducerImpl#sendKernelImpl

    // look up the Broker address for the queue's brokerName
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    // not found: refresh the route information and try again
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }
    // assign a unique ID to non-batch messages
    if (!(msg instanceof MessageBatch)) {
        MessageClientIDSetter.setUniqID(msg);
    }

    boolean topicWithNamespace = false;
    if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
        msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
        topicWithNamespace = true;
    }
    // compress the body if needed and flag the message as compressed
    int sysFlag = 0;
    boolean msgBodyCompressed = false;
    if (this.tryToCompressMessage(msg)) {
        sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
        msgBodyCompressed = true;
    }
    // flag transactional "prepared" messages
    final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
        sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
    }
    // if send hooks are registered, run the "before" hooks
    if (this.hasSendMessageHook()) {
        context = new SendMessageContext();
        context.setProducer(this);
        context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        context.setCommunicationMode(communicationMode);
        context.setBornHost(this.defaultMQProducer.getClientIP());
        context.setBrokerAddr(brokerAddr);
        context.setMessage(msg);
        context.setMq(mq);
        context.setNamespace(this.defaultMQProducer.getNamespace());
        String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (isTrans != null && isTrans.equals("true")) {
            context.setMsgType(MessageType.Trans_Msg_Half);
        }

        if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
            context.setMsgType(MessageType.Delay_Msg);
        }
        this.executeSendMessageHookBefore(context);
    }
Code: SendMessageHook

    public interface SendMessageHook {
        String hookName();

        void sendMessageBefore(final SendMessageContext context);

        void sendMessageAfter(final SendMessageContext context);
    }
Code: DefaultMQProducerImpl#sendKernelImpl

    // build the request header
    SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTopic(msg.getTopic());
    requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
    requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
    requestHeader.setQueueId(mq.getQueueId());
    requestHeader.setSysFlag(sysFlag);
    requestHeader.setBornTimestamp(System.currentTimeMillis());
    requestHeader.setFlag(msg.getFlag());
    requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
    requestHeader.setReconsumeTimes(0);
    requestHeader.setUnitMode(this.isUnitMode());
    requestHeader.setBatch(msg instanceof MessageBatch);
    // retry topics carry their reconsume counters in message properties; move them into the header
    if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
        String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
        if (reconsumeTimes != null) {
            requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
        }

        String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
        if (maxReconsumeTimes != null) {
            requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
        }
    }
    switch (communicationMode) {
        case ASYNC: // asynchronous send
            Message tmpMessage = msg;
            boolean messageCloned = false;
            if (msgBodyCompressed) {
                // send a clone with the compressed body and restore the original body on msg
                tmpMessage = MessageAccessor.cloneMessage(msg);
                messageCloned = true;
                msg.setBody(prevBody);
            }

            if (topicWithNamespace) {
                if (!messageCloned) {
                    tmpMessage = MessageAccessor.cloneMessage(msg);
                    messageCloned = true;
                }
                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(),
                    this.defaultMQProducer.getNamespace()));
            }

            // fail fast if the time already spent exceeds the timeout
            long costTimeAsync = System.currentTimeMillis() - beginStartTime;
            if (timeout < costTimeAsync) {
                throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
            }
            sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                brokerAddr,
                mq.getBrokerName(),
                tmpMessage,
                requestHeader,
                timeout - costTimeAsync,
                communicationMode,
                sendCallback,
                topicPublishInfo,
                this.mQClientFactory,
                this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                context,
                this);
            break;
        case ONEWAY:
        case SYNC: // one-way and synchronous sends share the same path
            long costTimeSync = System.currentTimeMillis() - beginStartTime;
            if (timeout < costTimeSync) {
                throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
            }
            sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                brokerAddr,
                mq.getBrokerName(),
                msg,
                requestHeader,
                timeout - costTimeSync,
                communicationMode,
                context,
                this);
            break;
        default:
            assert false;
            break;
    }
1 2 3 4 5 if (this .hasSendMessageHook()) { context.setSendResult(sendResult); this .executeSendMessageHookAfter(context); }
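Batch message sending
Multiple messages of the same topic are packed together and sent to the broker in a single request.
If individual messages are large, packing many of them into one batch can lengthen the response time of other threads that are sending messages.
The total length of one batch must not exceed DefaultMQProducer#maxMessageSize.
How are these messages encoded so that the broker can correctly decode the content of each message?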
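Because a batch is limited by maxMessageSize, a caller with many messages has to split them into several batches. The following is a minimal sketch of such a splitter, assuming only message bodies count toward the limit (a real batch also carries topic and property bytes). BatchSplitterSketch and its parameters are hypothetical names and are not part of RocketMQ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a RocketMQ class: greedily groups message bodies
// so that the total body size of each group stays within maxBatchBytes.
final class BatchSplitterSketch {

    static List<List<byte[]>> split(List<byte[]> bodies, int maxBatchBytes) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentSize = 0;
        for (byte[] body : bodies) {
            if (body.length > maxBatchBytes) {
                // A single oversized message can never fit into any batch
                throw new IllegalArgumentException("single message exceeds the batch limit");
            }
            if (currentSize + body.length > maxBatchBytes) {
                // The current batch is full: close it and start a new one
                batches.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(body);
            currentSize += body.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Each resulting group could then be passed to a batch send call on its own.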
Code: DefaultMQProducer#send

    public SendResult send(Collection<Message> msgs)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // Pack the messages into one MessageBatch and send it like a single message
        return this.defaultMQProducerImpl.send(batch(msgs));
    }

Code: DefaultMQProducer#batch

    private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
        MessageBatch msgBatch;
        try {
            // Wrap the collection of messages in a MessageBatch
            msgBatch = MessageBatch.generateFromList(msgs);
            // Validate every message and assign it a unique ID
            for (Message message : msgBatch) {
                Validators.checkMessage(message, this);
                MessageClientIDSetter.setUniqID(message);
                message.setTopic(withNamespace(message.getTopic()));
            }
            // Encode all messages into the body of the batch
            msgBatch.setBody(msgBatch.encode());
        } catch (Exception e) {
            throw new MQClientException("Failed to initiate the MessageBatch", e);
        }
        msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
        return msgBatch;
    }
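To illustrate how several messages can share one body and still be decoded individually, here is a minimal sketch of length-prefixed framing. It is an assumption made for illustration: each entry is just a 4-byte length followed by the body, whereas the format produced by MessageBatch#encode carries more fields per message. LengthPrefixedBatchSketch is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical framing, not RocketMQ's wire format: [int length][body] repeated.
final class LengthPrefixedBatchSketch {

    static byte[] encode(List<byte[]> bodies) {
        int total = 0;
        for (byte[] body : bodies) {
            total += 4 + body.length;
        }
        ByteBuffer buffer = ByteBuffer.allocate(total);
        for (byte[] body : bodies) {
            buffer.putInt(body.length); // the length prefix marks where this message ends
            buffer.put(body);
        }
        return buffer.array();
    }

    static List<byte[]> decode(byte[] batch) {
        List<byte[]> bodies = new ArrayList<>();
        ByteBuffer buffer = ByteBuffer.wrap(batch);
        while (buffer.remaining() >= 4) {
            int length = buffer.getInt();
            byte[] body = new byte[length];
            buffer.get(body);
            bodies.add(body);
        }
        return bodies;
    }
}
```

The length prefix is what lets the receiver find message boundaries without any delimiter inside the payload.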
Message storage
Core message storage class

    private final MessageStoreConfig messageStoreConfig; // store configuration
    private final CommitLog commitLog; // CommitLog file storage implementation
    private final ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> consumeQueueTable; // consume queues, grouped by topic
    private final FlushConsumeQueueService flushConsumeQueueService; // flushes consume queue files
    private final CleanCommitLogService cleanCommitLogService; // removes expired CommitLog files
    private final CleanConsumeQueueService cleanConsumeQueueService; // removes expired ConsumeQueue files
    private final IndexService indexService; // index file implementation
    private final AllocateMappedFileService allocateMappedFileService; // allocates MappedFile instances
    private final ReputMessageService reputMessageService; // dispatches CommitLog entries to ConsumeQueue and IndexFile
    private final HAService haService; // HA replication
    private final ScheduleMessageService scheduleMessageService; // delayed messages
    private final StoreStatsService storeStatsService; // store statistics
    private final TransientStorePool transientStorePool; // off-heap buffer pool
    private final BrokerStatsManager brokerStatsManager; // broker statistics
    private final MessageArrivingListener messageArrivingListener; // notified when a message arrives (long polling)
    private final BrokerConfig brokerConfig; // broker configuration
    private StoreCheckpoint storeCheckpoint; // flush checkpoint
    private final LinkedList<CommitLogDispatcher> dispatcherList; // CommitLog dispatchers

Message storage flow
Entry point for storing a message: DefaultMessageStore#putMessage

    // A slave broker does not accept writes
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is slave mode, so putMessage is forbidden ");
        }
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }
    // The store must currently be writable
    if (!this.runningFlags.isWriteable()) {
        long value = this.printTimes.getAndIncrement();
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    } else {
        this.printTimes.set(0);
    }
    // The topic length must fit in one byte
    if (msg.getTopic().length() > Byte.MAX_VALUE) {
        log.warn("putMessage message topic length too long " + msg.getTopic().length());
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
    }
    // The properties length must fit in a short
    if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
        return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
    }
    // Reject the write while the OS page cache is busy
    if (this.isOSPageCacheBusy()) {
        return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
    }
    // Write the message to the CommitLog
    PutMessageResult result = this.commitLog.putMessage(msg);
Code: CommitLog#putMessage

    // Record the store timestamp of the message
    msg.setStoreTimestamp(beginLockTimestamp);
    // No mapped file yet, or the current one is full: create a new one
    if (null == mappedFile || mappedFile.isFull()) {
        mappedFile = this.mappedFileQueue.getLastMappedFile(0);
    }
    if (null == mappedFile) {
        log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
        beginTimeInLock = 0;
        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
    }
    // Append the message to the mapped file
    result = mappedFile.appendMessage(msg, this.appendMessageCallback);

Code: MappedFile#appendMessagesInner

    // Current write position of this file
    int currentPos = this.wrotePosition.get();
    // Only append if the file still has room
    if (currentPos < this.fileSize) {
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result = null;
        if (messageExt instanceof MessageExtBrokerInner) {
            // Single message
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
        } else if (messageExt instanceof MessageExtBatch) {
            // Batch message
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
Code: CommitLog#doAppend

    // Physical offset at which this message will be written
    long wroteOffset = fileFromOffset + byteBuffer.position();
    this.resetByteBuffer(hostHolder, 8);
    // Create the globally unique message ID
    String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);
    // Look up the logical offset of this message in its consume queue
    keyBuilder.setLength(0);
    keyBuilder.append(msgInner.getTopic());
    keyBuilder.append('-');
    keyBuilder.append(msgInner.getQueueId());
    String key = keyBuilder.toString();
    Long queueOffset = CommitLog.this.topicQueueTable.get(key);
    if (null == queueOffset) {
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    }
    // Serialized properties and their length
    final byte[] propertiesData = msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
    final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
    if (propertiesLength > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long. length={}", propertiesData.length);
        return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
    }
    // Serialized topic and its length
    final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
    final int topicLength = topicData.length;
    // Body length
    final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
    // Total length of the stored message
    final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);

Code: CommitLog#calMsgLength

    protected static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
        final int msgLen = 4 // TOTALSIZE
            + 4 // MAGICCODE
            + 4 // BODYCRC
            + 4 // QUEUEID
            + 4 // FLAG
            + 8 // QUEUEOFFSET
            + 8 // PHYSICALOFFSET
            + 4 // SYSFLAG
            + 8 // BORNTIMESTAMP
            + 8 // BORNHOST
            + 8 // STORETIMESTAMP
            + 8 // STOREHOSTADDRESS
            + 4 // RECONSUMETIMES
            + 8 // prepared transaction offset
            + 4 + (bodyLength > 0 ? bodyLength : 0) // body length + BODY
            + 1 + topicLength // topic length + TOPIC
            + 2 + (propertiesLength > 0 ? propertiesLength : 0) // properties length + PROPERTIES
            + 0;
        return msgLen;
    }
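As a worked example of the arithmetic in calMsgLength, the sketch below sums the same terms: the fixed-width terms add up to 91 bytes, and the variable-length body, topic and properties are added on top. MessageLengthSketch is a hypothetical name used only for this illustration.

```java
// Hypothetical re-statement of the length formula shown above, for illustration only.
final class MessageLengthSketch {

    // Fixed-width header fields, plus the three length prefixes (4 + 1 + 2 bytes)
    static final int FIXED_OVERHEAD =
        4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + 8 + 8 + 8 + 4 + 8 // header fields: 84 bytes
        + 4  // body length prefix
        + 1  // topic length prefix
        + 2; // properties length prefix

    static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
        return FIXED_OVERHEAD
            + Math.max(bodyLength, 0)
            + topicLength
            + Math.max(propertiesLength, 0);
    }
}
```

For a 10-byte body, a 9-byte topic and no properties this gives 91 + 10 + 9 = 110 bytes.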
Code: CommitLog#doAppend

    // Reject a message that exceeds the maximum message size
    if (msgLen > this.maxMessageSize) {
        CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength + ", maxMessageSize: " + this.maxMessageSize);
        return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
    }
    // Not enough room left in this file: pad it and report END_OF_FILE
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
        this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
        this.msgStoreItemMemory.putInt(maxBlank);
        this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
        return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
            queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    }
    // Write the message into the buffer of the mapped file
    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
    AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
        msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    switch (tranType) {
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            // Advance the logical offset of the consume queue
            CommitLog.this.topicQueueTable.put(key, ++queueOffset);
            break;
        default:
            break;
    }

Code: CommitLog#putMessage

    // Release the lock taken for the append
    putMessageLock.unlock();
    // Flush to disk
    handleDiskFlush(result, putMessageResult, msg);
    // Run HA master-slave replication
    handleHA(result, putMessageResult, msg);
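Storage files
commitlog: message storage directory
config: configuration data saved at runtime
consumequeue: consume queue storage directory
index: message index file storage directory
abort: if this file exists, the broker was shut down abnormally
checkpoint: records the last flush timestamp of the CommitLog files, of the ConsumeQueue files, and of the index files
Memory mapping of storage files
RocketMQ uses memory-mapped files to improve I/O performance. Each CommitLog, ConsumeQueue and IndexFile file has a fixed length. When a file is full, a new file is created, and its name is the global physical offset of the first message stored in it.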
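The naming rule can be sketched as follows. This is a minimal illustration that assumes a 20-digit, zero-padded decimal file name and, in the usage below, a 1 GB file size; CommitLogFileNameSketch and its methods are hypothetical names, not RocketMQ APIs.

```java
// Hypothetical helper for illustration: derives a file name from a start offset
// and finds the start offset of the file that contains a given physical offset.
final class CommitLogFileNameSketch {

    // File name = start offset, left-padded with zeros to 20 digits (assumed width)
    static String fileNameForOffset(long startOffset) {
        return String.format("%020d", startOffset);
    }

    // With fixed-length files, the containing file starts at a multiple of fileSize
    static long startOffsetOfFileContaining(long physicalOffset, long fileSize) {
        return physicalOffset - (physicalOffset % fileSize);
    }
}
```

With 1 GB files (1073741824 bytes), the first file would be named 00000000000000000000 and the second 00000000001073741824, so a physical offset can be mapped to its file by arithmetic alone.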
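1) MappedFileQueue

    String storePath; // storage directory
    int mappedFileSize; // size of a single file
    CopyOnWriteArrayList<MappedFile> mappedFiles; // the MappedFile instances
    AllocateMappedFileService allocateMappedFileService; // service that creates MappedFile instances
    long flushedWhere = 0; // current flush pointer
    long committedWhere = 0; // current commit pointer

2) MappedFile

    int OS_PAGE_SIZE = 1024 * 4; // operating system page size, 4 KB
    AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0); // virtual memory mapped by all MappedFile instances in this JVM
    AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0); // number of MappedFile instances in this JVM
    AtomicInteger wrotePosition = new AtomicInteger(0); // write pointer of this file
    AtomicInteger committedPosition = new AtomicInteger(0); // commit pointer of this file
    AtomicInteger flushedPosition = new AtomicInteger(0); // flush pointer of this file
    int fileSize; // file size
    FileChannel fileChannel; // file channel
    ByteBuffer writeBuffer = null; // off-heap ByteBuffer
    TransientStorePool transientStorePool = null; // off-heap buffer pool
    String fileName; // file name
    long fileFromOffset; // starting offset of this file
    File file; // physical file
    MappedByteBuffer mappedByteBuffer; // memory-mapped buffer of the physical file
    volatile long storeTimestamp = 0; // time of the last write to this file
    boolean firstCreateInQueue = false; // whether this is the first file in the MappedFileQueue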
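3) TransientStorePool
A transient storage pool.
RocketMQ creates a separate pool of off-heap buffers (direct ByteBuffers) that hold data temporarily.
Data is first written to one of these buffers, and a commit thread then periodically copies it into the memory-mapped buffer of the target physical file. RocketMQ introduces this mechanism to provide memory locking: the off-heap memory is pinned in physical memory so that the operating system does not swap it out to disk.

    private final int poolSize; // number of buffers in the pool
    private final int fileSize; // size of each buffer
    private final Deque<ByteBuffer> availableBuffers; // buffers that are free to use

Initialization

    public void init() {
        // Create poolSize off-heap buffers
        for (int i = 0; i < poolSize; i++) {
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            // Lock the buffer in memory so that it is not swapped out
            LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
            availableBuffers.offer(byteBuffer);
        }
    }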
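The pooling idea on its own can be sketched with the JDK alone. This is a simplified illustration that leaves out the mlock call (which needs a native library such as JNA); DirectBufferPoolSketch and its method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical pool of pre-allocated direct buffers; memory locking is omitted.
final class DirectBufferPoolSketch {
    private final Deque<ByteBuffer> available = new ConcurrentLinkedDeque<>();

    DirectBufferPoolSketch(int poolSize, int bufferSize) {
        // Allocate every buffer up front so that no allocation happens on the write path
        for (int i = 0; i < poolSize; i++) {
            available.offer(ByteBuffer.allocateDirect(bufferSize));
        }
    }

    // Returns a free buffer, or null when the pool is exhausted
    ByteBuffer borrow() {
        return available.pollFirst();
    }

    // Resets position and limit, then puts the buffer back into the pool
    void giveBack(ByteBuffer buffer) {
        buffer.clear();
        available.offerFirst(buffer);
    }

    int remaining() {
        return available.size();
    }
}
```

Pre-allocating the buffers keeps the write path free of allocation; a caller that receives null from borrow() would have to fall back to writing through the mapped buffer directly.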
Updating the consume queue and index files in near real time
The consume queue files and the message index files are built from the CommitLog files. Once a message submitted by a Producer has been stored in the CommitLog, the ConsumeQueue and IndexFile must be updated promptly. Otherwise the message cannot be consumed in time, and looking up messages by their properties is also noticeably delayed.
RocketMQ starts a thread, ReputMessageService, that forwards CommitLog update events in near real time. The corresponding handlers update the ConsumeQueue and IndexFile files from the forwarded messages.
Code: DefaultMessageStore#start

    // Set the offset in the CommitLog from which dispatching starts
    this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
    // Start the dispatch thread
    this.reputMessageService.start();

Code: DefaultMessageStore$ReputMessageService#run

    public void run() {
        DefaultMessageStore.log.info(this.getServiceName() + " service started");
        // Try to push forward once every millisecond
        while (!this.isStopped()) {
            try {
                Thread.sleep(1);
                this.doReput();
            } catch (Exception e) {
                DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
        DefaultMessageStore.log.info(this.getServiceName() + " service end");
    }

Code: DefaultMessageStore$ReputMessageService#doReput

    // Read one message at a time from the result buffer
    for (int readSize = 0; readSize < result.getSize() && doNext; ) {
        // Build a DispatchRequest from the message
        DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
        int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
        if (dispatchRequest.isSuccess()) {
            if (size > 0) {
                DefaultMessageStore.this.doDispatch(dispatchRequest);
            }
        }
    }
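The forwarding step can be pictured as one loop that hands every newly appended entry to a list of dispatchers. The sketch below is a simplified, single-threaded illustration: the array of message sizes stands in for messages parsed from the log, and DispatchChainSketch and its Dispatcher interface are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical dispatcher chain: each appended entry is handed to every registered dispatcher.
final class DispatchChainSketch {

    interface Dispatcher {
        void dispatch(long commitLogOffset, int msgSize);
    }

    private final List<Dispatcher> dispatchers = new ArrayList<>();
    private long reputFromOffset; // next log offset that has not been forwarded yet

    void register(Dispatcher dispatcher) {
        dispatchers.add(dispatcher);
    }

    // Forwards the given entries in order and advances the forwarding offset
    void doReput(int[] msgSizes) {
        for (int size : msgSizes) {
            for (Dispatcher dispatcher : dispatchers) {
                dispatcher.dispatch(reputFromOffset, size);
            }
            reputFromOffset += size;
        }
    }

    long reputFromOffset() {
        return reputFromOffset;
    }
}
```

One dispatcher could maintain the consume queue and another the index, which mirrors the two handlers described above.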
DispatchRequest

    String topic; // topic name
    int queueId; // queue ID
    long commitLogOffset; // physical offset of the message
    int msgSize; // message length
    long tagsCode; // hash code of the filter tag
    long storeTimestamp; // store timestamp
    long consumeQueueOffset; // offset in the consume queue
    String keys; // index keys of the message
    boolean success; // whether a complete message was parsed
    String uniqKey; // unique key of the message
    int sysFlag; // system flag
    long preparedTransactionOffset; // offset of the prepared transaction message
    Map<String, String> propertiesMap; // message properties
    byte[] bitMap; // bitmap

1) Dispatching to ConsumeQueue

    class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
        @Override
        public void dispatch(DispatchRequest request) {
            final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    // Record the position of the message in its consume queue
                    DefaultMessageStore.this.putMessagePositionInfo(request);
                    break;
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
            }
        }
    }

Code: DefaultMessageStore#putMessagePositionInfo

    public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
        // Find the consume queue for this topic and queue ID
        ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
        // Append the position entry to it
        cq.putMessagePositionInfoWrapper(dispatchRequest);
    }

Code: ConsumeQueue#putMessagePositionInfo

    // Write offset, size and tags hash code into the entry buffer
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);
    // Locate the mapped file for the expected logical offset
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {
        // Append the entry to the mapped file
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
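The entry written above consists of a long, an int and a long, which is 20 bytes in total. The following minimal sketch encodes and decodes such a fixed-size entry with a plain ByteBuffer. ConsumeQueueUnitSketch and its methods are hypothetical names, and the layout is inferred from the three put calls shown above.

```java
import java.nio.ByteBuffer;

// Hypothetical codec for a fixed-size consume queue entry:
// 8-byte CommitLog offset, 4-byte message size, 8-byte tags hash code.
final class ConsumeQueueUnitSketch {
    static final int UNIT_SIZE = 8 + 4 + 8;

    static byte[] encode(long commitLogOffset, int msgSize, long tagsCode) {
        return ByteBuffer.allocate(UNIT_SIZE)
            .putLong(commitLogOffset)
            .putInt(msgSize)
            .putLong(tagsCode)
            .array();
    }

    static long commitLogOffsetOf(byte[] unit) {
        return ByteBuffer.wrap(unit).getLong(0);
    }

    static int msgSizeOf(byte[] unit) {
        return ByteBuffer.wrap(unit).getInt(8);
    }

    static long tagsCodeOf(byte[] unit) {
        return ByteBuffer.wrap(unit).getLong(12);
    }

    // Because every entry has the same size, the n-th entry sits at n * UNIT_SIZE
    static long filePosition(long logicalIndex) {
        return logicalIndex * UNIT_SIZE;
    }
}
```

The fixed entry size is what makes lookup by logical offset a simple multiplication instead of a scan.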
2) Dispatching to Index

    class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
        @Override
        public void dispatch(DispatchRequest request) {
            // Build the index only if message indexing is enabled
            if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
                DefaultMessageStore.this.indexService.buildIndex(request);
            }
        }
    }

Code: IndexService#buildIndex

    public void buildIndex(DispatchRequest req) {
        // Get or create the index file
        IndexFile indexFile = retryGetAndCreateIndexFile();
        if (indexFile != null) {
            // Largest physical offset already indexed
            long endPhyOffset = indexFile.getEndPhyOffset();
            DispatchRequest msg = req;
            String topic = msg.getTopic();
            String keys = msg.getKeys();
            // The message has already been indexed
            if (msg.getCommitLogOffset() < endPhyOffset) {
                return;
            }
            final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    break;
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    return;
            }
            // Index the message by its unique key
            if (req.getUniqKey() != null) {
                indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
                if (indexFile == null) {
                    return;
                }
            }
            // Index the message by each of its user-defined keys
            if (keys != null && keys.length() > 0) {
                String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
                for (int i = 0; i < keyset.length; i++) {
                    String key = keyset[i];
                    if (key.length() > 0) {
                        indexFile = putKey(indexFile, msg, buildKey(topic, key));
                        if (indexFile == null) {
                            return;
                        }
                    }
                }
            }
        } else {
            log.error("build index error, stop building index");
        }
    }
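Recovering the consume queue and index files
RocketMQ first stores every message in the CommitLog files, then asynchronously generates dispatch tasks that update the ConsumeQueue and Index files.
If a message has been stored in the CommitLog but its dispatch task has not yet run when the broker goes down for some reason, the data in the CommitLog, ConsumeQueue and IndexFile files becomes inconsistent.
1) Loading the storage files
Code: DefaultMessageStore#load
Determine whether the previous shutdown was abnormal.
The broker creates the abort file at startup and deletes it through a JVM shutdown hook when it exits. If the abort file exists at the next startup, the broker exited abnormally, the CommitLog and ConsumeQueue data may be inconsistent, and a repair is needed.

    // The last exit was clean if the abort file does not exist
    boolean lastExitOK = !this.isTempFileExist();

    private boolean isTempFileExist() {
        String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
        File file = new File(fileName);
        return file.exists();
    }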
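The marker-file technique described above can be sketched with java.nio.file. This is a minimal illustration, not the broker's implementation: AbortMarkerSketch and its methods are hypothetical, and the clean-shutdown step is called explicitly here where a real process would run it from a JVM shutdown hook.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical marker-file helper: the file exists only while the process is running.
final class AbortMarkerSketch {
    private final Path abortFile;

    AbortMarkerSketch(Path storeRoot) {
        this.abortFile = storeRoot.resolve("abort");
    }

    // Convenience factory for trying the sketch in a fresh temporary directory
    static AbortMarkerSketch inTempDir() {
        try {
            return new AbortMarkerSketch(Files.createTempDirectory("store-sketch"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Must be checked before onStart(): a leftover marker means the last run crashed
    boolean lastExitOk() {
        return !Files.exists(abortFile);
    }

    void onStart() {
        try {
            if (!Files.exists(abortFile)) {
                Files.createFile(abortFile);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Deleting the marker is the last step of an orderly shutdown
    void onCleanShutdown() {
        try {
            Files.deleteIfExists(abortFile);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If the process is killed, onCleanShutdown() never runs, the marker survives, and the next start sees lastExitOk() return false.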
Code: DefaultMessageStore#load

    // Load the delayed-message data
    if (null != scheduleMessageService) {
        result = result && this.scheduleMessageService.load();
    }
    // Load the CommitLog files
    result = result && this.commitLog.load();
    // Load the consume queue files
    result = result && this.loadConsumeQueue();
    if (result) {
        // Load the checkpoint file
        this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
        // Load the index files
        this.indexService.load(lastExitOK);
        // Run the recovery strategy that matches the last exit state
        this.recover(lastExitOK);
    }

Code: MappedFileQueue#load (loads the CommitLog files as mapped files)

    // Directory that holds the files
    File dir = new File(this.storePath);
    File[] files = dir.listFiles();
    if (files != null) {
        // Sort the files by name, which is the ascending offset order
        Arrays.sort(files);
        for (File file : files) {
            // Every file must have the configured size
            if (file.length() != this.mappedFileSize) {
                return false;
            }
            try {
                // Create the mapped file and mark it as completely written
                MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
                mappedFile.setWrotePosition(this.mappedFileSize);
                mappedFile.setFlushedPosition(this.mappedFileSize);
                mappedFile.setCommittedPosition(this.mappedFileSize);
                this.mappedFiles.add(mappedFile);
                log.info("load " + file.getPath() + " OK");
            } catch (IOException e) {
                log.error("load file " + file + " error", e);
                return false;
            }
        }
    }
    return true;
Code: DefaultMessageStore#loadConsumeQueue (loads the consume queues)

    // Root directory of the consume queues
    File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
    // One subdirectory per topic
    File[] fileTopicList = dirLogic.listFiles();
    if (fileTopicList != null) {
        for (File fileTopic : fileTopicList) {
            String topic = fileTopic.getName();
            // One subdirectory per queue ID
            File[] fileQueueIdList = fileTopic.listFiles();
            if (fileQueueIdList != null) {
                for (File fileQueueId : fileQueueIdList) {
                    int queueId;
                    try {
                        queueId = Integer.parseInt(fileQueueId.getName());
                    } catch (NumberFormatException e) {
                        continue;
                    }
                    // Create the consume queue and load its files
                    ConsumeQueue logic = new ConsumeQueue(
                        topic,
                        queueId,
                        StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                        this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
                        this);
                    this.putConsumeQueue(topic, queueId, logic);
                    if (!logic.load()) {
                        return false;
                    }
                }
            }
        }
    }
    log.info("load logics queue all over, OK");
    return true;

Code: IndexService#load (loads the index files)

    public boolean load(final boolean lastExitOK) {
        // Directory that holds the index files
        File dir = new File(this.storePath);
        File[] files = dir.listFiles();
        if (files != null) {
            // Sort the files by name
            Arrays.sort(files);
            for (File file : files) {
                try {
                    IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);
                    f.load();
                    if (!lastExitOK) {
                        // After an abnormal exit, drop index files newer than the checkpoint
                        if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()
                            .getIndexMsgTimestamp()) {
                            f.destroy(0);
                            continue;
                        }
                    }
                    log.info("load index file OK, " + f.getFileName());
                    this.indexFileList.add(f);
                } catch (IOException e) {
                    log.error("load file {} error", file, e);
                    return false;
                } catch (NumberFormatException e) {
                    log.error("load file {} error", file, e);
                }
            }
        }
        return true;
    }
Code: DefaultMessageStore#recover (file recovery; the strategy depends on whether the broker exited normally)

    private void recover(final boolean lastExitOK) {
        // Largest physical offset referenced by the consume queues
        long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
        if (lastExitOK) {
            // Normal recovery
            this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
        } else {
            // Abnormal recovery
            this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
        }
        // Store the current offset of every consume queue in the CommitLog instance
        this.recoverTopicQueueTable();
    }

Code: DefaultMessageStore#recoverTopicQueueTable (after the ConsumeQueue files are recovered, stores the current logical offset of every message queue in the CommitLog instance)

    public void recoverTopicQueueTable() {
        HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
        // Smallest physical offset in the CommitLog
        long minPhyOffset = this.commitLog.getMinOffset();
        for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
            for (ConsumeQueue logic : maps.values()) {
                String key = logic.getTopic() + "-" + logic.getQueueId();
                table.put(key, logic.getMaxOffsetInQueue());
                logic.correctMinOffset(minPhyOffset);
            }
        }
        this.commitLog.setTopicQueueTable(table);
    }
2) Normal recovery
Code: CommitLog#recoverNormally

    public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            // Start from the third file from the end; with fewer than three files, start from the first
            int index = mappedFiles.size() - 3;
            if (index < 0)
                index = 0;
            MappedFile mappedFile = mappedFiles.get(index);
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            long processOffset = mappedFile.getFileFromOffset();
            // Offset of verified data within the current file
            long mappedFileOffset = 0;
            while (true) {
                // Check one message and get its size
                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
                int size = dispatchRequest.getMsgSize();
                if (dispatchRequest.isSuccess() && size > 0) {
                    // Valid message: move forward
                    mappedFileOffset += size;
                } else if (dispatchRequest.isSuccess() && size == 0) {
                    // End of this file: continue with the next one
                    index++;
                    if (index >= mappedFiles.size()) {
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                    }
                } else if (!dispatchRequest.isSuccess()) {
                    // Invalid message: stop here
                    log.info("recover physics file end, " + mappedFile.getFileName());
                    break;
                }
            }
            // Update the flush and commit pointers of the MappedFileQueue
            processOffset += mappedFileOffset;
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
            // Remove everything after the last valid offset
            this.mappedFileQueue.truncateDirtyFiles(processOffset);
            if (maxPhyOffsetOfConsumeQueue >= processOffset) {
                this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
            }
        } else {
            this.mappedFileQueue.setFlushedWhere(0);
            this.mappedFileQueue.setCommittedWhere(0);
            this.defaultMessageStore.destroyLogics();
        }
    }

Code: MappedFileQueue#truncateDirtyFiles

    public void truncateDirtyFiles(long offset) {
        List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();
        for (MappedFile file : this.mappedFiles) {
            // Offset of the end of this file
            long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
            // The file ends after the valid offset
            if (fileTailOffset > offset) {
                if (offset >= file.getFileFromOffset()) {
                    // The valid offset lies inside this file: reset its pointers
                    file.setWrotePosition((int) (offset % this.mappedFileSize));
                    file.setCommittedPosition((int) (offset % this.mappedFileSize));
                    file.setFlushedPosition((int) (offset % this.mappedFileSize));
                } else {
                    // The whole file lies after the valid offset: release and delete it
                    file.destroy(1000);
                    willRemoveFiles.add(file);
                }
            }
        }
        this.deleteExpiredFile(willRemoveFiles);
    }
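To make the three cases of truncateDirtyFiles concrete, the sketch below computes the outcome for each file from nothing but its start offset. It is a simplified illustration with hypothetical names (TruncateSketch, truncate); files are represented by their start offsets instead of MappedFile objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the truncation decision for fixed-size log files.
final class TruncateSketch {

    // For each file: its new write position, or -1 when the file should be deleted.
    static List<Long> truncate(long[] fileStartOffsets, long fileSize, long validOffset) {
        List<Long> result = new ArrayList<>();
        for (long start : fileStartOffsets) {
            long tail = start + fileSize;
            if (tail <= validOffset) {
                // Entirely before the valid offset: the file stays full
                result.add(fileSize);
            } else if (validOffset >= start) {
                // The valid offset falls inside this file: keep only the valid prefix
                result.add(validOffset % fileSize);
            } else {
                // Entirely after the valid offset: the file is dirty
                result.add(-1L);
            }
        }
        return result;
    }
}
```

For three 1024-byte files starting at 0, 1024 and 2048 and a valid offset of 1500, the first file stays full, the second is cut back to position 476, and the third is deleted.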
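3) Abnormal recovery
File recovery after an abnormal broker stop is implemented in CommitLog#recoverAbnormally.
The steps are largely the same as for recovery after a normal stop. The differences are:
After a normal stop, recovery starts from the third file from the end by default. After an abnormal stop, recovery searches backwards from the last file for the first file whose messages were stored correctly.
If the CommitLog directory contains no message files but the consume queue directory does contain files, the consume queue files must be destroyed.
Code: CommitLog#recoverAbnormally

    if (!mappedFiles.isEmpty()) {
        // Search backwards from the last file
        int index = mappedFiles.size() - 1;
        MappedFile mappedFile = null;
        for (; index >= 0; index--) {
            mappedFile = mappedFiles.get(index);
            // Stop at the first file that is suitable as the recovery starting point
            if (this.isMappedFileMatchedRecover(mappedFile)) {
                log.info("recover from this mapped file " + mappedFile.getFileName());
                break;
            }
        }
        // No suitable file found: start from the first file
        if (index < 0) {
            index = 0;
            mappedFile = mappedFiles.get(index);
        }
    } else {
        // No CommitLog files: reset the pointers and destroy the consume queue files
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        this.defaultMessageStore.destroyLogics();
    }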
Disk flush mechanism
RocketMQ storage is built on the memory mapping mechanism of JDK NIO (MappedByteBuffer). A message is first appended to memory and is then written to disk at a time determined by the configured flush policy.
Synchronous flush
Code: CommitLog#handleDiskFlush

    // The flush service used for synchronous flushing
    final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
    if (messageExt.isWaitStoreMsgOK()) {
        // Create a flush request for the end offset of this message
        GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
        // Hand the request to the flush thread
        service.putRequest(request);
        // Wait until the flush thread reports the result or the timeout expires
        boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
        if (!flushOK) {
            putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
        }
    }

GroupCommitRequest

    long nextOffset; // offset up to which data must be flushed
    CountDownLatch countDownLatch = new CountDownLatch(1); // released when the flush completes
    volatile boolean flushOK = false; // flush result, false by default

Code: GroupCommitService#run

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            try {
                // Wait for up to 10 ms, then process the pending requests
                this.waitForRunning(10);
                this.doCommit();
            } catch (Exception e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
        ...
    }

Code: GroupCommitService#doCommit

    private void doCommit() {
        // Lock the list of requests being processed
        synchronized (this.requestsRead) {
            if (!this.requestsRead.isEmpty()) {
                for (GroupCommitRequest req : this.requestsRead) {
                    // A message may span two files, so flush at most twice
                    boolean flushOK = false;
                    for (int i = 0; i < 2 && !flushOK; i++) {
                        flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                        if (!flushOK) {
                            CommitLog.this.mappedFileQueue.flush(0);
                        }
                    }
                    // Wake up the thread that is waiting for this request
                    req.wakeupCustomer(flushOK);
                }
                // Update the flush checkpoint
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                this.requestsRead.clear();
            } else {
                CommitLog.this.mappedFileQueue.flush(0);
            }
        }
    }
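The handshake between the sending thread and the flush thread rests on a CountDownLatch with a timeout. Below is a minimal sketch of such a request object, modelled on the three GroupCommitRequest fields listed above. FlushRequestSketch and its method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical flush request: the sender blocks on it, the flush thread completes it.
final class FlushRequestSketch {
    private final long nextOffset;
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile boolean flushOk = false;

    FlushRequestSketch(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    long nextOffset() {
        return nextOffset;
    }

    // Called by the flush thread once it knows whether the data reached disk
    void wakeup(boolean ok) {
        this.flushOk = ok;
        latch.countDown();
    }

    // Called by the sender; returns false on timeout because flushOk is still false
    boolean waitForFlush(long timeoutMillis) {
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return flushOk;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Writing flushOk before counting down the latch ensures the waiting thread sees the result once await returns.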
Asynchronous flush
After the message has been appended to memory, the broker responds to the producer immediately.
If transientStorePoolEnable is on, RocketMQ allocates off-heap memory of the same size as the target physical file (the CommitLog file) and locks it so that it cannot be swapped out to disk. Messages are appended to the off-heap memory, then committed to the memory-mapped buffer of the physical file, and finally flushed to disk:
The message is appended directly to the ByteBuffer (off-heap memory).
The CommitRealTimeService thread commits the newly appended content of the ByteBuffer to the MappedByteBuffer every 200 ms.
The MappedByteBuffer appends the committed content in memory, and the wrotePosition pointer moves forward.
When the commit operation returns successfully, committedPosition is updated to the committed position.
The FlushRealTimeService thread flushes the newly appended content of the MappedByteBuffer to disk every 500 ms by default.
If transientStorePoolEnable is off, messages are appended directly to the memory-mapped buffer of the physical file and then flushed to disk.
Code: CommitLog$CommitRealTimeService#run (how the commit thread works)

    // Interval between commit attempts
    int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
    // Minimum number of dirty pages required for a commit
    int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
    // Maximum interval between two real commits
    int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
    long begin = System.currentTimeMillis();
    // Too long since the last commit: commit regardless of the number of dirty pages
    if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
        this.lastCommitTimestamp = begin;
        commitDataLeastPages = 0;
    }
    // Commit
    boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
    long end = System.currentTimeMillis();
    if (!result) {
        // result == false means some data was committed, so wake up the flush thread
        this.lastCommitTimestamp = end;
        flushCommitLogService.wakeup();
    }
    if (end - begin > 500) {
        log.info("Commit data to file costs {} ms", end - begin);
    }
    this.waitForRunning(interval);

Code: CommitLog$FlushRealTimeService#run (how the flush thread works)

    // Whether to sleep for a fixed interval instead of waiting to be woken up
    boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
    // Interval between flush attempts
    int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
    // Minimum number of dirty pages required for a flush
    int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
    // Maximum interval between two real flushes
    int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
    ...
    long currentTimeMillis = System.currentTimeMillis();
    // Too long since the last flush: flush regardless of the number of dirty pages
    if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
        this.lastFlushTimestamp = currentTimeMillis;
        flushPhysicQueueLeastPages = 0;
        printFlushProgress = (printTimes++ % 10) == 0;
    }
    ...
    if (flushCommitLogTimed) {
        Thread.sleep(interval);
    } else {
        this.waitForRunning(interval);
    }
    ...
    long begin = System.currentTimeMillis();
    // Flush to disk
    CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
    if (storeTimestamp > 0) {
        // Update the flush checkpoint
        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
    }
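Both threads use the same two-part rule: do the work only when enough dirty pages have accumulated, unless too much time has passed since the last run, in which case the page threshold drops to zero. The sketch below restates that rule as two pure functions. It assumes 4 KB pages as in MappedFile.OS_PAGE_SIZE; FlushThresholdSketch and its methods are hypothetical names.

```java
// Hypothetical restatement of the "least pages" and "thorough interval" rules.
final class FlushThresholdSketch {
    static final int OS_PAGE_SIZE = 4 * 1024;

    // With a page threshold, require that many whole dirty pages;
    // with a threshold of 0, any unflushed byte is enough.
    static boolean isAbleToFlush(int wrotePosition, int flushedPosition, int leastPages) {
        if (leastPages > 0) {
            return (wrotePosition / OS_PAGE_SIZE) - (flushedPosition / OS_PAGE_SIZE) >= leastPages;
        }
        return wrotePosition > flushedPosition;
    }

    // Once the thorough interval has elapsed, the page threshold is dropped to 0
    static int effectiveLeastPages(long nowMillis, long lastFlushMillis, long thoroughIntervalMillis, int leastPages) {
        return nowMillis >= lastFlushMillis + thoroughIntervalMillis ? 0 : leastPages;
    }
}
```

The page threshold batches small writes into fewer disk operations, while the time limit bounds how long a small amount of data can stay unflushed.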
Expired file deletion mechanism
RocketMQ operates on the CommitLog and ConsumeQueue files through memory mapping and loads all files in the CommitLog and ConsumeQueue directories at startup.
To avoid wasting memory and disk space, it includes a mechanism for deleting expired files.
RocketMQ writes the CommitLog and ConsumeQueue files sequentially. All writes go to the last CommitLog or ConsumeQueue file, and an earlier file is never updated again once the next file has been created.
A file that is not the current write file and has not been updated within a certain interval is considered expired and can be deleted.
RocketMQ does not check whether all messages in the file have been consumed. By default a file expires after 72 hours (configured through fileReservedTime in the broker configuration file, in hours).
Code: DefaultMessageStore#addScheduleTask

    private void addScheduleTask() {
        // Periodically clean up expired files
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                DefaultMessageStore.this.cleanFilesPeriodically();
            }
        }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
        ...
    }

Code: DefaultMessageStore#cleanFilesPeriodically

    private void cleanFilesPeriodically() {
        // Remove expired CommitLog files
        this.cleanCommitLogService.run();
        // Remove expired ConsumeQueue files
        this.cleanConsumeQueueService.run();
    }

Code: DefaultMessageStore$CleanCommitLogService#deleteExpiredFiles

    private void deleteExpiredFiles() {
        // Number of files deleted in this run
        int deleteCount = 0;
        // How long a file is retained
        long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
        // Pause between the deletion of two physical files
        int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
        // How long a file that is still referenced may survive after its first rejected deletion
        int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
        boolean timeup = this.isTimeToDelete();
        boolean spacefull = this.isSpaceToDelete();
        boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
        if (timeup || spacefull || manualDelete) {
            ... // run the deletion logic
        } else {
            ... // do nothing
        }
    }

Code: CleanCommitLogService#isSpaceToDelete (deletes expired files when disk space runs low)

    private boolean isSpaceToDelete() {
        // Maximum share of the disk partition that the store may use
        double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
        // Whether files must be deleted immediately
        cleanImmediately = false;
        {
            String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
            // Current usage of the partition that holds the CommitLog
            double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
            if (physicRatio > diskSpaceWarningLevelRatio) {
                // Above the warning level: mark the disk as full and clean immediately
                boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
                if (diskok) {
                    DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
                }
                cleanImmediately = true;
            } else if (physicRatio > diskSpaceCleanForciblyRatio) {
                // Above the forced-clean level: clean immediately
                cleanImmediately = true;
            } else {
                boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
                if (!diskok) {
                    DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
                }
            }
            if (physicRatio < 0 || physicRatio > ratio) {
                DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
                return true;
            }
        }
        ...
    }
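The threshold logic of isSpaceToDelete can be summarised as a single decision over the disk usage ratio. The sketch below collapses the flags and the return value into one enum for readability. DiskCleanDecisionSketch is a hypothetical name, and the threshold values in the usage note are assumed example values, not values taken from this section.

```java
// Hypothetical summary of the disk-usage decision; thresholds are passed in by the caller.
final class DiskCleanDecisionSketch {

    enum Decision {
        MARK_FULL_AND_CLEAN_NOW, // above the warning level
        CLEAN_NOW,               // above the forced-clean level
        RECLAIM,                 // above the configured maximum, or usage unknown (negative)
        NONE                     // enough free space
    }

    static Decision decide(double usedRatio, double warningLevel, double cleanForcibly, double maxUsed) {
        if (usedRatio > warningLevel) {
            return Decision.MARK_FULL_AND_CLEAN_NOW;
        }
        if (usedRatio > cleanForcibly) {
            return Decision.CLEAN_NOW;
        }
        if (usedRatio < 0 || usedRatio > maxUsed) {
            return Decision.RECLAIM;
        }
        return Decision.NONE;
    }
}
```

With assumed thresholds of 0.90, 0.85 and 0.75, a usage of 0.87 leads to immediate cleaning, 0.80 only triggers reclaiming of expired files, and 0.50 triggers nothing.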
Code: MappedFileQueue#deleteExpiredFileByTime (destroys and deletes the files)

    for (int i = 0; i < mfsLength; i++) {
        MappedFile mappedFile = (MappedFile) mfs[i];
        // Latest time until which this file is kept
        long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
        // Delete if the file has expired or immediate cleaning was requested
        if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
            if (mappedFile.destroy(intervalForcibly)) {
                files.add(mappedFile);
                deleteCount++;
                // Limit the number of files deleted in one run
                if (files.size() >= DELETE_FILES_BATCH_MAX) {
                    break;
                }
                // Pause between two deletions
                if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                    try {
                        Thread.sleep(deleteFilesInterval);
                    } catch (InterruptedException e) {
                    }
                }
            } else {
                break;
            }
        } else {
            break;
        }
    }
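The scan above walks the files from oldest to newest and stops at the first one that is still live. The sketch below reproduces that control flow over an array of last-modified timestamps. It assumes, as a simplification, that the last file is the one currently being written and is therefore never a candidate; ExpiredFileScanSketch and countDeletable are hypothetical names.

```java
// Hypothetical model of the expiry scan over files ordered from oldest to newest.
final class ExpiredFileScanSketch {

    // Returns how many of the oldest files would be deleted in one run.
    static int countDeletable(long[] lastModifiedMillis, long reservedMillis, long nowMillis,
                              boolean cleanImmediately, int batchMax) {
        int count = 0;
        // Assumption: the last file is the active write file, so it is skipped
        for (int i = 0; i < lastModifiedMillis.length - 1 && count < batchMax; i++) {
            boolean expired = nowMillis >= lastModifiedMillis[i] + reservedMillis;
            if (expired || cleanImmediately) {
                count++;
            } else {
                // Files are in write order, so later files cannot be expired either
                break;
            }
        }
        return count;
    }
}
```

With a 72-hour retention and files last modified 100, 80, 10 and 1 hours ago, two files are deleted; with cleanImmediately set, all three inactive files are deleted, subject to the batch limit.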
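Summary
RocketMQ's storage files comprise the message files (CommitLog), the consume queue files (ConsumeQueue), the hash index files (IndexFile), the checkpoint file (checkpoint), and the abort file, which marks an abnormal shutdown.
Each message file, consume queue file and hash index file has a fixed length so that it can be read and written through memory mapping.
RocketMQ names each file after its starting offset.
On top of memory-mapped files, RocketMQ offers synchronous and asynchronous flushing. With asynchronous flushing, the message is first appended to the memory-mapped file, and a dedicated flush thread periodically writes the in-memory file data to disk.
To sustain high send throughput, RocketMQ stores the messages of all topics in one set of files (CommitLog), so that message storage is purely sequential writing. This makes reading inconvenient, so RocketMQ builds consume queue files, organized by topic and queue, to support consumption. RocketMQ also implements a hash index for messages: index keys can be set on a message so that it can be located quickly in the CommitLog files.
Once a message reaches the CommitLog, the ReputMessageService thread forwards it to the consume queue files and index files in near real time.
For safety, RocketMQ uses the abort file to record whether the broker was shut down normally. When the broker restarts, it applies different recovery strategies to ensure that the CommitLog, ConsumeQueue and hash index files are correct.
Deletion policy: the expiry mechanism deletes expired files when disk space runs low or, by default, at 4 a.m. Files are kept for 72 hours, and deletion does not check whether the messages in a file have been consumed.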
Consumer 消息消费概述
消息推送模式
消息消费重要方法
1 2 3 4 5 6 7 8 void sendMessageBack (final MessageExt msg, final int delayLevel, final String brokerName) :发送消息确认Set<MessageQueue> fetchSubscribeMessageQueues (final String topic) :获取消费者对主题分配了那些消息队列 void registerMessageListener (final MessageListenerConcurrently messageListener) :注册并发事件监听器void registerMessageListener (final MessageListenerOrderly messageListener) :注册顺序消息事件监听器void subscribe (final String topic, final String subExpression) :基于主题订阅消息,消息过滤使用表达式void subscribe (final String topic, final String fullClassName,final String filterClassSource) :基于主题订阅消息,消息过滤使用类模式void subscribe (final String topic, final MessageSelector selector) :订阅消息,并指定队列选择器void unsubscribe (final String topic) :取消消息订阅
DefaultMQPushConsumer
private String consumerGroup;
private MessageModel messageModel = MessageModel.CLUSTERING;
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
private Map<String, String> subscription = new HashMap<String, String>();
private MessageListener messageListener;
private OffsetStore offsetStore;
private int consumeThreadMin = 20;
private int consumeThreadMax = 20;
private int consumeConcurrentlyMaxSpan = 2000;
private int pullThresholdForQueue = 1000;
private long pullInterval = 0;
private int pullBatchSize = 32;
private int consumeMessageBatchMaxSize = 1;
private boolean postSubscriptionWhenPull = false;
private int maxReconsumeTimes = -1;
private long consumeTimeout = 15;
Consumer startup flow
Code: DefaultMQPushConsumerImpl#start
public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}",
                this.defaultMQPushConsumer.getConsumerGroup(),
                this.defaultMQPushConsumer.getMessageModel(),
                this.defaultMQPushConsumer.isUnitMode());
            this.serviceState = ServiceState.START_FAILED;
            // Validate the configuration and copy the subscriptions
            this.checkConfig();
            this.copySubscription();
            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }
            // Create the MQClientInstance
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
            // Set up the rebalance implementation
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
            // Choose where consumption progress is stored
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        // Broadcasting: progress is kept on the consumer
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        // Clustering: progress is kept on the Broker
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            this.offsetStore.load();
            // Create the orderly or concurrent consume service
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService =
                    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService =
                    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }
            this.consumeMessageService.start();
            // Register the consumer with the client instance
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown();
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    this.mQClientFactory.checkClientInBroker();
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    this.mQClientFactory.rebalanceImmediately();
}
Message pulling
1) How PullMessageService works
RocketMQ uses a dedicated thread, PullMessageService, to pull messages
Code: PullMessageService#run
public void run() {
    log.info(this.getServiceName() + " service started");
    // Loop until the service is stopped
    while (!this.isStopped()) {
        try {
            // Blocks until a pull request is available
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}
PullRequest
private String consumerGroup;
private MessageQueue messageQueue;
private ProcessQueue processQueue;
private long nextOffset;
private boolean lockedFirst = false;
Code: PullMessageService#pullMessage
private void pullMessage(final PullRequest pullRequest) {
    // Look up the consumer instance by consumer group
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}
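2) How ProcessQueue works
ProcessQueue is the consumer-side replica (snapshot) of a MessageQueue
By default each pull fetches 32 messages from the Broker. They are stored in the ProcessQueue in queue-offset order and then submitted to the consumer thread pool; a message is removed from the ProcessQueue once it has been consumed successfully
Fields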
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
private final AtomicLong msgCount = new AtomicLong();
private volatile long queueOffsetMax = 0L;
private volatile boolean dropped = false;
private volatile long lastPullTimestamp = System.currentTimeMillis();
private volatile long lastConsumeTimestamp = System.currentTimeMillis();
Methods
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer)
public boolean putMessage(final List<MessageExt> msgs)
public long getMaxSpan()
public long removeMessage(final List<MessageExt> msgs)
public void rollback()
public long commit()
public void makeMessageToCosumeAgain(List<MessageExt> msgs)
public List<MessageExt> takeMessags(final int batchSize)
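To make the role of the sorted map concrete, here is a heavily reduced sketch of the idea behind putMessage, removeMessage, and getMaxSpan. It is not the RocketMQ class: ProcessQueueSketch is a hypothetical name, messages are modelled as plain strings keyed by queue offset, and the read-write lock is replaced by synchronized methods for brevity.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

final class ProcessQueueSketch {
    // queue offset -> message body; TreeMap keeps the entries sorted by offset
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();
    private long queueOffsetMax = 0L;

    /** Caches pulled messages starting at firstOffset; true if any entry was new. */
    synchronized boolean putMessage(long firstOffset, List<String> bodies) {
        boolean added = false;
        long offset = firstOffset;
        for (String body : bodies) {
            if (msgTreeMap.put(offset, body) == null) {
                added = true;
                queueOffsetMax = Math.max(queueOffsetMax, offset);
            }
            offset++;
        }
        return added;
    }

    /**
     * Removes consumed offsets and returns the offset that is safe to record as
     * progress: the smallest offset still cached, or max + 1 when the cache is
     * drained. Returns -1 when nothing was cached at all.
     */
    synchronized long removeMessage(List<Long> consumedOffsets) {
        if (msgTreeMap.isEmpty()) {
            return -1L;
        }
        long result = queueOffsetMax + 1;
        for (Long consumed : consumedOffsets) {
            msgTreeMap.remove(consumed);
        }
        if (!msgTreeMap.isEmpty()) {
            result = msgTreeMap.firstKey();
        }
        return result;
    }

    /** Distance between the largest and smallest cached offsets. */
    synchronized long getMaxSpan() {
        return msgTreeMap.isEmpty() ? 0L : msgTreeMap.lastKey() - msgTreeMap.firstKey();
    }

    public static void main(String[] args) {
        ProcessQueueSketch pq = new ProcessQueueSketch();
        pq.putMessage(10L, Arrays.asList("a", "b", "c", "d"));
        // Offsets 11 and 12 finish first, but 10 is still pending,
        // so progress must not move past 10.
        System.out.println(pq.removeMessage(Arrays.asList(11L, 12L)));
        System.out.println(pq.removeMessage(Arrays.asList(10L)));
    }
}
```

Returning the smallest remaining offset means a slow message holds the recorded progress back, so after a restart some already-consumed messages may be delivered again, but none is skipped.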
3) Basic message pulling flow
1. The client sends a pull request
Code: DefaultMQPushConsumerImpl#pullMessage
public void pullMessage(final PullRequest pullRequest) {
    // Get the ProcessQueue from the pull request
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    if (processQueue.isDropped()) {
        log.info("the pull request[{}] is dropped.", pullRequest.toString());
        return;
    }
    // Record the time of the latest pull
    pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
    try {
        this.makeSureStateOK();
    } catch (MQClientException e) {
        log.warn("pullMessage exception, consumer state not ok", e);
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        return;
    }
    // If the consumer is paused, retry later
    if (this.isPause()) {
        log.warn("consumer was paused, execute pull request later. instanceName={}, group={}",
            this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
        return;
    }
    // Flow control: number and total size of cached messages
    long cachedMessageCount = processQueue.getMsgCount().get();
    long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
    if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }
    if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }
    // Look up the subscription for the topic
    final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (null == subscriptionData) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        log.warn("find the consumer's subscription failed, {}", pullRequest);
        return;
    }
    ...
    // Send the pull request to the Broker asynchronously
    this.pullAPIWrapper.pullKernelImpl(
        pullRequest.getMessageQueue(),
        subExpression,
        subscriptionData.getExpressionType(),
        subscriptionData.getSubVersion(),
        pullRequest.getNextOffset(),
        this.defaultMQPushConsumer.getPullBatchSize(),
        sysFlag,
        commitOffsetValue,
        BROKER_SUSPEND_MAX_TIME_MILLIS,
        CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
        CommunicationMode.ASYNC,
        pullCallback
    );
}
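The two flow-control checks in pullMessage reduce to a small decision function. The sketch below isolates that decision under stated assumptions: PullFlowControlSketch and its Decision enum are hypothetical names, and the 100 MiB size threshold used in main is an example value (only the count threshold of 1000 appears in the field list above).

```java
final class PullFlowControlSketch {
    enum Decision { PULL_NOW, DELAY_FOR_COUNT, DELAY_FOR_SIZE }

    /** Decides whether a pull should go ahead or be postponed. */
    static Decision decide(long cachedCount, long cachedBytes, int countThreshold, int sizeThresholdMiB) {
        // Integer division, as in the listing: a partial MiB is rounded down
        long cachedMiB = cachedBytes / (1024 * 1024);
        if (cachedCount > countThreshold) {
            return Decision.DELAY_FOR_COUNT;
        }
        if (cachedMiB > sizeThresholdMiB) {
            return Decision.DELAY_FOR_SIZE;
        }
        return Decision.PULL_NOW;
    }

    public static void main(String[] args) {
        int countThreshold = 1000;   // pullThresholdForQueue from the field list
        int sizeThresholdMiB = 100;  // assumed example value
        System.out.println(decide(1000, 0, countThreshold, sizeThresholdMiB));
        System.out.println(decide(1001, 0, countThreshold, sizeThresholdMiB));
        System.out.println(decide(10, 101L * 1024 * 1024, countThreshold, sizeThresholdMiB));
    }
}
```

Both comparisons are strict, so a queue sitting exactly at the threshold is still pulled. A postponed request is not dropped; it is rescheduled, which is what executePullRequestLater does in the listing.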
2. The Broker assembles the messages
Code: PullMessageProcessor#processRequest
// Build the message filter
MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
    messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
        this.brokerController.getConsumerFilterManager());
} else {
    messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
        this.brokerController.getConsumerFilterManager());
}
// Read the messages from the message store
final GetMessageResult getMessageResult =
    this.brokerController.getMessageStore().getMessage(
        requestHeader.getConsumerGroup(),
        requestHeader.getTopic(),
        requestHeader.getQueueId(),
        requestHeader.getQueueOffset(),
        requestHeader.getMaxMsgNums(),
        messageFilter
    );
Code: DefaultMessageStore#getMessage
GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
long nextBeginOffset = offset;
long minOffset = 0;
long maxOffset = 0;
GetMessageResult getResult = new GetMessageResult();
final long maxOffsetPy = this.commitLog.getMaxOffset();
// Find the consume queue for the topic and queue ID
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
...
minOffset = consumeQueue.getMinOffsetInQueue();
maxOffset = consumeQueue.getMaxOffsetInQueue();
// Validate the requested offset
if (maxOffset == 0) {
    status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
    nextBeginOffset = nextOffsetCorrection(offset, 0);
} else if (offset < minOffset) {
    status = GetMessageStatus.OFFSET_TOO_SMALL;
    nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else if (offset == maxOffset) {
    status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
    nextBeginOffset = nextOffsetCorrection(offset, offset);
} else if (offset > maxOffset) {
    status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
    if (0 == minOffset) {
        nextBeginOffset = nextOffsetCorrection(offset, minOffset);
    } else {
        nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
    }
}
...
// Read the message from the CommitLog by physical offset and size
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
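The offset validation in getMessage is easier to follow as a pure function from (offset, minOffset, maxOffset) to a status. The following is a sketch only: OffsetCheckSketch is a hypothetical class, and IN_RANGE is an invented name for the case the excerpt elides, where the offset is valid and messages are actually read.

```java
final class OffsetCheckSketch {
    enum Status {
        NO_MESSAGE_IN_QUEUE, OFFSET_TOO_SMALL, OFFSET_OVERFLOW_ONE, OFFSET_OVERFLOW_BADLY,
        IN_RANGE // hypothetical: offset is valid, messages can be read
    }

    /** Classifies a requested queue offset against the queue's current bounds. */
    static Status classify(long offset, long minOffset, long maxOffset) {
        if (maxOffset == 0) {
            return Status.NO_MESSAGE_IN_QUEUE;   // the queue holds nothing yet
        } else if (offset < minOffset) {
            return Status.OFFSET_TOO_SMALL;      // requested data no longer available
        } else if (offset == maxOffset) {
            return Status.OFFSET_OVERFLOW_ONE;   // caught up: nothing new yet
        } else if (offset > maxOffset) {
            return Status.OFFSET_OVERFLOW_BADLY; // ahead of the queue
        }
        return Status.IN_RANGE;
    }

    public static void main(String[] args) {
        System.out.println(classify(5, 0, 0));
        System.out.println(classify(1, 3, 10));
        System.out.println(classify(10, 3, 10));
        System.out.println(classify(11, 3, 10));
        System.out.println(classify(5, 3, 10));
    }
}
```

OFFSET_OVERFLOW_ONE is the normal state of a consumer that has caught up with the producer, which is why it is distinguished from an offset that is far out of range.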
Code: PullMessageProcessor#processRequest
// Fill the response from the pull result
response.setRemark(getMessageResult.getStatus().name());
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
responseHeader.setMinOffset(getMessageResult.getMinOffset());
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
// Decide which Broker the consumer should pull from next, based on the Broker role
switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
    case ASYNC_MASTER:
    case SYNC_MASTER:
        break;
    case SLAVE:
        if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
            response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
        }
        break;
}
...
// Map the store status to a response code
switch (getMessageResult.getStatus()) {
    case FOUND:
        response.setCode(ResponseCode.SUCCESS);
        break;
    case MESSAGE_WAS_REMOVING:
        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
        break;
    case NO_MATCHED_LOGIC_QUEUE:
    case NO_MESSAGE_IN_QUEUE:
        if (0 != requestHeader.getQueueOffset()) {
            response.setCode(ResponseCode.PULL_OFFSET_MOVED);
            log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
                requestHeader.getQueueOffset(),
                getMessageResult.getNextBeginOffset(),
                requestHeader.getTopic(),
                requestHeader.getQueueId(),
                requestHeader.getConsumerGroup()
            );
        } else {
            response.setCode(ResponseCode.PULL_NOT_FOUND);
        }
        break;
    case NO_MATCHED_MESSAGE:
        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
        break;
    case OFFSET_FOUND_NULL:
        response.setCode(ResponseCode.PULL_NOT_FOUND);
        break;
    case OFFSET_OVERFLOW_BADLY:
        response.setCode(ResponseCode.PULL_OFFSET_MOVED);
        log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",
            requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
        break;
    case OFFSET_OVERFLOW_ONE:
        response.setCode(ResponseCode.PULL_NOT_FOUND);
        break;
    case OFFSET_TOO_SMALL:
        response.setCode(ResponseCode.PULL_OFFSET_MOVED);
        log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
            requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
            getMessageResult.getMinOffset(), channel.remoteAddress());
        break;
    default:
        assert false;
        break;
}
...
// Only a master stores the consumer's committed offset
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable
    && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
    this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
        requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}
3. The pulling client processes the response
Code: MQClientAPIImpl#processPullResponse
private PullResult processPullResponse(
    final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
    PullStatus pullStatus = PullStatus.NO_NEW_MSG;
    // Map the response code to a pull status
    switch (response.getCode()) {
        case ResponseCode.SUCCESS:
            pullStatus = PullStatus.FOUND;
            break;
        case ResponseCode.PULL_NOT_FOUND:
            pullStatus = PullStatus.NO_NEW_MSG;
            break;
        case ResponseCode.PULL_RETRY_IMMEDIATELY:
            pullStatus = PullStatus.NO_MATCHED_MSG;
            break;
        case ResponseCode.PULL_OFFSET_MOVED:
            pullStatus = PullStatus.OFFSET_ILLEGAL;
            break;
        default:
            throw new MQBrokerException(response.getCode(), response.getRemark());
    }
    // Decode the response header
    PullMessageResponseHeader responseHeader =
        (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
    // Wrap everything in a PullResultExt
    return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
        responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
}
PullResult class
private final PullStatus pullStatus;
private final long nextBeginOffset;
private final long minOffset;
private final long maxOffset;
private List<MessageExt> msgFoundList;
Code: DefaultMQPushConsumerImpl$PullCallback#onSuccess
// Store the pulled messages in the ProcessQueue
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// Submit them to the consume service
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);
// Schedule the next pull: after pullInterval if set, otherwise immediately
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
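4. Message pulling summary
4) Long polling for message pulls
RocketMQ's Push mode is implemented by the consumer repeatedly sending pull requests to the Broker
If long polling is disabled and the requested message has not yet reached the consume queue, the Broker suspends the request for shortPollingTimeMills and then checks again whether the message has arrived; if it still has not, the Broker answers the Consumer with PULL_NOT_FOUND (message not found)
If long polling is enabled, the Broker checks every 5 s whether a message has arrived, and as soon as one arrives it wakes the suspended request to verify whether the message is one the Consumer is interested in
If it is, the message is read from the CommitLog and returned to the Consumer
If it is not, the request stays suspended until it times out, and only then does the Consumer get a response. The timeout is carried in the request parameters by the Consumer: 15 s in PUSH mode, and set through DefaultMQPullConsumer#setBrokerSuspendMaxTimeMillis in PULL mode
Long polling is enabled by setting longPollingEnable to true in the Broker configuration
Code: PullMessageProcessor#processRequest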
case ResponseCode.PULL_NOT_FOUND:
    if (brokerAllowSuspend && hasSuspendFlag) {
        long pollingTimeMills = suspendTimeoutMillisLong;
        // Without long polling, fall back to the short polling interval
        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
        }
        String topic = requestHeader.getTopic();
        long offset = requestHeader.getQueueOffset();
        int queueId = requestHeader.getQueueId();
        // Build the held pull request
        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
            this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
        // Hand it to PullRequestHoldService
        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
        response = null;
        break;
    }
Long polling through PullRequestHoldService
Code: PullRequestHoldService#suspendPullRequest
// Adds the pull request to the ManyPullRequest of its topic and queue
public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (null == mpr) {
        mpr = new ManyPullRequest();
        ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
        if (prev != null) {
            mpr = prev;
        }
    }
    mpr.addPullRequest(pullRequest);
}
Code: PullRequestHoldService#run
public void run() {
    log.info("{} service started", this.getServiceName());
    while (!this.isStopped()) {
        try {
            // With long polling enabled, check every 5 s
            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                this.waitForRunning(5 * 1000);
            } else {
                // Otherwise wait for shortPollingTimeMills before checking
                this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            }
            long beginLockTimestamp = this.systemClock.now();
            this.checkHoldRequest();
            long costTime = this.systemClock.now() - beginLockTimestamp;
            if (costTime > 5 * 1000) {
                log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
            }
        } catch (Throwable e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    log.info("{} service end", this.getServiceName());
}
Code: PullRequestHoldService#checkHoldRequest
// Walks the held requests and checks whether new messages have arrived
private void checkHoldRequest() {
    for (String key : this.pullRequestTable.keySet()) {
        String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
        if (2 == kArray.length) {
            String topic = kArray[0];
            int queueId = Integer.parseInt(kArray[1]);
            // Current maximum offset of the consume queue
            final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
            try {
                this.notifyMessageArriving(topic, queueId, offset);
            } catch (Throwable e) {
                log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
            }
        }
    }
}
Code: PullRequestHoldService#notifyMessageArriving
// The queue's maximum offset is beyond the offset the request is waiting for
if (newestOffset > request.getPullFromThisOffset()) {
    // Check whether the new message matches the consumer's filter
    boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
        new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
    if (match && properties != null) {
        match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
    }
    if (match) {
        try {
            // Wake the request up and run the pull again
            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                request.getRequestCommand());
        } catch (Throwable e) {
            log.error("execute request when wakeup failed.", e);
        }
        continue;
    }
}
// The request has been held longer than its timeout
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
    try {
        this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
            request.getRequestCommand());
    } catch (Throwable e) {
        log.error("execute request when wakeup failed.", e);
    }
    continue;
}
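The hold-and-wake logic above can be condensed into a small in-memory model. This is a sketch under assumptions, not the Broker's implementation: HoldServiceSketch and HeldRequest are hypothetical types, the "@" key separator is assumed, message filtering is left out, and the current time is passed in as a parameter so the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class HoldServiceSketch {
    /** One suspended pull request (hypothetical, heavily reduced). */
    static final class HeldRequest {
        final String clientId;
        final long pullFromThisOffset;
        final long suspendTimestamp;
        final long timeoutMillis;

        HeldRequest(String clientId, long pullFromThisOffset, long suspendTimestamp, long timeoutMillis) {
            this.clientId = clientId;
            this.pullFromThisOffset = pullFromThisOffset;
            this.suspendTimestamp = suspendTimestamp;
            this.timeoutMillis = timeoutMillis;
        }
    }

    private final Map<String, List<HeldRequest>> pullRequestTable = new HashMap<>();

    /** Key format is an assumption for this sketch. */
    static String buildKey(String topic, int queueId) {
        return topic + "@" + queueId;
    }

    synchronized void suspendPullRequest(String topic, int queueId, HeldRequest request) {
        pullRequestTable.computeIfAbsent(buildKey(topic, queueId), k -> new ArrayList<>()).add(request);
    }

    /** Returns the requests to wake: those with new data or an expired timeout. */
    synchronized List<HeldRequest> notifyMessageArriving(String topic, int queueId, long maxOffset, long now) {
        List<HeldRequest> woken = new ArrayList<>();
        List<HeldRequest> held = pullRequestTable.get(buildKey(topic, queueId));
        if (held == null) {
            return woken;
        }
        for (Iterator<HeldRequest> it = held.iterator(); it.hasNext(); ) {
            HeldRequest r = it.next();
            boolean arrived = maxOffset > r.pullFromThisOffset;
            boolean timedOut = now >= r.suspendTimestamp + r.timeoutMillis;
            if (arrived || timedOut) {
                woken.add(r);
                it.remove(); // everything else stays held for the next check
            }
        }
        return woken;
    }

    public static void main(String[] args) {
        HoldServiceSketch service = new HoldServiceSketch();
        service.suspendPullRequest("TopicA", 0, new HeldRequest("c1", 5L, 0L, 15_000L));
        service.suspendPullRequest("TopicA", 0, new HeldRequest("c2", 10L, 0L, 15_000L));
        System.out.println(service.notifyMessageArriving("TopicA", 0, 6L, 100L).size());
        System.out.println(service.notifyMessageArriving("TopicA", 0, 6L, 15_000L).size());
    }
}
```

In the example, c1 is woken by data arriving at offset 5, while c2 is only released when its 15 s timeout expires.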
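**DefaultMessageStore$ReputMessageService mechanism** (no long polling)
Code: DefaultMessageStore#start
// Start reputting from the largest physical offset already covered by the consume queues
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
this.reputMessageService.start();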
Code: DefaultMessageStore$ReputMessageService#run
public void run() {
    DefaultMessageStore.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            Thread.sleep(1);
            // Forward new CommitLog entries to the ConsumeQueue and index files
            this.doReput();
        } catch (Exception e) {
            DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
Code: DefaultMessageStore$ReputMessageService#doReput
// On a master with long polling enabled, notify the listener as soon as a message arrives
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
    && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
    DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
        dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
        dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
        dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
Code: NotifyMessageArrivingListener#arriving
public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
    long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
    this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,
        msgStoreTime, filterBitMap, properties);
}
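Message queue load balancing and reallocation
Message queue reallocation is carried out by the RebalanceService thread
Each MQClientInstance holds one RebalanceService, which starts when the MQClientInstance starts
Code: RebalanceService#run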
public void run() {
    log.info(this.getServiceName() + " service started");
    // Rebalance once per waitInterval until the service is stopped
    while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }
    log.info(this.getServiceName() + " service end");
}
Code: MQClientInstance#doRebalance
public void doRebalance() {
    // Each registered consumer rebalances its own queues
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            try {
                impl.doRebalance();
            } catch (Throwable e) {
                log.error("doRebalance exception", e);
            }
        }
    }
}
Code: RebalanceImpl#doRebalance
public void doRebalance(final boolean isOrder) {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        // Rebalance every subscribed topic
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }
    this.truncateMessageQueueNotMyTopic();
}
Code: RebalanceImpl#rebalanceByTopic
// Queues of the topic
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// IDs of all consumers in the group that subscribe to the topic
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (mqSet != null && cidAll != null) {
    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
    mqAll.addAll(mqSet);
    // Sort both lists so that every consumer computes the same allocation
    Collections.sort(mqAll);
    Collections.sort(cidAll);
    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
    List<MessageQueue> allocateResult = null;
    try {
        allocateResult = strategy.allocate(
            this.consumerGroup,
            this.mQClientFactory.getClientId(),
            mqAll,
            cidAll);
    } catch (Throwable e) {
        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
            e);
        return;
    }
    ...
}
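RocketMQ provides five queue allocation algorithms by default. A Consumer can be assigned several queues, but a message queue is assigned to only one Consumer, so if there are more Consumers than queues, some Consumers receive no messages
AllocateMessageQueueAveragely: average allocation
Example: 8 queues q1,q2,q3,q4,q5,q6,q7,q8 and 3 consumers c1,c2,c3 are allocated as follows:
c1: q1,q2,q3
c2: q4,q5,q6
c3: q7,q8
AllocateMessageQueueAveragelyByCircle: round-robin average allocation
Example: 8 queues q1,q2,q3,q4,q5,q6,q7,q8 and 3 consumers c1,c2,c3 are allocated as follows:
c1: q1,q4,q7
c2: q2,q5,q8
c3: q3,q6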
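The two allocation schemes in the example can be reproduced with a short sketch. This is an illustrative reimplementation, not the code of the AllocateMessageQueueAveragely or AllocateMessageQueueAveragelyByCircle classes: AllocateSketch is a hypothetical name, and queues and consumers are plain strings that are assumed to be sorted already, as rebalanceByTopic does before allocating.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class AllocateSketch {
    /** Contiguous blocks; the first (size % consumers) consumers get one extra queue. */
    static List<String> allocateAveragely(String cid, List<String> mqAll, List<String> cidAll) {
        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(cid);
        if (index < 0) {
            return result; // unknown consumer: nothing to allocate
        }
        int base = mqAll.size() / cidAll.size();
        int mod = mqAll.size() % cidAll.size();
        int size = base + (index < mod ? 1 : 0);
        int start = index * base + Math.min(index, mod);
        for (int i = 0; i < size; i++) {
            result.add(mqAll.get(start + i));
        }
        return result;
    }

    /** Round robin: consumer i takes queues i, i + n, i + 2n, ... */
    static List<String> allocateByCircle(String cid, List<String> mqAll, List<String> cidAll) {
        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(cid);
        if (index < 0) {
            return result;
        }
        for (int i = index; i < mqAll.size(); i += cidAll.size()) {
            result.add(mqAll.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("q1", "q2", "q3", "q4", "q5", "q6", "q7", "q8");
        List<String> consumers = Arrays.asList("c1", "c2", "c3");
        for (String c : consumers) {
            System.out.println(c + " averagely: " + allocateAveragely(c, queues, consumers)
                + ", by circle: " + allocateByCircle(c, queues, consumers));
        }
    }
}
```

Every consumer runs the same deterministic function over the same sorted inputs, so the group reaches a consistent allocation without a coordinator. With two queues and three consumers, the third consumer receives an empty list, which matches the remark that surplus Consumers get no messages.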
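Message consumption process
PullMessageService pulls messages from the remote Broker and stores them in the ProcessQueue (the processing queue of a message queue)
It then calls ConsumeMessageService#submitConsumeRequest to consume them. Consumption runs on a thread pool, which decouples message pulling from message consumption
ConsumeMessageService supports both orderly and concurrent consumption
Concurrent consumption
Code: ConsumeMessageConcurrentlyService#submitConsumeRequest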
// Maximum number of messages handed to the listener at once
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
// The pulled messages fit into one batch: submit them as a single request
if (msgs.size() <= consumeBatchSize) {
    ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
    try {
        this.consumeExecutor.submit(consumeRequest);
    } catch (RejectedExecutionException e) {
        this.submitConsumeRequestLater(consumeRequest);
    }
} else {
    // Otherwise split them into batches of at most consumeBatchSize
    for (int total = 0; total < msgs.size(); ) {
        List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
        for (int i = 0; i < consumeBatchSize; i++, total++) {
            if (total < msgs.size()) {
                msgThis.add(msgs.get(total));
            } else {
                break;
            }
        }
        ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            // The pool is saturated: put the rest into this request and retry later
            for (; total < msgs.size(); total++) {
                msgThis.add(msgs.get(total));
            }
            this.submitConsumeRequestLater(consumeRequest);
        }
    }
}
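The batching step in submitConsumeRequest is a plain list partition. A minimal standalone sketch follows, with hypothetical names and integers standing in for messages. It covers only the splitting, not the thread pool or the rejected-execution fallback.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BatchSplitSketch {
    /** Splits msgs into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> split(List<T> msgs, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < msgs.size(); from += batchSize) {
            int to = Math.min(from + batchSize, msgs.size());
            // Copy the slice so each batch is independent of the source list
            batches.add(new ArrayList<>(msgs.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        // Five pulled messages with a batch size of 2 yield batches of 2, 2 and 1
        System.out.println(split(Arrays.asList(1, 2, 3, 4, 5), 2));
    }
}
```

With the defaults listed earlier (pullBatchSize = 32, consumeMessageBatchMaxSize = 1), one pull is therefore split into 32 single-message consume requests.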
Code: ConsumeMessageConcurrentlyService$ConsumeRequest#run
// Skip consumption if the queue has been dropped
if (this.processQueue.isDropped()) {
    log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
    return;
}
...
// Run the before-consume hooks
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
    consumeMessageContext = new ConsumeMessageContext();
    consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
    consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
    consumeMessageContext.setProps(new HashMap<String, String>());
    consumeMessageContext.setMq(messageQueue);
    consumeMessageContext.setMsgList(msgs);
    consumeMessageContext.setSuccess(false);
    ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
...
// Invoke the user's message listener
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
// Run the after-consume hooks
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
    consumeMessageContext.setStatus(status.toString());
    consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
    ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
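Scheduled (delayed) messages
A delayed message is not consumed as soon as it reaches the Broker; it becomes consumable only after a specified time
RocketMQ does not support arbitrary delay precision: that would require sorting messages on the Broker in addition to persisting them, at a heavy performance cost
Delay levels are configured on the Broker through messageDelayLevel. The default is "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h", so delayLevel=1 means a delay of 1 s
Delayed messages are implemented by ScheduleMessageService, which is created in DefaultMessageStore
DefaultMessageStore loads it by calling its load method and starts it by calling its start method
Code: ScheduleMessageService#load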
public boolean load() {
    boolean result = super.load();
    // Parse messageDelayLevel into the delay level table
    result = result && this.parseDelayLevel();
    return result;
}
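What parsing the delay levels amounts to can be sketched as follows: split the messageDelayLevel string on spaces and convert each entry to milliseconds, keyed by a level number starting at 1. This is an illustration with hypothetical names, not the body of parseDelayLevel; the unit table (s, m, h, d) and the level-to-queue mapping (queue ID = level - 1) are assumptions of the sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class DelayLevelSketch {
    /** Parses e.g. "1s 5s 1m" into {1=1000, 2=5000, 3=60000}. */
    static Map<Integer, Long> parseDelayLevel(String levelString) {
        Map<String, Long> unitMillis = new HashMap<>();
        unitMillis.put("s", 1000L);
        unitMillis.put("m", 60_000L);
        unitMillis.put("h", 3_600_000L);
        unitMillis.put("d", 86_400_000L);

        Map<Integer, Long> table = new TreeMap<>();
        String[] parts = levelString.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String value = parts[i];
            String unit = value.substring(value.length() - 1);
            Long millis = unitMillis.get(unit);
            if (millis == null) {
                throw new IllegalArgumentException("unknown time unit in: " + value);
            }
            long amount = Long.parseLong(value.substring(0, value.length() - 1));
            table.put(i + 1, amount * millis); // levels are numbered from 1
        }
        return table;
    }

    /** Assumed mapping from delay level to consume queue ID. */
    static int delayLevel2QueueId(int delayLevel) {
        return delayLevel - 1;
    }

    /** Delivery time = store time + the delay configured for the level. */
    static long computeDeliverTimestamp(Map<Integer, Long> table, int delayLevel, long storeTimestamp) {
        Long delay = table.get(delayLevel);
        return delay == null ? storeTimestamp : storeTimestamp + delay;
    }

    public static void main(String[] args) {
        Map<Integer, Long> table =
            parseDelayLevel("1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h");
        System.out.println(table.size());
        System.out.println(table.get(1));
        System.out.println(computeDeliverTimestamp(table, 5, 0L));
    }
}
```

Fixed levels mean that all messages in one level's queue share the same delay, so they become due in arrival order and never need sorting.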
Code: ScheduleMessageService#start
// Create one timer task per delay level
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
    Integer level = entry.getKey();
    Long timeDelay = entry.getValue();
    Long offset = this.offsetTable.get(level);
    if (null == offset) {
        offset = 0L;
    }
    if (timeDelay != null) {
        this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
    }
}
// Periodically persist the consumption progress of each delay level
this.timer.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        try {
            if (started.get()) ScheduleMessageService.this.persist();
        } catch (Throwable e) {
            log.error("scheduleAtFixedRate flush exception", e);
        }
    }
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
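Scheduling
Once its start method has run, ScheduleMessageService has one scheduled task per delay level, and each delay level maps to one consume queue under the SCHEDULE_TOPIC_XXXX topic
The scheduled task is implemented by DeliverDelayedMessageTimerTask, whose core method is executeOnTimeup
Code: ScheduleMessageService$DeliverDelayedMessageTimerTask#executeOnTimeup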
// Find the consume queue for this delay level
ConsumeQueue cq =
    ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
        delayLevel2QueueId(delayLevel));
...
// Read the consume queue entries starting at the current offset
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
...
// Walk the entries one ConsumeQueue unit at a time
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
    long offsetPy = bufferCQ.getByteBuffer().getLong();
    int sizePy = bufferCQ.getByteBuffer().getInt();
    long tagsCode = bufferCQ.getByteBuffer().getLong();
    if (cq.isExtAddr(tagsCode)) {
        if (cq.getExt(tagsCode, cqExtUnit)) {
            tagsCode = cqExtUnit.getTagsCode();
        } else {
            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                tagsCode, offsetPy, sizePy);
            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
        }
    }
    long now = System.currentTimeMillis();
    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
    ...
    // Load the message from the CommitLog
    MessageExt msgExt =
        ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
            offsetPy, sizePy);
}
Orderly messages
The implementation class is org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService
Code: ConsumeMessageOrderlyService#start
public void start() {
    // In clustering mode, periodically lock the allocated queues on the Broker
    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                ConsumeMessageOrderlyService.this.lockMQPeriodically();
            }
        }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
    }
}
Code: ConsumeMessageOrderlyService#submitConsumeRequest
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispathToConsume) {
    if (dispathToConsume) {
        // The request carries no messages; they are taken from the ProcessQueue when it runs
        ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
        this.consumeExecutor.submit(consumeRequest);
    }
}
Code: ConsumeMessageOrderlyService$ConsumeRequest#run
// Skip consumption if the queue has been dropped
if (this.processQueue.isDropped()) {
    log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
    return;
}
// Fetch the lock object of this message queue and consume while holding it
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
    ...
}
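The per-queue lock object is what forces the pool's threads to consume one queue serially. A minimal sketch of such a lock table is shown below. QueueLockSketch is a hypothetical class that keys locks by a string, whereas the listing keys them by MessageQueue, and it covers only the local lock, not the lock held on the Broker.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class QueueLockSketch {
    // One lock object per queue key, created on first use
    private final ConcurrentMap<String, Object> lockTable = new ConcurrentHashMap<>();

    /** Always returns the same lock object for the same queue key. */
    Object fetchLockObject(String queueKey) {
        return lockTable.computeIfAbsent(queueKey, k -> new Object());
    }

    /** Runs work while holding the queue's lock, so work on one queue never overlaps. */
    void consumeSerially(String queueKey, Runnable work) {
        synchronized (fetchLockObject(queueKey)) {
            work.run();
        }
    }

    public static void main(String[] args) {
        QueueLockSketch locks = new QueueLockSketch();
        locks.consumeSerially("TopicA-0", () -> System.out.println("consuming TopicA-0"));
        System.out.println(locks.fetchLockObject("TopicA-0") == locks.fetchLockObject("TopicA-0"));
    }
}
```

Locking per queue rather than globally keeps different queues fully parallel while preserving order inside each queue.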
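Summary
RocketMQ has two consumption modes: clustering and broadcasting
Queue load balancing: the RebalanceService thread rebalances every 20 s by default, allocating queues with one of the allocation algorithms according to the number of Consumers in the consumer group and the number of queues in the topic
Message pulling: the PullMessageService thread executes the pull tasks created by the RebalanceService thread. It pulls 32 messages at a time by default, hands them to the consumer thread pool, and goes on to the next pull. If consumption is too slow and messages pile up, pull flow control kicks in
Concurrent consumption: threads in the consumer pool may consume messages from the same queue concurrently. After a successful consumption, the smallest offset still in the ProcessQueue is stored as the consumption progress
Clustering mode: consumption progress is stored on the Broker
Broadcasting mode: consumption progress is stored on the Consumer
RocketMQ does not support delayed messages with arbitrary precision, only the configured delay levels
Orderly messages normally use clustering mode. Threads in the Consumer's pool can only consume a given consume queue serially. The essential difference from concurrent consumption is that the consume queue must be locked while it is being consumed, and the Broker records who holds each queue lock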