Unable to connect to Hive using HiveOperator from Airflow 2.1.2 DAG

I have been struggling to run a Hive query from a HiveOperator task. Hive and Airflow are installed in Docker containers; I can query the Hive table from Python code inside the Airflow container, and also successfully through the Hive CLI. But when I run the Airflow DAG, I get an error saying the hive/beeline file cannot be found.

DAG:

import airflow
from airflow import DAG
from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.utils.dates import days_ago

dag_hive = DAG(
    dag_id="hive_script",
    schedule_interval="* * * * *",  # runs every minute
    start_date=days_ago(1),
)

hql_query = """
CREATE TABLE IF NOT EXISTS mydb.test_af(
`test` int);
insert into mydb.test_af values (1);
"""

hive_task = HiveOperator(
    hql=hql_query,
    task_id="hive_script_task",
    hive_cli_conn_id="hive_local",
    dag=dag_hive,
)

if __name__ == "__main__":
    dag_hive.cli()
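
For completeness: the hive_local connection referenced by hive_cli_conn_id must exist in Airflow. A minimal sketch of defining it via the Airflow CLI follows; the host, port, and schema are illustrative placeholders, not values from my actual setup:

# host, port, and schema below are placeholders for illustration
airflow connections add 'hive_local' \
    --conn-type 'hive_cli' \
    --conn-host 'hive-server' \
    --conn-port '10000' \
    --conn-schema 'mydb'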

Log:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/apache/hive/operators/hive.py", line 156, in execute
    self.hook.run_cli(hql=self.hql, schema=self.schema, hive_conf=self.hiveconfs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/apache/hive/hooks/hive.py", line 249, in run_cli
    hive_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=tmp_dir, close_fds=True
  File "/usr/local/lib/python3.7/subprocess.py", line 800, in __init__
    restore_signals, start_new_session)
  File "/usr/local/lib/python3.7/subprocess.py", line 1551, in _execute_child
    raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: 'beeline': 'beeline'
[2021-08-19 12:22:04,291] {taskinstance.py:1551} INFO - Marking task as FAILED. dag_id=***_script, task_id=***_script_task, execution_date=20210819T122100, start_date=20210819T122204, end_date=20210819T122204
[2021-08-19 12:22:04,323] {local_task_job.py:149} INFO - Task exited with return code 1

It would be great if someone could help me with this. Thanks in advance...

You need beeline installed in your Apache Airflow image. It depends on which Airflow image you use, but Airflow's "reference" image contains only the most common providers, and Hive is not among them. You should extend or customize the image so that beeline is available on the PATH inside the Airflow image.
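
You can confirm the binary is missing from inside the running container (the container name below is a placeholder):

docker exec -it airflow-worker which beeline    # exits non-zero when beeline is absent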

You can read more about extending/customizing the Airflow image at https://airflow.apache.org/docs/docker-stack/build.html#adding-new-apt-package
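
A minimal sketch of such an extended image, assuming the official apache/airflow base image. The Hive version, the download mirror, and the plain-JRE approach are assumptions, and depending on your setup beeline may additionally require Hadoop client libraries on the image:

FROM apache/airflow:2.1.2

USER root
# beeline is a Java tool, so the image needs a JRE (assumes a Debian-based base image)
RUN apt-get update \
    && apt-get install -y --no-install-recommends default-jre wget \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

# Unpack a Hive distribution so its bin/ directory (which contains beeline) is on PATH;
# the version below is an example, not necessarily the one from the original question
ARG HIVE_VERSION=3.1.2
RUN wget -q "https://archive.apache.org/dist/hive/hive-${HIVE_VERSION}/apache-hive-${HIVE_VERSION}-bin.tar.gz" \
    && tar -xzf "apache-hive-${HIVE_VERSION}-bin.tar.gz" -C /opt \
    && rm "apache-hive-${HIVE_VERSION}-bin.tar.gz"
ENV HIVE_HOME=/opt/apache-hive-${HIVE_VERSION}-bin
ENV PATH="${PATH}:${HIVE_HOME}/bin"

USER airflow
# The Hive provider itself must also be installed in the image
RUN pip install --no-cache-dir apache-airflow-providers-apache-hive

Build it (for example, docker build -t my-airflow-hive .) and run your containers from the new tag instead of the stock image.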