使用 Akka Streams 计算平均值
Calcualte average using Akka Streams
使用本指南:https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/splitAfter.html 我正在尝试计算列表中多个属性的平均值。更具体地说,使用下面的代码,我试图计算在指定时间范围内发出值的演员源的平均值:
import akka.Done;
import akka.actor.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import lombok.Getter;
import lombok.Setter;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Optional;
public class TestTradeStrategy1 {
class GroupedStats {
@Getter
@Setter
private Double volume;
@Getter
@Setter
private Double open;
@Getter
@Setter
private long timestamp;
}
private final static ActorSystem system = ActorSystem.create(Behaviors.empty(), "as");
private final static int BUFFER_SIZE = 100;
private void runFlow() throws InterruptedException {
final Source<Integer, ActorRef> source =
Source.actorRef(
elem -> {
if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
else return Optional.empty();
},
elem -> Optional.empty(),
BUFFER_SIZE,
OverflowStrategy.dropHead());
final ActorRef actorRef = source.throttle(20, Duration.ofSeconds(1))
.map(elem -> new Pair<>(elem, Instant.now()))
.sliding(2 , 1)
.splitAfter(slidingElements -> {
if (slidingElements.size() == 2) {
final Pair<Integer, Instant> current = slidingElements.get(0);
final Pair<Integer, Instant> next = slidingElements.get(1);
final LocalDateTime currentBucket = LocalDateTime.ofInstant(current.second(), ZoneOffset.UTC).withNano(0);
final LocalDateTime nextBucket = LocalDateTime.ofInstant(next.second(), ZoneOffset.UTC).withNano(0);
return !currentBucket.equals(nextBucket);
} else {
return false;
}
})
.map(slidingElements -> slidingElements.get(0).first())
.reduce((acc, element) -> acc + element)
.to(Sink.foreach(x -> {
System.out.println(x);
}))
.run(system);
while(true){
actorRef.tell(1, ActorRef.noSender());
Thread.sleep(100);
}
}
public static void main(String args[]) throws Exception {
TestTradeStrategy1 t = new TestTradeStrategy1();
t.runFlow();
}
}
我最接近的只是使用 reduce
对值求和:
.reduce((acc, element) -> acc + element)
如何修改以便计算平均值?
您可以先映射将元素转换为新数据结构的流,而不是减少数字元素流,a) 包含数值和 b) 对这个值有贡献的元素数量,这意味着它最初将为 1,因此 10、11、12 的流将变为 (10, 1), (11, 1), (12, 1).
当您随后减少流时,您通过将数值和元素数计数器相加来组合元素。这样,您最终不仅会获得所有数值的总和,还会获得您相加的元素的数量(在示例中为 (33, 3))。获得平均值就很简单了。
使用本指南:https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/splitAfter.html 我正在尝试计算列表中多个属性的平均值。更具体地说,使用下面的代码,我试图计算在指定时间范围内发出值的演员源的平均值:
import akka.Done;
import akka.actor.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import lombok.Getter;
import lombok.Setter;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Optional;
public class TestTradeStrategy1 {
class GroupedStats {
@Getter
@Setter
private Double volume;
@Getter
@Setter
private Double open;
@Getter
@Setter
private long timestamp;
}
private final static ActorSystem system = ActorSystem.create(Behaviors.empty(), "as");
private final static int BUFFER_SIZE = 100;
private void runFlow() throws InterruptedException {
final Source<Integer, ActorRef> source =
Source.actorRef(
elem -> {
if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
else return Optional.empty();
},
elem -> Optional.empty(),
BUFFER_SIZE,
OverflowStrategy.dropHead());
final ActorRef actorRef = source.throttle(20, Duration.ofSeconds(1))
.map(elem -> new Pair<>(elem, Instant.now()))
.sliding(2 , 1)
.splitAfter(slidingElements -> {
if (slidingElements.size() == 2) {
final Pair<Integer, Instant> current = slidingElements.get(0);
final Pair<Integer, Instant> next = slidingElements.get(1);
final LocalDateTime currentBucket = LocalDateTime.ofInstant(current.second(), ZoneOffset.UTC).withNano(0);
final LocalDateTime nextBucket = LocalDateTime.ofInstant(next.second(), ZoneOffset.UTC).withNano(0);
return !currentBucket.equals(nextBucket);
} else {
return false;
}
})
.map(slidingElements -> slidingElements.get(0).first())
.reduce((acc, element) -> acc + element)
.to(Sink.foreach(x -> {
System.out.println(x);
}))
.run(system);
while(true){
actorRef.tell(1, ActorRef.noSender());
Thread.sleep(100);
}
}
public static void main(String args[]) throws Exception {
TestTradeStrategy1 t = new TestTradeStrategy1();
t.runFlow();
}
}
我最接近的只是使用 reduce
对值求和:
.reduce((acc, element) -> acc + element)
如何修改以便计算平均值?
您可以先映射将元素转换为新数据结构的流,而不是减少数字元素流,a) 包含数值和 b) 对这个值有贡献的元素数量,这意味着它最初将为 1,因此 10、11、12 的流将变为 (10, 1), (11, 1), (12, 1).
当您随后减少流时,您通过将数值和元素数计数器相加来组合元素。这样,您最终不仅会获得所有数值的总和,还会获得您相加的元素的数量(在示例中为 (33, 3))。获得平均值就很简单了。