RocketMQ+Dubbo案例

RocketMQ+Dubbo+Zookeeper,实现下单和支付业务

参考:xvmingyuan/shop: SpringBoot Dubbo RocketMQ订单支付系统 (github.com)

业务分析

  • 模拟购物中的【下单】和【支付】

下单

  1. 用户请求订单系统下单

  2. 订单系统通过RPC调用订单服务下单

  3. 订单服务调用优惠券服务,扣减优惠券

  4. 订单服务调用调用库存服务,校验并扣减库存

  5. 订单服务调用用户服务,扣减用户余额

  6. 订单服务完成确认订单

支付

  1. 用户请求支付系统
  2. 支付系统调用第三方支付平台API进行发起支付流程
  3. 用户通过第三方支付平台支付成功后,第三方支付平台回调通知支付系统
  4. 支付系统调用订单服务修改订单状态
  5. 支付系统调用积分服务添加积分
  6. 支付系统调用日志服务记录日志

问题分析

  • 用户提交订单后,扣减库存成功、扣减优惠券成功、使用余额成功,但是在确认订单操作失败,需要对库存、库存、余额进行回退。如何保证数据的完整性?

    使用MQ保证在下单失败后系统数据的完整性(见上方的下单时序图)

  • 用户通过第三方支付平台(支付宝、微信)支付成功后,支付平台回调API异步通知商家支付结果,支付系统修改订单状态、记录支付日志和给用户增加积分。支付系统如何保证在收到异步通知时,快速给第三方支付凭条做出回应?

    通过MQ分发数据,提高系统处理性能

SpringBoot整合

  • SpringBoot、Dubbo、Zookeeper、RocketMQ、Mysql

整合RocketMQ

  • 基于rocketmq-spring,git clone后安装到本地仓库mvn install -Dmaven.skip.test=true

生产者

  • 依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
    </parent>

    <properties>
    <rocketmq-spring-boot-starter-version>2.0.3</rocketmq-spring-boot-starter-version>
    </properties>

    <dependencies>
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${rocketmq-spring-boot-starter-version}</version>
    </dependency>
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.6</version>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>+
    </dependency>

    </dependencies>
  • 配置文件

    1
    2
    3
    # application.properties
    rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
    rocketmq.producer.group=my-group
  • 启动类

    1
    2
    3
    4
    5
    6
    @SpringBootApplication
    public class MQProducerApplication {
    public static void main(String[] args) {
    SpringApplication.run(MQSpringBootApplication.class);
    }
    }
  • 测试类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = {MQSpringBootApplication.class})
    public class ProducerTest {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void test1(){
    rocketMQTemplate.convertAndSend("springboot-mq", "hello springboot rocketmq");
    }
    }

消费者

  • 依赖:同生产者

  • 配置文件:同生产者

  • 启动类

    1
    2
    3
    4
    5
    6
    @SpringBootApplication
    public class MQConsumerApplication {
    public static void main(String[] args) {
    SpringApplication.run(MQSpringBootApplication.class);
    }
    }
  • 消息监听器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    @Slf4j
    @Component
    @RocketMQMessageListener(topic = "springboot-mq",consumerGroup = "springboot-mq-consumer-1")
    public class Consumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
    log.info("Receive message:"+message);
    }
    }

SpringBoot整合Dubbo

搭建Zookeeper集群

  • 安装JDK,Zookeeper上传到服务器,解压并创建data目录,将conf下的zoo_sample.cfg文件改名为zoo.cfg

    • 建立/user/local/zookeeper-cluster,解压后的Zookeeper复制到以下目录

      1
      2
      3
      /usr/local/zookeeper-cluster/zookeeper-1
      /usr/local/zookeeper-cluster/zookeeper-2
      /usr/local/zookeeper-cluster/zookeeper-3
    • 配置 Zookeeper 的 dataDir(zoo.cfg) clientPort 分别为 2181 2182 2183

      /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg

      1
      2
      clientPort=2181
      dataDir=/usr/local/zookeeper-cluster/zookeeper-1/data

      /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg

      1
      2
      clientPort=2182
      dataDir=/usr/local/zookeeper-cluster/zookeeper-2/data

      /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg

      1
      2
      clientPort=2183
      dataDir=/usr/local/zookeeper-cluster/zookeeper-3/data
  • 配置:

    • 每个 zookeeper 的 data 目录下创建 myid 文件,内容分别是 1、2、3——记录每个服务器的 ID

    • 每一个 zookeeper 的 zoo.cfg 配置客户端访问端口(clientPort)和集群服务器 IP 列表

      1
      2
      3
      4
      # 集群IP列表 server.服务器 ID=服务器 IP 地址:服务器之间通信端口:服务器之间投票选举端口
      server.1=192.168.25.140:2881:3881
      server.2=192.168.25.140:2882:3882
      server.3=192.168.25.140:2883:3883
  • 启动集群:分别启动每个实例

  • RPC服务接口:

    1
    2
    3
    public interface IUserService {
    public String sayHello(String name);
    }

服务提供者

  • 依赖:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
    </parent>

    <dependencies>
    <!--dubbo-->
    <dependency>
    <groupId>com.alibaba.spring.boot</groupId>
    <artifactId>dubbo-spring-boot-starter</artifactId>
    <version>2.0.0</version>
    </dependency>
    <!--spring-boot-stater-->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <exclusions>
    <exclusion>
    <artifactId>log4j-to-slf4j</artifactId>
    <groupId>org.apache.logging.log4j</groupId>
    </exclusion>
    </exclusions>
    </dependency>
    <!--zookeeper-->
    <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.10</version>
    <exclusions>
    <exclusion>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
    <exclusion>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    </exclusion>
    </exclusions>
    </dependency>

    <dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.9</version>
    <exclusions>
    <exclusion>
    <artifactId>slf4j-log4j12</artifactId>
    <groupId>org.slf4j</groupId>
    </exclusion>
    </exclusions>
    </dependency>
    <!--API-->
    <dependency>
    <groupId>com.demo</groupId>
    <artifactId>dubbo-api</artifactId>
    <version>1.0-SNAPSHOT</version>
    </dependency>

    </dependencies>
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12

    * 配置文件

    ```properties
    # application.properties
    spring.application.name=dubbo-demo-provider
    spring.dubbo.application.id=dubbo-demo-provider
    spring.dubbo.application.name=dubbo-demo-provider
    spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183
    spring.dubbo.server=true
    spring.dubbo.protocol.name=dubbo
    spring.dubbo.protocol.port=20880
  • 启动类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @EnableDubboConfiguration
    @SpringBootApplication
    public class ProviderBootstrap {

    public static void main(String[] args) throws IOException {
    SpringApplication.run(ProviderBootstrap.class,args);
    }

    }
  • 服务实现

    1
    2
    3
    4
    5
    6
    7
    8
    @Component
    @Service(interfaceClass = IUserService.class)
    public class UserServiceImpl implements IUserService{
    @Override
    public String sayHello(String name) {
    return "hello:"+name;
    }
    }

服务消费者

  • 依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
    </parent>

    <dependencies>

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!--dubbo-->
    <dependency>
    <groupId>com.alibaba.spring.boot</groupId>
    <artifactId>dubbo-spring-boot-starter</artifactId>
    <version>2.0.0</version>
    </dependency>

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <exclusions>
    <exclusion>
    <artifactId>log4j-to-slf4j</artifactId>
    <groupId>org.apache.logging.log4j</groupId>
    </exclusion>
    </exclusions>
    </dependency>

    <!--zookeeper-->
    <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.10</version>
    <exclusions>
    <exclusion>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
    <exclusion>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    </exclusion>
    </exclusions>
    </dependency>

    <dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.9</version>
    <exclusions>
    <exclusion>
    <artifactId>slf4j-log4j12</artifactId>
    <groupId>org.slf4j</groupId>
    </exclusion>
    </exclusions>
    </dependency>

    <!--API-->
    <dependency>
    <groupId>com.demo</groupId>
    <artifactId>dubbo-api</artifactId>
    <version>1.0-SNAPSHOT</version>
    </dependency>

    </dependencies>
  • 配置文件

    1
    2
    3
    4
    5
    # application.properties
    spring.application.name=dubbo-demo-consumer
    spring.dubbo.application.name=dubbo-demo-consumer
    spring.dubbo.application.id=dubbo-demo-consumer
    spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183
  • 启动类

    1
    2
    3
    4
    5
    6
    7
    @EnableDubboConfiguration
    @SpringBootApplication
    public class ConsumerBootstrap {
    public static void main(String[] args) {
    SpringApplication.run(ConsumerBootstrap.class);
    }
    }
  • Controller调用注册的Service

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @RestController
    @RequestMapping("/user")
    public class UserController {

    @Reference
    private IUserService userService;

    @RequestMapping("/sayHello")
    public String sayHello(String name){
    return userService.sayHello(name);
    }

    }

项目初始化

数据库

  • 优惠券表

    Field Type Comment
    coupon_id bigint(50) NOT NULL 优惠券ID
    coupon_price decimal(10,2) NULL 优惠券金额
    user_id bigint(50) NULL 用户ID
    order_id bigint(32) NULL 订单ID
    is_used int(1) NULL 是否使用 0未使用 1已使用
    used_time timestamp NULL 使用时间
  • 商品表

    Field Type Comment
    goods_id bigint(50) NOT NULL 主键
    goods_name varchar(255) NULL 商品名称
    goods_number int(11) NULL 商品库存
    goods_price decimal(10,2) NULL 商品价格
    goods_desc varchar(255) NULL 商品描述
    add_time timestamp NULL 添加时间
  • 订单表

    Field Type Comment
    order_id bigint(50) NOT NULL 订单ID
    user_id bigint(50) NULL 用户ID
    order_status int(1) NULL 订单状态 0未确认 1已确认 2已取消 3无效 4退款
    pay_status int(1) NULL 支付状态 0未支付 1支付中 2已支付
    shipping_status int(1) NULL 发货状态 0未发货 1已发货 2已退货
    address varchar(255) NULL 收货地址
    consignee varchar(255) NULL 收货人
    goods_id bigint(50) NULL 商品ID
    goods_number int(11) NULL 商品数量
    goods_price decimal(10,2) NULL 商品价格
    goods_amount decimal(10,0) NULL 商品总价
    shipping_fee decimal(10,2) NULL 运费
    order_amount decimal(10,2) NULL 订单价格
    coupon_id bigint(50) NULL 优惠券ID
    coupon_paid decimal(10,2) NULL 优惠券
    money_paid decimal(10,2) NULL 已付金额
    pay_amount decimal(10,2) NULL 支付金额
    add_time timestamp NULL 创建时间
    confirm_time timestamp NULL 订单确认时间
    pay_time timestamp NULL 支付时间
  • 订单日志表

    Field Type Comment
    goods_id int(11) NOT NULL 商品ID
    order_id varchar(32) NOT NULL 订单ID
    goods_number int(11) NULL 库存数量
    log_time datetime NULL 记录时间
  • 用户表

    Field Type Comment
    user_id bigint(50) NOT NULL 用户ID
    user_name varchar(255) NULL 用户姓名
    user_password varchar(255) NULL 用户密码
    user_mobile varchar(255) NULL 手机号
    user_score int(11) NULL 积分
    user_reg_time timestamp NULL 注册时间
    user_money decimal(10,0) NULL 用户余额
  • 用户余额日志表

    Field Type Comment
    user_id bigint(50) NOT NULL 用户ID
    order_id bigint(50) NOT NULL 订单ID
    money_log_type int(1) NOT NULL 日志类型 1订单付款 2 订单退款
    use_money decimal(10,2) NULL 操作金额
    create_time timestamp NULL 日志时间
  • 订单支付表

    Field Type Comment
    pay_id bigint(50) NOT NULL 支付编号
    order_id bigint(50) NULL 订单编号
    pay_amount decimal(10,2) NULL 支付金额
    is_paid int(1) NULL 是否已支付 1否 2是
  • MQ消息生产表

    Field Type Comment
    id varchar(100) NOT NULL 主键
    group_name varchar(100) NULL 生产者组名
    msg_topic varchar(100) NULL 消息主题
    msg_tag varchar(100) NULL Tag
    msg_key varchar(100) NULL Key
    msg_body varchar(500) NULL 消息内容
    msg_status int(1) NULL 0:未处理;1:已经处理
    create_time timestamp NOT NULL 记录时间
  • MQ消息消费表

    Field Type Comment
    msg_id varchar(50) NULL 消息ID
    group_name varchar(100) NOT NULL 消费者组名
    msg_tag varchar(100) NOT NULL Tag
    msg_key varchar(100) NOT NULL Key
    msg_body varchar(500) NULL 消息体
    consumer_status int(1) NULL 0:正在处理;1:处理成功;2:处理失败
    consumer_times int(1) NULL 消费次数
    consumer_timestamp timestamp NULL 消费时间
    remark varchar(500) NULL 备注

项目概括

  • 父工程:shop-parent
  • 订单系统:shop-order-web
  • 支付系统:shop-pay-web
  • 优惠券服务:shop-coupon-service
  • 订单服务:shop-order-service
  • 支付服务:shop-pay-service
  • 商品服务:shop-goods-service
  • 用户服务:shop-user-service
  • 实体类:shop-pojo
  • 持久层:shop-dao
  • 接口层:shop-api
  • 工具工程:shop-common
  • 工程之间的关系参考“SpringBoot整合”的第一张图
  • 通过Mybatis逆向工程对数据表生成CURD持久层代码

    • 实体类导入到shop-pojo
    • 服务层中导入对应的Mapper类和对应配置文件
  • 公共类:

    • ID生成器:IDWorker(Twitter雪花算法)

    • 异常处理类

      • CustomerException:自定义异常类

      • CastException:异常抛出类

    • 常量类:

      • ShopCode:系统状态类
    • 响应实体类:

      • Result:封装响应状态和响应信息

下单业务

基本流程

  • 接口定义:
1
2
3
4
5
6
7
8
public interface IOrderService {
/**
* 确认订单
* @param order
* @return Result
*/
Result confirmOrder(TradeOrder order);
}
  • 业务类实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Slf4j
@Component
@Service(interfaceClass = IOrderService.class)
public class OrderServiceImpl implements IOrderService {

@Override
public Result confirmOrder(TradeOrder order) {
//1.校验订单

//2.生成预订单

try {
//3.扣减库存

//4.扣减优惠券

//5.使用余额

//6.确认订单

//7.返回成功状态

} catch (Exception e) {
//1.确认订单失败,发送消息

//2.返回失败状态
}

}
}
  • 校验订单
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void checkOrder(TradeOrder order) {
//1.校验订单是否存在
if(order==null){
CastException.cast(ShopCode.SHOP_ORDER_INVALID);
}
//2.校验订单中的商品是否存在
TradeGoods goods = goodsService.findOne(order.getGoodsId());
if(goods==null){
CastException.cast(ShopCode.SHOP_GOODS_NO_EXIST);
}
//3.校验下单用户是否存在
TradeUser user = userService.findOne(order.getUserId());
if(user==null){
CastException.cast(ShopCode.SHOP_USER_NO_EXIST);
}
//4.校验商品单价是否合法
if(order.getGoodsPrice().compareTo(goods.getGoodsPrice())!=0){
CastException.cast(ShopCode.SHOP_GOODS_PRICE_INVALID);
}
//5.校验订单商品数量是否合法
if(order.getGoodsNumber()>=goods.getGoodsNumber()){
CastException.cast(ShopCode.SHOP_GOODS_NUM_NOT_ENOUGH);
}

log.info("校验订单通过");
}
  • 生成预订单
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
private Long savePreOrder(TradeOrder order) {
//1.设置订单状态为不可见
order.setOrderStatus(ShopCode.SHOP_ORDER_NO_CONFIRM.getCode());
//2.订单ID
order.setOrderId(idWorker.nextId());
//核算运费是否正确
BigDecimal shippingFee = calculateShippingFee(order.getOrderAmount());
if (order.getShippingFee().compareTo(shippingFee) != 0) {
CastException.cast(ShopCode.SHOP_ORDER_SHIPPINGFEE_INVALID);
}
//3.计算订单总价格是否正确
BigDecimal orderAmount = order.getGoodsPrice().multiply(new BigDecimal(order.getGoodsNumber()));
orderAmount.add(shippingFee);
if (orderAmount.compareTo(order.getOrderAmount()) != 0) {
CastException.cast(ShopCode.SHOP_ORDERAMOUNT_INVALID);
}

//4.判断优惠券信息是否合法
Long couponId = order.getCouponId();
if (couponId != null) {
TradeCoupon coupon = couponService.findOne(couponId);
//优惠券不存在
if (coupon == null) {
CastException.cast(ShopCode.SHOP_COUPON_NO_EXIST);
}
//优惠券已经使用
if ((ShopCode.SHOP_COUPON_ISUSED.getCode().toString())
.equals(coupon.getIsUsed().toString())) {
CastException.cast(ShopCode.SHOP_COUPON_INVALIED);
}
order.setCouponPaid(coupon.getCouponPrice());
} else {
order.setCouponPaid(BigDecimal.ZERO);
}

//5.判断余额是否正确
BigDecimal moneyPaid = order.getMoneyPaid();
if (moneyPaid != null) {
//比较余额是否大于0
int r = order.getMoneyPaid().compareTo(BigDecimal.ZERO);
//余额小于0
if (r == -1) {
CastException.cast(ShopCode.SHOP_MONEY_PAID_LESS_ZERO);
}
//余额大于0
if (r == 1) {
//查询用户信息
TradeUser user = userService.findOne(order.getUserId());
if (user == null) {
CastException.cast(ShopCode.SHOP_USER_NO_EXIST);
}
//比较余额是否大于用户账户余额
if (user.getUserMoney().compareTo(order.getMoneyPaid().longValue()) == -1) {
CastException.cast(ShopCode.SHOP_MONEY_PAID_INVALID);
}
order.setMoneyPaid(order.getMoneyPaid());
}
} else {
order.setMoneyPaid(BigDecimal.ZERO);
}
//计算订单支付总价
order.setPayAmount(orderAmount.subtract(order.getCouponPaid())
.subtract(order.getMoneyPaid()));
//设置订单添加时间
order.setAddTime(new Date());

//保存预订单
int r = orderMapper.insert(order);
if (ShopCode.SHOP_SUCCESS.getCode() != r) {
CastException.cast(ShopCode.SHOP_ORDER_SAVE_ERROR);
}
log.info("订单:["+order.getOrderId()+"]预订单生成成功");
return order.getOrderId();
}
  • 扣减库存

    • 通过dubbo调用商品服务完成扣减库存

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      private void reduceGoodsNum(TradeOrder order) {
      TradeGoodsNumberLog goodsNumberLog = new TradeGoodsNumberLog();
      goodsNumberLog.setGoodsId(order.getGoodsId());
      goodsNumberLog.setOrderId(order.getOrderId());
      goodsNumberLog.setGoodsNumber(order.getGoodsNumber());
      Result result = goodsService.reduceGoodsNum(goodsNumberLog);
      if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
      CastException.cast(ShopCode.SHOP_REDUCE_GOODS_NUM_FAIL);
      }
      log.info("订单:["+order.getOrderId()+"]扣减库存["+order.getGoodsNumber()+"个]成功");
      }
      }
    • 商品服务GoodsService扣减库存

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      @Override
      public Result reduceGoodsNum(TradeGoodsNumberLog goodsNumberLog) {
      if (goodsNumberLog == null ||
      goodsNumberLog.getGoodsNumber() == null ||
      goodsNumberLog.getOrderId() == null ||
      goodsNumberLog.getGoodsNumber() == null ||
      goodsNumberLog.getGoodsNumber().intValue() <= 0) {
      CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
      }
      TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsNumberLog.getGoodsId());
      if(goods.getGoodsNumber()<goodsNumberLog.getGoodsNumber()){
      //库存不足
      CastException.cast(ShopCode.SHOP_GOODS_NUM_NOT_ENOUGH);
      }
      //减库存
      goods.setGoodsNumber(goods.getGoodsNumber()-goodsNumberLog.getGoodsNumber());
      goodsMapper.updateByPrimaryKey(goods);

      //记录库存操作日志
      goodsNumberLog.setGoodsNumber(-(goodsNumberLog.getGoodsNumber()));
      goodsNumberLog.setLogTime(new Date());
      goodsNumberLogMapper.insert(goodsNumberLog);

      return new Result(ShopCode.SHOP_SUCCESS.getSuccess(),ShopCode.SHOP_SUCCESS.getMessage());
      }
  • 扣减优惠券

    • 通过dubbo完成扣减优惠券

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      private void changeCoponStatus(TradeOrder order) {
      //判断用户是否使用优惠券
      if (!StringUtils.isEmpty(order.getCouponId())) {
      //封装优惠券对象
      TradeCoupon coupon = couponService.findOne(order.getCouponId());
      coupon.setIsUsed(ShopCode.SHOP_COUPON_ISUSED.getCode());
      coupon.setUsedTime(new Date());
      coupon.setOrderId(order.getOrderId());
      Result result = couponService.changeCouponStatus(coupon);
      //判断执行结果
      if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
      //优惠券使用失败
      CastException.cast(ShopCode.SHOP_COUPON_USE_FAIL);
      }
      log.info("订单:["+order.getOrderId()+"]使用扣减优惠券["+coupon.getCouponPrice()+"元]成功");
      }

      }
    • 优惠券服务CouponService更改优惠券状态

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      @Override
      public Result changeCouponStatus(TradeCoupon coupon) {
      try {
      //判断请求参数是否合法
      if (coupon == null || StringUtils.isEmpty(coupon.getCouponId())) {
      CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
      }
      //更新优惠券状态为已使用
      couponMapper.updateByPrimaryKey(coupon);
      return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
      } catch (Exception e) {
      return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
      }
      }
  • 扣减用户余额

    • 通过UserService完成扣减余额

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      private void reduceMoneyPaid(TradeOrder order) {
      //判断订单中使用的余额是否合法
      if (order.getMoneyPaid() != null && order.getMoneyPaid().compareTo(BigDecimal.ZERO) == 1) {
      TradeUserMoneyLog userMoneyLog = new TradeUserMoneyLog();
      userMoneyLog.setOrderId(order.getOrderId());
      userMoneyLog.setUserId(order.getUserId());
      userMoneyLog.setUseMoney(order.getMoneyPaid());
      userMoneyLog.setMoneyLogType(ShopCode.SHOP_USER_MONEY_PAID.getCode());
      //扣减余额
      Result result = userService.changeUserMoney(userMoneyLog);
      if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
      CastException.cast(ShopCode.SHOP_USER_MONEY_REDUCE_FAIL);
      }
      log.info("订单:["+order.getOrderId()+"扣减余额["+order.getMoneyPaid()+"元]成功]");
      }
      }
    • 用户服务UserService更新余额

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      @Override
      public Result changeUserMoney(TradeUserMoneyLog userMoneyLog) {
      //判断请求参数是否合法
      if (userMoneyLog == null
      || userMoneyLog.getUserId() == null
      || userMoneyLog.getUseMoney() == null
      || userMoneyLog.getOrderId() == null
      || userMoneyLog.getUseMoney().compareTo(BigDecimal.ZERO) <= 0) {
      CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
      }

      //查询该订单是否存在付款记录
      TradeUserMoneyLogExample userMoneyLogExample = new TradeUserMoneyLogExample();
      userMoneyLogExample.createCriteria()
      .andUserIdEqualTo(userMoneyLog.getUserId())
      .andOrderIdEqualTo(userMoneyLog.getOrderId());
      int count = userMoneyLogMapper.countByExample(userMoneyLogExample);
      TradeUser tradeUser = new TradeUser();
      tradeUser.setUserId(userMoneyLog.getUserId());
      tradeUser.setUserMoney(userMoneyLog.getUseMoney().longValue());
      //判断余额操作行为
      //【付款操作】
      if (userMoneyLog.getMoneyLogType().equals(ShopCode.SHOP_USER_MONEY_PAID.getCode())) {
      //订单已经付款,则抛异常
      if (count > 0) {
      CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY);
      }
      //用户账户扣减余额
      userMapper.reduceUserMoney(tradeUser);
      }
      //【退款操作】
      if (userMoneyLog.getMoneyLogType().equals(ShopCode.SHOP_USER_MONEY_REFUND.getCode())) {
      //如果订单未付款,则不能退款,抛异常
      if (count == 0) {
      CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY);
      }
      //防止多次退款
      userMoneyLogExample = new TradeUserMoneyLogExample();
      userMoneyLogExample.createCriteria()
      .andUserIdEqualTo(userMoneyLog.getUserId())
      .andOrderIdEqualTo(userMoneyLog.getOrderId())
      .andMoneyLogTypeEqualTo(ShopCode.SHOP_USER_MONEY_REFUND.getCode());
      count = userMoneyLogMapper.countByExample(userMoneyLogExample);
      if (count > 0) {
      CastException.cast(ShopCode.SHOP_USER_MONEY_REFUND_ALREADY);
      }
      //用户账户添加余额
      userMapper.addUserMoney(tradeUser);
      }


      //记录用户使用余额日志
      userMoneyLog.setCreateTime(new Date());
      userMoneyLogMapper.insert(userMoneyLog);
      return new Result(ShopCode.SHOP_SUCCESS.getSuccess(),ShopCode.SHOP_SUCCESS.getMessage());
      }
  • 确认订单

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    private void updateOrderStatus(TradeOrder order) {
    order.setOrderStatus(ShopCode.SHOP_ORDER_CONFIRM.getCode());
    order.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());
    order.setConfirmTime(new Date());
    int r = orderMapper.updateByPrimaryKey(order);
    if (r <= 0) {
    CastException.cast(ShopCode.SHOP_ORDER_CONFIRM_FAIL);
    }
    log.info("订单:["+order.getOrderId()+"]状态修改成功");
    }
  • 组织以上流程:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    @Override
    public Result confirmOrder(TradeOrder order) {
    //1.校验订单
    checkOrder(order);
    //2.生成预订单
    Long orderId = savePreOrder(order);
    order.setOrderId(orderId);
    try {
    //3.扣减库存
    reduceGoodsNum(order);
    //4.扣减优惠券
    changeCoponStatus(order);
    //5.使用余额
    reduceMoneyPaid(order);
    //6.确认订单
    updateOrderStatus(order);
    log.info("订单:["+orderId+"]确认成功");
    return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
    } catch (Exception e) {
    //确认订单失败,发送消息
    ...
    return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
    }
    }

失败补偿机制

消息发送方

  • 配置RocketMQ属性值

    1
    2
    3
    4
    5
    6
    7
    rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
    rocketmq.producer.group=orderProducerGroup

    mq.order.consumer.group.name=order_orderTopic_cancel_group
    mq.order.topic=orderTopic
    mq.order.tag.confirm=order_confirm
    mq.order.tag.cancel=order_cancel
  • 注入模板类和属性值信息

    1
    2
    3
    4
    5
    6
    7
    8
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Value("${mq.order.topic}")
    private String topic;

    @Value("${mq.order.tag.cancel}")
    private String cancelTag;
  • 发送下单失败消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    @Override
    public Result confirmOrder(TradeOrder order) {
    //1.校验订单
    //2.生成预订
    try {
    //3.扣减库存
    //4.扣减优惠券
    //5.使用余额
    //6.确认订单
    } catch (Exception e) {
    //确认订单失败,发送消息
    CancelOrderMQ cancelOrderMQ = new CancelOrderMQ();
    cancelOrderMQ.setOrderId(order.getOrderId());
    cancelOrderMQ.setCouponId(order.getCouponId());
    cancelOrderMQ.setGoodsId(order.getGoodsId());
    cancelOrderMQ.setGoodsNumber(order.getGoodsNumber());
    cancelOrderMQ.setUserId(order.getUserId());
    cancelOrderMQ.setUserMoney(order.getMoneyPaid());
    try {
    sendMessage(topic,
    cancelTag,
    cancelOrderMQ.getOrderId().toString(),
    JSON.toJSONString(cancelOrderMQ));
    } catch (Exception e1) {
    e1.printStackTrace();
    CastException.cast(ShopCode.SHOP_MQ_SEND_MESSAGE_FAIL);
    }
    return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    private void sendMessage(String topic, String tags, String keys, String body) throws Exception {
    //判断Topic是否为空
    if (StringUtils.isEmpty(topic)) {
    CastException.cast(ShopCode.SHOP_MQ_TOPIC_IS_EMPTY);
    }
    //判断消息内容是否为空
    if (StringUtils.isEmpty(body)) {
    CastException.cast(ShopCode.SHOP_MQ_MESSAGE_BODY_IS_EMPTY);
    }
    //消息体
    Message message = new Message(topic, tags, keys, body.getBytes());
    //发送消息
    rocketMQTemplate.getProducer().send(message);
    }

消费接收方

  • 配置RocketMQ属性值

    1
    2
    3
    rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
    mq.order.consumer.group.name=order_orderTopic_cancel_group
    mq.order.topic=orderTopic
  • 创建监听类,消费消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Slf4j
    @Component
    @RocketMQMessageListener(topic = "${mq.order.topic}",
    consumerGroup = "${mq.order.consumer.group.name}",
    messageModel = MessageModel.BROADCASTING)
    public class CancelOrderConsumer implements RocketMQListener<MessageExt>{

    @Override
    public void onMessage(MessageExt messageExt) {
    ...
    }
    }
回退库存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{


@Value("${mq.order.consumer.group.name}")
private String groupName;

@Autowired
private TradeGoodsMapper goodsMapper;

@Autowired
private TradeMqConsumerLogMapper mqConsumerLogMapper;

@Autowired
private TradeGoodsNumberLogMapper goodsNumberLogMapper;

@Override
public void onMessage(MessageExt messageExt) {
String msgId=null;
String tags=null;
String keys=null;
String body=null;
try {
//1. 解析消息内容
msgId = messageExt.getMsgId();
tags= messageExt.getTags();
keys= messageExt.getKeys();
body= new String(messageExt.getBody(),"UTF-8");

log.info("接受消息成功");

//2. 查询消息消费记录
TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();
primaryKey.setMsgTag(tags);
primaryKey.setMsgKey(keys);
primaryKey.setGroupName(groupName);
TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);

if(mqConsumerLog!=null){
//3. 判断如果消费过...
//3.1 获得消息处理状态
Integer status = mqConsumerLog.getConsumerStatus();
//处理过...返回
if(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode().intValue()==status.intValue()){
log.info("消息:"+msgId+",已经处理过");
return;
}

//正在处理...返回
if(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode().intValue()==status.intValue()){
log.info("消息:"+msgId+",正在处理");
return;
}

//处理失败
if(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode().intValue()==status.intValue()){
//获得消息处理次数
Integer times = mqConsumerLog.getConsumerTimes();
if(times>3){
log.info("消息:"+msgId+",消息处理超过3次,不能再进行处理了");
return;
}
mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());

//使用数据库乐观锁更新
TradeMqConsumerLogExample example = new TradeMqConsumerLogExample();
TradeMqConsumerLogExample.Criteria criteria = example.createCriteria();
criteria.andMsgTagEqualTo(mqConsumerLog.getMsgTag());
criteria.andMsgKeyEqualTo(mqConsumerLog.getMsgKey());
criteria.andGroupNameEqualTo(groupName);
criteria.andConsumerTimesEqualTo(mqConsumerLog.getConsumerTimes());
int r = mqConsumerLogMapper.updateByExampleSelective(mqConsumerLog, example);
if(r<=0){
//未修改成功,其他线程并发修改
log.info("并发修改,稍后处理");
}
}

}else{
//4. 判断如果没有消费过...
mqConsumerLog = new TradeMqConsumerLog();
mqConsumerLog.setMsgTag(tags);
mqConsumerLog.setMsgKey(keys);
mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());
mqConsumerLog.setMsgBody(body);
mqConsumerLog.setMsgId(msgId);
mqConsumerLog.setConsumerTimes(0);

//将消息处理信息添加到数据库
mqConsumerLogMapper.insert(mqConsumerLog);
}
//5. 回退库存
MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
Long goodsId = mqEntity.getGoodsId();
TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsId);
goods.setGoodsNumber(goods.getGoodsNumber()+mqEntity.getGoodsNum());
goodsMapper.updateByPrimaryKey(goods);

//记录库存操作日志
TradeGoodsNumberLog goodsNumberLog = new TradeGoodsNumberLog();
goodsNumberLog.setOrderId(mqEntity.getOrderId());
goodsNumberLog.setGoodsId(goodsId);
goodsNumberLog.setGoodsNumber(mqEntity.getGoodsNum());
goodsNumberLog.setLogTime(new Date());
goodsNumberLogMapper.insert(goodsNumberLog);

//6. 将消息的处理状态改为成功
mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode());
mqConsumerLog.setConsumerTimestamp(new Date());
mqConsumerLogMapper.updateByPrimaryKey(mqConsumerLog);
log.info("回退库存成功");
} catch (Exception e) {
e.printStackTrace();
TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();
primaryKey.setMsgTag(tags);
primaryKey.setMsgKey(keys);
primaryKey.setGroupName(groupName);
TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);
if(mqConsumerLog==null){
//数据库未有记录
mqConsumerLog = new TradeMqConsumerLog();
mqConsumerLog.setMsgTag(tags);
mqConsumerLog.setMsgKey(keys);
mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode());
mqConsumerLog.setMsgBody(body);
mqConsumerLog.setMsgId(msgId);
mqConsumerLog.setConsumerTimes(1);
mqConsumerLogMapper.insert(mqConsumerLog);
}else{
mqConsumerLog.setConsumerTimes(mqConsumerLog.getConsumerTimes()+1);
mqConsumerLogMapper.updateByPrimaryKeySelective(mqConsumerLog);
}
}

}
}
回退优惠券
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{


@Autowired
private TradeCouponMapper couponMapper;

@Override
public void onMessage(MessageExt message) {

try {
//1. 解析消息内容
String body = new String(message.getBody(), "UTF-8");
MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
log.info("接收到消息");
//2. 查询优惠券信息
TradeCoupon coupon = couponMapper.selectByPrimaryKey(mqEntity.getCouponId());
//3.更改优惠券状态
coupon.setUsedTime(null);
coupon.setIsUsed(ShopCode.SHOP_COUPON_UNUSED.getCode());
coupon.setOrderId(null);
couponMapper.updateByPrimaryKey(coupon);
log.info("回退优惠券成功");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
log.error("回退优惠券失败");
}

}
}
回退余额
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{
@Autowired
private IUserService userService;

@Override
public void onMessage(MessageExt messageExt) {

try {
//1.解析消息
String body = new String(messageExt.getBody(), "UTF-8");
MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
log.info("接收到消息");
if(mqEntity.getUserMoney()!=null && mqEntity.getUserMoney().compareTo(BigDecimal.ZERO)>0){
//2.调用业务层,进行余额修改
TradeUserMoneyLog userMoneyLog = new TradeUserMoneyLog();
userMoneyLog.setUseMoney(mqEntity.getUserMoney());
userMoneyLog.setMoneyLogType(ShopCode.SHOP_USER_MONEY_REFUND.getCode());
userMoneyLog.setUserId(mqEntity.getUserId());
userMoneyLog.setOrderId(mqEntity.getOrderId());
userService.updateMoneyPaid(userMoneyLog);
log.info("余额回退成功");
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
log.error("余额回退失败");
}

}
}
取消订单
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void onMessage(MessageExt messageExt) {
String body = new String(messageExt.getBody(), "UTF-8");
String msgId = messageExt.getMsgId();
String tags = messageExt.getTags();
String keys = messageExt.getKeys();
log.info("CancelOrderProcessor receive message:"+messageExt);
CancelOrderMQ cancelOrderMQ = JSON.parseObject(body, CancelOrderMQ.class);
TradeOrder order = orderService.findOne(cancelOrderMQ.getOrderId());
order.setOrderStatus(ShopCode.SHOP_ORDER_CANCEL.getCode());
orderService.changeOrderStatus(order);
log.info("订单:["+order.getOrderId()+"]状态设置为取消");
return order;
}

测试

测试环境

1
2
3
4
5
6
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ShopOrderServiceApplication.class)
public class OrderTest {
@Autowired
private IOrderService orderService;
}

测试下单成功流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test    
public void add(){
Long goodsId=XXXL;
Long userId=XXXL;
Long couponId=XXXL;

TradeOrder order = new TradeOrder();
order.setGoodsId(goodsId);
order.setUserId(userId);
order.setGoodsNumber(1);
order.setAddress("北京");
order.setGoodsPrice(new BigDecimal("5000"));
order.setOrderAmount(new BigDecimal("5000"));
order.setMoneyPaid(new BigDecimal("100"));
order.setCouponId(couponId);
order.setShippingFee(new BigDecimal(0));
orderService.confirmOrder(order);
}

测试下单失败流程

支付业务

创建支付订单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public Result createPayment(TradePay tradePay) {
//查询订单支付状态
try {
TradePayExample payExample = new TradePayExample();
TradePayExample.Criteria criteria = payExample.createCriteria();
criteria.andOrderIdEqualTo(tradePay.getOrderId());
criteria.andIsPaidEqualTo(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
int count = tradePayMapper.countByExample(payExample);
if (count > 0) {
CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY);
}

long payId = idWorker.nextId();
tradePay.setPayId(payId);
tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());
tradePayMapper.insert(tradePay);
log.info("创建支付订单成功:" + payId);
} catch (Exception e) {
return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
}
return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
}

支付回调

流程分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public Result callbackPayment(TradePay tradePay) {

if (tradePay.getIsPaid().equals(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode())) {
tradePay = tradePayMapper.selectByPrimaryKey(tradePay.getPayId());
if (tradePay == null) {
CastException.cast(ShopCode.SHOP_PAYMENT_NOT_FOUND);
}
tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
int i = tradePayMapper.updateByPrimaryKeySelective(tradePay);
//更新成功代表支付成功
if (i == 1) {
TradeMqProducerTemp mqProducerTemp = new TradeMqProducerTemp();
mqProducerTemp.setId(String.valueOf(idWorker.nextId()));
mqProducerTemp.setGroupName("payProducerGroup");
mqProducerTemp.setMsgKey(String.valueOf(tradePay.getPayId()));
mqProducerTemp.setMsgTag(topic);
mqProducerTemp.setMsgBody(JSON.toJSONString(tradePay));
mqProducerTemp.setCreateTime(new Date());
mqProducerTempMapper.insert(mqProducerTemp);
TradePay finalTradePay = tradePay;
executorService.submit(new Runnable() {
@Override
public void run() {
try {
SendResult sendResult = sendMessage(topic,
tag,
finalTradePay.getPayId(),
JSON.toJSONString(finalTradePay));
log.info(JSON.toJSONString(sendResult));
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
mqProducerTempMapper.deleteByPrimaryKey(mqProducerTemp.getId());
System.out.println("删除消息表成功");
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
} else {
CastException.cast(ShopCode.SHOP_PAYMENT_IS_PAID);
}
}
return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
}

线程池优化消息发送逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Bean
public ThreadPoolTaskExecutor getThreadPool() {

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("Pool-A");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();

return executor;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Autowired
private ThreadPoolTaskExecutor executorService;

executorService.submit(new Runnable() {
@Override
public void run() {
try {
SendResult sendResult = sendMessage(topic, tag, finalTradePay.getPayId(), JSON.toJSONString(finalTradePay));
log.info(JSON.toJSONString(sendResult));
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
mqProducerTempMapper.deleteByPrimaryKey(mqProducerTemp.getId());
System.out.println("删除消息表成功");
}
} catch (Exception e) {
e.printStackTrace();
}
}
});

处理消息(以订单服务为例)

  • 支付服务payService发送MQ消息,订单服务、用户服务、日志服务需要订阅消息进行处理
    1. 订单服务修改订单状态为已支付
    2. 日志服务记录支付日志
    3. 用户服务负责给用户增加积分

1)配置RocketMQ

1
2
mq.pay.topic=payTopic
mq.pay.consumer.group.name=pay_payTopic_group

2)消费消息

  • 订单服务配置公共的消息处理类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    public class BaseConsumer {

    public TradeOrder handleMessage(IOrderService
    orderService,
    MessageExt messageExt,Integer code) throws Exception {
    //解析消息内容
    String body = new String(messageExt.getBody(), "UTF-8");
    String msgId = messageExt.getMsgId();
    String tags = messageExt.getTags();
    String keys = messageExt.getKeys();
    OrderMQ orderMq = JSON.parseObject(body, OrderMQ.class);

    //查询
    TradeOrder order = orderService.findOne(orderMq.getOrderId());

    if(ShopCode.SHOP_ORDER_MESSAGE_STATUS_CANCEL.getCode().equals(code)){
    order.setOrderStatus(ShopCode.SHOP_ORDER_CANCEL.getCode());
    }

    if(ShopCode.SHOP_ORDER_MESSAGE_STATUS_ISPAID.getCode().equals(code)){
    order.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
    }
    orderService.changeOrderStatus(order);
    return order;
    }

    }
  • 接受订单支付成功消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    @Slf4j
    @Component
    @RocketMQMessageListener(topic = "${mq.pay.topic}",
    consumerGroup = "${mq.pay.consumer.group.name}")
    public class PayConsumer extends BaseConsumer implements RocketMQListener<MessageExt> {

    @Autowired
    private IOrderService orderService;

    @Override
    public void onMessage(MessageExt messageExt) {
    try {
    log.info("CancelOrderProcessor receive message:"+messageExt);
    TradeOrder order = handleMessage(orderService,
    messageExt,
    ShopCode.SHOP_ORDER_MESSAGE_STATUS_ISPAID.getCode());
    log.info("订单:["+order.getOrderId()+"]支付成功");
    } catch (Exception e) {
    e.printStackTrace();
    log.error("订单支付失败");
    }
    }
    }

整体组合

  • 通过Rest客户端请求shop-order-web和shop-pay-web

准备工作

1)配置RestTemplate类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Configuration
public class RestTemplateConfig {

@Bean
@ConditionalOnMissingBean({ RestOperations.class, RestTemplate.class })
public RestTemplate restTemplate(ClientHttpRequestFactory factory) {

RestTemplate restTemplate = new RestTemplate(factory);

// 使用 utf-8 编码集的 conver 替换默认的 conver(默认的 string conver 的编码集为"ISO-8859-1")
List<HttpMessageConverter<?>> messageConverters = restTemplate.getMessageConverters();
Iterator<HttpMessageConverter<?>> iterator = messageConverters.iterator();
while (iterator.hasNext()) {
HttpMessageConverter<?> converter = iterator.next();
if (converter instanceof StringHttpMessageConverter) {
iterator.remove();
}
}
messageConverters.add(new StringHttpMessageConverter(Charset.forName("UTF-8")));

return restTemplate;
}

@Bean
@ConditionalOnMissingBean({ClientHttpRequestFactory.class})
public ClientHttpRequestFactory simpleClientHttpRequestFactory() {
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
// ms
factory.setReadTimeout(15000);
// ms
factory.setConnectTimeout(15000);
return factory;
}
}

2)配置请求地址

  • 订单

    1
    2
    3
    4
    5
    server.host=http://localhost
    server.servlet.path=/order-web
    server.port=8080
    shop.order.baseURI=${server.host}:${server.port}${server.servlet.path}
    shop.order.confirm=/order/confirm
  • 支付

    1
    2
    3
    4
    5
    6
    server.host=http://localhost
    server.servlet.path=/pay-web
    server.port=9090
    shop.pay.baseURI=${server.host}:${server.port}${server.servlet.path}
    shop.pay.createPayment=/pay/createPayment
    shop.pay.callbackPayment=/pay/callbackPayment

下单测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = ShopOrderWebApplication.class)
@TestPropertySource("classpath:application.properties")
public class OrderTest {

@Autowired
private RestTemplate restTemplate;

@Value("${shop.order.baseURI}")
private String baseURI;

@Value("${shop.order.confirm}")
private String confirmOrderPath;

@Autowired
private IDWorker idWorker;

/**
* 下单
*/
@Test
public void confirmOrder(){
Long goodsId=XXXL;
Long userId=XXXL;
Long couponId=XXXL;

TradeOrder order = new TradeOrder();
order.setGoodsId(goodsId);
order.setUserId(userId);
order.setGoodsNumber(1);
order.setAddress("北京");
order.setGoodsPrice(new BigDecimal("5000"));
order.setOrderAmount(new BigDecimal("5000"));
order.setMoneyPaid(new BigDecimal("100"));
order.setCouponId(couponId);
order.setShippingFee(new BigDecimal(0));

Result result = restTemplate.postForEntity(baseURI + confirmOrderPath, order, Result.class).getBody();
System.out.println(result);
}

}

支付测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = ShopPayWebApplication.class)
@TestPropertySource("classpath:application.properties")
public class PayTest {

@Autowired
private RestTemplate restTemplate;

@Value("${shop.pay.baseURI}")
private String baseURI;

@Value("${shop.pay.createPayment}")
private String createPaymentPath;

@Value("${shop.pay.callbackPayment}")
private String callbackPaymentPath;

@Autowired
private IDWorker idWorker;

/**
* 创建支付订单
*/
@Test
public void createPayment(){

Long orderId = 346321587315814400L;
TradePay pay = new TradePay();
pay.setOrderId(orderId);
pay.setPayAmount(new BigDecimal(4800));

Result result = restTemplate.postForEntity(baseURI + createPaymentPath, pay, Result.class).getBody();
System.out.println(result);
}

/**
* 支付回调
*/
@Test
public void callbackPayment(){
Long payId = 346321891507720192L;
TradePay pay = new TradePay();
pay.setPayId(payId);
pay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
Result result = restTemplate.postForEntity(baseURI + callbackPaymentPath, pay, Result.class).getBody();
System.out.println(result);

}

}