How can I get the CSV file line content and save it to the database using NiFi processors?
Consider this sample scenario:
CSV file content:
john|doe|1
stacy|doe|2
Database fields:
fname | lname | list_index | raw_text
My objective is to extract the CSV file content and save it to the database using NiFi processors. See the sample output below for the rows inserted into the database, including the value inserted into the raw_text column.
fname | lname | list_index | raw_text
john | doe | 1 | "john|doe|1"
stacy | doe | 2 | "stacy|doe|2"
If you need to do a lot of data customization, you can use the ExecuteScript processor for the data manipulation. The pipeline should be:
ListFile -> FetchFile -> ExecuteScript -> PutDatabaseRecord
ListFile/FetchFile pick up the source file, ExecuteScript rewrites each line, and PutDatabaseRecord inserts the resulting records. Configure your ExecuteScript as follows:
Script Engine: python
Script Body:
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processors.script import ExecuteScript
from org.python.core.util.FileUtil import wrap
from io import StringIO
import sys
import os
import datetime

class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Read the incoming flowfile content line by line
        with wrap(inputStream) as f:
            lines = f.readlines()
            updated_lines = []
            # Prepend a header row so the record reader can map the columns
            header_line = 'fname|lname|list_index|raw_text' + '\n'
            updated_lines.append(header_line)
            for line in lines:
                # Re-emit each line, appending the original raw line as a quoted raw_text column
                updated_line = line.strip() + '|"' + line.strip() + '"' + '\n'
                updated_lines.append(updated_line)
        # Write the rewritten content to the outgoing flowfile
        with wrap(outputStream, 'w') as filehandle:
            filehandle.writelines("%s" % line for line in updated_lines)

flow_file = session.get()
if flow_file:
    try:
        # Keep the reference returned by session.write() so the latest version is transferred
        flow_file = session.write(flow_file, PyStreamCallback())
        session.transfer(flow_file, ExecuteScript.REL_SUCCESS)
    except Exception as e:
        # On failure, record the exception details as a flowfile attribute
        exc_type, exc_obj, exc_tb = sys.exc_info()
        fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
        excp = str(exc_type) + str(fname) + str(exc_tb.tb_lineno)
        attrMap = {'exception': str(excp)}
        flow_file = session.putAllAttributes(flow_file, attrMap)
        session.transfer(flow_file, ExecuteScript.REL_FAILURE)
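With the sample input above, the flowfile content leaving ExecuteScript would look like this (pipe-delimited, with the header row the script prepends):
fname|lname|list_index|raw_text
john|doe|1|"john|doe|1"
stacy|doe|2|"stacy|doe|2"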
Configure PutDatabaseRecord accordingly.
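As a rough sketch only (property names are from a recent NiFi release and may differ slightly in your version), PutDatabaseRecord could be set up with a CSVReader that matches the script's output:
Record Reader: CSVReader (Value Separator: |, Treat First Line as Header: true)
Database Connection Pooling Service: your DBCPConnectionPool
Statement Type: INSERT
Table Name: the target table containing the fname, lname, list_index and raw_text columns
Because the raw_text values are wrapped in double quotes, the CSVReader's default quote character handles the embedded pipe characters inside that column.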