springBoot集成Kafka

1.  环境搭建

(1)搭建工程kafka-spring-boot-demo 添加pom依赖,最终的依赖信息

        <!-- 继承Spring boot工程 -->
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.2.8.RELEASE</version>
        </parent>
        <properties>
            <fastjson.version>1.2.58</fastjson.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!-- kafkfa -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>${fastjson.version}</version>
            </dependency>
        </dependencies>

(2)在resources下创建文件application.yml

server:
  port: 8081
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 192.168.137.136:9092
    consumer:
      group-id: kafka-demo-kafka-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

(3)引导类

        package com.kafka;

        import org.springframework.boot.SpringApplication;
        import org.springframework.boot.autoconfigure.SpringBootApplication;


        @SpringBootApplication
        public class WebApp {
            public static void main(String[] args) {
                SpringApplication.run(WebApp.class, args);
            }
        }

2.  消息生产者

        新建controller

        package com.kafka.controller;
        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.kafka.core.KafkaTemplate;
        import org.springframework.web.bind.annotation.GetMapping;
        import org.springframework.web.bind.annotation.RequestMapping;
        import org.springframework.web.bind.annotation.RestController;
​
        @RestController
        @RequestMapping("/test")
        public class TestController {
​
            @Autowired
            private KafkaTemplate<String,String> kafkaTemplate;
​
            @GetMapping("/send")
            public String sendMessage() {
                // 发送消息到kafka
                // 需要使用KafkaTemplate
                String topic = "spring_test_169";
                kafkaTemplate.send(topic,"hello spring boot kafka!");
                return "发送成功.";
            }
        }

5.3 消息消费者

        

1.5.3 消息消费者

        新建监听类:文章来源地址https://www.uudwc.com/A/dOVN/

        package com.heima.kafka.listener;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.springframework.kafka.annotation.KafkaListener;
        import org.springframework.stereotype.Service;
​
        @Service
        public class HelloListener {
​
            /**
             * 消费者端:指定监听话题
             *
             * @param consumerRecord 监听到数据
             */
            @KafkaListener(topics = {"spring_test_169"})
            public void handlerMsg(ConsumerRecord<String, String> consumerRecord) {
                System.out.println("接收到消息:消息值:" + consumerRecord.value() + ",         消息偏移量:" + consumerRecord.offset());
            }
​

原文地址:https://blog.csdn.net/weixin_66545010/article/details/125633876

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请联系站长进行投诉反馈,一经查实,立即删除!

h
上一篇 2023年06月16日 01:21
下一篇 2023年06月16日 01:21