基于 Akka actor 源的滑动 window 未按预期运行

Sliding window based on Akka actor source not behaving as expected

使用下面的代码,我尝试使用 actor 作为源并发送 Double 类型的消息以通过滑动 window.

进行处理

滑动 windows 定义为 sliding(2, 2) 来计算发送的每个 twp 值序列。

正在发送消息:

        actorRef.tell(10, ActorRef.noSender());
        actorRef.tell(20, ActorRef.noSender());
        actorRef.tell(30, ActorRef.noSender());
        actorRef.tell(40, ActorRef.noSender());
        actorRef.tell(20, ActorRef.noSender());

应按如下方式计算平均值:

10 + 20 / 2 = 15
30 + 40 / 2 = 35

但是在下面的代码中似乎没有调用计算。

这里我输出值:

    movingAverage.runForeach(n -> {
        if( n > 0){
            System.out.println(n);
        }
    }, system);

源代码:

import akka.Done;
import akka.actor.ActorRef;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.Optional;

public class FilterThreshold {

    public static void main(String[] args) {

        final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("Source");

        final int bufferSize = 1;
        final Source<Double, ActorRef> source =
                Source.actorRef(
                        elem -> {
                            // complete stream immediately if we send it Done
                            if (elem == Done.done()) {
                                return Optional.of(CompletionStrategy.immediately());
                            } else {
                                return Optional.empty();
                            }
                        },
                        // never fail the stream because of a message
                        elem -> Optional.empty(),
                        bufferSize,
                        OverflowStrategy.dropHead());
        ActorRef actorRef = source.to(Sink.foreach(System.out::println)).run(system);

        actorRef.tell(10, ActorRef.noSender());
        actorRef.tell(20, ActorRef.noSender());
        actorRef.tell(30, ActorRef.noSender());
        actorRef.tell(40, ActorRef.noSender());
        actorRef.tell(20, ActorRef.noSender());

        Source<Double, ActorRef> movingAverage = source
                .sliding(2, 2)
                .map(window -> (window.stream().mapToDouble(i -> i).sum()) / window.size());


        movingAverage.runForeach(n -> {
            if( n > 0){
                System.out.println(n);
            }
        }, system);

    }
}

我编辑了 https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/sliding.html

中的代码

如何应用定义为 movingAverage 的滑动 window 函数来计算通过 Akka actor actorRef 发送的值?

更新:

方法permaterialize将演员系统作为参数。

更新代码来自:

final Pair<ActorRef, Source<Double, ActorRef>> prematPair = source.preMaterialize();

至:

final Pair<ActorRef, Source<Double, ActorRef>> prematPair = source.preMaterialize(system);

导致编译时错误:

Required type:
Pair
<ActorRef,
Source<Double, ActorRef>>

Provided:
Pair
<ActorRef,
Source<Double, NotUsed>>

我应该使用其他方法吗?

已发布更新代码:

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.japi.Pair;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Source;

import java.util.Optional;

public class FilterThreshold {

    public static void main(String[] args) {

        final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("Source");

        final int bufferSize = 1;
        final Source<Double, ActorRef> source =
                Source.actorRef(
                        elem -> {
                            System.out.println("elem is "+elem);
                            // complete stream immediately if we send it Done
                            if (elem == Done.done()) {
                                return Optional.of(CompletionStrategy.immediately());
                            } else {
                                return Optional.empty();
                            }
                        },
                        // never fail the stream because of a message
                        elem -> Optional.empty(),
                        bufferSize,
                        OverflowStrategy.dropHead());

        // source is as before
        final Pair<ActorRef, Source<Double, ActorRef>> prematPair = source.preMaterialize(system);

        Flow<Double, Double, NotUsed> movingAverageFlow =
                Flow.of(Double.class)
                        .sliding(2, 2)
                        .map(window -> (window.stream().mapToDouble(i -> i).sum()) / window.size());

        final Source<Double, ActorRef> prematSource = prematPair.second();

        prematSource.via(movingAverageFlow).runForeach(n -> {
            System.out.println("n is "+n);
            if (n > 0) {
                System.out.println(n);
            }
        }, system);

        final ActorRef actorRef = prematPair.first();

        actorRef.tell(10, ActorRef.noSender());
        actorRef.tell(20, ActorRef.noSender());
        actorRef.tell(20, ActorRef.noSender());
        actorRef.tell(20, ActorRef.noSender());
        actorRef.tell(20, ActorRef.noSender());

    }
}

更新2:

使用代码:

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.japi.Pair;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Source;

import java.util.Optional;

public class FilterThreshold {

    public static void main(String[] args) {

        final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("Source");

        final int bufferSize = 1;
        final Source<Double, ActorRef> source =
                Source.actorRef(
                        elem -> {
                            System.out.println("elem is "+elem);
                            // complete stream immediately if we send it Done
                            if (elem == Done.done()) {
                                return Optional.of(CompletionStrategy.immediately());
                            } else {
                                return Optional.empty();
                            }
                        },
                        // never fail the stream because of a message
                        elem -> Optional.empty(),
                        bufferSize,
                        OverflowStrategy.dropHead());

        // source is as before
        final Pair<ActorRef, Source<Double, NotUsed>> prematPair = source.preMaterialize(system);
        final ActorRef actorRef = prematPair.first();
        final Source<Double, NotUsed> prematSource = prematPair.second();

        Flow<Double, Double, NotUsed> movingAverageFlow =
                Flow.of(Double.class)
                        .sliding(2, 2)
                        .map(window -> (window.stream().mapToDouble(i -> i).sum()) / window.size());

        prematSource.via(movingAverageFlow).runForeach(n -> {
            System.out.println("n is "+n);
            if (n > 0) {
                System.out.println(n);
            }
        }, system);

        actorRef.tell(10, ActorRef.noSender());
        actorRef.tell(20, ActorRef.noSender());
        actorRef.tell(20, ActorRef.noSender());
        actorRef.tell(20, ActorRef.noSender());
        actorRef.tell(20, ActorRef.noSender());

        prematSource.run(system);

    }
}

打印:

elem is 10
elem is 20
elem is 20
elem is 20
elem is 20

看来消息发送正确,但移动平均线没有具体化。

使用 prematSource.run(system); 不是实现值的正确方法吗?

简短的回答是,您的 source 是实现 Source<Double, ActorRef> 的各种方法,并且每个实现最终都是不同的来源。

在您的代码中,source.to(Sink.foreach(System.out::println)).run(system) 是一个流,具体化的 actorRef 仅连接到该流,并且

movingAverage.runForeach(n -> {
    if( n > 0){
        System.out.println(n);
    }
}, system);

是一个完全独立的流,具有不同的物化 ActorRef(最终被丢弃,因为 runForeach 物化为 CompletionStage<Done>

处理 Source.actorRef 时,在 运行 流之前预先实现源通常是个好主意:

import akka.NotUsed
import akka.japi.Pair
import akka.stream.javadsl.Flow

// source is as before
final Pair<ActorRef, Source<Double, NotUsed>> prematPair = source.preMaterialize(system);
final ActorRef actorRef = prematPair.first();
final Source<Double, NotUsed> prematSource = prematPair.second();

Flow<Double, Double, NotUsed> movingAverageFlow =
    Flow.of(Double.class)
        .sliding(2, 2)
        .map(window -> (window.stream().mapToDouble(i -> i).sum()) / window.size());

prematSource.via(movingAverageFlow).runForeach(n -> {
    if (n > 0) {
      System.out.println(n);
    }
}, system);

(抱歉,我的 Java 有点生疏了)