在ksql中转换数据

Transform data in ksql

我正在尝试将数据从一种格式转换为另一种格式(一种模式转换为另一种模式)。

示例:

payload = {
    'a' : 'a1',
    'b' : 'b1'
}

我想将此有效负载转换为另一种形式让我们说

payload_transform = {
   'a':{
      'b' : 'b1'
    }
    'c' : 'a1'
}

考虑到数据(payload)来自 Kafka,我想在消费者中看到 payload_transform 通过转换

ksql 可以吗?

更新:

我们可以做一级吗:

payload = {
    'a' : 'a1',
    'b' : 'b1'
}

payload = {
    'confluent' : 'a1',
    'b' : 'b1'
}

我们可以添加条件吗?

例如:如果有效负载中存在 'b' 密钥,则生成

payload = {
        'confluent' : 'a1',
        'b' : 'b1'
    }

否则:

payload = {
        'kafka' : 'a1',
        'b' : 'b1'
    }

虽然 KSQL 确实支持 un-nesting JSON(使用 EXTRACTJSONFIELD),但目前(2018 年 3 月/0.5 版)不支持构建嵌套结构。它目前也不支持嵌套的 Avro。

更新问题的更新回复

  • 您可以重命名字段,只需使用 SQL AS 子句:

    SELECT A AS NEW_COL, B FROM INPUT_STREAM

你能详细描述一下你想在这里做什么吗?在您给出的示例中,有条件地重命名字段没有意义。也许也可以试试 KSQL 看看什么对你有用。