1. 环境搭建
(1)搭建工程kafka-spring-boot-demo 添加pom依赖,最终的依赖信息
<!-- 继承Spring boot工程 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.8.RELEASE</version>
</parent>
<properties>
<fastjson.version>1.2.58</fastjson.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- kafkfa -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
</dependencies>
(2)在resources下创建文件application.yml
server: port: 8081 spring: application: name: kafka-demo kafka: bootstrap-servers: 192.168.137.136:9092 consumer: group-id: kafka-demo-kafka-group key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
(3)引导类
package com.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class WebApp {
public static void main(String[] args) {
SpringApplication.run(WebApp.class, args);
}
}
2. 消息生产者
新建controller
package com.kafka.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/test") public class TestController { @Autowired private KafkaTemplate<String,String> kafkaTemplate; @GetMapping("/send") public String sendMessage() { // 发送消息到kafka // 需要使用KafkaTemplate String topic = "spring_test_169"; kafkaTemplate.send(topic,"hello spring boot kafka!"); return "发送成功."; } }
5.3 消息消费者
1.5.3 消息消费者文章来源:https://www.uudwc.com/A/dOVN/
新建监听类:文章来源地址https://www.uudwc.com/A/dOVN/
package com.heima.kafka.listener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service public class HelloListener { /** * 消费者端:指定监听话题 * * @param consumerRecord 监听到数据 */ @KafkaListener(topics = {"spring_test_169"}) public void handlerMsg(ConsumerRecord<String, String> consumerRecord) { System.out.println("接收到消息:消息值:" + consumerRecord.value() + ", 消息偏移量:" + consumerRecord.offset()); }