Several beans implementing the same interface
The concrete usage looks like this:
@Slf4j
public class Client<E, Key> {
@Getter @NonNull private final UpdateListener<E, Key> updateListener;
@NonNull private final SubscriptionFactory subscriptionFactory;
@NonNull private final Map<Key, Instant> updatedRegistry = new ConcurrentHashMap<>();
public Client(UpdateListener<E, Key> updateListener,
SubscriptionFactory subscriptionFactory) {
this.updateListener = updateListener;
this.subscriptionFactory = subscriptionFactory;
this.subscriptionFactory.registerSnapshotClient(updateListener);
log.info("Created new snapshot client for entity key [{}], update type [{}] and component qualifier [{}]",
updateListener.getEntityKey(),
updateListener.getOptionalChangeType(),
updateListener.getComponentQualifier());
}
@RabbitListener(queues = {"#{@queueNameCreator.createUpdateQueueName(snapshotClient.getUpdateListener())}",
"#{@queueNameCreator.createSnapshotQueueName(snapshotClient.getUpdateListener())}"})
public void handleMessage(Message<E> rawUpdate, @Header("last_updated") Instant newUpdatedTime) {
...//more code
}
}
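Each 'Client' instance has its own bean id, so the instances do not conflict with each other.
How can I get this exact object's updateListener from a SpEL expression?
Update
After switching to the programmatic approach and registering the endpoints myself, I get the following exception: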
Apr 28, 2015 3:22:47 PM org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler handleError
WARNING: Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'public void com.everymatrix.om2020.messaging.model.SnapshotClient.handleMessage(org.springframework.messaging.Message<E>,java.time.Instant)' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:126)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:93)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:756)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:82)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:167)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1241)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1005)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:989)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access0(SimpleMessageListenerContainer.java:82)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1103)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: No suitable resolver for argument [0] [type=org.springframework.messaging.Message]
Update
Done. You need to do the following to get the desired behavior.
@Configuration
@EnableRabbit
public static class OmbeRabbitListenerConfigurer implements RabbitListenerConfigurer {
@Autowired ApplicationContext applicationContext;
@Autowired SnapshotClientQueueNamesCreator snapshotClientQueueNamesCreator;
@Autowired RabbitListenerContainerFactory rabbitListenerContainerFactory;
@Autowired MessageConverter messageConverter;
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
final Collection<SnapshotClient> snapshotClients = applicationContext.getBeansOfType(SnapshotClient.class).values();
System.out.println(snapshotClients);
snapshotClients.stream().forEach(bean -> {
final String snapshotQueueName = snapshotClientQueueNamesCreator.createSnapshotQueueName(bean.getUpdateListener());
final String updateQueueName = snapshotClientQueueNamesCreator.createUpdateQueueName(bean.getUpdateListener());
Method method = Stream.of(bean.getClass().getMethods()).filter(x -> x.getName().equals("handleMessage")).findAny().get();
MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
final DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
messageHandlerMethodFactory.afterPropertiesSet();
endpoint.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
endpoint.setBean(bean);
endpoint.setMethod(method);
endpoint.setId(snapshotQueueName + ":" + updateQueueName + UUID.randomUUID());
endpoint.setQueueNames(snapshotQueueName, updateQueueName);
endpoint.setExclusive(false);
registrar.registerEndpoint(endpoint, rabbitListenerContainerFactory);
});
}
}
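Your question is not clear; you seem to be mixing up runtime and initialization-time concepts.
For example, "#{@queueNameCreator.createUpdateQueueName(e.c.doSomething())}" is evaluated once, during initialization, and it is not clear from this expression what e is or where it comes from.
However, you appear to be passing E in the payload of a message: Message<E> rawUpdate. That message comes from the queue, so it cannot influence the queue name.
Perhaps if you can explain what you are trying to do, rather than how you are trying to do it, I can update this "answer" with a possible solution.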
EDIT:
If you mean that you want to reference some field of the current (listener) bean in your SpEL, that cannot be done directly.
EDIT2:
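I can't think of any way to get a reference to the current bean in the SpEL expression; the annotation value has to be a constant. That is how annotations work in Java: they are bound to the class, not to an instance.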
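To illustrate the point about annotations being bound to the class, here is a minimal plain-Java sketch. It does not use Spring at all: `ListensTo` is a hypothetical stand-in for `@RabbitListener(queues = ...)`, and `Listener` and `queueNameFor` are names made up for this example. Two instances with different state still report the same annotation value, because the value is read from the class's method, not from the object.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

public class AnnotationBindingDemo {

    // Hypothetical stand-in for @RabbitListener(queues = ...); not a Spring type.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface ListensTo {
        String value();
    }

    static class Listener {
        final String name; // per-instance state, invisible to the annotation

        Listener(String name) {
            this.name = name;
        }

        @ListensTo("fixed.queue") // must be a compile-time constant
        public void handleMessage(String payload) {
            // no-op; only the annotation matters for this demo
        }
    }

    // Reads the annotation value via the instance's class, which is the only
    // place the annotation lives.
    static String queueNameFor(Listener listener) {
        try {
            Method m = listener.getClass().getMethod("handleMessage", String.class);
            return m.getAnnotation(ListensTo.class).value();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Listener a = new Listener("a");
        Listener b = new Listener("b");
        System.out.println(a.name + " -> " + queueNameFor(a)); // a -> fixed.queue
        System.out.println(b.name + " -> " + queueNameFor(b)); // b -> fixed.queue
    }
}
```

Since nothing in the annotation can vary per instance, a per-instance queue name has to be supplied some other way.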
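I think that, to do what you want, you need to revert to using programmatic endpoint registration. However, you would need to wire up a MethodRabbitListenerEndpoint (rather than a SimpleRabbitListenerEndpoint) to get the benefit of the annotations you are looking for (@Header etc.).
We don't really cover that in the documentation; it's a little advanced but, essentially, you need to inject the bean and the Method (for the listener), and a DefaultMessageHandlerMethodFactory.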