使用 RowCoder 编码 JSON 字符串
Encoding JSON String with RowCoder
假设我有一个 JSON 字符串
{"targetTable": "table", "primaryKey": {"A": "a"}, "payload": {"A": "a", "B": "b"}}
我有兴趣使用 RowCoder.of(schema)
将其序列化为 org.apache.beam.sdk.values.Row
,架构定义如下
import java.io.InputStream;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.schemas.Schema;
Schema schema1 = Schema.builder()
.addStringField("A")
.build();
Schema schema2 = Schema.builder()
.addStringField("A")
.addStringField("B")
.build();
Schema schema = Schema.of(
Schema.Field.of("targetTable", Schema.FieldType.STRING),
Schema.Field.of("primaryKey", Schema.FieldType.row(schema1)),
Schema.Field.of("payload", Schema.FieldType.row(schema2)));
String jsonString = "{\"targetTable\": \"table\", \"primaryKey\": {\"A\": \"a\"}, \"payload\": {\"A\": \"a\", \"B\": \"b\"}}"
InputStream input = ???;
Row row = RowCoder.of(schema).decode(input);
// assert(row.getString("targetTable") == "hello");
// assert(row.getRow("primaryKey").getSchema().equivalent(schemaA1));
// assert(row.getRow("payload").getSchema().equivalent(schemaA0));
如何正确构建 InputStream
以将嵌套的 JSON 结构转换为梁行?我也试过这个但是我很困惑并且不确定如何正确构建 byte[]
.
InputStream input = new ByteArrayInputStream(jsonString.getBytes(StandardCharsets.UTF_8));
谢谢。
您可以为此使用 JsonToRow
,它会为您执行字符串反序列化和 Schema
编码。在定义了一个与您的 JSON 结构相似的 Schema
之后。
请注意,不支持某些数据类型,例如 Schema.TypeName.DATETIME
,您可能需要改用 Schema.TypeName.STRING
。
import org.apache.beam.sdk.transforms.JsonToRow;
PCollection<String> json = ...
PCollection<MyUserType> = json
.apply("Parse JSON to Beam Rows", JsonToRow.withSchema(schema))
见https://cloud.google.com/architecture/e-commerce/patterns/converting-json
假设我有一个 JSON 字符串
{"targetTable": "table", "primaryKey": {"A": "a"}, "payload": {"A": "a", "B": "b"}}
我有兴趣使用 RowCoder.of(schema)
将其序列化为 org.apache.beam.sdk.values.Row
,架构定义如下
import java.io.InputStream;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.schemas.Schema;
Schema schema1 = Schema.builder()
.addStringField("A")
.build();
Schema schema2 = Schema.builder()
.addStringField("A")
.addStringField("B")
.build();
Schema schema = Schema.of(
Schema.Field.of("targetTable", Schema.FieldType.STRING),
Schema.Field.of("primaryKey", Schema.FieldType.row(schema1)),
Schema.Field.of("payload", Schema.FieldType.row(schema2)));
String jsonString = "{\"targetTable\": \"table\", \"primaryKey\": {\"A\": \"a\"}, \"payload\": {\"A\": \"a\", \"B\": \"b\"}}"
InputStream input = ???;
Row row = RowCoder.of(schema).decode(input);
// assert(row.getString("targetTable") == "hello");
// assert(row.getRow("primaryKey").getSchema().equivalent(schemaA1));
// assert(row.getRow("payload").getSchema().equivalent(schemaA0));
如何正确构建 InputStream
以将嵌套的 JSON 结构转换为梁行?我也试过这个但是我很困惑并且不确定如何正确构建 byte[]
.
InputStream input = new ByteArrayInputStream(jsonString.getBytes(StandardCharsets.UTF_8));
谢谢。
您可以为此使用 JsonToRow
,它会为您执行字符串反序列化和 Schema
编码。在定义了一个与您的 JSON 结构相似的 Schema
之后。
请注意,不支持某些数据类型,例如 Schema.TypeName.DATETIME
,您可能需要改用 Schema.TypeName.STRING
。
import org.apache.beam.sdk.transforms.JsonToRow;
PCollection<String> json = ...
PCollection<MyUserType> = json
.apply("Parse JSON to Beam Rows", JsonToRow.withSchema(schema))
见https://cloud.google.com/architecture/e-commerce/patterns/converting-json