计算给定 window 的流的统计信息

Computing statistics over a stream for a given window

我有一个自动收报机 KStream,它经常跳动(想想秒),我想计算 24 小时内的各种统计数据 window。例如,24 小时变化,给定点与 24 小时前的价格差异。

我想要的输入的输出是:

t1 -> t1c1
t2 -> t1c2
t3 -> t1c3

其中 t1 是输入代码,t1c1 是输入代码以及针对它之前 window 的 24 小时计算的附加统计数据。

我考虑过一些没有奏效的方法: * Window 我的代码流按大小 24 小时,跳数为 1 秒。

builder.stream(rawPriceTickerTopic, ...)
            .groupByKey()
            .windowedBy(
                    TimeWindows.of(TimeUnit.DAYS.toMillis(1))
                    .advanceBy(TimeUnit.SECONDS.toMillis(1))
            .reduce((value1, value2) ->
                    value1.tickerWithStatsFrom(value2), ...)                
            .toStream();

然而,这会产生大量的输出点,因为每个输入代码都会为它所属的每个 window 生成一个输出代码。

我在这里的最终解决方案是放弃 windowing 并简单地聚合我的代码,在聚合器中维护我自己的 24 小时 window。这仍然不是最好的方法,并且有一种挥之不去的感觉,我本可以使用 Kafka 的内置 windowing 概念来解决它。

如上所述,我对聚合器使用简单聚合:

streamBuilder.stream(tickerTopic, Consumed.with(...)
                .groupByKey()
                .aggregate(MyAggregator::new,
                        (key, value, aggregate) -> aggregate.addTicker(value),
                        Materialized.with(...)
                .toStream()

结果是,对于原始代码流中的每条记录,我在输出流中得到一个聚合值。我的聚合器逻辑很简单:

  • 向已排序的集合添加新代码。
  • 丢弃任何比这个新的最新代码早 24 小时以上的代码。
  • 计算新的 24 小时变化。

(此技术可用于给定 window 的任何类型的计算,例如移动平均线。)

聚合器的示例代码:

public class MyAggregator {

    private BigDecimal change;

    private TreeSet<Ticker> orderedTickers = new TreeSet<>(MyAggregator::tickerTimeComparator);

    public MyAggregator () {
        this.windowMilis = 86400000;
    }

    public MyAggregator addTicker(Ticker ticker) {
        orderedTickers.add(ticker);
        cleanOldTickers();
        change = getLatest().getAsk().subtract(getEarliest().getAsk());
        return this;
    }

    public BigDecimal getChange() {
        return change;
    }

    public Ticker getEarliest() {
        return orderedTickers.first();
    }

    public Ticker getLatest() {
        return orderedTickers.last();
    }

    private void cleanOldTickers() {
        Date endOfWindow = latestWindow();

        Iterator<Ticker> iterator = orderedTickers.iterator();
        while(iterator.hasNext()) {
            Ticker next = iterator.next();
            if (next.getTimestamp().before(endOfWindow)) {
                iterator.remove();
            }
            // The collection is sorted by time so if we get here we can break.
            break;
        }
    }

    private Date latestWindow() {
        return new Date(getLatest().getTimestamp().getTime() - windowMilis);
    }

    private static int tickerTimeComparator(Ticker t1, Ticker t2) {
        return t1.getTimestamp().compareTo(t2.getTimestamp());
    }

}