Apache Beam: Splitting a Pipeline into Multiple Outputs
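I am subscribing to a topic that carries several event types, which arrive with different attributes.
After reading the elements, I need to route them to different destinations based on those attributes. Here is the sample code: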
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply(
"ReadType1",
EventIO.<T>readJsons()
.of(T.class)
.withPubsubTimestampAttributeName(null)
.withOptions(options))
.apply(
Filter.by(
new SerializableFunction<T, Boolean>() {
@Override
public Boolean apply(T input) {
return input.attributes.get("type").equals("type1");
}
}))
.apply(
"WindowMetrics",
Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration()))))
.apply("AsJsons", AsJsons.of(T.class))
.apply(
"Write File(s)",
TextIO.write()
.withWindowedWrites()
.withNumShards(options.getNumShards())
.to(
new WindowedFilenamePolicy(
options.getRunOutputDirectory(),
options.getUseCurrentDateForOutputDirectory(),
options.getOutputFilenamePrefix(),
options.getOutputShardTemplate(),
options.getOutputFilenameSuffix()))
.withTempDirectory(
NestedValueProvider.of(
options.getTempDirectory(),
(SerializableFunction<String, ResourceId>)
input -> FileBasedSink.convertToFileResourceIfPossible(input))));
pipeline.apply("ReadType2",
EventIO.<T>readJsons().of(T.class)
.withPubsubTimestampAttributeName(null)
.withOptions(options))
.apply(Filter.by(new SerializableFunction<T, Boolean>() {
@Override
public Boolean apply(T input) {
return input.attributes.get("type").equals("type2");
}
})).apply( "WindowMetrics",
Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration()))))
.apply("AsJsons", AsJsons.of(T.class))
.apply(
"Write File(s)",
TextIO.write()
.withWindowedWrites()
.withNumShards(options.getNumShards())
.to(
new WindowedFilenamePolicy(
options.getBatchOutputDirectory(),
options.getUseCurrentDateForOutputDirectory(),
options.getOutputFilenamePrefix(),
options.getOutputShardTemplate(),
options.getOutputFilenameSuffix()))
.withTempDirectory(
NestedValueProvider.of(
options.getTempDirectory(),
(SerializableFunction<String, ResourceId>)
input -> FileBasedSink.convertToFileResourceIfPossible(input))));
pipeline.apply("ReadType3",
EventIO.<T>readJsons().of(T.class)
.withPubsubTimestampAttributeName(null)
.withOptions(options))
.apply(Filter.by(new SerializableFunction<T, Boolean>() {
@Override
public Boolean apply(T input) {
return input.attributes.get("type").equals("type3");
}
})).apply( "WindowMetrics",
Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration()))))
.apply("AsJsons", AsJsons.of(T.class))
.apply(
"Write File(s)",
TextIO.write()
.withWindowedWrites()
.withNumShards(options.getNumShards())
.to(
new WindowedFilenamePolicy(
options.getCustomIntervalOutputDirectory(),
options.getUseCurrentDateForOutputDirectory(),
options.getOutputFilenamePrefix(),
options.getOutputShardTemplate(),
options.getOutputFilenameSuffix()))
.withTempDirectory(
NestedValueProvider.of(
options.getTempDirectory(),
(SerializableFunction<String, ResourceId>)
input -> FileBasedSink.convertToFileResourceIfPossible(input))));
pipeline.run();
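Basically, I read the events, filter them by their attributes, and write them to files. The job fails on Dataflow with: Workflow failed. Causes: The pubsub configuration contains errors: Subscription 'sub-name' is consumed by multiple stages, this will result in undefined behavior.
So what is the appropriate way to split a pipeline within the same job?
I tried Pipeline1, Pipeline2, Pipeline3, but that ended up requiring multiple job names to run multiple pipelines, and I am not sure that is the right way to do it.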
The answer should be to use Partition in Beam.
https://beam.apache.org/documentation/transforms/java/elementwise/partition/
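Having two EventIO transforms on the same subscription is the cause of the error. You need to eliminate one of them for this to work. You can do that by consuming the subscription into a single PCollection and then applying the two filter branches to that collection separately. Here is a partial example: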
// single PCollection of the events consumed from the subscription
PCollection<T> events = pipeline
.apply("Read Events",
EventIO.<T>readJsons()
.of(T.class)
.withPubsubTimestampAttributeName(null)
.withOptions(options));
// PCollection of type1 events
PCollection<T> typeOneEvents = events.apply(
Filter.by(
new SerializableFunction<T, Boolean>() {
@Override
public Boolean apply(T input) {
return input.attributes.get("type").equals("type1");
}}));
// TODO typeOneEvents.apply("WindowMetrics / AsJsons / Write File(s)")
// PCollection of type2 events
PCollection<T> typeTwoEvents = events.apply(
Filter.by(
new SerializableFunction<T, Boolean>() {
@Override
public Boolean apply(T input) {
return input.attributes.get("type").equals("type2");
}}));
// TODO typeTwoEvents.apply("WindowMetrics / AsJsons / Write File(s)")
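One detail the filters above leave open: calling `.equals` on the result of `attributes.get("type")` throws a `NullPointerException` when a message arrives without a `type` attribute. Below is a minimal sketch of a null-safe check that a filter could delegate to. It assumes `attributes` is a `Map<String, String>`, which the question does not show; `TypeMatcher` and `hasType` are hypothetical names introduced only for illustration.

```java
import java.util.Map;

final class TypeMatcher {
    private TypeMatcher() {}

    /**
     * Returns true only when the "type" attribute is present and equals expectedType.
     * Assumes attributes is a Map<String, String>; expectedType must be non-null.
     */
    static boolean hasType(Map<String, String> attributes, String expectedType) {
        if (attributes == null) {
            return false;
        }
        // Calling equals on the expected value avoids a NullPointerException
        // when the "type" key is absent and get(...) returns null.
        return expectedType.equals(attributes.get("type"));
    }
}
```

Under that assumption, the body of each filter's `apply` method could become `return TypeMatcher.hasType(input.attributes, "type1");`.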
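Another possibility is to use one of the other transforms that Apache Beam provides, which may simplify your solution somewhat. One such transform is Partition. Partition splits a single PCollection into a fixed number of PCollections based on a partitioning function. A partial example using Partition: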
// list of PCollections, one per partition, built from the events consumed from the subscription
PCollectionList<T> eventsByType = pipeline
.apply("Read Events",
EventIO.<T>readJsons()
.of(T.class)
.withPubsubTimestampAttributeName(null)
.withOptions(options))
.apply("Partition By Type",
Partition.of(2, new PartitionFn<T>() {
@Override
public int partitionFor(T event, int numPartitions) {
// type1 events go to partition 0; everything else goes to partition 1
return event.attributes.get("type").equals("type1") ? 0 : 1;
}}));
PCollection<T> typeOneEvents = eventsByType.get(0);
// TODO typeOneEvents.apply("WindowMetrics / AsJsons / Write File(s)")
PCollection<T> typeTwoEvents = eventsByType.get(1);
// TODO typeTwoEvents.apply("WindowMetrics / AsJsons / Write File(s)")
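The Partition example above splits the events two ways, while the question routes three types. A possible extension, sketched here in plain Java so it can be checked without Beam on the classpath, maps each known type to its own partition index and sends anything missing or unrecognized to a final catch-all partition. `TypePartitioner`, its methods, and the catch-all partition are assumptions made for illustration, not part of the original answer.

```java
import java.util.List;

final class TypePartitioner {
    // Known types in partition order; a type's position in this list is its partition index.
    private static final List<String> TYPES = List.of("type1", "type2", "type3");

    private TypePartitioner() {}

    /** Partition count: one per known type plus one catch-all. */
    static int numPartitions() {
        return TYPES.size() + 1;
    }

    /** Returns the partition index for a type; null or unknown types map to the last partition. */
    static int partitionFor(String type) {
        // Immutable lists reject null arguments in indexOf, so handle null explicitly.
        int index = (type == null) ? -1 : TYPES.indexOf(type);
        return index >= 0 ? index : TYPES.size();
    }
}
```

Inside a `PartitionFn<T>`, `partitionFor(event, numPartitions)` could then return `TypePartitioner.partitionFor(event.attributes.get("type"))`, with `Partition.of(TypePartitioner.numPartitions(), ...)` as the transform, again assuming `attributes` is a `Map<String, String>`. The returned index always falls in the range from 0 to `numPartitions - 1`, which Partition requires.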