Using the HBase put command in an Airflow BashOperator

I am trying to insert some data into an HBase table using an Airflow BashOperator task. I tried calling hbase shell first and then inserting the data into my table:

logg_data_to_hbase = BashOperator(
    task_id='data_to_hbase',
    dag=test_dag,
    bash_command="hbase shell && put 'tablename', 'rowname','columnvalue', 1000")

I get an error message saying that the Bash command failed:

[2022-01-06 11:01:17,077] {bash_operator.py:100} INFO - Temporary script location: /tmp/airflowtmpcKRT8C/data_to_hbaseY7y25j

[2022-01-06 11:01:17,077] {bash_operator.py:110} INFO - Running command: hbase shell && put 'tablename', 'rowname','columnvalue', 1000

[2022-01-06 11:01:17,091] {bash_operator.py:119} INFO - Output:

[2022-01-06 11:01:28,659] {bash_operator.py:123} INFO - /tmp/airflowtmpcKRT8C/data_to_hbaseY7y25j: line 1: put: command not found

[2022-01-06 11:01:28,660] {bash_operator.py:127} INFO - Command exited with return code 127

[2022-01-06 11:01:28,672] {models.py:1788} ERROR - Bash command failed

Traceback (most recent call last):
  File "/opt/python-2.7.16-AF-1.10.2-XXX/lib/python2.7/site-packages/airflow/models.py", line 1652, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/opt/python-2.7.16-AF-1.10.2-XXX/lib/python2.7/site-packages/airflow/operators/bash_operator.py", line 131, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed

What do I need to change to get the put command to execute?

The last line of the operator, the bash_command argument, needs to change:

bash_command = "echo \"put 'tablename', 'rowname','columnvalue', '1000'\" | hbase shell"

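This way the put statement is piped into hbase shell through | and runs inside it. In the original command, && made bash run put as a separate shell command after hbase shell had exited, which is why the log shows "put: command not found" and return code 127.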
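If the table, row, column, or value vary between tasks, the piped command can be built in Python before it is handed to the operator. The following is only a minimal sketch: build_put_command is a hypothetical helper name, not part of Airflow or HBase, and it assumes the arguments contain no quote characters, backslashes, backticks, or dollar signs, because nothing is escaped.

```python
def build_put_command(table, row, column, value):
    """Return a bash command that pipes one put statement into hbase shell.

    Hypothetical helper for illustration only. Assumes the arguments contain
    no quotes, backslashes, backticks or dollar signs (nothing is escaped).
    """
    # HBase shell statement with every argument single-quoted.
    statement = "put '{}', '{}', '{}', '{}'".format(table, row, column, value)
    # Double quotes around the statement keep the single quotes intact for echo.
    return 'echo "{}" | hbase shell'.format(statement)


bash_command = build_put_command('tablename', 'rowname', 'columnvalue', 1000)
print(bash_command)
```

The resulting string can be passed as bash_command=bash_command in the BashOperator from the question. If your HBase version supports non-interactive mode (hbase shell -n), adding that flag should make the shell return a non-zero exit status when the statement fails, so Airflow can mark the task as failed instead of treating it as a success.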