使用 Kafka Streams 进行自定义转换
Using Kafka Streams for Custom transformation
我一直在使用 Apache Kafka 实施 ETL
数据管道。我已经使用 Kafka Connect 进行提取和加载。
Connect 将读取源数据并以 JSON.
的形式写入可用的 Kafka 主题实际数据
在转换阶段,我想从 Kafka 主题中读取 JSON 数据,然后需要根据一些自定义业务逻辑将其转换为 SQL 查询,然后需要写入输出 Kafka主题。
截至目前,我已经编写了一个生产者-消费者应用程序,它从主题中读取并进行转换,然后写入输出主题。
是否可以使用 Kafka 流实现相同的效果 API?如果是,请提供一些样品。
查看 Kafka Streams, or KSQL。 KSQL 在 Kafka Streams 之上运行,并为您提供了一种非常简单的方法来构建您正在谈论的那种聚合。
下面是一个在 KSQL 中聚合数据流的例子
SELECT PAGE_ID,COUNT(*) FROM PAGE_CLICKS WINDOW TUMBLING (SIZE 1 HOUR) GROUP BY PAGE_ID
查看更多信息:https://www.confluent.io/blog/using-ksql-to-analyse-query-and-transform-data-in-kafka
你可以获取 KSQL 的输出,它实际上只是一个 Kafka 主题,并通过 Kafka Connect 流式传输,例如到 Elasticsearch、Cassandra 等。
我一直在使用 Apache Kafka 实施 ETL
数据管道。我已经使用 Kafka Connect 进行提取和加载。
Connect 将读取源数据并以 JSON.
的形式写入可用的 Kafka 主题实际数据在转换阶段,我想从 Kafka 主题中读取 JSON 数据,然后需要根据一些自定义业务逻辑将其转换为 SQL 查询,然后需要写入输出 Kafka主题。
截至目前,我已经编写了一个生产者-消费者应用程序,它从主题中读取并进行转换,然后写入输出主题。
是否可以使用 Kafka 流实现相同的效果 API?如果是,请提供一些样品。
查看 Kafka Streams, or KSQL。 KSQL 在 Kafka Streams 之上运行,并为您提供了一种非常简单的方法来构建您正在谈论的那种聚合。
下面是一个在 KSQL 中聚合数据流的例子
SELECT PAGE_ID,COUNT(*) FROM PAGE_CLICKS WINDOW TUMBLING (SIZE 1 HOUR) GROUP BY PAGE_ID
查看更多信息:https://www.confluent.io/blog/using-ksql-to-analyse-query-and-transform-data-in-kafka
你可以获取 KSQL 的输出,它实际上只是一个 Kafka 主题,并通过 Kafka Connect 流式传输,例如到 Elasticsearch、Cassandra 等。