Apache Beam stream processing event time

I'm trying to create an event-processing stream using Apache Beam.

The steps happening in my stream:

  1. Read a Kafka topic in Avro format, deserializing the Avro with a schema registry
  2. Create fixed-size windows (1 hour) that trigger every 10 minutes (processing time)
  3. Write Avro files to GCS in directories partitioned by topic name (filename = schema + start-end-window-pane)

Now let's dive into the code.

  1. This code shows how I read from Kafka. I use a custom deserializer and coder to deserialize correctly through the schema registry (Hortonworks in my case).
KafkaIO.<String, AvroGenericRecord>read()
               .withBootstrapServers(bootstrapServers)
               .withConsumerConfigUpdates(configUpdates)
               .withTopics(inputTopics)
               .withKeyDeserializer(StringDeserializer.class)
               // custom value deserializer + coder backed by the schema registry
               .withValueDeserializerAndCoder(BeamKafkaAvroGenericDeserializer.class, AvroGenericCoder.of(serDeConfig()))
               // commit consumed offsets back to Kafka when output is finalized
               .commitOffsetsInFinalize()
               // keep only KV<key, value>, dropping Kafka metadata
               .withoutMetadata();
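As written, withoutMetadata() yields a PCollection<KV<String, AvroGenericRecord>>, so the Kafka keys have to be dropped before the windowing in step 2. A minimal sketch of that glue (the kafkaRead and records names are illustrative, not from my actual code):

PCollection<AvroGenericRecord> records = pipeline
        // the KafkaIO.read() built above, assumed assigned to kafkaRead
        .apply("ReadFromKafka", kafkaRead)
        // drop the keys so downstream steps see PCollection<AvroGenericRecord>
        .apply("ExtractValues", Values.<AvroGenericRecord>create());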
  2. After KafkaIO reads the records, the windowing is created in the pipeline.
records.apply(Window.<AvroGenericRecord>into(FixedWindows.of(Duration.standardHours(1)))
                .triggering(AfterWatermark.pastEndOfWindow()
                        // speculative (early) panes every 10 minutes of processing time
                        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(10)))
                        // one extra pane per late element
                        .withLateFirings(AfterPane.elementCountAtLeast(1))
                )
                .withAllowedLateness(Duration.standardMinutes(5))
                .discardingFiredPanes()
        )

What I want to achieve with this window is to group the data into 1-hour windows by event time and trigger every 10 minutes.
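To verify that elements really land in windows according to event time, a self-contained check with TestStream can help. Below is a minimal sketch using String elements and JUnit on the DirectRunner; all names here are illustrative, not part of my pipeline:

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;

public class EventTimeWindowingTest {

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Test
    public void elementsAreWindowedByEventTime() {
        Instant base = Instant.parse("2020-01-01T00:00:00Z");

        TestStream<String> events = TestStream.create(StringUtf8Coder.of())
                // event time 00:15 -> first hourly window
                .addElements(TimestampedValue.of("a", base.plus(Duration.standardMinutes(15))))
                // event time 01:30 -> second hourly window
                .addElements(TimestampedValue.of("b", base.plus(Duration.standardMinutes(90))))
                .advanceWatermarkToInfinity();

        PCollection<Long> counts = pipeline
                .apply(events)
                .apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1))))
                .apply(Combine.globally(Count.<String>combineFn()).withoutDefaults());

        // exactly one element per hourly window proves event-time assignment
        PAssert.that(counts)
                .inOnTimePane(new IntervalWindow(base, Duration.standardHours(1)))
                .containsInAnyOrder(1L);
        PAssert.that(counts)
                .inOnTimePane(new IntervalWindow(base.plus(Duration.standardHours(1)), Duration.standardHours(1)))
                .containsInAnyOrder(1L);

        pipeline.run().waitUntilFinish();
    }
}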

  3. After grouping by window, it starts writing to Google Cloud Storage (GCS).
public class WriteAvroFilesTr extends PTransform<PCollection<AvroGenericRecord>, WriteFilesResult<AvroDestination>> {
    private String baseDir;
    private int numberOfShards;

    public WriteAvroFilesTr(String baseDir, int numberOfShards) {
        this.baseDir = baseDir;
        this.numberOfShards = numberOfShards;
    }

    @Override
    public WriteFilesResult<AvroDestination> expand(PCollection<AvroGenericRecord> input) {
        ResourceId tempDir = getTempDir(baseDir);

        return input.apply(AvroIO.<AvroGenericRecord>writeCustomTypeToGenericRecords()
                .withTempDirectory(tempDir)
                .withWindowedWrites()
                .withNumShards(numberOfShards)
                .to(new DynamicAvroGenericRecordDestinations(baseDir, Constants.FILE_EXTENSION))
        );
    }

    private ResourceId getTempDir(String baseDir) {
        return FileSystems.matchNewResource(baseDir + "/temp", true);
    }
}
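A minimal sketch of how this transform is applied to the windowed records (the bucket path and shard count are placeholders, not my real values):

// Hypothetical wiring; "gs://my-bucket/output" and 4 shards are placeholder values.
WriteFilesResult<AvroDestination> writeResult =
        records.apply("WriteAvroFiles", new WriteAvroFilesTr("gs://my-bucket/output", 4));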

public class DynamicAvroGenericRecordDestinations extends DynamicAvroDestinations<AvroGenericRecord, AvroDestination, GenericRecord> {
    private static final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
    private final String baseDir;
    private final String fileExtension;

    public DynamicAvroGenericRecordDestinations(String baseDir, String fileExtension) {
        this.baseDir = baseDir;
        this.fileExtension = fileExtension;
    }

    @Override
    public Schema getSchema(AvroDestination destination) {
        return new Schema.Parser().parse(destination.jsonSchema);
    }

    @Override
    public GenericRecord formatRecord(AvroGenericRecord record) {
        return record.getRecord();
    }

    @Override
    public AvroDestination getDestination(AvroGenericRecord record) {
        Schema schema = record.getRecord().getSchema();
        return AvroDestination.of(record.getName(), record.getDate(), record.getVersionId(), schema.toString());
    }

    @Override
    public AvroDestination getDefaultDestination() {
        return new AvroDestination();
    }

    @Override
    public FileBasedSink.FilenamePolicy getFilenamePolicy(AvroDestination destination) {
        String pathStr = baseDir + "/" + destination.name + "/" + destination.date + "/" + destination.name;
        return new WindowedFilenamePolicy(FileBasedSink.convertToFileResourceIfPossible(pathStr), destination.version, fileExtension);
    }

    private static class WindowedFilenamePolicy extends FileBasedSink.FilenamePolicy {
        final ResourceId outputFilePrefix;
        final String fileExtension;
        final Integer version;

        WindowedFilenamePolicy(ResourceId outputFilePrefix, Integer version, String fileExtension) {
            this.outputFilePrefix = outputFilePrefix;
            this.version = version;
            this.fileExtension = fileExtension;
        }

        @Override
        public ResourceId windowedFilename(
                int shardNumber,
                int numShards,
                BoundedWindow window,
                PaneInfo paneInfo,
                FileBasedSink.OutputFileHints outputFileHints) {

            IntervalWindow intervalWindow = (IntervalWindow) window;

            String filenamePrefix =
                    outputFilePrefix.isDirectory() ? "" : firstNonNull(outputFilePrefix.getFilename(), "");

            String filename =
                    String.format("%s-%s(%s-%s)-(%s-of-%s)%s", filenamePrefix,
                            version,
                            formatter.print(intervalWindow.start()),
                            formatter.print(intervalWindow.end()),
                            shardNumber,
                            numShards - 1,
                            fileExtension);
            ResourceId result = outputFilePrefix.getCurrentDirectory();
            return result.resolve(filename, RESOLVE_FILE);
        }

        @Override
        public ResourceId unwindowedFilename(
                int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) {
            throw new UnsupportedOperationException("Expecting windowed outputs only");
        }

        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            builder.add(
                    DisplayData.item("fileNamePrefix", outputFilePrefix.toString())
                            .withLabel("File Name Prefix"));
        }
    }

}
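For illustration, with placeholder values (a destination named orders, dated 2020-01-01, version 1, extension .avro, and 3 shards), this policy produces paths of the form:

baseDir/orders/2020-01-01/orders-1(2020-01-01 00:00:00-2020-01-01 01:00:00)-(0-of-2).avro

Note that the shard suffix prints numShards - 1, so 3 shards appear as 0-of-2 through 2-of-2.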

I've written out the whole pipeline. It works well, but I may have misunderstood (I'm not sure) whether I'm actually processing events by event time.

Could someone look at my code (especially steps 1 and 2, where I read and group by windows) and tell me whether the windows group by event time?

P.S. For every record in Kafka, I have a timestamp field inside it.

UPD

Thanks, jjayadeep.

I have included a custom TimestampPolicy in KafkaIO:

static class CustomTimestampPolicy extends TimestampPolicy<String, AvroGenericRecord> {

        protected Instant currentWatermark;

        CustomTimestampPolicy(Optional<Instant> previousWatermark) {
            this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        @Override
        public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<String, AvroGenericRecord> record) {
            // event time comes from the timestamp field embedded in the record payload
            currentWatermark = Instant.ofEpochMilli(record.getKV().getValue().getTimestamp());
            return currentWatermark;
        }

        @Override
        public Instant getWatermark(PartitionContext ctx) {
            // the watermark simply tracks the timestamp of the last record seen
            return currentWatermark;
        }
    }
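Wired into the KafkaIO read from step 1, it looks roughly like this (a sketch; withLogAppendTime() would be the alternative if the topic used broker-side LogAppendTime timestamps):

KafkaIO.<String, AvroGenericRecord>read()
        .withBootstrapServers(bootstrapServers)
        .withConsumerConfigUpdates(configUpdates)
        .withTopics(inputTopics)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializerAndCoder(BeamKafkaAvroGenericDeserializer.class, AvroGenericCoder.of(serDeConfig()))
        // take event time from the record payload via the custom policy above
        .withTimestampPolicyFactory((tp, previousWatermark) -> new CustomTimestampPolicy(previousWatermark))
        .commitOffsetsInFinalize()
        .withoutMetadata();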

From the documentation here [1]: by default, KafkaIO sets the record timestamp (event time) to processing time.

By default, record timestamp (event time) is set to processing time in KafkaIO reader and source watermark is current wall time. If a topic has Kafka server-side ingestion timestamp enabled ('LogAppendTime'), it can enabled with KafkaIO.Read.withLogAppendTime(). A custom timestamp policy can be provided by implementing TimestampPolicyFactory. See KafkaIO.Read.withTimestampPolicyFactory(TimestampPolicyFactory) for more information.

Processing time is also the default timestamping method, as described below:

// set event times and watermark based on LogAppendTime. To provide a custom
// policy see withTimestampPolicyFactory(). withProcessingTime() is the default.

1 - https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/kafka/KafkaIO.html