Nifi 执行脚本外部库不工作
Nifi execute script extrenal library not working
我写了一个具有jedis连接池功能的jar,通过它我在nifi中编写了groovy脚本用于redis位置搜索。但它的行为很奇怪,有时它工作有时不工作。
Redis.java
public class Redis {
private static Object staticLock = new Object();
private static JedisPool pool;
private static String host;
private static int port;
private static int connectTimeout;
private static int operationTimeout;
private static String password;
private static JedisPoolConfig config;
public static void initializeSettings(String host, int port, String password, int connectTimeout, int operationTimeout) {
Redis.host = host;
Redis.port = port;
Redis.password = password;
Redis.connectTimeout = connectTimeout;
Redis.operationTimeout = operationTimeout;
}
public static JedisPool getPoolInstance() {
if (pool == null) { // avoid synchronization lock if initialization has already happened
synchronized(staticLock) {
if (pool == null) { // don't re-initialize if another thread beat us to it.
JedisPoolConfig poolConfig = getPoolConfig();
boolean useSsl = port == 6380 ? true : false;
int db = 0;
String clientName = "MyClientName"; // null means use default
SSLSocketFactory sslSocketFactory = null; // null means use default
SSLParameters sslParameters = null; // null means use default
HostnameVerifier hostnameVerifier = new SimpleHostNameVerifier(host);
pool = new JedisPool(poolConfig, host, port);
//(poolConfig, host, port, connectTimeout,operationTimeout,password, db,
// clientName, useSsl, sslSocketFactory, sslParameters, hostnameVerifier);
}
}
}
return pool;
}
public static JedisPoolConfig getPoolConfig() {
if (config == null) {
JedisPoolConfig poolConfig = new JedisPoolConfig();
int maxConnections = 200;
poolConfig.setMaxTotal(maxConnections);
poolConfig.setMaxIdle(maxConnections);
poolConfig.setBlockWhenExhausted(true);
poolConfig.setMaxWaitMillis(operationTimeout);
poolConfig.setMinIdle(50);
Redis.config = poolConfig;
}
return config;
}
public static String getPoolCurrentUsage()
{
JedisPool jedisPool = getPoolInstance();
JedisPoolConfig poolConfig = getPoolConfig();
int active = jedisPool.getNumActive();
int idle = jedisPool.getNumIdle();
int total = active + idle;
String log = String.format(
"JedisPool: Active=%d, Idle=%d, Waiters=%d, total=%d, maxTotal=%d, minIdle=%d, maxIdle=%d",
active,
idle,
jedisPool.getNumWaiters(),
total,
poolConfig.getMaxTotal(),
poolConfig.getMinIdle(),
poolConfig.getMaxIdle()
);
return log;
}
private static class SimpleHostNameVerifier implements HostnameVerifier {
private String exactCN;
private String wildCardCN;
public SimpleHostNameVerifier(String cacheHostname)
{
exactCN = "CN=" + cacheHostname;
wildCardCN = "CN=*" + cacheHostname.substring(cacheHostname.indexOf('.'));
}
public boolean verify(String s, SSLSession sslSession) {
try {
String cn = sslSession.getPeerPrincipal().getName();
return cn.equalsIgnoreCase(wildCardCN) || cn.equalsIgnoreCase(exactCN);
} catch (SSLPeerUnverifiedException ex) {
return false;
}
}
}
}
自定义函数:
public class Functions {
SecureRandom rand = new SecureRandom();
private static final String UTF8= "UTF-8";
public static JedisPool jedisPool=null;
public static String searchPlace(double lattitude,double longitude) {
try(Jedis jedis = jedisPool.getResource()) {
}
catch(Exception e){
log.error('execption',e);
}
}
}
Groovy 脚本:
import org.apache.nifi.processor.ProcessContext;
import com.customlib.functions.*;
def flowFile = session.get();
if (flowFile == null) {
return;
}
def flowFiles = [] as List<FlowFile>
def failflowFiles = [] as List<FlowFile>
def input=null;
def data=null;
static onStart(ProcessContext context){
Redis.initializeSettings("host", 6379, null,0,0);
Functions.jedisPool= Redis.getPoolInstance();
}
static onStop(ProcessContext context){
Functions.jedisPool.destroy();
}
try{
log.warn('is jedispool connected::::'+Functions.jedisPool.isClosed());
def inputStream = session.read(flowFile)
def writer = new StringWriter();
IOUtils.copy(inputStream, writer, "UTF-8");
data=writer.toString();
input = new JsonSlurper().parseText( data );
log.warn('place is::::'+Functions.getLocationByLatLong(input["data"]["lat"], input["data"]["longi"]);
.......
...........
}
catch(Exception e){
}
newFlowFile = session.write(newFlowFile, { outputStream ->
outputStream.write( data.getBytes(StandardCharsets.UTF_8) )
} as OutputStreamCallback)
failflowFiles<< newFlowFile;
}
session.transfer(flowFiles, REL_SUCCESS)
session.transfer(failflowFiles, REL_FAILURE)
session.remove(flowFile)
nifi 在 3 节点集群中。函数lib配置在groovyscript模块directory.In上面groovy脚本处理器,日志语句是jedispool connected:::::有时是打印 false
,有时 true
但是在第一次部署 jar 之后每次都可以。但后来它是不可预测的,我没有得到代码中的错误。 groovy脚本将如何加载 jar。如何使用 groovy 脚本实现基于库的搜索。
Redis.pool 在初始化后永远不会得到 null。您正在调用 pool.destroy() 但未将其设置为空。
getPoolInstance() 检查池是否为空,然后才创建一个新池。
我看不出有任何理由让 2 个变量保存对同一个池的引用:在 Redis 和函数中 class。
我写了一个具有jedis连接池功能的jar,通过它我在nifi中编写了groovy脚本用于redis位置搜索。但它的行为很奇怪,有时它工作有时不工作。
Redis.java
public class Redis {
private static Object staticLock = new Object();
private static JedisPool pool;
private static String host;
private static int port;
private static int connectTimeout;
private static int operationTimeout;
private static String password;
private static JedisPoolConfig config;
public static void initializeSettings(String host, int port, String password, int connectTimeout, int operationTimeout) {
Redis.host = host;
Redis.port = port;
Redis.password = password;
Redis.connectTimeout = connectTimeout;
Redis.operationTimeout = operationTimeout;
}
public static JedisPool getPoolInstance() {
if (pool == null) { // avoid synchronization lock if initialization has already happened
synchronized(staticLock) {
if (pool == null) { // don't re-initialize if another thread beat us to it.
JedisPoolConfig poolConfig = getPoolConfig();
boolean useSsl = port == 6380 ? true : false;
int db = 0;
String clientName = "MyClientName"; // null means use default
SSLSocketFactory sslSocketFactory = null; // null means use default
SSLParameters sslParameters = null; // null means use default
HostnameVerifier hostnameVerifier = new SimpleHostNameVerifier(host);
pool = new JedisPool(poolConfig, host, port);
//(poolConfig, host, port, connectTimeout,operationTimeout,password, db,
// clientName, useSsl, sslSocketFactory, sslParameters, hostnameVerifier);
}
}
}
return pool;
}
public static JedisPoolConfig getPoolConfig() {
if (config == null) {
JedisPoolConfig poolConfig = new JedisPoolConfig();
int maxConnections = 200;
poolConfig.setMaxTotal(maxConnections);
poolConfig.setMaxIdle(maxConnections);
poolConfig.setBlockWhenExhausted(true);
poolConfig.setMaxWaitMillis(operationTimeout);
poolConfig.setMinIdle(50);
Redis.config = poolConfig;
}
return config;
}
public static String getPoolCurrentUsage()
{
JedisPool jedisPool = getPoolInstance();
JedisPoolConfig poolConfig = getPoolConfig();
int active = jedisPool.getNumActive();
int idle = jedisPool.getNumIdle();
int total = active + idle;
String log = String.format(
"JedisPool: Active=%d, Idle=%d, Waiters=%d, total=%d, maxTotal=%d, minIdle=%d, maxIdle=%d",
active,
idle,
jedisPool.getNumWaiters(),
total,
poolConfig.getMaxTotal(),
poolConfig.getMinIdle(),
poolConfig.getMaxIdle()
);
return log;
}
private static class SimpleHostNameVerifier implements HostnameVerifier {
private String exactCN;
private String wildCardCN;
public SimpleHostNameVerifier(String cacheHostname)
{
exactCN = "CN=" + cacheHostname;
wildCardCN = "CN=*" + cacheHostname.substring(cacheHostname.indexOf('.'));
}
public boolean verify(String s, SSLSession sslSession) {
try {
String cn = sslSession.getPeerPrincipal().getName();
return cn.equalsIgnoreCase(wildCardCN) || cn.equalsIgnoreCase(exactCN);
} catch (SSLPeerUnverifiedException ex) {
return false;
}
}
}
}
自定义函数:
public class Functions {
SecureRandom rand = new SecureRandom();
private static final String UTF8= "UTF-8";
public static JedisPool jedisPool=null;
public static String searchPlace(double lattitude,double longitude) {
try(Jedis jedis = jedisPool.getResource()) {
}
catch(Exception e){
log.error('execption',e);
}
}
}
Groovy 脚本:
import org.apache.nifi.processor.ProcessContext;
import com.customlib.functions.*;
def flowFile = session.get();
if (flowFile == null) {
return;
}
def flowFiles = [] as List<FlowFile>
def failflowFiles = [] as List<FlowFile>
def input=null;
def data=null;
static onStart(ProcessContext context){
Redis.initializeSettings("host", 6379, null,0,0);
Functions.jedisPool= Redis.getPoolInstance();
}
static onStop(ProcessContext context){
Functions.jedisPool.destroy();
}
try{
log.warn('is jedispool connected::::'+Functions.jedisPool.isClosed());
def inputStream = session.read(flowFile)
def writer = new StringWriter();
IOUtils.copy(inputStream, writer, "UTF-8");
data=writer.toString();
input = new JsonSlurper().parseText( data );
log.warn('place is::::'+Functions.getLocationByLatLong(input["data"]["lat"], input["data"]["longi"]);
.......
...........
}
catch(Exception e){
}
newFlowFile = session.write(newFlowFile, { outputStream ->
outputStream.write( data.getBytes(StandardCharsets.UTF_8) )
} as OutputStreamCallback)
failflowFiles<< newFlowFile;
}
session.transfer(flowFiles, REL_SUCCESS)
session.transfer(failflowFiles, REL_FAILURE)
session.remove(flowFile)
nifi 在 3 节点集群中。函数lib配置在groovyscript模块directory.In上面groovy脚本处理器,日志语句是jedispool connected:::::有时是打印 false
,有时 true
但是在第一次部署 jar 之后每次都可以。但后来它是不可预测的,我没有得到代码中的错误。 groovy脚本将如何加载 jar。如何使用 groovy 脚本实现基于库的搜索。
Redis.pool 在初始化后永远不会得到 null。您正在调用 pool.destroy() 但未将其设置为空。
getPoolInstance() 检查池是否为空,然后才创建一个新池。
我看不出有任何理由让 2 个变量保存对同一个池的引用:在 Redis 和函数中 class。