在 java 中实现 DestinationAvailabilityListener 时出错
Getting error while implementing DestinationAvailabilityListener in java
我有一个用例,我需要侦听 weblogic 12c 中托管的统一分布式队列的可用成员。我阅读并发现 DestinationAvailabilityListener 接口有可能适合需要的方法。以下是我的代码:
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import javax.naming.Context;
import org.apache.kafka.connect.connector.ConnectorContext;
import weblogic.jms.extensions.DestinationAvailabilityListener;
import weblogic.jms.extensions.DestinationDetail;
import weblogic.jms.extensions.JMSDestinationAvailabilityHelper;
import weblogic.jms.extensions.RegistrationHandle;
public class QueueMonitor extends Thread implements DestinationAvailabilityListener,WebLogicJmsTask {
private Hashtable<String, String> wlsEnvParamHashTbl = null;
private final Object containerLock = new Object();
private final CountDownLatch startLatch ;
private RegistrationHandle registrationHandle;
private ArrayList<String> containerMap;
boolean shutdown =false,changeflg=false;
private final ConnectorContext context;
public QueueMonitor(Map<String, String> props,ConnectorContext context) {
super();
wlsEnvParamHashTbl = new Hashtable<String, String>();
wlsEnvParamHashTbl.put(Context.PROVIDER_URL, props.get(WEBLOGIC_T3_URL_DESTINATION_CONFIG)); // set Weblogic JMS URL
wlsEnvParamHashTbl.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory"); // set Weblogic JNDI
wlsEnvParamHashTbl.put(Context.SECURITY_PRINCIPAL, props.get(WEBLOGIC_USERNAME_CONFIG)); // set Weblogic UserName
wlsEnvParamHashTbl.put(Context.SECURITY_CREDENTIALS, props.get(WEBLOGIC_PASSWORD_CONFIG)); // set Weblogic PassWord
for (Map.Entry<String,String> entry : wlsEnvParamHashTbl.entrySet())
System.out.println("Key = " + entry.getKey() +
", Value = " + entry.getValue());
System.out.println(props.get(WEBLOGIC_JMS_DESTINATION_CONFIG));
this.context=context;
this.startLatch = new CountDownLatch(1);
JMSDestinationAvailabilityHelper dah = JMSDestinationAvailabilityHelper.getInstance();
try {this.registrationHandle= dah.register(wlsEnvParamHashTbl, props.get(WEBLOGIC_JMS_DESTINATION_CONFIG), this);
startLatch.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
catch(Exception e)
{
System.out.println(e);
}
}
public void run()
{
while(!shutdown)
{
if(changeflg)
{
context.requestTaskReconfiguration();
changeflg=false;
}
}
if(shutdown)
{
System.out.println("Called shutdown, returning");
return;
}
}
public synchronized String getContainers()
{
String list=null;
synchronized (containerLock) {
list=String.join(",", containerMap);
}
return list;
}
@Override
public void onDestinationsAvailable(String destJNDIName, List<DestinationDetail> physicalAvailableMembers) {
synchronized (containerLock) {
System.out.println("destJNDIName is :"+destJNDIName);
// For all Physical destinations, start a container
for (DestinationDetail detail : physicalAvailableMembers) {
System.out.println("member is :"+detail.getJNDIName());
containerMap.add(detail.getJNDIName());
//containerMap.put(detail.getJNDIName(), detail.getJNDIName());
}
if(startLatch.getCount()==0)
{
changeflg=true;
}
}
startLatch.countDown();
}
@Override
public void onDestinationsUnavailable(String destJNDIName, List<DestinationDetail> physicalUnavailableMembers) {
// TODO Auto-generated method stub
synchronized (containerLock) {
// Shutdown all containers whose physical members are no longer available
for (DestinationDetail detail : physicalUnavailableMembers) {
containerMap.remove(detail.getJNDIName());
// maybe i will need to do somethinh here
}
changeflg=true;
}
}
@Override
public void onFailure(String destJndiName, Exception exception) {
// Looks like a cluster wide failure
System.out.println("inside on failure");
shutdown();
System.out.println(exception);
}
public void shutdown() {
// Unregister for events about destination availability
registrationHandle.unregister();
// Shut down containers
synchronized (containerLock) {
containerMap.removeAll(containerMap);
}
shutdown=true;
}
}
在 运行 时它会转到 onFailure 并出现以下错误:
<Error> <Kernel> <WL-000802> <ExecuteRequest failed
java.lang.NullPointerException.
at com.bt.connect.QueueMonitor.shutdown(QueueMonitor.java:128)
at com.bt.connect.QueueMonitor.onFailure(QueueMonitor.java:118)
at weblogic.jms.extensions.JMSDestinationAvailabilityHelper$DestinationAvailabilityListenerWrapper.run(JMSDestinationAvailabilityHelper.java:490)
at weblogic.jms.extensions.JMSDestinationAvailabilityHelper$DestinationAvailabilityListenerWrapper.callOutListener(JMSDestinationAvailabilityHelper.java:451)
at weblogic.jms.extensions.JMSDestinationAvailabilityHelper$DestinationAvailabilityListenerWrapper.onFailure(JMSDestinationAvailabilityHelper.java:487)
at weblogic.jms.common.CDS$DD2Listener.reportException(CDS.java:1145)
at weblogic.jms.common.CDS.lookupDDAndCalloutListenerSingle(CDS.java:459)
at weblogic.jms.common.CDS.lookupDDAndCalloutListener(CDS.java:412)
at weblogic.jms.common.CDS.access0(CDS.java:52)
at weblogic.jms.common.CDS$DDListenerRegistrationTimerListener.timerExpired(CDS.java:255)
at weblogic.timers.internal.TimerImpl.run(TimerImpl.java:301)
at weblogic.work.ExecuteRequestAdapter.execute(ExecuteRequestAdapter.java:21)
at weblogic.kernel.ExecuteThread.execute(ExecuteThread.java:147)
at weblogic.kernel.ExecuteThread.run(ExecuteThread.java:119)
我正在使用 java 8. 谁能帮帮我?提前致谢。
我之前已经解决了代码中的问题,与可能需要帮助的其他人共享相同的问题,主要问题是 wlthint3client.jar 是必需的。最终代码git如下。
代码是kafkaconnect api 从Weblogic Uniform分布式队列中读取并写入到kafka队列中。
我有一个用例,我需要侦听 weblogic 12c 中托管的统一分布式队列的可用成员。我阅读并发现 DestinationAvailabilityListener 接口有可能适合需要的方法。以下是我的代码:
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import javax.naming.Context;
import org.apache.kafka.connect.connector.ConnectorContext;
import weblogic.jms.extensions.DestinationAvailabilityListener;
import weblogic.jms.extensions.DestinationDetail;
import weblogic.jms.extensions.JMSDestinationAvailabilityHelper;
import weblogic.jms.extensions.RegistrationHandle;
public class QueueMonitor extends Thread implements DestinationAvailabilityListener,WebLogicJmsTask {
private Hashtable<String, String> wlsEnvParamHashTbl = null;
private final Object containerLock = new Object();
private final CountDownLatch startLatch ;
private RegistrationHandle registrationHandle;
private ArrayList<String> containerMap;
boolean shutdown =false,changeflg=false;
private final ConnectorContext context;
public QueueMonitor(Map<String, String> props,ConnectorContext context) {
super();
wlsEnvParamHashTbl = new Hashtable<String, String>();
wlsEnvParamHashTbl.put(Context.PROVIDER_URL, props.get(WEBLOGIC_T3_URL_DESTINATION_CONFIG)); // set Weblogic JMS URL
wlsEnvParamHashTbl.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory"); // set Weblogic JNDI
wlsEnvParamHashTbl.put(Context.SECURITY_PRINCIPAL, props.get(WEBLOGIC_USERNAME_CONFIG)); // set Weblogic UserName
wlsEnvParamHashTbl.put(Context.SECURITY_CREDENTIALS, props.get(WEBLOGIC_PASSWORD_CONFIG)); // set Weblogic PassWord
for (Map.Entry<String,String> entry : wlsEnvParamHashTbl.entrySet())
System.out.println("Key = " + entry.getKey() +
", Value = " + entry.getValue());
System.out.println(props.get(WEBLOGIC_JMS_DESTINATION_CONFIG));
this.context=context;
this.startLatch = new CountDownLatch(1);
JMSDestinationAvailabilityHelper dah = JMSDestinationAvailabilityHelper.getInstance();
try {this.registrationHandle= dah.register(wlsEnvParamHashTbl, props.get(WEBLOGIC_JMS_DESTINATION_CONFIG), this);
startLatch.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
catch(Exception e)
{
System.out.println(e);
}
}
public void run()
{
while(!shutdown)
{
if(changeflg)
{
context.requestTaskReconfiguration();
changeflg=false;
}
}
if(shutdown)
{
System.out.println("Called shutdown, returning");
return;
}
}
public synchronized String getContainers()
{
String list=null;
synchronized (containerLock) {
list=String.join(",", containerMap);
}
return list;
}
@Override
public void onDestinationsAvailable(String destJNDIName, List<DestinationDetail> physicalAvailableMembers) {
synchronized (containerLock) {
System.out.println("destJNDIName is :"+destJNDIName);
// For all Physical destinations, start a container
for (DestinationDetail detail : physicalAvailableMembers) {
System.out.println("member is :"+detail.getJNDIName());
containerMap.add(detail.getJNDIName());
//containerMap.put(detail.getJNDIName(), detail.getJNDIName());
}
if(startLatch.getCount()==0)
{
changeflg=true;
}
}
startLatch.countDown();
}
@Override
public void onDestinationsUnavailable(String destJNDIName, List<DestinationDetail> physicalUnavailableMembers) {
// TODO Auto-generated method stub
synchronized (containerLock) {
// Shutdown all containers whose physical members are no longer available
for (DestinationDetail detail : physicalUnavailableMembers) {
containerMap.remove(detail.getJNDIName());
// maybe i will need to do somethinh here
}
changeflg=true;
}
}
@Override
public void onFailure(String destJndiName, Exception exception) {
// Looks like a cluster wide failure
System.out.println("inside on failure");
shutdown();
System.out.println(exception);
}
public void shutdown() {
// Unregister for events about destination availability
registrationHandle.unregister();
// Shut down containers
synchronized (containerLock) {
containerMap.removeAll(containerMap);
}
shutdown=true;
}
}
在 运行 时它会转到 onFailure 并出现以下错误:
<Error> <Kernel> <WL-000802> <ExecuteRequest failed
java.lang.NullPointerException.
at com.bt.connect.QueueMonitor.shutdown(QueueMonitor.java:128)
at com.bt.connect.QueueMonitor.onFailure(QueueMonitor.java:118)
at weblogic.jms.extensions.JMSDestinationAvailabilityHelper$DestinationAvailabilityListenerWrapper.run(JMSDestinationAvailabilityHelper.java:490)
at weblogic.jms.extensions.JMSDestinationAvailabilityHelper$DestinationAvailabilityListenerWrapper.callOutListener(JMSDestinationAvailabilityHelper.java:451)
at weblogic.jms.extensions.JMSDestinationAvailabilityHelper$DestinationAvailabilityListenerWrapper.onFailure(JMSDestinationAvailabilityHelper.java:487)
at weblogic.jms.common.CDS$DD2Listener.reportException(CDS.java:1145)
at weblogic.jms.common.CDS.lookupDDAndCalloutListenerSingle(CDS.java:459)
at weblogic.jms.common.CDS.lookupDDAndCalloutListener(CDS.java:412)
at weblogic.jms.common.CDS.access0(CDS.java:52)
at weblogic.jms.common.CDS$DDListenerRegistrationTimerListener.timerExpired(CDS.java:255)
at weblogic.timers.internal.TimerImpl.run(TimerImpl.java:301)
at weblogic.work.ExecuteRequestAdapter.execute(ExecuteRequestAdapter.java:21)
at weblogic.kernel.ExecuteThread.execute(ExecuteThread.java:147)
at weblogic.kernel.ExecuteThread.run(ExecuteThread.java:119)
我正在使用 java 8. 谁能帮帮我?提前致谢。
我之前已经解决了代码中的问题,与可能需要帮助的其他人共享相同的问题,主要问题是 wlthint3client.jar 是必需的。最终代码git如下。
代码是kafkaconnect api 从Weblogic Uniform分布式队列中读取并写入到kafka队列中。