Unable to connect to Hive using HiveOperator from Airflow 2.1.2 DAG
I have been trying to run a Hive query from a HiveOperator task. Hive and Airflow are installed in Docker containers. I can query Hive tables from Python code inside the Airflow container, and also successfully through the Hive CLI. But when I run the Airflow DAG, I get an error saying the hive/beeline file cannot be found.
DAG:
from airflow import DAG
from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.utils.dates import days_ago

dag_hive = DAG(dag_id='hive_script',
               schedule_interval='* * * * *',
               start_date=days_ago(1))

hql_query = """
CREATE TABLE IF NOT EXISTS mydb.test_af(
`test` int);
insert into mydb.test_af values (1);
"""

hive_task = HiveOperator(hql=hql_query,
                         task_id='hive_script_task',
                         hive_cli_conn_id='hive_local',
                         dag=dag_hive)

if __name__ == '__main__':
    dag_hive.cli()
Logs:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
result = task_copy.execute(context=context)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/apache/hive/operators/hive.py", line 156, in execute
self.hook.run_cli(hql=self.hql, schema=self.schema, hive_conf=self.hiveconfs)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/apache/hive/hooks/hive.py", line 249, in run_cli
hive_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=tmp_dir, close_fds=True
File "/usr/local/lib/python3.7/subprocess.py", line 800, in __init__
restore_signals, start_new_session)
File "/usr/local/lib/python3.7/subprocess.py", line 1551, in _execute_child
raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: 'beeline': 'beeline'
[2021-08-19 12:22:04,291] {taskinstance.py:1551} INFO - Marking task as FAILED. dag_id=***_script, task_id=***_script_task, execution_date=20210819T122100, start_date=20210819T122204, end_date=20210819T122204
[2021-08-19 12:22:04,323] {local_task_job.py:149} INFO - Task exited with return code 1
It would be great if someone could help me with this.
Thanks in advance...
You need to install beeline in your Apache Airflow image. The details depend on which Airflow image you use, but Airflow's "reference" image only includes the most common providers, and Hive is not among them. You should extend or customize the image so that beeline is available on the PATH inside the Airflow image.
You can read more about extending/customizing the Airflow image at https://airflow.apache.org/docs/docker-stack/build.html#adding-new-apt-package
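For illustration, here is a minimal Dockerfile sketch of one way to extend the official image so that beeline ends up on the PATH. The Hive version, download URL, and install path are assumptions, and Hive's launcher scripts may additionally expect a Hadoop client, so adjust this to match your environment:

# Hypothetical sketch: extend the official Airflow image with the Apache
# Hive binary distribution (which ships the beeline client). The Hive
# version and URL below are assumptions -- match them to your Hive server.
FROM apache/airflow:2.1.2
USER root
RUN apt-get update \
    && apt-get install -y --no-install-recommends default-jre-headless wget \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*
# Unpack Hive under /opt and put its bin/ directory (with beeline) on PATH.
RUN wget -q https://archive.apache.org/dist/hive/hive-3.1.2/apache-hive-3.1.2-bin.tar.gz \
    && tar -xzf apache-hive-3.1.2-bin.tar.gz -C /opt \
    && rm apache-hive-3.1.2-bin.tar.gz
ENV HIVE_HOME=/opt/apache-hive-3.1.2-bin
ENV PATH="${HIVE_HOME}/bin:${PATH}"
USER airflow

You would then build this image (for example, docker build . -t my-airflow:2.1.2) and point your Docker Compose file or deployment at that tag instead of the stock apache/airflow image.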