使用 Flink 和 Kinesis 流进行流窗口处理不起作用
Stream window processing with Flink and Kinesis streams is not working
我正在使用 Flink 读取 Kinesis 流。它根据时间窗口和键聚合某些事件。代码在 reduce 之后不做任何事情,没有任何数据被写入输出的 csv 文件。我已经等了很多分钟(尽管时间窗口只有两分钟)。
/**
 * Builds and runs the Flink streaming job: reads {@code APIActionLog} records from a
 * Kinesis stream, aggregates them per key in two-minute windows, maps the result to a
 * {@code Tuple7} and writes it out as CSV.
 *
 * @param args command-line arguments (not used in this method)
 * @throws Exception declared by the method; any failure is propagated to the caller
 */
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Windows below are driven by event time, so they depend on timestamps and watermarks.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Checkpoint interval taken from CommonTimeConstants.TWO_MINUTES.
env.enableCheckpointing(CommonTimeConstants.TWO_MINUTES.toMilliseconds());
// Keep externalized checkpoints when the job is cancelled.
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Fixed-delay restart strategy: 3 attempts, 1 minute apart.
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(1, TimeUnit.MINUTES)));
// Kinesis consumer settings; region and credentials come from the property file,
// with an empty-string fallback.
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, PropertyFileUtils.get("aws.region", ""));
consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, PropertyFileUtils.get("aws.accessKeyId", ""));
consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, PropertyFileUtils.get("aws.secretAccessKey", ""));
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
DataStream<APIActionLog> apiLogRecords = env.addSource(new FlinkKinesisConsumer<>(
ProjectProperties.SOURCE_ENV_PREFIX, // stream name
new StreamedApiLogRecordDeserializationSchema(),
consumerConfig));
// NOTE(review): the return value of this call is discarded. assignTimestampsAndWatermarks
// returns a new stream carrying the timestamps/watermarks; the pipeline below is still
// built on the original apiLogRecords, which presumably is why the event-time windows
// never emit any output. The returned stream should be used for the following operations.
apiLogRecords.assignTimestampsAndWatermarks(API_LOG_RECORD_BOUNDED_OUT_OF_ORDERNESS_TIMESTAMP_EXTRACTOR);
// Pipeline: flatMap -> keyBy -> two-minute time window -> reduce -> map.
// The operator arguments are elided placeholders in this snippet.
DataStream<Tuple7<String, String, String, String, Timestamp, String, Integer>> skuPlatformTsCount =
apiLogRecords.flatMap(collecting events...)
.keyBy(Key based on some parameters of the event...)
.timeWindow(TWO_MINUTES)
.reduce(adding up event parameter..., window function...)
.map(Map to get a different tuple format...);
// Sink: CSV file, overwriting any existing output.
skuPlatformTsCount.writeAsCsv("/Users/uday/Desktop/out.csv", FileSystem.WriteMode.OVERWRITE);
// Submits the job for execution under the given job name.
env.execute("Processing ATC Log Stream");
}
/**
 * Timestamp/watermark assigner for {@code APIActionLog} records, constructed with
 * {@code TEN_SECONDS} as its out-of-orderness bound.
 */
private static final BoundedOutOfOrdernessTimestampExtractor<APIActionLog>
    API_LOG_RECORD_BOUNDED_OUT_OF_ORDERNESS_TIMESTAMP_EXTRACTOR =
        new BoundedOutOfOrdernessTimestampExtractor<APIActionLog>(TEN_SECONDS) {

            private static final long serialVersionUID = 1L;

            /**
             * Uses the record's own {@code ts} field as its event timestamp.
             *
             * @param logRecord the record whose timestamp is extracted
             * @return the value of {@code getTs().getTime()} (presumably epoch milliseconds)
             */
            @Override
            public long extractTimestamp(APIActionLog logRecord) {
                return logRecord.getTs().getTime();
            }
        };
这是一个愚蠢的错误。
apiLogRecords.assignTimestampsAndWatermarks(API_LOG_RECORD_BOUNDED_OUT_OF_ORDERNESS_TIMESTAMP_EXTRACTOR);
该调用会返回一个带有指定水印的新流。后续的操作应该使用这个返回值。
我正在使用 Flink 读取 Kinesis 流。它根据时间窗口和键聚合某些事件。代码在 reduce 之后不做任何事情,没有任何数据被写入输出的 csv 文件。我已经等了很多分钟(尽管时间窗口只有两分钟)。
/**
 * Builds and runs the Flink streaming job: reads {@code APIActionLog} records from a
 * Kinesis stream, aggregates them per key in two-minute windows, maps the result to a
 * {@code Tuple7} and writes it out as CSV.
 *
 * @param args command-line arguments (not used in this method)
 * @throws Exception declared by the method; any failure is propagated to the caller
 */
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Windows below are driven by event time, so they depend on timestamps and watermarks.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Checkpoint interval taken from CommonTimeConstants.TWO_MINUTES.
env.enableCheckpointing(CommonTimeConstants.TWO_MINUTES.toMilliseconds());
// Keep externalized checkpoints when the job is cancelled.
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Fixed-delay restart strategy: 3 attempts, 1 minute apart.
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(1, TimeUnit.MINUTES)));
// Kinesis consumer settings; region and credentials come from the property file,
// with an empty-string fallback.
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, PropertyFileUtils.get("aws.region", ""));
consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, PropertyFileUtils.get("aws.accessKeyId", ""));
consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, PropertyFileUtils.get("aws.secretAccessKey", ""));
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
DataStream<APIActionLog> apiLogRecords = env.addSource(new FlinkKinesisConsumer<>(
ProjectProperties.SOURCE_ENV_PREFIX, // stream name
new StreamedApiLogRecordDeserializationSchema(),
consumerConfig));
// NOTE(review): the return value of this call is discarded. assignTimestampsAndWatermarks
// returns a new stream carrying the timestamps/watermarks; the pipeline below is still
// built on the original apiLogRecords, which presumably is why the event-time windows
// never emit any output. The returned stream should be used for the following operations.
apiLogRecords.assignTimestampsAndWatermarks(API_LOG_RECORD_BOUNDED_OUT_OF_ORDERNESS_TIMESTAMP_EXTRACTOR);
// Pipeline: flatMap -> keyBy -> two-minute time window -> reduce -> map.
// The operator arguments are elided placeholders in this snippet.
DataStream<Tuple7<String, String, String, String, Timestamp, String, Integer>> skuPlatformTsCount =
apiLogRecords.flatMap(collecting events...)
.keyBy(Key based on some parameters of the event...)
.timeWindow(TWO_MINUTES)
.reduce(adding up event parameter..., window function...)
.map(Map to get a different tuple format...);
// Sink: CSV file, overwriting any existing output.
skuPlatformTsCount.writeAsCsv("/Users/uday/Desktop/out.csv", FileSystem.WriteMode.OVERWRITE);
// Submits the job for execution under the given job name.
env.execute("Processing ATC Log Stream");
}
/**
 * Timestamp/watermark assigner for {@code APIActionLog} records, constructed with
 * {@code TEN_SECONDS} as its out-of-orderness bound.
 */
private static final BoundedOutOfOrdernessTimestampExtractor<APIActionLog>
    API_LOG_RECORD_BOUNDED_OUT_OF_ORDERNESS_TIMESTAMP_EXTRACTOR =
        new BoundedOutOfOrdernessTimestampExtractor<APIActionLog>(TEN_SECONDS) {

            private static final long serialVersionUID = 1L;

            /**
             * Uses the record's own {@code ts} field as its event timestamp.
             *
             * @param logRecord the record whose timestamp is extracted
             * @return the value of {@code getTs().getTime()} (presumably epoch milliseconds)
             */
            @Override
            public long extractTimestamp(APIActionLog logRecord) {
                return logRecord.getTs().getTime();
            }
        };
这是一个愚蠢的错误。
apiLogRecords.assignTimestampsAndWatermarks(API_LOG_RECORD_BOUNDED_OUT_OF_ORDERNESS_TIMESTAMP_EXTRACTOR);
该调用会返回一个带有指定水印的新流。后续的操作应该使用这个返回值。