Apache Flink 从 Csv 文件生成 Kafka

Apache Flink Produce Kafka from Csv File

我想从 CSV 文件生成 kafka,但 kafka 输出如下;

org.apache.flink.streaming.api.datastream.DataStreamSource@28aaa5a7

我该怎么做?

我的代码;

public static class SimpleStringGenerator implements SourceFunction<String> {
        private static final long serialVersionUID = 2174904787118597072L;
        boolean running = true;
        long i = 0;
        @Override
        public void run(SourceContext<String> ctx) throws Exception {



            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> text = env.readTextFile("/home/train/Desktop/yaz/aa/1");

            ctx.collect(String.valueOf(text));
            Thread.sleep(10);

        }
        

text 是一个 DataStream 对象,它表示无限的元素流(在您的代码中,测试文件中的每一行都是一个不同的元素),因此它不是实际的文件内容。

如果您想要将这些元素生成到 Kafka,则需要初始化一个 Kafka 接收器并将您的 DataStream 对象连接到它。

来自 Flink 文档:

DataStream<String> stream = ...;
        
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("topic-name")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
        )
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
        
stream.sinkTo(sink);