Apache Flink 从 Csv 文件生成 Kafka
Apache Flink Produce Kafka from Csv File
我想从 CSV 文件生成 kafka,但 kafka 输出如下;
org.apache.flink.streaming.api.datastream.DataStreamSource@28aaa5a7
我该怎么做?
我的代码;
public static class SimpleStringGenerator implements SourceFunction<String> {
private static final long serialVersionUID = 2174904787118597072L;
boolean running = true;
long i = 0;
@Override
public void run(SourceContext<String> ctx) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("/home/train/Desktop/yaz/aa/1");
ctx.collect(String.valueOf(text));
Thread.sleep(10);
}
text
是一个 DataStream 对象,它表示无限的元素流(在您的代码中,测试文件中的每一行都是一个不同的元素),因此它不是实际的文件内容。
如果您想要将这些元素生成到 Kafka,则需要初始化一个 Kafka 接收器并将您的 DataStream 对象连接到它。
来自 Flink 文档:
DataStream<String> stream = ...;
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("topic-name")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
stream.sinkTo(sink);
我想从 CSV 文件生成 kafka,但 kafka 输出如下;
org.apache.flink.streaming.api.datastream.DataStreamSource@28aaa5a7
我该怎么做?
我的代码;
public static class SimpleStringGenerator implements SourceFunction<String> {
private static final long serialVersionUID = 2174904787118597072L;
boolean running = true;
long i = 0;
@Override
public void run(SourceContext<String> ctx) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("/home/train/Desktop/yaz/aa/1");
ctx.collect(String.valueOf(text));
Thread.sleep(10);
}
text
是一个 DataStream 对象,它表示无限的元素流(在您的代码中,测试文件中的每一行都是一个不同的元素),因此它不是实际的文件内容。
如果您想要将这些元素生成到 Kafka,则需要初始化一个 Kafka 接收器并将您的 DataStream 对象连接到它。
来自 Flink 文档:
DataStream<String> stream = ...;
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("topic-name")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
stream.sinkTo(sink);