Apache Flink:使用事件时间对数据集进行排序并从多个输入文件创建数据流

Apache Flink: Sorting dataset and creating DataStream from multiple input files with Event times

目前,我正在开展一个项目,其中有一个 CSV 文件需要在 "stream processed" 之前进行预处理。因此,我需要执行批处理和流处理。详细地说,我的 data.csv 文件需要在特定字段上进行预处理和排序,这将作为流处理的 EventTime 时间戳。下面的批处理脚本生成预处理输出:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<Long, String, String>> compactData = env
    .readTextFile("data.csv")
    .flatMap(new RichFlatMapFunction<String, Tuple3<Long, String, String>>() {
      private CustomDelegate delegate;
      @Override
      public void open(Configuration parameters) throws Exception {
        delegate = new CustomDelegate();
      }
      @Override
      public void flatMap(String s, Collector<Tuple3<Long, String, String>> out)
          throws Exception {
        Tuple3<Long, String, String> datum = delegate.deserializeRide(s);
        if (datum != null)
          out.collect(datum);
      }
    });
compactData.partitionByRange(0)
    .sortPartition(0, Order.ASCENDING)
    .writeAsCsv("output_dir", "\n", ",");
env.execute();

我的默认并行度是 32,当批处理脚本(上面)结束执行时,会创建 output_dir 目录,它包含 32 个文件。

问题一:我的问题是这些文件是不是按照全局顺序生成的。本质上,文件 1 中的记录是否比文件 2 中的记录具有更小的值(依此类推)?如果不是,我怎么能保证以前的或等价的东西?

正如我上面提到的,我使用 output_dir 中的文件作为我的流处理作业的输入,它由前一个(即 EventTime)的第一个字段加上时间戳。流作业的代码如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Phase 0: Collect input and create timestamps
DataStream<Tuple3<Long, String, Integer>> dataStream = env
    .readTextFile("output_dir")
    .map(new MapFunction<String, Tuple3<Long, String, Integer>>() {
      @Override
      public Tuple3<Long, String, Integer> map(String s) throws Exception {
        String[] tokens = s.split(",");
        return new Tuple3<Long, String, Integer>(Long.parseLong(tokens[0]),
            tokens[1] + "-" + tokens[2], 1);
      }
    })
    .assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<Tuple3<Long, String, Integer>>() {
          @Override
          public long extractAscendingTimestamp(Tuple3<Long, String, Integer> t) {
            return t.f0;
          }
        });
env.execute();

问题2:我将目录output_dir定义为输入,其中包含按字段0排序的文件。这些记录会被解析并放在数据流中吗基于我想要的顺序(即在他们的字段 0 上)。如果不是,我相信我在分配时间戳时会遇到问题(对吗?)?我可以从多个文件中读取 DataStream(就像我现在所做的那样),还是必须将所有文件合并为一个文件,然后通过从单个文件中连续读取所有记录来创建 DataStream?

Question 1: My question is whether those files are produced based on the global order. In essence, do the records in file 1 have smaller values compared to the records in file 2 (and so on.)? If no, how can I guarantee the previous or something equivalent?

没有。由于有 32 个分区,每个分区中的数据是有序的。但不保证不同输出文件之间的数据顺序。您可以手动将 sortPartition operator 的并行度设置为 1 或实现您自己的 Partitioner 而不是 hash partitioner。

Question 2: I define as input the directory output_dir, which contains the files sorted on field 0. Will the records be parsed and placed on the data stream based on the ordering that I want (i.e., on their field 0). If no, I believe that I will have problems with assigning timestamps (right?)? Can I have the DataStream be read from multiple files (as I do now), or do I have to combine all files into one, and create the DataStream by reading all records serially from a single file?

假设有32个输出文件,如果你的streaming job的并行度也是32,那么每个文件都会消耗一个并行度,这个输入文件的所有数据都会按照出现的数据顺序进行处理在当前并行度的文件中。但是一旦您尝试从 32 并行度聚合数据或尝试打乱数据,数据的顺序将不再排序。如果您希望接收方对数据进行全局排序,您可能必须将所有数据放在一个文件中,并使用一个并行度的流处理作业来处理它们。