将列的值分配给 sql 中的变量使用 jinja 模板语言

Assign value of a column to variable in sql use jinja template language

我有一个像这样的 sql 文件来转换 table 有一列包含 json 字符串

{{ config(materialized='table') }}

with customer_orders as (
  select
    time,
    data as jsonData,

{% set my_dict = fromjson( jsonData ) %}
{% do log("Printout: " ~ my_dict, info=true) %}

  from `warehouses.raw_data.customer_orders`

  limit 5
)

select *
from customer_orders

当我运行dbt run时,它return像这样:

Running with dbt=0.21.0
Encountered an error:
the JSON object must be str, bytes or bytearray, not Undefined

我什至无法打印出我想要的列的值:


{{ config(materialized='table') }}

with customer_orders as (
  select
    time,
    tag,
    data as jsonData,

{% do log("Printout: " ~ data, info=true) %}

  from `warehouses.raw_data.customer_orders`

  limit 5
)

select *
from customer_orders
22:42:58 | Concurrency: 1 threads (target='dev')
22:42:58 | 
Printout: 
22:42:58 | Done.

但是如果我创建另一个模型来打印出 jsonData:

的值
{%- set payment_methods = dbt_utils.get_column_values(
    table=ref('customer_orders_model'),
    column='jsonData'
) -%}

{% do log(payment_methods, info=true) %}

{% for json in payment_methods %}
{% set my_dict = fromjson(json) %}
{% do log(my_dict, info=true) %}
{% endfor %}

它打印出我想要的json值

Running with dbt=0.21.0
This is log

Found 2 models, 0 tests, 0 snapshots, 0 analyses, 372 macros, 0 operations, 0 seed files, 0 sources, 0 exposures

21:41:15 | Concurrency: 1 threads (target='dev')
21:41:15 |

['{"log": "ok", "path": "/var/log/containers/...log", "time": "2021-10-26T08:50:52.412932061Z", "offset": 527, "stream": "stdout", "@timestamp": 1635238252.412932}']

{'log': 'ok', 'path': '/var/log/containers/...log', 'time': '2021-10-26T08:50:52.412932061Z', 'offset': 527, 'stream': 'stdout', '@timestamp': 1635238252.412932}

21:41:21 | Done.

但我想在模型文件中处理此 json数据,如上面的 customer_orders_model

如何获取列的值并将其分配给变量并继续处理我想要的任何内容(检查 json 中是否有我想要的键并将其值设置为新列)。

备注:My purpose is that: In my table, has a json string column, I want extract this json string column into many columns so I can easily write sql query what I want.

你很亲近!需要记住的是,dbt 和 jinja 主要用于渲染文本。不在大括号中的任何内容都只是文本字符串。

因此,在您的第一个示例中,datajsonData 是较大查询的子字符串(这也是一个字符串)。所以它们不是 Jinja 知道的变量,这解释了它们是 Undefined

的错误消息
with customer_orders as (
  select
    time,
    data as jsonData,
  from `warehouses.raw_data.customer_orders`
  limit 5
)
select *
from customer_orders

这就是 dbt_utils.get_column_values() 对您有效的原因,因为该宏实际上运行查询以获取数据并将结果分配给变量。 run_query macro 对这种情况很有帮助(我相当确定 get_column_values 在后台使用 run_query)。

关于你原来的问题,你想把一个 JSON 字典变成多列,我首先建议让你的数据库直接这样做。许多数据库都有让你这样做的功能。 jinja 主要用于动态生成 SQL 查询,而不是用于操作数据。即使您可以将所有 JSON 加载到 jinja 中,我也不知道您如何在不使用 INSERT INTO VALUES 语句的情况下将其写回到 table 中,恕我直言,违背dbt的设计原则。

对于 BigQuery 数据库,Google 有一个 JSON functions in Standard SQL

如果您的列是 JSON 字符串,我认为您可以使用 JSON_EXTRACT 来获取所需键的值

例如:

with customer_orders as (
  select
    time,
    tag,
    data as jsonData,
    json_extract(data, '$.log') AS log,
  from `dc-warehouses.raw_data.logs_trackfoe_prod`
  limit 5
)
select *
from customer_orders