Apache Beam 拆分为多个管道输出

Apache Beam Split to Multiple Pipeline Output

我正在订阅一个主题,它包含不同的事件类型,它们以不同的属性传入。

读取元素后,根据它们的属性,我需要将它们移动到不同的地方。这是示例代码:

    Options options =PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);
    pipeline
    .apply(
        "ReadType1",
        EventIO.<T>readJsons()
            .of(T.class)
            .withPubsubTimestampAttributeName(null)
            .withOptions(options))
    .apply(
        Filter.by(
            new SerializableFunction<T, Boolean>() {
              @Override
              public Boolean apply(T input) {
                return input.attributes.get("type").equals("type1");
              }
            }))
    .apply(
        "WindowMetrics",
        Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration()))))
    .apply("AsJsons", AsJsons.of(T.class))
    .apply(
        "Write File(s)",
        TextIO.write()
            .withWindowedWrites()
            .withNumShards(options.getNumShards())
            .to(
                new WindowedFilenamePolicy(
                    options.getRunOutputDirectory(),
                    options.getUseCurrentDateForOutputDirectory(),
                    options.getOutputFilenamePrefix(),
                    options.getOutputShardTemplate(),
                    options.getOutputFilenameSuffix()))
            .withTempDirectory(
                NestedValueProvider.of(
                    options.getTempDirectory(),
                    (SerializableFunction<String, ResourceId>)
                        input -> FileBasedSink.convertToFileResourceIfPossible(input))));


pipeline.apply("ReadType2",
        EventIO.<T>readJsons().of(T.class)
                .withPubsubTimestampAttributeName(null)
                .withOptions(options))
        .apply(Filter.by(new SerializableFunction<T, Boolean>() {
          @Override
          public Boolean apply(Event input) {
            return input.attributes.get("type").equals("type2");
          }
        })).apply( "WindowMetrics",
        Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration()))))
        .apply("AsJsons", AsJsons.of(T.class))
        .apply(
                "Write File(s)",
                TextIO.write()
                        .withWindowedWrites()
                        .withNumShards(options.getNumShards())
                        .to(
                                new WindowedFilenamePolicy(
                                        options.getBatchOutputDirectory(),
                                        options.getUseCurrentDateForOutputDirectory(),
                                        options.getOutputFilenamePrefix(),
                                        options.getOutputShardTemplate(),
                                        options.getOutputFilenameSuffix()))
                        .withTempDirectory(
                                NestedValueProvider.of(
                                        options.getTempDirectory(),
                                        (SerializableFunction<String, ResourceId>)
                                                input -> FileBasedSink.convertToFileResourceIfPossible(input))));

pipeline.apply("ReadType3",
        EventIO.<Event>readJsons().of(T.class)
                .withPubsubTimestampAttributeName(null)
                .withOptions(options))
        .apply(Filter.by(new SerializableFunction<T, Boolean>() {
          @Override
          public Boolean apply(T input) {
            return input.attributes.get("type").equals("type3");
          }
        })).apply( "WindowMetrics",
        Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration()))))
        .apply("AsJsons", AsJsons.of(T.class))
        .apply(
                "Write File(s)",
                TextIO.write()
                        .withWindowedWrites()
                        .withNumShards(options.getNumShards())
                        .to(
                                new WindowedFilenamePolicy(
                                        options.getCustomIntervalOutputDirectory(),
                                        options.getUseCurrentDateForOutputDirectory(),
                                        options.getOutputFilenamePrefix(),
                                        options.getOutputShardTemplate(),
                                        options.getOutputFilenameSuffix()))
                        .withTempDirectory(
                                NestedValueProvider.of(
                                        options.getTempDirectory(),
                                        (SerializableFunction<String, ResourceId>)
                                                input -> FileBasedSink.convertToFileResourceIfPossible(input))));

pipeline.run();

基本上我读取一个事件并根据它们的属性过滤它们并写入文件。作业在数据流中失败,因为 Workflow failed. Causes: The pubsub configuration contains errors: Subscription 'sub-name' is consumed by multiple stages, this will result in undefined behavior.

那么在同一作业中拆分管道的合适方法是什么?

我尝试了 Pipeline1、Pipeline2、Pipeline3,但最终需要多个作业名称到 运行 多个管道,我不确定这样做是否正确。

答案应该是在Beam中使用Partition。

https://beam.apache.org/documentation/transforms/java/elementwise/partition/

同一个订阅的两个 EventIO 转换是错误的原因。您需要消除其中一个转换才能使其工作。这可以通过将订阅消费到单个 PCollection 中,然后分别将两个过滤分支应用于该集合来完成。这是一个部分示例:

// single PCollection of the events consumed from the subscription
PCollection<T> events = pipeline
  .apply("Read Events",
    EventIO.<T>readJsons()
      .of(T.class)
      .withPubsubTimestampAttributeName(null)
      .withOptions(options));

// PCollection of type1 events
PCollection<T> typeOneEvents = events.apply(
  Filter.by(
    new SerializableFunction<T, Boolean>() {
      @Override
      public Boolean apply(T input) {
        return input.attributes.get("type").equals("type1");
      }}));
// TODO typeOneEvents.apply("WindowMetrics / AsJsons / Write File(s)")

// PCollection of type2 events
PCollection<T> typeTwoEvents = events.apply(
  Filter.by(
    new SerializableFunction<T, Boolean>() {
      @Override
      public Boolean apply(T input) {
        return input.attributes.get("type").equals("type2");
      }}));
// TODO typeTwoEvents.apply("WindowMetrics / AsJsons / Write File(s)")

另一种可能性是使用 Apache Beam 提供的一些其他转换。这样做可能会稍微简化您的解决方案。一旦这样的变换就是Partition。分区允许基于分区函数将单个 PCollection 拆分为固定数量的 PCollection。使用分区的部分示例是:

// single PCollection of the events consumed from the subscription
PCollectionList<T> eventsByType = pipeline
  .apply("Read Events",
    EventIO.<T>readJsons()
      .of(T.class)
      .withPubsubTimestampAttributeName(null)
      .withOptions(options))
  .apply("Partition By Type",
    Partition.of(2, new PartitionFn<T>() {
      public int partitionFor(T event, int numPartitions) {
        return input.attributes.get("type").equals("type1") ? 0 : 1;
      }}));

PCollection<T> typeOneEvents = eventsByType.get(0);
// TODO typeOneEvents.apply("WindowMetrics / AsJsons / Write File(s)")

PCollection<T> typeTwoEvents = eventsByType.get(1);
// TODO typeTwoEvents.apply("WindowMetrics / AsJsons / Write File(s)")