注册编码器不适用于数据流

Registered Coder does not work on Dataflow

使用 Apache Beam SDK,注册编码器不起作用。

我想将 SimpleFunction 与 BigQuery 的 TableSchema 一起使用,但它需要序列化。 我将 TableSchemaCoder 添加到 CodeRegistry 但它似乎没有被使用。

我该如何解决?

// Coder

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.json.JSONArray;
import org.json.JSONObject;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

public class TableSchemaCoder extends AtomicCoder<TableSchema> {
    public static class FieldSchema {
        private String name;
        private String type;
        private String mode;

        public FieldSchema(String name, String type, String mode) {
            this.name = name;
            this.type = type;
            this.mode = mode;
        }

        /* setter / getter */
    }

    private final StringUtf8Coder stringCoder = StringUtf8Coder.of();

    @Override
    public TableSchema decode(InputStream inStream) throws IOException {
        return new SchemaBuilder().build(stringCoder.decode(inStream));
    }

    @Override
    public void encode(TableSchema value, OutputStream outStream) throws IOException {
        List<JSONObject> fields = new ArrayList<>();
        for (TableFieldSchema s : value.getFields()) {
            fields.add(new JSONObject(new FieldSchema(s.getName(), s.getType(), s.getMode())));
        }
        String json = new JSONArray(fields).toString();
        stringCoder.encode(json, outStream);
    }
}



// Pipeline

// ...

CodeRegistry cr = pipeline.getCodeRegistry
cr.registerCoderForClass(TableSchema.class, TableSchemaCoder())

// ...

TableSchema schema = getSchema()
pipeline.apply(MapElements.via(RowParser(schema)))

错误信息:

Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.transforms.MapElements@7ac2e39b
        at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
        at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
        at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
        at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
        at org.apache.beam.sdk.transforms.MapElements.expand(MapElements.java:118)
        at org.apache.beam.sdk.transforms.MapElements.expand(MapElements.java:30)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:454)
        at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:284)

Caused by: java.io.NotSerializableException: com.google.api.services.bigquery.model.TableSchema
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
        ... 9 more

您没有共享 RowParser 的代码,但我猜它有一个 TableSchema 作为字段。 Coder 仅用于在您的管道中编码数据。 RowParser 等函数必须使用 Java 不使用注册编码器的序列化。

根据您生成 table 架构的方式,您有几种选择:

  1. 让 RowParser 将其存储为字符串或某些其他可序列化格式。它可以有一个用于实际 TableSchema 对象的瞬态字段,并从可序列化格式初始化该字段(如果它为空)。

  2. 实施 Java 序列化挂钩以序列化 RowParser,避免序列化 TableSchema。这可能与上述类似。

  3. 第一次使用 RowParser 时计算架构。