Apache Flink:如何根据事件类型将事件下沉到不同的 Kafka 主题?
Apache Flink: How to sink events to different Kafka topics depending on the event type?
我想知道是否可以使用 Flink Kafka 接收器根据事件类型编写不同主题的事件?
假设我们有不同类型的事件:通知、消息和好友请求。我们想将这些事件流式传输到名为:notification-topic、messages-topic、friendsRequest-topic 的不同主题。
我尝试了很多不同的方法来解决这个问题,但仍然找不到正确的解决方案。我听说我可以使用 ProcessFunction
但是它怎么会和我的问题有关呢?
如果您使用的是 Kafka:
FlinkKafkaProducer011<Event> producer = new FlinkKafkaProducer011<>(
"default.topic",
new KeyedSerializationSchema<Event>() {
@Override
public byte[] serializeKey( Event element ) {
return null; or element.getKey to bytes...
}
@Override
public byte[] serializeValue( Event element ) {
return event.toBytes() ...
}
@Override
public String getTargetTopic( Event element ) {
return element.getTopic();
}
},
parameterTool.getProperties());
input.addSink(producer);
它将为每个事件调用 getTargetTopic
,以获取您要将事件路由到的主题。它将覆盖 "default.topic"
我想知道是否可以使用 Flink Kafka 接收器根据事件类型编写不同主题的事件? 假设我们有不同类型的事件:通知、消息和好友请求。我们想将这些事件流式传输到名为:notification-topic、messages-topic、friendsRequest-topic 的不同主题。
我尝试了很多不同的方法来解决这个问题,但仍然找不到正确的解决方案。我听说我可以使用 ProcessFunction
但是它怎么会和我的问题有关呢?
如果您使用的是 Kafka:
FlinkKafkaProducer011<Event> producer = new FlinkKafkaProducer011<>(
"default.topic",
new KeyedSerializationSchema<Event>() {
@Override
public byte[] serializeKey( Event element ) {
return null; or element.getKey to bytes...
}
@Override
public byte[] serializeValue( Event element ) {
return event.toBytes() ...
}
@Override
public String getTargetTopic( Event element ) {
return element.getTopic();
}
},
parameterTool.getProperties());
input.addSink(producer);
它将为每个事件调用 getTargetTopic
,以获取您要将事件路由到的主题。它将覆盖 "default.topic"