TypeError: missing 1 required positional argument: 's' in Airflow

I currently have a piece of code that I'm trying to get running in Airflow.

The code pulls data from a table, runs it through the function defined in map_manufacturer_model, and writes the output to a BigQuery table.

I know the code works on my own machine, but when I move it into Airflow I get the error:

TypeError: missing 1 required positional argument: 's'

Can anyone offer any guidance, as I'm stuck on this right now!

Thanks

import pandas as pd
import datetime
from airflow.operators import python_operator
from google.cloud import bigquery
from airflow import models

client = bigquery.Client()

bqclient = bigquery.Client()

# Output table for dataframe
table_id = "Staging"

# Dataframe Code
query_string = """
SELECT * FROM `Staging`
"""
gas_data = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(create_bqstorage_client=True)
)

manufacturers = {'G4F0': 'FLN', 'G4F1': 'FLN', 'G4F9': 'FLN', 'G4K0': 'HWL', 'E6S1': 'LPG', 'E6S2': 'LPG'}

meter_models = {'G4F0': {'1': 'G4SZV-1', '2': 'G4SZV-2'},
                'G4F9': {'': 'G4SZV-1'},
                'G4F1': {'': 'G4SDZV-2'},
                'G4K0': {'': 'BK-G4E'},
                'E6S1': {'': 'E6VG470'},
                'E6S2': {'': 'E6VG470'},
                }


def map_manufacturer_model(s):
    s = str(s)
    model = ''
    try:
        manufacturer = manufacturers[s[:4]]
        for k, m in meter_models[s[:4]].items():
            if s[-4:].startswith(k):
                model = m
                break
    except KeyError:
        manufacturer = ''

    return pd.Series({'NewMeterManufacturer': manufacturer,
                      'NewMeterModel': model
                      })


gas_data[['NewMeterManufacturer', 'NewMeterModel']] = gas_data['NewSerialNumber'].apply(map_manufacturer_model)

job_config = bigquery.LoadJobConfig(
    # Specify a (partial) schema. All columns are always written to the
    # table. The schema is used to assist in data type definitions.
    schema=[
    ],
    write_disposition="WRITE_TRUNCATE",
)

job = client.load_table_from_dataframe(
    gas_data, table_id, job_config=job_config
)  # Make an API request.
job.result()  # Wait for the job to complete.

table = client.get_table(table_id)  # Make an API request.
print(
    "Loaded {} rows and {} columns to {}".format(
        table.num_rows, len(table.schema), table_id
    )
)
print('Loaded DATAFRAME into BQ TABLE')


default_dag_args = {'owner': 'Owner',
                    'start_date': datetime.datetime(2021, 12, 15),
                    'retries': 1,
                    }

with models.DAG('test_dag',
                schedule_interval='0 8 * * *',
                default_args=default_dag_args) as dag:

    map_manufacturer_model_function = python_operator.PythonOperator(
        task_id='map_manufacturer_model_function',
        python_callable=map_manufacturer_model
    )

When you actually call the PythonOperator, you are not passing in the s argument. You can add op_kwargs to the arguments passed as part of the call, like this:

map_manufacturer_model_function = python_operator.PythonOperator(
    task_id='map_manufacturer_model_function',
    python_callable=map_manufacturer_model,
    op_kwargs={'s': 'stringValue'}
)
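Airflow unpacks op_kwargs as keyword arguments when it executes the task, so the callable is effectively invoked as map_manufacturer_model(s='stringValue'), which supplies the missing positional argument.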

See the Python operator examples here: https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html
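That said, map_manufacturer_model is only applied row by row via pandas' .apply, so it arguably shouldn't be a task on its own. A common pattern is to wrap the whole extract/transform/load block in a single callable and register that with the PythonOperator instead. Below is a minimal sketch under that assumption; run_pipeline is a name introduced purely for illustration, and it assumes map_manufacturer_model and its dictionaries from the question are defined in the same DAG file:

import datetime

import pandas as pd
from airflow import models
from airflow.operators import python_operator
from google.cloud import bigquery


def run_pipeline():
    # Hypothetical wrapper: runs the whole extract -> transform -> load
    # flow from the question inside a single Airflow task, so the operator
    # does not need to pass any arguments to it.
    client = bigquery.Client()
    table_id = "Staging"

    # Extract: pull the staging table into a dataframe.
    gas_data = (
        client.query("SELECT * FROM `Staging`")
        .result()
        .to_dataframe(create_bqstorage_client=True)
    )

    # Transform: map_manufacturer_model is the row-wise helper from the question.
    gas_data[['NewMeterManufacturer', 'NewMeterModel']] = (
        gas_data['NewSerialNumber'].apply(map_manufacturer_model)
    )

    # Load: overwrite the output table with the enriched dataframe.
    job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")
    client.load_table_from_dataframe(gas_data, table_id, job_config=job_config).result()


default_dag_args = {'owner': 'Owner',
                    'start_date': datetime.datetime(2021, 12, 15),
                    'retries': 1}

with models.DAG('test_dag',
                schedule_interval='0 8 * * *',
                default_args=default_dag_args) as dag:

    run_pipeline_task = python_operator.PythonOperator(
        task_id='run_pipeline',
        python_callable=run_pipeline
    )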