如何将所有消息从一个kafka主题(avro格式)复制并转换到另一个主题(json格式)

How to copy and transform all messages from one kafka topic (in avro format) to another topic (in json format)

我的团队正在使用 Kafka Confluent(企业版),我们对 kafka 还很陌生。
我们有 2 个主题 A 和 B。

主题 A 将接收所有 json 消息并由 Avro Schema 格式化(使用 URL 到模式注册表)。

由于某些原因,我们使用的开发工具不支持接收来自主题A的avro格式的消息。我们创建了Topic B,想用KsqlDB把topic A的所有消息复制到topic B,同时把所有的消息从avro格式转成普通的json格式,这样我们就可以开发一个组件,可以提取json 来自主题 B 的消息。

你能告诉我创建 ksql 流的代码吗?

注册入站 Avro 数据流

CREATE STREAM MY_AVRO_SOURCE
  WITH (KAFKA_TOPIC='my_source_topic', FORMAT='AVRO');

告诉 ksqlDB 从主题开头读取所有消息

SET 'auto.offset.reset' = 'earliest';

将所有消息写入 JSON

中的新流
CREATE STREAM MY_JSON_TARGET 
  WITH (FORMAT='JSON') 
  AS SELECT * FROM MY_AVRO_SOURCE;

默认情况下,这将填充一个名为 MY_JSON_TARGET 的目标主题;如果要使用不同的目标主题名称,可以在 WITH 子句中指定 KAFKA_TOPIC

如果只有源中的值(不是键)是 Avro,则在 WHERE 子句中使用 VALUE_FORMAT 而不是 FORMAT

参考:https://docs.ksqldb.io/en/latest/reference/serialization/