写在最前
如果这个项目让你有所收获,记得 Star 关注哦,这对我是非常不错的鼓励与支持。
源码地址(后端):mingyue: ? 基于 Spring Boot、Spring Cloud & Alibaba 的分布式微服务架构基础服务中心
源码地址(前端):mingyue-ui: ? 基于 Vue3 + TS + Vite + Element plus 等技术,适配 MingYue 后台微服务
文档地址:Wiki - Gitee.com
mingyue-common-mq
添加依赖
根据需要在
mingyue-common-mq
模块中添加所需的 MQ 中间件,例如:RocketMQ、Kafka。
<dependencies> <!-- RocketMQ --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> </dependency> <!-- Kafka --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> </dependencies>
集成 RocketMQ
引入依赖
<!-- MQ工具 --> <dependency> <groupId>com.csp.mingyue</groupId> <artifactId>mingyue-common-mq</artifactId> </dependency>
Nacos 配置
spring: cloud: stream: function: # 重点配置 与 binding 名与消费者对应 definition: rocketmqDemo rocketmq: binder: # rocketmq 地址 name-server: 192.168.21.32:9876 bindings: rocketmqDemo-out-0: producer: # 必须得写 group: default bindings: rocketmqDemo-out-0: content-type: application/json destination: stream-rocketmq-demo-topic group: demo-group binder: rocketmq rocketmqDemo-in-0: content-type: application/json destination: stream-rocketmq-demo-topic group: demo-group binder: rocketmq
RocketMQ 生产者
@Component public class RocketMqProducer { @Resource private StreamBridge streamBridge; public void rocketMqDemoMsg(String msg) { // 构建消息对象 MqMessageDto messageDto = new MqMessageDto() .setMsgId(IdUtil.fastSimpleUUID()) .setMsgText(msg); streamBridge.send("rocketmqDemo-out-0", MessageBuilder.withPayload(messageDto).build()); } }
RocketMQ 消费者
@Slf4j @Component public class RocketMqConsumer { @Bean Consumer<MqMessageDto> rocketmqDemo() { log.info("Rocket MQ 初始化订阅"); return msg -> { log.info("通过 Rocket MQ 消费到消息 => {}", msg.toString()); }; } }
推送消息到 RocketMQ
@GetMapping("/sendRocketMq") @Operation(summary = "发送消息到RocketMQ", parameters = { @Parameter(name = "msg", description = "推送的消息体", required = true) }) public R<Void> sendRocketMq(String msg) { rocketMqProducer.rocketMqDemoMsg(msg); return R.ok(); }
集成 Kafka
引入依赖
<!-- MQ工具 --> <dependency> <groupId>com.csp.mingyue</groupId> <artifactId>mingyue-common-mq</artifactId> </dependency>
Nacos 配置
spring: cloud: stream: function: # 重点配置 与 binding 名与消费者对应 definition: kafkaDemo kafka: binder: brokers: 192.168.21.32:9092 bindings: kafkaDemo-out-0: destination: stream-kafka-demo-topic contentType: application/json group: demo-group binder: kafka kafkaDemo-in-0: destination: stream-kafka-demo-topic contentType: application/json group: demo-group binder: kafka
Kafka 生产者
@Component public class KafkaProducer { @Resource private StreamBridge streamBridge; public void kafkaDemoMsg(String msg) { // 构建消息对象 MqMessageDto messageDto = new MqMessageDto() .setMsgId(IdUtil.fastSimpleUUID()) .setMsgText(msg); streamBridge.send("kafkaDemo-out-0", MessageBuilder.withPayload(messageDto).build()); } }
Kafka 消费者
@Slf4j @Component public class KafkaConsumer { @Bean Consumer<MqMessageDto> kafkaDemo() { log.info("Kafka 初始化订阅"); return msg -> { log.info("通过 Kafka 消费到消息 => {}", msg.toString()); }; } }
推送消息到 Kafka
@GetMapping("/sendKafka") @Operation(summary = "发送消息到Kafka", parameters = { @Parameter(name = "msg", description = "推送的消息体", required = true) }) public R<Void> sendKafka(String msg) { kafkaProducer.kafkaDemoMsg(msg); return R.ok(); }
拓展 RabbitMQ
mingyue-common-mq 添加依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
Nacos 配置
--- # rabbitmq 配置 spring: rabbitmq: host: rabbitmqIp port: 5672 username: root password: root cloud: stream: function: # 重点配置 与 binding 名与消费者对应 definition: rabbitmqDemo rabbit: bindings: rabbitmqDemo-in-0: consumer: delayedExchange: true rabbitmqDemo-out-0: producer: delayedExchange: true bindings: rabbitmqDemo-in-0: destination: delay.exchange.demo content-type: application/json group: delay-group binder: rabbit rabbitmqDemo-out-0: destination: delay.exchange.demo content-type: application/json group: delay-group binder: rabbit
小结
MQ 基础搭建已经完成,后续会编写一些实际开发中使用到队列的场景,如:
-
订单处理:
-
电子商务平台可以使用消息队列来处理订单,确保订单的创建、支付、发货和通知等各个步骤都能按顺序和可靠地执行。
-
-
通知和提醒:
-
网站或应用程序可以使用消息队列来发送通知和提醒,如电子邮件通知、短信通知、推送通知等,以便与用户互动。
-
-
用户注册和身份验证:
-
当用户注册或请求密码重置时,消息队列可以用于生成和发送验证链接或令牌,确保用户身份验证的安全性和可扩展性。
-
-
数据同步:
-
在多个系统之间同步数据,以确保数据的一致性,例如将用户配置信息从一个微服务同步到另一个微服务。
-
-
事件日志和审计:
-
记录应用程序事件、用户活动和系统操作,以进行审计、监视和故障排除。
-
-
批量处理:
-
处理大量数据导入、数据清洗、ETL(提取、转换、加载)操作等批处理任务,以提高性能和可维护性。
-
-
异步任务处理:
-
处理后台任务,如图像处理、视频编码、生成报告等,以减少响应时间和提高系统的吞吐量。
-
-
队列服务:
-
提供队列服务以支持其他应用程序或团队的异步通信需求,例如云服务提供商的消息队列服务。
-
-
数据分发:
-
将数据从生产者分发给多个消费者,以实现发布-订阅模式,例如新闻订阅、市场报价和天气预报。
-
-
错误处理和重试:
-
处理意外错误和故障,将失败的操作或任务放入队列,以便进行重试或错误处理。文章来源:https://www.uudwc.com/A/Nx5E5/
-
这些业务使用场景只是消息队列的一些示例。消息队列有助于提高系统的可扩展性、弹性和可靠性,允许异步处理和解耦合组件,从而改善了应用程序的整体性能和用户体验。不同的业务需求可能需要不同类型的消息队列系统和配置。文章来源地址https://www.uudwc.com/A/Nx5E5/