How can I implement a windowed rolling average with KSQL KTable correctly?
I'm trying to implement a rolling average of trade volume in KSQL.
Kafka is currently ingesting data from a producer into the topic "KLINES". This data spans multiple markets in a consistent format. I then create a stream from that data like so:
CREATE STREAM KLINESTREAM (market VARCHAR, open DOUBLE, high DOUBLE, low DOUBLE, close DOUBLE, volume DOUBLE, start_time BIGINT, close_time BIGINT, event_time BIGINT) \
WITH (VALUE_FORMAT='JSON', KAFKA_TOPIC='KLINES', TIMESTAMP='event_time', KEY='market');
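For reference, a hypothetical message on the "KLINES" topic matching this schema might look like the following (the field values are made up for illustration):

```json
{
  "market": "EXAMPLEMARKET",
  "open": 41.0,
  "high": 45.5,
  "low": 40.2,
  "close": 44.0,
  "volume": 1200.0,
  "start_time": 1560647400000,
  "close_time": 1560647460000,
  "event_time": 1560647412620
}
```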
I then create a table which computes the average volume over the last 20 minutes for each market, like so:
CREATE TABLE AVERAGE_VOLUME_TABLE_BY_MARKET AS \
SELECT CEIL(SUM(volume) / COUNT(*)) AS volume_avg, market FROM KLINESTREAM \
WINDOW HOPPING (SIZE 20 MINUTES, ADVANCE BY 5 SECONDS) \
GROUP BY market;
SELECT * FROM AVERAGE_VOLUME_TABLE_BY_MARKET LIMIT 1;
which, for clarity, produces:
1560647412620 | EXAMPLEMARKET : Window{start=1560647410000 end=-} | 44.0 | EXAMPLEMARKET
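As a side note, newer ksqlDB versions expose the window bounds as the WINDOWSTART and WINDOWEND pseudo-columns, which makes the windowed key easier to inspect (a sketch, assuming a recent ksqlDB release with push-query syntax):

```sql
SELECT market, WINDOWSTART, WINDOWEND, volume_avg
FROM AVERAGE_VOLUME_TABLE_BY_MARKET
EMIT CHANGES
LIMIT 1;
```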
What I want is a KSQL Table that represents the current "kline" state of each market, while also including the rolling average volume computed in the windowed KTable above, so I can analyse current volume against the rolling average volume.
I've tried joining them like so:
SELECT K.market, K.open, K.high, K.low, K.close, K.volume, V.volume_avg \
FROM KLINESTREAM K \
LEFT JOIN AVERAGE_VOLUME_TABLE_BY_MARKET V \
ON K.market = V.market;
But obviously this results in an error, because the "AVERAGE_VOLUME_TABLE_BY_MARKET" key includes both the TimeWindow and the market:
A serializer (key:
org.apache.kafka.streams.kstream.TimeWindowedSerializer) is not compatible to
the actual key type (key type: java.lang.String). Change the default Serdes in
StreamConfig or provide correct Serdes via method parameters.
Am I approaching this problem correctly?
What I want to achieve is:
Windowed Aggregate KTable + Kline Stream ->
KTable representing current market state
including average volume from the KTable
Is this kind of current-market-state view possible in KSQL, or do I have to use Kafka Streams or another library to accomplish it?
There's a great aggregation example here: https://www.confluent.io/stream-processing-cookbook/ksql-recipes/aggregating-data
Applied to this example, how would I use the aggregate to compare against new data as it arrives in the KSQL Table?
Cheers, James
I believe what you're looking for may be LATEST_BY_OFFSET:
CREATE TABLE AVERAGE_VOLUME_TABLE_BY_MARKET AS
SELECT
market,
LATEST_BY_OFFSET(volume) AS volume,
CEIL(SUM(volume) / COUNT(*)) AS volume_avg
FROM KLINESTREAM
WINDOW HOPPING (SIZE 20 MINUTES, ADVANCE BY 5 SECONDS)
GROUP BY market;
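Once that table exists, you can compare the latest volume against the rolling average directly from it, e.g. (a sketch; the derived ratio column and the EMIT CHANGES push-query syntax assume a recent ksqlDB version):

```sql
SELECT market,
       volume,
       volume_avg,
       volume / volume_avg AS volume_ratio
FROM AVERAGE_VOLUME_TABLE_BY_MARKET
EMIT CHANGES;
```

Because LATEST_BY_OFFSET(volume) is part of the same windowed aggregate, each row already pairs the most recent volume with the 20-minute average for that market, so no stream-to-windowed-table join is needed.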