如何在kafka流的键中使用几个字段,加入流和table?
How to use few fields in a key of kafka stream, joining stream and table?
我有来自主题的优惠券信息流:
CREATE STREAM personal_coupons
(Coupon VARCHAR KEY,
CouponType VARCHAR,
MarketingArea VARCHAR,
CouponCode VARCHAR,
CouponName VARCHAR) WITH
(KAFKA_TOPIC = 'Coupons_Personal',
VALUE_FORMAT = 'JSON');
我 table 有两个字段 - 优惠券和 GUID
CREATE TABLE coupon_and_guid(Coupon varchar PRIMARY KEY,
guid varchar)
WITH (KAFKA_TOPIC = 'Coupon_GUID',VALUE_FORMAT = 'JSON');
我尝试加入:
CREATE STREAM coupon_with_guid WITH (KEY_FORMAT = 'JSON', VALUE_FORMAT = 'JSON') AS
SELECT
personal_coupons.Coupon,
COUPON_AND_GUID.guid,
CouponType,
MarketingArea,
AS_VALUE(personal_coupons.Coupon),
CouponCode,
CouponName
FROM personal_coupons
LEFT JOIN coupon_and_guid ON personal_coupons.Coupon = coupon_and_guid.coupon
PARTITION BY personal_coupons.Coupon,COUPON_AND_GUID.guid EMIT CHANGES;
我收到了格式为:
的消息
key: {"PERSONAL_COUPONS_COUPON":"{\"Coupon\":\"1-2NAZTM69\"}","GUID":null}
value: {"COUPONTYPE":"MULTI","COUPONCONTACTRELATIONSHIPTYPE":"03","MARKETINGAREA":"VKUSOMANIA","KSQL_COL_0":"{\"Coupon\":\"1-2NAZTM69\"}","COUPONORIGIN":"Siebel","COUPONSTATUS":"01","LANGUAGE":"RU","COUPONCODE":"9001196300379670","COUPONNAME":"1000275479000214"}
但我想得到:
key: {"Coupon":"1-2NAZTM69","GUID":null}
value: {"COUPONTYPE":"MULTI","COUPONCONTACTRELATIONSHIPTYPE":"03","MARKETINGAREA":"VKUSOMANIA","Coupon":"1-2NAZTM69","COUPONORIGIN":"Siebel","COUPONSTATUS":"01","LANGUAGE":"RU","COUPONCODE":"9001196300379670","COUPONNAME":"1000275479000214"}
我做错了什么,我该如何解决?
我找到了这个解决方案 - 使用 EXTRACTJSONFIELD ksql 函数
CREATE STREAM coupon_with_guid WITH (KEY_FORMAT = 'JSON', VALUE_FORMAT = 'JSON') AS
SELECT
EXTRACTJSONFIELD(personal_coupons.Coupon, '$.Coupon') as Coupon,
COUPON_AND_GUID.guid,
CouponType,
MarketingArea,
AS_VALUE(personal_coupons.Coupon),
CouponCode,
CouponName
FROM personal_coupons
LEFT JOIN coupon_and_guid ON personal_coupons.Coupon = coupon_and_guid.coupon
PARTITION BY EXTRACTJSONFIELD(personal_coupons.Coupon, '$.Coupon'),COUPON_AND_GUID.guid EMIT CHANGES;
我有来自主题的优惠券信息流:
CREATE STREAM personal_coupons
(Coupon VARCHAR KEY,
CouponType VARCHAR,
MarketingArea VARCHAR,
CouponCode VARCHAR,
CouponName VARCHAR) WITH
(KAFKA_TOPIC = 'Coupons_Personal',
VALUE_FORMAT = 'JSON');
我 table 有两个字段 - 优惠券和 GUID
CREATE TABLE coupon_and_guid(Coupon varchar PRIMARY KEY,
guid varchar)
WITH (KAFKA_TOPIC = 'Coupon_GUID',VALUE_FORMAT = 'JSON');
我尝试加入:
CREATE STREAM coupon_with_guid WITH (KEY_FORMAT = 'JSON', VALUE_FORMAT = 'JSON') AS
SELECT
personal_coupons.Coupon,
COUPON_AND_GUID.guid,
CouponType,
MarketingArea,
AS_VALUE(personal_coupons.Coupon),
CouponCode,
CouponName
FROM personal_coupons
LEFT JOIN coupon_and_guid ON personal_coupons.Coupon = coupon_and_guid.coupon
PARTITION BY personal_coupons.Coupon,COUPON_AND_GUID.guid EMIT CHANGES;
我收到了格式为:
的消息key: {"PERSONAL_COUPONS_COUPON":"{\"Coupon\":\"1-2NAZTM69\"}","GUID":null}
value: {"COUPONTYPE":"MULTI","COUPONCONTACTRELATIONSHIPTYPE":"03","MARKETINGAREA":"VKUSOMANIA","KSQL_COL_0":"{\"Coupon\":\"1-2NAZTM69\"}","COUPONORIGIN":"Siebel","COUPONSTATUS":"01","LANGUAGE":"RU","COUPONCODE":"9001196300379670","COUPONNAME":"1000275479000214"}
但我想得到:
key: {"Coupon":"1-2NAZTM69","GUID":null}
value: {"COUPONTYPE":"MULTI","COUPONCONTACTRELATIONSHIPTYPE":"03","MARKETINGAREA":"VKUSOMANIA","Coupon":"1-2NAZTM69","COUPONORIGIN":"Siebel","COUPONSTATUS":"01","LANGUAGE":"RU","COUPONCODE":"9001196300379670","COUPONNAME":"1000275479000214"}
我做错了什么,我该如何解决?
我找到了这个解决方案 - 使用 EXTRACTJSONFIELD ksql 函数
CREATE STREAM coupon_with_guid WITH (KEY_FORMAT = 'JSON', VALUE_FORMAT = 'JSON') AS
SELECT
EXTRACTJSONFIELD(personal_coupons.Coupon, '$.Coupon') as Coupon,
COUPON_AND_GUID.guid,
CouponType,
MarketingArea,
AS_VALUE(personal_coupons.Coupon),
CouponCode,
CouponName
FROM personal_coupons
LEFT JOIN coupon_and_guid ON personal_coupons.Coupon = coupon_and_guid.coupon
PARTITION BY EXTRACTJSONFIELD(personal_coupons.Coupon, '$.Coupon'),COUPON_AND_GUID.guid EMIT CHANGES;