在 java 中实现 DestinationAvailabilityListener 时出错

Getting error while implementing DestinationAvailabilityListener in java

我有一个用例,我需要侦听 weblogic 12c 中托管的统一分布式队列的可用成员。我阅读并发现 DestinationAvailabilityListener 接口有可能适合需要的方法。以下是我的代码:

import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import javax.naming.Context;

import org.apache.kafka.connect.connector.ConnectorContext;

import weblogic.jms.extensions.DestinationAvailabilityListener;
import weblogic.jms.extensions.DestinationDetail;
import weblogic.jms.extensions.JMSDestinationAvailabilityHelper;
import weblogic.jms.extensions.RegistrationHandle;

public class QueueMonitor extends Thread implements DestinationAvailabilityListener,WebLogicJmsTask {
        private Hashtable<String, String> wlsEnvParamHashTbl = null;
        private final Object containerLock = new Object();
        private final  CountDownLatch startLatch ;
        private RegistrationHandle registrationHandle;
        private ArrayList<String> containerMap;
        boolean shutdown =false,changeflg=false;
        private final ConnectorContext context;
        
    public QueueMonitor(Map<String, String> props,ConnectorContext context) {
            super();
            wlsEnvParamHashTbl = new Hashtable<String, String>();
            wlsEnvParamHashTbl.put(Context.PROVIDER_URL, props.get(WEBLOGIC_T3_URL_DESTINATION_CONFIG)); // set Weblogic JMS URL
            wlsEnvParamHashTbl.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory"); // set Weblogic JNDI
            wlsEnvParamHashTbl.put(Context.SECURITY_PRINCIPAL, props.get(WEBLOGIC_USERNAME_CONFIG)); // set Weblogic UserName
            wlsEnvParamHashTbl.put(Context.SECURITY_CREDENTIALS, props.get(WEBLOGIC_PASSWORD_CONFIG)); // set Weblogic PassWord
            for (Map.Entry<String,String> entry : wlsEnvParamHashTbl.entrySet())  
                System.out.println("Key = " + entry.getKey() + 
                                 ", Value = " + entry.getValue()); 
        System.out.println(props.get(WEBLOGIC_JMS_DESTINATION_CONFIG));
            this.context=context;
            this.startLatch = new CountDownLatch(1);
            JMSDestinationAvailabilityHelper dah = JMSDestinationAvailabilityHelper.getInstance();
            try {this.registrationHandle= dah.register(wlsEnvParamHashTbl, props.get(WEBLOGIC_JMS_DESTINATION_CONFIG), this);
            
                startLatch.await();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            catch(Exception e)
            {
                System.out.println(e);
            }
        }

    public void run()
    {
        while(!shutdown)
        {
            if(changeflg)
            {
                context.requestTaskReconfiguration();
                changeflg=false;
            }
            
        }
        if(shutdown)
        {
            System.out.println("Called shutdown, returning");
            return;
        }
        
    }
    
    public synchronized String getContainers()
    {
        String list=null;
        synchronized (containerLock) {
            list=String.join(",", containerMap);
        }
        return list;
    }

      @Override
         public void onDestinationsAvailable(String destJNDIName, List<DestinationDetail> physicalAvailableMembers) {
            synchronized (containerLock) {
                System.out.println("destJNDIName is :"+destJNDIName);
              // For all Physical destinations, start a container
              for (DestinationDetail detail : physicalAvailableMembers) {
                  System.out.println("member is :"+detail.getJNDIName());
                  containerMap.add(detail.getJNDIName());
                //containerMap.put(detail.getJNDIName(), detail.getJNDIName());
              }
              if(startLatch.getCount()==0)
              {
                  changeflg=true;
              }
            }
            startLatch.countDown();       
          }

        @Override
        public void onDestinationsUnavailable(String destJNDIName, List<DestinationDetail> physicalUnavailableMembers) {
            // TODO Auto-generated method stub
            synchronized (containerLock) {
                  // Shutdown all containers whose physical members are no longer available
                  for (DestinationDetail detail : physicalUnavailableMembers) {
                    containerMap.remove(detail.getJNDIName());
                    // maybe i will need to do somethinh here
                  }
                  changeflg=true;
                }
            
        }

        @Override
          public void onFailure(String destJndiName, Exception exception) {
            // Looks like a cluster wide failure
             System.out.println("inside on failure");
            shutdown();
            System.out.println(exception);
            
          }
        public void shutdown() {
            // Unregister for events about destination availability
              registrationHandle.unregister();

            // Shut down containers
            synchronized (containerLock) {
                containerMap.removeAll(containerMap);
            }
            
            shutdown=true;
          }
}

在 运行 时它会转到 onFailure 并出现以下错误:

<Error> <Kernel> <WL-000802> <ExecuteRequest failed
 java.lang.NullPointerException.
        at com.bt.connect.QueueMonitor.shutdown(QueueMonitor.java:128)
        at com.bt.connect.QueueMonitor.onFailure(QueueMonitor.java:118)
        at weblogic.jms.extensions.JMSDestinationAvailabilityHelper$DestinationAvailabilityListenerWrapper.run(JMSDestinationAvailabilityHelper.java:490)
        at weblogic.jms.extensions.JMSDestinationAvailabilityHelper$DestinationAvailabilityListenerWrapper.callOutListener(JMSDestinationAvailabilityHelper.java:451)
        at weblogic.jms.extensions.JMSDestinationAvailabilityHelper$DestinationAvailabilityListenerWrapper.onFailure(JMSDestinationAvailabilityHelper.java:487)
        at weblogic.jms.common.CDS$DD2Listener.reportException(CDS.java:1145)
        at weblogic.jms.common.CDS.lookupDDAndCalloutListenerSingle(CDS.java:459)
        at weblogic.jms.common.CDS.lookupDDAndCalloutListener(CDS.java:412)
        at weblogic.jms.common.CDS.access0(CDS.java:52)
        at weblogic.jms.common.CDS$DDListenerRegistrationTimerListener.timerExpired(CDS.java:255)
        at weblogic.timers.internal.TimerImpl.run(TimerImpl.java:301)
        at weblogic.work.ExecuteRequestAdapter.execute(ExecuteRequestAdapter.java:21)
        at weblogic.kernel.ExecuteThread.execute(ExecuteThread.java:147)
        at weblogic.kernel.ExecuteThread.run(ExecuteThread.java:119)

我正在使用 java 8. 谁能帮帮我?提前致谢。

我之前已经解决了代码中的问题,与可能需要帮助的其他人共享相同的问题,主要问题是 wlthint3client.jar 是必需的。最终代码git如下。

MyJMSKafkaConnect

代码是kafkaconnect api 从Weblogic Uniform分布式队列中读取并写入到kafka队列中。