计算给定 window 的流的统计信息
Computing statistics over a stream for a given window
我有一个自动收报机 KStream,它经常跳动(想想秒),我想计算 24 小时内的各种统计数据 window。例如,24 小时变化,给定点与 24 小时前的价格差异。
我想要的输入的输出是:
t1 -> t1c1
t2 -> t1c2
t3 -> t1c3
其中 t1
是输入代码,t1c1
是输入代码以及针对它之前 window 的 24 小时计算的附加统计数据。
我考虑过一些没有奏效的方法:
* Window 我的代码流按大小 24 小时,跳数为 1 秒。
builder.stream(rawPriceTickerTopic, ...)
.groupByKey()
.windowedBy(
TimeWindows.of(TimeUnit.DAYS.toMillis(1))
.advanceBy(TimeUnit.SECONDS.toMillis(1))
.reduce((value1, value2) ->
value1.tickerWithStatsFrom(value2), ...)
.toStream();
然而,这会产生大量的输出点,因为每个输入代码都会为它所属的每个 window 生成一个输出代码。
- 保持某种时间序列存储最新,从存储中获取 24 小时前的值,并据此计算我的统计代码,但这似乎与流的观点背道而驰。
我在这里的最终解决方案是放弃 windowing 并简单地聚合我的代码,在聚合器中维护我自己的 24 小时 window。这仍然不是最好的方法,并且有一种挥之不去的感觉,我本可以使用 Kafka 的内置 windowing 概念来解决它。
如上所述,我对聚合器使用简单聚合:
streamBuilder.stream(tickerTopic, Consumed.with(...)
.groupByKey()
.aggregate(MyAggregator::new,
(key, value, aggregate) -> aggregate.addTicker(value),
Materialized.with(...)
.toStream()
结果是,对于原始代码流中的每条记录,我在输出流中得到一个聚合值。我的聚合器逻辑很简单:
- 向已排序的集合添加新代码。
- 丢弃任何比这个新的最新代码早 24 小时以上的代码。
- 计算新的 24 小时变化。
(此技术可用于给定 window 的任何类型的计算,例如移动平均线。)
聚合器的示例代码:
public class MyAggregator {
private BigDecimal change;
private TreeSet<Ticker> orderedTickers = new TreeSet<>(MyAggregator::tickerTimeComparator);
public MyAggregator () {
this.windowMilis = 86400000;
}
public MyAggregator addTicker(Ticker ticker) {
orderedTickers.add(ticker);
cleanOldTickers();
change = getLatest().getAsk().subtract(getEarliest().getAsk());
return this;
}
public BigDecimal getChange() {
return change;
}
public Ticker getEarliest() {
return orderedTickers.first();
}
public Ticker getLatest() {
return orderedTickers.last();
}
private void cleanOldTickers() {
Date endOfWindow = latestWindow();
Iterator<Ticker> iterator = orderedTickers.iterator();
while(iterator.hasNext()) {
Ticker next = iterator.next();
if (next.getTimestamp().before(endOfWindow)) {
iterator.remove();
}
// The collection is sorted by time so if we get here we can break.
break;
}
}
private Date latestWindow() {
return new Date(getLatest().getTimestamp().getTime() - windowMilis);
}
private static int tickerTimeComparator(Ticker t1, Ticker t2) {
return t1.getTimestamp().compareTo(t2.getTimestamp());
}
}
我有一个自动收报机 KStream,它经常跳动(想想秒),我想计算 24 小时内的各种统计数据 window。例如,24 小时变化,给定点与 24 小时前的价格差异。
我想要的输入的输出是:
t1 -> t1c1
t2 -> t1c2
t3 -> t1c3
其中 t1
是输入代码,t1c1
是输入代码以及针对它之前 window 的 24 小时计算的附加统计数据。
我考虑过一些没有奏效的方法: * Window 我的代码流按大小 24 小时,跳数为 1 秒。
builder.stream(rawPriceTickerTopic, ...)
.groupByKey()
.windowedBy(
TimeWindows.of(TimeUnit.DAYS.toMillis(1))
.advanceBy(TimeUnit.SECONDS.toMillis(1))
.reduce((value1, value2) ->
value1.tickerWithStatsFrom(value2), ...)
.toStream();
然而,这会产生大量的输出点,因为每个输入代码都会为它所属的每个 window 生成一个输出代码。
- 保持某种时间序列存储最新,从存储中获取 24 小时前的值,并据此计算我的统计代码,但这似乎与流的观点背道而驰。
我在这里的最终解决方案是放弃 windowing 并简单地聚合我的代码,在聚合器中维护我自己的 24 小时 window。这仍然不是最好的方法,并且有一种挥之不去的感觉,我本可以使用 Kafka 的内置 windowing 概念来解决它。
如上所述,我对聚合器使用简单聚合:
streamBuilder.stream(tickerTopic, Consumed.with(...)
.groupByKey()
.aggregate(MyAggregator::new,
(key, value, aggregate) -> aggregate.addTicker(value),
Materialized.with(...)
.toStream()
结果是,对于原始代码流中的每条记录,我在输出流中得到一个聚合值。我的聚合器逻辑很简单:
- 向已排序的集合添加新代码。
- 丢弃任何比这个新的最新代码早 24 小时以上的代码。
- 计算新的 24 小时变化。
(此技术可用于给定 window 的任何类型的计算,例如移动平均线。)
聚合器的示例代码:
public class MyAggregator {
private BigDecimal change;
private TreeSet<Ticker> orderedTickers = new TreeSet<>(MyAggregator::tickerTimeComparator);
public MyAggregator () {
this.windowMilis = 86400000;
}
public MyAggregator addTicker(Ticker ticker) {
orderedTickers.add(ticker);
cleanOldTickers();
change = getLatest().getAsk().subtract(getEarliest().getAsk());
return this;
}
public BigDecimal getChange() {
return change;
}
public Ticker getEarliest() {
return orderedTickers.first();
}
public Ticker getLatest() {
return orderedTickers.last();
}
private void cleanOldTickers() {
Date endOfWindow = latestWindow();
Iterator<Ticker> iterator = orderedTickers.iterator();
while(iterator.hasNext()) {
Ticker next = iterator.next();
if (next.getTimestamp().before(endOfWindow)) {
iterator.remove();
}
// The collection is sorted by time so if we get here we can break.
break;
}
}
private Date latestWindow() {
return new Date(getLatest().getTimestamp().getTime() - windowMilis);
}
private static int tickerTimeComparator(Ticker t1, Ticker t2) {
return t1.getTimestamp().compareTo(t2.getTimestamp());
}
}