DirectChannel 和 FluxMessageChannel 的区别

Difference between DirectChannel and FluxMessageChannel

我正在阅读有关 Spring 集成的 FluxMessageChannel here and here,但我仍然不明白使用 DirectChannel 和 [=10] 之间的确切区别=] 使用 Project Reactor 时。由于 DirectChannel 是无状态的并由其轮询器控制,因此我希望 FluxMessageChannel 不需要。在谈到使用 Spring 集成实现的 Reactive Streams 应用程序时,我试图了解我究竟应该在什么时候使用它们以及为什么使用它们。

我目前有一个使用 DirectChannel 的反应式项目,它似乎工作正常,甚至文档说:

the flow behavior is changed from an imperative push model to a reactive pull model

我想了解何时使用每个通道以及使用 Reactive Streams 时的确切区别是什么。

DirectChannel没有任何轮询器,它的实现很简单:只要有消息发送给它,就会调用处理程序。在同一个调用者的线程中:

public class DirectChannel 扩展了 AbstractSubscribableChannel {

private final UnicastingDispatcher dispatcher = new UnicastingDispatcher();

private volatile Integer maxSubscribers;

/**
 * Create a channel with default {@link RoundRobinLoadBalancingStrategy}.
 */
public DirectChannel() {
    this(new RoundRobinLoadBalancingStrategy());
}

其中 UnicastingDispatcher 是:

public final boolean dispatch(final Message<?> message) {
    if (this.executor != null) {
        Runnable task = createMessageHandlingTask(message);
        this.executor.execute(task);
        return true;
    }
    return this.doDispatch(message);
}

DirectChannel 没有 executor 选项)

private boolean doDispatch(Message<?> message) {
    if (tryOptimizedDispatch(message)) {
        return true;
    }

...

protected boolean tryOptimizedDispatch(Message<?> message) {
    MessageHandler handler = this.theOneHandler;
    if (handler != null) {
        try {
            handler.handleMessage(message);
            return true;
        }
        catch (Exception e) {
            throw IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message,
                    () -> "Dispatcher failed to deliver Message", e);
        }
    }
    return false;
}

这就是我称之为“命令式推送模型”的原因。在这种情况下,调用者将等待处理程序完成其工作。如果你有一个大流量,发送者线程中的一切都会停止,直到发送的消息到达直接通道流的末端。用两个简单的词来说:发布者负责整个执行,在这种情况下它被阻止了。您基于 DirectChannel 的解决方案没有遇到任何问题,只是因为您没有使用反应式 non-blocking 线程,就像 WebFlux 中的 Netty 或 MongoDB 反应式驱动程序一样。

FluxMessageChannel 实际上是为 Reactive Streams 目的而设计的,订阅者负责处理根据需要从 Flux 中提取的消息。这样,在发送之后,发布者就可以自由地做任何其他事情。仅仅因为处理消息已经是订阅者的责任。

我会说只要您的处理程序不阻塞,使用 DirectChannel 肯定没问题。只要他们在阻止,你就应该选择 FluxMessageChannel。尽管不要忘记还有用于不同任务的其他通道类型:https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel-implementations