Spring 使用 CXF 客户端竞争启动 Condition/Connection 超时

Spring Boot with CXF Client Race Condition/Connection Timeout

我在 Spring 启动应用程序中配置了一个 CXF 客户端,如下所示:

    @Bean
    public ConsumerSupportService consumerSupportService() {
        JaxWsProxyFactoryBean jaxWsProxyFactoryBean = new JaxWsProxyFactoryBean();
        jaxWsProxyFactoryBean.setServiceClass(ConsumerSupportService.class);
        jaxWsProxyFactoryBean.setAddress("https://www.someservice.com/service?wsdl");
        jaxWsProxyFactoryBean.setBindingId(SOAPBinding.SOAP12HTTP_BINDING);
        WSAddressingFeature wsAddressingFeature = new WSAddressingFeature();
        wsAddressingFeature.setAddressingRequired(true);
        jaxWsProxyFactoryBean.getFeatures().add(wsAddressingFeature);

        ConsumerSupportService service =  (ConsumerSupportService) jaxWsProxyFactoryBean.create();

        Client client = ClientProxy.getClient(service);
        AddressingProperties addressingProperties = new AddressingProperties();
        AttributedURIType to = new AttributedURIType();
        to.setValue(applicationProperties.getWex().getServices().getConsumersupport().getTo());
        addressingProperties.setTo(to);
        AttributedURIType action = new AttributedURIType();
        action.setValue("http://serviceaction/SearchConsumer");
        addressingProperties.setAction(action);
        client.getRequestContext().put("javax.xml.ws.addressing.context", addressingProperties);

        setClientTimeout(client);

        return service;
    }

    private void setClientTimeout(Client client) {
        HTTPConduit conduit = (HTTPConduit) client.getConduit();
        HTTPClientPolicy policy = new HTTPClientPolicy();
        policy.setConnectionTimeout(applicationProperties.getWex().getServices().getClient().getConnectionTimeout());
        policy.setReceiveTimeout(applicationProperties.getWex().getServices().getClient().getReceiveTimeout());
        conduit.setClient(policy);
    }

同一个服务 bean 被同一个应用程序序列中的两个不同线程访问。如果我连续执行此特定序列 10 次,我将至少有 3 次从服务调用中获得连接超时。我看到的是:

Caused by: java.io.IOException: Timed out waiting for response to operation {http://theservice.com}SearchConsumer.
    at org.apache.cxf.endpoint.ClientImpl.waitResponse(ClientImpl.java:685) ~[cxf-core-3.2.0.jar:3.2.0]
    at org.apache.cxf.endpoint.ClientImpl.processResult(ClientImpl.java:608) ~[cxf-core-3.2.0.jar:3.2.0]

如果我更改顺序以使其中一个线程不调用此服务,则错误就会消失。所以,这里似乎发生了某种竞争条件。如果我查看此服务的代理管理器中的日志,我可以看到这两个服务调用都非常快速地做出 return 响应,但是第二个服务调用似乎卡在了代码中的某个地方,实际上从来没有放开连接,直到达到超时值。很长一段时间以来,我一直在努力追查这个问题的原因,但一直没有成功。

我读过一些关于 CXF 客户端代理是否线程安全的不同意见,但我的印象是它们是。如果事实并非如此,我应该为每次调用创建一个新的客户端代理,还是使用代理池?

事实证明这是代理不是 thread-safe 的问题。我最后做的是利用一种类似于 posted 的解决方案,在这个 post 的底部:Is this JAX-WS client call thread safe? - 我为代理创建了一个池,我用它来访问代理以 thread-safe 方式来自多个线程。这似乎很有效。

public class JaxWSServiceProxyPool<T> extends GenericObjectPool<T> {
    JaxWSServiceProxyPool(Supplier<T> factory, GenericObjectPoolConfig poolConfig) {
        super(new BasePooledObjectFactory<T>() {
            @Override
            public T create() throws Exception {
                return factory.get();
            }

            @Override
            public PooledObject<T> wrap(T t) {
                return new DefaultPooledObject<>(t);
            }
        }, poolConfig != null ? poolConfig : new GenericObjectPoolConfig());
    }
}

然后我创建了一个简单的 "registry" class 来保存对各种池的引用。

@Component
public class JaxWSServiceProxyPoolRegistry {
    private static final Map<Class, JaxWSServiceProxyPool> registry = new HashMap<>();

    public synchronized <T> void register(Class<T> serviceTypeClass, Supplier<T> factory, GenericObjectPoolConfig poolConfig) {
        Assert.notNull(serviceTypeClass);
        Assert.notNull(factory);
        if (!registry.containsKey(serviceTypeClass)) {
            registry.put(serviceTypeClass, new JaxWSServiceProxyPool<>(factory, poolConfig));
        }
    }

    public <T> void register(Class<T> serviceTypeClass, Supplier<T> factory) {
        register(serviceTypeClass, factory, null);
    }

    @SuppressWarnings("unchecked")
    public <T> JaxWSServiceProxyPool<T> getServiceProxyPool(Class<T> serviceTypeClass) {
        Assert.notNull(serviceTypeClass);
        return registry.get(serviceTypeClass);
    }
}

为了使用它,我做了:

JaxWSServiceProxyPoolRegistry jaxWSServiceProxyPoolRegistry = new JaxWSServiceProxyPoolRegistry();
jaxWSServiceProxyPoolRegistry.register(ConsumerSupportService.class,
            this::buildConsumerSupportServiceClient,
            getConsumerSupportServicePoolConfig());

其中 buildConsumerSupportServiceClient 使用 JaxWsProxyFactoryBean 构建客户端。

为了从池中检索实例,我注入了我的注册表 class 然后执行:

JaxWSServiceProxyPool<ConsumerSupportService> consumerSupportServiceJaxWSServiceProxyPool = jaxWSServiceProxyPoolRegistry.getServiceProxyPool(ConsumerSupportService.class);

然后 borrow/return 对象 from/to 必要的池。

到目前为止,这似乎运作良好。我已经对它执行了一些相当重的负载测试并且它被阻止了。