覆盖数据流中的 AvroIO 默认编码器
Override AvroIO default Coder in Dataflow
我正在尝试使用自定义 Coder 来处理 Dataflow 中的数据。我所做的如下:
- 已将数据从 BigQuery 导出到 avro 文件
- 使用 avro-tools-1.7 从这些文件中的模式自动生成 class。7.jar
- 使用 Kryo
为 class 编写了自定义编码器
- 将 class 注释为
@DefaultCoder(MyCustomCoder.class)
- 使用
p.getCoderRegistry().registerCoder(MyCustomClass.class, MyCustomCoder.class);
注册了我的编码器
- 使用
PCollection<MyCustomClass> pc = p.apply(AvroIO.Read.named("Name").from("gs://bucket/path/to/*.avro").withSchema(MyCustomClass.class));
从 avro 文件读取数据
问题是,如果我的 Coder 中有错误,我的工作只会在洗牌步骤中失败。看起来 Dataflow 没有使用我的自定义 Coder 从 avro 文件加载数据。
真的是这样吗?如果是这样,有没有办法覆盖用于加载数据的编码器?
AvroIO 目前总是使用 built-in AvroCoder 来读取输入文件。您可以稍后在管道中像您描述的那样更改编码器。如果您的数据实际上没有以 AvroIO 可以读取的方式编码,您应该使用不同的来源,例如 FileBasedSource.
的新子类
我正在尝试使用自定义 Coder 来处理 Dataflow 中的数据。我所做的如下:
- 已将数据从 BigQuery 导出到 avro 文件
- 使用 avro-tools-1.7 从这些文件中的模式自动生成 class。7.jar
- 使用 Kryo 为 class 编写了自定义编码器
- 将 class 注释为
@DefaultCoder(MyCustomCoder.class)
- 使用
p.getCoderRegistry().registerCoder(MyCustomClass.class, MyCustomCoder.class);
注册了我的编码器
- 使用
PCollection<MyCustomClass> pc = p.apply(AvroIO.Read.named("Name").from("gs://bucket/path/to/*.avro").withSchema(MyCustomClass.class));
从 avro 文件读取数据
问题是,如果我的 Coder 中有错误,我的工作只会在洗牌步骤中失败。看起来 Dataflow 没有使用我的自定义 Coder 从 avro 文件加载数据。 真的是这样吗?如果是这样,有没有办法覆盖用于加载数据的编码器?
AvroIO 目前总是使用 built-in AvroCoder 来读取输入文件。您可以稍后在管道中像您描述的那样更改编码器。如果您的数据实际上没有以 AvroIO 可以读取的方式编码,您应该使用不同的来源,例如 FileBasedSource.
的新子类