Dataflow / Beam Accumulator coder

I'm developing a Dataflow pipeline that uses the SqlTransform library together with the Beam aggregation function defined in org.apache.beam.sdk.extensions.sql.impl.transform.agg.CountIf.

Here is the relevant code:

import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.extensions.sql.impl.transform.agg.CountIf;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class EtlSqlTransformations extends PTransform<PCollection<Row>, PCollection<Row>> {
    @Override
    public PCollection<Row> expand(PCollection<Row> input) {
        String sql1 = "SELECT campaign_id, format_id, story_id, browser, device, os, " +
                "       COUNTIF(event_type = 'track_click_through') as clickthrough, " +
                "       FROM PCOLLECTION " +
                "       GROUP BY campaign_id, format_id, story_id, browser, device, os";

        PCollection<Row> groupedImpressions = input.apply("groupedImpressions",
                SqlTransform.query(sql1).registerUdaf("COUNTIF", new CountIf.CountIfFn()));
        return groupedImpressions;
    }
}

This works fine when tested locally (I've also written some tests, and they pass):

        PCollection<Row> results = rowPCollection.apply("SQL TRANSFORM", new EtlSqlTransformations());

        // Expected output
        Row expectedRow = Row.withSchema(EtlSqlTransformations.outputSchema)
                .addValues("1044", ...)
                .build();

        PAssert.that(results).containsInAnyOrder(expectedRow);
        pipeline.run();

The problem is that when I try to deploy it to Google Cloud with Dataflow, I get the following output:

[WARNING] 
java.lang.RuntimeException: java.io.IOException: Could not obtain a Coder for the accumulator
    at org.apache.beam.runners.core.construction.PipelineTranslation.leaveCompositeTransform (PipelineTranslation.java:78)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:584)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access0 (TransformHierarchy.java:239)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
    at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
    at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:60)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:996)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
    at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline (PubsubToStorage.java:115)
    at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main (PubsubToStorage.java:122)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:566)
    at org.codehaus.mojo.exec.ExecJavaMojo.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:829)
Caused by: java.io.IOException: Could not obtain a Coder for the accumulator
    at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.extractAccumulatorCoder (CombineTranslation.java:207)
    at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.translate (CombineTranslation.java:179)
    at org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate (PTransformTranslation.java:438)
    at org.apache.beam.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:248)
    at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform (SdkComponents.java:175)
    at org.apache.beam.runners.core.construction.PipelineTranslation.leaveCompositeTransform (PipelineTranslation.java:75)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:584)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access0 (TransformHierarchy.java:239)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
    at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
    at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:60)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:996)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
    at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline (PubsubToStorage.java:115)
    at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main (PubsubToStorage.java:122)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:566)
    at org.codehaus.mojo.exec.ExecJavaMojo.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:829)
Caused by: org.apache.beam.sdk.coders.CannotProvideCoderException: Cannot infer coder for type parameter AccumT
    at org.apache.beam.sdk.coders.CoderRegistry.getCoder (CoderRegistry.java:328)
    at org.apache.beam.sdk.transforms.CombineFnBase$AbstractGlobalCombineFn.getAccumulatorCoder (CombineFnBase.java:119)
    at org.apache.beam.sdk.transforms.Combine$CombineFn.getAccumulatorCoder (Combine.java:391)
    at org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter$WrappedCombinerBase.getAccumulatorCoder (AggregationCombineFnAdapter.java:75)
    at org.apache.beam.sdk.transforms.CombineFns$ComposedCombineFn.getAccumulatorCoder (CombineFns.java:430)
    at org.apache.beam.sdk.schemas.transforms.SchemaAggregateFn$Inner.getAccumulatorCoder (SchemaAggregateFn.java:335)
    at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.extractAccumulatorCoder (CombineTranslation.java:204)
    at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.translate (CombineTranslation.java:179)
    at org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate (PTransformTranslation.java:438)
    at org.apache.beam.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:248)
    at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform (SdkComponents.java:175)
    at org.apache.beam.runners.core.construction.PipelineTranslation.leaveCompositeTransform (PipelineTranslation.java:75)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:584)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access0 (TransformHierarchy.java:239)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
    at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
    at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:60)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:996)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
    at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline (PubsubToStorage.java:115)
    at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main (PubsubToStorage.java:122)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:566)
    at org.codehaus.mojo.exec.ExecJavaMojo.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:829)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  57.006 s
[INFO] Finished at: 2021-10-25T13:54:23Z
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project connected-stories-analytics-logger-store: An exception occured while executing the Java class. java.io.IOException: Could not obtain a Coder for the accumulator: Cannot infer coder for type parameter AccumT -> [Help 1]

I've searched for this issue but found nothing, only a few similar cases where people had developed their own aggregation functions and had to define a coder. In my case I don't know where to find that coder, or whether I have to create it myself, since I'm using a function that ships with Beam (org.apache.beam.sdk.extensions.sql.impl.transform.agg.CountIf).
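For reference, the pattern those custom aggregation functions used was to declare the accumulator coder explicitly on the CombineFn, so the runner never has to infer it. A minimal sketch of that pattern, assuming a plain Long accumulator (the class name here is illustrative, not something from Beam):

    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.CoderRegistry;
    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.transforms.Combine;

    // Illustrative only: a CombineFn that counts Boolean inputs and tells the
    // runner which coder to use for its Long accumulator.
    public static class MyCountFn extends Combine.CombineFn<Boolean, Long, Long> {
        @Override
        public Long createAccumulator() { return 0L; }

        @Override
        public Long addInput(Long acc, Boolean input) { return input ? acc + 1 : acc; }

        @Override
        public Long mergeAccumulators(Iterable<Long> accs) {
            long sum = 0;
            for (Long a : accs) { sum += a; }
            return sum;
        }

        @Override
        public Long extractOutput(Long acc) { return acc; }

        // Explicit accumulator coder; this is the piece the pipeline translation
        // above fails to infer for CountIf.CountIfFn.
        @Override
        public Coder<Long> getAccumulatorCoder(CoderRegistry registry, Coder<Boolean> inputCoder) {
            return VarLongCoder.of();
        }
    }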

So my questions are:

  1. Is there a way, while keeping this Beam function, to supply the coder that Dataflow needs in order to run?
  2. If I have to replace the function and create a new one along with its coder, how do I do that?

Thanks

OK, I solved this by implementing COUNTIF myself.

    public static class CountConditional extends Combine.CombineFn<Boolean, Long, Long> {

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long addInput(Long accumulator, Boolean input) {
            // Count only the inputs for which the condition evaluated to true.
            if (input) {
                ++accumulator;
            }
            return accumulator;
        }

        @Override
        public Long mergeAccumulators(Iterable<Long> accumulators) {
            long sum = 0L;
            for (Long accumulator : accumulators) {
                sum += accumulator;
            }
            return sum;
        }

        @Override
        public Long extractOutput(Long accumulator) {
            return accumulator;
        }
    }

I put it inside the same PTransform that uses it, so I didn't have to deal with coders at all.
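For completeness, wiring it up looks roughly like this; this is a sketch reusing the EtlSqlTransformations class from the question, with only the registerUdaf call changed:

    public class EtlSqlTransformations extends PTransform<PCollection<Row>, PCollection<Row>> {
        @Override
        public PCollection<Row> expand(PCollection<Row> input) {
            String sql1 = "SELECT campaign_id, format_id, story_id, browser, device, os, " +
                    "       COUNTIF(event_type = 'track_click_through') as clickthrough " +
                    "       FROM PCOLLECTION " +
                    "       GROUP BY campaign_id, format_id, story_id, browser, device, os";

            // Register the hand-rolled CombineFn instead of Beam's CountIf.CountIfFn;
            // its accumulator is a plain Long, for which Beam has a default coder.
            return input.apply("groupedImpressions",
                    SqlTransform.query(sql1).registerUdaf("COUNTIF", new CountConditional()));
        }
    }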