我如何使用 kafka 流 window 为烛台图表生成生成一条记录

How could i use the kafka stream window to gerate one record for the Candlestick chart generation

我必须使用Kafka Stream从交易结果主题中获取交易信息以绘制每个特定持续时间的烛台图表,它有交易ID,金额,价格,交易时间,关键是交易ID,这是每条记录都完全不同, 我想做的是根据交易结果进行计算以获得 每个持续时间的最高价、最低价、开盘价、收盘价、tx close_time 并使用它来创建烛台图表。 我已经使用 kafka 流 window 来做到这一点:

final KStreamBuilder builder = new KStreamBuilder();
KStream<String, JsonNode> transactionKStream = builder.stream(keySerde, valueSerde, srcTopicName);
KTable<Windowed<String>, InfoRecord> kTableRecords= groupedStream.aggregate(
 InfoRecord::new, /* initializer */
 (k, v, aggregate) -> aggregate.add(k,v), /* adder */
 TimeWindows.of(TimeUnit.SECONDS.toMillis(5)).until(TimeUnit.SECONDS.toMillis(5)),
 infoRecordSerde);

和source topic一样,每条记录都以txId为key,txId永远不会重复,所以聚合时,结果K-table会和K-有相同的记录流,但我可以使用 window 获取所有记录 具体时长。

我认为 kTableRecords 应该包含特定持续时间内的所有记录,即 5 秒, 因此,我可以在 5 秒内遍历所有记录,以获得最高价、最低价、开盘价(window 中的第一个记录价格)、收盘价([=67= 中的最后一个记录价格) ]), close_time (window 中最后一条记录的发送时间), 这样我就只会得到这个 window 的一条记录并将这个结果输出到一个 sink kafka 主题,但我不知道如何在这些 window 持续时间内做到这一点。

我想代码应该是这样的:

kTableRecords.foreach((键, 值) -> {

// TODO:在此处添加逻辑

})

IDE 显示此 foreach 已被弃用,

但我不知道如何区分这个 window 或下一个 window 中的记录 或者我需要一个 window 记录保留时间使用,直到在上面的示例代码中。

我在这方面挣扎了好几天,我仍然不知道完成我的工作的正确方法,感谢任何人的帮助让我走上正确的道路,谢谢

kafka版本为:0.11.0.0

更新:

根据 Michal 在他 post 中的提示,我更改了我的代码,并执行了 聚合器实例中的最高价、最低价、开盘价、收盘价计算, 但结果让我意识到特定的每个不同的键 window,逻辑为键创建一个新实例,并且只对当前键执行添加操作,不与其他键的值交互, 我真正想要的是计算 每条记录的高价、低价、开盘价、收盘价,其中包含不同的键 window 持续时间,所以我需要的不是为每个键创建一个新实例, 它应该只为每个特定的 window 创建一个聚合实例 并对持续时间中的所有记录值进行计算,每个持续时间 window 得到一组(最高价、最低价、开盘价、收盘价)。 我已阅读主题: 如何在连续增加的时间 windows 上计算 windowed 聚合? 所以,我怀疑,我不确定,如果这对我来说是正确的解决方案,谢谢。

顺便说一句,K线是指烛台图。


更新二:

根据您的更新,我创建了如下所示的代码:

KStream<String, JsonNode> transactionKStream = builder.stream(keySerde, valueSerde, srcTopicName);

KGroupedStream<String, JsonNode> groupedStream = transactionKStream.groupBy((k,v)-> "constkey", keySerde, valueSerde);

KTable<Windowed<String>, MarketInfoRecord> kTable =
        groupedStream.aggregate(
        MarketInfoRecord::new, /* initializer */
        (k, v, aggregate) -> aggregate.add(k,v), /* adder */
        TimeWindows.of(TimeUnit.SECONDS.toMillis(100)).until(TimeUnit.SECONDS.toMillis(100)),
        infoRecordSerde, "test-state-store");

KStream<String, MarketInfoRecord> newS = kTable.toStream().map(
        (k,v) -> {
            System.out.println("key: "+k+",  value:"+v);
            return KeyValue.pair(k.window().start() + "_" + k.window().end(), v);

        }

);

newS.to(Serdes.String(),infoRecordSerde, "OUTPUT_NEW_RESULT");

如果我在进行分组时使用静态字符串作为键,那么在进行 windowed 聚合时肯定会, 只为 window 创建了一个聚合器实例,我们可以得到 (high, low, open, close) 对于 window 中的所有记录,但是 作为所有记录的相同键,这个 window 将被更新多次,并为一个 window 产生多条记录,如:

key: [constkey@1521304400000/1521304500000],  value:MarketInfoRecord{high=11, low=11, openTime=1521304432205, closeTime=1521304432205, open=11, close=11, count=1}
key: [constkey@1521304600000/1521304700000],  value:MarketInfoRecord{high=44, low=44, openTime=1521304622655, closeTime=1521304622655, open=44, close=44, count=1}
key: [constkey@1521304600000/1521304700000],  value:MarketInfoRecord{high=44, low=33, openTime=1521304604182, closeTime=1521304622655, open=33, close=44, count=2}
key: [constkey@1521304400000/1521304500000],  value:MarketInfoRecord{high=22, low=22, openTime=1521304440887, closeTime=1521304440887, open=22, close=22, count=1}
key: [constkey@1521304600000/1521304700000],  value:MarketInfoRecord{high=55, low=55, openTime=1521304629943, closeTime=1521304629943, open=55, close=55, count=1}
key: [constkey@1521304800000/1521304900000],  value:MarketInfoRecord{high=77, low=77, openTime=1521304827181, closeTime=1521304827181, open=77, close=77, count=1}
key: [constkey@1521304800000/1521304900000],  value:MarketInfoRecord{high=77, low=66, openTime=1521304817079, closeTime=1521304827181, open=66, close=77, count=2}
key: [constkey@1521304800000/1521304900000],  value:MarketInfoRecord{high=88, low=66, openTime=1521304817079, closeTime=1521304839047, open=66, close=88, count=3}
key: [constkey@1521304800000/1521304900000],  value:MarketInfoRecord{high=99, low=66, openTime=1521304817079, closeTime=1521304848350, open=66, close=99, count=4}
key: [constkey@1521304800000/1521304900000],  value:MarketInfoRecord{high=100.0, low=66, openTime=1521304817079, closeTime=1521304862006, open=66, close=100.0, count=5}

所以我们需要按照“38945277/7897191”中描述的 posted link 进行重复数据删除,对吗?

所以,我想知道我是否可以做类似的事情:

KGroupedStream<String, JsonNode> groupedStream = transactionKStream.groupByKey();
// as key was unique txId, so this group is just for doing next window operation, the record number is not changed.

KTable<Windowed<String>, MarketInfoRecord> kTable =
   groupedStream.SOME_METHOD(
// just use some method to deliver the records in different windows,
// no sure if this is possible?
TimeWindows.of(TimeUnit.SECONDS.toMillis(100)).until(TimeUnit.SECONDS.toMillis(100))
// use until here to let the record purged if out of the window, 
// please correct me if i am wrong?

我们可以将基于时间的输入记录序列转换为多个 windowed 组, 每个组都有 window(或使用 window 开始时间、结束时间组合为字符串键), 所以,对于每个组,键是不同的,但是它有几个具有不同值的记录, 然后我们做聚合(这里不需要使用windowed聚合),值已经计算出来,并且 从每个 key:value 对,即 ,我们可以得到一个结果记录, 而下一个 window 具有不同的 windowBased Key 名称,所以这样,执行下游应该有多个线程( 随着密钥的变化)

我建议您不在 foreach 中而是直接在您的聚合器中,即在加法器中进行您提到的所有计算:

(k, v, aggregate) -> aggregate.add(k,v), /* adder */

add 方法可以完成你提到的所有事情(我建议你先将 JsonNode 映射到一个 Java 对象,我们称之为 Transaction),考虑这个 pseudo-code:

private int low = Integer.MAX; // whatever type you use to represent prices
private int high = Integer.MIN;
private long openTime = Long.MAX; // whatever type you use to represent time
private long closeTime = Long.MIN;
...
public InfoRecord add(String key, Transaction tx) {
  if(tx.getPrice() > this.high) this.high = tx.getPrice();
  if(tx.getPrice() < this.low) this.low = tx.getPrice();
  if(tx.getTime() < this.openTime) {
    this.openTime = tx.getTime();
    this.open = tx.getPrice();
  }
  if(tx.getTime() > this.closeTime) {
    this.closeTime = tx.getTime();
    this.close = tx.getPrice();
  }
  return this;
}

请记住,您实际上可能会在每个 window 的输出中获得不止一条记录,因为 windows 可以更新多次(它们永远不会是最终的),如中所述此处有更多详细信息:

我不知道 K-line 是什么,但如果你想要多个 windows 持续时间增加,模式概述 here

更新: 要聚合 window 中的所有记录,只需在进行聚合之前将键更改为某个静态值。因此,要创建分组流,您可以使用 groupBy(KeyValueMapper),例如:

KGroupedStream<String, JsonNode> groupedStream = transactionKStream.groupBy( (k, v) -> ""); // give all records the same key (empty string)

请注意,这将导致重新分区(因为分区是由键决定的,而我们正在更改键)并且下游执行将变为单线程(因为现在只有一个分区)。