flink 1.7.2 dataset 不支持 kafka sink 吗?
Does the flink 1.7.2 dataset not support kafka sink?
flink 1.7.2数据集不支持kafka sink吗?
做完批量操作后我需要发布消息到kafka,意思是source是我的postgres,sink是我的kafka。
可能吗?
开箱即用:还没有,您必须从一开始就使用数据流,或者如上所述创建您自己的自定义输出格式。
然而,有计划在 Flink 2.0 中长期统一 Apache Flink 项目中的 DataSet
和 DataStream
API:https://flink.apache.org/roadmap.html
您可以创建自己的输出格式并使用 Kafka Producer 生成到 Kafka。检查下面的代码。
...
data.output(new KafkaOPFormat());
env.execute();
import java.io.IOException;
import java.util.Properties;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaOPFormat extends RichOutputFormat<Tuple2<String, String>> {
private final Properties properties = new Properties();
private KafkaProducer<String, String> producer;
@Override
public void configure(Configuration configuration) {
properties.put("bootstrap.servers", "localhost:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
}
@Override
public void open(int i, int i1) throws IOException {
producer = new KafkaProducer<String, String>(properties);
}
@Override
public void writeRecord(Tuple2<String, String> record) throws IOException {
producer.send(new ProducerRecord<>(record.f0, record.f1));
}
@Override
public void close() throws IOException {
producer.close();
}
}
PS: 我不记得所有的配置,请检查你的配置并相应地修改。
flink 1.7.2数据集不支持kafka sink吗?
做完批量操作后我需要发布消息到kafka,意思是source是我的postgres,sink是我的kafka。
可能吗?
开箱即用:还没有,您必须从一开始就使用数据流,或者如上所述创建您自己的自定义输出格式。
然而,有计划在 Flink 2.0 中长期统一 Apache Flink 项目中的 DataSet
和 DataStream
API:https://flink.apache.org/roadmap.html
您可以创建自己的输出格式并使用 Kafka Producer 生成到 Kafka。检查下面的代码。
...
data.output(new KafkaOPFormat());
env.execute();
import java.io.IOException;
import java.util.Properties;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaOPFormat extends RichOutputFormat<Tuple2<String, String>> {
private final Properties properties = new Properties();
private KafkaProducer<String, String> producer;
@Override
public void configure(Configuration configuration) {
properties.put("bootstrap.servers", "localhost:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
}
@Override
public void open(int i, int i1) throws IOException {
producer = new KafkaProducer<String, String>(properties);
}
@Override
public void writeRecord(Tuple2<String, String> record) throws IOException {
producer.send(new ProducerRecord<>(record.f0, record.f1));
}
@Override
public void close() throws IOException {
producer.close();
}
}
PS: 我不记得所有的配置,请检查你的配置并相应地修改。