如何将所有消息从一个kafka主题(avro格式)复制并转换到另一个主题(json格式)
How to copy and transform all messages from one kafka topic (in avro format) to another topic (in json format)
我的团队正在使用 Kafka Confluent(企业版),我们对 kafka 还很陌生。
我们有 2 个主题 A 和 B。
主题 A 将接收所有 json 消息并由 Avro Schema 格式化(使用 URL 到模式注册表)。
由于某些原因,我们使用的开发工具不支持接收来自主题A的avro格式的消息。我们创建了Topic B,想用KsqlDB把topic A的所有消息复制到topic B,同时把所有的消息从avro格式转成普通的json格式,这样我们就可以开发一个组件,可以提取json 来自主题 B 的消息。
你能告诉我创建 ksql 流的代码吗?
注册入站 Avro 数据流
CREATE STREAM MY_AVRO_SOURCE
WITH (KAFKA_TOPIC='my_source_topic', FORMAT='AVRO');
告诉 ksqlDB 从主题开头读取所有消息
SET 'auto.offset.reset' = 'earliest';
将所有消息写入 JSON
中的新流
CREATE STREAM MY_JSON_TARGET
WITH (FORMAT='JSON')
AS SELECT * FROM MY_AVRO_SOURCE;
默认情况下,这将填充一个名为 MY_JSON_TARGET
的目标主题;如果要使用不同的目标主题名称,可以在 WITH
子句中指定 KAFKA_TOPIC
。
如果只有源中的值(不是键)是 Avro,则在 WHERE
子句中使用 VALUE_FORMAT
而不是 FORMAT
。
参考:https://docs.ksqldb.io/en/latest/reference/serialization/
我的团队正在使用 Kafka Confluent(企业版),我们对 kafka 还很陌生。
我们有 2 个主题 A 和 B。
主题 A 将接收所有 json 消息并由 Avro Schema 格式化(使用 URL 到模式注册表)。
由于某些原因,我们使用的开发工具不支持接收来自主题A的avro格式的消息。我们创建了Topic B,想用KsqlDB把topic A的所有消息复制到topic B,同时把所有的消息从avro格式转成普通的json格式,这样我们就可以开发一个组件,可以提取json 来自主题 B 的消息。
你能告诉我创建 ksql 流的代码吗?
注册入站 Avro 数据流
CREATE STREAM MY_AVRO_SOURCE
WITH (KAFKA_TOPIC='my_source_topic', FORMAT='AVRO');
告诉 ksqlDB 从主题开头读取所有消息
SET 'auto.offset.reset' = 'earliest';
将所有消息写入 JSON
中的新流CREATE STREAM MY_JSON_TARGET
WITH (FORMAT='JSON')
AS SELECT * FROM MY_AVRO_SOURCE;
默认情况下,这将填充一个名为 MY_JSON_TARGET
的目标主题;如果要使用不同的目标主题名称,可以在 WITH
子句中指定 KAFKA_TOPIC
。
如果只有源中的值(不是键)是 Avro,则在 WHERE
子句中使用 VALUE_FORMAT
而不是 FORMAT
。
参考:https://docs.ksqldb.io/en/latest/reference/serialization/