NIFI - 将 binary.zip 作为 varbinary 上传到 SQL 服务器
NIFI - upload binary.zip to SQL Server as varbinary
我正在尝试将 binary.zip 作为 varbinary
类型的列内容上传到 SQL 服务器。
目标 Table:
CREATE TABLE myTable ( zipFile varbinary(MAX) );
我的NIFI流程很简单:
-> GetFile:
filter:binary.zip
-> UpdateAttribute:<br>
sql.args.1.type = -3 # as varbinary according to JDBC types enumeration
sql.args.1.value = ??? # I don't know what to put here ! (I've triying everything!)
sql.args.1.format= ??? # Is It required? I triyed 'hex'
-> PutSQL:<br>
SQLstatement= INSERT INTO myTable (zip_file) VALUES (?);
我应该在 sql.args.1.value
中输入什么?
我认为它应该是流文件负载,但它可以作为 PutSQL 中 INSERT 的一部分工作?暂时没有!
谢谢!
解决方案更新:
基于https://issues.apache.org/jira/browse/NIFI-8052
(假设我正在发送一些数据作为属性参数)
import java.nio.charset.StandardCharsets
import org.apache.nifi.controller.ControllerService
import groovy.sql.Sql
def flowFile = session.get()
def lookup = context.controllerServiceLookup
def dbServiceName = flowFile.getAttribute('DatabaseConnectionPoolName')
def tableName = flowFile.getAttribute('table_name')
def fieldName = flowFile.getAttribute('field_name')
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find
{ cs -> lookup.getControllerServiceName(cs) == dbServiceName }
def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
def sql = new Sql(conn)
flowFile.read{ rawIn->
def parms = [rawIn ]
sql.executeInsert "INSERT INTO " + tableName + " (date, "+ fieldName + ") VALUES (CAST( GETDATE() AS Date ) , ?) ", parms
}
conn?.close()
if(!flowFile) return
session.transfer(flowFile, REL_SUCCESS)
session.commit()
也许有一种 nifi 本机方法来插入 blob,但是您可以使用 ExecuteGroovyScript
而不是 UpdateAttribute 和 PutSQL
在处理器级别添加 SQL.mydb
参数并将其 link 添加到所需的 DBCP 池。
使用以下脚本正文:
def ff=session.get()
if(!ff)return
def statement = "INSERT INTO myTable (zip_file) VALUES (:p_zip_file)"
def params = [
p_zip_file: SQL.mydb.BLOB(ff.read()) //cast flow file content as BLOB sql type
]
SQL.mydb.executeInsert(params, statement) //committed automatically on flow file success
//transfer to success without changes
REL_SUCCESS << ff
在脚本中 SQL.mydb
是对 groovy.sql.Sql oblject
的引用
我正在尝试将 binary.zip 作为 varbinary
类型的列内容上传到 SQL 服务器。
目标 Table:
CREATE TABLE myTable ( zipFile varbinary(MAX) );
我的NIFI流程很简单:
-> GetFile:
filter:binary.zip
-> UpdateAttribute:<br>
sql.args.1.type = -3 # as varbinary according to JDBC types enumeration
sql.args.1.value = ??? # I don't know what to put here ! (I've triying everything!)
sql.args.1.format= ??? # Is It required? I triyed 'hex'
-> PutSQL:<br>
SQLstatement= INSERT INTO myTable (zip_file) VALUES (?);
我应该在 sql.args.1.value
中输入什么?
我认为它应该是流文件负载,但它可以作为 PutSQL 中 INSERT 的一部分工作?暂时没有!
谢谢!
解决方案更新:
基于https://issues.apache.org/jira/browse/NIFI-8052 (假设我正在发送一些数据作为属性参数)
import java.nio.charset.StandardCharsets
import org.apache.nifi.controller.ControllerService
import groovy.sql.Sql
def flowFile = session.get()
def lookup = context.controllerServiceLookup
def dbServiceName = flowFile.getAttribute('DatabaseConnectionPoolName')
def tableName = flowFile.getAttribute('table_name')
def fieldName = flowFile.getAttribute('field_name')
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find
{ cs -> lookup.getControllerServiceName(cs) == dbServiceName }
def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
def sql = new Sql(conn)
flowFile.read{ rawIn->
def parms = [rawIn ]
sql.executeInsert "INSERT INTO " + tableName + " (date, "+ fieldName + ") VALUES (CAST( GETDATE() AS Date ) , ?) ", parms
}
conn?.close()
if(!flowFile) return
session.transfer(flowFile, REL_SUCCESS)
session.commit()
也许有一种 nifi 本机方法来插入 blob,但是您可以使用 ExecuteGroovyScript
而不是 UpdateAttribute 和 PutSQL
在处理器级别添加 SQL.mydb
参数并将其 link 添加到所需的 DBCP 池。
使用以下脚本正文:
def ff=session.get()
if(!ff)return
def statement = "INSERT INTO myTable (zip_file) VALUES (:p_zip_file)"
def params = [
p_zip_file: SQL.mydb.BLOB(ff.read()) //cast flow file content as BLOB sql type
]
SQL.mydb.executeInsert(params, statement) //committed automatically on flow file success
//transfer to success without changes
REL_SUCCESS << ff
在脚本中 SQL.mydb
是对 groovy.sql.Sql oblject