Contents

Environment
RabbitMQ configuration
Fixing the exception with multiple listeners
Option 1: configure RabbitAdmin manually
Option 2: configure the lookup key

Environment

Spring: 2.2.4.RELEASE. Documentation: Spring-AMQP documentation (translated)

RabbitMQ configuration

@Bean("connectionFactory1")
public ConnectionFactory rabbitConnectionFactory1() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setAddresses(rabbitProperties.getAddresses());
    connectionFactory.setUsername(rabbitProperties.getUsername());
    connectionFactory.setPassword(rabbitProperties.getPassword());
    connectionFactory.setVirtualHost("/");
    return connectionFactory;
}

@Bean("connectionFactory2")
public ConnectionFactory rabbitConnectionFactory2(ConnectionNameStrategy cns) {
    ...
    return connectionFactory;
}

@Bean("simpleRoutingConnectionFactory")
@Primary
public ConnectionFactory routingConnectionFactory(
        @Qualifier("connectionFactory1") ConnectionFactory connectionFactory1,
        @Qualifier("connectionFactory2") ConnectionFactory connectionFactory2) {
    SimpleRoutingConnectionFactory simpleRoutingConnectionFactory = new SimpleRoutingConnectionFactory();
    // Lookup key -> target connection factory, one entry per business.
    Map<Object, ConnectionFactory> map = new HashMap<>();
    map.put("business1", connectionFactory1);
    map.put("business2", connectionFactory2);
    simpleRoutingConnectionFactory.setTargetConnectionFactories(map);
    return simpleRoutingConnectionFactory;
}

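Two different connection factories are configured here, and the lookup keys of the RoutingConnectionFactory identify the different businesses. When sending, the lookup key is bound before the call and unbound afterwards: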

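// The bound lookup key must match a key in the routing factory's target map
// ("business1" or "business2" above); here the routing key doubles as the lookup key.
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), routingKey);
try {
    rabbitTemplate.convertAndSend(exchange, routingKey, sendContent, (message) -> {
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        return message;
    }, correlationData);
} finally {
    // Always unbind so the key does not leak to later work on this thread.
    SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}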
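The bind/unbind pair works because the lookup key is held per thread. The sketch below is a stdlib-only stand-in for that idea, not the Spring class: `LookupKeyHolder`, `withLookupKey`, and `readOnNewThread` are hypothetical names introduced for illustration. It shows two things under that assumption: wrapping the send in try/finally keeps the key from leaking, and a key bound on one thread is not visible on another.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Stdlib-only stand-in for a thread-bound key holder (hypothetical, not the Spring class). */
final class LookupKeyHolder {
    // Each thread gets its own map of resource -> bound lookup key.
    private static final ThreadLocal<Map<Object, Object>> BOUND =
            ThreadLocal.withInitial(HashMap::new);

    private LookupKeyHolder() {
    }

    static void bind(Object resource, Object lookupKey) {
        BOUND.get().put(resource, lookupKey);
    }

    static Object get(Object resource) {
        return BOUND.get().get(resource);
    }

    static void unbind(Object resource) {
        BOUND.get().remove(resource);
    }

    /** Binds the key, runs the action, and always unbinds, even if the action throws. */
    static <T> T withLookupKey(Object resource, Object lookupKey, Supplier<T> action) {
        bind(resource, lookupKey);
        try {
            return action.get();
        } finally {
            unbind(resource);
        }
    }

    /** Reads the key bound to the resource from a freshly started thread. */
    static Object readOnNewThread(Object resource) {
        Object[] seen = new Object[1];
        Thread reader = new Thread(() -> seen[0] = get(resource));
        reader.start();
        try {
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for reader thread", e);
        }
        return seen[0];
    }
}
```

With the real API, the action passed to a helper like `withLookupKey` would be the `convertAndSend` call, and the resource would be the template's connection factory.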

Tracing the source: the RabbitTemplate send method

public void send(final String exchange, final String routingKey,
        final Message message, @Nullable final CorrelationData correlationData)
        throws AmqpException {
    execute(channel -> {...}, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
}

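The obtainTargetConnectionFactory method uses the SpEL expression set by the user (a RabbitTemplate property) together with the message to compute a lookup key, and then looks up the matching connectionFactory. We did not configure such an expression, so what comes back is still the RoutingConnectionFactory. Execution then reaches the code that creates the connection.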

if (isChannelTransacted()) {
    ...
}
else {
    // usePublisherConnection defaults to false here
    connection = ConnectionFactoryUtils.createConnection(connectionFactory,
            this.usePublisherConnection);
}

// The creation method
public static Connection createConnection(final ConnectionFactory connectionFactory,
        final boolean publisherConnectionIfPossible) {
    if (publisherConnectionIfPossible) {
        ConnectionFactory publisherFactory = connectionFactory.getPublisherConnectionFactory();
        if (publisherFactory != null) {
            return publisherFactory.createConnection();
        }
    }
    return connectionFactory.createConnection();
}

// createConnection has three implementations; the one used here is
// AbstractRoutingConnectionFactory's, which relies on this method.
protected ConnectionFactory determineTargetConnectionFactory() {
    // The lookup key is found here, i.e. the one bound with bind(...) before sending.
    Object lookupKey = determineCurrentLookupKey();
    ConnectionFactory connectionFactory = null;
    if (lookupKey != null) {
        connectionFactory = this.targetConnectionFactories.get(lookupKey);
    }
    if (connectionFactory == null && (this.lenientFallback || lookupKey == null)) {
        connectionFactory = this.defaultTargetConnectionFactory;
    }
    if (connectionFactory == null) {
        throw new IllegalStateException("Cannot determine target ConnectionFactory for lookup key ["
                + lookupKey + "]");
    }
    return connectionFactory;
}
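To make the branch order of that method easier to follow, here is a simplified, stdlib-only model of it. It is a sketch, not the Spring class: `RoutingLookupSketch` and its fluent setters are hypothetical, and targets are plain strings instead of real connection factories. Only the three-step decision (exact match, then default, then exception) is taken from the source quoted above.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of the lookup logic; targets are names, not real connection factories. */
final class RoutingLookupSketch {
    private final Map<Object, String> targets = new HashMap<>();
    private String defaultTarget;            // stays null unless set, as in the article's config
    private boolean lenientFallback = true;  // assumed default for this sketch

    RoutingLookupSketch target(Object lookupKey, String targetName) {
        targets.put(lookupKey, targetName);
        return this;
    }

    RoutingLookupSketch defaultTarget(String targetName) {
        this.defaultTarget = targetName;
        return this;
    }

    RoutingLookupSketch lenientFallback(boolean lenient) {
        this.lenientFallback = lenient;
        return this;
    }

    String determineTarget(Object lookupKey) {
        String target = null;
        // 1. Exact match on the lookup key, if there is one.
        if (lookupKey != null) {
            target = targets.get(lookupKey);
        }
        // 2. Fall back to the default when the key is null, or when lenient and unmatched.
        if (target == null && (lenientFallback || lookupKey == null)) {
            target = defaultTarget;
        }
        // 3. Nothing resolved: same message as in the quoted source.
        if (target == null) {
            throw new IllegalStateException(
                    "Cannot determine target ConnectionFactory for lookup key [" + lookupKey + "]");
        }
        return target;
    }
}
```

In this model, a null key with no default target ends in step 3, which is the situation the exception message with `[null]` describes.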

Fixing the exception with multiple listeners

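The source above is fairly simple. Note, however, that the project had no listener configured at that point. Once a listener is configured, you may see an exception like this: Cannot determine target ConnectionFactory for lookup key [null]. It is triggered when SimpleMessageListenerContainer declares queues during initialization, even though we can configure the container factory as follows and give it a specific connectionFactory: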

@Bean("myListener1")
public SimpleRabbitListenerContainerFactory listenerContainerFactory1(
        @Qualifier("connectionFactory1") ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
    simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
    return simpleRabbitListenerContainerFactory;
}

// Usage on the listener method:
@RabbitListener(queues = {TestMqConfig.QUEUE_MQ_TEST}, containerFactory = "myListener1")

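The actual cause of the error, however, is that initialization goes through RabbitAdmin, and RabbitAdmin in turn performs its operations through RabbitTemplate. In other words, it is still using the RoutingConnectionFactory.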

java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]
    at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.determineTargetConnectionFactory(AbstractRoutingConnectionFactory.java:120)
    at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.createConnection(AbstractRoutingConnectionFactory.java:98)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:214)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2095)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2068)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2048)
    at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueInfo(RabbitAdmin.java:407)
    at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:391)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.attemptDeclarations(AbstractMessageListenerContainer.java:1830)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1811)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1337)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1183)
    at java.lang.Thread.run(Thread.java:745)

Now that the cause is known, let's try to fix it.

Option 1: configure RabbitAdmin manually

@Bean
public RabbitAdmin rabbitAdmin(@Qualifier("connectionFactory1") ConnectionFactory connectionFactory) {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    return rabbitAdmin;
}

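This option does not work: when BlockingQueueConsumer is initialized, its connectionFactory turns out to still be the routing one, so the problem remains. I don't know the exact reason yet and will add it here once I find out.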

Option 2: configure the lookup key

map.put("[queue.mq.test,test]",connectionFactory1);

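Here the lookup key is simply the list of queue names that each listener listens to, written as [queue1,queue2] with no spaces. If a listener listens to several queues, put all of them into one key, as in the code above.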
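Writing these bracketed keys by hand is easy to get wrong (a stray space is enough to miss the lookup), so a small helper can build them. This is a minimal sketch: `ListenerLookupKeys.forQueues` is a hypothetical helper, not part of Spring AMQP, and it assumes the queue names are passed in the same order the listener declares them.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Hypothetical helper that builds a listener lookup key such as "[queue.mq.test,test]". */
final class ListenerLookupKeys {
    private ListenerLookupKeys() {
    }

    /** Joins the queue names with commas, wrapped in brackets, without any spaces. */
    static String forQueues(String... queueNames) {
        return Arrays.stream(queueNames)
                .collect(Collectors.joining(",", "[", "]"));
    }
}
```

The map entry from the code above could then be written as `map.put(ListenerLookupKeys.forQueues("queue.mq.test", "test"), connectionFactory1);`.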
