更改 akka 流的源数据
Changing source data for akka streams
我正在学习 Java Akka 流并使用 https://doc.akka.io/docs/akka/current/stream/stream-flows-and-basics.html 定义了以下内容:
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
public class SourceExample {
static ActorSystem system = ActorSystem.create("SourceExample");
public static void main(String args[]) throws ExecutionException, InterruptedException {
final List<Integer> sourceData = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
final Source<Integer, NotUsed> source =
Source.from(sourceData);
final Sink<Integer, CompletionStage<Integer>> sink =
Sink.<Integer, Integer>fold(0, (agg, next) -> agg + next);
final CompletionStage<Integer> sum = source.runWith(sink, system);
System.out.println(sum.toCompletableFuture().get());
}
}
运行 此代码按预期运行。
Akka Streams 解决了这段代码可以重复执行的问题吗?
在现实场景中,sourceData
不会是静态的,Akka Streams 对如何处理变化的数据有意见还是由开发人员决定?
在最简单的情况下,当源数据发生变化时,只需每 X 分钟重新执行一次流式处理流程(例如使用计划任务)。还是Akka流是长寿命的,源数据发生变化,流计算根据一些参数重新执行?
Akka Streams 文档定义了多个数据源,但我不明白应该如何利用 Akka Streams 来处理不断变化的源数据。
Akka Streams 可以而且经常这样做 运行 直到(不久之前)您的应用程序停止。例如,流消费(例如使用来自 Alpakka Kafka 的 Kafka 消费者源)Kafka 记录在应用程序中很早就开始,直到应用程序被终止才停止。
具体来说,流 运行 秒直到:
- 阶段表示完成(例如,在您的示例中,
Source.from
会在发出 10
后表示完成)
- 阶段失败(通常抛出异常)
一个对动态数据有用的示例源(不引入 Alpakka 或 Akka HTTP)是 Source.queue
,它具体化为一个队列,其中排队的元素可用于流。
我正在学习 Java Akka 流并使用 https://doc.akka.io/docs/akka/current/stream/stream-flows-and-basics.html 定义了以下内容:
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
public class SourceExample {
static ActorSystem system = ActorSystem.create("SourceExample");
public static void main(String args[]) throws ExecutionException, InterruptedException {
final List<Integer> sourceData = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
final Source<Integer, NotUsed> source =
Source.from(sourceData);
final Sink<Integer, CompletionStage<Integer>> sink =
Sink.<Integer, Integer>fold(0, (agg, next) -> agg + next);
final CompletionStage<Integer> sum = source.runWith(sink, system);
System.out.println(sum.toCompletableFuture().get());
}
}
运行 此代码按预期运行。
Akka Streams 解决了这段代码可以重复执行的问题吗?
在现实场景中,sourceData
不会是静态的,Akka Streams 对如何处理变化的数据有意见还是由开发人员决定?
在最简单的情况下,当源数据发生变化时,只需每 X 分钟重新执行一次流式处理流程(例如使用计划任务)。还是Akka流是长寿命的,源数据发生变化,流计算根据一些参数重新执行?
Akka Streams 文档定义了多个数据源,但我不明白应该如何利用 Akka Streams 来处理不断变化的源数据。
Akka Streams 可以而且经常这样做 运行 直到(不久之前)您的应用程序停止。例如,流消费(例如使用来自 Alpakka Kafka 的 Kafka 消费者源)Kafka 记录在应用程序中很早就开始,直到应用程序被终止才停止。
具体来说,流 运行 秒直到:
- 阶段表示完成(例如,在您的示例中,
Source.from
会在发出10
后表示完成) - 阶段失败(通常抛出异常)
一个对动态数据有用的示例源(不引入 Alpakka 或 Akka HTTP)是 Source.queue
,它具体化为一个队列,其中排队的元素可用于流。