Jolt 规范 - 如何将元素插入数组

Jolt Spec - How to insert element into Array

如何将一个不依赖输入 JSON 的常量元素插入数组?

我的意图

我以 JSON 格式捕获 MySQL 的 CDC 数据,并在其中添加了一个表示 binlog(二进制日志)时间的新列。

随后需要把 JSON 转换为 Avro。由于 CDC 数据中包含列类型信息,我希望据此自动生成 avsc,以便在 MySQL 表结构发生变化时 schema 能随之更新。

现在的问题是:我无法把一个新元素插入到 avsc 的 $.fields[] 数组中。

输入

{
  "database": "test",
  "es": 1555381078000,
  "table": "table_name",
  "mysqlType": {
    "bool_type": "tinyint(1)",
    "tinyint_type": "tinyint(4)",
    "SMALLINT_type": "smallint(6)",
    "MEDIUMINT_type": "mediumint(9)",
    "int_type": "int(11)",
    "integer_type": "int(11)",
    "bigint_type": "bigint(20)",
    "float_type": "float",
    "double_type": "double",
    "decimal_type": "decimal(10,0)",
    "decimal_type2": "decimal(20,20)",
    "varchar_type": "varchar(20)",
    "date_type": "date",
    "time_type": "time",
    "datetime_type": "datetime",
    "timestamp_type": "timestamp"
  }
}

当前规范

[
  {
    "operation": "shift",
    "spec": {
      "database": "schema.namespace",
      "table": "schema.name",
      "#record": "schema.type",
      "#auto generated by jolt": "schema.doc",
      "mysqlType": {
        "*": {
          "tinyint*|smallint*|mediumint*|int*|date": {
            "": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#int": "schema.fields.[#3].type[]"
          },
          "bigint*|datetime|timestamp": {
            "": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#long": "schema.fields.[#3].type[]"
          },
          "float|double|decimal*": {
            "": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#long": "schema.fields.[#3].type[]"
          },
          "*": {
            "": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#string": "schema.fields.[#3].type[]"
          }
        },
        "#__binlog_time": "schema.fields[#2].name",
        "#null": "schema.fields[#2].type[]",
        "#long": "schema.fields[#2].type[]"
      }
    }
  }
]

当前输出

当前的 Jolt 规范是错误的:它没有把新元素作为独立项追加到 $.fields[],而是把它合并进了已有的某个元素(见下方 SMALLINT_type 那一项)。
{
  "schema" : {
    "type" : "record",
    "doc" : "auto generated by jolt",
    "namespace" : "test",
    "name" : "table_name",
    "fields" : [ {
      "name" : "bool_type",
      "type" : [ "null", "int" ]
    }, {
      "name" : "tinyint_type",
      "type" : [ "null", "int" ]
    }, { // wrong there
      "name" : [ "__binlog_time", "SMALLINT_type" ],
      "type" : [ "null", "long", "null", "int" ]
    }, {
      "name" : "MEDIUMINT_type",
      "type" : [ "null", "int" ]
    }, {
      "name" : "int_type",
      "type" : [ "null", "int" ]
    }, {
      "name" : "integer_type",
      "type" : [ "null", "int" ]
    }, {
      "name" : "bigint_type",
      "type" : [ "null", "long" ]
    }, {
      "name" : "float_type",
      "type" : [ "null", "long" ]
    }, {
      "name" : "double_type",
      "type" : [ "null", "long" ]
    }, {
      "name" : "decimal_type",
      "type" : [ "null", "long" ]
    }, {
      "name" : "decimal_type2",
      "type" : [ "null", "long" ]
    }, {
      "name" : "varchar_type",
      "type" : [ "null", "string" ]
    }, {
      "name" : "date_type",
      "type" : [ "null", "int" ]
    }, {
      "name" : "time_type",
      "type" : [ "null", "string" ]
    }, {
      "name" : "datetime_type",
      "type" : [ "null", "long" ]
    }, {
      "name" : "timestamp_type",
      "type" : [ "null", "long" ]
    } ]
  }
}

期望输出

将元素 {"name":"__binlog_time","type":["null","long"]} 作为独立项插入数组 $.fields[](位置不限):

{
  "schema" : {
    "type" : "record",
    "doc" : "auto generated by jolt",
    "namespace" : "test",
    "name" : "table_name",
    "fields" : [ {
      "name" : "bool_type",
      "type" : [ "null", "int" ]
    }, {
      "name" : "tinyint_type",
      "type" : [ "null", "int" ]
    }, {
      "name" : "SMALLINT_type",
      "type" : [ "null", "int" ]
    }, {
      "name" : "MEDIUMINT_type",
      "type" : [ "null", "int" ]
    }, {
      "name" : "int_type",
      "type" : [ "null", "int" ]
    }, {
      "name" : "integer_type",
      "type" : [ "null", "int" ]
    }, {
      "name" : "bigint_type",
      "type" : [ "null", "long" ]
    }, {
      "name" : "float_type",
      "type" : [ "null", "long" ]
    }, {
      "name" : "double_type",
      "type" : [ "null", "long" ]
    }, {
      "name" : "decimal_type",
      "type" : [ "null", "long" ]
    }, {
      "name" : "decimal_type2",
      "type" : [ "null", "long" ]
    }, {
      "name" : "varchar_type",
      "type" : [ "null", "string" ]
    }, {
      "name" : "date_type",
      "type" : [ "null", "int" ]
    }, {
      "name" : "time_type",
      "type" : [ "null", "string" ]
    }, {
      "name" : "datetime_type",
      "type" : [ "null", "long" ]
    }, {
      "name" : "timestamp_type",
      "type" : [ "null", "long" ]
    }, { // new element(but no need be the last element)
      "name" : "__binlog_time",
      "type" : [ "null", "long" ]
    } ]
  }
}

可以先用 default 操作把新字段(类型值设为 long)添加到输入 JSON 的 mysqlType 中,再在生成 ["null","long"] 的那个匹配器里加入 long。这样新字段会和其他列一样,作为独立元素进入 $.fields[]:

[
  {
    "operation": "default",
    "spec": {
      "mysqlType": {
        "__binlog_time": "long"
      }
    }
  },
  {
    "operation": "shift",
    "spec": {
      "database": "schema.namespace",
      "table": "schema.name",
      "#record": "schema.type",
      "#auto generated by jolt": "schema.doc",
      "mysqlType": {
        "*": {
          "tinyint*|smallint*|mediumint*|int*|date": {
            "": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#int": "schema.fields.[#3].type[]"
          },
          "bigint*|datetime|timestamp|long": {
            "": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#long": "schema.fields.[#3].type[]"
          },
          "float|double|decimal*": {
            "": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#long": "schema.fields.[#3].type[]"
          },
          "*": {
            "": "schema.fields.[#3].name",
            "#null": "schema.fields.[#3].type[]",
            "#string": "schema.fields.[#3].type[]"
          }
        }
      }
    }
  }
]