Spring 集成:手动渠道处理
Spring Integration: Manual channel handling
我想要的:构建一个
的可配置库
- 使用另一个具有内部路由和订阅方法的库,例如:
clientInstance.subscribe(endpoint, (endpoint, message) -> <handler>)
,例如Paho MQTT 库
- 稍后在我的代码中,我想访问
Flux
中的消息。
我的想法:
像这样创建 MessageChannels:
integrationFlowContext
.registration(IntegrationFlows.from("message-channel:" + endpoint)).bridge().get())
.register()
转发给有反应的发布者:
applicationContext.registerBean(
"publisher:" + endpoint,
Publisher.class,
() -> IntegrationFlows.from("message-channel:" + endpoint)).toReactivePublisher()
);
将消息通道保持在一个集合或类似的集合中并实现上述处理程序:(endpoint, message) -> messageChannels.get(endpoint).send( <converter>(message))
以后使用(在@PostConstruct
方法中):
Flux
.from((Publihser<Message<?>>)applicationContext.getBean("publisher:" + enpoint))
.map(...)
.subscribe()
我怀疑这是做我想做的事情的最佳方式。感觉就像在滥用 spring 集成。在这一点上欢迎任何建议。
但是总的来说(至少在我的测试中)这似乎有效。但是当我 运行 我的应用程序时,我收到如下错误: "Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available".
这尤其糟糕,因为在此异常之后,发布者声称不再有订阅者。因此,在实际应用中,不再处理任何消息。
我不确定这条消息是什么意思,但我可以重现它(但不明白为什么):
@Test
public void channelTest() {
integrationFlowContext
.registration(
IntegrationFlows.from("any-channel").bridge().get()
)
.register();
registryUtil.registerBean(
"any-publisher",
Publisher.class,
() -> IntegrationFlows.from("any-channel").toReactivePublisher()
);
Flux
.from((Publisher<Message<?>>) applicationContext.getBean("any-publisher"))
.subscribe(System.out::println);
MessageChannel messageChannel = applicationContext.getBean("any-channel", MessageChannel.class);
try {
messageChannel.send(MessageBuilder.withPayload("test").build());
} catch (Throwable t) {
log.error("Error: ", t);
}
}
我当然阅读了部分 spring 集成文档,但不太了解幕后发生的事情。因此,我想猜测可能的错误原因。
编辑:
然而,这有效:
@TestConfiguration
static class Config {
GenericApplicationContext applicationContext;
Config(
GenericApplicationContext applicationContext,
IntegrationFlowContext integrationFlowContext
) {
this.applicationContext = applicationContext;
// optional here, but needed for some reason in my library,
// since I can't find the channel beans like I will do here,
// if I didn't register them like so:
//integrationFlowContext
// .registration(
// IntegrationFlows.from("any-channel").bridge().get())
// .register();
applicationContext.registerBean(
"any-publisher",
Publisher.class,
() -> IntegrationFlows.from("any-channel").toReactivePublisher()
);
}
@PostConstruct
void connect(){
Flux
.from((Publisher<Message<?>>) applicationContext.getBean("any-publisher"))
.subscribe(System.out::println);
}
}
@Autowired
ApplicationContext applicationContext;
@Autowired
IntegrationFlowContext integrationFlowContext;
@Test
@SneakyThrows
public void channel2Test() {
MessageChannel messageChannel = applicationContext.getBean("any-channel", MessageChannel.class);
try {
messageChannel.send(MessageBuilder.withPayload("test").build());
} catch (Throwable t) {
log.error("Error: ", t);
}
}
因此,显然我的上述问题与到达的消息有关 "too early" .. 我猜?!
不,您的问题与 round-robin 有关 any-channel
bean 名称在 DirectChannel
上调度。
您定义了两个从该频道开始的 IntegrationFlow
实例,然后您声明了它们自己的订阅者,但在运行时它们都订阅了同一个 any-channel
实例。默认情况下,它带有 round-robin 平衡器。因此,一条消息发送给您的 Flux.from()
订阅者,而另一条消息发送给 bridge()
不知道如何处理您的消息,因此它尝试解析 replyChannel
header.
因此你的解决方案只有一个 IntegrationFlows.from("any-channel").toReactivePublisher()
是正确的。尽管您可以只进行 FluxMessageChannel
注册并从一侧使用它来发送常规消息,而从另一侧将其用作 Flux.from()
.
的反应源
我想要的:构建一个
的可配置库- 使用另一个具有内部路由和订阅方法的库,例如:
clientInstance.subscribe(endpoint, (endpoint, message) -> <handler>)
,例如Paho MQTT 库 - 稍后在我的代码中,我想访问
Flux
中的消息。
我的想法:
像这样创建 MessageChannels:
integrationFlowContext .registration(IntegrationFlows.from("message-channel:" + endpoint)).bridge().get()) .register()
转发给有反应的发布者:
applicationContext.registerBean( "publisher:" + endpoint, Publisher.class, () -> IntegrationFlows.from("message-channel:" + endpoint)).toReactivePublisher() );
将消息通道保持在一个集合或类似的集合中并实现上述处理程序:
(endpoint, message) -> messageChannels.get(endpoint).send( <converter>(message))
以后使用(在
@PostConstruct
方法中):Flux .from((Publihser<Message<?>>)applicationContext.getBean("publisher:" + enpoint)) .map(...) .subscribe()
我怀疑这是做我想做的事情的最佳方式。感觉就像在滥用 spring 集成。在这一点上欢迎任何建议。
但是总的来说(至少在我的测试中)这似乎有效。但是当我 运行 我的应用程序时,我收到如下错误: "Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available".
这尤其糟糕,因为在此异常之后,发布者声称不再有订阅者。因此,在实际应用中,不再处理任何消息。
我不确定这条消息是什么意思,但我可以重现它(但不明白为什么):
@Test
public void channelTest() {
integrationFlowContext
.registration(
IntegrationFlows.from("any-channel").bridge().get()
)
.register();
registryUtil.registerBean(
"any-publisher",
Publisher.class,
() -> IntegrationFlows.from("any-channel").toReactivePublisher()
);
Flux
.from((Publisher<Message<?>>) applicationContext.getBean("any-publisher"))
.subscribe(System.out::println);
MessageChannel messageChannel = applicationContext.getBean("any-channel", MessageChannel.class);
try {
messageChannel.send(MessageBuilder.withPayload("test").build());
} catch (Throwable t) {
log.error("Error: ", t);
}
}
我当然阅读了部分 spring 集成文档,但不太了解幕后发生的事情。因此,我想猜测可能的错误原因。
编辑:
然而,这有效:
@TestConfiguration
static class Config {
GenericApplicationContext applicationContext;
Config(
GenericApplicationContext applicationContext,
IntegrationFlowContext integrationFlowContext
) {
this.applicationContext = applicationContext;
// optional here, but needed for some reason in my library,
// since I can't find the channel beans like I will do here,
// if I didn't register them like so:
//integrationFlowContext
// .registration(
// IntegrationFlows.from("any-channel").bridge().get())
// .register();
applicationContext.registerBean(
"any-publisher",
Publisher.class,
() -> IntegrationFlows.from("any-channel").toReactivePublisher()
);
}
@PostConstruct
void connect(){
Flux
.from((Publisher<Message<?>>) applicationContext.getBean("any-publisher"))
.subscribe(System.out::println);
}
}
@Autowired
ApplicationContext applicationContext;
@Autowired
IntegrationFlowContext integrationFlowContext;
@Test
@SneakyThrows
public void channel2Test() {
MessageChannel messageChannel = applicationContext.getBean("any-channel", MessageChannel.class);
try {
messageChannel.send(MessageBuilder.withPayload("test").build());
} catch (Throwable t) {
log.error("Error: ", t);
}
}
因此,显然我的上述问题与到达的消息有关 "too early" .. 我猜?!
不,您的问题与 round-robin 有关 any-channel
bean 名称在 DirectChannel
上调度。
您定义了两个从该频道开始的 IntegrationFlow
实例,然后您声明了它们自己的订阅者,但在运行时它们都订阅了同一个 any-channel
实例。默认情况下,它带有 round-robin 平衡器。因此,一条消息发送给您的 Flux.from()
订阅者,而另一条消息发送给 bridge()
不知道如何处理您的消息,因此它尝试解析 replyChannel
header.
因此你的解决方案只有一个 IntegrationFlows.from("any-channel").toReactivePublisher()
是正确的。尽管您可以只进行 FluxMessageChannel
注册并从一侧使用它来发送常规消息,而从另一侧将其用作 Flux.from()
.