How can I get the CSV file line content and save it to the database using NiFi processors?

See the example scenario:

CSV file content

john|doe|1
stacy|doe|2

Database fields

fname | lname | list_index | raw_text

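My objective is to extract the CSV file content with NiFi processors and save it to the database. See the sample output inserted into the database below, including the original line inserted into the raw_text column.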

fname | lname | list_index | raw_text
john  | doe   | 1          | "john|doe|1"
stacy | doe   | 2          | "stacy|doe|2"

If you need to do a lot of data customization, you can use the ExecuteScript processor to manipulate the data. The pipeline would be:

ListFile -> FetchFile -> ExecuteScript -> PutDatabaseRecord

Configure ExecuteScript as follows:

Script Engine: python

Script Body:

from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processors.script import ExecuteScript
from org.python.core.util.FileUtil import wrap
import sys
import os


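class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Wrap the Java input stream as a Python file object and read all lines.
        with wrap(inputStream) as f:
            lines = f.readlines()
            # Start with a header row so the record reader can map columns by name.
            updated_lines = ['fname|lname|list_index|raw_text\n']
            for line in lines:
                stripped = line.strip()
                if not stripped:
                    continue  # skip blank lines, which would otherwise yield an empty record
                # Append the original line, quoted, as the fourth (raw_text) column.
                updated_lines.append(stripped + '|"' + stripped + '"\n')

            # Write the header and the extended rows back as the new flow file content.
            with wrap(outputStream, 'w') as filehandle:
                filehandle.writelines(updated_lines)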

flow_file = session.get()

if flow_file is not None:
    try:
        # Rewrite the flow file content, then route it to success.
        session.write(flow_file, PyStreamCallback())
        session.transfer(flow_file, ExecuteScript.REL_SUCCESS)

    except Exception as e:
        # Record where the error occurred in an 'exception' attribute and route to failure.
        exc_type, exc_obj, exc_tb = sys.exc_info()
        fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
        excp = '%s in %s at line %s' % (exc_type, fname, exc_tb.tb_lineno)
        attrMap = {'exception': excp}
        flow_file = session.putAllAttributes(flow_file, attrMap)
        session.transfer(flow_file, ExecuteScript.REL_FAILURE)
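
The per-line transformation can be tried outside NiFi before wiring it into the flow. The following is a minimal sketch in plain Python, not part of the script above; the function name add_raw_text is hypothetical, and it assumes the input lines contain no double quotes (those would need escaping).

```python
def add_raw_text(lines):
    """Return pipe-delimited rows: a header, then each line plus a quoted copy of itself."""
    out = ['fname|lname|list_index|raw_text']
    for line in lines:
        stripped = line.strip()
        if stripped:  # ignore blank lines
            out.append(stripped + '|"' + stripped + '"')
    return out


# Sample input taken from the question.
rows = add_raw_text(['john|doe|1\n', 'stacy|doe|2\n'])
for row in rows:
    print(row)
```

The quotes around the last column matter: without them, the pipes inside raw_text would be read as extra column separators.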

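Configure PutDatabaseRecord accordingly. Because the script emits a header row and quotes the raw_text value, the record reader should use | as the delimiter and " as the quote character, and treat the first line as the header.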
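
To check that the script output parses into four columns, the same reader settings can be simulated with Python's csv module. This is only a sketch run in standard Python, not NiFi itself; it assumes the record reader is a CSVReader whose Value Separator is |, whose Quote Character is ", and which treats the first line as a header.

```python
import csv
import io

# Content as produced by the ExecuteScript step for the sample file.
transformed = (
    'fname|lname|list_index|raw_text\n'
    'john|doe|1|"john|doe|1"\n'
    'stacy|doe|2|"stacy|doe|2"\n'
)

# Pipe-delimited, double-quoted, first line used as field names.
reader = csv.DictReader(io.StringIO(transformed), delimiter='|', quotechar='"')
records = [dict(row) for row in reader]

for record in records:
    print(record['fname'], record['lname'], record['list_index'], record['raw_text'])
```

Each record carries the three parsed fields plus the untouched original line in raw_text, which matches the target table layout.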