ConcurrentModificationException with MockRestServiceServer when RestTemplate is used via async executors

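I'm trying to mock REST responses with Spring's MockRestServiceServer for integration testing, but AbstractRequestExpectationManager keeps running into a ConcurrentModificationException when the actual code uses the RestTemplate asynchronously.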

Test pseudo-code snippet:

@Autowired
RestTemplate restTemplate;
MockRestServiceServer mockRestServiceServer;

@Test
public void test() {
    // given
    mockRestServiceServer = MockRestServiceServer
            .bindTo( restTemplate )
            .ignoreExpectOrder()  // supported from spring 4.3
            .build();
    prepareRestResponse( "/resource/url", "mock json content".getBytes() );
    // when
    myservice.refreshPricesForProductGroup( 2 );

    // then
    // assertions
}

private void prepareRestResponse( final String urlTail, final byte[] responseContent ) {
    mockRestServiceServer
            .expect( requestTo( endsWith( urlTail ) ) )
            .andExpect( method( HttpMethod.GET ) )
            .andRespond( withSuccess()
                    .body( responseContent )
                    .contentType( APPLICATION_JSON_UTF8 ) );
}

The actual code that accesses the RestTemplate:

@Autowired
Executor executor;
@Autowired
PriceRestClient priceClient;
@Autowired
ProductRestClient productClient;

/../

private void refreshPricesForProductGroup( final int groupId ) {

    List<Product> products = productClient.findAllProductsForGroup( groupId );

    products.forEach( p ->
            executor.execute( () -> {
                final Price price = priceClient.getPrice( p.getId() );
                priceRepository.updatePrice( price );
            } )
    );
}

PriceRestClient.getPrice() performs a simple REST call:

Price getPrice( String productId ) {

    try {
        ResponseEntity<byte[]> entity = restTemplate.exchange(
                restUtil.getProductPriceDataUrl(),
                HttpMethod.GET,
                restUtil.createGzipEncodingRequestEntity(),
                byte[].class,
                productId );

        if ( entity.getStatusCode() == HttpStatus.OK ) {
            String body = restUtil.unmarshalGzipBody( entity.getBody() );
            return priceEntityParser.parse( body );
        }

    } catch ( HttpClientErrorException e ) {
        // TODO
    } catch ( ResourceAccessException e ) {
        // TODO
    } catch ( IOException e ) {
        // TODO
    }

    return null;
}

The exception thrown:

Exception in thread "AsyncExecutor-2" java.util.ConcurrentModificationException
    at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:711)
    at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:734)
    at org.springframework.test.web.client.AbstractRequestExpectationManager$RequestExpectationGroup.findExpectation(AbstractRequestExpectationManager.java:167)
    at org.springframework.test.web.client.UnorderedRequestExpectationManager.validateRequestInternal(UnorderedRequestExpectationManager.java:42)
    at org.springframework.test.web.client.AbstractRequestExpectationManager.validateRequest(AbstractRequestExpectationManager.java:71)
    at org.springframework.test.web.client.MockRestServiceServer$MockClientHttpRequestFactory.executeInternal(MockRestServiceServer.java:286)
    at org.springframework.mock.http.client.MockClientHttpRequest.execute(MockClientHttpRequest.java:93)
    at org.springframework.http.client.InterceptingClientHttpRequest$InterceptingRequestExecution.execute(InterceptingClientHttpRequest.java:93)
    at com.mycompany.myproduct.web.client.HttpRequestInterceptorLoggingClient.interceptReq(HttpRequestInterceptorLoggingClient.java:32)
    at org.springframework.http.client.InterceptingClientHttpRequest$InterceptingRequestExecution.execute(InterceptingClientHttpRequest.java:85)
    at org.springframework.http.client.InterceptingClientHttpRequest.executeInternal(InterceptingClientHttpRequest.java:69)
    at org.springframework.http.client.AbstractBufferingClientHttpRequest.executeInternal(AbstractBufferingClientHttpRequest.java:48)
    at org.springframework.http.client.AbstractClientHttpRequest.execute(AbstractClientHttpRequest.java:53)
    at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:596)
    at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:557)
    at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:475)
    at com.mycompany.myproduct.rest.PriceRestClient.getPrice(PriceRestClient.java:48)
    at com.mycompany.myproduct.service.ProductPriceSourcingService.lambda$null(ProductPriceSourcingService.java:132)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Am I doing something wrong, or could this be a bug in MockRestServiceServer?

I ran into the same problem with multiple threads using the RestTemplate / RestOperations object. MockRestServiceServer does not appear to be thread-safe.
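To illustrate the failure mode seen in the stack trace (LinkedHashMap's iterator throwing inside findExpectation), here is a minimal sketch in plain Java, without Spring. It assumes the set of remaining expectations behaves like an ordinary LinkedHashSet; the class name and the string "expectations" are made up for the example. The modification is done from the same thread so the result is deterministic; with several executor threads the same fail-fast check trips whenever one thread changes the set while another is iterating it.

```java
import java.util.ConcurrentModificationException;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical demo class, not part of Spring.
class FailFastIterationDemo {

    /** Returns true if changing the set during iteration raised a ConcurrentModificationException. */
    static boolean modifyWhileIterating() {
        Set<String> expectations = new LinkedHashSet<>();
        expectations.add("GET /price/1");
        expectations.add("GET /price/2");
        expectations.add("GET /price/3");
        try {
            for (String expectation : expectations) {
                // Stands in for another thread removing a satisfied expectation
                // while this loop is still looking for a match.
                expectations.remove("GET /price/3");
            }
            return false;
        } catch (ConcurrentModificationException e) {
            // The fail-fast iterator noticed the structural change on its next step.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("ConcurrentModificationException thrown: " + modifyWhileIterating());
    }
}
```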

I'm trying to work around this now. So far I have copied the MockRestServiceServer class and am trying CopyOnWriteArrayList for expectedRequests and actualRequests. That seems to fix the exception, but now my tests fail.
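As a side note on that approach, the sketch below shows how a CopyOnWriteArrayList behaves during iteration: the iterator works on a snapshot, so it never throws ConcurrentModificationException, but it also does not see changes made after the loop started. This is only an illustration of the trade-off with made-up names and data; it is not a diagnosis of why the tests above failed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class, not part of Spring.
class CopyOnWriteDemo {

    /** Iterates while removing an element; returns {elements visited, elements left in the list}. */
    static int[] iterateWhileRemoving() {
        List<String> expectedRequests =
                new CopyOnWriteArrayList<>(Arrays.asList("first", "second", "third"));
        int visited = 0;
        for (String request : expectedRequests) {
            // The removal succeeds, but the running iterator still walks its original snapshot.
            expectedRequests.remove("third");
            visited++;
        }
        return new int[] { visited, expectedRequests.size() };
    }

    public static void main(String[] args) {
        int[] result = iterateWhileRemoving();
        System.out.println("visited=" + result[0] + ", remaining=" + result[1]);
    }
}
```

The loop still visits the removed element, which is worth keeping in mind if matching logic relies on seeing the current state of the list.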

"Fixed" it by creating a copy of UnorderedRequestExpectationManager:

package cucumber.testbeans;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

import org.springframework.http.client.ClientHttpRequest;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.test.web.client.*;

/**
* Essentially an {@link UnorderedRequestExpectationManager}, but overrides some implementations of {@link AbstractRequestExpectationManager}
* in order to allow adding more expectations after the first request has been made.
*/
public class RestRequestExpectationManager extends AbstractRequestExpectationManager {

    private final RequestExpectationGroup remainingExpectations = new RequestExpectationGroup();

    @Override
    public ResponseActions expectRequest( ExpectedCount count, RequestMatcher matcher ) {
        //        Assert.state(getRequests().isEmpty(), "Cannot add more expectations after actual requests are made.");
        RequestExpectation expectation = new DefaultRequestExpectation( count, matcher );
        getExpectations().add( expectation );
        return expectation;
    }

    @Override
    public ClientHttpResponse validateRequest( ClientHttpRequest request ) throws IOException {
        //        if (getRequests().isEmpty()) {
        afterExpectationsDeclared();
        //        }
        ClientHttpResponse response = validateRequestInternal( request );
        getRequests().add( request );
        return response;
    }

    @Override
    protected void afterExpectationsDeclared() {
        this.remainingExpectations.updateAll( getExpectations() );
    }

    @Override
    public ClientHttpResponse validateRequestInternal( ClientHttpRequest request ) throws IOException {
        RequestExpectation expectation = this.remainingExpectations.findExpectation( request );
        if ( expectation != null ) {
            ClientHttpResponse response = expectation.createResponse( request );
            this.remainingExpectations.update( expectation );
            return response;
        }
        throw createUnexpectedRequestError( request );
    }

    /**
    * Same as {@link AbstractRequestExpectationManager.RequestExpectationGroup}, but synchronizes operations on the {@code expectations}
    * set, so async operation would be possible.
    */
    private static class RequestExpectationGroup {

        private final Set<RequestExpectation> expectations = Collections.synchronizedSet( new LinkedHashSet<>() );

        public Set<RequestExpectation> getExpectations() {
            return this.expectations;
        }

        public void update( RequestExpectation expectation ) {
            if ( expectation.hasRemainingCount() ) {
                getExpectations().add( expectation );
            } else {
                getExpectations().remove( expectation );
            }
        }

        public void updateAll( Collection<RequestExpectation> expectations ) {
            for ( RequestExpectation expectation : expectations ) {
                update( expectation );
            }
        }

        public RequestExpectation findExpectation( ClientHttpRequest request ) throws IOException {
            synchronized ( this.expectations ) {
                for ( RequestExpectation expectation : getExpectations() ) {
                    try {
                        expectation.match( request );
                        return expectation;
                    } catch ( AssertionError error ) {
                        // Ignore
                    }
                }
                return null;
            }
        }
    }
}

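Two things are worth noting:

  • First, the commented-out lines in RestRequestExpectationManager, which allow expectations to be added after the first request has been handled (unrelated to the ConcurrentModificationException problem at hand);
  • Second, the synchronization on expectations in RestRequestExpectationManager.RequestExpectationGroup, which supports asynchronous operation. It seems to work for me.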
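The second point relies on a documented property of Collections.synchronizedSet: single calls such as add and remove are synchronized, but iteration is not, so the caller has to hold the set's own lock while looping. A minimal sketch of that pattern in plain Java follows; the class name, the integer "expectations", and the worker setup are assumptions made for the example and do not come from Spring.

```java
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of Spring.
class SynchronizedIterationDemo {

    /** Runs four workers against one synchronized set and returns the number of failures observed. */
    static int countFailures() {
        Set<Integer> expectations = Collections.synchronizedSet(new LinkedHashSet<>());
        for (int i = 0; i < 100; i++) {
            expectations.add(i);
        }
        AtomicInteger failures = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int t = 0; t < 4; t++) {
            final int workerId = t;
            pool.execute(() -> {
                for (int round = 0; round < 25; round++) {
                    try {
                        Integer match = null;
                        // Iteration must hold the set's lock, as findExpectation does above.
                        synchronized (expectations) {
                            for (Integer candidate : expectations) {
                                if (candidate % 4 == workerId) {
                                    match = candidate;
                                    break;
                                }
                            }
                        }
                        // Single operations are already synchronized by the wrapper.
                        if (match != null) {
                            expectations.remove(match);
                        }
                    } catch (ConcurrentModificationException e) {
                        failures.incrementAndGet();
                    }
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                failures.incrementAndGet(); // workers did not finish in time
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            failures.incrementAndGet();
        }
        return failures.get();
    }

    public static void main(String[] args) {
        System.out.println("failures: " + countFailures());
    }
}
```

In this sketch each worker only removes elements that no other worker looks for, so the separate find and remove steps do not compete for the same element.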

Initialize MockRestServiceServer as follows:

MockRestServiceServer mockRestServiceServer = MockRestServiceServer
            .bindTo( restTemplate )
            .build( new RestRequestExpectationManager() );

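I only needed thread safety, so I refactored the solution above a bit and removed the other feature (modifying expectations on the fly). Leaving it here for reference.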

import org.springframework.http.client.ClientHttpRequest;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.test.web.client.AbstractRequestExpectationManager;
import org.springframework.test.web.client.RequestExpectation;
import org.springframework.test.web.client.UnorderedRequestExpectationManager;

import java.io.IOException;
import java.util.*;

public class SynchronizedUnorderedRequestExpectationManager extends AbstractRequestExpectationManager {
    private final List<RequestExpectation> expectations = Collections.synchronizedList(new LinkedList<>());
    private final List<ClientHttpRequest> requests = Collections.synchronizedList(new LinkedList<>());
    private final SynchronizedRequestExpectationGroup remainingExpectations = new SynchronizedRequestExpectationGroup();

    @Override
    protected List<RequestExpectation> getExpectations() {
        return this.expectations;
    }

    @Override
    protected List<ClientHttpRequest> getRequests() {
        return this.requests;
    }

    protected void afterExpectationsDeclared() {
        this.remainingExpectations.updateAll(this.getExpectations());
    }

    public ClientHttpResponse validateRequestInternal(ClientHttpRequest request) throws IOException {
        RequestExpectation expectation = this.remainingExpectations.findExpectation(request);

        if (expectation != null) {
            ClientHttpResponse response = expectation.createResponse(request);
            this.remainingExpectations.update(expectation);
            return response;
        }

        throw this.createUnexpectedRequestError(request);
    }

    public void reset() {
        super.reset();
        this.remainingExpectations.reset();
    }

    protected static class SynchronizedRequestExpectationGroup extends RequestExpectationGroup {

        private final Set<RequestExpectation> expectations = Collections.synchronizedSet(new LinkedHashSet<>());

        @Override
        public Set<RequestExpectation> getExpectations() {
            return this.expectations;
        }

        @Override
        public void updateAll(Collection<RequestExpectation> expectations) {
            synchronized (expectations) {
                super.updateAll(expectations);
            }
        }

        @Override
        public RequestExpectation findExpectation(ClientHttpRequest request) throws IOException {
            synchronized (expectations) {
                return super.findExpectation(request);
            }
        }
    }
}

Background

The code is basically copied from UnorderedRequestExpectationManager. Unfortunately this is still necessary, because the original class is tightly coupled to AbstractRequestExpectationManager.RequestExpectationGroup, which is not thread-safe. To replace that dependency, the class has to be rewritten. The other thread-unsafe collection dependencies (from AbstractRequestExpectationManager) are replaced by overriding the getExpectations and getRequests methods. Collection iteration is protected by critical sections.

Usage

RestTemplate restTemplate = new RestTemplate();
MockRestServiceServer mockServer = MockRestServiceServer.bindTo(restTemplate)
    .build(new SynchronizedUnorderedRequestExpectationManager());

Thanks Raul! (upvoted)