Flink Generate Dynamic Stream from GenericRecord Stream
I have a use case where multiple types of Avro records arrive on a single Kafka topic, because we use TopicRecordNameStrategy for the subjects in the schema registry.

I have written a consumer that reads this topic and builds a DataStream of GenericRecord. I cannot sink this stream to hdfs/s3 in Parquet format as it is, because the stream contains records with different schemas. So I filter out the records of each type with a filter, create a separate stream per type, and then sink each stream individually.
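(For context: with TopicRecordNameStrategy each record type gets its own subject of the form `<topic>-<record full name>`, e.g. `events-com.events.search_list`. The snippet below only illustrates the producer-side setting that leads to this situation; it is not part of my job code, and the URL is a placeholder.)

```java
// Illustrative producer-side Schema Registry settings (not part of the Flink job).
// With TopicRecordNameStrategy, subjects become "<topic>-<record full name>",
// e.g. "events-com.events.search_list", so several record types can share one topic.
Properties producerProps = new Properties();
producerProps.put("schema.registry.url", "http://schema-registry:8081"); // placeholder URL
producerProps.put("value.subject.name.strategy",
        "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");
```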
Below is the code I am using:

```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.util.List;
import java.util.Properties;
public class EventStreamProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(EventStreamProcessor.class);
private static final String KAFKA_TOPICS = "events";
private static Properties properties = new Properties();
private static String schemaRegistryUrl = "";
private static CachedSchemaRegistryClient registryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);
public static void main(String args[]) throws Exception {
ParameterTool para = ParameterTool.fromArgs(args);
InputStream inputStreamProperties = EventStreamProcessor.class.getClassLoader().getResourceAsStream(para.get("properties"));
properties.load(inputStreamProperties);
int numSlots = para.getInt("numslots", 1);
int parallelism = para.getInt("parallelism");
String outputPath = para.get("output");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
env.getConfig().enableForceAvro();
env.enableCheckpointing(60000);
ExecutionConfig executionConfig = env.getConfig();
executionConfig.disableForceKryo();
executionConfig.enableForceAvro();
FlinkKafkaConsumer kafkaConsumer010 = new FlinkKafkaConsumer(KAFKA_TOPICS,
new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
properties);
Path path = new Path(outputPath);
DataStream<GenericRecord> dataStream = env.addSource(kafkaConsumer010).name("bike_flow_source");
try {
final StreamingFileSink<GenericRecord> sink = StreamingFileSink.forBulkFormat
(path, ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema("events-com.events.search_list")))
.withBucketAssigner(new EventTimeBucketAssigner())
.build();
dataStream.filter((FilterFunction<GenericRecord>) genericRecord -> {
if (genericRecord.get(Constants.EVENT_NAME).toString().equals("search_list")) {
return true;
}
return false;
}).addSink(sink).name("search_list_sink").setParallelism(parallelism);
final StreamingFileSink<GenericRecord> sink_search_details = StreamingFileSink.forBulkFormat
(path, ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema("events-com.events.search_details")))
.withBucketAssigner(new EventTimeBucketAssigner())
.build();
dataStream.filter((FilterFunction<GenericRecord>) genericRecord -> {
if (genericRecord.get(Constants.EVENT_NAME).toString().equals("search_details")) {
return true;
}
return false;
}).addSink(sink_search_details).name("search_details_sink").setParallelism(parallelism);
final StreamingFileSink<GenericRecord> search_list = StreamingFileSink.forBulkFormat
(path, ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema("events-com.events.search_list")))
.withBucketAssigner(new EventTimeBucketAssigner())
.build();
dataStream.filter((FilterFunction<GenericRecord>) genericRecord -> {
if (genericRecord.get(Constants.EVENT_NAME).toString().equals("search_list")) {
return true;
}
return false;
}).addSink(search_list).name("search_list_sink").setParallelism(parallelism);
} catch (Exception e) {
LOGGER.info("exception in sinking event");
}
env.execute("event_stream_processor");
}
}
```
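For reference, `KafkaGenericAvroDeserializationSchema` is a custom class that is not shown above. A minimal sketch of how such a schema-registry-backed deserialization schema could be written (using Confluent's `KafkaAvroDeserializer`; this is only an illustration, not necessarily the original class):

```java
import java.util.Collections;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

// Illustrative sketch only: a schema-registry-backed deserialization schema that
// turns every Kafka record into an Avro GenericRecord, whatever its schema.
public class KafkaGenericAvroDeserializationSchema
        implements KafkaDeserializationSchema<GenericRecord> {

    private final String schemaRegistryUrl;
    private transient KafkaAvroDeserializer inner; // not serializable, created lazily per task

    public KafkaGenericAvroDeserializationSchema(String schemaRegistryUrl) {
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false; // unbounded stream
    }

    @Override
    public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> record) {
        if (inner == null) {
            inner = new KafkaAvroDeserializer();
            inner.configure(
                    Collections.singletonMap("schema.registry.url", schemaRegistryUrl),
                    false); // false = value deserializer
        }
        return (GenericRecord) inner.deserialize(record.topic(), record.value());
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }
}
```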
This feels inefficient to me, because:

- Every time a new event type is added, I have to change the code.
- I have to create multiple streams via filters.

So please suggest whether it is possible to write out the GenericRecord stream without creating multiple streams. If not, how can I drive this code from a configuration file, so that I don't have to write the same code again for every new event?

Please suggest a better way to solve this.

I am trying it like this, but it is not working:
```java
// Build one filtered stream plus sink per configured event type and remember them by key.
for (EventConfig eventConfig : eventTypesList) {
    LOGGER.info("creating a stream for {}", eventConfig.getEvent_name());
    String key = eventConfig.getEvent_name();
    final StreamingFileSink<GenericRecord> sink = StreamingFileSink.forBulkFormat
            (path, ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(eventConfig.getSchema_subject())))
            .withBucketAssigner(new EventTimeBucketAssigner())
            .build();
    DataStream<GenericRecord> stream = dataStream.filter((FilterFunction<GenericRecord>) genericRecord -> {
        if (genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name())) {
            return true;
        }
        return false;
    });
    Tuple2<DataStream<GenericRecord>, StreamingFileSink<GenericRecord>> tuple2 = new Tuple2<>(stream, sink);
    streamMap.put(key, tuple2);
}

DataStream<GenericRecord> searchStream = streamMap.get(SEARCH_LIST_KEYLESS).getField(0);
searchStream.map(new Enricher()).addSink(streamMap.get(SEARCH_LIST_KEYLESS).getField(1));
```
Please suggest the right way to achieve this.

Thanks.

Well, you can simply pass the list of possible message types as a configuration parameter and then just iterate over it. You would end up with something like this:
```java
// One sink per message type, selected by filtering on the event name field.
messageTypes.forEach(msgType -> {
    final StreamingFileSink<GenericRecord> sink = StreamingFileSink.forBulkFormat
            (path, ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(msgType)))
            .withBucketAssigner(new EventTimeBucketAssigner())
            .build();
    dataStream.filter((FilterFunction<GenericRecord>) genericRecord -> {
        if (genericRecord.get(Constants.EVENT_NAME).toString().equals(msgType)) {
            return true;
        }
        return false;
    }).addSink(sink).name(msgType + "_sink").setParallelism(parallelism);
});
```
This means you only have to restart the job with the changed configuration whenever a new message type shows up.
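To make that concrete, here is a minimal sketch of how the list of event types could be supplied through the `ParameterTool` that the job already uses; the parameter name `eventTypes` and the subject pattern `events-com.events.<eventType>` are assumptions based on the subjects shown in the question:

```java
// Sketch: drive the sinks from configuration instead of hard-coding each event type.
// The parameter name "eventTypes" and the subject naming pattern are assumptions.
String[] eventTypes = para.get("eventTypes", "search_list,search_details").split(",");

for (String eventType : eventTypes) {
    final StreamingFileSink<GenericRecord> eventSink = StreamingFileSink
            .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
                    SchemaUtils.getSchema("events-com.events." + eventType)))
            .withBucketAssigner(new EventTimeBucketAssigner())
            .build();

    dataStream
            .filter((FilterFunction<GenericRecord>) record ->
                    record.get(Constants.EVENT_NAME).toString().equals(eventType))
            .addSink(eventSink)
            .name(eventType + "_sink")
            .setParallelism(parallelism);
}
```

With this layout, adding a new event type only requires extending the `eventTypes` parameter (and registering its schema) and restarting the job; no code change is needed.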