使用 Apache Airflow 启动 Apache Nifi - 如何生成流文件

Starting Apache Nifi with Apache Airflow - how to generate a flow file

我为我的 ETL 管道设置了 Apache Nifi,并希望使用 Apache Airflow 启动(并稍后监控)特定处理器。

我看到两种在气流 DAG 中实现此目的的方法:

  1. 从头开始生成流文件并将其插入 Nifi queue/processor
  2. 触发“生成流文件处理器”以创建流文件,然后将其插入队列

我查看了 airflow 官方文档并知道如何使用 PythonOperator:

编写(基本)DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id='python_nifi_operator',
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['example'],
)

def generate_flow_file():
    """Generate and insert a flow file"""
    # connect to Nifi
    pass
    # access processor
    pass
    # create flow file
    pass 
    # insert flow file 
    pass 
    return 'Success-message for the log'

run_this = PythonOperator(
    task_id='generate_a_custom_flow_file',
    python_callable=generate_flow_file,
    dag=dag,
)

问题是:如何使用 Python 生成流文件? 我一直在寻找一个库,但我只找到其他带有代码摘录的 Whosebug 帖子这对我没有帮助,我什至找不到他们使用的软件包的文档。欢迎任何提示/完整的代码示例/链接。

没有 API 到 'generate' FlowFile,拥有一个 FlowFile 真的没有多大意义。

就是说,您可以使用 GenerateFlowFile 处理器和 stop/start 它与 REST API - 之前有问题询问如何使用 API https://nifi.apache.org/docs/nifi-docs/rest-api/index.html https://pypi.org/project/nipyapi/

或者你可以让一个 ListenHTTP/HandleHttpRequest 在端点上监听 Airflow,你可以在 Python 中通过向配置的端点发送一个空的 HTTP 请求来触发它,从而生成一个 FlowFile