Apache Beam stream processing: event time
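I am trying to build an event-processing stream with Apache Beam.
The steps in my stream:
- Read Kafka topics in Avro format and deserialize the Avro using a schema registry
- Create fixed-size windows (1 hour) that fire every 10 minutes (processing time)
- Write Avro files to GCP, with directories split by topic name. (file name = schema + start-end-window-pane)
Now let's dive into the code.
- This code shows how I read from Kafka. I use a custom deserializer and coder so that records are deserialized correctly using the schema registry (Hortonworks in my case).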
KafkaIO.<String, AvroGenericRecord>read()
    .withBootstrapServers(bootstrapServers)
    .withConsumerConfigUpdates(configUpdates)
    .withTopics(inputTopics)
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializerAndCoder(BeamKafkaAvroGenericDeserializer.class, AvroGenericCoder.of(serDeConfig()))
    .commitOffsetsInFinalize()
    .withoutMetadata();
- After KafkaIO reads the records, the pipeline applies windowing.
records.apply(Window.<AvroGenericRecord>into(FixedWindows.of(Duration.standardHours(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(10)))
        .withLateFirings(AfterPane.elementCountAtLeast(1))
    )
    .withAllowedLateness(Duration.standardMinutes(5))
    .discardingFiredPanes()
)
What I want to achieve with this window is to group data into 1-hour windows by event time and fire every 10 minutes.
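For reference, a fixed window is chosen purely from the timestamp attached to each element, so whether the grouping is "by event time" depends on what timestamp the source assigns. The following is a minimal plain-Java sketch of that arithmetic, not Beam's implementation. The class and method names (`FixedWindowSketch`, `windowStart`, `windowEnd`) are hypothetical, and it assumes windows aligned to the epoch with no offset.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Beam: shows how a fixed window can be
// derived from an element timestamp (epoch-aligned windows, no offset).
final class FixedWindowSketch {
    private FixedWindowSketch() {}

    // Start (inclusive) of the window that contains the timestamp.
    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMillis = size.toMillis();
        long ts = timestamp.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch.
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMillis));
    }

    // End (exclusive) of the window that contains the timestamp.
    static Instant windowEnd(Instant timestamp, Duration size) {
        return windowStart(timestamp, size).plus(size);
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2020-01-01T09:55:00Z");
        Duration oneHour = Duration.ofHours(1);
        // prints "2020-01-01T09:00:00Z .. 2020-01-01T10:00:00Z"
        System.out.println(windowStart(ts, oneHour) + " .. " + windowEnd(ts, oneHour));
    }
}
```

An element stamped 09:55 lands in the [09:00, 10:00) window regardless of when it is processed. The 10-minute early firing only controls when partial results for that window are emitted.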
- After grouping by window, it starts writing to Google Cloud Storage (GCS).
public class WriteAvroFilesTr extends PTransform<PCollection<AvroGenericRecord>, WriteFilesResult<AvroDestination>> {
    private String baseDir;
    private int numberOfShards;

    public WriteAvroFilesTr(String baseDir, int numberOfShards) {
        this.baseDir = baseDir;
        this.numberOfShards = numberOfShards;
    }

    @Override
    public WriteFilesResult<AvroDestination> expand(PCollection<AvroGenericRecord> input) {
        ResourceId tempDir = getTempDir(baseDir);
        return input.apply(AvroIO.<AvroGenericRecord>writeCustomTypeToGenericRecords()
            .withTempDirectory(tempDir)
            .withWindowedWrites()
            .withNumShards(numberOfShards)
            .to(new DynamicAvroGenericRecordDestinations(baseDir, Constants.FILE_EXTENSION))
        );
    }

    private ResourceId getTempDir(String baseDir) {
        return FileSystems.matchNewResource(baseDir + "/temp", true);
    }
}
and
public class DynamicAvroGenericRecordDestinations extends DynamicAvroDestinations<AvroGenericRecord, AvroDestination, GenericRecord> {
    private static final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
    private final String baseDir;
    private final String fileExtension;

    public DynamicAvroGenericRecordDestinations(String baseDir, String fileExtension) {
        this.baseDir = baseDir;
        this.fileExtension = fileExtension;
    }

    @Override
    public Schema getSchema(AvroDestination destination) {
        return new Schema.Parser().parse(destination.jsonSchema);
    }

    @Override
    public GenericRecord formatRecord(AvroGenericRecord record) {
        return record.getRecord();
    }

    @Override
    public AvroDestination getDestination(AvroGenericRecord record) {
        Schema schema = record.getRecord().getSchema();
        return AvroDestination.of(record.getName(), record.getDate(), record.getVersionId(), schema.toString());
    }

    @Override
    public AvroDestination getDefaultDestination() {
        return new AvroDestination();
    }

    @Override
    public FileBasedSink.FilenamePolicy getFilenamePolicy(AvroDestination destination) {
        String pathStr = baseDir + "/" + destination.name + "/" + destination.date + "/" + destination.name;
        return new WindowedFilenamePolicy(FileBasedSink.convertToFileResourceIfPossible(pathStr), destination.version, fileExtension);
    }

    private static class WindowedFilenamePolicy extends FileBasedSink.FilenamePolicy {
        final ResourceId outputFilePrefix;
        final String fileExtension;
        final Integer version;

        WindowedFilenamePolicy(ResourceId outputFilePrefix, Integer version, String fileExtension) {
            this.outputFilePrefix = outputFilePrefix;
            this.version = version;
            this.fileExtension = fileExtension;
        }

        @Override
        public ResourceId windowedFilename(
                int shardNumber,
                int numShards,
                BoundedWindow window,
                PaneInfo paneInfo,
                FileBasedSink.OutputFileHints outputFileHints) {
            IntervalWindow intervalWindow = (IntervalWindow) window;
            String filenamePrefix =
                outputFilePrefix.isDirectory() ? "" : firstNonNull(outputFilePrefix.getFilename(), "");
            String filename =
                String.format("%s-%s(%s-%s)-(%s-of-%s)%s", filenamePrefix,
                    version,
                    formatter.print(intervalWindow.start()),
                    formatter.print(intervalWindow.end()),
                    shardNumber,
                    numShards - 1,
                    fileExtension);
            ResourceId result = outputFilePrefix.getCurrentDirectory();
            return result.resolve(filename, RESOLVE_FILE);
        }

        @Override
        public ResourceId unwindowedFilename(
                int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) {
            throw new UnsupportedOperationException("Expecting windowed outputs only");
        }

        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            builder.add(
                DisplayData.item("fileNamePrefix", outputFilePrefix.toString())
                    .withLabel("File Name Prefix"));
        }
    }
}
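To make the naming scheme concrete, here is a small standalone sketch of the string that `windowedFilename` builds. It uses `java.time` rather than Joda-Time so that it runs without dependencies, and it assumes UTC formatting. The class name `WindowedFilenameSketch` and the sample values (`events`, version `3`, `.avro`, 4 shards) are made up for illustration.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical stand-in for the file name logic above, for illustration only.
final class WindowedFilenameSketch {
    // Same pattern as the original formatter; UTC is an assumption of this sketch.
    private static final DateTimeFormatter FORMATTER =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

    private WindowedFilenameSketch() {}

    // Builds "<prefix>-<version>(<start>-<end>)-(<shard>-of-<numShards - 1>)<extension>".
    static String filename(String prefix, int version, Instant windowStart, Instant windowEnd,
                           int shardNumber, int numShards, String extension) {
        return String.format("%s-%s(%s-%s)-(%s-of-%s)%s",
            prefix,
            version,
            FORMATTER.format(windowStart),
            FORMATTER.format(windowEnd),
            shardNumber,
            numShards - 1,
            extension);
    }

    public static void main(String[] args) {
        // prints "events-3(2020-01-01 09:00:00-2020-01-01 10:00:00)-(0-of-3).avro"
        System.out.println(filename("events", 3,
            Instant.parse("2020-01-01T09:00:00Z"),
            Instant.parse("2020-01-01T10:00:00Z"),
            0, 4, ".avro"));
    }
}
```

As written, the name uses nothing from `PaneInfo`, so panes fired for the same window and shard would presumably resolve to the same file name. With early firings every 10 minutes, that may be worth checking.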
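I have written out the whole pipeline. It works reasonably well, but I am not sure that I am actually handling events by event time.
Could someone review my code (especially steps 1 and 2, where I read the records and group them into windows) and tell me whether it windows by event time?
P.S. Every record in Kafka carries a timestamp field.
UPD
Thanks, jjayadeep.
I added a custom TimestampPolicy to KafkaIO: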
static class CustomTimestampPolicy extends TimestampPolicy<String, AvroGenericRecord> {
    protected Instant currentWatermark;

    CustomTimestampPolicy(Optional<Instant> previousWatermark) {
        this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
    }

    @Override
    public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<String, AvroGenericRecord> record) {
        currentWatermark = Instant.ofEpochMilli(record.getKV().getValue().getTimestamp());
        return currentWatermark;
    }

    @Override
    public Instant getWatermark(PartitionContext ctx) {
        return currentWatermark;
    }
}
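One thing to watch in a policy like this: the watermark is set to the timestamp of the most recent record, so an out-of-order record moves it backwards. Below is a minimal dependency-free sketch of one common alternative, which tracks the largest timestamp seen minus a fixed allowed delay so that the watermark never regresses. The class `BoundedDelayWatermark` and its methods are hypothetical and only illustrate the idea. Beam ships `CustomTimestampPolicyWithLimitedDelay` for a similar purpose, which is probably the better starting point in a real pipeline.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration, not a Beam class: a watermark that trails the
// largest observed record timestamp by a fixed delay and never moves backwards.
final class BoundedDelayWatermark {
    private final Duration maxDelay;
    private Instant watermark;

    BoundedDelayWatermark(Duration maxDelay, Instant initialWatermark) {
        this.maxDelay = maxDelay;
        this.watermark = initialWatermark;
    }

    // Records the timestamp of a newly read record and returns it unchanged,
    // mirroring the role of getTimestampForRecord in the policy above.
    Instant observe(Instant recordTimestamp) {
        Instant candidate = recordTimestamp.minus(maxDelay);
        if (candidate.isAfter(watermark)) {
            watermark = candidate; // advance only; older records are ignored here
        }
        return recordTimestamp;
    }

    // Current watermark, mirroring the role of getWatermark.
    Instant current() {
        return watermark;
    }

    public static void main(String[] args) {
        BoundedDelayWatermark wm = new BoundedDelayWatermark(Duration.ofMinutes(5), Instant.EPOCH);
        wm.observe(Instant.parse("2020-01-01T10:00:00Z"));
        wm.observe(Instant.parse("2020-01-01T09:50:00Z")); // out of order, watermark holds
        System.out.println(wm.current()); // prints "2020-01-01T09:55:00Z"
    }
}
```

The delay here plays a role similar to allowed lateness: records arriving within it are still ahead of the watermark, while anything older would be treated as late downstream.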
According to the documentation [1], KafkaIO by default sets the event time to the processing time:
By default, record timestamp (event time) is set to processing time in KafkaIO reader and source watermark is current wall time. If a topic has Kafka server-side ingestion timestamp enabled ('LogAppendTime'), it can enabled with KafkaIO.Read.withLogAppendTime(). A custom timestamp policy can be provided by implementing TimestampPolicyFactory. See KafkaIO.Read.withTimestampPolicyFactory(TimestampPolicyFactory) for more information.
Processing time is also the default timestamp method, as noted here:
// set event times and watermark based on LogAppendTime. To provide a custom
// policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
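To illustrate why this default matters, the sketch below contrasts the two stamping choices for a single record. It is plain Java with hypothetical names (`TimestampChoiceSketch`, `Event`, `Policy`, `hourWindowStart`), not KafkaIO code. A record created at 09:55 but read at 10:03 falls into a different one-hour window depending on which timestamp is used.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration of processing-time vs. event-time stamping.
final class TimestampChoiceSketch {
    // A record with the time it was produced and the time the reader saw it.
    static final class Event {
        final Instant eventTime;
        final Instant readTime;

        Event(Instant eventTime, Instant readTime) {
            this.eventTime = eventTime;
            this.readTime = readTime;
        }
    }

    enum Policy { PROCESSING_TIME, EVENT_TIME }

    private TimestampChoiceSketch() {}

    // Picks the timestamp that would be attached to the element.
    static Instant timestampFor(Event event, Policy policy) {
        return policy == Policy.EVENT_TIME ? event.eventTime : event.readTime;
    }

    // Start of the epoch-aligned one-hour window containing the timestamp.
    static Instant hourWindowStart(Instant timestamp) {
        long size = Duration.ofHours(1).toMillis();
        long millis = timestamp.toEpochMilli();
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, size));
    }

    public static void main(String[] args) {
        Event event = new Event(
            Instant.parse("2020-01-01T09:55:00Z"),
            Instant.parse("2020-01-01T10:03:00Z"));
        // prints "2020-01-01T10:00:00Z"
        System.out.println(hourWindowStart(timestampFor(event, Policy.PROCESSING_TIME)));
        // prints "2020-01-01T09:00:00Z"
        System.out.println(hourWindowStart(timestampFor(event, Policy.EVENT_TIME)));
    }
}
```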
1 - https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/kafka/KafkaIO.html