处理 Java Akka 流中不断变化的源数据

Processing changing source data in Java Akka streams

2 个线程已启动。 dataListUpdateThread 将数字 2 添加到 ListprocessFlowThread 对同一个 List 中的值求和并将求和列表打印到控制台。这是代码:

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;

import static java.lang.Thread.sleep;


public class SourceExample {

    private final static ActorSystem system = ActorSystem.create("SourceExample");

    private static void delayOneSecond() {
        try {
            sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void printValue(CompletableFuture<Integer> integerCompletableFuture) {
        try {
            System.out.println("Sum is " + integerCompletableFuture.get().intValue());
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {

        final List dataList = new ArrayList<Integer>();
        final Thread dataListUpdateThread = new Thread(() -> {
            while (true) {
                dataList.add(2);
                System.out.println(dataList);
                delayOneSecond();
            }
        });
        dataListUpdateThread.start();

        final Thread processFlowThread = new Thread(() -> {
            while (true) {
                final Source<Integer, NotUsed> source = Source.from(dataList);

                final Sink<Integer, CompletionStage<Integer>> sink =
                        Sink.fold(0, (agg, next) -> agg + next);

                final CompletionStage<Integer> sum = source.runWith(sink, system);

                printValue(sum.toCompletableFuture());

                delayOneSecond();
            }
        });

        processFlowThread.start();
    }
}

我试图创建最简单的示例来构建问题。 dataListUpdateThread 可以从 REST 服务或 Kafka 主题填充列表,而不是仅将值 2 添加到列表中。而不是使用 Java 线程应该如何实现这种情况?也就是说,如何共享dataList给Akka Stream处理?

改变传递给 Source.from 的集合只是巧合:如果集合用完,Source.from 将完成流。这是因为它适用于有限的、严格评估的数据(用例基本上是:a)文档的简单示例和 b)在后台对集合执行操作时想要限制资源消耗的情况(想想列表您要向其发送 HTTP 请求的 URL 的数量))。

注意:自从 Java 7 天以来,我还没有在很大程度上写过 Java,所以我没有提供 Java 代码,只是提供了一个方法大纲。

如之前的回答所述,Source.queue 可能是最佳选择(除了使用 Akka HTTP 或 Alpakka 连接器之类的东西)。在这种情况下,流的物化值是一个在流完成之前不会完成的未来,Source.queue 永远不会完成流(因为它无法知道它的引用是唯一的参考),引入 KillSwitch 并通过 viaMattoMat 传播它将使您能够决定在流之外完成流。

Source.queue 的替代方法是 Source.actorRef,它可以让您发送一个可区分的消息(Java API 中的 akka.Done.done() 对于这个)。该源具体化为一个 ActorRef,您可以向其发送 tell 消息,并且这些消息(至少那些与流类型匹配的消息)将可供流使用。

对于 Source.queueSource.actorRefprematerialize 它们通常很有用:在像您的示例这样的情况下,您还需要接收器的物化值,替代方案是大量使用 Mat 运算符来自定义物化值(在 Scala 中,可以使用元组至少简化多个物化值的组合,但在 Java 中,一旦超出一对(如你会用 queue),我很确定你必须定义一个 class 来保存三个(队列、killswitch、完成值的未来)物化值)。

还值得注意的是,由于 Akka Streams 运行 在后台的 actors 上(因此根据需要被安排到 ActorSystem 的线程上),几乎没有理由创建运行 流的线程。