使用 Python 创建 Nifi PutSQL 处理器
Create Nifi PutSQL Processor with Python
我想将我的 Nifi-ETL 管道(主要是 PUTSQL 处理器)从我的开发实例转移到我的 Apache Nifi
生产实例,最好使用 Python
以实现可重用性。
我想我会试一试,只是尝试复制粘贴它们。
- 在 /nifi-api/processors/{id}
上使用 GET 请求从 DEV 获取处理器
- PUT 处理器到 /nifi-api/processors/{id}
上的 PUT 请求的 PRD Nifi 实例
代码:
# GET processor and parse to JSON
response = requests.get(nifi_url_dev + 'processors/' + proc_id
, headers=header)
processor = json.loads(response.content)
# PUT processor
processor['revision']['version'] = 0 # reset version
payload = json.dumps(processor).encode('utf8')
response = requests.put(nifi_url_prd + 'processors/' + proc_id
, data=payload
, headers=header)
PUT 失败 409 HTTP Conflict Error
。我猜这是因为我试图将资源放在一个 URI 上,该 URI 期望资源已经存在于那个地方。
documentation 在处理器 API 旁边列出了“创建处理器、设置属性、计划”,但是在查看它时,没有专门用于创建的 API - 我决定使用 PUT,因为它说“更新处理器”,这是我在其中看到的最接近从头开始创建新处理器的内容。
您对如何使用 Python 创建处理器有什么想法吗?是复制现有的还是创建全新的?
所以问题是 API 文档有点误导...
创建新处理器的正确 API 是 process-groups/{process_group_id}/processors
。尽管有描述,它也列在“进程组”下的文档中,而不是“处理器”下。
以下对我有用 - 虽然有必要稍微调整 json:主要是删除开发环境的任何 ID。
# GET processor and parse to JSON
response = requests.get(nifi_url_dev + 'processors/' + proc_id
, headers=header)
processor = json.loads(response.content)
# PUT processor
del processor["id"] # Processor ID cannot be specified.
del processor["uri"] # Processor ID cannot be specified.
del processor["component"]["id"] # Processor ID cannot be specified.
del processor["component"]["parentGroupId"] # Parent process group id should not be specified.
# If specified, the parent process group id must be the same as specified in the POST-URI.
processor['revision']['version'] = 0 # reset version
payload = json.dumps(processor).encode('utf8')
response = requests.post(nifi_url_prd + 'process-groups/' + process_group + '/processors'
, data=payload
, headers=header)
我想将我的 Nifi-ETL 管道(主要是 PUTSQL 处理器)从我的开发实例转移到我的 Apache Nifi
生产实例,最好使用 Python
以实现可重用性。
我想我会试一试,只是尝试复制粘贴它们。
- 在 /nifi-api/processors/{id} 上使用 GET 请求从 DEV 获取处理器
- PUT 处理器到 /nifi-api/processors/{id} 上的 PUT 请求的 PRD Nifi 实例
代码:
# GET processor and parse to JSON
response = requests.get(nifi_url_dev + 'processors/' + proc_id
, headers=header)
processor = json.loads(response.content)
# PUT processor
processor['revision']['version'] = 0 # reset version
payload = json.dumps(processor).encode('utf8')
response = requests.put(nifi_url_prd + 'processors/' + proc_id
, data=payload
, headers=header)
PUT 失败 409 HTTP Conflict Error
。我猜这是因为我试图将资源放在一个 URI 上,该 URI 期望资源已经存在于那个地方。
documentation 在处理器 API 旁边列出了“创建处理器、设置属性、计划”,但是在查看它时,没有专门用于创建的 API - 我决定使用 PUT,因为它说“更新处理器”,这是我在其中看到的最接近从头开始创建新处理器的内容。
您对如何使用 Python 创建处理器有什么想法吗?是复制现有的还是创建全新的?
所以问题是 API 文档有点误导...
创建新处理器的正确 API 是 process-groups/{process_group_id}/processors
。尽管有描述,它也列在“进程组”下的文档中,而不是“处理器”下。
以下对我有用 - 虽然有必要稍微调整 json:主要是删除开发环境的任何 ID。
# GET processor and parse to JSON
response = requests.get(nifi_url_dev + 'processors/' + proc_id
, headers=header)
processor = json.loads(response.content)
# PUT processor
del processor["id"] # Processor ID cannot be specified.
del processor["uri"] # Processor ID cannot be specified.
del processor["component"]["id"] # Processor ID cannot be specified.
del processor["component"]["parentGroupId"] # Parent process group id should not be specified.
# If specified, the parent process group id must be the same as specified in the POST-URI.
processor['revision']['version'] = 0 # reset version
payload = json.dumps(processor).encode('utf8')
response = requests.post(nifi_url_prd + 'process-groups/' + process_group + '/processors'
, data=payload
, headers=header)