How to use several fields in the key of a Kafka stream when joining a stream and a table?

I have a stream of coupon information that comes from a topic:

CREATE STREAM personal_coupons
(Coupon VARCHAR KEY,
CouponType VARCHAR,
MarketingArea VARCHAR,
CouponCode VARCHAR,
CouponName VARCHAR) WITH
(KAFKA_TOPIC = 'Coupons_Personal',
VALUE_FORMAT = 'JSON');

I also have a table with two fields, the coupon and a GUID:

CREATE TABLE coupon_and_guid(Coupon varchar PRIMARY KEY,
guid varchar)
WITH (KAFKA_TOPIC = 'Coupon_GUID',VALUE_FORMAT = 'JSON');

I try to join them:

CREATE STREAM coupon_with_guid WITH (KEY_FORMAT = 'JSON', VALUE_FORMAT = 'JSON') AS
SELECT
personal_coupons.Coupon,
COUPON_AND_GUID.guid,
CouponType,
MarketingArea,
AS_VALUE(personal_coupons.Coupon),
CouponCode,
CouponName
FROM personal_coupons
LEFT JOIN coupon_and_guid ON personal_coupons.Coupon = coupon_and_guid.coupon
PARTITION BY personal_coupons.Coupon,COUPON_AND_GUID.guid EMIT CHANGES;

I receive messages in this format:

key: {"PERSONAL_COUPONS_COUPON":"{\"Coupon\":\"1-2NAZTM69\"}","GUID":null}
value: {"COUPONTYPE":"MULTI","COUPONCONTACTRELATIONSHIPTYPE":"03","MARKETINGAREA":"VKUSOMANIA","KSQL_COL_0":"{\"Coupon\":\"1-2NAZTM69\"}","COUPONORIGIN":"Siebel","COUPONSTATUS":"01","LANGUAGE":"RU","COUPONCODE":"9001196300379670","COUPONNAME":"1000275479000214"}

But I want to get:

 key: {"Coupon":"1-2NAZTM69","GUID":null}
 value: {"COUPONTYPE":"MULTI","COUPONCONTACTRELATIONSHIPTYPE":"03","MARKETINGAREA":"VKUSOMANIA","Coupon":"1-2NAZTM69","COUPONORIGIN":"Siebel","COUPONSTATUS":"01","LANGUAGE":"RU","COUPONCODE":"9001196300379670","COUPONNAME":"1000275479000214"}

What am I doing wrong, and how can I fix it?

I found this solution: use the EXTRACTJSONFIELD ksql function to pull the Coupon field out of the key.

CREATE STREAM coupon_with_guid WITH (KEY_FORMAT = 'JSON', VALUE_FORMAT = 'JSON') AS
SELECT
EXTRACTJSONFIELD(personal_coupons.Coupon, '$.Coupon') as Coupon,
COUPON_AND_GUID.guid,
CouponType,
MarketingArea,
AS_VALUE(personal_coupons.Coupon),
CouponCode,
CouponName
FROM personal_coupons
LEFT JOIN coupon_and_guid ON personal_coupons.Coupon = coupon_and_guid.coupon
PARTITION BY EXTRACTJSONFIELD(personal_coupons.Coupon, '$.Coupon'),COUPON_AND_GUID.guid EMIT CHANGES;
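
Why this helps can be illustrated outside ksqlDB. The sketch below is plain Python, not ksqlDB code. It assumes, judging from the output shown above, that the key on Coupons_Personal is itself a JSON document such as {"Coupon":"1-2NAZTM69"} and that declaring it as VARCHAR makes the whole document one string. The variable names are hypothetical.

```python
import json

# Assumed raw key on the Coupons_Personal topic (inferred from the output in the question).
raw_key = '{"Coupon":"1-2NAZTM69"}'

# The VARCHAR key column holds the whole JSON document as a single string, so
# writing it out again inside a JSON key escapes the inner quotes.
nested_key = json.dumps(
    {"PERSONAL_COUPONS_COUPON": raw_key, "GUID": None},
    separators=(",", ":"),
)

# Rough analogue of EXTRACTJSONFIELD(Coupon, '$.Coupon'): parse the string
# and keep only the one field that is needed.
coupon = json.loads(raw_key)["Coupon"]
flat_key = json.dumps({"COUPON": coupon, "GUID": None}, separators=(",", ":"))

print(nested_key)
print(flat_key)
```

The first printed line reproduces the escaped key from the question; the second shows the flat shape once the field has been extracted. The key field is written as COUPON here because ksqlDB normally upper-cases unquoted identifiers, so the exact casing in the real output may differ from the "Coupon" shown in the desired result.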