Airflow's BigQueryOperator not working with UDF
I'm trying to run a basic BigQuery operator task in Airflow (using Google's Composer) that uses a user-defined function (UDF).
The example comes straight from https://cloud.google.com/bigquery/docs/reference/standard-sql/user-defined-functions and runs fine in BigQuery.
However, when I upload it to Composer, I get "Function not found: multiplyInputs..." See the Python script below.
The BigQueryOperator's udf_config field expects a list, so I defined my UDF as a list containing a single string - not sure if that's correct, since it clearly isn't being registered as a UDF.
Any help is much appreciated.
import datetime

from airflow import models
from airflow.contrib.operators import bigquery_operator

yesterday = datetime.datetime.combine(datetime.datetime.today() - datetime.timedelta(1),
                                      datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': 'vital-platform-791'
}

with models.DAG('udf_example',
                schedule_interval=datetime.timedelta(days=1),
                default_args=default_dag_args) as dag:

    table = 'udf_table'

    # flatten fe table
    task_id = table + '_fe'

    # UDF passed as a list containing a single string
    udf_config = ["""CREATE TEMPORARY FUNCTION multiplyInputs(x FLOAT64, y FLOAT64)
    RETURNS FLOAT64
    LANGUAGE js AS \"""
    return x*y;
    \""";
    """]
    print(udf_config)

    query = """WITH numbers AS
      (SELECT 1 AS x, 5 AS y
       UNION ALL
       SELECT 2 AS x, 10 AS y
       UNION ALL
       SELECT 3 AS x, 15 AS y)
    SELECT x, y, multiplyInputs(x, y) AS product
    FROM numbers"""
    print(query)

    destination_table = 'vital-platform-791.alpha_factors.{table}_fe'.format(table=table)

    t_fe = bigquery_operator.BigQueryOperator(task_id=task_id,
                                              bql=query,
                                              destination_dataset_table=destination_table,
                                              use_legacy_sql=False,
                                              write_disposition='WRITE_TRUNCATE',
                                              udf_config=udf_config)
This example confuses me a bit. It looks like you just need to merge udf_config and query:
query = """CREATE TEMPORARY FUNCTION multiplyInputs(x FLOAT64, y FLOAT64)
RETURNS FLOAT64
LANGUAGE js AS \"""
return x*y;
\""";
WITH numbers AS
  (SELECT 1 AS x, 5 AS y
   UNION ALL
   SELECT 2 AS x, 10 AS y
   UNION ALL
   SELECT 3 AS x, 15 AS y)
SELECT x, y, multiplyInputs(x, y) AS product
FROM numbers;"""
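For completeness, a minimal sketch of how that merged string could then be handed to the operator, assuming the same task_id and destination_table as in the question and simply dropping udf_config:

t_fe = bigquery_operator.BigQueryOperator(task_id=task_id,
                                          bql=query,  # UDF definition and SELECT combined above
                                          destination_dataset_table=destination_table,
                                          use_legacy_sql=False,
                                          write_disposition='WRITE_TRUNCATE')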
Upload your UDF function to Google Cloud Storage and pass it to the udf_config parameter.
For example:
Your UDF function is at gs://test-bucket/testfolder/udf.js
Then in your Airflow DAG use:
udf_gcs_path = "gs://test-bucket/testfolder/udf.js"

bigquery_operator.BigQueryOperator(task_id=task_id,
                                   bql=query,
                                   destination_dataset_table=destination_table,
                                   use_legacy_sql=False,
                                   write_disposition='WRITE_TRUNCATE',
                                   udf_config=[{"resourceUri": udf_gcs_path}])
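As a side note (this is not what the answer above does), standard SQL can also load JavaScript from Cloud Storage directly inside the query via the library option of CREATE TEMP FUNCTION. A minimal sketch, assuming (hypothetically) that udf.js defines a global JavaScript function named multiply(x, y):

# Sketch only: the helper name "multiply" is an assumption about what udf.js contains.
query = """CREATE TEMP FUNCTION multiplyInputs(x FLOAT64, y FLOAT64)
RETURNS FLOAT64
LANGUAGE js
-- load the JavaScript helper from Cloud Storage; the UDF body just delegates to it
OPTIONS (library=["gs://test-bucket/testfolder/udf.js"])
AS "return multiply(x, y);";
WITH numbers AS
  (SELECT 1 AS x, 5 AS y
   UNION ALL
   SELECT 2 AS x, 10 AS y)
SELECT x, y, multiplyInputs(x, y) AS product
FROM numbers"""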