Error when exporting from BigQuery to MySQL

I am trying to export a table from BigQuery to a Google Cloud MySQL database.

I found an operator called BigQueryToMySqlOperator (documented here: https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/transfers/bigquery_to_mysql/index.html?highlight=bigquerytomysqloperator#module-airflow.providers.google.cloud.transfers.bigquery_to_mysql).

When I deploy a DAG containing this task to Cloud Composer, the task always fails with the following error:

Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1113, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1287, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1317, in _execute_task
    result = task_copy.execute(context=context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/bigquery_to_mysql.py", line 166, in execute
    for rows in self._bq_get_data():
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/bigquery_to_mysql.py", line 138, in _bq_get_data
    response = cursor.get_tabledata(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 2508, in get_tabledata
    return self.hook.get_tabledata(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 1284, in get_tabledata
    rows = self.list_rows(dataset_id, table_id, max_results, selected_fields, page_token, start_index)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 412, in inner_wrapper
    raise AirflowException(
airflow.exceptions.AirflowException: You must use keyword arguments in this methods rather than positional

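I really don't understand why this error is thrown. Can anyone help me figure out what is going wrong, or how I should export data from BigQuery to a MySQL database? Thank you very much for your help!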

EDIT: My operator code basically looks like this:

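from airflow.providers.google.cloud.transfers.bigquery_to_mysql import BigQueryToMySqlOperator

# Copy rows from a BigQuery table into a MySQL table, replacing existing rows.
transfer_data = BigQueryToMySqlOperator(
    task_id='task_id',
    dataset_table='origin_bq_table',
    mysql_table='dest_table_name',
    replace=True,
)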

Based on the stack trace, you are most likely using apache-airflow-providers-google==2.2.0.

airflow.exceptions.AirflowException: You must use keyword arguments in this methods rather than positional

This error is raised in GoogleBaseHook and can be traced back to BigQueryToMySqlOperator through the following call chain:

BigQueryToMySqlOperator > BigQueryHook > BigQueryConnection > BigQueryCursor > get_tabledata

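You get the AirflowException because get_tabledata is called as part of the operator's execute method, and get_tabledata in turn calls list_rows with positional arguments, which the hook rejects.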

Unfortunately, the test for the operator is not comprehensive: it only checks that the method is called with the correct arguments.

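I think this will require a new release of the Google provider in which the cursor in BigQueryToMySqlOperator calls list_rows with keyword arguments, instead of calling get_tabledata, which calls list_rows with positional arguments.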
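
To make the failure mode concrete, here is a minimal, self-contained sketch that does not use Airflow at all. The names KeywordOnlyError, keyword_only, FakeBigQueryHook, get_tabledata_positional and get_tabledata_keyword are hypothetical stand-ins invented for illustration; only the error message is taken from the traceback above. It assumes the guard in base_google.py behaves roughly like a decorator that refuses any positional argument.

```python
import functools


class KeywordOnlyError(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowException."""


def keyword_only(func):
    """Simplified stand-in for the guard that wraps hook methods.

    It rejects any call that passes positional arguments after self.
    """
    @functools.wraps(func)
    def inner_wrapper(self, *args, **kwargs):
        if args:
            raise KeywordOnlyError(
                "You must use keyword arguments in this methods rather than positional"
            )
        return func(self, **kwargs)

    return inner_wrapper


class FakeBigQueryHook:
    """Toy hook that only mimics the call pattern, not real BigQuery access."""

    @keyword_only
    def list_rows(self, dataset_id=None, table_id=None, max_results=None, start_index=None):
        # Echo the arguments back so the caller can see what arrived.
        return [(dataset_id, table_id, max_results, start_index)]

    def get_tabledata_positional(self, dataset_id, table_id, max_results=None, start_index=None):
        # Mirrors the failing pattern from the traceback: positional call into a guarded method.
        return self.list_rows(dataset_id, table_id, max_results, start_index)

    def get_tabledata_keyword(self, dataset_id, table_id, max_results=None, start_index=None):
        # The kind of change the fix would need: pass everything by keyword.
        return self.list_rows(
            dataset_id=dataset_id,
            table_id=table_id,
            max_results=max_results,
            start_index=start_index,
        )
```

Calling get_tabledata_positional on this toy hook raises the stand-in exception, while get_tabledata_keyword returns the rows. Because the positional call sits inside the provider code rather than in the DAG, changing the operator's arguments in the DAG would not be expected to avoid the error.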

I have also opened a GitHub issue in the Airflow repository.