将列的值分配给 sql 中的变量使用 jinja 模板语言
Assign value of a column to variable in sql use jinja template language
我有一个像这样的 sql 文件来转换 table 有一列包含 json 字符串
{{ config(materialized='table') }}
with customer_orders as (
select
time,
data as jsonData,
{% set my_dict = fromjson( jsonData ) %}
{% do log("Printout: " ~ my_dict, info=true) %}
from `warehouses.raw_data.customer_orders`
limit 5
)
select *
from customer_orders
当我运行dbt run
时,它return像这样:
Running with dbt=0.21.0
Encountered an error:
the JSON object must be str, bytes or bytearray, not Undefined
我什至无法打印出我想要的列的值:
{{ config(materialized='table') }}
with customer_orders as (
select
time,
tag,
data as jsonData,
{% do log("Printout: " ~ data, info=true) %}
from `warehouses.raw_data.customer_orders`
limit 5
)
select *
from customer_orders
22:42:58 | Concurrency: 1 threads (target='dev')
22:42:58 |
Printout:
22:42:58 | Done.
但是如果我创建另一个模型来打印出 jsonData:
的值
{%- set payment_methods = dbt_utils.get_column_values(
table=ref('customer_orders_model'),
column='jsonData'
) -%}
{% do log(payment_methods, info=true) %}
{% for json in payment_methods %}
{% set my_dict = fromjson(json) %}
{% do log(my_dict, info=true) %}
{% endfor %}
它打印出我想要的json值
Running with dbt=0.21.0
This is log
Found 2 models, 0 tests, 0 snapshots, 0 analyses, 372 macros, 0 operations, 0 seed files, 0 sources, 0 exposures
21:41:15 | Concurrency: 1 threads (target='dev')
21:41:15 |
['{"log": "ok", "path": "/var/log/containers/...log", "time": "2021-10-26T08:50:52.412932061Z", "offset": 527, "stream": "stdout", "@timestamp": 1635238252.412932}']
{'log': 'ok', 'path': '/var/log/containers/...log', 'time': '2021-10-26T08:50:52.412932061Z', 'offset': 527, 'stream': 'stdout', '@timestamp': 1635238252.412932}
21:41:21 | Done.
但我想在模型文件中处理此 json数据,如上面的 customer_orders_model
。
如何获取列的值并将其分配给变量并继续处理我想要的任何内容(检查 json 中是否有我想要的键并将其值设置为新列)。
备注:My purpose is that: In my table, has a json string column, I want extract this json string column into many columns so I can easily write sql query what I want.
你很亲近!需要记住的是,dbt 和 jinja 主要用于渲染文本。不在大括号中的任何内容都只是文本字符串。
因此,在您的第一个示例中,data
和 jsonData
是较大查询的子字符串(这也是一个字符串)。所以它们不是 Jinja 知道的变量,这解释了它们是 Undefined
的错误消息
with customer_orders as (
select
time,
data as jsonData,
from `warehouses.raw_data.customer_orders`
limit 5
)
select *
from customer_orders
这就是 dbt_utils.get_column_values()
对您有效的原因,因为该宏实际上运行查询以获取数据并将结果分配给变量。 run_query
macro 对这种情况很有帮助(我相当确定 get_column_values
在后台使用 run_query
)。
关于你原来的问题,你想把一个 JSON 字典变成多列,我首先建议让你的数据库直接这样做。许多数据库都有让你这样做的功能。 jinja 主要用于动态生成 SQL 查询,而不是用于操作数据。即使您可以将所有 JSON 加载到 jinja 中,我也不知道您如何在不使用 INSERT INTO VALUES
语句的情况下将其写回到 table 中,恕我直言,违背dbt的设计原则。
对于 BigQuery 数据库,Google 有一个 JSON functions in Standard SQL
如果您的列是 JSON 字符串,我认为您可以使用 JSON_EXTRACT
来获取所需键的值
例如:
with customer_orders as (
select
time,
tag,
data as jsonData,
json_extract(data, '$.log') AS log,
from `dc-warehouses.raw_data.logs_trackfoe_prod`
limit 5
)
select *
from customer_orders
我有一个像这样的 sql 文件来转换 table 有一列包含 json 字符串
{{ config(materialized='table') }}
with customer_orders as (
select
time,
data as jsonData,
{% set my_dict = fromjson( jsonData ) %}
{% do log("Printout: " ~ my_dict, info=true) %}
from `warehouses.raw_data.customer_orders`
limit 5
)
select *
from customer_orders
当我运行dbt run
时,它return像这样:
Running with dbt=0.21.0
Encountered an error:
the JSON object must be str, bytes or bytearray, not Undefined
我什至无法打印出我想要的列的值:
{{ config(materialized='table') }}
with customer_orders as (
select
time,
tag,
data as jsonData,
{% do log("Printout: " ~ data, info=true) %}
from `warehouses.raw_data.customer_orders`
limit 5
)
select *
from customer_orders
22:42:58 | Concurrency: 1 threads (target='dev')
22:42:58 |
Printout:
22:42:58 | Done.
但是如果我创建另一个模型来打印出 jsonData:
的值{%- set payment_methods = dbt_utils.get_column_values(
table=ref('customer_orders_model'),
column='jsonData'
) -%}
{% do log(payment_methods, info=true) %}
{% for json in payment_methods %}
{% set my_dict = fromjson(json) %}
{% do log(my_dict, info=true) %}
{% endfor %}
它打印出我想要的json值
Running with dbt=0.21.0
This is log
Found 2 models, 0 tests, 0 snapshots, 0 analyses, 372 macros, 0 operations, 0 seed files, 0 sources, 0 exposures
21:41:15 | Concurrency: 1 threads (target='dev')
21:41:15 |
['{"log": "ok", "path": "/var/log/containers/...log", "time": "2021-10-26T08:50:52.412932061Z", "offset": 527, "stream": "stdout", "@timestamp": 1635238252.412932}']
{'log': 'ok', 'path': '/var/log/containers/...log', 'time': '2021-10-26T08:50:52.412932061Z', 'offset': 527, 'stream': 'stdout', '@timestamp': 1635238252.412932}
21:41:21 | Done.
但我想在模型文件中处理此 json数据,如上面的 customer_orders_model
。
如何获取列的值并将其分配给变量并继续处理我想要的任何内容(检查 json 中是否有我想要的键并将其值设置为新列)。
备注:My purpose is that: In my table, has a json string column, I want extract this json string column into many columns so I can easily write sql query what I want.
你很亲近!需要记住的是,dbt 和 jinja 主要用于渲染文本。不在大括号中的任何内容都只是文本字符串。
因此,在您的第一个示例中,data
和 jsonData
是较大查询的子字符串(这也是一个字符串)。所以它们不是 Jinja 知道的变量,这解释了它们是 Undefined
with customer_orders as (
select
time,
data as jsonData,
from `warehouses.raw_data.customer_orders`
limit 5
)
select *
from customer_orders
这就是 dbt_utils.get_column_values()
对您有效的原因,因为该宏实际上运行查询以获取数据并将结果分配给变量。 run_query
macro 对这种情况很有帮助(我相当确定 get_column_values
在后台使用 run_query
)。
关于你原来的问题,你想把一个 JSON 字典变成多列,我首先建议让你的数据库直接这样做。许多数据库都有让你这样做的功能。 jinja 主要用于动态生成 SQL 查询,而不是用于操作数据。即使您可以将所有 JSON 加载到 jinja 中,我也不知道您如何在不使用 INSERT INTO VALUES
语句的情况下将其写回到 table 中,恕我直言,违背dbt的设计原则。
对于 BigQuery 数据库,Google 有一个 JSON functions in Standard SQL
如果您的列是 JSON 字符串,我认为您可以使用 JSON_EXTRACT
来获取所需键的值
例如:
with customer_orders as (
select
time,
tag,
data as jsonData,
json_extract(data, '$.log') AS log,
from `dc-warehouses.raw_data.logs_trackfoe_prod`
limit 5
)
select *
from customer_orders