目录

1、安装kafka

2、SpringBoot集成kafka代码

2.1 依赖

2.2 yaml配置

2.3 消费者配置

2.4 生产者配置

2.5 发送消息controller

2.6 测试

1、安装kafka

Windows Docker 安装 Kafka_全栈编程网的博客-CSDN博客_docker安装kafka

2、SpringBoot集成kafka代码

2.1 依赖

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-starter-test

test

org.springframework.kafka

spring-kafka

2.2 yaml配置

spring:

application:

name: demo

kafka:

bootstrap-servers: localhost:9092

producer: # producer 生产者

retries: 0 # 重试次数

acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)

batch-size: 16384 # 批量大小

buffer-memory: 33554432 # 生产端缓冲区大小

key-serializer: org.apache.kafka.common.serialization.StringSerializer

# value-serializer: com.itheima.demo.config.MySerializer

value-serializer: org.apache.kafka.common.serialization.StringSerializer

consumer: # consumer消费者

group-id: javagroup # 默认的消费组ID

enable-auto-commit: true # 是否自动提交offset

auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset)

# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

auto-offset-reset: latest

key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

# value-deserializer: com.itheima.demo.config.MyDeserializer

value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

server:

port: 8088

2.3 消费者配置

监听主题(topic): test_topic

消费组: test_topic_group

@Component

public class KafkaConsumer {

@KafkaListener(topics = {"test_topic"}, groupId = "test_topic_group")

public void listener(ConsumerRecord record) {

Optional msg = Optional.ofNullable(record.value());

if (msg.isPresent()) {

System.out.println(msg.get());

}

}

}

2.4 生产者配置

@Component

public class KafkaProducer {

@Autowired

private KafkaTemplate kafkaTemplate;

public void send(String topic, String key, String msg) {

kafkaTemplate.send(topic, key, msg);

}

}

2.5 发送消息controller

@RestController

public class KafkaController {

@Autowired

private KafkaProducer kafkaProducer;

@GetMapping("/sendMsg/{msg}")

public String send(@PathVariable(value = "msg") String msg

) {

kafkaProducer.send("test_topic", "test_key", msg);

return "success";

}

}

2.6 测试

打开浏览器访问接口:http://localhost:8088/sendMsg/nihao

发送消息:nihao

kafka消费者消费消息如下图:

总结:此文章介绍了最简单的SpringBoot集成kafka的例子。

参考文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: