Flink - 如何使用 withTimestampAssigner 从 Event Payload 获取时间(不使用 Kafka 时间戳)
Flink - How to use withTimestampAssigner getting time from Event Payload (without using Kafka timestamps)
我正在尝试了解如何在 Kafka 源的 WatermarkStrategy 中使用 withTimestampAssigner()。我需要使用的“时间”在消息负载中。
为此,我有以下代码:
FlinkKafkaConsumer<Event> kafkaData =
new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);
kafkaData.assignTimestampsAndWatermarks(
WatermarkStrategy
.forMonotonousTimestamps()
.withTimestampAssigner(Event, Event.time))
DataStream<Event> stream = env.addSource(kafkaData);
EventDeserializationSchema() 是这样的:
public class EventDeserializationSchema implements DeserializationSchema<Event> {
private static final long serialVersionUID = 1L;
private static final CsvSchema schema = CsvSchema.builder()
.addColumn("firstName")
.addColumn("lastName")
.addColumn("age", CsvSchema.ColumnType.NUMBER)
.addColumn("time")
.build();
private static final ObjectMapper mapper = new CsvMapper();
@Override
public Event deserialize(byte[] message) throws IOException {
return mapper.readerFor(Event.class).with(schema).readValue(message);
}
@Override
public boolean isEndOfStream(Event nextElement) {
return false;
}
@Override
public TypeInformation<Event> getProducedType() {
return TypeInformation.of(Event.class);
}
}
和事件:
import java.io.Serializable;
public class Event implements Serializable {
public String firstName;
public String lastName;
private int age;
public String time;
public Event() {
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getTime() {
return time;
}
public void setTime(String time) {
this.time = time;
}
}
我想了解的是如何将 time 提供给 withTimeStampAssigner():
.withTimestampAssigner(???))
变量应该是Event.time但是从flink页面我不太明白。
我一直在寻找
这让我有点困惑,因为我不明白在我的情况下,解决方案是否非常简单,或者我需要额外的上下文。我发现的所有示例都带有 .forBoundedOutOfOrderness() 或以前版本的 flink,其中实现与此不同:
谢谢!
如果来源(例如 FlinkKafkaConsumer
)没有提供您想要使用的时间戳,那么您需要提供 TimestampAssigner
。这是一个将事件和先前分配的时间戳(如果有)作为输入的函数,returns 时间戳。在你的情况下,它看起来像这样:
FlinkKafkaConsumer<Event> kafkaData =
new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);
WatermarkStrategy<Event> wmStrategy =
WatermarkStrategy
.<Event>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.getTime());
DataStream<Event> stream = env.addSource(
kafkaData.assignTimestampsAndWatermarks(wmStrategy));
(注意:这不太有效,因为您的 getTime()
方法 return 是一个字符串。您需要解析字符串和 return 长 - 通常它将是一个 long 代表自纪元以来的毫秒数。)
涉及 TimestampAssignerSupplier.Context
或 WatermarkGeneratorSupplier.Context
的情况适用于您需要访问 lower-level API 以执行更多自定义操作的情况。在这种情况下没有必要。
我正在尝试了解如何在 Kafka 源的 WatermarkStrategy 中使用 withTimestampAssigner()。我需要使用的“时间”在消息负载中。
为此,我有以下代码:
FlinkKafkaConsumer<Event> kafkaData =
new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);
kafkaData.assignTimestampsAndWatermarks(
WatermarkStrategy
.forMonotonousTimestamps()
.withTimestampAssigner(Event, Event.time))
DataStream<Event> stream = env.addSource(kafkaData);
EventDeserializationSchema() 是这样的:
public class EventDeserializationSchema implements DeserializationSchema<Event> {
private static final long serialVersionUID = 1L;
private static final CsvSchema schema = CsvSchema.builder()
.addColumn("firstName")
.addColumn("lastName")
.addColumn("age", CsvSchema.ColumnType.NUMBER)
.addColumn("time")
.build();
private static final ObjectMapper mapper = new CsvMapper();
@Override
public Event deserialize(byte[] message) throws IOException {
return mapper.readerFor(Event.class).with(schema).readValue(message);
}
@Override
public boolean isEndOfStream(Event nextElement) {
return false;
}
@Override
public TypeInformation<Event> getProducedType() {
return TypeInformation.of(Event.class);
}
}
和事件:
import java.io.Serializable;
public class Event implements Serializable {
public String firstName;
public String lastName;
private int age;
public String time;
public Event() {
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getTime() {
return time;
}
public void setTime(String time) {
this.time = time;
}
}
我想了解的是如何将 time 提供给 withTimeStampAssigner():
.withTimestampAssigner(???))
变量应该是Event.time但是从flink页面我不太明白。
我一直在寻找
这让我有点困惑,因为我不明白在我的情况下,解决方案是否非常简单,或者我需要额外的上下文。我发现的所有示例都带有 .forBoundedOutOfOrderness() 或以前版本的 flink,其中实现与此不同:
谢谢!
如果来源(例如 FlinkKafkaConsumer
)没有提供您想要使用的时间戳,那么您需要提供 TimestampAssigner
。这是一个将事件和先前分配的时间戳(如果有)作为输入的函数,returns 时间戳。在你的情况下,它看起来像这样:
FlinkKafkaConsumer<Event> kafkaData =
new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);
WatermarkStrategy<Event> wmStrategy =
WatermarkStrategy
.<Event>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.getTime());
DataStream<Event> stream = env.addSource(
kafkaData.assignTimestampsAndWatermarks(wmStrategy));
(注意:这不太有效,因为您的 getTime()
方法 return 是一个字符串。您需要解析字符串和 return 长 - 通常它将是一个 long 代表自纪元以来的毫秒数。)
涉及 TimestampAssignerSupplier.Context
或 WatermarkGeneratorSupplier.Context
的情况适用于您需要访问 lower-level API 以执行更多自定义操作的情况。在这种情况下没有必要。