使用 RowCoder 编码 JSON 字符串

Encoding JSON String with RowCoder

假设我有一个 JSON 字符串

{"targetTable": "table", "primaryKey": {"A": "a"}, "payload": {"A": "a", "B": "b"}}

我有兴趣使用 RowCoder.of(schema) 将其序列化为 org.apache.beam.sdk.values.Row,架构定义如下

import java.io.InputStream;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.schemas.Schema;

Schema schema1 = Schema.builder()
    .addStringField("A")
    .build();

Schema schema2 = Schema.builder()
    .addStringField("A")
    .addStringField("B")
    .build();

Schema schema = Schema.of(
    Schema.Field.of("targetTable", Schema.FieldType.STRING),
    Schema.Field.of("primaryKey", Schema.FieldType.row(schema1)),
    Schema.Field.of("payload", Schema.FieldType.row(schema2)));

String jsonString = "{\"targetTable\": \"table\", \"primaryKey\": {\"A\": \"a\"}, \"payload\": {\"A\": \"a\", \"B\": \"b\"}}"

InputStream input = ???;
Row row = RowCoder.of(schema).decode(input);

// assert(row.getString("targetTable") == "hello");
// assert(row.getRow("primaryKey").getSchema().equivalent(schemaA1));
// assert(row.getRow("payload").getSchema().equivalent(schemaA0));

如何正确构建 InputStream 以将嵌套的 JSON 结构转换为梁行?我也试过这个但是我很困惑并且不确定如何正确构建 byte[].

InputStream input = new ByteArrayInputStream(jsonString.getBytes(StandardCharsets.UTF_8));

谢谢。

您可以为此使用 JsonToRow,它会为您执行字符串反序列化和 Schema 编码。在定义了一个与您的 JSON 结构相似的 Schema 之后。

请注意,不支持某些数据类型,例如 Schema.TypeName.DATETIME,您可能需要改用 Schema.TypeName.STRING

import org.apache.beam.sdk.transforms.JsonToRow;

PCollection<String> json = ...

PCollection<MyUserType>  = json
  .apply("Parse JSON to Beam Rows", JsonToRow.withSchema(schema))

https://cloud.google.com/architecture/e-commerce/patterns/converting-json