Contents
Environment
RabbitMQ configuration
Resolving the multi-listener exception
    Option 1: configure RabbitAdmin manually
    Option 2: configure the lookup key
Environment
Spring: 2.2.4.RELEASE. Documentation: Spring-AMQP documentation (translated)
RabbitMQ configuration
@Bean("connectionFactory1")
public ConnectionFactory rabbitConnectionFactory1() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setAddresses(rabbitProperties.getAddresses());
    connectionFactory.setUsername(rabbitProperties.getUsername());
    connectionFactory.setPassword(rabbitProperties.getPassword());
    connectionFactory.setVirtualHost("/");
    return connectionFactory;
}

@Bean("connectionFactory2")
public ConnectionFactory rabbitConnectionFactory2(ConnectionNameStrategy cns) {
    ...
    return connectionFactory;
}

@Bean("simpleRoutingConnectionFactory")
@Primary
public ConnectionFactory routingConnectionFactory(
        @Qualifier("connectionFactory1") ConnectionFactory connectionFactory1,
        @Qualifier("connectionFactory2") ConnectionFactory connectionFactory2) {
    SimpleRoutingConnectionFactory simpleRoutingConnectionFactory = new SimpleRoutingConnectionFactory();
    // Lookup key -> target connection factory
    Map<Object, ConnectionFactory> map = new HashMap<>();
    map.put("business1", connectionFactory1);
    map.put("business2", connectionFactory2);
    simpleRoutingConnectionFactory.setTargetConnectionFactories(map);
    return simpleRoutingConnectionFactory;
}
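Two separate connection factories are configured here, and the lookup keys of the RoutingConnectionFactory are named after the business each one serves. To send a message, bind the lookup key to the current thread, send, then unbind. Note that the snippet below passes routingKey as the lookup key, so it only resolves when routingKey equals one of the keys registered above (business1 or business2).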
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), routingKey);
try {
    rabbitTemplate.convertAndSend(exchange, routingKey, sendContent, (message) -> {
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        return message;
    }, correlationData);
}
finally {
    // Unbind even if the send throws, so the key does not leak to later work on this thread
    SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}
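The bind/send/unbind pairing above can be modeled in plain Java to show why the unbind step belongs in a finally block. This is only a sketch: the class LookupKeyScopeSketch, the CURRENT_KEY ThreadLocal and the withLookupKey helper are hypothetical names standing in for the thread-bound holder, not part of Spring AMQP.

```java
import java.util.function.Supplier;

// Plain-Java model of a thread-bound lookup key; NOT Spring's SimpleResourceHolder.
class LookupKeyScopeSketch {
    // Hypothetical stand-in for the per-thread binding
    private static final ThreadLocal<Object> CURRENT_KEY = new ThreadLocal<>();

    static Object currentKey() {
        return CURRENT_KEY.get();
    }

    // Binds the key, runs the action, and always clears the key afterwards.
    static <T> T withLookupKey(Object key, Supplier<T> action) {
        CURRENT_KEY.set(key);
        try {
            return action.get();
        }
        finally {
            CURRENT_KEY.remove();
        }
    }

    public static void main(String[] args) {
        // The key is visible only while the action runs
        String seen = withLookupKey("business1", () -> "sending via " + currentKey());
        System.out.println(seen);

        // Even when the action fails, the key is cleared
        try {
            withLookupKey("business2", () -> {
                throw new IllegalStateException("send failed");
            });
        }
        catch (IllegalStateException e) {
            System.out.println("key after failure: " + currentKey());
        }
    }
}
```

Without the finally block, a failed send would leave the key bound to a pooled thread, and unrelated work on that thread could then be routed to the wrong broker.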
Tracing the source: the RabbitTemplate send method
public void send(final String exchange, final String routingKey,
        final Message message, @Nullable final CorrelationData correlationData)
        throws AmqpException {
    execute(channel -> {...}, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
}
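The obtainTargetConnectionFactory method evaluates a user-supplied SpEL expression (a RabbitTemplate property) against the message to compute a lookup key, then uses that key to find the matching connectionFactory. We have not configured such an expression here, so what comes back is still the RoutingConnectionFactory itself. Execution then reaches the code that creates the connection.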
if (isChannelTransacted()) {
    ...
}
else {
    // usePublisherConnection defaults to false
    connection = ConnectionFactoryUtils.createConnection(connectionFactory,
            this.usePublisherConnection);
}

// The createConnection method
public static Connection createConnection(final ConnectionFactory connectionFactory,
        final boolean publisherConnectionIfPossible) {
    if (publisherConnectionIfPossible) {
        ConnectionFactory publisherFactory = connectionFactory.getPublisherConnectionFactory();
        if (publisherFactory != null) {
            return publisherFactory.createConnection();
        }
    }
    return connectionFactory.createConnection();
}
// createConnection() has three implementing classes; the one used here is AbstractRoutingConnectionFactory,
// whose createConnection() calls determineTargetConnectionFactory():
protected ConnectionFactory determineTargetConnectionFactory() {
    // Resolve the lookup key here, i.e. the value bound with SimpleResourceHolder.bind(...) earlier
    Object lookupKey = determineCurrentLookupKey();
    ConnectionFactory connectionFactory = null;
    if (lookupKey != null) {
        connectionFactory = this.targetConnectionFactories.get(lookupKey);
    }
    if (connectionFactory == null && (this.lenientFallback || lookupKey == null)) {
        connectionFactory = this.defaultTargetConnectionFactory;
    }
    if (connectionFactory == null) {
        throw new IllegalStateException("Cannot determine target ConnectionFactory for lookup key ["
                + lookupKey + "]");
    }
    return connectionFactory;
}
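The resolution rules in determineTargetConnectionFactory can be exercised in isolation with a small plain-Java model. This is a sketch under stated assumptions, not Spring's class: RoutingLookupSketch is a hypothetical name, strings stand in for ConnectionFactory beans, and lenientFallback is assumed to be true. It mirrors the configuration above, where two targets are registered and no default target is set.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the lookup rules shown above; NOT Spring's AbstractRoutingConnectionFactory.
class RoutingLookupSketch {
    private final Map<Object, String> targets = new HashMap<>();
    private String defaultTarget;                 // stays null unless explicitly set
    private final boolean lenientFallback = true; // assumption for this sketch

    void addTarget(Object key, String target) {
        targets.put(key, target);
    }

    void setDefaultTarget(String target) {
        this.defaultTarget = target;
    }

    // Same three steps as the method above: keyed lookup, fallback to default, else fail.
    String determineTarget(Object lookupKey) {
        String target = null;
        if (lookupKey != null) {
            target = targets.get(lookupKey);
        }
        if (target == null && (lenientFallback || lookupKey == null)) {
            target = defaultTarget;
        }
        if (target == null) {
            throw new IllegalStateException(
                    "Cannot determine target ConnectionFactory for lookup key [" + lookupKey + "]");
        }
        return target;
    }

    public static void main(String[] args) {
        RoutingLookupSketch routing = new RoutingLookupSketch();
        routing.addTarget("business1", "connectionFactory1");
        routing.addTarget("business2", "connectionFactory2");

        // A bound key resolves to its registered target
        System.out.println(routing.determineTarget("business2"));

        // No key and no default target: the lookup fails
        try {
            routing.determineTarget(null);
        }
        catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The second call shows the failure mode for any thread that reaches the routing factory without a bound key: with no default target, the third step is the only one left.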
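Resolving the multi-listener exception
The source code above is straightforward. Note, however, that no listener has been configured in the project so far. Once a listener is configured, you may see an exception like this:
Cannot determine target ConnectionFactory for lookup key [null]
It is raised when the SimpleMessageListenerContainer initializes and needs to declare its queues, even though we can configure the container factory as follows and give it a specific connectionFactory: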
@Bean("myListener1")
public SimpleRabbitListenerContainerFactory listenerContainerFactory1(
        @Qualifier("connectionFactory1") ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
    simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
    return simpleRabbitListenerContainerFactory;
}

// On the listener method:
@RabbitListener(queues = {TestMqConfig.QUEUE_MQ_TEST}, containerFactory = "myListener1")
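The actual cause of the error, however, is that initialization goes through RabbitAdmin, and RabbitAdmin performs its operations through a RabbitTemplate. In other words, it still uses the RoutingConnectionFactory, and no lookup key is bound on the listener thread: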
java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]
at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.determineTargetConnectionFactory(AbstractRoutingConnectionFactory.java:120)
at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.createConnection(AbstractRoutingConnectionFactory.java:98)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:214)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2095)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2068)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2048)
at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueInfo(RabbitAdmin.java:407)
at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:391)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.attemptDeclarations(AbstractMessageListenerContainer.java:1830)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1811)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1337)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1183)
at java.lang.Thread.run(Thread.java:745)
With the cause identified, let's look at how to fix it.
Option 1: configure RabbitAdmin manually
@Bean
public RabbitAdmin rabbitAdmin(@Qualifier("connectionFactory1") ConnectionFactory connectionFactory) {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    return rabbitAdmin;
}
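This option does not work: when the BlockingQueueConsumer is initialized, its connectionFactory turns out to still be the routing one, so the problem remains. I do not know the exact reason yet and will add it here once I find out.
Option 2: configure the lookup key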
map.put("[queue.mq.test,test]",connectionFactory1);
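Here the queue names consumed by each listener are registered directly as the lookup key. If one listener consumes several queues, all of them go into a single key as in the code above: a bracketed, comma-separated list with no spaces.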
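To avoid typing such keys by hand, the string can be built from the queue names. The helper below is a hypothetical illustration of the key format shown above, not the code Spring uses internally. It assumes the key lists the queue names in the order the listener declares them.

```java
// Hypothetical helper that renders queue names in the bracketed key format used above.
class ListenerLookupKeySketch {
    // Joins the names with commas (no spaces) and wraps them in square brackets.
    static String lookupKeyFor(String... queueNames) {
        return "[" + String.join(",", queueNames) + "]";
    }

    public static void main(String[] args) {
        // Listener consuming two queues
        System.out.println(lookupKeyFor("queue.mq.test", "test")); // prints [queue.mq.test,test]
        // Listener consuming a single queue
        System.out.println(lookupKeyFor("queue.mq.test"));         // prints [queue.mq.test]
    }
}
```

The result would then be used as the map key, for example map.put(lookupKeyFor("queue.mq.test", "test"), connectionFactory1). Because the key is an exact string match, a stray space or a different queue order would presumably miss the entry.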