TypeError: missing 1 required positional argument: 's' in Airflow
TypeError: missing 1 required positional argument: 's' in Airflow
我目前有一段代码,正试图把它迁移到 Airflow 中运行。
代码从 table 中获取数据并执行 def map_manufacturer_model 中的函数并将其输出到 bigquery table.
我知道代码可以在我的电脑上运行,但是,当我将它移动到 airflow 时,我收到错误消息:
类型错误:缺少 1 个必需的位置参数:'s'
任何人都可以提供任何指导,因为我现在很困难!
谢谢
import pandas as pd
import datetime
from airflow.operators import python_operator
from google.cloud import bigquery
from airflow import models
client = bigquery.Client()
bqclient = bigquery.Client()
# Output table for dataframe
table_id = "Staging"
# Dataframe Code
query_string = """
SELECT * FROM `Staging`
"""
gasdata= (
bqclient.query(query_string)
.result()
.to_dataframe(
create_bqstorage_client=True,
))
manufacturers = {'G4F0': 'FLN', 'G4F1': 'FLN', 'G4F9': 'FLN', 'G4K0': 'HWL', 'E6S1': 'LPG', 'E6S2': 'LPG'}
meter_models = {'G4F0': {'1': 'G4SZV-1', '2': 'G4SZV-2'},
'G4F9': {'': 'G4SZV-1'},
'G4F1': {'': 'G4SDZV-2'},
'G4K0': {'': 'BK-G4E'},
'E6S1': {'': 'E6VG470'},
'E6S2': {'': 'E6VG470'},
}
def map_manufacturer_model(s):
s = str(s)
model = ''
try:
manufacturer = manufacturers[s[:4]]
for k, m in meter_models[s[:4]].items():
if s[-4:].startswith(k):
model = m
break
except KeyError:
manufacturer = ''
return pd.Series({'NewMeterManufacturer': manufacturer,
'NewMeterModel': model
})
gas_data[['NewMeterManufacturer', 'NewMeterModel']] = gas_data['NewSerialNumber'].apply(map_manufacturer_model)
job_config = bigquery.LoadJobConfig(
# Specify a (partial) schema. All columns are always written to the
# table. The schema is used to assist in data type definitions.
schema=[
],
write_disposition="WRITE_TRUNCATE",
)
job = client.load_table_from_dataframe(
gas_data, table_id, job_config=job_config
) # Make an API request.
job.result() # Wait for the job to complete.
table = client.get_table(table_id) # Make an API request.
print(
"Loaded {} rows and {} columns to {}".format(
table.num_rows, len(table.schema), table_id
)
)
print('Loaded DATAFRAME into BQ TABLE')
default_dag_args = {'owner': 'Owner',
'start_date': datetime.datetime(2021, 12, 15),
'retries': 1,
}
with models.DAG('test_dag',
schedule_interval='0 8 * * *',
default_args=default_dag_args) as dag:
map_manufacturer_model_function = python_operator.PythonOperator(
task_id='map_manufacturer_model_function',
python_callable=map_manufacturer_model
)
当您实际调用 PythonOperator 时,您并没有传入 s
参数。
您可以将 op_kwargs 添加到作为调用的一部分传递的参数中,如下所示:
map_manufacturer_model_function = python_operator.PythonOperator(
task_id='map_manufacturer_model_function',
python_callable=map_manufacturer_model,
op_kwargs={'s':'stringValue'}
)
请参阅此处的 python 运算符示例:
https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html
我目前有一段代码,正试图把它迁移到 Airflow 中运行。
代码从 table 中获取数据并执行 def map_manufacturer_model 中的函数并将其输出到 bigquery table.
我知道代码可以在我的电脑上运行,但是,当我将它移动到 airflow 时,我收到错误消息:
类型错误:缺少 1 个必需的位置参数:'s'
任何人都可以提供任何指导,因为我现在很困难!
谢谢
import pandas as pd
import datetime
from airflow.operators import python_operator
from google.cloud import bigquery
from airflow import models
client = bigquery.Client()
bqclient = bigquery.Client()
# Output table for dataframe
table_id = "Staging"
# Dataframe Code
query_string = """
SELECT * FROM `Staging`
"""
gasdata= (
bqclient.query(query_string)
.result()
.to_dataframe(
create_bqstorage_client=True,
))
manufacturers = {'G4F0': 'FLN', 'G4F1': 'FLN', 'G4F9': 'FLN', 'G4K0': 'HWL', 'E6S1': 'LPG', 'E6S2': 'LPG'}
meter_models = {'G4F0': {'1': 'G4SZV-1', '2': 'G4SZV-2'},
'G4F9': {'': 'G4SZV-1'},
'G4F1': {'': 'G4SDZV-2'},
'G4K0': {'': 'BK-G4E'},
'E6S1': {'': 'E6VG470'},
'E6S2': {'': 'E6VG470'},
}
def map_manufacturer_model(s):
s = str(s)
model = ''
try:
manufacturer = manufacturers[s[:4]]
for k, m in meter_models[s[:4]].items():
if s[-4:].startswith(k):
model = m
break
except KeyError:
manufacturer = ''
return pd.Series({'NewMeterManufacturer': manufacturer,
'NewMeterModel': model
})
gas_data[['NewMeterManufacturer', 'NewMeterModel']] = gas_data['NewSerialNumber'].apply(map_manufacturer_model)
job_config = bigquery.LoadJobConfig(
# Specify a (partial) schema. All columns are always written to the
# table. The schema is used to assist in data type definitions.
schema=[
],
write_disposition="WRITE_TRUNCATE",
)
job = client.load_table_from_dataframe(
gas_data, table_id, job_config=job_config
) # Make an API request.
job.result() # Wait for the job to complete.
table = client.get_table(table_id) # Make an API request.
print(
"Loaded {} rows and {} columns to {}".format(
table.num_rows, len(table.schema), table_id
)
)
print('Loaded DATAFRAME into BQ TABLE')
default_dag_args = {'owner': 'Owner',
'start_date': datetime.datetime(2021, 12, 15),
'retries': 1,
}
with models.DAG('test_dag',
schedule_interval='0 8 * * *',
default_args=default_dag_args) as dag:
map_manufacturer_model_function = python_operator.PythonOperator(
task_id='map_manufacturer_model_function',
python_callable=map_manufacturer_model
)
当您实际调用 PythonOperator 时,您并没有传入 s
参数。
您可以将 op_kwargs 添加到作为调用的一部分传递的参数中,如下所示:
map_manufacturer_model_function = python_operator.PythonOperator(
task_id='map_manufacturer_model_function',
python_callable=map_manufacturer_model,
op_kwargs={'s':'stringValue'}
)
请参阅此处的 python 运算符示例: https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html