Nifi 执行脚本外部库不工作

Nifi execute script extrenal library not working

我写了一个具有jedis连接池功能的jar,通过它我在nifi中编写了groovy脚本用于redis位置搜索。但它的行为很奇怪,有时它工作有时不工作。

Redis.java

public class Redis {
    private static Object staticLock = new Object();
    private static JedisPool pool;
    private static String host;
    private static int port; 
    private static int connectTimeout; 
    private static int operationTimeout; 
    private static String password;
    private static JedisPoolConfig config;

    public static void initializeSettings(String host, int port, String password, int connectTimeout, int operationTimeout) {
        Redis.host = host;
        Redis.port = port;
        Redis.password = password;
        Redis.connectTimeout = connectTimeout;
        Redis.operationTimeout = operationTimeout;
    }
    
    
   

    public static JedisPool getPoolInstance() {
        
        if (pool == null) { // avoid synchronization lock if initialization has already happened
            synchronized(staticLock) {
                if (pool == null) { // don't re-initialize if another thread beat us to it.
                    JedisPoolConfig poolConfig = getPoolConfig();
                    boolean useSsl = port == 6380 ? true : false;
                    int db = 0;
                    String clientName = "MyClientName"; // null means use default
                    SSLSocketFactory sslSocketFactory = null; // null means use default
                    SSLParameters sslParameters = null; // null means use default
                    HostnameVerifier hostnameVerifier = new SimpleHostNameVerifier(host);
                    pool = new JedisPool(poolConfig, host, port);
                            
                            //(poolConfig, host, port, connectTimeout,operationTimeout,password, db,
//                            clientName, useSsl, sslSocketFactory, sslParameters, hostnameVerifier);
                }
            }
        }
        return pool;
    }

    public static JedisPoolConfig getPoolConfig() {
        if (config == null) {
            JedisPoolConfig poolConfig = new JedisPoolConfig();
            int maxConnections = 200;
            poolConfig.setMaxTotal(maxConnections);
            poolConfig.setMaxIdle(maxConnections);
            poolConfig.setBlockWhenExhausted(true);
            poolConfig.setMaxWaitMillis(operationTimeout);
            poolConfig.setMinIdle(50);
            Redis.config = poolConfig;
        }

        return config;
    }

    public static String getPoolCurrentUsage()
    {
        JedisPool jedisPool = getPoolInstance();
        JedisPoolConfig poolConfig = getPoolConfig();

        int active = jedisPool.getNumActive();
        int idle = jedisPool.getNumIdle();
        int total = active + idle;
        String log = String.format(
                "JedisPool: Active=%d, Idle=%d, Waiters=%d, total=%d, maxTotal=%d, minIdle=%d, maxIdle=%d",
                active,
                idle,
                jedisPool.getNumWaiters(),
                total,
                poolConfig.getMaxTotal(),
                poolConfig.getMinIdle(),
                poolConfig.getMaxIdle()
        );

        return log;
    }

    private static class SimpleHostNameVerifier implements HostnameVerifier {

        private String exactCN;
        private String wildCardCN;
        public SimpleHostNameVerifier(String cacheHostname)
        {
            exactCN = "CN=" + cacheHostname;
            wildCardCN = "CN=*" + cacheHostname.substring(cacheHostname.indexOf('.'));
        }

        public boolean verify(String s, SSLSession sslSession) {
            try {
                String cn = sslSession.getPeerPrincipal().getName();
                return cn.equalsIgnoreCase(wildCardCN) || cn.equalsIgnoreCase(exactCN);
            } catch (SSLPeerUnverifiedException ex) {
                return false;
            }
        }
    }
}

自定义函数:

public class Functions {

    SecureRandom rand = new SecureRandom(); 
    private static final String UTF8= "UTF-8";


    public static JedisPool jedisPool=null;

public static String searchPlace(double lattitude,double longitude) {

try(Jedis jedis = jedisPool.getResource()) {
}
catch(Exception e){
log.error('execption',e);
}
}

}

Groovy 脚本:

    import org.apache.nifi.processor.ProcessContext;
    import com.customlib.functions.*;
    
    def flowFile = session.get();
    if (flowFile == null) {
        return;
    }
    def flowFiles = [] as List<FlowFile>
    def failflowFiles = [] as List<FlowFile>
    def input=null;
    def data=null;
    
    
    
     static onStart(ProcessContext context){
        Redis.initializeSettings("host", 6379, null,0,0);
         Functions.jedisPool= Redis.getPoolInstance();
        
      }
    
      static onStop(ProcessContext context){
       Functions.jedisPool.destroy();
      }
    
          try{
    log.warn('is jedispool connected::::'+Functions.jedisPool.isClosed());
            def inputStream = session.read(flowFile)
            def writer = new StringWriter();
            IOUtils.copy(inputStream, writer, "UTF-8");
            data=writer.toString();
            input = new JsonSlurper().parseText( data );
    log.warn('place is::::'+Functions.getLocationByLatLong(input["data"]["lat"],  input["data"]["longi"]);
             .......
             ...........
          }
          catch(Exception e){
        }
        newFlowFile = session.write(newFlowFile, { outputStream -> 
                         outputStream.write( data.getBytes(StandardCharsets.UTF_8) )
                } as OutputStreamCallback)
                failflowFiles<< newFlowFile;
                    }
        
    session.transfer(flowFiles, REL_SUCCESS)
    session.transfer(failflowFiles, REL_FAILURE)
    session.remove(flowFile)

nifi 在 3 节点集群中。函数lib配置在groovyscript模块directory.In上面groovy脚本处理器,日志语句是jedispool connected:::::有时是打印 false,有时 true 但是在第一次部署 jar 之后每次都可以。但后来它是不可预测的,我没有得到代码中的错误。 groovy脚本将如何加载 jar。如何使用 groovy 脚本实现基于库的搜索。

Redis.pool 在初始化后永远不会得到 null。您正在调用 pool.destroy() 但未将其设置为空。

getPoolInstance() 检查池是否为空,然后才创建一个新池。

我看不出有任何理由让 2 个变量保存对同一个池的引用:在 Redis 和函数中 class。