升级 Flink 弃用的函数调用
Upgrading Flink deprecated function calls
我目前正在尝试升级应用于数据流的方法调用 assignTimestampsAndWatermarks
。数据流看起来像这样:
DataStream<Auction> auctions = env.addSource(new AuctionSourceFunction(auctionSrcRates))
.name("Custom Source")
.setParallelism(params.getInt("p-auction-source", 1))
.assignTimestampsAndWatermarks(new AuctionTimestampAssigner());
AssignerWithPeriodicWatermark 看起来像这样:
private static final class AuctionTimestampAssigner implements AssignerWithPeriodicWatermarks<Auction> {
private long maxTimestamp = Long.MIN_VALUE;
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(maxTimestamp);
}
@Override
public long extractTimestamp(Auction element, long previousElementTimestamp) {
maxTimestamp = Math.max(maxTimestamp, element.dateTime);
return element.dateTime;
}
}
要从已弃用的调用升级到当前的最佳做法,我需要采取哪些步骤?谢谢
您的水印生成器假定事件按时间戳是有序的,或者至少接受任何无序事件都会延迟。这相当于
assignTimestampsAndWatermarks(
WatermarkStrategy
.<Auction>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.dateTime))
我目前正在尝试升级应用于数据流的方法调用 assignTimestampsAndWatermarks
。数据流看起来像这样:
DataStream<Auction> auctions = env.addSource(new AuctionSourceFunction(auctionSrcRates))
.name("Custom Source")
.setParallelism(params.getInt("p-auction-source", 1))
.assignTimestampsAndWatermarks(new AuctionTimestampAssigner());
AssignerWithPeriodicWatermark 看起来像这样:
private static final class AuctionTimestampAssigner implements AssignerWithPeriodicWatermarks<Auction> {
private long maxTimestamp = Long.MIN_VALUE;
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(maxTimestamp);
}
@Override
public long extractTimestamp(Auction element, long previousElementTimestamp) {
maxTimestamp = Math.max(maxTimestamp, element.dateTime);
return element.dateTime;
}
}
要从已弃用的调用升级到当前的最佳做法,我需要采取哪些步骤?谢谢
您的水印生成器假定事件按时间戳是有序的,或者至少接受任何无序事件都会延迟。这相当于
assignTimestampsAndWatermarks(
WatermarkStrategy
.<Auction>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.dateTime))