DDD 架构分层,MQ消息要放到那一层处理?

DDD 架构分层,MQ消息要放到那一层处理?

首页卡牌对战代号23游戏更新时间:2024-06-06
一、案例背景

首先我们要知道,MQ 消息的作用是用于;解耦过长的业务流程和应对流量冲击的消峰。如;用户下单支付完成后,拿到支付消息推动后续的发货流程。也可以是我们基于 《MyBatis 使用教程和插件开发》 中的案例场景,给雇员提升级别和薪资的时候,也发送一条MQ消息,用于发送邮件通知给用户。

二、领域事件

因为我们本章所讲解的内容是把 RocketMQ 放入 DDD 架构中进行使用,那么也就引申出领域事件定义。所以我们先来了解下,什么是领域事件。

领域事件,可以说是解耦微服务设计的关键。领域事件也是领域模型中非常重要的一部分内容,用于标示当前领域模型中发生的事件行为。一个领域事件会推进业务流程的进一步操作,在实现业务解耦的同时,也推动了整个业务的闭环。

三、环境安装

本案例涉及了数据库和RocketMQ的使用,都已经在工程中提供了安装脚本,可以按需执行。

这里主要介绍 RocketMQ 的安装;

1. 执行 compose yml

文件:docs/rocketmq/rocketmq-docker-compose-mac-amd-arm.yml - 关于安装小傅哥提供了不同的镜像,包括Mac、Mac M1、Windows 可以按需选择使用。

yml复制代码version: '3' services: # https://hub.docker.com/r/xuchengen/rocketmq # 注意修改项; # 01:data/rocketmq/conf/broker.conf 添加 brokerIP1=127.0.0.1 # 02:data/console/config/application.properties server.port=9009 - 如果8080端口被占用,可以修改或者添加映射端口 rocketmq: image: livinphp/rocketmq:5.1.0 container_name: rocketmq ports: - 9009:9009 - 9876:9876 - 10909:10909 - 10911:10911 - 10912:10912 volumes: - ./data:/home/app/data environment: TZ: "Asia/Shanghai" NAMESRV_ADDR: "rocketmq:9876"

2. 修改默认配合
  1. 打开 data/rocketmq/conf/broker.conf 添加一条 brokerIP1=127.0.0.1 在结尾

java复制代码# 集群名称 brokerClusterName = DefaultCluster # BROKER 名称 brokerName = broker-a # 0 表示 Master, > 0 表示 Slave brokerId = 0 # 删除文件时间点,默认凌晨 4 点 deleteWhen = 04 # 文件保留时间,默认 48 小时 fileReservedTime = 48 # BROKER 角色 ASYNC_MASTER为异步主节点,SYNC_MASTER为同步主节点,SLAVE为从节点 brokerRole = ASYNC_MASTER # 刷新数据到磁盘的方式,ASYNC_FLUSH 刷新 flushDiskType = ASYNC_FLUSH # 存储路径 storePathRootDir = /home/app/data/rocketmq/store # IP地址 brokerIP1 = 127.0.0.1

  1. 打开 ``data/console/config/application.properties修改server.port=9009` 端口。

java复制代码server.address=0.0.0.0 server.port=9009

3. RockMQ登录与配置3.1 登录

RocketMQ 此镜像,会在安装后在控制台打印登录账号信息,你可以查看使用。

登录:http://localhost:9009/

3.2 创建Topic

3.3 创建消费者组

四、工程实现1. 工程结构

2. 配置文件

引入POM

xml复制代码<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client-java --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client-java</artifactId> <version>5.0.4</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.0</version> </dependency>

添加配置

yml复制代码# RocketMQ 配置 rocketmq: name-server: 127.0.0.1:9876 consumer: group: xfg-group # 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值 pull-batch-size: 10 producer: # 发送同一类消息的设置为同一个group,保证唯一 group: xfg-group # 发送消息超时时间,默认3000 sendMessageTimeout: 10000 # 发送消息失败重试次数,默认2 retryTimesWhenSendFailed: 2 # 异步消息重试此处,默认2 retryTimesWhenSendAsyncFailed: 2 # 消息最大长度,默认1024 * 1024 * 4(默认4M) maxMessageSize: 4096 # 压缩消息阈值,默认4k(1024 * 4) compressMessageBodyThreshold: 4096 # 是否在内部发送失败时重试另一个broker,默认false retryNextServer: false 3. 定义领域事件

源码:cn.bugstack.xfg.dev.tech.domain.salary.event.SalaryAdjustEvent

java复制代码@EqualsAndHashCode(callSuper = true) @Data public class SalaryAdjustEvent extends BaseEvent<AdjustSalaryApplyOrderAggregate> { public static String TOPIC = "xfg-mq"; public static SalaryAdjustEvent create(AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate) { SalaryAdjustEvent event = new SalaryAdjustEvent(); event.setId(RandomStringUtils.randomNumeric(11)); event.setTimestamp(new Date()); event.setData(adjustSalaryApplyOrderAggregate); return event; } }

4. 消息发送

源码:cn.bugstack.xfg.dev.tech.infrastructure.event.EventPublisher

java复制代码@Component @Slf4j public class EventPublisher { @Setter(onMethod_ = @Autowired) private RocketMQTemplate rocketmqTemplate; /** * 普通消息 * * @param topic 主题 * @param message 消息 */ public void publish(String topic, BaseEvent<?> message) { try { String mqMessage = JSON.toJSONString(message); log.info("发送MQ消息 topic:{} message:{}", topic, mqMessage); rocketmqTemplate.convertAndSend(topic, mqMessage); } catch (Exception e) { log.error("发送MQ消息失败 topic:{} message:{}", topic, JSON.toJSONString(message), e); // 大部分MQ发送失败后,会需要任务补偿 } } /** * 延迟消息 * * @param topic 主题 * @param message 消息 * @param delayTimeLevel 延迟时长 */ public void publishDelivery(String topic, BaseEvent<?> message, int delayTimeLevel) { try { String mqMessage = JSON.toJSONString(message); log.info("发送MQ延迟消息 topic:{} message:{}", topic, mqMessage); rocketmqTemplate.syncSend(topic, MessageBuilder.withPayload(message).build(), 1000, delayTimeLevel); } catch (Exception e) { log.error("发送MQ延迟消息失败 topic:{} message:{}", topic, JSON.toJSONString(message), e); // 大部分MQ发送失败后,会需要任务补偿 } } }

源码:cn.bugstack.xfg.dev.tech.infrastructure.repository.SalaryAdjustRepository

java复制代码@Resource private EventPublisher eventPublisher; @Override @Transactional(rollbackFor = Exception.class, timeout = 350, propagation = Propagation.REQUIRED, isolation = Isolation.DEFAULT) public String adjustSalary(AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate) { // ... 省略部分代码 eventPublisher.publish(SalaryAdjustEvent.TOPIC, SalaryAdjustEvent.create(adjustSalaryApplyOrderAggregate)); return orderId; }

在 SalaryAdjustRepository 仓储的实现中,做完业务流程开始发送 MQ 消息。这里有2点要注意;

  1. 消息发送,不要写在数据库事务中。因为事务一直占用数据库连接,需要快速释放。
  2. 对于一些强MQ要求的场景,需要在发送MQ前,写入一条数据库 Task 记录,发送消息后更新 Task 状态为成功。如果长时间未更新数据库状态或者为失败的,则需要由任务补偿进行处理。
5. 消费消息

源码:cn.bugstack.xfg.dev.tech.trigger.mq.SalaryAdjustMQListener

java复制代码@Component @Slf4j @RocketMQMessageListener(topic = "xfg-mq", consumerGroup = "xfg-group") public class SalaryAdjustMQListener implements RocketMQListener<String> { @Override public void onMessage(String s) { log.info("接收到MQ消息 {}", s); } }

六、测试验证1. 单独发送消息测试

java复制代码@Slf4j @RunWith(SpringRunner.class) @SpringBootTest public class RocketMQTest { @Setter(onMethod_ = @Autowired) private RocketMQTemplate rocketmqTemplate; @Test public void test() throws InterruptedException { while (true) { rocketmqTemplate.convertAndSend("xfg-mq", "我是测试消息"); Thread.sleep(3000); } } }

2. 业务流程消息验证

java复制代码@Test public void test_execSalaryAdjust() throws InterruptedException { AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate = AdjustSalaryApplyOrderAggregate.builder() .employeeNumber("10000001") .orderId("100908977676003") .employeeEntity(EmployeeEntity.builder().employeeLevel(EmployeePostVO.T3).employeeTitle(EmployeePostVO.T3).build()) .employeeSalaryAdjustEntity(EmployeeSalaryAdjustEntity.builder() .adjustTotalAmount(new BigDecimal(100)) .adjustBaseAmount(new BigDecimal(80)) .adjustMeritAmount(new BigDecimal(20)).build()) .build(); String orderId = salaryAdjustApplyService.execSalaryAdjust(adjustSalaryApplyOrderAggregate); log.info("调薪测试 req: {} res: {}", JSON.toJSONString(adjustSalaryApplyOrderAggregate), orderId); Thread.sleep(Integer.MAX_VALUE); }

java复制代码23-07-29.15:40:52.307 [main ] INFO HikariDataSource - HikariPool-1 - Start completed. 23-07-29.15:40:52.445 [main ] INFO EventPublisher - 发送MQ消息 topic:xfg-mq message:{"data":{"employeeEntity":{"employeeLevel":"T3","employeeTitle":"T3"},"employeeNumber":"10000001","employeeSalaryAdjustEntity":{"adjustBaseAmount":80,"adjustMeritAmount":20,"adjustTotalAmount":100},"orderId":"100908977676004"},"id":"98117654515","timestamp":"2023-07-29 15:40:52.425"} 23-07-29.15:40:52.517 [main ] INFO ISalaryAdjustApplyServiceTest - 调薪测试 req: {"employeeEntity":{"employeeLevel":"T3","employeeTitle":"T3"},"employeeNumber":"10000001","employeeSalaryAdjustEntity":{"adjustBaseAmount":80,"adjustMeritAmount":20,"adjustTotalAmount":100},"orderId":"100908977676004"} res: 100908977676004 23-07-29.15:40:52.520 [ConsumeMessageThread_1] INFO SalaryAdjustMQListener - 接收到MQ消息 {"data":{"employeeEntity":{"employeeLevel":"T3","employeeTitle":"T3"},"employeeNumber":"10000001","employeeSalaryAdjustEntity":{"adjustBaseAmount":80,"adjustMeritAmount":20,"adjustTotalAmount":100},"orderId":"100908977676004"},"id":"98117654515","timestamp":"2023-07-29 15:40:52.425"}

查看全文
大家还看了
也许喜欢
更多游戏

Copyright © 2024 妖气游戏网 www.17u1u.com All Rights Reserved