RocketMQ 整理
MQ
先进先出
应用:
应用解耦:下游系统故障时,向下游发送的数据被缓存到消息队列,对外接口正常返回。下游系统系统回复后,补充处理存在消息队列中的消息即可
队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。
削峰:将大量请求缓存起来,分散到很长一段时间处理
数据分发:数据产生方不关心谁使用数据,只将数据发送到消息队列,使用方从消息队列直接获取数据即可
缺点:
- 可用性降低(外部依赖越多,稳定性越差)——如何保证MQ高可用?
- 复杂度提高(从同步转为异步)——消息的顺序性?消息丢失怎么办?消息不被重复消费?
- 消息的一致性?——BC同时拉取数据,B成功执行,C执行失败,如何保证处理进度的一致性?
主流MQ
单节点、集群搭建
- 下载地址
- 目录结构:
- bin:启动脚本,包括shell脚本和CMD脚本
- conf:实例配置文件 ,包括broker配置文件、logback配置文件
- lib:依赖jar包,包括Netty、commons-lang、FastJSON等
单节点
1 | # 1.启动NameServer |
RocketMQ默认的虚拟机内存较大,可能需要修改JVM内存大小(runbroker.sh和runserver.sh中修改
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
)bash命令:
1
2
3
4
5
6
7
8
9
10
11
12# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
# 2.接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
# 1.关闭NameServer
sh bin/mqshutdown namesrv
# 2.关闭Broker
sh bin/mqshutdown broker
集群
集群角色
Producer:消息的发送者
- Producer与NameServer集群中的任意节点建立长连接,定期从NameServer取Topic路由信息
- Producer向提供Topic服务的Master Broker建立长连接,定时向Master发送心跳
- Producer完全无状态,可集群部署
Consumer:消息接收者
- Consumer与NameServer集群中的任意节点建立长连接,定期从NameServer取Topic路由信息
- Consumer向提供Topic服务的Master Broker、Slave Broker建立长连接,定时向Master、Slave发送心跳
- Consumer既可以从Master订阅消息,也可以从Slave订阅消息
- 一个消费者只可以订阅和消费一种Topic的消息
Broker:暂存和传输消息
- 分为Master与Slave,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId定义
- BrokerId为0表示Master,非0表示Slave
- Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer
NameServer:管理Broker
- 几乎无状态节点,可集群部署,节点之间无任何信息同步
Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic
Message Queue:Topic的分区;用于并行发送和接收消息
Tag:为消息设置的Tag,用于同一主题下区分不同类型的消息。 来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性
GroupName:
在集群的情况下,一个生产者down之后,可以继续联系该组下的另外一个生产者实例,不至于导致业务走不下去。在消费者组中,可以实现消息消费的负载均衡和消息容错目标
有了GroupName,在集群下,只需要在新加的机器中,配置相同的GroupName。启动后,能加入到所在的群组中,参与消息生产或消费
同一个消费者组下的消费者,不能同时消费同一个queue
一个节点下,一个topic加一个tag可以对应一个consumer。一个消费者组就是横向上多个节点的相同consumer为一个消费组
搭建方式
单Master:只有一个Broker存储消息
多Master:多个Broker存储消息
- 单一个Broker宕机时未消费的消息
- 磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高
多Master多Slave
- 异步:异步复制,主备有短暂消息延迟(毫秒级);性能同多Master模式几乎一样;Master宕机,磁盘损坏情况下会丢失少量消息
- 同步:只有主备都写成功,才向应用返回成功;服务可用性与数据可用性都非常高;性能比异步复制模式略低
(同步双写)搭建
- 工作流程:
- 启动NameServer,NameServer监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心
- Broker启动,与所有NameServer保持长连接,定时发送心跳——心跳包含当前Broker信息(IP+端口等)以及存储的Topic信息。注册成功后,NameServer集群记录Topic跟Broker的映射关系
- 收发消息前,创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic
- Producer发送消息,启动时与一个NameServer建立长连接,从NameServer获取Topic所在的Broker列表,与一个Broker建立长连接从而向Broker发消息
- Consumer与一台NameServer建立长连接,获取Topic和Broker的映射,直接跟Broker建立连接通道,开始消费消息
- 工作流程:
Broker配置
1)master1
服务器:192.168.25.135
1 | vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-a.properties |
修改配置如下:
1 | #所属集群名字 |
2)slave2
服务器:192.168.25.135
1 | vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-b-s.properties |
修改配置如下:
1 | #所属集群名字 |
3)master2
服务器:192.168.25.138
1 | vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-b.properties |
修改配置如下:
1 | #所属集群名字 |
4)slave1
服务器:192.168.25.138
1 | vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-a-s.properties |
修改配置如下:
1 | #所属集群名字 |
mqadmin管理工具
- 进入RocketMQ,bin目录下执行
./mqadmin {command} {args}
1)Topic相关
名称 | 含义 | 命令选项 | 说明 |
updateTopic | 创建更新Topic配置 | -b | Broker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port |
-c | cluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询) | ||
-h- | 打印帮助 | ||
-n | NameServer服务地址,格式 ip:port | ||
-p | 指定新topic的读写权限( W=2|R=4|WR=6 ) | ||
-r | 可读队列数(默认为 8) | ||
-w | 可写队列数(默认为 8) | ||
-t | topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ ) | ||
deleteTopic | 删除Topic | -c | cluster 名称,表示删除某集群下的某个 topic (集群 可通过 clusterList 查询) |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
-t | topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ ) | ||
topicList | 查看 Topic 列表信息 | -h | 打印帮助 |
-c | 不配置-c只返回topic列表,增加-c返回clusterName, topic, consumerGroup信息,即topic的所属集群和订阅关系,没有参数 | ||
-n | NameServer 服务地址,格式 ip:port | ||
topicRoute | 查看 Topic 路由信息 | -t | topic 名称 |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
topicStatus | 查看 Topic 消息队列offset | -t | topic 名称 |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
topicClusterList | 查看 Topic 所在集群列表 | -t | topic 名称 |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
updateTopicPerm | 更新 Topic 读写权限 | -t | topic 名称 |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
-b | Broker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port | ||
-p | 指定新 topic 的读写权限( W=2|R=4|WR=6 ) | ||
-c | cluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询),-b优先,如果没有-b,则对集群中所有Broker执行命令 | ||
updateOrderConf | 从NameServer上创建、删除、获取特定命名空间的kv配置,目前还未启用 | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-t | topic,键 | ||
-v | orderConf,值 | ||
-m | method,可选get、put、delete | ||
allocateMQ | 以平均负载算法计算消费者列表负载消息队列的负载结果 | -t | topic 名称 |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
-i | ipList,用逗号分隔,计算这些ip去负载Topic的消息队列 | ||
statsAll | 打印Topic订阅关系、TPS、积累量、24h读写总量等信息 | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-a | 是否只打印活跃topic | ||
-t | 指定topic |
2)集群相关
名称 | 含义 | 命令选项 | 说明 |
clusterList | 查看集群信息,集群、BrokerName、BrokerId、TPS等信息 | -m | 打印更多信息 (增加打印出如下信息 #InTotalYest, #OutTotalYest, #InTotalToday ,#OutTotalToday) |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
-i | 打印间隔,单位秒 | ||
clusterRT | 发送消息检测集群各Broker RT。消息发往${BrokerName} Topic。 | -a | amount,每次探测的总数,RT = 总时间 / amount |
-s | 消息大小,单位B | ||
-c | 探测哪个集群 | ||
-p | 是否打印格式化日志,以|分割,默认不打印 | ||
-h | 打印帮助 | ||
-m | 所属机房,打印使用 | ||
-i | 发送间隔,单位秒 | ||
-n | NameServer 服务地址,格式 ip:port |
3)Broker相关
名称 | 含义 | 命令选项 | 说明 |
updateBrokerConfig | 更新 Broker 配置文件,会修改Broker.conf | -b | Broker 地址,格式为ip:port |
-c | cluster 名称 | ||
-k | key 值 | ||
-v | value 值 | ||
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
brokerStatus | 查看 Broker 统计信息、运行状态(你想要的信息几乎都在里面) | -b | Broker 地址,地址为ip:port |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
brokerConsumeStats | Broker中各个消费者的消费情况,按Message Queue维度返回Consume Offset,Broker Offset,Diff,TImestamp等信息 | -b | Broker 地址,地址为ip:port |
-t | 请求超时时间 | ||
-l | diff阈值,超过阈值才打印 | ||
-o | 是否为顺序topic,一般为false | ||
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
getBrokerConfig | 获取Broker配置 | -b | Broker 地址,地址为ip:port |
-n | NameServer 服务地址,格式 ip:port | ||
wipeWritePerm | 从NameServer上清除 Broker写权限 | -b | Broker 地址,地址为ip:port |
-n | NameServer 服务地址,格式 ip:port | ||
-h | 打印帮助 | ||
cleanExpiredCQ | 清理Broker上过期的Consume Queue,如果手动减少对列数可能产生过期队列 | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 | ||
-b | Broker 地址,地址为ip:port | ||
-c | 集群名称 | ||
cleanUnusedTopic | 清理Broker上不使用的Topic,从内存中释放Topic的Consume Queue,如果手动删除Topic会产生不使用的Topic | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 | ||
-b | Broker 地址,地址为ip:port | ||
-c | 集群名称 | ||
sendMsgStatus | 向Broker发消息,返回发送状态和RT | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 | ||
-b | BrokerName,注意不同于Broker地址 | ||
-s | 消息大小,单位B | ||
-c | 发送次数 |
4)消息相关
名称 | 含义 | 命令选项 | 说明 |
queryMsgById | 根据offsetMsgId查询msg,如果使用开源控制台,应使用offsetMsgId,此命令还有其他参数,具体作用请阅读QueryMsgByIdSubCommand。 | -i | msgId |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
queryMsgByKey | 根据消息 Key 查询消息 | -k | msgKey |
-t | Topic 名称 | ||
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
queryMsgByOffset | 根据 Offset 查询消息 | -b | Broker 名称,(这里需要注意 填写的是 Broker 的名称,不是 Broker 的地址,Broker 名称可以在 clusterList 查到) |
-i | query 队列 id | ||
-o | offset 值 | ||
-t | topic 名称 | ||
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
queryMsgByUniqueKey | 根据msgId查询,msgId不同于offsetMsgId,区别详见常见运维问题。-g,-d配合使用,查到消息后尝试让特定的消费者消费消息并返回消费结果 | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-i | uniqe msg id | ||
-g | consumerGroup | ||
-d | clientId | ||
-t | topic名称 | ||
checkMsgSendRT | 检测向topic发消息的RT,功能类似clusterRT | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-t | topic名称 | ||
-a | 探测次数 | ||
-s | 消息大小 | ||
sendMessage | 发送一条消息,可以根据配置发往特定Message Queue,或普通发送。 | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-t | topic名称 | ||
-p | body,消息体 | ||
-k | keys | ||
-c | tags | ||
-b | BrokerName | ||
-i | queueId | ||
consumeMessage | 消费消息。可以根据offset、开始&结束时间戳、消息队列消费消息,配置不同执行不同消费逻辑,详见ConsumeMessageCommand。 | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-t | topic名称 | ||
-b | BrokerName | ||
-o | 从offset开始消费 | ||
-i | queueId | ||
-g | 消费者分组 | ||
-s | 开始时间戳,格式详见-h | ||
-d | 结束时间戳 | ||
-c | 消费多少条消息 | ||
printMsg | 从Broker消费消息并打印,可选时间段 | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-t | topic名称 | ||
-c | 字符集,例如UTF-8 | ||
-s | subExpress,过滤表达式 | ||
-b | 开始时间戳,格式参见-h | ||
-e | 结束时间戳 | ||
-d | 是否打印消息体 | ||
printMsgByQueue | 类似printMsg,但指定Message Queue | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-t | topic名称 | ||
-i | queueId | ||
-a | BrokerName | ||
-c | 字符集,例如UTF-8 | ||
-s | subExpress,过滤表达式 | ||
-b | 开始时间戳,格式参见-h | ||
-e | 结束时间戳 | ||
-p | 是否打印消息 | ||
-d | 是否打印消息体 | ||
-f | 是否统计tag数量并打印 | ||
resetOffsetByTime | 按时间戳重置offset,Broker和consumer都会重置 | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-g | 消费者分组 | ||
-t | topic名称 | ||
-s | 重置为此时间戳对应的offset | ||
-f | 是否强制重置,如果false,只支持回溯offset,如果true,不管时间戳对应offset与consumeOffset关系 | ||
-c | 是否重置c++客户端offset |
5)消费者、消费组相关
名称 | 含义 | 命令选项 | 说明 |
consumerProgress | 查看订阅组消费状态,可以查看具体的client IP的消息积累量 | -g | 消费者所属组名 |
-s | 是否打印client IP | ||
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
consumerStatus | 查看消费者状态,包括同一个分组中是否都是相同的订阅,分析Process Queue是否堆积,返回消费者jstack结果,内容较多,使用者参见ConsumerStatusSubCommand | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-g | consumer group | ||
-i | clientId | ||
-s | 是否执行jstack | ||
getConsumerStatus | 获取 Consumer 消费进度 | -g | 消费者所属组名 |
-t | 查询主题 | ||
-i | Consumer 客户端 ip | ||
-n | NameServer 服务地址,格式 ip:port | ||
-h | 打印帮助 | ||
updateSubGroup | 更新或创建订阅关系 | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 | ||
-b | Broker地址 | ||
-c | 集群名称 | ||
-g | 消费者分组名称 | ||
-s | 分组是否允许消费 | ||
-m | 是否从最小offset开始消费 | ||
-d | 是否是广播模式 | ||
-q | 重试队列数量 | ||
-r | 最大重试次数 | ||
-i | 当slaveReadEnable开启时有效,且还未达到从slave消费时建议从哪个BrokerId消费,可以配置备机id,主动从备机消费 | ||
-w | 如果Broker建议从slave消费,配置决定从哪个slave消费,配置BrokerId,例如1 | ||
-a | 当消费者数量变化时是否通知其他消费者负载均衡 | ||
deleteSubGroup | 从Broker删除订阅关系 | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 | ||
-b | Broker地址 | ||
-c | 集群名称 | ||
-g | 消费者分组名称 | ||
cloneGroupOffset | 在目标群组中使用源群组的offset | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 | ||
-s | 源消费者组 | ||
-d | 目标消费者组 | ||
-t | topic名称 | ||
-o | 暂未使用 |
6)连接相关
名称 | 含义 | 命令选项 | 说明 |
consumerConnec tion | 查询 Consumer 的网络连接 | -g | 消费者所属组名 |
-n | NameServer 服务地址,格式 ip:port | ||
-h | 打印帮助 | ||
producerConnec tion | 查询 Producer 的网络连接 | -g | 生产者所属组名 |
-t | 主题名称 | ||
-n | NameServer 服务地址,格式 ip:port | ||
-h | 打印帮助 |
7)NameServer相关
名称 | 含义 | 命令选项 | 说明 |
updateKvConfig | 更新NameServer的kv配置,目前还未使用 | -s | 命名空间 |
-k | key | ||
-v | value | ||
-n | NameServer 服务地址,格式 ip:port | ||
-h | 打印帮助 | ||
deleteKvConfig | 删除NameServer的kv配置 | -s | 命名空间 |
-k | key | ||
-n | NameServer 服务地址,格式 ip:port | ||
-h | 打印帮助 | ||
getNamesrvConfig | 获取NameServer配置 | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 | ||
updateNamesrvConfig | 修改NameServer配置 | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 | ||
-k | key | ||
-v | value |
消息发送消费范例
- MQ客户端依赖
1 | <dependency> |
发送者
- 创建消息生产者producer,制定生产者组名
- 指定Nameserver地址
- 启动producer
- 创建消息对象,指定主题Topic、Tag和消息体
- 发送消息
- 关闭生产者
消费者
- 创建消费者Consumer,制定消费者组名
- 指定Nameserver地址
- 订阅主题Topic和Tag
- 设置回调函数,处理消息
- 启动消费者
基本范例
消息发送
发送同步消息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public class SyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest", /* Topic */
"TagA", /* Tag */
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}发送异步消息:异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27public class AsyncProducer {
public static void main(String[] args) throws Exception {
// ...
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 100; i++) {
final int index = i;
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
producer.shutdown();
}
}发送单向消息:主要用在不特别关心发送结果的场景,例如日志发送
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19public class OnewayProducer {
public static void main(String[] args) throws Exception{
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
Message msg = new Message("TopicTest",
"TagA",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 发送单向消息,没有任何返回结果
producer.sendOneway(msg);
}
producer.shutdown();
}
}
消息消费
DefaultMQPushConsumer:系统控制读取操作,系统会自动根据用户设置参数进行消息处理,自动保存Offset、做负载均衡等
DefaultMQPullConsumer:使用者自主控制读取操作
负载均衡模式:多个消费者共同消费队列消息,每个消费者处理的消息不同
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic
consumer.subscribe("Test", "*");
//负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
}广播模式:消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("Test", "*");
//广播模式消费
consumer.setMessageModel(MessageModel.BROADCASTING);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
顺序消息
按照消息的发送顺序来消费(FIFO)——RocketMQ可以保证消息有序,分为分区有序、全局有序
原理:默认情况下,采取Round Robin轮询方式把消息发送到不同的queue(分区队列),消费时从多个queue拉取消息。如果控制发送的消息只依次发送到同一个queue中,并只从这个queue上依次拉取消费,则为全局有序,否则为分区有序——相对每个queue消息都是有序的
分区有序的示例:订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列
消息生产
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagC", "TagD"};
// 订单列表
List<OrderStep> orderList = new Producer().buildOrders();
Date date = new Date();
String dateStr = SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date);
for (int i = 0; i < 10; i++) {
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; //根据订单id选择发送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId()); //订单id,即为arg
}
producer.shutdown();
}
private static class Order {
private long orderId;
private String desc;
}
private List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
}
}顺序消费消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35public class ConsumerInOrder {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("127.0.0.1:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
//模拟业务逻辑处理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
}
}
延时消息
例如,提交一个订单就可以发送一个延时消息,1h后检查订单的状态,如果未付款就取消订单
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36public class ScheduledMessageConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
consumer.subscribe("TestTopic", "*");
// 注册消息监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(3);
// 发送消息
producer.send(message);
}
// 关闭生产者
producer.shutdown();
}
}限制:现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18
1
2// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
批量消息
批量发送消息显著提高传递小消息的性能,但这些批量消息有相同的topic,相同的waitStoreMsgOK,不能是延时消息,总大小不应超过4MB(超过则需要分割消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
producer.send(messages);
} catch (Exception e) {
e.printStackTrace();
}
// 分割消息
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
public boolean hasNext() {
return currIndex < messages.size();
}
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加日志的开销20字节
if (tmpSize > SIZE_LIMIT) {
//单个消息超过了最大的限制
//忽略,否则会阻塞分裂的进程
if (nextIndex - currIndex == 0) {
//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
}
}
过滤消息
- 可以根据tag来过滤消息,如下所示,消费者将消费TAGA或TAGB或TAGC的消息,但一个消息只能有一个标签
1 | DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE"); |
使用SQL表达式筛选消息——通过消息的属性运算,如下所示
1
2
3
4
5
6
7
8
9
10
11
12
13
14------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------
SQL基本语法
数值比较,比如:**>,>=,<,<=,BETWEEN,=;**
字符比较,比如:**=,<>,IN;**
IS NULL 或者 IS NOT NULL;
逻辑符号 AND,OR,NOT;
常量类型:
- 数值,比如:123,3.1415;
- 字符,比如:**’abc’,必须用单引号包裹起来;**
- NULL,特殊的常量
- 布尔值,TRUE 或 FALSE
只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:
1 | public void subscribe(finalString topic, final MessageSelector messageSelector) |
消息生产者
通过
putUserProperty
来设置消息的属性1
2
3
4
5
6
7
8
9
10
11DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
tag,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 设置一些属性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
producer.shutdown();
消息消费者
用MessageSelector.bySql使用sql筛选消息
1
2
3
4
5
6
7
8
9
10DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
事务消息
流程:正常事务消息的发送及提交、事务消息的补偿
事务消息发送及提交
(1) 发送消息(half消息)
(2) 服务端响应消息写入结果
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
事务补偿:用于解决消息Commit或者Rollback发生超时或者失败的情况
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
事务消息状态:提交状态、回滚状态、中间状态
- TransactionStatus.CommitTransaction: 提交事务,允许消费者消费此消息
- TransactionStatus.RollbackTransaction: 回滚事务,代表该消息将被删除,不允许被消费
- TransactionStatus.Unknown: 中间状态,代表需要检查消息队列来确定状态
事务性生产者
- 使用
TransactionMQProducer
类创建生产者,并指定唯一的ProducerGroup
,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态如上所述 - 当发送半消息成功时,使用
executeLocalTransaction
方法来执行本地事务,该方法返回的三个事务状态之一。checkLocalTranscation
方法用于检查本地事务状态,并回应消息队列的检查请求,返回三个事务状态之一
1 | public class Producer { |
1 | public class TransactionListenerImpl implements TransactionListener { |
限制
- 事务消息不支持延时消息和批量消息
- 为了避免单个消息被检查太多次而导致半队列消息累积,默认将单个消息的检查次数限制为 15 次。用户通过 Broker 配置文件的
transactionCheckMax
参数修改此限制。如果检查某条消息超过 N 次( N =transactionCheckMax
) ,Broker 将丢弃此消息,并在默认情况下同时打印错误日志,可以重写AbstractTransactionCheckListener
类来修改这个行为 - 事务消息在 Broker 配置文件中参数 transactionMsgTimeout 时间长度之后被检查。用户可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于
transactionMsgTimeout
参数 - 事务性消息可能不止一次被检查或消费
- 如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制
- 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享
- 事务消息允许反向查询、MQ服务器能通过生产者 ID 查询到消费者
高级功能
消息存储
分布式队列要求高可靠性,数据要持久化存储
- producer发送消息
- MQ收到消息,将消息进行持久化,存储中新增一条记录
- 返回ACK给producer
- MQ push 消息给对应的consumer,等待consumer返回ACK
- MQ在指定时间内收到consumer的ACK,从存储中删除消息,否则认为消息消费失败,尝试重新push消息
存储介质
- 关系型数据库:ActiveMQ(默认采用的KahaDB做消息存储)可通过JDBC持久化消息
- 文件系统:RocketMQ/Kafka/RabbitMQ 均是刷盘至所部署虚拟机/物理机的文件系统来持久化(分为异步刷盘和同步刷盘)——除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题,且刷盘速度更快
消息的存储和发送
1)消息存储
- RocketMQ的消息为顺序写,保证消息存储的速度(顺序写速度可达到600MB/s, 超过一般网卡的传输速度)
2)消息发送
文件操作、网络操作需要涉及用户态和内核态的切换,需要数据复制(以服务器发送文件到客户端为例)
- read:读取本地文件内容
- 从磁盘复制数据到内核态内存
- 从内核态内存复制到用户态内存
- write:将读取的内容通过网络发送
- 从用户态内存复制到网络驱动的内核态内存
- 从网络驱动的内核态内存复制到网卡进行传输
- read:读取本地文件内容
通过使用mmap省去向用户态的内存复制——Java中通过MappedByteBuffer实现(零拷贝)
- 一次只能映射1.5~2G 的文件至用户态的虚拟内存
- RocketMQ默认设置单个CommitLog日志数据文件为1G
消息存储结构
- 存储由ConsumeQueue和CommitLog配合完成
- 物理存储文件:CommitLog
- 消息的逻辑队列:ConsumeQueue(类似数据库的索引文件,存储指向物理存储的地址),每个Topic下的每个Message Queue都有一个ConsumeQueue
- IndexFile:一种通过key或时间区间来查询消息的方法,不影响发送与消费消息的主流程
刷盘机制
- 同步刷盘、异步刷盘——通过Broker配置文件的 flushDiskType 参数设置(SYNC_FLUSH、ASYNC_FLUSH)
1)同步刷盘
- 在返回写成功状态时,消息已经被写入磁盘
- 消息写入内存的PAGE CACHE后立刻通知刷盘线程刷盘, 刷盘完成后唤醒等待的线程,返回消息写成功
2)异步刷盘
- 返回写成功状态时,消息可能只是被写入了内存的PAGE CACHE
- 当内存里的消息量积累到一定程度时,统一触发写磁盘
高可用性机制
NameServer高可用
- NameServer 节点是无状态的,各个节点直接的数据一致
- 存在多个 NameServer 节点的情况下,部分 NameServer 不可用也可以保证 MQ 服务正常运行
Broker高可用
- RocketMQ分布式集群是通过Master-Slave实现高可用
- Broker配置文件参数 brokerId 为0表明是Master,大于0表明是 Slave
- Broker配置文件参数 brokerRole 说明是Master还是Slave
- Master 的 Broker 支持读和写,Slave仅支持读
消息消费高可用
- Consumer不需要设置读Master还是Slave,Master不可用时Consumer自动切换到读Slave
消息发送高可用
- 创建Topic时,把该Topic的多个MQ创建在多个Broker组上(相同Broker名称,不同 brokerId 的机器组成一个Broker组)
- 一个Broker组的Master不可用,其他组的Master仍然可用,Producer仍然可以发送消息
- 目前不支持Slave转Master,要手动停止Slave角色的Broker,更改配置文件,用新的配置文件启动Broker。
- rocketmq采用Broker故障延迟机制来规避故障的broker(深度解析RocketMQ消息发送的高可用设计)
消息主从复制
- 消息从Master复制到Slave上,同步、异步——通过Broker配置文件的brokerRole参数设置,可以设置成ASYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个
1)同步复制
- 同步复制:Master和Slave均写成功后才反馈给客户端写成功
- 优点:Master出故障, Slave上有全部的备份数据,容易恢复;缺点:增大数据写入延迟,降低系统吞吐量
2)异步复制
- 异步复制:只要Master写成功即反馈给客户端写成功——吞吐量高,但有可能会丢数
负载均衡
Producer负载均衡
- 每个Producer实例会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。queue可以散落在不同的broker,消息就发送到不同的broker
Consumer负载均衡
集群模式
- 集群消费模式下,每条消息投递到订阅该topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式消费消息,拉取的时候需要明确指定拉取哪一条message queue
- 当实例的数量有变更,会触发一次所有实例的负载均衡——按照queue的数量和实例的数量平均分配queue给每个实例,默认的分配算法是AllocateMessageQueueAveragely
- 另一种平均算法是AllocateMessageQueueAveragelyByCircle,以环状轮流分queue
集群模式下,queue只允许分配一个实例,如果多个实例同时消费一个queue的消息,由于拉取哪些消息是consumer主动控制的,会导致同一个消息在不同的实例下被消费多次
增加consumer实例去分摊queue的消费,可以起到水平扩展的消费能力的作用
需要让queue的总数量大于等于consumer的数量
广播模式
- 广播模式下一条消息需要投递到一个消费组下面所有的消费者实例
- 所有consumer都分到所有的queue
消息重试
顺序消息的重试
- 消费者消费顺序消息失败后,RocketMQ不断进行消息重试(每次间隔 1 秒)——会出现消费阻塞
无序消息的重试
- 无序消息(普通、定时、延时、事务消息)消费失败时,可以通过设置返回状态实现消息重试
- 无序消息的重试只在集群消费生效;广播消费不提供失败重试
重试次数
- RocketMQ 默认允许每条消息最多重试 16 次(消息的 Message ID 不变)
- 消息重试 16 次后仍然失败,消息将不再投递
第几次重试 | 与上次重试的间隔时间 | 第几次重试 | 与上次重试的间隔时间 |
---|---|---|---|
1 | 10 秒 | 9 | 7 分钟 |
2 | 30 秒 | 10 | 8 分钟 |
3 | 1 分钟 | 11 | 9 分钟 |
4 | 2 分钟 | 12 | 10 分钟 |
5 | 3 分钟 | 13 | 20 分钟 |
6 | 4 分钟 | 14 | 30 分钟 |
7 | 5 分钟 | 15 | 1 小时 |
8 | 6 分钟 | 16 | 2 小时 |
配置方式
- 集群消费方式下,需要在消息监听器接口的实现中明确配置:
- 返回 Action.ReconsumeLater (推荐)
- 返回 Null
- 抛出异常
1 | public class MessageListenerImpl implements MessageListener { |
- 如果不重试消息,需要捕获消费逻辑中可能抛出的异常,返回 Action.CommitMessage
1 | public class MessageListenerImpl implements MessageListener { |
可以自定义消息最大重试次数
最大重试次数小于等于 16 次,重试时间如上表所述
最大重试次数大于 16 次,超过 16 次的重试时间间隔为每次 2 小时
1
2
3
4Properties properties = new Properties();
//配置对应 Group ID 的最大消息重试次数为 20 次
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
Consumer consumer =ONSFactory.createConsumer(properties);
- 配置采用覆盖的方式生效,最后启动的 Consumer 实例会覆盖之前的启动实例的配置
消费者可获取消息的重试次数:
1
2
3
4
5
6
7
8public class MessageListenerImpl implements MessageListener {
public Action consume(Message message, ConsumeContext context) {
//获取消息的重试次数
message.getReconsumeTimes();
...
}
}
死信队列
消费失败的消息(死信消息(Dead-Letter Message)),RocketMQ 不会立刻将消息丢弃,而是发送到该消费者对应的特殊队列(死信队列(Dead-Letter Queue))
死信不会再被正常消费吗,三天后自动删除
- 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例,且包含其产生的所有死信消息。一个 Group ID 未产生死信消息,RocketMQ 不会为其创建相应的死信队列
- 可以在 RocketMQ 控制台重新发送该消息
消费幂等
消息有可能会出现重复
发送/消费时消息重复:消息已被成功发送到服务端并完成持久化/消息已经成功被消费,但因为网络问题没有应答,此时重复发送消息
负载均衡时消息重复:RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,消费者可能会收到重复消息
Message ID 有可能出现冲突(重复),因此不建议以 Message ID 作为处理依据。 最好以业务唯一标识作为幂等处理的依据(可以通过消息 Key 设置)
1
2
3
4
5
6
7
8
9
10
11Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);
// 幂等处理
consumer.subscribe("ons_test", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
String key = message.getKey()
// 根据业务唯一标识的 key 做幂等处理
}
});