Apache Flink 1.0.0: Event Time related migration problems
I tried to migrate some simple tasks to Flink 1.0.0, but they fail with the following exception:
java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
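The code consists of two separate tasks connected through a Kafka topic: one is a simple message generator, and the other is a simple message consumer that uses timeWindowAll to compute the per-minute message arrival rate.
Similar code worked without any issues on version 0.10.2, but now it looks like the system misinterprets some event timestamps as Long.MIN_VALUE, which causes the task to fail.
The question is: am I doing something wrong, or is this a bug that will be fixed in a future release?
Main task: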
public class Test1_0_0 {
    // Max Time lag between events time to current System time
    static final Time maxTimeLag = Time.of(3, TimeUnit.SECONDS);

    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();
        // Setting Event Time usage
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setBufferTimeout(1);

        // Properties initialization
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");

        // Overwrites the default properties by one provided by command line
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        for (Map.Entry<String, String> e : parameterTool.toMap().entrySet()) {
            properties.setProperty(e.getKey(), e.getValue());
        }
        //properties.setProperty("auto.offset.reset", "smallest");
        System.out.println("Properties: " + properties);

        DataStream<Message> stream = env
                .addSource(new FlinkKafkaConsumer09<Message>("test", new MessageSDSchema(), properties))
                .filter(message -> message != null);

        // The call to rebalance() causes data to be re-partitioned so that all machines receive messages
        // (for example, when the number of Kafka partitions is fewer than the number of Flink parallel instances).
        stream.rebalance()
                .assignTimestampsAndWatermarks(new MessageTimestampExtractor(maxTimeLag));

        // Counts messages
        stream.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Message, String, TimeWindow>() {
            @Override
            public void apply(TimeWindow timeWindow, Iterable<Message> values, Collector<String> collector) throws Exception {
                Integer count = 0;
                if (values.iterator().hasNext()) {
                    for (Message value : values) {
                        count++;
                    }
                    collector.collect("Arrived last minute: " + count);
                }
            }
        }).print();

        // execute program
        env.execute("Messages Counting");
    }
}
Timestamp extractor:
public class MessageTimestampExtractor implements AssignerWithPeriodicWatermarks<Message>, Serializable {
    private static final long serialVersionUID = 7526472295622776147L;

    // Maximum lag between the current processing time and the timestamp of an event
    long maxTimeLag = 0L;
    long currentWatermarkTimestamp = 0L;

    public MessageTimestampExtractor() {
    }

    public MessageTimestampExtractor(Time maxTimeLag) {
        this.maxTimeLag = maxTimeLag.toMilliseconds();
    }

    /**
     * Assigns a timestamp to an element, in milliseconds since the Epoch.
     *
     * <p>The method is passed the previously assigned timestamp of the element.
     * That previous timestamp may have been assigned from a previous assigner,
     * by ingestion time. If the element did not carry a timestamp before, this value is
     * {@code Long.MIN_VALUE}.
     *
     * @param message The element that the timestamp will be assigned to.
     * @param previousElementTimestamp The previous internal timestamp of the element,
     *                                 or a negative value, if no timestamp has been assigned yet.
     * @return The new timestamp.
     */
    @Override
    public long extractTimestamp(Message message, long previousElementTimestamp) {
        long timestamp = message.getTimestamp();
        currentWatermarkTimestamp = Math.max(timestamp, currentWatermarkTimestamp);
        return timestamp;
    }

    /**
     * Returns the current watermark. This method is periodically called by the
     * system to retrieve the current watermark. The method may return null to
     * indicate that no new Watermark is available.
     *
     * <p>The returned watermark will be emitted only if it is non-null and larger
     * than the previously emitted watermark. If the current watermark is still
     * identical to the previous one, no progress in event time has happened since
     * the previous call to this method.
     *
     * <p>If this method returns a value that is smaller than the previously returned watermark,
     * then the implementation does not properly handle the event stream timestamps.
     * In that case, the returned watermark will not be emitted (to preserve the contract of
     * ascending watermarks), and the violation will be logged and registered in the metrics.
     *
     * <p>The interval in which this method is called and Watermarks are generated
     * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
     *
     * @see org.apache.flink.streaming.api.watermark.Watermark
     * @see ExecutionConfig#getAutoWatermarkInterval()
     */
    @Override
    public Watermark getCurrentWatermark() {
        if (currentWatermarkTimestamp <= 0) {
            return new Watermark(Long.MIN_VALUE);
        }
        return new Watermark(currentWatermarkTimestamp - maxTimeLag);
    }

    public long getMaxTimeLag() {
        return maxTimeLag;
    }

    public void setMaxTimeLag(long maxTimeLag) {
        this.maxTimeLag = maxTimeLag;
    }
}
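The problem is that calling assignTimestampsAndWatermarks returns a new DataStream that uses the timestamp extractor. The original stream is left unchanged, so you have to perform the subsequent operations on the returned DataStream.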
DataStream<Message> timestampStream = stream.rebalance()
        .assignTimestampsAndWatermarks(new MessageTimestampExtractor(maxTimeLag));

// Counts messages
timestampStream.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Message, String, TimeWindow>() {
    @Override
    public void apply(TimeWindow timeWindow, Iterable<Message> values, Collector<String> collector) throws Exception {
        Integer count = 0;
        if (values.iterator().hasNext()) {
            for (Message value : values) {
                count++;
            }
            collector.collect("Arrived last minute: " + count);
        }
    }
}).print();
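
To see why the original code fails, here is a minimal, Flink-free sketch of the same pattern. ToyStream, assignTimestamps, and countInWindow are hypothetical names made up for this illustration and are not part of Flink's API. The only point it makes is that a method returning a new handle leaves the receiver untouched, so discarding the return value discards the assigner as well.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;

// Toy stand-in for a stream handle. This is NOT Flink's DataStream; it only
// mimics the "transformation returns a new handle" behaviour (an assumption
// made for illustration).
final class ToyStream<T> {
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    private final List<T> elements;
    private final ToLongFunction<T> assigner; // null means no assigner attached

    private ToyStream(List<T> elements, ToLongFunction<T> assigner) {
        this.elements = elements;
        this.assigner = assigner;
    }

    static <T> ToyStream<T> of(List<T> elements) {
        return new ToyStream<>(new ArrayList<>(elements), null);
    }

    // Returns a NEW handle that carries the assigner; 'this' is not modified.
    ToyStream<T> assignTimestamps(ToLongFunction<T> newAssigner) {
        return new ToyStream<>(elements, newAssigner);
    }

    // Counts elements whose timestamp lies in [windowStart, windowStart + windowSize).
    // Fails if an element has no timestamp, loosely mirroring the exception in the question.
    int countInWindow(long windowStart, long windowSize) {
        int count = 0;
        for (T element : elements) {
            long ts = (assigner == null) ? NO_TIMESTAMP : assigner.applyAsLong(element);
            if (ts == NO_TIMESTAMP) {
                throw new IllegalStateException("Record has no timestamp");
            }
            if (ts >= windowStart && ts < windowStart + windowSize) {
                count++;
            }
        }
        return count;
    }
}

final class ReturnedStreamDemo {
    public static void main(String[] args) {
        // Each element is its own timestamp in milliseconds.
        ToyStream<Long> raw = ToyStream.of(Arrays.asList(1_000L, 2_000L, 61_000L));

        // Same mistake as in the question: the returned handle is thrown away.
        raw.assignTimestamps(Long::longValue);
        try {
            raw.countInWindow(0L, 60_000L);
        } catch (IllegalStateException e) {
            System.out.println("raw: " + e.getMessage()); // prints "raw: Record has no timestamp"
        }

        // The fix: keep the returned handle and continue from it.
        ToyStream<Long> stamped = raw.assignTimestamps(Long::longValue);
        System.out.println("stamped: " + stamped.countInWindow(0L, 60_000L)); // prints "stamped: 2"
    }
}
```

In the real job the same rule applies: every operation that should see timestamps has to be chained on the value returned by assignTimestampsAndWatermarks, as in the corrected snippet above.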