Jolt 规范 - 如何将元素插入数组
Jolt Spec - How to insert element into Array
如何把一个不来自输入 JSON 的常量元素插入到数组中?
我的意图
我捕获了以 JSON 格式表示的 MySQL CDC(变更数据捕获)数据,并且额外添加了一个表示 binlog(二进制日志)时间的新列。
之后需要把 JSON 转换为 AVRO,因此我需要自动生成 avsc(CDC 数据中包含列类型信息),以应对 MySQL 表结构发生变化的情况。
现在的问题是,我无法把这个元素插入到 avsc 中的 $.fields[] 数组里。
输入
{
"database": "test",
"es": 1555381078000,
"table": "table_name",
"mysqlType": {
"bool_type": "tinyint(1)",
"tinyint_type": "tinyint(4)",
"SMALLINT_type": "smallint(6)",
"MEDIUMINT_type": "mediumint(9)",
"int_type": "int(11)",
"integer_type": "int(11)",
"bigint_type": "bigint(20)",
"float_type": "float",
"double_type": "double",
"decimal_type": "decimal(10,0)",
"decimal_type2": "decimal(20,20)",
"varchar_type": "varchar(20)",
"date_type": "date",
"time_type": "time",
"datetime_type": "datetime",
"timestamp_type": "timestamp"
}
}
当前规范
[
  {
    "operation": "shift",
    "spec": {
      "database": "schema.namespace",
      "table": "schema.name",
      "#record": "schema.type",
      "#auto generated by jolt": "schema.doc",
      "mysqlType": {
        "*": {
          "tinyint*|smallint*|mediumint*|int*|date": {
            "$1": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#int": "schema.fields.[#3].type[]"
          },
          "bigint*|datetime|timestamp": {
            "$1": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#long": "schema.fields.[#3].type[]"
          },
          "float|double|decimal*": {
            "$1": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#long": "schema.fields.[#3].type[]"
          },
          "*": {
            "$1": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#string": "schema.fields.[#3].type[]"
          }
        },
        "#__binlog_time": "schema.fields[#2].name",
        "#null": "schema.fields[#2].type[]",
        "#long": "schema.fields[#2].type[]"
      }
    }
  }
]
当前输出
当前的 jolt 规范没有新增一个元素,而是错误地把新元素的内容合并进了 $.fields[] 中某个已有的元素里:
{
"schema" : {
"type" : "record",
"doc" : "auto generated by jolt",
"namespace" : "test",
"name" : "table_name",
"fields" : [ {
"name" : "bool_type",
"type" : [ "null", "int" ]
}, {
"name" : "tinyint_type",
"type" : [ "null", "int" ]
}, { // wrong there
"name" : [ "__binlog_time", "SMALLINT_type" ],
"type" : [ "null", "long", "null", "int" ]
}, {
"name" : "MEDIUMINT_type",
"type" : [ "null", "int" ]
}, {
"name" : "int_type",
"type" : [ "null", "int" ]
}, {
"name" : "integer_type",
"type" : [ "null", "int" ]
}, {
"name" : "bigint_type",
"type" : [ "null", "long" ]
}, {
"name" : "float_type",
"type" : [ "null", "long" ]
}, {
"name" : "double_type",
"type" : [ "null", "long" ]
}, {
"name" : "decimal_type",
"type" : [ "null", "long" ]
}, {
"name" : "decimal_type2",
"type" : [ "null", "long" ]
}, {
"name" : "varchar_type",
"type" : [ "null", "string" ]
}, {
"name" : "date_type",
"type" : [ "null", "int" ]
}, {
"name" : "time_type",
"type" : [ "null", "string" ]
}, {
"name" : "datetime_type",
"type" : [ "null", "long" ]
}, {
"name" : "timestamp_type",
"type" : [ "null", "long" ]
} ]
}
}
期望输出
将一个形如 {"name":"new_column","type":["null","string"]} 的常量元素插入到数组 $.fields[] 中(下面的示例中,该元素为 __binlog_time,类型为 ["null","long"]):
{
"schema" : {
"type" : "record",
"doc" : "auto generated by jolt",
"namespace" : "test",
"name" : "table_name",
"fields" : [ {
"name" : "bool_type",
"type" : [ "null", "int" ]
}, {
"name" : "tinyint_type",
"type" : [ "null", "int" ]
}, {
"name" : "SMALLINT_type",
"type" : [ "null", "int" ]
}, {
"name" : "MEDIUMINT_type",
"type" : [ "null", "int" ]
}, {
"name" : "int_type",
"type" : [ "null", "int" ]
}, {
"name" : "integer_type",
"type" : [ "null", "int" ]
}, {
"name" : "bigint_type",
"type" : [ "null", "long" ]
}, {
"name" : "float_type",
"type" : [ "null", "long" ]
}, {
"name" : "double_type",
"type" : [ "null", "long" ]
}, {
"name" : "decimal_type",
"type" : [ "null", "long" ]
}, {
"name" : "decimal_type2",
"type" : [ "null", "long" ]
}, {
"name" : "varchar_type",
"type" : [ "null", "string" ]
}, {
"name" : "date_type",
"type" : [ "null", "int" ]
}, {
"name" : "time_type",
"type" : [ "null", "string" ]
}, {
"name" : "datetime_type",
"type" : [ "null", "long" ]
}, {
"name" : "timestamp_type",
"type" : [ "null", "long" ]
}, { // new element(but no need be the last element)
"name" : "__binlog_time",
"type" : [ "null", "long" ]
} ]
}
}
您可以先使用 default 操作把新字段添加到输入 JSON 中,然后更新匹配“可空 long”类型的那条规则,使其也能匹配 long:
[
  {
    "operation": "default",
    "spec": {
      "mysqlType": {
        "__binlog_time": "long"
      }
    }
  },
  {
    "operation": "shift",
    "spec": {
      "database": "schema.namespace",
      "table": "schema.name",
      "#record": "schema.type",
      "#auto generated by jolt": "schema.doc",
      "mysqlType": {
        "*": {
          "tinyint*|smallint*|mediumint*|int*|date": {
            "$1": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#int": "schema.fields.[#3].type[]"
          },
          "bigint*|datetime|timestamp|long": {
            "$1": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#long": "schema.fields.[#3].type[]"
          },
          "float|double|decimal*": {
            "$1": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#long": "schema.fields.[#3].type[]"
          },
          "*": {
            "$1": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#string": "schema.fields.[#3].type[]"
          }
        }
      }
    }
  }
]
如何把一个不来自输入 JSON 的常量元素插入到数组中?
我的意图
我捕获了以 JSON 格式表示的 MySQL CDC(变更数据捕获)数据,并且额外添加了一个表示 binlog(二进制日志)时间的新列。
之后需要把 JSON 转换为 AVRO,因此我需要自动生成 avsc(CDC 数据中包含列类型信息),以应对 MySQL 表结构发生变化的情况。
现在的问题是,我无法把这个元素插入到 avsc 中的 $.fields[] 数组里。
输入
{
"database": "test",
"es": 1555381078000,
"table": "table_name",
"mysqlType": {
"bool_type": "tinyint(1)",
"tinyint_type": "tinyint(4)",
"SMALLINT_type": "smallint(6)",
"MEDIUMINT_type": "mediumint(9)",
"int_type": "int(11)",
"integer_type": "int(11)",
"bigint_type": "bigint(20)",
"float_type": "float",
"double_type": "double",
"decimal_type": "decimal(10,0)",
"decimal_type2": "decimal(20,20)",
"varchar_type": "varchar(20)",
"date_type": "date",
"time_type": "time",
"datetime_type": "datetime",
"timestamp_type": "timestamp"
}
}
当前规范
[
  {
    "operation": "shift",
    "spec": {
      "database": "schema.namespace",
      "table": "schema.name",
      "#record": "schema.type",
      "#auto generated by jolt": "schema.doc",
      "mysqlType": {
        "*": {
          "tinyint*|smallint*|mediumint*|int*|date": {
            "$1": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#int": "schema.fields.[#3].type[]"
          },
          "bigint*|datetime|timestamp": {
            "$1": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#long": "schema.fields.[#3].type[]"
          },
          "float|double|decimal*": {
            "$1": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#long": "schema.fields.[#3].type[]"
          },
          "*": {
            "$1": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#string": "schema.fields.[#3].type[]"
          }
        },
        "#__binlog_time": "schema.fields[#2].name",
        "#null": "schema.fields[#2].type[]",
        "#long": "schema.fields[#2].type[]"
      }
    }
  }
]
当前输出
当前的 jolt 规范没有新增一个元素,而是错误地把新元素的内容合并进了 $.fields[] 中某个已有的元素里:
{
"schema" : {
"type" : "record",
"doc" : "auto generated by jolt",
"namespace" : "test",
"name" : "table_name",
"fields" : [ {
"name" : "bool_type",
"type" : [ "null", "int" ]
}, {
"name" : "tinyint_type",
"type" : [ "null", "int" ]
}, { // wrong there
"name" : [ "__binlog_time", "SMALLINT_type" ],
"type" : [ "null", "long", "null", "int" ]
}, {
"name" : "MEDIUMINT_type",
"type" : [ "null", "int" ]
}, {
"name" : "int_type",
"type" : [ "null", "int" ]
}, {
"name" : "integer_type",
"type" : [ "null", "int" ]
}, {
"name" : "bigint_type",
"type" : [ "null", "long" ]
}, {
"name" : "float_type",
"type" : [ "null", "long" ]
}, {
"name" : "double_type",
"type" : [ "null", "long" ]
}, {
"name" : "decimal_type",
"type" : [ "null", "long" ]
}, {
"name" : "decimal_type2",
"type" : [ "null", "long" ]
}, {
"name" : "varchar_type",
"type" : [ "null", "string" ]
}, {
"name" : "date_type",
"type" : [ "null", "int" ]
}, {
"name" : "time_type",
"type" : [ "null", "string" ]
}, {
"name" : "datetime_type",
"type" : [ "null", "long" ]
}, {
"name" : "timestamp_type",
"type" : [ "null", "long" ]
} ]
}
}
期望输出
将一个形如 {"name":"new_column","type":["null","string"]} 的常量元素插入到数组 $.fields[] 中(下面的示例中,该元素为 __binlog_time,类型为 ["null","long"]):
{
"schema" : {
"type" : "record",
"doc" : "auto generated by jolt",
"namespace" : "test",
"name" : "table_name",
"fields" : [ {
"name" : "bool_type",
"type" : [ "null", "int" ]
}, {
"name" : "tinyint_type",
"type" : [ "null", "int" ]
}, {
"name" : "SMALLINT_type",
"type" : [ "null", "int" ]
}, {
"name" : "MEDIUMINT_type",
"type" : [ "null", "int" ]
}, {
"name" : "int_type",
"type" : [ "null", "int" ]
}, {
"name" : "integer_type",
"type" : [ "null", "int" ]
}, {
"name" : "bigint_type",
"type" : [ "null", "long" ]
}, {
"name" : "float_type",
"type" : [ "null", "long" ]
}, {
"name" : "double_type",
"type" : [ "null", "long" ]
}, {
"name" : "decimal_type",
"type" : [ "null", "long" ]
}, {
"name" : "decimal_type2",
"type" : [ "null", "long" ]
}, {
"name" : "varchar_type",
"type" : [ "null", "string" ]
}, {
"name" : "date_type",
"type" : [ "null", "int" ]
}, {
"name" : "time_type",
"type" : [ "null", "string" ]
}, {
"name" : "datetime_type",
"type" : [ "null", "long" ]
}, {
"name" : "timestamp_type",
"type" : [ "null", "long" ]
}, { // new element(but no need be the last element)
"name" : "__binlog_time",
"type" : [ "null", "long" ]
} ]
}
}
您可以先使用 default 操作把新字段添加到输入 JSON 中,然后更新匹配“可空 long”类型的那条规则,使其也能匹配 long:
[
  {
    "operation": "default",
    "spec": {
      "mysqlType": {
        "__binlog_time": "long"
      }
    }
  },
  {
    "operation": "shift",
    "spec": {
      "database": "schema.namespace",
      "table": "schema.name",
      "#record": "schema.type",
      "#auto generated by jolt": "schema.doc",
      "mysqlType": {
        "*": {
          "tinyint*|smallint*|mediumint*|int*|date": {
            "$1": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#int": "schema.fields.[#3].type[]"
          },
          "bigint*|datetime|timestamp|long": {
            "$1": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#long": "schema.fields.[#3].type[]"
          },
          "float|double|decimal*": {
            "$1": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#long": "schema.fields.[#3].type[]"
          },
          "*": {
            "$1": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#string": "schema.fields.[#3].type[]"
          }
        }
      }
    }
  }
]