DirectChannel 和 FluxMessageChannel 的区别
Difference between DirectChannel and FluxMessageChannel
我正在阅读有关 Spring 集成的 FluxMessageChannel
here and here,但我仍然不明白使用 DirectChannel
和 [=10] 之间的确切区别=] 使用 Project Reactor 时。由于 DirectChannel 是无状态的并由其轮询器控制,因此我希望 FluxMessageChannel
不需要。在谈到使用 Spring 集成实现的 Reactive Streams 应用程序时,我试图了解我究竟应该在什么时候使用它们以及为什么使用它们。
我目前有一个使用 DirectChannel
的反应式项目,它似乎工作正常,甚至文档说:
the flow behavior is changed from an imperative push model to a reactive pull model
我想了解何时使用每个通道以及使用 Reactive Streams 时的确切区别是什么。
DirectChannel
没有任何轮询器,它的实现很简单:只要有消息发送给它,就会调用处理程序。在同一个调用者的线程中:
public class DirectChannel 扩展了 AbstractSubscribableChannel {
private final UnicastingDispatcher dispatcher = new UnicastingDispatcher();
private volatile Integer maxSubscribers;
/**
* Create a channel with default {@link RoundRobinLoadBalancingStrategy}.
*/
public DirectChannel() {
this(new RoundRobinLoadBalancingStrategy());
}
其中 UnicastingDispatcher
是:
public final boolean dispatch(final Message<?> message) {
if (this.executor != null) {
Runnable task = createMessageHandlingTask(message);
this.executor.execute(task);
return true;
}
return this.doDispatch(message);
}
(DirectChannel
没有 executor
选项)
private boolean doDispatch(Message<?> message) {
if (tryOptimizedDispatch(message)) {
return true;
}
...
protected boolean tryOptimizedDispatch(Message<?> message) {
MessageHandler handler = this.theOneHandler;
if (handler != null) {
try {
handler.handleMessage(message);
return true;
}
catch (Exception e) {
throw IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message,
() -> "Dispatcher failed to deliver Message", e);
}
}
return false;
}
这就是我称之为“命令式推送模型”的原因。在这种情况下,调用者将等待处理程序完成其工作。如果你有一个大流量,发送者线程中的一切都会停止,直到发送的消息到达直接通道流的末端。用两个简单的词来说:发布者负责整个执行,在这种情况下它被阻止了。您基于 DirectChannel
的解决方案没有遇到任何问题,只是因为您没有使用反应式 non-blocking 线程,就像 WebFlux 中的 Netty 或 MongoDB 反应式驱动程序一样。
FluxMessageChannel
实际上是为 Reactive Streams 目的而设计的,订阅者负责处理根据需要从 Flux
中提取的消息。这样,在发送之后,发布者就可以自由地做任何其他事情。仅仅因为处理消息已经是订阅者的责任。
我会说只要您的处理程序不阻塞,使用 DirectChannel
肯定没问题。只要他们在阻止,你就应该选择 FluxMessageChannel
。尽管不要忘记还有用于不同任务的其他通道类型:https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel-implementations
我正在阅读有关 Spring 集成的 FluxMessageChannel
here and here,但我仍然不明白使用 DirectChannel
和 [=10] 之间的确切区别=] 使用 Project Reactor 时。由于 DirectChannel 是无状态的并由其轮询器控制,因此我希望 FluxMessageChannel
不需要。在谈到使用 Spring 集成实现的 Reactive Streams 应用程序时,我试图了解我究竟应该在什么时候使用它们以及为什么使用它们。
我目前有一个使用 DirectChannel
的反应式项目,它似乎工作正常,甚至文档说:
the flow behavior is changed from an imperative push model to a reactive pull model
我想了解何时使用每个通道以及使用 Reactive Streams 时的确切区别是什么。
DirectChannel
没有任何轮询器,它的实现很简单:只要有消息发送给它,就会调用处理程序。在同一个调用者的线程中:
public class DirectChannel 扩展了 AbstractSubscribableChannel {
private final UnicastingDispatcher dispatcher = new UnicastingDispatcher();
private volatile Integer maxSubscribers;
/**
* Create a channel with default {@link RoundRobinLoadBalancingStrategy}.
*/
public DirectChannel() {
this(new RoundRobinLoadBalancingStrategy());
}
其中 UnicastingDispatcher
是:
public final boolean dispatch(final Message<?> message) {
if (this.executor != null) {
Runnable task = createMessageHandlingTask(message);
this.executor.execute(task);
return true;
}
return this.doDispatch(message);
}
(DirectChannel
没有 executor
选项)
private boolean doDispatch(Message<?> message) {
if (tryOptimizedDispatch(message)) {
return true;
}
...
protected boolean tryOptimizedDispatch(Message<?> message) {
MessageHandler handler = this.theOneHandler;
if (handler != null) {
try {
handler.handleMessage(message);
return true;
}
catch (Exception e) {
throw IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message,
() -> "Dispatcher failed to deliver Message", e);
}
}
return false;
}
这就是我称之为“命令式推送模型”的原因。在这种情况下,调用者将等待处理程序完成其工作。如果你有一个大流量,发送者线程中的一切都会停止,直到发送的消息到达直接通道流的末端。用两个简单的词来说:发布者负责整个执行,在这种情况下它被阻止了。您基于 DirectChannel
的解决方案没有遇到任何问题,只是因为您没有使用反应式 non-blocking 线程,就像 WebFlux 中的 Netty 或 MongoDB 反应式驱动程序一样。
FluxMessageChannel
实际上是为 Reactive Streams 目的而设计的,订阅者负责处理根据需要从 Flux
中提取的消息。这样,在发送之后,发布者就可以自由地做任何其他事情。仅仅因为处理消息已经是订阅者的责任。
我会说只要您的处理程序不阻塞,使用 DirectChannel
肯定没问题。只要他们在阻止,你就应该选择 FluxMessageChannel
。尽管不要忘记还有用于不同任务的其他通道类型:https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel-implementations