Reading latest data from Kafka broker in Apache Flink
I want my Flink program to receive the latest data from Kafka, but Flink keeps reading historical data. I have set auto.offset.reset to latest as shown below, but it does not work:
properties.setProperty("auto.offset.reset", "latest");
The Flink program receives data from Kafka using the following code:
// getting the stream from Kafka and assigning timestamps and watermarks
DataStream<JoinedStreamEvent> raw_stream = environment
        .addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test", new JoinSchema(), properties))
        .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
I was following FLINK-4280 ( https://issues.apache.org/jira/browse/FLINK-4280 ), which suggests adding the source in the way shown below:
Properties props = new Properties();
...
FlinkKafkaConsumer kafka = new FlinkKafkaConsumer("topic", schema, props);
kafka.setStartFromEarliest();
kafka.setStartFromLatest();
kafka.setEnableCommitOffsets(boolean); // if true, commits on checkpoint if checkpointing is enabled, otherwise, periodically.
kafka.setForwardMetrics(boolean);
...
env.addSource(kafka)
I did this as well; however, I cannot access setStartFromLatest():
FlinkKafkaConsumer09<JoinedStreamEvent> kafka = new FlinkKafkaConsumer09<JoinedStreamEvent>("test", new JoinSchema(), properties);
What should I do to receive the latest values that are being sent to Kafka, rather than receiving values from history?
The issue was resolved by creating a new group id named test1 for both the sender and the consumer, while keeping the topic name as test.
Now I am wondering: is this the best way to solve this issue? Because it means I have to provide a new group id every time.
Is there some way I can just read data that is being sent to Kafka?
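A likely reason the new group id helps is that auto.offset.reset is only consulted when the consumer group has no committed offsets; an existing group simply resumes from its last committed position. If you can move to a Flink release that includes FLINK-4280 (1.3.0 or later), the consumer exposes setStartFromLatest(), which ignores committed offsets entirely. A minimal sketch, reusing the topic, schema, and properties from the question:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "ip:port");
properties.setProperty("group.id", "test"); // the existing group id can be kept

FlinkKafkaConsumer09<JoinedStreamEvent> consumer =
        new FlinkKafkaConsumer09<>("test", new JoinSchema(), properties);
// Always start from the latest records, ignoring any committed group offsets
consumer.setStartFromLatest();

DataStream<JoinedStreamEvent> raw_stream = environment
        .addSource(consumer)
        .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());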
I believe this will work for you; it worked for me. Adjust the properties and your Kafka topic accordingly.
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "ip:port");
    properties.setProperty("zookeeper.connect", "ip:port");
    properties.setProperty("group.id", "your-group-id");

    // consume the topic as plain strings
    DataStream<String> stream = env
            .addSource(new FlinkKafkaConsumer09<>("your-topic", new SimpleStringSchema(), properties));

    // write incoming records to a file so you can verify what is being read
    stream.writeAsText("your-path", FileSystem.WriteMode.OVERWRITE)
            .setParallelism(1);

    env.execute();
}
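Note that this snippet on its own still starts from whatever offsets are committed for your-group-id. If the goal is specifically to see only newly produced records, one option (assuming the same setup as above) is to use a group id that has no committed offsets yet and set the offset-reset property:

properties.setProperty("group.id", "a-fresh-group-id"); // hypothetical group id with no committed offsets
properties.setProperty("auto.offset.reset", "latest");  // only consulted when the group has no committed offsets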