如何创建从 akka-stream 源订阅时开始的 Flow.Publisher

How to create a Flow.Publisher that starts on subscribe from an akka-stream Source

我有一个必须实现的 java 接口,它看起来像这样:

public Flow.Publisher<Packet> getLivePublisher();

此接口必须 return 一个 Flow.Publisher 在订阅之前保持不活动状态,并且订阅者调用 Subscription.next(n).

到目前为止,我的实现看起来像

return Source
    .fromIterator(() -> new LivePacketIterator())
    .async("live-dispatcher")
    .runWith(JavaFlowSupport.Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), actorSystem);

不幸的是,这似乎立即开始从我的 LivePacketIterator 获取元素,即使没有订阅者订阅 returned Flow.Publisher

我知道 Source 只是一种可订阅对象源的模板(我的理解是它就像一个出版商工厂),并且它只会转换为具体的活动源一旦实现。所以如果我理解正确的话,我需要以某种方式具体化我的 Source 以获得 Flow.Publisher。但我希望它以一种仅在订阅时才开始 运行 的方式实现。

我也试过用toMat()

return Source
                .fromIterator(() -> new LivePacketIterator(maximumPacketSize))
                .filter(OrderSnapshotPacket::isNotEmpty)
                .async(dbDispatcher)
                .toMat(JavaFlowSupport.Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), Keep.right())
                .???;

但我不确定如何处理结果 RunnableGraph

我理解正确吗? 有没有办法做我想做的事?

Unfortunately, this seems to immediately start getting elements from my LivePacketIterator, even when no subscribers ahve subscribed to the returned Flow.Publisher.

您具体观察到什么来说明这一点?我使用了一个与你的非常相似的片段:

Flow.Publisher<Integer> integerPublisher =
      Source.from(List.of(1,2,3,4,5))
            .wireTap(System.out::println)
            .async()
            .runWith(
              JavaFlowSupport.Sink.asPublisher(AsPublisher.WITHOUT_FANOUT),
              ActorSystem.create());

在订阅发布者之前,这不会开始从列表中发出项目。

I understand that a Source is just a sort of a template for a Subscribable source of objects (my understanding is that it's like a Factory of Publishers), and that it only converts to a concrete active source once it's materialized

有点。所有 Flow.* 个接口都是 reactive streams specification for JVM. Akka Streams treats those interfaces as SPI and doesn't use them directly in its API. It introduces its own abstractions like Source, Flow and Sink. Akka Streams allows you to convert the processing stream expressed in its API to the lower level Flow.* just as you did in your snippet. This is useful if you say want to plugin Akka Streams processing pipeline to some other reactive streams implementation like say RxJava or Project Reactor 的一部分。所以 Source 是 Akka Stream 的抽象,在某种程度上等同于 Flow.Publisher,也就是说,它是潜在无限数量的值的来源。您需要将 Source 连接到 Sink(可能通过 Flow)以便获得 RunnableGraph,您可以 运行。这将使一切都开始运转,在大多数情况下,这将导致订阅链,并且元素将开始在流中流动。但这不是 JavaFlowSupport.Sink.asPublisher Sink 情况下的唯一选择,运行 宁 RunnableGraph 会将整个 Akka Stream 转换为 Flow.Publisher 的实例。这里的语义是订阅被推迟,直到某个地方在该实例上调用 subscribe 。如果我理解正确的话,这正是您要实现的目标。