SpringBoot进阶教程(六十二)整合Kafka
在上一篇文章《Linux安装Kafka》中,已经介绍了如何在Linux安装Kafka,以及Kafka的启动/关闭和创建发话题并产生消息和消费消息。这篇文章就介绍介绍SpringBoot整合Kafka。
v创建项目
若是已有的项目中添加kafka, 请直接跳至1.3
1.1 创建springboot:
1.2 选web和kafka:
1.3 已有的项目中添加kafka, pom.xml中添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
1.4 整体架构目录:
v配置项目
2.1 设置application.yml
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092 #指定kafka server的地址,集群配多个,中间,逗号隔开
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default_consumer_group #群组ID
enable-auto-commit: true
auto-commit-interval: 1000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
server:
port: 8500
项目默认生成的是applicaiton.properties,直接重命名修改文件后缀名为yml即可。
2.2 添加生产者ProducerController
package com.toutou.Controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author toutou * @date by 2019/08 */ @RestController public class ProducerController { @Autowired private KafkaTemplate<String,Object> kafkaTemplate; @RequestMapping("message/send") public String send(String msg){ kafkaTemplate.send("demo", msg); //使用kafka模板发送信息 return "success"; } }
2.3 添加消费者ConsumerDemo
package com.toutou.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * @author toutou * 监听服务器上的kafka是否有相关的消息发过来 * @date by 2019/08 */ @Component public class ConsumerDemo { /** * 定义此消费者接收topics = "demo"的消息,与controller中的topic对应上即可 * @param record 变量代表消息本身,可以通过ConsumerRecord<?,?>类型的record变量来打印接收的消息的各种信息 */ @KafkaListener(topics = "demo") public void listen (ConsumerRecord<?, ?> record){ System.out.printf("topic is %s, offset is %d, value is %s \n", record.topic(), record.offset(), record.value()); } }
v启动测试
3.1 测试生产者
3.2 消费者效果
v源码地址
https://github.com/toutouge/javademosecond/tree/master/hellokafka