KSQL 在结构字段上加入带有条件的流
KSQL Join streams with condition on struct field
我定义了两个流,每个流都来自一个主题,在该主题上发布了 JSON 消息,有点像这样:
{"payload": {"some_id": "123"}}
他们对应的流是这样定义的:
CREATE STREAM mystream
(payload STRUCT <someid varchar>)
WITH (kafka_topic='mytopic', value_format='JSON')
当我尝试将两个流合并在一起时:
SELECT
s.payload->some_id,
o.payload->other_id
FROM mystream s
LEFT JOIN otherstream o ON s.payload->some_id = o.payload->other_id;
我收到以下错误:
Invalid comparison expression 'S.PAYLOAD->SOME_ID'
in join '(S.PAYLOAD->SOME_ID = O.PAYLOAD->OTHER_ID)'.
Joins must only contain a field comparison.
是否不能基于结构字段连接两个流?在执行 JOIN 之前,我是否首先需要发布一个流来展平每个源流?
正确,目前无法做到这一点。请随意在此处对跟踪它的问题进行投票:https://github.com/confluentinc/ksql/issues/4051
如您所说,解决方法是先在另一个流中将其展平,然后加入。
我定义了两个流,每个流都来自一个主题,在该主题上发布了 JSON 消息,有点像这样:
{"payload": {"some_id": "123"}}
他们对应的流是这样定义的:
CREATE STREAM mystream
(payload STRUCT <someid varchar>)
WITH (kafka_topic='mytopic', value_format='JSON')
当我尝试将两个流合并在一起时:
SELECT
s.payload->some_id,
o.payload->other_id
FROM mystream s
LEFT JOIN otherstream o ON s.payload->some_id = o.payload->other_id;
我收到以下错误:
Invalid comparison expression 'S.PAYLOAD->SOME_ID'
in join '(S.PAYLOAD->SOME_ID = O.PAYLOAD->OTHER_ID)'.
Joins must only contain a field comparison.
是否不能基于结构字段连接两个流?在执行 JOIN 之前,我是否首先需要发布一个流来展平每个源流?
正确,目前无法做到这一点。请随意在此处对跟踪它的问题进行投票:https://github.com/confluentinc/ksql/issues/4051
如您所说,解决方法是先在另一个流中将其展平,然后加入。