从 SSHOperator 解码 UTF-8 编码的 Xcom 值
Decode UTF-8 encoded Xcom value from SSHOperator
我有两个 Airflow 任务要进行通信。 SSHOperator return 打印最后一行,在本例中为“remote_IP”。但是,SSHOperator 的 return 值是使用 UTF-8 编码的。
Read_remote_IP = SSHOperator(
task_id='Read_remote_IP',
ssh_hook=hook,
command="echo remote_IP ",
)
Read_SSH_Output = BashOperator(
task_id='Read_SSH_Output',
bash_command="echo {{ti.xcom_pull(task_ids='Read_remote_IP')}} ",
)
如何对 SSHOperator Read_remote_IP
return 值进行非编码?另外,BashOperator Read_SSH_Output
如何解码编码值?
我目前的解决方案是引入另一个Python运算符来将字符串转换回来,欢迎提供其他解决方案。
import base64
from airflow.operators.python import PythonOperator
def _decode_message(task_name, ti):
message = ti.xcom_pull(task_ids=task_name)
print(message)
return base64.b64decode(message).decode()
Decode_SSH_Output = PythonOperator(
task_id='Decode_SSH_Output',
python_callable=_decode_message,
op_args=['Read_remote_IP'],
)
我有两个 Airflow 任务要进行通信。 SSHOperator return 打印最后一行,在本例中为“remote_IP”。但是,SSHOperator 的 return 值是使用 UTF-8 编码的。
Read_remote_IP = SSHOperator(
task_id='Read_remote_IP',
ssh_hook=hook,
command="echo remote_IP ",
)
Read_SSH_Output = BashOperator(
task_id='Read_SSH_Output',
bash_command="echo {{ti.xcom_pull(task_ids='Read_remote_IP')}} ",
)
如何对 SSHOperator Read_remote_IP
return 值进行非编码?另外,BashOperator Read_SSH_Output
如何解码编码值?
我目前的解决方案是引入另一个Python运算符来将字符串转换回来,欢迎提供其他解决方案。
import base64
from airflow.operators.python import PythonOperator
def _decode_message(task_name, ti):
message = ti.xcom_pull(task_ids=task_name)
print(message)
return base64.b64decode(message).decode()
Decode_SSH_Output = PythonOperator(
task_id='Decode_SSH_Output',
python_callable=_decode_message,
op_args=['Read_remote_IP'],
)