Flink程序中如何逐行读取Kafka Topic
How to read Kafka Topic line by line in Flink program
首先,我在Kafka topic中加载了一个CSV文件,我可以通过Flink程序打印Topic。代码如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties prop = new Properties();
prop.setProperty("bootstrap.servers",
"10.32.0.2:9092,10.32.0.3:9092,10.32.0.4:9092");
prop.setProperty("group.id", "test");
FlinkKafkaConsumer<String> myConsumer= new FlinkKafkaConsumer<>
("flinkTopic", new SimpleStringSchema(),prop);
myConsumer.setStartFromEarliest();
DataStream<String> stream = env.addSource(myConsumer);
stream.print();
env.execute("Flink Streaming Java API Skeleton");
我的问题是我想逐行阅读主题并分别处理每一行,请您指导我如何逐行阅读 Kafka 主题好吗?
任何帮助将不胜感激。
对于您可能会做的事情的示例,我建议您通过在线方式进行操作 Apache Flink Training。您可以使用 filter、map、flatmap、Windows 和 ProcessFunctions 等操作逐行处理流。
您可能想知道如何方便地处理 CSV 数据。最简单的方法是使用 Table/SQL API,其中有一个 Kafka Connector of its own, and a CSV Format。
在不使用 Flink 的 SQL 引擎的情况下,您可以实现一个将每行文本转换为 POJO 的映射函数。有一个 here 的例子。或者实现您自己的 de/serializer,您使用它来代替 SimpleStringSchema。
首先,我在Kafka topic中加载了一个CSV文件,我可以通过Flink程序打印Topic。代码如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties prop = new Properties();
prop.setProperty("bootstrap.servers",
"10.32.0.2:9092,10.32.0.3:9092,10.32.0.4:9092");
prop.setProperty("group.id", "test");
FlinkKafkaConsumer<String> myConsumer= new FlinkKafkaConsumer<>
("flinkTopic", new SimpleStringSchema(),prop);
myConsumer.setStartFromEarliest();
DataStream<String> stream = env.addSource(myConsumer);
stream.print();
env.execute("Flink Streaming Java API Skeleton");
我的问题是我想逐行阅读主题并分别处理每一行,请您指导我如何逐行阅读 Kafka 主题好吗?
任何帮助将不胜感激。
对于您可能会做的事情的示例,我建议您通过在线方式进行操作 Apache Flink Training。您可以使用 filter、map、flatmap、Windows 和 ProcessFunctions 等操作逐行处理流。
您可能想知道如何方便地处理 CSV 数据。最简单的方法是使用 Table/SQL API,其中有一个 Kafka Connector of its own, and a CSV Format。
在不使用 Flink 的 SQL 引擎的情况下,您可以实现一个将每行文本转换为 POJO 的映射函数。有一个 here 的例子。或者实现您自己的 de/serializer,您使用它来代替 SimpleStringSchema。