RocketMQ

RocketMQ 整理

MQ

  • 先进先出

  • 应用:

    • 应用解耦:下游系统故障时,向下游发送的数据被缓存到消息队列,对外接口正常返回。下游系统系统回复后,补充处理存在消息队列中的消息即可

      队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。

    • 削峰:将大量请求缓存起来,分散到很长一段时间处理

    • 数据分发:数据产生方不关心谁使用数据,只将数据发送到消息队列,使用方从消息队列直接获取数据即可

  • 缺点:

    • 可用性降低(外部依赖越多,稳定性越差)——如何保证MQ高可用?
    • 复杂度提高(从同步转为异步)——消息的顺序性?消息丢失怎么办?消息不被重复消费?
    • 消息的一致性?——BC同时拉取数据,B成功执行,C执行失败,如何保证处理进度的一致性?

主流MQ

单节点、集群搭建

  • 下载地址
  • 目录结构:
    • bin:启动脚本,包括shell脚本和CMD脚本
    • conf:实例配置文件 ,包括broker配置文件、logback配置文件
    • lib:依赖jar包,包括Netty、commons-lang、FastJSON等

单节点

1
2
3
4
5
6
7
8
9
# 1.启动NameServer
nohup sh bin/mqnamesrv &
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log

# 1.启动Broker
nohup sh bin/mqbroker -n localhost:9876 &
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/broker.log
  • RocketMQ默认的虚拟机内存较大,可能需要修改JVM内存大小(runbroker.sh和runserver.sh中修改JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

  • bash命令:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    # 1.设置环境变量
    export NAMESRV_ADDR=localhost:9876
    # 2.发送消息
    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

    # 2.接收消息
    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

    # 1.关闭NameServer
    sh bin/mqshutdown namesrv
    # 2.关闭Broker
    sh bin/mqshutdown broker

集群

集群角色

  • Producer:消息的发送者

    • Producer与NameServer集群中的任意节点建立长连接,定期从NameServer取Topic路由信息
    • Producer向提供Topic服务的Master Broker建立长连接,定时向Master发送心跳
    • Producer完全无状态,可集群部署
  • Consumer:消息接收者

    • Consumer与NameServer集群中的任意节点建立长连接,定期从NameServer取Topic路由信息
    • Consumer向提供Topic服务的Master Broker、Slave Broker建立长连接,定时向Master、Slave发送心跳
    • Consumer既可以从Master订阅消息,也可以从Slave订阅消息
    • 一个消费者只可以订阅和消费一种Topic的消息
  • Broker:暂存和传输消息

    • 分为Master与Slave,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId定义
    • BrokerId为0表示Master,非0表示Slave
    • Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer
  • NameServer:管理Broker

    • 几乎无状态节点,可集群部署,节点之间无任何信息同步
  • Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic

  • Message Queue:Topic的分区;用于并行发送和接收消息

  • Tag:为消息设置的Tag,用于同一主题下区分不同类型的消息。 来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性

  • GroupName:

    • 在集群的情况下,一个生产者down之后,可以继续联系该组下的另外一个生产者实例,不至于导致业务走不下去。在消费者组中,可以实现消息消费的负载均衡和消息容错目标

    • 有了GroupName,在集群下,只需要在新加的机器中,配置相同的GroupName。启动后,能加入到所在的群组中,参与消息生产或消费

    • 同一个消费者组下的消费者,不能同时消费同一个queue

    • 一个节点下,一个topic加一个tag可以对应一个consumer。一个消费者组就是横向上多个节点的相同consumer为一个消费组

搭建方式

  • 单Master:只有一个Broker存储消息

  • 多Master:多个Broker存储消息

    • 单一个Broker宕机时未消费的消息
    • 磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高
  • 多Master多Slave

    • 异步:异步复制,主备有短暂消息延迟(毫秒级);性能同多Master模式几乎一样;Master宕机,磁盘损坏情况下会丢失少量消息
    • 同步:只有主备都写成功,才向应用返回成功;服务可用性与数据可用性都非常高;性能比异步复制模式略低
  • (同步双写)搭建

    • 工作流程:
      • 启动NameServer,NameServer监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心
      • Broker启动,与所有NameServer保持长连接,定时发送心跳——心跳包含当前Broker信息(IP+端口等)以及存储的Topic信息。注册成功后,NameServer集群记录Topic跟Broker的映射关系
      • 收发消息前,创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic
      • Producer发送消息,启动时与一个NameServer建立长连接,从NameServer获取Topic所在的Broker列表,与一个Broker建立长连接从而向Broker发消息
      • Consumer与一台NameServer建立长连接,获取Topic和Broker的映射,直接跟Broker建立连接通道,开始消费消息

Broker配置

1)master1

服务器:192.168.25.135

1
vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-a.properties

修改配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
2)slave2

服务器:192.168.25.135

1
vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-b-s.properties

修改配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
3)master2

服务器:192.168.25.138

1
vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-b.properties

修改配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
4)slave1

服务器:192.168.25.138

1
vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-a-s.properties

修改配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

mqadmin管理工具

  • 进入RocketMQ,bin目录下执行./mqadmin {command} {args}

1)Topic相关

名称 含义 命令选项 说明
updateTopic 创建更新Topic配置 -b Broker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port
-c cluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询)
-h- 打印帮助
-n NameServer服务地址,格式 ip:port
-p 指定新topic的读写权限( W=2|R=4|WR=6 )
-r 可读队列数(默认为 8)
-w 可写队列数(默认为 8)
-t topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ )
deleteTopic 删除Topic -c cluster 名称,表示删除某集群下的某个 topic (集群 可通过 clusterList 查询)
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
-t topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ )
topicList 查看 Topic 列表信息 -h 打印帮助
-c 不配置-c只返回topic列表,增加-c返回clusterName, topic, consumerGroup信息,即topic的所属集群和订阅关系,没有参数
-n NameServer 服务地址,格式 ip:port
topicRoute 查看 Topic 路由信息 -t topic 名称
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
topicStatus 查看 Topic 消息队列offset -t topic 名称
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
topicClusterList 查看 Topic 所在集群列表 -t topic 名称
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
updateTopicPerm 更新 Topic 读写权限 -t topic 名称
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
-b Broker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port
-p 指定新 topic 的读写权限( W=2|R=4|WR=6 )
-c cluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询),-b优先,如果没有-b,则对集群中所有Broker执行命令
updateOrderConf 从NameServer上创建、删除、获取特定命名空间的kv配置,目前还未启用 -h 打印帮助
-n NameServer 服务地址,格式 ip:port
-t topic,键
-v orderConf,值
-m method,可选get、put、delete
allocateMQ 以平均负载算法计算消费者列表负载消息队列的负载结果 -t topic 名称
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
-i ipList,用逗号分隔,计算这些ip去负载Topic的消息队列
statsAll 打印Topic订阅关系、TPS、积累量、24h读写总量等信息 -h 打印帮助
-n NameServer 服务地址,格式 ip:port
-a 是否只打印活跃topic
-t 指定topic

2)集群相关

名称 含义 命令选项 说明
clusterList 查看集群信息,集群、BrokerName、BrokerId、TPS等信息 -m 打印更多信息 (增加打印出如下信息 #InTotalYest, #OutTotalYest, #InTotalToday ,#OutTotalToday)
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
-i 打印间隔,单位秒
clusterRT 发送消息检测集群各Broker RT。消息发往${BrokerName} Topic。 -a amount,每次探测的总数,RT = 总时间 / amount
-s 消息大小,单位B
-c 探测哪个集群
-p 是否打印格式化日志,以|分割,默认不打印
-h 打印帮助
-m 所属机房,打印使用
-i 发送间隔,单位秒
-n NameServer 服务地址,格式 ip:port

3)Broker相关

名称 含义 命令选项 说明
updateBrokerConfig 更新 Broker 配置文件,会修改Broker.conf -b Broker 地址,格式为ip:port
-c cluster 名称
-k key 值
-v value 值
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
brokerStatus 查看 Broker 统计信息、运行状态(你想要的信息几乎都在里面) -b Broker 地址,地址为ip:port
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
brokerConsumeStats Broker中各个消费者的消费情况,按Message Queue维度返回Consume Offset,Broker Offset,Diff,TImestamp等信息 -b Broker 地址,地址为ip:port
-t 请求超时时间
-l diff阈值,超过阈值才打印
-o 是否为顺序topic,一般为false
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
getBrokerConfig 获取Broker配置 -b Broker 地址,地址为ip:port
-n NameServer 服务地址,格式 ip:port
wipeWritePerm 从NameServer上清除 Broker写权限 -b Broker 地址,地址为ip:port
-n NameServer 服务地址,格式 ip:port
-h 打印帮助
cleanExpiredCQ 清理Broker上过期的Consume Queue,如果手动减少对列数可能产生过期队列 -n NameServer 服务地址,格式 ip:port
-h 打印帮助
-b Broker 地址,地址为ip:port
-c 集群名称
cleanUnusedTopic 清理Broker上不使用的Topic,从内存中释放Topic的Consume Queue,如果手动删除Topic会产生不使用的Topic -n NameServer 服务地址,格式 ip:port
-h 打印帮助
-b Broker 地址,地址为ip:port
-c 集群名称
sendMsgStatus 向Broker发消息,返回发送状态和RT -n NameServer 服务地址,格式 ip:port
-h 打印帮助
-b BrokerName,注意不同于Broker地址
-s 消息大小,单位B
-c 发送次数

4)消息相关

名称 含义 命令选项 说明
queryMsgById 根据offsetMsgId查询msg,如果使用开源控制台,应使用offsetMsgId,此命令还有其他参数,具体作用请阅读QueryMsgByIdSubCommand。 -i msgId
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
queryMsgByKey 根据消息 Key 查询消息 -k msgKey
-t Topic 名称
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
queryMsgByOffset 根据 Offset 查询消息 -b Broker 名称,(这里需要注意 填写的是 Broker 的名称,不是 Broker 的地址,Broker 名称可以在 clusterList 查到)
-i query 队列 id
-o offset 值
-t topic 名称
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
queryMsgByUniqueKey 根据msgId查询,msgId不同于offsetMsgId,区别详见常见运维问题。-g,-d配合使用,查到消息后尝试让特定的消费者消费消息并返回消费结果 -h 打印帮助
-n NameServer 服务地址,格式 ip:port
-i uniqe msg id
-g consumerGroup
-d clientId
-t topic名称
checkMsgSendRT 检测向topic发消息的RT,功能类似clusterRT -h 打印帮助
-n NameServer 服务地址,格式 ip:port
-t topic名称
-a 探测次数
-s 消息大小
sendMessage 发送一条消息,可以根据配置发往特定Message Queue,或普通发送。 -h 打印帮助
-n NameServer 服务地址,格式 ip:port
-t topic名称
-p body,消息体
-k keys
-c tags
-b BrokerName
-i queueId
consumeMessage 消费消息。可以根据offset、开始&结束时间戳、消息队列消费消息,配置不同执行不同消费逻辑,详见ConsumeMessageCommand。 -h 打印帮助
-n NameServer 服务地址,格式 ip:port
-t topic名称
-b BrokerName
-o 从offset开始消费
-i queueId
-g 消费者分组
-s 开始时间戳,格式详见-h
-d 结束时间戳
-c 消费多少条消息
printMsg 从Broker消费消息并打印,可选时间段 -h 打印帮助
-n NameServer 服务地址,格式 ip:port
-t topic名称
-c 字符集,例如UTF-8
-s subExpress,过滤表达式
-b 开始时间戳,格式参见-h
-e 结束时间戳
-d 是否打印消息体
printMsgByQueue 类似printMsg,但指定Message Queue -h 打印帮助
-n NameServer 服务地址,格式 ip:port
-t topic名称
-i queueId
-a BrokerName
-c 字符集,例如UTF-8
-s subExpress,过滤表达式
-b 开始时间戳,格式参见-h
-e 结束时间戳
-p 是否打印消息
-d 是否打印消息体
-f 是否统计tag数量并打印
resetOffsetByTime 按时间戳重置offset,Broker和consumer都会重置 -h 打印帮助
-n NameServer 服务地址,格式 ip:port
-g 消费者分组
-t topic名称
-s 重置为此时间戳对应的offset
-f 是否强制重置,如果false,只支持回溯offset,如果true,不管时间戳对应offset与consumeOffset关系
-c 是否重置c++客户端offset

5)消费者、消费组相关

名称 含义 命令选项 说明
consumerProgress 查看订阅组消费状态,可以查看具体的client IP的消息积累量 -g 消费者所属组名
-s 是否打印client IP
-h 打印帮助
-n NameServer 服务地址,格式 ip:port
consumerStatus 查看消费者状态,包括同一个分组中是否都是相同的订阅,分析Process Queue是否堆积,返回消费者jstack结果,内容较多,使用者参见ConsumerStatusSubCommand -h 打印帮助
-n NameServer 服务地址,格式 ip:port
-g consumer group
-i clientId
-s 是否执行jstack
getConsumerStatus 获取 Consumer 消费进度 -g 消费者所属组名
-t 查询主题
-i Consumer 客户端 ip
-n NameServer 服务地址,格式 ip:port
-h 打印帮助
updateSubGroup 更新或创建订阅关系 -n NameServer 服务地址,格式 ip:port
-h 打印帮助
-b Broker地址
-c 集群名称
-g 消费者分组名称
-s 分组是否允许消费
-m 是否从最小offset开始消费
-d 是否是广播模式
-q 重试队列数量
-r 最大重试次数
-i 当slaveReadEnable开启时有效,且还未达到从slave消费时建议从哪个BrokerId消费,可以配置备机id,主动从备机消费
-w 如果Broker建议从slave消费,配置决定从哪个slave消费,配置BrokerId,例如1
-a 当消费者数量变化时是否通知其他消费者负载均衡
deleteSubGroup 从Broker删除订阅关系 -n NameServer 服务地址,格式 ip:port
-h 打印帮助
-b Broker地址
-c 集群名称
-g 消费者分组名称
cloneGroupOffset 在目标群组中使用源群组的offset -n NameServer 服务地址,格式 ip:port
-h 打印帮助
-s 源消费者组
-d 目标消费者组
-t topic名称
-o 暂未使用

6)连接相关

名称 含义 命令选项 说明
consumerConnec tion 查询 Consumer 的网络连接 -g 消费者所属组名
-n NameServer 服务地址,格式 ip:port
-h 打印帮助
producerConnec tion 查询 Producer 的网络连接 -g 生产者所属组名
-t 主题名称
-n NameServer 服务地址,格式 ip:port
-h 打印帮助

7)NameServer相关

名称 含义 命令选项 说明
updateKvConfig 更新NameServer的kv配置,目前还未使用 -s 命名空间
-k key
-v value
-n NameServer 服务地址,格式 ip:port
-h 打印帮助
deleteKvConfig 删除NameServer的kv配置 -s 命名空间
-k key
-n NameServer 服务地址,格式 ip:port
-h 打印帮助
getNamesrvConfig 获取NameServer配置 -n NameServer 服务地址,格式 ip:port
-h 打印帮助
updateNamesrvConfig 修改NameServer配置 -n NameServer 服务地址,格式 ip:port
-h 打印帮助
-k key
-v value

消息发送消费范例

  • MQ客户端依赖
1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
  • 发送者

    1. 创建消息生产者producer,制定生产者组名
    2. 指定Nameserver地址
    3. 启动producer
    4. 创建消息对象,指定主题Topic、Tag和消息体
    5. 发送消息
    6. 关闭生产者
  • 消费者

    1. 创建消费者Consumer,制定消费者组名
    2. 指定Nameserver地址
    3. 订阅主题Topic和Tag
    4. 设置回调函数,处理消息
    5. 启动消费者

基本范例

消息发送

  • 发送同步消息:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public class SyncProducer {
    public static void main(String[] args) throws Exception {
    // 实例化消息生产者Producer
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // 设置NameServer的地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动Producer实例
    producer.start();
    for (int i = 0; i < 100; i++) {
    // 创建消息,并指定Topic,Tag和消息体
    Message msg = new Message("TopicTest", /* Topic */
    "TagA", /* Tag */
    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
    );
    // 发送消息到一个Broker
    SendResult sendResult = producer.send(msg);
    // 通过sendResult返回消息是否成功送达
    System.out.printf("%s%n", sendResult);
    }
    // 如果不再发送消息,关闭Producer实例。
    producer.shutdown();
    }
    }
  • 发送异步消息:异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    public class AsyncProducer {
    public static void main(String[] args) throws Exception {
    // ...
    producer.setRetryTimesWhenSendAsyncFailed(0);
    for (int i = 0; i < 100; i++) {
    final int index = i;
    // 创建消息,并指定Topic,Tag和消息体
    Message msg = new Message("TopicTest",
    "TagA",
    "OrderID188",
    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    // SendCallback接收异步返回结果的回调
    producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
    System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
    }
    @Override
    public void onException(Throwable e) {
    System.out.printf("%-10d Exception %s %n", index, e);
    e.printStackTrace();
    }
    });
    }
    producer.shutdown();
    }
    }
  • 发送单向消息:主要用在不特别关心发送结果的场景,例如日志发送

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    public class OnewayProducer {
    public static void main(String[] args) throws Exception{
    // 实例化消息生产者Producer
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // 设置NameServer的地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动Producer实例
    producer.start();
    for (int i = 0; i < 100; i++) {
    Message msg = new Message("TopicTest",
    "TagA",
    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
    );
    // 发送单向消息,没有任何返回结果
    producer.sendOneway(msg);
    }
    producer.shutdown();
    }
    }

消息消费

  • DefaultMQPushConsumer:系统控制读取操作,系统会自动根据用户设置参数进行消息处理,自动保存Offset、做负载均衡等

  • DefaultMQPullConsumer:使用者自主控制读取操作

  • 负载均衡模式:多个消费者共同消费队列消息,每个消费者处理的消息不同

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public static void main(String[] args) throws Exception {
    // 实例化消息生产者,指定组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    // 指定Namesrv地址信息.
    consumer.setNamesrvAddr("localhost:9876");
    // 订阅Topic
    consumer.subscribe("Test", "*");
    //负载均衡模式消费
    consumer.setMessageModel(MessageModel.CLUSTERING);
    // 注册回调函数,处理消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
    ConsumeConcurrentlyContext context) {
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    //启动消费者
    consumer.start();
    }
  • 广播模式:消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public static void main(String[] args) throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.subscribe("Test", "*");
    //广播模式消费
    consumer.setMessageModel(MessageModel.BROADCASTING);
    // 注册回调函数,处理消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
    ConsumeConcurrentlyContext context) {
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    consumer.start();
    }

顺序消息

  • 按照消息的发送顺序来消费(FIFO)——RocketMQ可以保证消息有序,分为分区有序、全局有序

  • 原理:默认情况下,采取Round Robin轮询方式把消息发送到不同的queue(分区队列),消费时从多个queue拉取消息。如果控制发送的消息只依次发送到同一个queue中,并只从这个queue上依次拉取消费,则为全局有序,否则为分区有序——相对每个queue消息都是有序的

  • 分区有序的示例:订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列

  • 消息生产

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    public class Producer {

    public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();

    String[] tags = new String[]{"TagA", "TagC", "TagD"};

    // 订单列表
    List<OrderStep> orderList = new Producer().buildOrders();

    Date date = new Date();
    String dateStr = SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date);
    for (int i = 0; i < 10; i++) {
    String body = dateStr + " Hello RocketMQ " + orderList.get(i);
    Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());

    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    Long id = (Long) arg; //根据订单id选择发送queue
    long index = id % mqs.size();
    return mqs.get((int) index);
    }
    }, orderList.get(i).getOrderId()); //订单id,即为arg
    }

    producer.shutdown();
    }

    private static class Order {
    private long orderId;
    private String desc;
    }

    private List<OrderStep> buildOrders() {
    List<OrderStep> orderList = new ArrayList<OrderStep>();

    OrderStep orderDemo = new OrderStep();
    orderDemo.setOrderId(15103111039L);
    orderDemo.setDesc("创建");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103111065L);
    orderDemo.setDesc("创建");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103111039L);
    orderDemo.setDesc("付款");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103111065L);
    orderDemo.setDesc("付款");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103111065L);
    orderDemo.setDesc("完成");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103111039L);
    orderDemo.setDesc("推送");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103111039L);
    orderDemo.setDesc("完成");
    orderList.add(orderDemo);

    return orderList;
    }
    }
  • 顺序消费消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    public class ConsumerInOrder {

    public static void main(String[] args) throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    /**
    * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
    * 如果非第一次启动,那么按照上次消费的位置继续消费
    */
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("TopicTest", "TagA || TagC || TagD");

    consumer.registerMessageListener(new MessageListenerOrderly() {
    Random random = new Random();

    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
    context.setAutoCommit(true);
    for (MessageExt msg : msgs) {
    // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
    System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
    }

    try {
    //模拟业务逻辑处理中...
    TimeUnit.SECONDS.sleep(random.nextInt(10));
    } catch (Exception e) {
    e.printStackTrace();
    }
    return ConsumeOrderlyStatus.SUCCESS;
    }
    });
    consumer.start();
    }
    }

延时消息

  • 例如,提交一个订单就可以发送一个延时消息,1h后检查订单的状态,如果未付款就取消订单

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    public class ScheduledMessageConsumer {
    public static void main(String[] args) throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
    consumer.subscribe("TestTopic", "*");
    // 注册消息监听
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
    for (MessageExt message : messages) {
    // Print approximate delay time period
    System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    consumer.start();
    }
    }

    public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
    // 实例化一个生产者来产生延时消息
    DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
    producer.start();
    int totalMessagesToSend = 100;
    for (int i = 0; i < totalMessagesToSend; i++) {
    Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
    // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
    message.setDelayTimeLevel(3);
    // 发送消息
    producer.send(message);
    }
    // 关闭生产者
    producer.shutdown();
    }
    }
  • 限制:现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18

    1
    2
    // org/apache/rocketmq/store/config/MessageStoreConfig.java
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

批量消息

  • 批量发送消息显著提高传递小消息的性能,但这些批量消息有相同的topic,相同的waitStoreMsgOK,不能是延时消息,总大小不应超过4MB(超过则需要分割消息)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    String topic = "BatchTest";
    List<Message> messages = new ArrayList<>();
    messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
    messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
    messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
    try {
    producer.send(messages);
    } catch (Exception e) {
    e.printStackTrace();
    }

    // 分割消息
    public class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 1024 * 1024 * 4;
    private final List<Message> messages;
    private int currIndex;
    public ListSplitter(List<Message> messages) {
    this.messages = messages;
    }
    @Override
    public boolean hasNext() {
    return currIndex < messages.size();
    }
    @Override
    public List<Message> next() {
    int nextIndex = currIndex;
    int totalSize = 0;
    for (; nextIndex < messages.size(); nextIndex++) {
    Message message = messages.get(nextIndex);
    int tmpSize = message.getTopic().length() + message.getBody().length;
    Map<String, String> properties = message.getProperties();
    for (Map.Entry<String, String> entry : properties.entrySet()) {
    tmpSize += entry.getKey().length() + entry.getValue().length();
    }
    tmpSize = tmpSize + 20; // 增加日志的开销20字节
    if (tmpSize > SIZE_LIMIT) {
    //单个消息超过了最大的限制
    //忽略,否则会阻塞分裂的进程
    if (nextIndex - currIndex == 0) {
    //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
    nextIndex++;
    }
    break;
    }
    if (tmpSize + totalSize > SIZE_LIMIT) {
    break;
    } else {
    totalSize += tmpSize;
    }
    }
    List<Message> subList = messages.subList(currIndex, nextIndex);
    currIndex = nextIndex;
    return subList;
    }
    }
    //把大的消息分裂成若干个小的消息
    ListSplitter splitter = new ListSplitter(messages);
    while (splitter.hasNext()) {
    try {
    List<Message> listItem = splitter.next();
    producer.send(listItem);
    } catch (Exception e) {
    e.printStackTrace();
    }
    }

过滤消息

  • 可以根据tag来过滤消息,如下所示,消费者将消费TAGA或TAGB或TAGC的消息,但一个消息只能有一个标签
1
2
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
  • 使用SQL表达式筛选消息——通过消息的属性运算,如下所示

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    ------------
    | message |
    |----------| a > 5 AND b = 'abc'
    | a = 10 | --------------------> Gotten
    | b = 'abc'|
    | c = true |
    ------------
    ------------
    | message |
    |----------| a > 5 AND b = 'abc'
    | a = 1 | --------------------> Missed
    | b = 'abc'|
    | c = true |
    ------------

SQL基本语法

  • 数值比较,比如:**>,>=,<,<=,BETWEEN,=;**

  • 字符比较,比如:**=,<>,IN;**

  • IS NULL 或者 IS NOT NULL;

  • 逻辑符号 AND,OR,NOT;

  • 常量类型:

    • 数值,比如:123,3.1415;
    • 字符,比如:**’abc’,必须用单引号包裹起来;**
    • NULL,特殊的常量
    • 布尔值,TRUEFALSE
  • 只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:

1
public void subscribe(finalString topic, final MessageSelector messageSelector)

消息生产者

  • 通过putUserProperty来设置消息的属性

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    producer.start();
    Message msg = new Message("TopicTest",
    tag,
    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
    );
    // 设置一些属性
    msg.putUserProperty("a", String.valueOf(i));
    SendResult sendResult = producer.send(msg);

    producer.shutdown();

消息消费者

  • 用MessageSelector.bySql使用sql筛选消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
    // 只有订阅的消息有这个属性a, a >=0 and a <= 3
    consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    consumer.start();

事务消息

  • 流程:正常事务消息的发送及提交、事务消息的补偿

    • 事务消息发送及提交

      (1) 发送消息(half消息)

      (2) 服务端响应消息写入结果

      (3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)

      (4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

    • 事务补偿:用于解决消息Commit或者Rollback发生超时或者失败的情况

      (1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”

      (2) Producer收到回查消息,检查回查消息对应的本地事务的状态

      (3) 根据本地事务状态,重新Commit或者Rollback

  • 事务消息状态:提交状态、回滚状态、中间状态

    • TransactionStatus.CommitTransaction: 提交事务,允许消费者消费此消息
    • TransactionStatus.RollbackTransaction: 回滚事务,代表该消息将被删除,不允许被消费
    • TransactionStatus.Unknown: 中间状态,代表需要检查消息队列来确定状态

事务性生产者

  • 使用 TransactionMQProducer类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态如上所述
  • 当发送半消息成功时,使用 executeLocalTransaction 方法来执行本地事务,该方法返回的三个事务状态之一。checkLocalTranscation 方法用于检查本地事务状态,并回应消息队列的检查请求,返回三个事务状态之一
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//创建事务监听器
TransactionListener transactionListener = new TransactionListenerImpl();
//创建消息生产者
TransactionMQProducer producer = new TransactionMQProducer("group6");
producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");

producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 3; i++) {
try {
Message msg = new Message("TransactionTopic", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
TimeUnit.SECONDS.sleep(1);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
//producer.shutdown();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class TransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("执行本地事务");
if (StringUtils.equals("TagA", msg.getTags())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals("TagB", msg.getTags())) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
return LocalTransactionState.UNKNOW;
}
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("MQ检查消息Tag【"+msg.getTags()+"】的本地事务执行结果");
return LocalTransactionState.COMMIT_MESSAGE;
}
}

限制

  1. 事务消息不支持延时消息和批量消息
  2. 为了避免单个消息被检查太多次而导致半队列消息累积,默认将单个消息的检查次数限制为 15 次。用户通过 Broker 配置文件的 transactionCheckMax参数修改此限制。如果检查某条消息超过 N 次( N = transactionCheckMax ) ,Broker 将丢弃此消息,并在默认情况下同时打印错误日志,可以重写 AbstractTransactionCheckListener 类来修改这个行为
  3. 事务消息在 Broker 配置文件中参数 transactionMsgTimeout 时间长度之后被检查。用户可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数
  4. 事务性消息可能不止一次被检查或消费
  5. 如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制
  6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享
  7. 事务消息允许反向查询、MQ服务器能通过生产者 ID 查询到消费者

高级功能

消息存储

分布式队列要求高可靠性,数据要持久化存储

  1. producer发送消息
  2. MQ收到消息,将消息进行持久化,存储中新增一条记录
  3. 返回ACK给producer
  4. MQ push 消息给对应的consumer,等待consumer返回ACK
  5. MQ在指定时间内收到consumer的ACK,从存储中删除消息,否则认为消息消费失败,尝试重新push消息

存储介质

  • 关系型数据库:ActiveMQ(默认采用的KahaDB做消息存储)可通过JDBC持久化消息
  • 文件系统:RocketMQ/Kafka/RabbitMQ 均是刷盘至所部署虚拟机/物理机的文件系统来持久化(分为异步刷盘和同步刷盘)——除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题,且刷盘速度更快

消息的存储和发送

1)消息存储
  • RocketMQ的消息为顺序写,保证消息存储的速度(顺序写速度可达到600MB/s, 超过一般网卡的传输速度)
2)消息发送
  • 文件操作、网络操作需要涉及用户态和内核态的切换,需要数据复制(以服务器发送文件到客户端为例)

    • read:读取本地文件内容
      • 从磁盘复制数据到内核态内存
      • 从内核态内存复制到用户态内存
    • write:将读取的内容通过网络发送
      • 从用户态内存复制到网络驱动的内核态内存
      • 从网络驱动的内核态内存复制到网卡进行传输

  • 通过使用mmap省去向用户态的内存复制——Java中通过MappedByteBuffer实现(零拷贝)

    • 一次只能映射1.5~2G 的文件至用户态的虚拟内存
    • RocketMQ默认设置单个CommitLog日志数据文件为1G

消息存储结构

  • 存储由ConsumeQueue和CommitLog配合完成
    • 物理存储文件:CommitLog
    • 消息的逻辑队列:ConsumeQueue(类似数据库的索引文件,存储指向物理存储的地址),每个Topic下的每个Message Queue都有一个ConsumeQueue
  • IndexFile:一种通过key或时间区间来查询消息的方法,不影响发送与消费消息的主流程

刷盘机制

  • 同步刷盘、异步刷盘——通过Broker配置文件的 flushDiskType 参数设置(SYNC_FLUSH、ASYNC_FLUSH)
1)同步刷盘
  • 在返回写成功状态时,消息已经被写入磁盘
  • 消息写入内存的PAGE CACHE后立刻通知刷盘线程刷盘, 刷盘完成后唤醒等待的线程,返回消息写成功
2)异步刷盘
  • 返回写成功状态时,消息可能只是被写入了内存的PAGE CACHE
  • 当内存里的消息量积累到一定程度时,统一触发写磁盘

高可用性机制

NameServer高可用

  • NameServer 节点是无状态的,各个节点直接的数据一致
  • 存在多个 NameServer 节点的情况下,部分 NameServer 不可用也可以保证 MQ 服务正常运行

Broker高可用

  • RocketMQ分布式集群是通过Master-Slave实现高可用
    • Broker配置文件参数 brokerId 为0表明是Master,大于0表明是 Slave
    • Broker配置文件参数 brokerRole 说明是Master还是Slave
  • Master 的 Broker 支持读和写,Slave仅支持读

消息消费高可用

  • Consumer不需要设置读Master还是Slave,Master不可用时Consumer自动切换到读Slave

消息发送高可用

  • 创建Topic时,把该Topic的多个MQ创建在多个Broker组上(相同Broker名称,不同 brokerId 的机器组成一个Broker组)
  • 一个Broker组的Master不可用,其他组的Master仍然可用,Producer仍然可以发送消息
  • 目前不支持Slave转Master,要手动停止Slave角色的Broker,更改配置文件,用新的配置文件启动Broker。

消息主从复制

  • 消息从Master复制到Slave上,同步、异步——通过Broker配置文件的brokerRole参数设置,可以设置成ASYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个

1)同步复制

  • 同步复制:Master和Slave均写成功后才反馈给客户端写成功
  • 优点:Master出故障, Slave上有全部的备份数据,容易恢复;缺点:增大数据写入延迟,降低系统吞吐量

2)异步复制

  • 异步复制:只要Master写成功即反馈给客户端写成功——吞吐量高,但有可能会丢数

负载均衡

Producer负载均衡

  • 每个Producer实例会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。queue可以散落在不同的broker,消息就发送到不同的broker

Consumer负载均衡

集群模式
  • 集群消费模式下,每条消息投递到订阅该topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式消费消息,拉取的时候需要明确指定拉取哪一条message queue
  • 当实例的数量有变更,会触发一次所有实例的负载均衡——按照queue的数量和实例的数量平均分配queue给每个实例,默认的分配算法是AllocateMessageQueueAveragely
  • 另一种平均算法是AllocateMessageQueueAveragelyByCircle,以环状轮流分queue
  • 集群模式下,queue只允许分配一个实例,如果多个实例同时消费一个queue的消息,由于拉取哪些消息是consumer主动控制的,会导致同一个消息在不同的实例下被消费多次

  • 增加consumer实例去分摊queue的消费,可以起到水平扩展的消费能力的作用

  • 需要让queue的总数量大于等于consumer的数量

广播模式
  • 广播模式下一条消息需要投递到一个消费组下面所有的消费者实例
  • 所有consumer都分到所有的queue

消息重试

顺序消息的重试

  • 消费者消费顺序消息失败后,RocketMQ不断进行消息重试(每次间隔 1 秒)——会出现消费阻塞

无序消息的重试

  • 无序消息(普通、定时、延时、事务消息)消费失败时,可以通过设置返回状态实现消息重试
  • 无序消息的重试只在集群消费生效;广播消费不提供失败重试
重试次数
  • RocketMQ 默认允许每条消息最多重试 16 次(消息的 Message ID 不变)
  • 消息重试 16 次后仍然失败,消息将不再投递
第几次重试 与上次重试的间隔时间 第几次重试 与上次重试的间隔时间
1 10 秒 9 7 分钟
2 30 秒 10 8 分钟
3 1 分钟 11 9 分钟
4 2 分钟 12 10 分钟
5 3 分钟 13 20 分钟
6 4 分钟 14 30 分钟
7 5 分钟 15 1 小时
8 6 分钟 16 2 小时
配置方式
  • 集群消费方式下,需要在消息监听器接口的实现中明确配置:
    • 返回 Action.ReconsumeLater (推荐)
    • 返回 Null
    • 抛出异常
1
2
3
4
5
6
7
8
9
10
11
12
13
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
//处理消息
doConsumeMessage(message);
//方式1:返回 Action.ReconsumeLater,消息将重试
return Action.ReconsumeLater;
//方式2:返回 null,消息将重试
return null;
//方式3:直接抛出异常, 消息将重试
throw new RuntimeException("Consumer Message exceotion");
}
}
  • 如果不重试消息,需要捕获消费逻辑中可能抛出的异常,返回 Action.CommitMessage
1
2
3
4
5
6
7
8
9
10
11
12
13
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
try {
doConsumeMessage(message);
} catch (Throwable e) {
//捕获消费逻辑中的所有异常,并返回 Action.CommitMessage;
return Action.CommitMessage;
}
//消息处理正常,直接返回 Action.CommitMessage;
return Action.CommitMessage;
}
}
  • 可以自定义消息最大重试次数

    • 最大重试次数小于等于 16 次,重试时间如上表所述

    • 最大重试次数大于 16 次,超过 16 次的重试时间间隔为每次 2 小时

      1
      2
      3
      4
      Properties properties = new Properties();
      //配置对应 Group ID 的最大消息重试次数为 20 次
      properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
      Consumer consumer =ONSFactory.createConsumer(properties);
  • 配置采用覆盖的方式生效,最后启动的 Consumer 实例会覆盖之前的启动实例的配置
  • 消费者可获取消息的重试次数:

    1
    2
    3
    4
    5
    6
    7
    8
    public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
    //获取消息的重试次数
    message.getReconsumeTimes();
    ...
    }
    }

死信队列

  • 消费失败的消息(死信消息(Dead-Letter Message)),RocketMQ 不会立刻将消息丢弃,而是发送到该消费者对应的特殊队列(死信队列(Dead-Letter Queue))

  • 死信不会再被正常消费吗,三天后自动删除

  • 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例,且包含其产生的所有死信消息。一个 Group ID 未产生死信消息,RocketMQ 不会为其创建相应的死信队列
  • 可以在 RocketMQ 控制台重新发送该消息

消费幂等

  • 消息有可能会出现重复

    • 发送/消费时消息重复:消息已被成功发送到服务端并完成持久化/消息已经成功被消费,但因为网络问题没有应答,此时重复发送消息

    • 负载均衡时消息重复:RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,消费者可能会收到重复消息

  • Message ID 有可能出现冲突(重复),因此不建议以 Message ID 作为处理依据。 最好以业务唯一标识作为幂等处理的依据(可以通过消息 Key 设置)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    Message message = new Message();
    message.setKey("ORDERID_100");
    SendResult sendResult = producer.send(message);

    // 幂等处理
    consumer.subscribe("ons_test", "*", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
    String key = message.getKey()
    // 根据业务唯一标识的 key 做幂等处理
    }
    });