使用 Akka Streams 计算平均值

Calcualte average using Akka Streams

使用本指南:https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/splitAfter.html 我正在尝试计算列表中多个属性的平均值。更具体地说,使用下面的代码,我试图计算在指定时间范围内发出值的演员源的平均值:

import akka.Done;
import akka.actor.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import lombok.Getter;
import lombok.Setter;

import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Optional;

public class TestTradeStrategy1 {

    class GroupedStats {
        @Getter
        @Setter
        private Double volume;

        @Getter
        @Setter
        private Double open;

        @Getter
        @Setter
        private long timestamp;
    }

    private final static ActorSystem system = ActorSystem.create(Behaviors.empty(), "as");
    private final static int BUFFER_SIZE = 100;

    private void runFlow() throws InterruptedException {

        final Source<Integer, ActorRef> source =
                Source.actorRef(
                        elem -> {
                            if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
                            else return Optional.empty();
                        },
                        elem -> Optional.empty(),
                        BUFFER_SIZE,
                        OverflowStrategy.dropHead());

        final ActorRef actorRef = source.throttle(20, Duration.ofSeconds(1))
                .map(elem -> new Pair<>(elem, Instant.now()))
                .sliding(2 , 1)
                .splitAfter(slidingElements -> {
                            if (slidingElements.size() == 2) {
                                final Pair<Integer, Instant> current = slidingElements.get(0);
                                final Pair<Integer, Instant> next = slidingElements.get(1);
                                final LocalDateTime currentBucket = LocalDateTime.ofInstant(current.second(), ZoneOffset.UTC).withNano(0);
                                final LocalDateTime nextBucket = LocalDateTime.ofInstant(next.second(), ZoneOffset.UTC).withNano(0);
                                return !currentBucket.equals(nextBucket);
                            } else {
                                return false;
                            }
                        })
                .map(slidingElements -> slidingElements.get(0).first())
                .reduce((acc, element) -> acc + element)
                .to(Sink.foreach(x -> {
                        System.out.println(x);
                }))
                .run(system);

        while(true){
            actorRef.tell(1, ActorRef.noSender());
            Thread.sleep(100);
        }

    }

    public static void main(String args[]) throws Exception {
        TestTradeStrategy1 t = new TestTradeStrategy1();
        t.runFlow();
    }

}

我最接近的只是使用 reduce 对值求和:

.reduce((acc, element) -> acc + element)

如何修改以便计算平均值?

您可以先映射将元素转换为新数据结构的流,而不是减少数字元素流,a) 包含数值和 b) 对这个值有贡献的元素数量,这意味着它最初将为 1,因此 10、11、12 的流将变为 (10, 1), (11, 1), (12, 1).

当您随后减少流时,您通过将数值和元素数计数器相加来组合元素。这样,您最终不仅会获得所有数值的总和,还会获得您相加的元素的数量(在示例中为 (33, 3))。获得平均值就很简单了。