有没有办法让 Akka Streams 的 groupedWithin 发出空组?

Is there any way to get Akka Streams' groupedWithin to emit empty groups?

在下面的代码中,tick 每三秒发出一个新对象。我正在尝试使用 groupedWithin (忽略空组)计算每秒发射的对象数。 Akka Streams 中是否有任何方法可以让以下代码在 tick 不发出任何对象的时间段内打印 0

Source.tick(Duration.ZERO, Duration.ofSeconds(3), new Object())
    .groupedWithin(Integer.MAX_VALUE, Duration.ofSeconds(1))
    .map(List::size)
    .runWith(Sink.foreach(e -> System.out.println(e)), materializer);

换句话说,我希望这段代码的输出是这样的序列:1 0 0 1 0 0 1 ...(每秒)而不是 1 1 1 ...(每三秒)。

编辑:这是迄今为止我想出的最好的解决方法(如果上游空闲,使用keepAlive发送一些特殊对象):

Source.tick(Duration.ZERO, Duration.ofSeconds(3), new Object())
    .keepAlive(Duration.ofSeconds(1), KeepAliveElement::new)
    .groupedWithin(Integer.MAX_VALUE, Duration.ofSeconds(1))
    .map(lst -> lst.stream().filter(e -> !(e instanceof KeepAliveElement)).collect(Collectors.toList()))
    .map(List::size)
    .runWith(Sink.foreach(e -> System.out.println(e)), materializer);

有更好的方法吗?

我以为这会是普通难度,我错了。我想做的一件事是确保通过流的流量计数项目不会保留对它看到的每个项目的引用:如果许多项目在聚合期间通过,你最终会在内存中得到一个不必要的大列表(即使只有一秒钟)以及向其中添加(许多)项目的性能损失。下面的解决方案虽然复杂,但只保留一个计数器。

注意:虽然我测试了快乐的场景,但我不能说这是经过实战验证的,所以请谨慎使用!

基于 Akka 的 GroupedWeightedWithin 和文档 here:

public class CountInPeriod<T> extends GraphStage<FlowShape<T, Integer>> {
    public Inlet<T> in = Inlet.<T>create("CountInPeriod.in");
    public Outlet<Integer> out = Outlet.<Integer>create("CountInPeriod.out");
    private FlowShape<T, Integer> shape = FlowShape.of(in, out);
    private Duration duration;

    public CountInPeriod(Duration duration) {
        this.duration = duration;
    }

    @Override
    public GraphStageLogic createLogic(Attributes inheritedAttributes) {
        return new TimerGraphStageLogic(shape) {
            private int counter = 0;
            private int bufferPushCounter = -1;

            {
                setHandler(in, new AbstractInHandler() {
                    @Override public void onPush() throws Exception, Exception {
                        grab(in);
                        counter++;
                        pull(in);
                    }
                });
                setHandler(out, new AbstractOutHandler() {
                    @Override public void onPull() throws Exception, Exception {
                        if (bufferPushCounter >= 0) {
                            push(out, bufferPushCounter);
                            bufferPushCounter = -1;
                        }
                    }
                });
            }

            @Override
            public void preStart() throws Exception, Exception {
                scheduleWithFixedDelay(CountInPeriod.class, duration, duration);
                pull(in);
            }

            @Override
            public void onTimer(Object timerKey) throws Exception, Exception {
                if (isAvailable(out)) emitCounter();
                else bufferPush();
            }

            private void emitCounter() {
                push(out, counter);
                counter = 0;
                bufferPushCounter = -1;
            }

            private void bufferPush() {
                bufferPushCounter = counter;
                counter = 0;
            }
        };
    }

    @Override
    public FlowShape<T, Integer> shape() {
        return shape;
    }
}

测试代码:

public class GroupTicked {
    final static ActorSystem as = ActorSystem.create("as");

    public static void main(String... args) throws Exception {
        CompletionStage<Done> done = Source.tick(Duration.ZERO, Duration.ofSeconds(3), new Object())
                .take(7) // to finish in finite time...
                .via(new CountInPeriod<>(Duration.ofSeconds(1)))
                .runWith(Sink.foreach(e -> System.out.println(System.currentTimeMillis() + " -> " + e)), as);
        done.thenAccept(x -> as.terminate());
    }
}