在网上找了很久都发现说的乱七八糟,要不就是版本太老了用法不同,spring-cloud-starter-stream-rabbit新版本推荐使用函数式编程,旧版本的一些注解已经过时弃用了(@StreamListener,@EnableBinding等注解已过时),自己看了下写一篇来记录一下
记得引入maven依赖,版本是3.2.x,本人使用3.2.8,配置了spring-cloud的版本这里就不用写了
1:开启手动提交配置,yml文件配置
spring:
cloud:
stream:
rabbit:
bindings:
delay-in-0:
consumer:
# 禁用自动提交ack,消费者手动提交ack
acknowledge-mode: manual
delayedExchange: true
delay-out-0:
producer:
delayedExchange: true
主要配置:acknowledge-mode: manual,开启手动提交ack
2:编写producer消费者代码,如下
import com.ruoyi.order.mq.TestMessaging;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class DelayProducer {
@Autowired
private StreamBridge streamBridge;
public void sendMsg(String msg, Long delay) {
// 构建消息对象
TestMessaging testMessaging = new TestMessaging()
.setMsgId(UUID.randomUUID().toString())
.setMsgText(msg);
Message
.setHeader("x-delay", delay).build();
streamBridge.send("delay-out-0", message);
}
}
注意:Message
import lombok.Data;
import lombok.experimental.Accessors;
/**
* @author Lion Li
*/
@Data
@Accessors(chain = true)
public class TestMessaging {
/**
* 消息id
*/
private String msgId;
/**
* 消息内容
*/
private String msgText;
}
2:编写consumer消费者代码
import com.rabbitmq.client.Channel;
import com.ruoyi.order.mq.TestMessaging;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.function.Consumer;
@Slf4j
@Component
public class DelayConsumer {
@Bean
Consumer
log.info("初始化订阅");
return obj -> {
TestMessaging payload = obj.getPayload();
MessageHeaders headers = obj.getHeaders();
Channel channel = headers.get("amqp_channel", Channel.class);
Long deliveryTag = headers.get("amqp_deliveryTag", Long.class);
try {
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
throw new RuntimeException(e);
}
log.info("消息接收成功:" + payload);
};
}
}
消费者代码主要是要接受Consumer
核心代码
Channel channel = headers.get("amqp_channel", Channel.class);
Long deliveryTag = headers.get("amqp_deliveryTag", Long.class);
精彩文章
发表评论