处理 Java Akka 流中不断变化的源数据
Processing changing source data in Java Akka streams
2 个线程已启动。 dataListUpdateThread
将数字 2 添加到 List
。 processFlowThread
对同一个 List
中的值求和并将求和列表打印到控制台。这是代码:
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import static java.lang.Thread.sleep;
public class SourceExample {
private final static ActorSystem system = ActorSystem.create("SourceExample");
private static void delayOneSecond() {
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void printValue(CompletableFuture<Integer> integerCompletableFuture) {
try {
System.out.println("Sum is " + integerCompletableFuture.get().intValue());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
final List dataList = new ArrayList<Integer>();
final Thread dataListUpdateThread = new Thread(() -> {
while (true) {
dataList.add(2);
System.out.println(dataList);
delayOneSecond();
}
});
dataListUpdateThread.start();
final Thread processFlowThread = new Thread(() -> {
while (true) {
final Source<Integer, NotUsed> source = Source.from(dataList);
final Sink<Integer, CompletionStage<Integer>> sink =
Sink.fold(0, (agg, next) -> agg + next);
final CompletionStage<Integer> sum = source.runWith(sink, system);
printValue(sum.toCompletableFuture());
delayOneSecond();
}
});
processFlowThread.start();
}
}
我试图创建最简单的示例来构建问题。 dataListUpdateThread
可以从 REST 服务或 Kafka 主题填充列表,而不是仅将值 2 添加到列表中。而不是使用 Java 线程应该如何实现这种情况?也就是说,如何共享dataList
给Akka Stream处理?
改变传递给 Source.from
的集合只是巧合:如果集合用完,Source.from
将完成流。这是因为它适用于有限的、严格评估的数据(用例基本上是:a)文档的简单示例和 b)在后台对集合执行操作时想要限制资源消耗的情况(想想列表您要向其发送 HTTP 请求的 URL 的数量))。
注意:自从 Java 7 天以来,我还没有在很大程度上写过 Java,所以我没有提供 Java 代码,只是提供了一个方法大纲。
如之前的回答所述,Source.queue
可能是最佳选择(除了使用 Akka HTTP 或 Alpakka 连接器之类的东西)。在这种情况下,流的物化值是一个在流完成之前不会完成的未来,Source.queue
永远不会完成流(因为它无法知道它的引用是唯一的参考),引入 KillSwitch
并通过 viaMat
和 toMat
传播它将使您能够决定在流之外完成流。
Source.queue
的替代方法是 Source.actorRef
,它可以让您发送一个可区分的消息(Java API 中的 akka.Done.done()
对于这个)。该源具体化为一个 ActorRef
,您可以向其发送 tell
消息,并且这些消息(至少那些与流类型匹配的消息)将可供流使用。
对于 Source.queue
和 Source.actorRef
,prematerialize
它们通常很有用:在像您的示例这样的情况下,您还需要接收器的物化值,替代方案是大量使用 Mat
运算符来自定义物化值(在 Scala 中,可以使用元组至少简化多个物化值的组合,但在 Java 中,一旦超出一对(如你会用 queue
),我很确定你必须定义一个 class 来保存三个(队列、killswitch、完成值的未来)物化值)。
还值得注意的是,由于 Akka Streams 运行 在后台的 actors 上(因此根据需要被安排到 ActorSystem
的线程上),几乎没有理由创建运行 流的线程。
2 个线程已启动。 dataListUpdateThread
将数字 2 添加到 List
。 processFlowThread
对同一个 List
中的值求和并将求和列表打印到控制台。这是代码:
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import static java.lang.Thread.sleep;
public class SourceExample {
private final static ActorSystem system = ActorSystem.create("SourceExample");
private static void delayOneSecond() {
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void printValue(CompletableFuture<Integer> integerCompletableFuture) {
try {
System.out.println("Sum is " + integerCompletableFuture.get().intValue());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
final List dataList = new ArrayList<Integer>();
final Thread dataListUpdateThread = new Thread(() -> {
while (true) {
dataList.add(2);
System.out.println(dataList);
delayOneSecond();
}
});
dataListUpdateThread.start();
final Thread processFlowThread = new Thread(() -> {
while (true) {
final Source<Integer, NotUsed> source = Source.from(dataList);
final Sink<Integer, CompletionStage<Integer>> sink =
Sink.fold(0, (agg, next) -> agg + next);
final CompletionStage<Integer> sum = source.runWith(sink, system);
printValue(sum.toCompletableFuture());
delayOneSecond();
}
});
processFlowThread.start();
}
}
我试图创建最简单的示例来构建问题。 dataListUpdateThread
可以从 REST 服务或 Kafka 主题填充列表,而不是仅将值 2 添加到列表中。而不是使用 Java 线程应该如何实现这种情况?也就是说,如何共享dataList
给Akka Stream处理?
改变传递给 Source.from
的集合只是巧合:如果集合用完,Source.from
将完成流。这是因为它适用于有限的、严格评估的数据(用例基本上是:a)文档的简单示例和 b)在后台对集合执行操作时想要限制资源消耗的情况(想想列表您要向其发送 HTTP 请求的 URL 的数量))。
注意:自从 Java 7 天以来,我还没有在很大程度上写过 Java,所以我没有提供 Java 代码,只是提供了一个方法大纲。
如之前的回答所述,Source.queue
可能是最佳选择(除了使用 Akka HTTP 或 Alpakka 连接器之类的东西)。在这种情况下,流的物化值是一个在流完成之前不会完成的未来,Source.queue
永远不会完成流(因为它无法知道它的引用是唯一的参考),引入 KillSwitch
并通过 viaMat
和 toMat
传播它将使您能够决定在流之外完成流。
Source.queue
的替代方法是 Source.actorRef
,它可以让您发送一个可区分的消息(Java API 中的 akka.Done.done()
对于这个)。该源具体化为一个 ActorRef
,您可以向其发送 tell
消息,并且这些消息(至少那些与流类型匹配的消息)将可供流使用。
对于 Source.queue
和 Source.actorRef
,prematerialize
它们通常很有用:在像您的示例这样的情况下,您还需要接收器的物化值,替代方案是大量使用 Mat
运算符来自定义物化值(在 Scala 中,可以使用元组至少简化多个物化值的组合,但在 Java 中,一旦超出一对(如你会用 queue
),我很确定你必须定义一个 class 来保存三个(队列、killswitch、完成值的未来)物化值)。
还值得注意的是,由于 Akka Streams 运行 在后台的 actors 上(因此根据需要被安排到 ActorSystem
的线程上),几乎没有理由创建运行 流的线程。