Spring 使用 CXF 客户端竞争启动 Condition/Connection 超时
Spring Boot with CXF Client Race Condition/Connection Timeout
我在 Spring 启动应用程序中配置了一个 CXF 客户端,如下所示:
@Bean
public ConsumerSupportService consumerSupportService() {
JaxWsProxyFactoryBean jaxWsProxyFactoryBean = new JaxWsProxyFactoryBean();
jaxWsProxyFactoryBean.setServiceClass(ConsumerSupportService.class);
jaxWsProxyFactoryBean.setAddress("https://www.someservice.com/service?wsdl");
jaxWsProxyFactoryBean.setBindingId(SOAPBinding.SOAP12HTTP_BINDING);
WSAddressingFeature wsAddressingFeature = new WSAddressingFeature();
wsAddressingFeature.setAddressingRequired(true);
jaxWsProxyFactoryBean.getFeatures().add(wsAddressingFeature);
ConsumerSupportService service = (ConsumerSupportService) jaxWsProxyFactoryBean.create();
Client client = ClientProxy.getClient(service);
AddressingProperties addressingProperties = new AddressingProperties();
AttributedURIType to = new AttributedURIType();
to.setValue(applicationProperties.getWex().getServices().getConsumersupport().getTo());
addressingProperties.setTo(to);
AttributedURIType action = new AttributedURIType();
action.setValue("http://serviceaction/SearchConsumer");
addressingProperties.setAction(action);
client.getRequestContext().put("javax.xml.ws.addressing.context", addressingProperties);
setClientTimeout(client);
return service;
}
private void setClientTimeout(Client client) {
HTTPConduit conduit = (HTTPConduit) client.getConduit();
HTTPClientPolicy policy = new HTTPClientPolicy();
policy.setConnectionTimeout(applicationProperties.getWex().getServices().getClient().getConnectionTimeout());
policy.setReceiveTimeout(applicationProperties.getWex().getServices().getClient().getReceiveTimeout());
conduit.setClient(policy);
}
同一个服务 bean 被同一个应用程序序列中的两个不同线程访问。如果我连续执行此特定序列 10 次,我将至少有 3 次从服务调用中获得连接超时。我看到的是:
Caused by: java.io.IOException: Timed out waiting for response to operation {http://theservice.com}SearchConsumer.
at org.apache.cxf.endpoint.ClientImpl.waitResponse(ClientImpl.java:685) ~[cxf-core-3.2.0.jar:3.2.0]
at org.apache.cxf.endpoint.ClientImpl.processResult(ClientImpl.java:608) ~[cxf-core-3.2.0.jar:3.2.0]
如果我更改顺序以使其中一个线程不调用此服务,则错误就会消失。所以,这里似乎发生了某种竞争条件。如果我查看此服务的代理管理器中的日志,我可以看到这两个服务调用都非常快速地做出 return 响应,但是第二个服务调用似乎卡在了代码中的某个地方,实际上从来没有放开连接,直到达到超时值。很长一段时间以来,我一直在努力追查这个问题的原因,但一直没有成功。
我读过一些关于 CXF 客户端代理是否线程安全的不同意见,但我的印象是它们是。如果事实并非如此,我应该为每次调用创建一个新的客户端代理,还是使用代理池?
事实证明这是代理不是 thread-safe 的问题。我最后做的是利用一种类似于 posted 的解决方案,在这个 post 的底部:Is this JAX-WS client call thread safe? - 我为代理创建了一个池,我用它来访问代理以 thread-safe 方式来自多个线程。这似乎很有效。
public class JaxWSServiceProxyPool<T> extends GenericObjectPool<T> {
JaxWSServiceProxyPool(Supplier<T> factory, GenericObjectPoolConfig poolConfig) {
super(new BasePooledObjectFactory<T>() {
@Override
public T create() throws Exception {
return factory.get();
}
@Override
public PooledObject<T> wrap(T t) {
return new DefaultPooledObject<>(t);
}
}, poolConfig != null ? poolConfig : new GenericObjectPoolConfig());
}
}
然后我创建了一个简单的 "registry" class 来保存对各种池的引用。
@Component
public class JaxWSServiceProxyPoolRegistry {
private static final Map<Class, JaxWSServiceProxyPool> registry = new HashMap<>();
public synchronized <T> void register(Class<T> serviceTypeClass, Supplier<T> factory, GenericObjectPoolConfig poolConfig) {
Assert.notNull(serviceTypeClass);
Assert.notNull(factory);
if (!registry.containsKey(serviceTypeClass)) {
registry.put(serviceTypeClass, new JaxWSServiceProxyPool<>(factory, poolConfig));
}
}
public <T> void register(Class<T> serviceTypeClass, Supplier<T> factory) {
register(serviceTypeClass, factory, null);
}
@SuppressWarnings("unchecked")
public <T> JaxWSServiceProxyPool<T> getServiceProxyPool(Class<T> serviceTypeClass) {
Assert.notNull(serviceTypeClass);
return registry.get(serviceTypeClass);
}
}
为了使用它,我做了:
JaxWSServiceProxyPoolRegistry jaxWSServiceProxyPoolRegistry = new JaxWSServiceProxyPoolRegistry();
jaxWSServiceProxyPoolRegistry.register(ConsumerSupportService.class,
this::buildConsumerSupportServiceClient,
getConsumerSupportServicePoolConfig());
其中 buildConsumerSupportServiceClient 使用 JaxWsProxyFactoryBean 构建客户端。
为了从池中检索实例,我注入了我的注册表 class 然后执行:
JaxWSServiceProxyPool<ConsumerSupportService> consumerSupportServiceJaxWSServiceProxyPool = jaxWSServiceProxyPoolRegistry.getServiceProxyPool(ConsumerSupportService.class);
然后 borrow/return 对象 from/to 必要的池。
到目前为止,这似乎运作良好。我已经对它执行了一些相当重的负载测试并且它被阻止了。
我在 Spring 启动应用程序中配置了一个 CXF 客户端,如下所示:
@Bean
public ConsumerSupportService consumerSupportService() {
JaxWsProxyFactoryBean jaxWsProxyFactoryBean = new JaxWsProxyFactoryBean();
jaxWsProxyFactoryBean.setServiceClass(ConsumerSupportService.class);
jaxWsProxyFactoryBean.setAddress("https://www.someservice.com/service?wsdl");
jaxWsProxyFactoryBean.setBindingId(SOAPBinding.SOAP12HTTP_BINDING);
WSAddressingFeature wsAddressingFeature = new WSAddressingFeature();
wsAddressingFeature.setAddressingRequired(true);
jaxWsProxyFactoryBean.getFeatures().add(wsAddressingFeature);
ConsumerSupportService service = (ConsumerSupportService) jaxWsProxyFactoryBean.create();
Client client = ClientProxy.getClient(service);
AddressingProperties addressingProperties = new AddressingProperties();
AttributedURIType to = new AttributedURIType();
to.setValue(applicationProperties.getWex().getServices().getConsumersupport().getTo());
addressingProperties.setTo(to);
AttributedURIType action = new AttributedURIType();
action.setValue("http://serviceaction/SearchConsumer");
addressingProperties.setAction(action);
client.getRequestContext().put("javax.xml.ws.addressing.context", addressingProperties);
setClientTimeout(client);
return service;
}
private void setClientTimeout(Client client) {
HTTPConduit conduit = (HTTPConduit) client.getConduit();
HTTPClientPolicy policy = new HTTPClientPolicy();
policy.setConnectionTimeout(applicationProperties.getWex().getServices().getClient().getConnectionTimeout());
policy.setReceiveTimeout(applicationProperties.getWex().getServices().getClient().getReceiveTimeout());
conduit.setClient(policy);
}
同一个服务 bean 被同一个应用程序序列中的两个不同线程访问。如果我连续执行此特定序列 10 次,我将至少有 3 次从服务调用中获得连接超时。我看到的是:
Caused by: java.io.IOException: Timed out waiting for response to operation {http://theservice.com}SearchConsumer.
at org.apache.cxf.endpoint.ClientImpl.waitResponse(ClientImpl.java:685) ~[cxf-core-3.2.0.jar:3.2.0]
at org.apache.cxf.endpoint.ClientImpl.processResult(ClientImpl.java:608) ~[cxf-core-3.2.0.jar:3.2.0]
如果我更改顺序以使其中一个线程不调用此服务,则错误就会消失。所以,这里似乎发生了某种竞争条件。如果我查看此服务的代理管理器中的日志,我可以看到这两个服务调用都非常快速地做出 return 响应,但是第二个服务调用似乎卡在了代码中的某个地方,实际上从来没有放开连接,直到达到超时值。很长一段时间以来,我一直在努力追查这个问题的原因,但一直没有成功。
我读过一些关于 CXF 客户端代理是否线程安全的不同意见,但我的印象是它们是。如果事实并非如此,我应该为每次调用创建一个新的客户端代理,还是使用代理池?
事实证明这是代理不是 thread-safe 的问题。我最后做的是利用一种类似于 posted 的解决方案,在这个 post 的底部:Is this JAX-WS client call thread safe? - 我为代理创建了一个池,我用它来访问代理以 thread-safe 方式来自多个线程。这似乎很有效。
public class JaxWSServiceProxyPool<T> extends GenericObjectPool<T> {
JaxWSServiceProxyPool(Supplier<T> factory, GenericObjectPoolConfig poolConfig) {
super(new BasePooledObjectFactory<T>() {
@Override
public T create() throws Exception {
return factory.get();
}
@Override
public PooledObject<T> wrap(T t) {
return new DefaultPooledObject<>(t);
}
}, poolConfig != null ? poolConfig : new GenericObjectPoolConfig());
}
}
然后我创建了一个简单的 "registry" class 来保存对各种池的引用。
@Component
public class JaxWSServiceProxyPoolRegistry {
private static final Map<Class, JaxWSServiceProxyPool> registry = new HashMap<>();
public synchronized <T> void register(Class<T> serviceTypeClass, Supplier<T> factory, GenericObjectPoolConfig poolConfig) {
Assert.notNull(serviceTypeClass);
Assert.notNull(factory);
if (!registry.containsKey(serviceTypeClass)) {
registry.put(serviceTypeClass, new JaxWSServiceProxyPool<>(factory, poolConfig));
}
}
public <T> void register(Class<T> serviceTypeClass, Supplier<T> factory) {
register(serviceTypeClass, factory, null);
}
@SuppressWarnings("unchecked")
public <T> JaxWSServiceProxyPool<T> getServiceProxyPool(Class<T> serviceTypeClass) {
Assert.notNull(serviceTypeClass);
return registry.get(serviceTypeClass);
}
}
为了使用它,我做了:
JaxWSServiceProxyPoolRegistry jaxWSServiceProxyPoolRegistry = new JaxWSServiceProxyPoolRegistry();
jaxWSServiceProxyPoolRegistry.register(ConsumerSupportService.class,
this::buildConsumerSupportServiceClient,
getConsumerSupportServicePoolConfig());
其中 buildConsumerSupportServiceClient 使用 JaxWsProxyFactoryBean 构建客户端。
为了从池中检索实例,我注入了我的注册表 class 然后执行:
JaxWSServiceProxyPool<ConsumerSupportService> consumerSupportServiceJaxWSServiceProxyPool = jaxWSServiceProxyPoolRegistry.getServiceProxyPool(ConsumerSupportService.class);
然后 borrow/return 对象 from/to 必要的池。
到目前为止,这似乎运作良好。我已经对它执行了一些相当重的负载测试并且它被阻止了。