RocketMQ+Dubbo+Zookeeper,实现下单和支付业务
参考:xvmingyuan/shop: SpringBoot Dubbo RocketMQ订单支付系统 (github.com)
业务分析
- 模拟购物中的【下单】和【支付】
下单
用户请求订单系统下单
订单系统通过RPC调用订单服务下单
订单服务调用优惠券服务,扣减优惠券
订单服务调用调用库存服务,校验并扣减库存
订单服务调用用户服务,扣减用户余额
订单服务完成确认订单
支付
- 用户请求支付系统
- 支付系统调用第三方支付平台API进行发起支付流程
- 用户通过第三方支付平台支付成功后,第三方支付平台回调通知支付系统
- 支付系统调用订单服务修改订单状态
- 支付系统调用积分服务添加积分
- 支付系统调用日志服务记录日志
问题分析
用户提交订单后,扣减库存成功、扣减优惠券成功、使用余额成功,但是在确认订单操作失败,需要对库存、库存、余额进行回退。如何保证数据的完整性?
使用MQ保证在下单失败后系统数据的完整性(见上方的下单时序图)
用户通过第三方支付平台(支付宝、微信)支付成功后,支付平台回调API异步通知商家支付结果,支付系统修改订单状态、记录支付日志和给用户增加积分。支付系统如何保证在收到异步通知时,快速给第三方支付凭条做出回应?
通过MQ分发数据,提高系统处理性能
SpringBoot整合
- SpringBoot、Dubbo、Zookeeper、RocketMQ、Mysql
整合RocketMQ
- 基于rocketmq-spring,git clone后安装到本地仓库
mvn install -Dmaven.skip.test=true
生产者
依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
</parent>
<properties>
<rocketmq-spring-boot-starter-version>2.0.3</rocketmq-spring-boot-starter-version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-spring-boot-starter-version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>+
</dependency>
</dependencies>配置文件
1
2
3# application.properties
192.168.25.135:9876;192.168.25.138:9876 =
my-group =启动类
1
2
3
4
5
6
public class MQProducerApplication {
public static void main(String[] args) {
SpringApplication.run(MQSpringBootApplication.class);
}
}测试类
1
2
3
4
5
6
7
8
9
10
11
12
public class ProducerTest {
private RocketMQTemplate rocketMQTemplate;
public void test1(){
rocketMQTemplate.convertAndSend("springboot-mq", "hello springboot rocketmq");
}
}
消费者
依赖:同生产者
配置文件:同生产者
启动类
1
2
3
4
5
6
public class MQConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(MQSpringBootApplication.class);
}
}消息监听器
1
2
3
4
5
6
7
8
9
10
public class Consumer implements RocketMQListener<String> {
public void onMessage(String message) {
log.info("Receive message:"+message);
}
}
SpringBoot整合Dubbo
- git clonedubbo-spring-boot-starter,mvn安装到本地
mvn install -Dmaven.skip.test=true
搭建Zookeeper集群
安装JDK,Zookeeper上传到服务器,解压并创建data目录,将conf下的zoo_sample.cfg文件改名为zoo.cfg
建立
/user/local/zookeeper-cluster
,解压后的Zookeeper复制到以下目录1
2
3/usr/local/zookeeper-cluster/zookeeper-1
/usr/local/zookeeper-cluster/zookeeper-2
/usr/local/zookeeper-cluster/zookeeper-3配置 Zookeeper 的 dataDir(zoo.cfg) clientPort 分别为 2181 2182 2183
/usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
1
2clientPort=2181
dataDir=/usr/local/zookeeper-cluster/zookeeper-1/data/usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg
1
2clientPort=2182
dataDir=/usr/local/zookeeper-cluster/zookeeper-2/data/usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg
1
2clientPort=2183
dataDir=/usr/local/zookeeper-cluster/zookeeper-3/data
配置:
每个 zookeeper 的 data 目录下创建 myid 文件,内容分别是 1、2、3——记录每个服务器的 ID
每一个 zookeeper 的 zoo.cfg 配置客户端访问端口(clientPort)和集群服务器 IP 列表
1
2
3
4# 集群IP列表 server.服务器 ID=服务器 IP 地址:服务器之间通信端口:服务器之间投票选举端口
server.1=192.168.25.140:2881:3881
server.2=192.168.25.140:2882:3882
server.3=192.168.25.140:2883:3883
启动集群:分别启动每个实例
RPC服务接口:
1
2
3public interface IUserService {
public String sayHello(String name);
}
服务提供者
依赖:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
</parent>
<dependencies>
<!--dubbo-->
<dependency>
<groupId>com.alibaba.spring.boot</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>2.0.0</version>
</dependency>
<!--spring-boot-stater-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<artifactId>log4j-to-slf4j</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!--zookeeper-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.9</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!--API-->
<dependency>
<groupId>com.demo</groupId>
<artifactId>dubbo-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>1
2
3
4
5
6
7
8
9
10
11
12
* 配置文件
```properties
# application.properties
spring.application.name=dubbo-demo-provider
spring.dubbo.application.id=dubbo-demo-provider
spring.dubbo.application.name=dubbo-demo-provider
spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183
spring.dubbo.server=true
spring.dubbo.protocol.name=dubbo
spring.dubbo.protocol.port=20880启动类
1
2
3
4
5
6
7
8
9
public class ProviderBootstrap {
public static void main(String[] args) throws IOException {
SpringApplication.run(ProviderBootstrap.class,args);
}
}服务实现
1
2
3
4
5
6
7
8
public class UserServiceImpl implements IUserService{
public String sayHello(String name) {
return "hello:"+name;
}
}
服务消费者
依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--dubbo-->
<dependency>
<groupId>com.alibaba.spring.boot</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<artifactId>log4j-to-slf4j</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!--zookeeper-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.9</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!--API-->
<dependency>
<groupId>com.demo</groupId>
<artifactId>dubbo-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>配置文件
1
2
3
4
5# application.properties
dubbo-demo-consumer =
dubbo-demo-consumer =
dubbo-demo-consumer =
zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183 =启动类
1
2
3
4
5
6
7
public class ConsumerBootstrap {
public static void main(String[] args) {
SpringApplication.run(ConsumerBootstrap.class);
}
}Controller调用注册的Service
1
2
3
4
5
6
7
8
9
10
11
12
13
public class UserController {
private IUserService userService;
public String sayHello(String name){
return userService.sayHello(name);
}
}
项目初始化
数据库
优惠券表
Field Type Comment coupon_id bigint(50) NOT NULL 优惠券ID coupon_price decimal(10,2) NULL 优惠券金额 user_id bigint(50) NULL 用户ID order_id bigint(32) NULL 订单ID is_used int(1) NULL 是否使用 0未使用 1已使用 used_time timestamp NULL 使用时间 商品表
Field Type Comment goods_id bigint(50) NOT NULL 主键 goods_name varchar(255) NULL 商品名称 goods_number int(11) NULL 商品库存 goods_price decimal(10,2) NULL 商品价格 goods_desc varchar(255) NULL 商品描述 add_time timestamp NULL 添加时间 订单表
Field Type Comment order_id bigint(50) NOT NULL 订单ID user_id bigint(50) NULL 用户ID order_status int(1) NULL 订单状态 0未确认 1已确认 2已取消 3无效 4退款 pay_status int(1) NULL 支付状态 0未支付 1支付中 2已支付 shipping_status int(1) NULL 发货状态 0未发货 1已发货 2已退货 address varchar(255) NULL 收货地址 consignee varchar(255) NULL 收货人 goods_id bigint(50) NULL 商品ID goods_number int(11) NULL 商品数量 goods_price decimal(10,2) NULL 商品价格 goods_amount decimal(10,0) NULL 商品总价 shipping_fee decimal(10,2) NULL 运费 order_amount decimal(10,2) NULL 订单价格 coupon_id bigint(50) NULL 优惠券ID coupon_paid decimal(10,2) NULL 优惠券 money_paid decimal(10,2) NULL 已付金额 pay_amount decimal(10,2) NULL 支付金额 add_time timestamp NULL 创建时间 confirm_time timestamp NULL 订单确认时间 pay_time timestamp NULL 支付时间 订单日志表
Field Type Comment goods_id int(11) NOT NULL 商品ID order_id varchar(32) NOT NULL 订单ID goods_number int(11) NULL 库存数量 log_time datetime NULL 记录时间 用户表
Field Type Comment user_id bigint(50) NOT NULL 用户ID user_name varchar(255) NULL 用户姓名 user_password varchar(255) NULL 用户密码 user_mobile varchar(255) NULL 手机号 user_score int(11) NULL 积分 user_reg_time timestamp NULL 注册时间 user_money decimal(10,0) NULL 用户余额 用户余额日志表
Field Type Comment user_id bigint(50) NOT NULL 用户ID order_id bigint(50) NOT NULL 订单ID money_log_type int(1) NOT NULL 日志类型 1订单付款 2 订单退款 use_money decimal(10,2) NULL 操作金额 create_time timestamp NULL 日志时间 订单支付表
Field Type Comment pay_id bigint(50) NOT NULL 支付编号 order_id bigint(50) NULL 订单编号 pay_amount decimal(10,2) NULL 支付金额 is_paid int(1) NULL 是否已支付 1否 2是 MQ消息生产表
Field Type Comment id varchar(100) NOT NULL 主键 group_name varchar(100) NULL 生产者组名 msg_topic varchar(100) NULL 消息主题 msg_tag varchar(100) NULL Tag msg_key varchar(100) NULL Key msg_body varchar(500) NULL 消息内容 msg_status int(1) NULL 0:未处理;1:已经处理 create_time timestamp NOT NULL 记录时间 MQ消息消费表
Field Type Comment msg_id varchar(50) NULL 消息ID group_name varchar(100) NOT NULL 消费者组名 msg_tag varchar(100) NOT NULL Tag msg_key varchar(100) NOT NULL Key msg_body varchar(500) NULL 消息体 consumer_status int(1) NULL 0:正在处理;1:处理成功;2:处理失败 consumer_times int(1) NULL 消费次数 consumer_timestamp timestamp NULL 消费时间 remark varchar(500) NULL 备注
项目概括
- 父工程:shop-parent
- 订单系统:shop-order-web
- 支付系统:shop-pay-web
- 优惠券服务:shop-coupon-service
- 订单服务:shop-order-service
- 支付服务:shop-pay-service
- 商品服务:shop-goods-service
- 用户服务:shop-user-service
- 实体类:shop-pojo
- 持久层:shop-dao
- 接口层:shop-api
- 工具工程:shop-common
- 工程之间的关系参考“SpringBoot整合”的第一张图
通过Mybatis逆向工程对数据表生成CURD持久层代码
- 实体类导入到shop-pojo
- 服务层中导入对应的Mapper类和对应配置文件
公共类:
ID生成器:IDWorker(Twitter雪花算法)
异常处理类
CustomerException:自定义异常类
CastException:异常抛出类
常量类:
- ShopCode:系统状态类
响应实体类:
- Result:封装响应状态和响应信息
下单业务
基本流程
- 接口定义:
1 | public interface IOrderService { |
- 业务类实现
1 |
|
- 校验订单
1 | private void checkOrder(TradeOrder order) { |
- 生成预订单
1 | private Long savePreOrder(TradeOrder order) { |
扣减库存
通过dubbo调用商品服务完成扣减库存
1
2
3
4
5
6
7
8
9
10
11
12private void reduceGoodsNum(TradeOrder order) {
TradeGoodsNumberLog goodsNumberLog = new TradeGoodsNumberLog();
goodsNumberLog.setGoodsId(order.getGoodsId());
goodsNumberLog.setOrderId(order.getOrderId());
goodsNumberLog.setGoodsNumber(order.getGoodsNumber());
Result result = goodsService.reduceGoodsNum(goodsNumberLog);
if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
CastException.cast(ShopCode.SHOP_REDUCE_GOODS_NUM_FAIL);
}
log.info("订单:["+order.getOrderId()+"]扣减库存["+order.getGoodsNumber()+"个]成功");
}
}商品服务GoodsService扣减库存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public Result reduceGoodsNum(TradeGoodsNumberLog goodsNumberLog) {
if (goodsNumberLog == null ||
goodsNumberLog.getGoodsNumber() == null ||
goodsNumberLog.getOrderId() == null ||
goodsNumberLog.getGoodsNumber() == null ||
goodsNumberLog.getGoodsNumber().intValue() <= 0) {
CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
}
TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsNumberLog.getGoodsId());
if(goods.getGoodsNumber()<goodsNumberLog.getGoodsNumber()){
//库存不足
CastException.cast(ShopCode.SHOP_GOODS_NUM_NOT_ENOUGH);
}
//减库存
goods.setGoodsNumber(goods.getGoodsNumber()-goodsNumberLog.getGoodsNumber());
goodsMapper.updateByPrimaryKey(goods);
//记录库存操作日志
goodsNumberLog.setGoodsNumber(-(goodsNumberLog.getGoodsNumber()));
goodsNumberLog.setLogTime(new Date());
goodsNumberLogMapper.insert(goodsNumberLog);
return new Result(ShopCode.SHOP_SUCCESS.getSuccess(),ShopCode.SHOP_SUCCESS.getMessage());
}
扣减优惠券
通过dubbo完成扣减优惠券
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18private void changeCoponStatus(TradeOrder order) {
//判断用户是否使用优惠券
if (!StringUtils.isEmpty(order.getCouponId())) {
//封装优惠券对象
TradeCoupon coupon = couponService.findOne(order.getCouponId());
coupon.setIsUsed(ShopCode.SHOP_COUPON_ISUSED.getCode());
coupon.setUsedTime(new Date());
coupon.setOrderId(order.getOrderId());
Result result = couponService.changeCouponStatus(coupon);
//判断执行结果
if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
//优惠券使用失败
CastException.cast(ShopCode.SHOP_COUPON_USE_FAIL);
}
log.info("订单:["+order.getOrderId()+"]使用扣减优惠券["+coupon.getCouponPrice()+"元]成功");
}
}优惠券服务CouponService更改优惠券状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public Result changeCouponStatus(TradeCoupon coupon) {
try {
//判断请求参数是否合法
if (coupon == null || StringUtils.isEmpty(coupon.getCouponId())) {
CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
}
//更新优惠券状态为已使用
couponMapper.updateByPrimaryKey(coupon);
return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
} catch (Exception e) {
return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
}
}
扣减用户余额
通过UserService完成扣减余额
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16private void reduceMoneyPaid(TradeOrder order) {
//判断订单中使用的余额是否合法
if (order.getMoneyPaid() != null && order.getMoneyPaid().compareTo(BigDecimal.ZERO) == 1) {
TradeUserMoneyLog userMoneyLog = new TradeUserMoneyLog();
userMoneyLog.setOrderId(order.getOrderId());
userMoneyLog.setUserId(order.getUserId());
userMoneyLog.setUseMoney(order.getMoneyPaid());
userMoneyLog.setMoneyLogType(ShopCode.SHOP_USER_MONEY_PAID.getCode());
//扣减余额
Result result = userService.changeUserMoney(userMoneyLog);
if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
CastException.cast(ShopCode.SHOP_USER_MONEY_REDUCE_FAIL);
}
log.info("订单:["+order.getOrderId()+"扣减余额["+order.getMoneyPaid()+"元]成功]");
}
}用户服务UserService更新余额
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public Result changeUserMoney(TradeUserMoneyLog userMoneyLog) {
//判断请求参数是否合法
if (userMoneyLog == null
|| userMoneyLog.getUserId() == null
|| userMoneyLog.getUseMoney() == null
|| userMoneyLog.getOrderId() == null
|| userMoneyLog.getUseMoney().compareTo(BigDecimal.ZERO) <= 0) {
CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
}
//查询该订单是否存在付款记录
TradeUserMoneyLogExample userMoneyLogExample = new TradeUserMoneyLogExample();
userMoneyLogExample.createCriteria()
.andUserIdEqualTo(userMoneyLog.getUserId())
.andOrderIdEqualTo(userMoneyLog.getOrderId());
int count = userMoneyLogMapper.countByExample(userMoneyLogExample);
TradeUser tradeUser = new TradeUser();
tradeUser.setUserId(userMoneyLog.getUserId());
tradeUser.setUserMoney(userMoneyLog.getUseMoney().longValue());
//判断余额操作行为
//【付款操作】
if (userMoneyLog.getMoneyLogType().equals(ShopCode.SHOP_USER_MONEY_PAID.getCode())) {
//订单已经付款,则抛异常
if (count > 0) {
CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY);
}
//用户账户扣减余额
userMapper.reduceUserMoney(tradeUser);
}
//【退款操作】
if (userMoneyLog.getMoneyLogType().equals(ShopCode.SHOP_USER_MONEY_REFUND.getCode())) {
//如果订单未付款,则不能退款,抛异常
if (count == 0) {
CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY);
}
//防止多次退款
userMoneyLogExample = new TradeUserMoneyLogExample();
userMoneyLogExample.createCriteria()
.andUserIdEqualTo(userMoneyLog.getUserId())
.andOrderIdEqualTo(userMoneyLog.getOrderId())
.andMoneyLogTypeEqualTo(ShopCode.SHOP_USER_MONEY_REFUND.getCode());
count = userMoneyLogMapper.countByExample(userMoneyLogExample);
if (count > 0) {
CastException.cast(ShopCode.SHOP_USER_MONEY_REFUND_ALREADY);
}
//用户账户添加余额
userMapper.addUserMoney(tradeUser);
}
//记录用户使用余额日志
userMoneyLog.setCreateTime(new Date());
userMoneyLogMapper.insert(userMoneyLog);
return new Result(ShopCode.SHOP_SUCCESS.getSuccess(),ShopCode.SHOP_SUCCESS.getMessage());
}
确认订单
1
2
3
4
5
6
7
8
9
10private void updateOrderStatus(TradeOrder order) {
order.setOrderStatus(ShopCode.SHOP_ORDER_CONFIRM.getCode());
order.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());
order.setConfirmTime(new Date());
int r = orderMapper.updateByPrimaryKey(order);
if (r <= 0) {
CastException.cast(ShopCode.SHOP_ORDER_CONFIRM_FAIL);
}
log.info("订单:["+order.getOrderId()+"]状态修改成功");
}组织以上流程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public Result confirmOrder(TradeOrder order) {
//1.校验订单
checkOrder(order);
//2.生成预订单
Long orderId = savePreOrder(order);
order.setOrderId(orderId);
try {
//3.扣减库存
reduceGoodsNum(order);
//4.扣减优惠券
changeCoponStatus(order);
//5.使用余额
reduceMoneyPaid(order);
//6.确认订单
updateOrderStatus(order);
log.info("订单:["+orderId+"]确认成功");
return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
} catch (Exception e) {
//确认订单失败,发送消息
...
return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
}
}
失败补偿机制
消息发送方
配置RocketMQ属性值
1
2
3
4
5
6
7192.168.25.135:9876;192.168.25.138:9876 =
orderProducerGroup =
order_orderTopic_cancel_group =
orderTopic =
order_confirm =
order_cancel =注入模板类和属性值信息
1
2
3
4
5
6
7
8
private RocketMQTemplate rocketMQTemplate;
private String topic;
private String cancelTag;发送下单失败消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public Result confirmOrder(TradeOrder order) {
//1.校验订单
//2.生成预订
try {
//3.扣减库存
//4.扣减优惠券
//5.使用余额
//6.确认订单
} catch (Exception e) {
//确认订单失败,发送消息
CancelOrderMQ cancelOrderMQ = new CancelOrderMQ();
cancelOrderMQ.setOrderId(order.getOrderId());
cancelOrderMQ.setCouponId(order.getCouponId());
cancelOrderMQ.setGoodsId(order.getGoodsId());
cancelOrderMQ.setGoodsNumber(order.getGoodsNumber());
cancelOrderMQ.setUserId(order.getUserId());
cancelOrderMQ.setUserMoney(order.getMoneyPaid());
try {
sendMessage(topic,
cancelTag,
cancelOrderMQ.getOrderId().toString(),
JSON.toJSONString(cancelOrderMQ));
} catch (Exception e1) {
e1.printStackTrace();
CastException.cast(ShopCode.SHOP_MQ_SEND_MESSAGE_FAIL);
}
return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14private void sendMessage(String topic, String tags, String keys, String body) throws Exception {
//判断Topic是否为空
if (StringUtils.isEmpty(topic)) {
CastException.cast(ShopCode.SHOP_MQ_TOPIC_IS_EMPTY);
}
//判断消息内容是否为空
if (StringUtils.isEmpty(body)) {
CastException.cast(ShopCode.SHOP_MQ_MESSAGE_BODY_IS_EMPTY);
}
//消息体
Message message = new Message(topic, tags, keys, body.getBytes());
//发送消息
rocketMQTemplate.getProducer().send(message);
}
消费接收方
配置RocketMQ属性值
1
2
3192.168.25.135:9876;192.168.25.138:9876 =
order_orderTopic_cancel_group =
orderTopic =创建监听类,消费消息
1
2
3
4
5
6
7
8
9
10
11
12
public class CancelOrderConsumer implements RocketMQListener<MessageExt>{
public void onMessage(MessageExt messageExt) {
...
}
}
回退库存
1 |
|
回退优惠券
1 |
|
回退余额
1 |
|
取消订单
1 |
|
测试
测试环境
1 |
|
测试下单成功流程
1 |
|
测试下单失败流程
略
支付业务
创建支付订单
1 | public Result createPayment(TradePay tradePay) { |
支付回调
流程分析
1 | public Result callbackPayment(TradePay tradePay) { |
线程池优化消息发送逻辑
1 |
|
1 |
|
处理消息(以订单服务为例)
- 支付服务payService发送MQ消息,订单服务、用户服务、日志服务需要订阅消息进行处理
- 订单服务修改订单状态为已支付
- 日志服务记录支付日志
- 用户服务负责给用户增加积分
1)配置RocketMQ
1 | payTopic = |
2)消费消息
订单服务配置公共的消息处理类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27public class BaseConsumer {
public TradeOrder handleMessage(IOrderService
orderService,
MessageExt messageExt,Integer code) throws Exception {
//解析消息内容
String body = new String(messageExt.getBody(), "UTF-8");
String msgId = messageExt.getMsgId();
String tags = messageExt.getTags();
String keys = messageExt.getKeys();
OrderMQ orderMq = JSON.parseObject(body, OrderMQ.class);
//查询
TradeOrder order = orderService.findOne(orderMq.getOrderId());
if(ShopCode.SHOP_ORDER_MESSAGE_STATUS_CANCEL.getCode().equals(code)){
order.setOrderStatus(ShopCode.SHOP_ORDER_CANCEL.getCode());
}
if(ShopCode.SHOP_ORDER_MESSAGE_STATUS_ISPAID.getCode().equals(code)){
order.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
}
orderService.changeOrderStatus(order);
return order;
}
}接受订单支付成功消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class PayConsumer extends BaseConsumer implements RocketMQListener<MessageExt> {
private IOrderService orderService;
public void onMessage(MessageExt messageExt) {
try {
log.info("CancelOrderProcessor receive message:"+messageExt);
TradeOrder order = handleMessage(orderService,
messageExt,
ShopCode.SHOP_ORDER_MESSAGE_STATUS_ISPAID.getCode());
log.info("订单:["+order.getOrderId()+"]支付成功");
} catch (Exception e) {
e.printStackTrace();
log.error("订单支付失败");
}
}
}
整体组合
- 通过Rest客户端请求shop-order-web和shop-pay-web
准备工作
1)配置RestTemplate类
1 |
|
2)配置请求地址
订单
1
2
3
4
5http://localhost =
/order-web =
8080 =
${server.host}:${server.port}${server.servlet.path} =
/order/confirm =支付
1
2
3
4
5
6http://localhost =
/pay-web =
9090 =
${server.host}:${server.port}${server.servlet.path} =
/pay/createPayment =
/pay/callbackPayment =
下单测试
1 |
|
支付测试
1 |
|