Apache beam IOException in decoder

I have a simple pipeline that reads data from Kafka via a KafkaIO reader and passes it on to the next transforms in the pipeline. Finally, it writes the output to GCP in Avro format. When I run the pipeline on Dataflow it works fine, but when the runner is DirectRunner it reads all the data from the topic and then throws the following exception:

java.lang.IllegalArgumentException: Forbidden IOException when reading from InputStream
    at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:118)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
    at org.apache.beam.runners.direct.CloningBundleFactory$CloningBundle.add(CloningBundleFactory.java:84)
    at org.apache.beam.runners.direct.GroupAlsoByWindowEvaluatorFactory$OutputWindowedValueToBundle.outputWindowedValue(GroupAlsoByWindowEvaluatorFactory.java:251)
    at org.apache.beam.runners.direct.GroupAlsoByWindowEvaluatorFactory$OutputWindowedValueToBundle.outputWindowedValue(GroupAlsoByWindowEvaluatorFactory.java:237)
    at org.apache.beam.repackaged.direct_java.runners.core.ReduceFnRunner.lambda$onTrigger(ReduceFnRunner.java:1057)
    at org.apache.beam.repackaged.direct_java.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
    at org.apache.beam.repackaged.direct_java.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
    at org.apache.beam.repackaged.direct_java.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
    at org.apache.beam.repackaged.direct_java.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
    at org.apache.beam.runners.direct.GroupAlsoByWindowEvaluatorFactory$GroupAlsoByWindowEvaluator.processElement(GroupAlsoByWindowEvaluatorFactory.java:185)
    at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
    at org.apache.beam.sdk.util.VarInt.decodeLong(VarInt.java:73)
    at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:136)
    at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:60)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
    ... 19 more

I use a custom serializer and deserializer to read the Avro data and extract the payload.

Kafka reader

private PTransform<PBegin, PCollection<KV<String, AvroGenericRecord>>> createKafkaRead(Map<String, Object> configUpdates) {
    return KafkaIO.<String, AvroGenericRecord>read()
            .withBootstrapServers(bootstrapServers)
            .withConsumerConfigUpdates(configUpdates)
            .withTopics(inputTopics)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializerAndCoder(BeamKafkaAvroGenericDeserializer.class, AvroGenericCoder.of(serDeConfig()))
            .withMaxNumRecords(maxNumRecords)
            .commitOffsetsInFinalize()
            .withoutMetadata();
}

AvroGenericCoder

public class AvroGenericCoder extends CustomCoder<AvroGenericRecord> {
    private final Map<String, Object> config;
    private transient BeamKafkaAvroGenericDeserializer deserializer;
    private transient BeamKafkaAvroGenericSerializer serializer;

    public static AvroGenericCoder of(Map<String, Object> config) {
        return new AvroGenericCoder(config);
    }

    protected AvroGenericCoder(Map<String, Object> config) {
        this.config = config;
    }

    private BeamKafkaAvroGenericDeserializer getDeserializer() {
        if (deserializer == null) {
            BeamKafkaAvroGenericDeserializer d = new BeamKafkaAvroGenericDeserializer();
            d.configure(config, false);
            deserializer = d;
        }
        return deserializer;
    }

    private BeamKafkaAvroGenericSerializer getSerializer() {
        if (serializer == null) {
            serializer = new BeamKafkaAvroGenericSerializer();
        }
        return serializer;
    }

    @Override
    public void encode(AvroGenericRecord record, OutputStream outStream) {
        getSerializer().serialize(record, outStream);
    }

    @Override
    public AvroGenericRecord decode(InputStream inStream) {
        try {
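            // note: IOUtils.toByteArray drains the entire stream, not just this element's bytes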
            return getDeserializer().deserialize(null, IOUtils.toByteArray(inStream));
        } catch (IOException e) {
            throw new RuntimeException("Error translating into bytes ", e);
        }
    }

    @Override
    public void verifyDeterministic() {
    }

    @Override
    public Object structuralValue(AvroGenericRecord value) {
        return super.structuralValue(value);
    }

    @Override
    public int hashCode() {
        return HashCodeBuilder.reflectionHashCode(this);
    }

    @Override
    public boolean equals(Object obj) {
        return EqualsBuilder.reflectionEquals(this, obj);
    }
}

Here is the main pipeline:

PCollection<AvroGenericRecord> records = p.apply(readKafkaTr)
        .apply(Window.<AvroGenericRecord>into(FixedWindows.of(Duration.standardMinutes(options.getWindowInMinutes())))
                .triggering(AfterWatermark.pastEndOfWindow()
                        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(Duration.standardMinutes(options.getWindowInMinutes())))
                        .withLateFirings(AfterPane.elementCountAtLeast(options.getElementsCountToWaitAfterWatermark())))
                .withAllowedLateness(Duration.standardDays(options.getAfterWatermarkInDays()))
                .discardingFiredPanes()
        );

records.apply(Filter.by((ProcessFunction<AvroGenericRecord, Boolean>) Objects::nonNull))
        .apply(new WriteAvroFilesTr(options.getBasePath(), options.getNumberOfShards()));

Yes, I think @RyanSkraba is right - the DirectRunner does many things that not all other runners do (since the DirectRunner was initially intended for testing purposes, it performs many additional checks compared to other runners).
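
One of those checks is the coder round trip visible in the stack trace: the DirectRunner clones every element by encoding and then decoding it through your coder (CoderUtils.clone). A coder's decode must consume only its own element's bytes, but IOUtils.toByteArray(inStream) drains the whole stream, so when several encoded elements are concatenated into one stream (as IterableLikeCoder does after the windowed grouping) the next decode hits EOF. Below is a minimal sketch of a stream-safe variant of the two methods, reusing the serializer from the post and plain java.io classes; the length-prefix framing is my own suggestion, not something from the original coder:

    @Override
    public void encode(AvroGenericRecord record, OutputStream outStream) throws IOException {
        // Serialize into a buffer first so we know the element's exact length.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        getSerializer().serialize(record, buffer);
        byte[] bytes = buffer.toByteArray();
        // Write a length prefix followed by the payload; the prefix marks the
        // boundary between elements when several share one stream.
        DataOutputStream dataOut = new DataOutputStream(outStream);
        dataOut.writeInt(bytes.length);
        dataOut.write(bytes);
        dataOut.flush();
    }

    @Override
    public AvroGenericRecord decode(InputStream inStream) throws IOException {
        // Read exactly one length-prefixed element, leaving the rest of the
        // stream untouched for the next decode() call.
        DataInputStream dataIn = new DataInputStream(inStream);
        byte[] bytes = new byte[dataIn.readInt()];
        dataIn.readFully(bytes);
        return getDeserializer().deserialize(null, bytes);
    }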

By the way, why not use Beam's AvroCoder in this case? A simple example of how to use it with KafkaIO: https://github.com/aromanenko-dev/beam-issues/blob/master/kafka-io/src/main/java/KafkaAvro.java
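
For reference, a rough sketch of that approach; the schema variable and GenericAvroDeserializer are assumptions here (the latter standing in for any Deserializer<GenericRecord>, e.g. a wrapper around Confluent's KafkaAvroDeserializer), so see the linked example for a complete, working version:

    // Assumed imports: org.apache.avro.Schema, org.apache.avro.generic.GenericRecord,
    // org.apache.beam.sdk.coders.AvroCoder, org.apache.beam.sdk.io.kafka.KafkaIO,
    // org.apache.kafka.common.serialization.StringDeserializer
    KafkaIO.<String, GenericRecord>read()
            .withBootstrapServers(bootstrapServers)
            .withTopics(inputTopics)
            .withKeyDeserializer(StringDeserializer.class)
            // AvroCoder.of(schema) gives Beam a ready-made coder for GenericRecord values.
            .withValueDeserializerAndCoder(GenericAvroDeserializer.class, AvroCoder.of(schema))
            .withoutMetadata();

AvroCoder reads exactly one element per decode() call, so it survives the DirectRunner's clone checks without any custom framing.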