前言

       首先,你的JDK是否已经是8+了呢?        其次,你是否已经用上SpringBoot3了呢?        最后,这次分享的是SpringBoot3下的kafka发信息与消费信息。

一、场景说明

       这次的场景是springboot3+多数据源的数据交换中心(数仓)需要消费Kafka里的上游推送信息,这里做数据解析处理入库TDengine。

二、使用步骤

1.引入库

org.springframework.kafka

spring-kafka

       简简单单,就这一个依赖就够了。

2.配置

spring:

#kafka配置

kafka:

#bootstrap-servers: 192.168.200.72:9092,192.168.200.73:9092

#bootstrap-servers: 192.168.200.83:9092,192.168.200.84:9092

bootstrap-servers: localhost:9092

client-id: dc-device-flow-analyze

consumer:

group-id: dc-device-flow-analyze-consumer-group

max-poll-records: 10

#Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项 【latest, earliest, none】

auto-offset-reset: earliest

#是否开启自动提交

enable-auto-commit: false

#自动提交的时间间隔

auto-commit-interval: 1000

producer:

acks: 1

batch-size: 4096

buffer-memory: 40960000

client-id: dc-device-flow-analyze-producer

compression-type: zstd

value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

retries: 3

properties:

spring.json.add.type.headers: false

max.request.size: 126951500

listener:

ack-mode: MANUAL_IMMEDIATE

concurrency: 1 #推荐设置为topic的分区数

type: BATCH #开启批量监听

#消费topic配置

xiaotian:

analyze:

device:

flow:

topic:

consumer: device-flow

3.消费

import com.xiaotian.datagenius.service.DataTransService;

import lombok.extern.slf4j.Slf4j;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.kafka.support.Acknowledgment;

import org.springframework.stereotype.Component;

import java.util.List;

/**

* 消费者listener

*

* @author zhengwen

**/

@Slf4j

@Component

public class KafkaListenConsumer {

@Autowired

private DataTransService dataTransService;

/**

* 设备流水listenner

*

* @param records 消费信息

* @param ack Ack机制

*/

@KafkaListener(topics = "${xiaotian.analyze.device.flow.topic.consumer}")

public void deviceFlowListen(List records, Acknowledgment ack) {

log.debug("=====设备流水deviceFlowListen消费者接收信息====");

try {

for (ConsumerRecord record : records) {

log.debug("---开启线程解析设备流水数据:{}", record.toString());

//具体service里取做逻辑

dataTransService.deviceFlowTransSave(record);

}

} catch (Exception e) {

log.error("----设备流水数据消费者解析数据异常:{}", e.getMessage(), e);

} finally {

//手动提交偏移量

ack.acknowledge();

}

}

}

       消费与SpringBoot2的写法一样,没有任何改变。

4.发布信息

import cn.hutool.json.JSON;

import cn.hutool.json.JSONUtil;

import com.easylinkin.datagenius.core.Result;

import com.easylinkin.datagenius.core.ResultGenerator;

import com.easylinkin.datagenius.vo.KafkaMessageVo;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.support.SendResult;

import org.springframework.web.bind.annotation.PostMapping;

import org.springframework.web.bind.annotation.RequestBody;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

import java.util.concurrent.CompletableFuture;

/**

* kafka信息管理

*

* @author zhengwen

**/

@Slf4j

@RestController

@RequestMapping("/kafka/push")

public class KafkaPushController {

@Autowired

private KafkaTemplate kafkaTemplate;

/**

* kafka的信息push发送

*

* @param kafkaMessageVo kafka信息对象

* @return 推送结果

*/

@PostMapping("/sendMsg")

public Result sendMsg(@RequestBody KafkaMessageVo kafkaMessageVo) {

String topic = kafkaMessageVo.getTopic();

String msg = kafkaMessageVo.getMessage();

log.debug(msg);

JSON msgJson = JSONUtil.parseObj(msg);

/* springboot2的写法

ListenableFuture> listenableFuture = kafkaTemplate.send(topic, UUID.randomUUID().toString(), msgJson);

//发送成功后回调

SuccessCallback successCallback = new SuccessCallback() {

@Override

public void onSuccess(Object result) {

log.debug("发送成功:{}", JSONUtil.toJsonStr(kafkaMessageVo));

}

};

//发送失败回调

FailureCallback failureCallback = new FailureCallback() {

@Override

public void onFailure(Throwable ex) {

log.error("发送失败", JSONUtil.toJsonStr(kafkaMessageVo), ex);

}

};

listenableFuture.addCallback(successCallback, failureCallback);

*/

//SpringBoot3的写法

CompletableFuture> completableFuture = kafkaTemplate.send(topic, UUID.randomUUID().toString(), msgJson);

//执行成功回调

completableFuture.thenAccept(result -> {

log.debug("发送成功:{}", JSONUtil.toJsonStr(kafkaMessageVo));

});

//执行失败回调

completableFuture.exceptionally(e -> {

log.error("发送失败", JSONUtil.toJsonStr(kafkaMessageVo), e);

return null;

});

return ResultGenerator.genSuccessResult();

}

}

       这个发送信息就与springBoot2的写法一致了。原ListenableFuture类已过时了,现在SpringBoot3、JDK8+用CompletableFuture监听信息发送结果。

总结

1、SpringBoot3真香 2、Kafka的集成已经非常成熟了,资料也多。        我这里这个SpringBoot3集成Kafka发送信息目前觉得是独家,你能找到的应该都还是使用的ListenableFuture类。        好了,就写到这里,希望能帮到大家,uping!!!

精彩链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: