使用 Python 创建 Nifi PutSQL 处理器

Create Nifi PutSQL Processor with Python

我想将我的 Nifi-ETL 管道(主要是 PUTSQL 处理器)从我的开发实例转移到我的 Apache Nifi 生产实例,最好使用 Python 以实现可重用性。

我想我会试一试,只是尝试复制粘贴它们。

  1. 在 /nifi-api/processors/{id}
  2. 上使用 GET 请求从 DEV 获取处理器
  3. PUT 处理器到 /nifi-api/processors/{id}
  4. 上的 PUT 请求的 PRD Nifi 实例

代码:

# GET processor and parse to JSON
response = requests.get(nifi_url_dev + 'processors/' + proc_id
                            , headers=header)
processor = json.loads(response.content)


# PUT processor 
processor['revision']['version'] = 0 # reset version
payload = json.dumps(processor).encode('utf8')
response = requests.put(nifi_url_prd + 'processors/' + proc_id
                        , data=payload
                        , headers=header)

PUT 失败 409 HTTP Conflict Error。我猜这是因为我试图将资源放在一个 URI 上,该 URI 期望资源已经存在于那个地方。

documentation 在处理器 API 旁边列出了“创建处理器、设置属性、计划”,但是在查看它时,没有专门用于创建的 API - 我决定使用 PUT,因为它说“更新处理器”,这是我在其中看到的最接近从头开始创建新处理器的内容。

您对如何使用 Python 创建处理器有什么想法吗?是复制现有的还是创建全新的?

所以问题是 API 文档有点误导... 创建新处理器的正确 API 是 process-groups/{process_group_id}/processors 。尽管有描述,它也列在“进程组”下的文档中,而不是“处理器”下。

以下对我有用 - 虽然有必要稍微调整 json:主要是删除开发环境的任何 ID。

# GET processor and parse to JSON
response = requests.get(nifi_url_dev + 'processors/' + proc_id
                            , headers=header)
processor = json.loads(response.content)


# PUT processor 
del processor["id"]  # Processor ID cannot be specified.
del processor["uri"]  # Processor ID cannot be specified.
del processor["component"]["id"]  # Processor ID cannot be specified.

del processor["component"]["parentGroupId"]  # Parent process group id should not be specified.
# If specified, the parent process group id must be the same as specified in the POST-URI.

processor['revision']['version'] = 0 # reset version
payload = json.dumps(processor).encode('utf8')
response = requests.post(nifi_url_prd + 'process-groups/' + process_group + '/processors'
                        , data=payload
                        , headers=header)