Apache Beam - Unable to infer a Coder on a DoFn with multiple output tags

I am trying to run a pipeline with Apache Beam, but I get an error as soon as I add output tags:

import com.google.cloud.Tuple;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
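import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Type;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * The Transformer.
 */
class Transformer {
    // Logger used by JsonTransformer below (assumes SLF4J, as the "{}" placeholders suggest).
    private static final Logger LOG = LoggerFactory.getLogger(Transformer.class);

    final static TupleTag<Map<String, String>> successfulTransformation = new TupleTag<>();
    final static TupleTag<Tuple<String, String>> failedTransformation = new TupleTag<>();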

    /**
     * The entry point of the application.
     *
     * @param args the input arguments
     */
    public static void main(String... args) {
        TransformerOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(TransformerOptions.class);

        Pipeline p = Pipeline.create(options);

        p.apply("Input", PubsubIO
                .readMessagesWithAttributes()
                .withIdAttribute("id")
                .fromTopic(options.getTopicName()))
                .apply(Window.<PubsubMessage>into(FixedWindows
                        .of(Duration.standardSeconds(60))))
                .apply("Transform",
                        ParDo.of(new JsonTransformer())
                                .withOutputTags(successfulTransformation,
                                        TupleTagList.of(failedTransformation)));

        p.run().waitUntilFinish();
    }

    /**
     * Deserialize the input and convert it to a key-value pairs map.
     */
    static class JsonTransformer extends DoFn<PubsubMessage, Map<String, String>> {

        /**
         * Process each element.
         *
         * @param c the processing context
         */
        @ProcessElement
        public void processElement(ProcessContext c) {
            String messagePayload = new String(c.element().getPayload());
            try {
                Type type = new TypeToken<Map<String, String>>() {
                }.getType();
                Gson gson = new Gson();
                Map<String, String> map = gson.fromJson(messagePayload, type);
                c.output(map);
            } catch (Exception e) {
                LOG.error("Failed to process input {} -- adding to dead letter file", c.element(), e);
                String attributes = c.element()
                        .getAttributeMap()
                        .entrySet().stream().map((entry) ->
                                String.format("%s -> %s\n", entry.getKey(), entry.getValue()))
                        .collect(Collectors.joining());
                c.output(failedTransformation, Tuple.of(attributes, messagePayload));
            }

        }
    }
}

The error shown is:

Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Transform.out1 [PCollection]. Correct one of the following root causes: No Coder has been manually specified; you may do so using .setCoder(). Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for V. Building a Coder using a registered CoderProvider failed. See suppressed exceptions for detailed failures. Using the default output Coder from the producing PTransform failed: Unable to provide a Coder for V. Building a Coder using a registered CoderProvider failed.

I have tried several ways to fix this, but I do not understand what the actual problem is. I know that these lines trigger the error:

.withOutputTags(successfulTransformation,TupleTagList.of(failedTransformation))

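But I do not know which part of it is at fault, which part needs a specific Coder, or what the "V" in the error refers to (from "Unable to provide a Coder for V").

Why does the error occur? I also looked through the Apache Beam documentation, but it does not seem to explain this usage, and the section on coders did not tell me much either.

Thanks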

First, I would suggest changing this:

final static TupleTag<Map<String, String>> successfulTransformation = 
    new TupleTag<>();
final static TupleTag<Tuple<String, String>> failedTransformation = 
    new TupleTag<>();

to this:

final static TupleTag<Map<String, String>> successfulTransformation = 
    new TupleTag<Map<String, String>>() {};
final static TupleTag<Tuple<String, String>> failedTransformation = 
    new TupleTag<Tuple<String, String>>() {};

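This should help coder inference determine the type of the side output, because the anonymous subclass (the trailing {}) preserves the tag's type argument. Also, have you registered a CoderProvider for Tuple?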
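To illustrate why the trailing {} matters: the "V" in the error message is the type parameter of TupleTag<V>. With a plain new TupleTag<>(), Java erases the type argument, so nothing at runtime records that V is Tuple<String, String>. An anonymous subclass keeps the argument in its class metadata, where reflection can read it. The sketch below shows only that Java mechanism. Tag and TypeCaptureDemo are hypothetical names invented for this example, not Beam classes, and Beam's own lookup is more involved.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a generic tag class such as TupleTag<V>; not Beam code.
class Tag<V> {
    // Returns the type bound to V if a subclass declaration recorded it.
    Optional<Type> capturedType() {
        Type superType = getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            // The subclass was declared as "extends Tag<Something>", so Something is readable.
            return Optional.of(((ParameterizedType) superType).getActualTypeArguments()[0]);
        }
        // Direct instance of Tag: the type argument was erased at compile time.
        return Optional.empty();
    }
}

class TypeCaptureDemo {
    static String describe(Tag<?> tag) {
        return tag.capturedType()
                .map(Type::getTypeName)
                .orElse("unknown (erased)");
    }

    public static void main(String[] args) {
        // Plain instance: V cannot be recovered at runtime.
        Tag<Map<String, String>> plain = new Tag<>();
        // Anonymous subclass: V is stored in the subclass's generic superclass.
        Tag<Map<String, String>> anonymous = new Tag<Map<String, String>>() {};

        System.out.println("plain:     " + describe(plain));
        System.out.println("anonymous: " + describe(anonymous));
    }
}
```

Running main should report the plain tag as erased and print the full Map type for the anonymous one. Recovering the type is only the first step: a Coder for that type still has to be available, which is why the CoderProvider question for Tuple remains relevant.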

Thanks to @Ben Chambers for the answer. The Kotlin equivalent is:

val successTag = object : TupleTag<MyObj>() {}
val deadLetterTag = object : TupleTag<String>() {}