FLINK:如何使用相同的 StreamExecutionEnvironment 从多个 kafka 集群读取
FLINK: How to read from multiple kafka cluster using same StreamExecutionEnvironment
我想在FLINK中读取多个KAFKA集群的数据
但结果是 kafkaMessageStream 仅从第一个 Kafka 读取。
只有当我为两个 Kafka 分别拥有 2 个流时,我才能从两个 Kafka 集群读取,这不是我想要的。
是否可以将多个来源附加到单个 reader。
示例代码
public class KafkaReader<T> implements Reader<T>{
private StreamExecutionEnvironment executionEnvironment ;
public StreamExecutionEnvironment getExecutionEnvironment(Properties properties){
executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setRestartStrategy( RestartStrategies.fixedDelayRestart(3, 1500));
executionEnvironment.enableCheckpointing(
Integer.parseInt(properties.getProperty(Constants.SSE_CHECKPOINT_INTERVAL,"5000")), CheckpointingMode.EXACTLY_ONCE);
executionEnvironment.getCheckpointConfig().setCheckpointTimeout(60000);
//executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//try {
// executionEnvironment.setStateBackend(new FsStateBackend(new Path(Constants.SSE_CHECKPOINT_PATH)));
// The RocksDBStateBackend or The FsStateBackend
//} catch (IOException e) {
// LOGGER.error("Exception during initialization of stateBackend in execution environment"+e.getMessage());
}
return executionEnvironment;
}
public DataStream<T> readFromMultiKafka(Properties properties_k1, Properties properties_k2 ,DeserializationSchema<T> deserializationSchema) {
DataStream<T> kafkaMessageStream = executionEnvironment.addSource(new FlinkKafkaConsumer08<T>(
properties_k1.getProperty(Constants.TOPIC),deserializationSchema,
properties_k1));
executionEnvironment.addSource(new FlinkKafkaConsumer08<T>(
properties_k2.getProperty(Constants.TOPIC),deserializationSchema,
properties_k2));
return kafkaMessageStream;
}
public DataStream<T> readFromKafka(Properties properties,DeserializationSchema<T> deserializationSchema) {
DataStream<T> kafkaMessageStream = executionEnvironment.addSource(new FlinkKafkaConsumer08<T>(
properties.getProperty(Constants.TOPIC),deserializationSchema,
properties));
return kafkaMessageStream;
}
}
我的调用代码:
/**
 * Demonstrates reading from two Kafka clusters (same topic name, different
 * brokers/ZooKeeper) with one shared StreamExecutionEnvironment.
 *
 * Fixes over the original snippet: `environment` and `dataStream1` were
 * declared twice in the same scope (compile error), and the undefined
 * variable `dataStream` was passed to the transformer (now `multiStream`).
 */
public static void main(String[] args) throws Exception {
    Properties pk1 = new Properties();
    pk1.setProperty(Constants.TOPIC, "flink_test");
    pk1.setProperty("zookeeper.connect", "localhost:2181");
    pk1.setProperty("group.id", "1");
    pk1.setProperty("bootstrap.servers", "localhost:9092");

    Properties pk2 = new Properties();
    pk2.setProperty(Constants.TOPIC, "flink_test");
    pk2.setProperty("zookeeper.connect", "localhost:2182");
    pk2.setProperty("group.id", "1");
    pk2.setProperty("bootstrap.servers", "localhost:9093");

    Reader<String> reader = new KafkaReader<String>();
    StreamExecutionEnvironment environment = reader.getExecutionEnvironment(pk1);

    // Variant A: one merged stream from both clusters.
    DataStream<String> multiStream = reader.readFromMultiKafka(pk1, pk2, new SimpleStringSchema());
    DataStream<ImpressionObject> transform = new TsvTransformer().transform(multiStream);
    transform.print();

    // Variant B: two independent streams, each processed separately.
    DataStream<String> dataStream1 = reader.readFromKafka(pk1, new SimpleStringSchema());
    DataStream<String> dataStream2 = reader.readFromKafka(pk2, new SimpleStringSchema());
    DataStream<Tuple2<String, Integer>> transform1 = dataStream1.flatMap(new LineSplitter()).keyBy(0)
            .timeWindow(Time.seconds(5)).sum(1).setParallelism(5);
    DataStream<Tuple2<String, Integer>> transform2 = dataStream2.flatMap(new LineSplitter()).keyBy(0)
            .timeWindow(Time.seconds(5)).sum(1).setParallelism(5);
    transform1.print();
    transform2.print();

    environment.execute("Kafka Reader");
}
要解决此问题,我建议您为每个集群创建单独的 FlinkKafkaConsumer 实例(这就是您已经在做的),然后合并生成的流:
// Create one consumer per cluster, then merge the resulting streams with union():
StreamExecutionEnvironment environment = reader.getExecutionEnvironment(pk1);
DataStream<String> dataStream1 = reader.readFromKafka(pk1, new SimpleStringSchema());
DataStream<String> dataStream2 = reader.readFromKafka(pk2, new SimpleStringSchema());
// union() yields a single stream containing the elements of both sources.
DataStream<String> finalStream = dataStream1.union(dataStream2);
我想在FLINK中读取多个KAFKA集群的数据
但结果是 kafkaMessageStream 仅从第一个 Kafka 读取。
只有当我为两个 Kafka 分别拥有 2 个流时,我才能从两个 Kafka 集群读取,这不是我想要的。
是否可以将多个来源附加到单个 reader。
示例代码
public class KafkaReader<T> implements Reader<T>{
private StreamExecutionEnvironment executionEnvironment ;
public StreamExecutionEnvironment getExecutionEnvironment(Properties properties){
executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setRestartStrategy( RestartStrategies.fixedDelayRestart(3, 1500));
executionEnvironment.enableCheckpointing(
Integer.parseInt(properties.getProperty(Constants.SSE_CHECKPOINT_INTERVAL,"5000")), CheckpointingMode.EXACTLY_ONCE);
executionEnvironment.getCheckpointConfig().setCheckpointTimeout(60000);
//executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//try {
// executionEnvironment.setStateBackend(new FsStateBackend(new Path(Constants.SSE_CHECKPOINT_PATH)));
// The RocksDBStateBackend or The FsStateBackend
//} catch (IOException e) {
// LOGGER.error("Exception during initialization of stateBackend in execution environment"+e.getMessage());
}
return executionEnvironment;
}
public DataStream<T> readFromMultiKafka(Properties properties_k1, Properties properties_k2 ,DeserializationSchema<T> deserializationSchema) {
DataStream<T> kafkaMessageStream = executionEnvironment.addSource(new FlinkKafkaConsumer08<T>(
properties_k1.getProperty(Constants.TOPIC),deserializationSchema,
properties_k1));
executionEnvironment.addSource(new FlinkKafkaConsumer08<T>(
properties_k2.getProperty(Constants.TOPIC),deserializationSchema,
properties_k2));
return kafkaMessageStream;
}
public DataStream<T> readFromKafka(Properties properties,DeserializationSchema<T> deserializationSchema) {
DataStream<T> kafkaMessageStream = executionEnvironment.addSource(new FlinkKafkaConsumer08<T>(
properties.getProperty(Constants.TOPIC),deserializationSchema,
properties));
return kafkaMessageStream;
}
}
我的调用代码:
/**
 * Demonstrates reading from two Kafka clusters (same topic name, different
 * brokers/ZooKeeper) with one shared StreamExecutionEnvironment.
 *
 * Fixes over the original snippet: `environment` and `dataStream1` were
 * declared twice in the same scope (compile error), and the undefined
 * variable `dataStream` was passed to the transformer (now `multiStream`).
 */
public static void main(String[] args) throws Exception {
    Properties pk1 = new Properties();
    pk1.setProperty(Constants.TOPIC, "flink_test");
    pk1.setProperty("zookeeper.connect", "localhost:2181");
    pk1.setProperty("group.id", "1");
    pk1.setProperty("bootstrap.servers", "localhost:9092");

    Properties pk2 = new Properties();
    pk2.setProperty(Constants.TOPIC, "flink_test");
    pk2.setProperty("zookeeper.connect", "localhost:2182");
    pk2.setProperty("group.id", "1");
    pk2.setProperty("bootstrap.servers", "localhost:9093");

    Reader<String> reader = new KafkaReader<String>();
    StreamExecutionEnvironment environment = reader.getExecutionEnvironment(pk1);

    // Variant A: one merged stream from both clusters.
    DataStream<String> multiStream = reader.readFromMultiKafka(pk1, pk2, new SimpleStringSchema());
    DataStream<ImpressionObject> transform = new TsvTransformer().transform(multiStream);
    transform.print();

    // Variant B: two independent streams, each processed separately.
    DataStream<String> dataStream1 = reader.readFromKafka(pk1, new SimpleStringSchema());
    DataStream<String> dataStream2 = reader.readFromKafka(pk2, new SimpleStringSchema());
    DataStream<Tuple2<String, Integer>> transform1 = dataStream1.flatMap(new LineSplitter()).keyBy(0)
            .timeWindow(Time.seconds(5)).sum(1).setParallelism(5);
    DataStream<Tuple2<String, Integer>> transform2 = dataStream2.flatMap(new LineSplitter()).keyBy(0)
            .timeWindow(Time.seconds(5)).sum(1).setParallelism(5);
    transform1.print();
    transform2.print();

    environment.execute("Kafka Reader");
}
要解决此问题,我建议您为每个集群创建单独的 FlinkKafkaConsumer 实例(这就是您已经在做的),然后合并生成的流:
// Create one consumer per cluster, then merge the resulting streams with union():
StreamExecutionEnvironment environment = reader.getExecutionEnvironment(pk1);
DataStream<String> dataStream1 = reader.readFromKafka(pk1, new SimpleStringSchema());
DataStream<String> dataStream2 = reader.readFromKafka(pk2, new SimpleStringSchema());
// union() yields a single stream containing the elements of both sources.
DataStream<String> finalStream = dataStream1.union(dataStream2);