内部连接流的 KSQLDB 右端始终为空
KSQLDB right end side of inner joined streams is always null
运行 KSQLDB 版本:0.12.0
我在加入流时遇到问题。
当创建流作为窗口化内部连接查询时,右侧字段全部为空,即使是连接条件的一部分。
当 运行 独立进行相同的查询时,我可以正常获取字段。
设置如下:
运行 服务器 docker-compose:
services:
ksqldb-server:
image: confluentinc/ksqldb-server:0.12.0
hostname: ksqldb-server
container_name: ksqldb-server
ports:
- "8088:8088"
environment:
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: mybroker:9092
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
KSQL_KSQL_SCHEMA_REGISTRY_URL: http://myregistry:8081
KSQL_KSQL_STREAMS_AUTO_OFFSET_RESET: 'earliest'
流和连接定义
CREATE STREAM generic_orders (
...
clientrequestid_ string KEY,
...
) WITH (
KAFKA_TOPIC='thetopic',
VALUE_FORMAT='AVRO'
);
CREATE STREAM specific_order (
originatoruserid_ string,
request_clientid_ bigint,
bidquoteinfo_volume_ bigint
) WITH (
KAFKA_TOPIC='anothertopic',
VALUE_FORMAT='AVRO'
);
加入上面两个就不行了,我也试过把request_clientid_
设为key。
我怀疑它不会工作,因为 clientrequestid_
和 request_clientid_
的类型不同所以我创建了另一个流:
CREATE STREAM specific_order_typed AS
select originatoruserid_ ,
CAST(request_clientid_ AS string) as request_clientid_ KEY,
bidquoteinfo_volume_
FROM ice_massquote_order
EMIT CHANGES
;
这没有帮助...
这是加入的流:
CREATE STREAM enriched_orders AS
SELECT i.request_clientid_ as reqid, o.clientrequestid_ as orderreq FROM specific_order_typed i
INNER JOIN generic_order o WITHIN 1 HOURS ON o.clientrequestid_ = i.request_clientid_
EMIT CHANGES;
还尝试交换来自流和连接流...结果始终为空:
|623562762 |null
...
当运行查询直接虽然我得到了我期望的
SELECT i.request_clientid_ as reqid, o.clientrequestid_ as orderreq FROM specific_order_typed i
INNER JOIN generic_order o WITHIN 1 HOURS ON o.clientrequestid_ = i.request_clientid_
EMIT CHANGES;
|623562762 |623562762
有人知道发生了什么事吗?我已经用尽了我所有的想法
所以我通过降级到 0.10.2 解决了我的问题。
我注意到的第一个区别是,当尝试加入原始流 generic_orders
和 specific_order
时,我在 string
和 [=15] 之间的比较中得到了正确的错误=]
CREATE STREAM generic_orders (
...
clientrequestid_ string,
...
) WITH (
KAFKA_TOPIC='thetopic',
VALUE_FORMAT='AVRO'
);
CREATE STREAM specific_order (
originatoruserid_ string,
request_clientid_ bigint,
bidquoteinfo_volume_ bigint
) WITH (
KAFKA_TOPIC='anothertopic',
VALUE_FORMAT='AVRO'
);
然后我只需要将 bigint 转换为字符串(我之前也尝试过使用 0.12):
CREATE STREAM enriched_orders AS
SELECT * FROM specific_order_typed i
INNER JOIN generic_order o WITHIN 1 HOURS ON o.clientrequestid_ = CAST(i.request_clientid_ AS String)
EMIT CHANGES;
最后,在读取该流时,值不再为空。
运行 KSQLDB 版本:0.12.0
我在加入流时遇到问题。
当创建流作为窗口化内部连接查询时,右侧字段全部为空,即使是连接条件的一部分。 当 运行 独立进行相同的查询时,我可以正常获取字段。
设置如下:
运行 服务器 docker-compose:
services:
ksqldb-server:
image: confluentinc/ksqldb-server:0.12.0
hostname: ksqldb-server
container_name: ksqldb-server
ports:
- "8088:8088"
environment:
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: mybroker:9092
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
KSQL_KSQL_SCHEMA_REGISTRY_URL: http://myregistry:8081
KSQL_KSQL_STREAMS_AUTO_OFFSET_RESET: 'earliest'
流和连接定义
CREATE STREAM generic_orders (
...
clientrequestid_ string KEY,
...
) WITH (
KAFKA_TOPIC='thetopic',
VALUE_FORMAT='AVRO'
);
CREATE STREAM specific_order (
originatoruserid_ string,
request_clientid_ bigint,
bidquoteinfo_volume_ bigint
) WITH (
KAFKA_TOPIC='anothertopic',
VALUE_FORMAT='AVRO'
);
加入上面两个就不行了,我也试过把request_clientid_
设为key。
我怀疑它不会工作,因为 clientrequestid_
和 request_clientid_
的类型不同所以我创建了另一个流:
CREATE STREAM specific_order_typed AS
select originatoruserid_ ,
CAST(request_clientid_ AS string) as request_clientid_ KEY,
bidquoteinfo_volume_
FROM ice_massquote_order
EMIT CHANGES
;
这没有帮助...
这是加入的流:
CREATE STREAM enriched_orders AS
SELECT i.request_clientid_ as reqid, o.clientrequestid_ as orderreq FROM specific_order_typed i
INNER JOIN generic_order o WITHIN 1 HOURS ON o.clientrequestid_ = i.request_clientid_
EMIT CHANGES;
还尝试交换来自流和连接流...结果始终为空:
|623562762 |null
...
当运行查询直接虽然我得到了我期望的
SELECT i.request_clientid_ as reqid, o.clientrequestid_ as orderreq FROM specific_order_typed i
INNER JOIN generic_order o WITHIN 1 HOURS ON o.clientrequestid_ = i.request_clientid_
EMIT CHANGES;
|623562762 |623562762
有人知道发生了什么事吗?我已经用尽了我所有的想法
所以我通过降级到 0.10.2 解决了我的问题。
我注意到的第一个区别是,当尝试加入原始流 generic_orders
和 specific_order
时,我在 string
和 [=15] 之间的比较中得到了正确的错误=]
CREATE STREAM generic_orders (
...
clientrequestid_ string,
...
) WITH (
KAFKA_TOPIC='thetopic',
VALUE_FORMAT='AVRO'
);
CREATE STREAM specific_order (
originatoruserid_ string,
request_clientid_ bigint,
bidquoteinfo_volume_ bigint
) WITH (
KAFKA_TOPIC='anothertopic',
VALUE_FORMAT='AVRO'
);
然后我只需要将 bigint 转换为字符串(我之前也尝试过使用 0.12):
CREATE STREAM enriched_orders AS
SELECT * FROM specific_order_typed i
INNER JOIN generic_order o WITHIN 1 HOURS ON o.clientrequestid_ = CAST(i.request_clientid_ AS String)
EMIT CHANGES;
最后,在读取该流时,值不再为空。