Is there any way to get Akka Streams' groupedWithin to emit empty groups?

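In the code below, tick emits a new object every three seconds. I am trying to use groupedWithin (which ignores empty groups) to count the number of objects emitted each second. Is there any way in Akka Streams to make the following code print 0 for the periods in which tick does not emit any object?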

Source.tick(Duration.ZERO, Duration.ofSeconds(3), new Object())
    .groupedWithin(Integer.MAX_VALUE, Duration.ofSeconds(1))
    .runWith(Sink.foreach(e -> System.out.println(e)), materializer);

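In other words, I want the output of this code to be the sequence 1 0 0 1 0 0 1 ... (one value every second) rather than 1 1 1 ... (one value every three seconds).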
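The desired sequence can be pinned down without Akka: bucket the emission times into one-second windows and report a count for every window, including the empty ones. The sketch below is plain Java and only illustrates the expected output, not a streaming solution. `WindowCounts` and `countPerWindow` are hypothetical names, and the timestamps are made-up values standing in for a tick every three seconds.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowCounts {
    // Counts how many timestamps fall into each consecutive window of
    // windowMillis, starting at time 0. Windows without timestamps yield 0.
    static List<Integer> countPerWindow(long[] emitTimesMillis, long windowMillis, int windows) {
        int[] counts = new int[windows];
        for (long t : emitTimesMillis) {
            if (t < 0) continue;                    // ignore times before the start
            long idx = t / windowMillis;
            if (idx < windows) counts[(int) idx]++; // ignore times after the last window
        }
        List<Integer> result = new ArrayList<>();
        for (int c : counts) result.add(c);
        return result;
    }

    public static void main(String[] args) {
        long[] ticks = {0, 3000, 6000};             // assumed: one tick every three seconds
        System.out.println(countPerWindow(ticks, 1000, 7)); // prints [1, 0, 0, 1, 0, 0, 1]
    }
}
```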


Source.tick(Duration.ZERO, Duration.ofSeconds(3), new Object())
    .keepAlive(Duration.ofSeconds(1), KeepAliveElement::new)
    .groupedWithin(Integer.MAX_VALUE, Duration.ofSeconds(1))
    .map(lst -> lst.stream().filter(e -> !(e instanceof KeepAliveElement)).collect(Collectors.toList()))
    .runWith(Sink.foreach(e -> System.out.println(e)), materializer);
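The snippet above does not show `KeepAliveElement`. Presumably it is a marker class with a no-argument constructor that `keepAlive` injects when the source has been idle for one second, so that a group is not empty when it reaches `groupedWithin`. The plain-Java sketch below isolates the `map` step under that assumption: it ignores the markers in one group and returns the number of real elements. Both `KeepAliveElement` and `countReal` are hypothetical here.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical marker type; the snippet above does not define it.
final class KeepAliveElement {}

final class KeepAliveFilter {
    // Number of elements in a group that are not keep-alive markers.
    static long countReal(List<Object> group) {
        return group.stream().filter(e -> !(e instanceof KeepAliveElement)).count();
    }

    public static void main(String[] args) {
        List<Object> withTick = Arrays.<Object>asList(new Object(), new KeepAliveElement());
        List<Object> idle = Arrays.<Object>asList(new KeepAliveElement());
        System.out.println(countReal(withTick)); // prints 1
        System.out.println(countReal(idle));     // prints 0
    }
}
```

Mapping each group to such a count, rather than to the filtered list, would make the stream print numbers instead of list contents.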




Based on Akka's GroupedWeightedWithin and the Akka documentation, a custom stage can count the elements received in each period:

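import akka.stream.Attributes;
import akka.stream.FlowShape;
import akka.stream.Inlet;
import akka.stream.Outlet;
import akka.stream.stage.AbstractInHandler;
import akka.stream.stage.AbstractOutHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.TimerGraphStageLogic;
import java.time.Duration;

// Emits, once per period, the number of elements received in that period (0 if none).
public class CountInPeriod<T> extends GraphStage<FlowShape<T, Integer>> {
    public Inlet<T> in = Inlet.<T>create("CountInPeriod.in");
    public Outlet<Integer> out = Outlet.<Integer>create("CountInPeriod.out");
    private FlowShape<T, Integer> shape = FlowShape.of(in, out);
    private Duration duration;

    public CountInPeriod(Duration duration) {
        this.duration = duration;
    }

    @Override
    public GraphStageLogic createLogic(Attributes inheritedAttributes) {
        return new TimerGraphStageLogic(shape) {
            private int counter = 0;
            private int bufferPushCounter = -1; // -1: no count is waiting for demand

            {
                setHandler(in, new AbstractInHandler() {
                    @Override public void onPush() throws Exception {
                        // Body is missing in the source; assumed: count the element, request the next.
                        grab(in);
                        counter++;
                        pull(in);
                    }
                });
                setHandler(out, new AbstractOutHandler() {
                    @Override public void onPull() throws Exception {
                        if (bufferPushCounter >= 0) {
                            push(out, bufferPushCounter);
                            bufferPushCounter = -1;
                        }
                    }
                });
            }

            @Override
            public void preStart() throws Exception {
                scheduleWithFixedDelay(CountInPeriod.class, duration, duration);
                pull(in); // assumed (not in the source): initial demand so elements start flowing
            }

            @Override
            public void onTimer(Object timerKey) throws Exception {
                if (isAvailable(out)) emitCounter();
                else bufferPush();
            }

            private void emitCounter() {
                push(out, counter);
                counter = 0;
                bufferPushCounter = -1;
            }

            private void bufferPush() {
                bufferPushCounter = counter;
                counter = 0;
            }
        };
    }

    @Override
    public FlowShape<T, Integer> shape() {
        return shape;
    }
}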

import akka.Done;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.time.Duration;
import java.util.concurrent.CompletionStage;

public class GroupTicked {
    final static ActorSystem as = ActorSystem.create("as");

    public static void main(String... args) throws Exception {
        CompletionStage<Done> done = Source.tick(Duration.ZERO, Duration.ofSeconds(3), new Object())
                .take(7) // to finish in finite time...
                .via(new CountInPeriod<>(Duration.ofSeconds(1)))
                .runWith(Sink.foreach(e -> System.out.println(System.currentTimeMillis() + " -> " + e)), as);
        done.thenAccept(x -> as.terminate());
    }
}
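
The interplay of `onTimer`, `onPull`, and `bufferPushCounter` in `CountInPeriod` can be traced without Akka. The following plain-Java model is a simplification under stated assumptions: `CounterModel` is a hypothetical name, the `demand` flag stands in for `isAvailable(out)`, a returned value stands in for `push(out, ...)`, and `null` means nothing was pushed.

```java
final class CounterModel {
    private int counter = 0;
    private int buffered = -1;       // -1: no count is waiting for demand
    private boolean demand = false;  // stands in for isAvailable(out)

    // An element arrived from upstream.
    void onElement() { counter++; }

    // Timer fired: emit the count at once if downstream has demand, else buffer it.
    Integer onTimer() {
        int value = counter;
        counter = 0;
        if (demand) {
            demand = false;
            buffered = -1;
            return value;
        }
        buffered = value;
        return null;
    }

    // Downstream pulled: deliver a buffered count if there is one, else remember the demand.
    Integer onPull() {
        if (buffered >= 0) {
            int value = buffered;
            buffered = -1;
            return value;
        }
        demand = true;
        return null;
    }

    public static void main(String[] args) {
        CounterModel m = new CounterModel();
        m.onPull();                       // downstream signals demand
        m.onElement();                    // one element in the first period
        System.out.println(m.onTimer());  // prints 1 (demand was pending)
        System.out.println(m.onTimer());  // prints null (no demand, 0 is buffered)
        System.out.println(m.onPull());   // prints 0 (buffered count is delivered)
    }
}
```

As in the stage itself, a second timer firing before the next pull overwrites the buffered count, so this design relies on downstream pulling at least once per period.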