Apache Kafka - 实现 KTable
Apache Kafka - Implementing a KTable
我是 Kafka Streams 的新手 API,我正在尝试创建一个 KTable。我有一个输入主题:s-order-topic
,是一个json格式的消息,如下图
{ "current_ts": "2019-12-24 13:16:40.316952",
"primary_keys": ["ID"],
"before": null,
"tokens": {"txid":"3.17.2493",
"csn":"64913009"},
"op_type":"I",
"after": { "CODE":"AAAA41",
"STATUS":"COMPLETED",
"ID":24},
"op_ts":"2019-12-24 13:16:40.316941",
"table":"S_ORDER"}
我阅读了来自该主题的消息,我想创建一个 KTable,其中包含 key,字段 "after":"ID"
对于 value "after"
字段内的所有字段("ID"
除外)。
只有当我使用默认的聚合函数即计数时,我才成功创建了一个 KTable。但是我很难创建自己的聚合函数。下面我展示了我尝试创建 KTable 的代码部分。
KTable<Long, String> s_table = builder.stream("s-order-topic", Consumed.with(Serdes.Long(),Serdes.String()))
.mapValues(value -> {
String time;
JSONObject json = new JSONObject(value);
if (json.getString("op_type").equals("I")) {
time = "after";
}else {
time = "before";
}
JSONObject json2 = new JSONObject(json.getJSONObject(time).toString());
return json2.toString();
})
.groupBy((key, value) -> {
JSONObject json = new JSONObject(value);
return json.getLong("ID");
}, Grouped.with(Serdes.Long(), Serdes.String()))
.aggregate( ... );
如何实现这个 KTable?
我是否正确地处理了问题?
(mapValues -> 仅保留 "before"/"after" 字段。groupBy -> 将 ID 设为消息的键。聚合 -> ? )
我找到了解决方案。我实现了如下所示的 KTable:
KTable<String, String> s_table = builder.stream("s-order-topic", Consumed.with(Serdes.String(),Serdes.String()))
.mapValues(value -> {
String time;
JSONObject json = new JSONObject(value);
if (json.getString("op_type").equals("I")) {
time = "after";
}else {
time = "before";
}
JSONObject json2 = new JSONObject(json.getJSONObject(time).toString());
return json2.toString();
})
.groupBy((key, value) -> {
JSONObject json = new JSONObject(value);
return String.valueOf(json.getLong("ID"));
}, Grouped.with(Serdes.String(), Serdes.String()))
.reduce((prev,newval)->newval);
aggregate
函数不适合这种情况,我改用了reduce
函数。
控制台消费者的输出如下所示:
15 {"CODE":"AAAA17","STATUS":"PENDING","ID":15}
18 {"CODE":"AAAA50","STATUS":"SUBMITTED","ID":18}
4 {"CODE":"AAAA80","STATUS":"SUBMITTED","ID":4}
19 {"CODE":"AAAA83","STATUS":"SUBMITTED","ID":19}
18 {"CODE":"AAAA33","STATUS":"COMPLETED","ID":18}
5 {"CODE":"AAAA38","STATUS":"PENDING","ID":5}
10 {"CODE":"AAAA1","STATUS":"COMPLETED","ID":10}
3 {"CODE":"AAAA68","STATUS":"NOT COMPLETED","ID":3}
9 {"CODE":"AAAA89","STATUS":"PENDING","ID":9}
我是 Kafka Streams 的新手 API,我正在尝试创建一个 KTable。我有一个输入主题:s-order-topic
,是一个json格式的消息,如下图
{ "current_ts": "2019-12-24 13:16:40.316952",
"primary_keys": ["ID"],
"before": null,
"tokens": {"txid":"3.17.2493",
"csn":"64913009"},
"op_type":"I",
"after": { "CODE":"AAAA41",
"STATUS":"COMPLETED",
"ID":24},
"op_ts":"2019-12-24 13:16:40.316941",
"table":"S_ORDER"}
我阅读了来自该主题的消息,我想创建一个 KTable,其中包含 key,字段 "after":"ID"
对于 value "after"
字段内的所有字段("ID"
除外)。
只有当我使用默认的聚合函数即计数时,我才成功创建了一个 KTable。但是我很难创建自己的聚合函数。下面我展示了我尝试创建 KTable 的代码部分。
KTable<Long, String> s_table = builder.stream("s-order-topic", Consumed.with(Serdes.Long(),Serdes.String()))
.mapValues(value -> {
String time;
JSONObject json = new JSONObject(value);
if (json.getString("op_type").equals("I")) {
time = "after";
}else {
time = "before";
}
JSONObject json2 = new JSONObject(json.getJSONObject(time).toString());
return json2.toString();
})
.groupBy((key, value) -> {
JSONObject json = new JSONObject(value);
return json.getLong("ID");
}, Grouped.with(Serdes.Long(), Serdes.String()))
.aggregate( ... );
如何实现这个 KTable?
我是否正确地处理了问题?
(mapValues -> 仅保留 "before"/"after" 字段。groupBy -> 将 ID 设为消息的键。聚合 -> ? )
我找到了解决方案。我实现了如下所示的 KTable:
KTable<String, String> s_table = builder.stream("s-order-topic", Consumed.with(Serdes.String(),Serdes.String()))
.mapValues(value -> {
String time;
JSONObject json = new JSONObject(value);
if (json.getString("op_type").equals("I")) {
time = "after";
}else {
time = "before";
}
JSONObject json2 = new JSONObject(json.getJSONObject(time).toString());
return json2.toString();
})
.groupBy((key, value) -> {
JSONObject json = new JSONObject(value);
return String.valueOf(json.getLong("ID"));
}, Grouped.with(Serdes.String(), Serdes.String()))
.reduce((prev,newval)->newval);
aggregate
函数不适合这种情况,我改用了reduce
函数。
控制台消费者的输出如下所示:
15 {"CODE":"AAAA17","STATUS":"PENDING","ID":15}
18 {"CODE":"AAAA50","STATUS":"SUBMITTED","ID":18}
4 {"CODE":"AAAA80","STATUS":"SUBMITTED","ID":4}
19 {"CODE":"AAAA83","STATUS":"SUBMITTED","ID":19}
18 {"CODE":"AAAA33","STATUS":"COMPLETED","ID":18}
5 {"CODE":"AAAA38","STATUS":"PENDING","ID":5}
10 {"CODE":"AAAA1","STATUS":"COMPLETED","ID":10}
3 {"CODE":"AAAA68","STATUS":"NOT COMPLETED","ID":3}
9 {"CODE":"AAAA89","STATUS":"PENDING","ID":9}