如何在加入 apache beam 后从 PCollection<Row> 中提取信息?

How to extract information from PCollection<Row> after a join in apache beam?

我有两个示例数据流,我对其执行 innerJoin。我想扩展这段示例连接代码并在连接发生后添加一些逻辑

public class JoinExample {

  public static void main(String[] args) {
    final Pipeline pipeline = Pipeline.create(pipelineOpts);

    PCollection<Row> adStream =
        pipeline
            .apply(From.source("kafka.adStream"))
            .apply(Select.fieldNames("ad.id", "ad.name"))
            .apply(Window.into(FixedWindows.of(Duration.standardSeconds(5))));

    PCollection<Row> clickStream =
        pipeline
            .apply(From.source("kafka.clickStream"))
            .apply(Select.fieldNames("ad.id", "numClicks"))
            .apply(Window.into(FixedWindows.of(Duration.standardSeconds(5))));

    adStream
        .apply(Join.<Row, Row>innerJoin(clickStream).using("id"))
        .apply(ConsoleOutput.of(Row::toString)); // Instead of this output, I would like to just print the ad name and num clicks after the join

    pipeline.run();
  }

我想在加入后使用这样的 DoFcn 打印广告名称和点击次数:

 adStream
    .apply(Join.<Row, Row>innerJoin(clickStream).using("id"))
    .apply(ParDo.of(new DoFcn(PCollection<Row>, int>() {

      public void processElement(ProcessContext c) {
        // Since there are two rows after the join, how can I get info from each row?
        // Example in:
        //    ad.id = 1, ad.name = test
        //    ad.id = 1, numClicks = 1000
        
        // After join
        // Row: [Row:[1, test], Row:[1, 1000]]
        
        // I tried this statement but it is incorrect
        Row one = c.element.getRow(0);  // This API is not available
      }
     } 

关于如何从连接的数据中提取此信息的任何想法?

如您所知,Schema Join 方法模拟 SQL 联接,其中联接的结果是联接的 PCollections 中的行的串联。为了查看哪些行进入内部联接,您必须使用 CoGroup 实用程序来联接 PCollections。这个 returns 一个 Row 对象,每个 PCollection 都有单独的可迭代对象,其中包含与键匹配的 Rows。示例:


import org.apache.beam.sdk.schemas.transforms.CoGroup;
import org.apache.beam.sdk.values.PCollectionTuple;

public class JoinExample {

  public static void main(String[] args) {
    final Pipeline pipeline = Pipeline.create(pipelineOpts);

    PCollection<Row> adStream =
        pipeline
            .apply(From.source("kafka.adStream"))
            .apply(Select.fieldNames("ad.id", "ad.name"))
            .apply(Window.into(FixedWindows.of(Duration.standardSeconds(5))));

    PCollection<Row> clickStream =
        pipeline
            .apply(From.source("kafka.clickStream"))
            .apply(Select.fieldNames("ad.id", "numClicks"))          
            .apply(Window.into(FixedWindows.of(Duration.standardSeconds(5))));

    // The names given here for the PCollections can be used to retrieve the
    // the rows in the consuming PTransform. See below:
    PCollectionTuple.of("adStream", adStream, "clickStream", clickStream)
      // This selects the common field name in both adStream and clickStream 
      // to join on. See the documentation for ways of joining on
      // different keys.
      .apply(CoGroup.join(By.fieldNames("id")))
      .apply(ParDo.of(new DoFn<Row, int>() {
        public void processElement(ProcessContext c) 

          // Get key.
          String id = c.element.getValue("key").id;

          // Get rows from the adStream and clickStream PCollections that 
          // share the same id.
          Iterable<Row> adStream = c.element.getValue("adStream");
          Iterable<Row> clickStream = c.element.getValue("clickStream");

          return 0;
        }
      }));

     pipeline.run();
  }
}