解码器中的 Apache beam IOException
Apache beam IOException in decoder
我有一个简单的管道,它通过 KafkaIO reader 从 Kafka 读取数据并将 next 转换为管道。最后,它以 avro 格式写入 GCP。因此,当我 运行 DataFlow 中的管道时,它工作得很好,但是当 运行ner 是 DirectRunner 时,它会从主题中读取所有数据并抛出异常。
java.lang.IllegalArgumentException: Forbidden IOException when reading from InputStream
at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:118)
at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
at org.apache.beam.runners.direct.CloningBundleFactory$CloningBundle.add(CloningBundleFactory.java:84)
at org.apache.beam.runners.direct.GroupAlsoByWindowEvaluatorFactory$OutputWindowedValueToBundle.outputWindowedValue(GroupAlsoByWindowEvaluatorFactory.java:251)
at org.apache.beam.runners.direct.GroupAlsoByWindowEvaluatorFactory$OutputWindowedValueToBundle.outputWindowedValue(GroupAlsoByWindowEvaluatorFactory.java:237)
at org.apache.beam.repackaged.direct_java.runners.core.ReduceFnRunner.lambda$onTrigger(ReduceFnRunner.java:1057)
at org.apache.beam.repackaged.direct_java.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
at org.apache.beam.repackaged.direct_java.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
at org.apache.beam.repackaged.direct_java.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
at org.apache.beam.repackaged.direct_java.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
at org.apache.beam.runners.direct.GroupAlsoByWindowEvaluatorFactory$GroupAlsoByWindowEvaluator.processElement(GroupAlsoByWindowEvaluatorFactory.java:185)
at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160)
at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at org.apache.beam.sdk.util.VarInt.decodeLong(VarInt.java:73)
at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:136)
at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:60)
at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
... 19 more
我使用自定义序列化器和反序列化器来读取 avro 和获取 paylod。
卡夫卡Reader
private PTransform<PBegin, PCollection<KV<String, AvroGenericRecord>>> createKafkaRead(Map<String, Object> configUpdates) {
return KafkaIO.<String, AvroGenericRecord>read()
.withBootstrapServers(bootstrapServers)
.withConsumerConfigUpdates(configUpdates)
.withTopics(inputTopics)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder(BeamKafkaAvroGenericDeserializer.class, AvroGenericCoder.of(serDeConfig()))
.withMaxNumRecords(maxNumRecords)
.commitOffsetsInFinalize()
.withoutMetadata();
}
AvroGenericCoder
public class AvroGenericCoder extends CustomCoder<AvroGenericRecord> {
private final Map<String, Object> config;
private transient BeamKafkaAvroGenericDeserializer deserializer;
private transient BeamKafkaAvroGenericSerializer serializer;
public static AvroGenericCoder of(Map<String, Object> config) {
return new AvroGenericCoder(config);
}
protected AvroGenericCoder(Map<String, Object> config) {
this.config = config;
}
private BeamKafkaAvroGenericDeserializer getDeserializer() {
if (deserializer == null) {
BeamKafkaAvroGenericDeserializer d = new BeamKafkaAvroGenericDeserializer();
d.configure(config, false);
deserializer = d;
}
return deserializer;
}
private BeamKafkaAvroGenericSerializer getSerializer() {
if (serializer == null) {
serializer = new BeamKafkaAvroGenericSerializer();
}
return serializer;
}
@Override
public void encode(AvroGenericRecord record, OutputStream outStream) {
getSerializer().serialize(record, outStream);
}
@Override
public AvroGenericRecord decode(InputStream inStream) {
try {
return getDeserializer().deserialize(null, IOUtils.toByteArray(inStream));
} catch (IOException e) {
throw new RuntimeException("Error translating into bytes ", e);
}
}
@Override
public void verifyDeterministic() {
}
@Override
public Object structuralValue(AvroGenericRecord value) {
return super.structuralValue(value);
}
@Override
public int hashCode() {
return HashCodeBuilder.reflectionHashCode(this);
}
@Override
public boolean equals(Object obj) {
return EqualsBuilder.reflectionEquals(this, obj);
}
}
这是主管道
PCollection<AvroGenericRecord> records = p.apply(readKafkaTr)
.apply(Window.<AvroGenericRecord>into(FixedWindows.of(Duration.standardMinutes(options.getWindowInMinutes())))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(options.getWindowInMinutes())))
.withLateFirings(AfterPane.elementCountAtLeast(options.getElementsCountToWaitAfterWatermark())))
.withAllowedLateness(Duration.standardDays(options.getAfterWatermarkInDays()))
.discardingFiredPanes()
);
records.apply(Filter.by((ProcessFunction<AvroGenericRecord, Boolean>) Objects::nonNull))
.apply(new WriteAvroFilesTr(options.getBasePath(), options.getNumberOfShards()));
是的,我认为@RyanSkraba 是对的 - DirectRunner 做了很多不是所有其他跑步者都做的事情(因为 DirectRunner 的最初目标是用于测试,所以与其他跑步者相比,它执行许多额外的检查)。
顺便说一句,为什么在这种情况下不使用 Beam AvroCoder
?如何将它与 KafkaIO
一起使用的简单示例:
https://github.com/aromanenko-dev/beam-issues/blob/master/kafka-io/src/main/java/KafkaAvro.java
我有一个简单的管道,它通过 KafkaIO reader 从 Kafka 读取数据并将 next 转换为管道。最后,它以 avro 格式写入 GCP。因此,当我 运行 DataFlow 中的管道时,它工作得很好,但是当 运行ner 是 DirectRunner 时,它会从主题中读取所有数据并抛出异常。
java.lang.IllegalArgumentException: Forbidden IOException when reading from InputStream
at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:118)
at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
at org.apache.beam.runners.direct.CloningBundleFactory$CloningBundle.add(CloningBundleFactory.java:84)
at org.apache.beam.runners.direct.GroupAlsoByWindowEvaluatorFactory$OutputWindowedValueToBundle.outputWindowedValue(GroupAlsoByWindowEvaluatorFactory.java:251)
at org.apache.beam.runners.direct.GroupAlsoByWindowEvaluatorFactory$OutputWindowedValueToBundle.outputWindowedValue(GroupAlsoByWindowEvaluatorFactory.java:237)
at org.apache.beam.repackaged.direct_java.runners.core.ReduceFnRunner.lambda$onTrigger(ReduceFnRunner.java:1057)
at org.apache.beam.repackaged.direct_java.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
at org.apache.beam.repackaged.direct_java.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
at org.apache.beam.repackaged.direct_java.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
at org.apache.beam.repackaged.direct_java.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
at org.apache.beam.runners.direct.GroupAlsoByWindowEvaluatorFactory$GroupAlsoByWindowEvaluator.processElement(GroupAlsoByWindowEvaluatorFactory.java:185)
at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160)
at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at org.apache.beam.sdk.util.VarInt.decodeLong(VarInt.java:73)
at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:136)
at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:60)
at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
... 19 more
我使用自定义序列化器和反序列化器来读取 avro 和获取 paylod。
卡夫卡Reader
private PTransform<PBegin, PCollection<KV<String, AvroGenericRecord>>> createKafkaRead(Map<String, Object> configUpdates) {
return KafkaIO.<String, AvroGenericRecord>read()
.withBootstrapServers(bootstrapServers)
.withConsumerConfigUpdates(configUpdates)
.withTopics(inputTopics)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder(BeamKafkaAvroGenericDeserializer.class, AvroGenericCoder.of(serDeConfig()))
.withMaxNumRecords(maxNumRecords)
.commitOffsetsInFinalize()
.withoutMetadata();
}
AvroGenericCoder
public class AvroGenericCoder extends CustomCoder<AvroGenericRecord> {
private final Map<String, Object> config;
private transient BeamKafkaAvroGenericDeserializer deserializer;
private transient BeamKafkaAvroGenericSerializer serializer;
public static AvroGenericCoder of(Map<String, Object> config) {
return new AvroGenericCoder(config);
}
protected AvroGenericCoder(Map<String, Object> config) {
this.config = config;
}
private BeamKafkaAvroGenericDeserializer getDeserializer() {
if (deserializer == null) {
BeamKafkaAvroGenericDeserializer d = new BeamKafkaAvroGenericDeserializer();
d.configure(config, false);
deserializer = d;
}
return deserializer;
}
private BeamKafkaAvroGenericSerializer getSerializer() {
if (serializer == null) {
serializer = new BeamKafkaAvroGenericSerializer();
}
return serializer;
}
@Override
public void encode(AvroGenericRecord record, OutputStream outStream) {
getSerializer().serialize(record, outStream);
}
@Override
public AvroGenericRecord decode(InputStream inStream) {
try {
return getDeserializer().deserialize(null, IOUtils.toByteArray(inStream));
} catch (IOException e) {
throw new RuntimeException("Error translating into bytes ", e);
}
}
@Override
public void verifyDeterministic() {
}
@Override
public Object structuralValue(AvroGenericRecord value) {
return super.structuralValue(value);
}
@Override
public int hashCode() {
return HashCodeBuilder.reflectionHashCode(this);
}
@Override
public boolean equals(Object obj) {
return EqualsBuilder.reflectionEquals(this, obj);
}
}
这是主管道
PCollection<AvroGenericRecord> records = p.apply(readKafkaTr)
.apply(Window.<AvroGenericRecord>into(FixedWindows.of(Duration.standardMinutes(options.getWindowInMinutes())))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(options.getWindowInMinutes())))
.withLateFirings(AfterPane.elementCountAtLeast(options.getElementsCountToWaitAfterWatermark())))
.withAllowedLateness(Duration.standardDays(options.getAfterWatermarkInDays()))
.discardingFiredPanes()
);
records.apply(Filter.by((ProcessFunction<AvroGenericRecord, Boolean>) Objects::nonNull))
.apply(new WriteAvroFilesTr(options.getBasePath(), options.getNumberOfShards()));
是的,我认为@RyanSkraba 是对的 - DirectRunner 做了很多不是所有其他跑步者都做的事情(因为 DirectRunner 的最初目标是用于测试,所以与其他跑步者相比,它执行许多额外的检查)。
顺便说一句,为什么在这种情况下不使用 Beam AvroCoder
?如何将它与 KafkaIO
一起使用的简单示例:
https://github.com/aromanenko-dev/beam-issues/blob/master/kafka-io/src/main/java/KafkaAvro.java