在网上找了很久都发现说的乱七八糟,要不就是版本太老了用法不同,spring-cloud-starter-stream-rabbit新版本推荐使用函数式编程,旧版本的一些注解已经过时弃用了(@StreamListener,@EnableBinding等注解已过时),自己看了下写一篇来记录一下

记得引入maven依赖,版本是3.2.x,本人使用3.2.8,配置了spring-cloud的版本这里就不用写了

org.springframework.cloud

spring-cloud-starter-stream-rabbit

1:开启手动提交配置,yml文件配置

spring:

cloud:

stream:

rabbit:

bindings:

delay-in-0:

consumer:

# 禁用自动提交ack,消费者手动提交ack

acknowledge-mode: manual

delayedExchange: true

delay-out-0:

producer:

delayedExchange: true

主要配置:acknowledge-mode: manual,开启手动提交ack

2:编写producer消费者代码,如下

import com.ruoyi.order.mq.TestMessaging;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.cloud.stream.function.StreamBridge;

import org.springframework.messaging.Message;

import org.springframework.messaging.support.MessageBuilder;

import org.springframework.stereotype.Component;

import java.util.UUID;

@Component

public class DelayProducer {

@Autowired

private StreamBridge streamBridge;

public void sendMsg(String msg, Long delay) {

// 构建消息对象

TestMessaging testMessaging = new TestMessaging()

.setMsgId(UUID.randomUUID().toString())

.setMsgText(msg);

Message message = MessageBuilder.withPayload(testMessaging)

.setHeader("x-delay", delay).build();

streamBridge.send("delay-out-0", message);

}

}

注意:Message TestMessaging是你需要发送的消息本体

import lombok.Data;

import lombok.experimental.Accessors;

/**

* @author Lion Li

*/

@Data

@Accessors(chain = true)

public class TestMessaging {

/**

* 消息id

*/

private String msgId;

/**

* 消息内容

*/

private String msgText;

}

2:编写consumer消费者代码

import com.rabbitmq.client.Channel;

import com.ruoyi.order.mq.TestMessaging;

import lombok.extern.slf4j.Slf4j;

import org.springframework.context.annotation.Bean;

import org.springframework.messaging.Message;

import org.springframework.messaging.MessageHeaders;

import org.springframework.stereotype.Component;

import java.io.IOException;

import java.util.function.Consumer;

@Slf4j

@Component

public class DelayConsumer {

@Bean

Consumer> delay() {

log.info("初始化订阅");

return obj -> {

TestMessaging payload = obj.getPayload();

MessageHeaders headers = obj.getHeaders();

Channel channel = headers.get("amqp_channel", Channel.class);

Long deliveryTag = headers.get("amqp_deliveryTag", Long.class);

try {

channel.basicAck(deliveryTag, false);

} catch (IOException e) {

throw new RuntimeException(e);

}

log.info("消息接收成功:" + payload);

};

}

}

消费者代码主要是要接受Consumer>的返回值,才能通过Message的接口获得Channel和deliveryTag,从而执行channel.basicAck(deliveryTag, false)手动提交ack

核心代码

Channel channel = headers.get("amqp_channel", Channel.class);

Long deliveryTag = headers.get("amqp_deliveryTag", Long.class);

精彩文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: