EmbeddedKafka 不工作导致 Scala 错误

EmbeddedKafka not working cause an Scala error

我有一个基于 spring-boot java-gradle 的工作服务,可以生成和使用 Kafka 消息。但是我无法使用带有 @EmbeddedKafka 注释的 spring-kafka-test 库或使用 @ClassRule 方式创建集成测试。在这两种方式中,我最终都会遇到相同的错误(下面针对 Scala 指出)。如果有人知道幕后可能发生的事情,那将非常有帮助。

Spring 引导版本:2.1.6.RELEASE Spring卡夫卡版本:2.2.7.RELEASE

生产者配置:

@EnableKafka
@Configuration
public class KafkaProducerConfig {

    private String bootstrapAddress;

    public KafkaProducerConfig(@Value("${spring.kafka.bootstrap-servers}") String bootstrapAddress) {
        this.bootstrapAddress = bootstrapAddress;
    }

    @Bean
    public ProducerFactory<String, Greeting> greetingProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
        return new KafkaTemplate<>(greetingProducerFactory());
    }
}

生产者代码:

@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Greeting> kafkaTemplate;

    @Value(value = "${kiosk.kafka.topic.greeting}")
    private String greetingTopicName;

    public void sendMessage(Greeting greeting) {

        ListenableFuture<SendResult<String, Greeting>> future = kafkaTemplate.send(greetingTopicName, greeting);

        future.addCallback(new ListenableFutureCallback<SendResult<String, Greeting>>() {

            @Override
            public void onSuccess(SendResult<String, Greeting> result) {
                System.out.println("Sent greeting=[" + greeting + "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("Unable to send greeting=[" + greeting + "] due to : " + ex.getMessage());
            }
        });
    }
}

属性文件:

spring.kafka.consumer.auto-offset-reset=earliest
spring.embedded.kafka.brokers=localhost:9092
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.consumer.group-id=1

并且我在开始测试时尝试了两种方法和相同的错误:

测试class规则:

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
public class PKafkaProducerClassRuleTest {

    private static final Logger LOGGER =
        LoggerFactory.getLogger(PKafkaProducerClassRuleTest.class);

    private static String SENDER_TOPIC = "sender.t";

    static {
        System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
    }

    @Autowired
    private PinPadKafkaProducer sender;

    private KafkaMessageListenerContainer<String, String> container;

    private BlockingQueue<ConsumerRecord<String, String>> records;

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka =
        new EmbeddedKafkaRule(1, true, SENDER_TOPIC);

使用 EmbeddedKafka 测试:

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1,
    topics = {
        "sender.t" })
public class PKafkaProducerEmbeddedKafkaTest {

    private static final Logger LOGGER =
        LoggerFactory.getLogger(PKafkaProducerEmbeddedKafkaTest.class);

    private static String SENDER_TOPIC = "sender.t";

    static {
        System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY,
            "spring.kafka.bootstrap-servers");
    }

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    private PinPadKafkaProducer sender;

    @Test
    public void someTest() {
        sender.sendMessage(new Greeting("msgtest", "nametest"));
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<Integer, String> consumer = cf.createConsumer();
        this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, SENDER_TOPIC);
        ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
        assertThat(replies.count()).isGreaterThanOrEqualTo(1);
    }

}

两种情况下的错误:

19/11/2019 13:51:56.774+0100 ERROR o.s.t.c.TestContextManager [] - Caught exception while allowing TestExecutionListener [org.springframework.test.context.web.ServletTestExecutionListener@e9cee0d] to prepare test instance [com.goldcar.kiosk.PKafkaProducerEmbeddedKafkaTest@39271181]
java.lang.IllegalStateException: Failed to load ApplicationContext
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:125)
    at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:108)
    at org.springframework.test.context.web.ServletTestExecutionListener.setUpRequestContextIfNecessary(ServletTestExecutionListener.java:190)
    at org.springframework.test.context.web.ServletTestExecutionListener.prepareTestInstance(ServletTestExecutionListener.java:132)
    at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:246)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.createTest(SpringJUnit4ClassRunner.java:227)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runReflectiveCall(SpringJUnit4ClassRunner.java:289)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.methodBlock(SpringJUnit4ClassRunner.java:291)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:246)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access[=15=]0(ParentRunner.java:58)
    at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:268)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
    at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
    at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
    at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157)
    at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
    at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
    at org.gradle.internal.concurrent.ManagedExecutorImpl.run(ManagedExecutorImpl.java:46)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'embeddedKafka': Invocation of init method failed; nested exception is java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1778)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:405)
    at org.springframework.kafka.test.context.EmbeddedKafkaContextCustomizer.customizeContext(EmbeddedKafkaContextCustomizer.java:109)
    at org.springframework.boot.test.context.SpringBootContextLoader$ContextCustomizerAdapter.initialize(SpringBootContextLoader.java:300)
    at org.springframework.boot.SpringApplication.applyInitializers(SpringApplication.java:621)
    at org.springframework.boot.SpringApplication.prepareContext(SpringApplication.java:365)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:310)
    at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:119)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:117)
    ... 49 common frames omitted
Caused by: java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;
    at kafka.cluster.EndPoint$.<init>(EndPoint.scala:32)
    at kafka.cluster.EndPoint$.<clinit>(EndPoint.scala)
    at kafka.server.Defaults$.<init>(KafkaConfig.scala:68)
    at kafka.server.Defaults$.<clinit>(KafkaConfig.scala)
    at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:781)
    at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)
    at kafka.utils.TestUtils$.createBrokerConfig(TestUtils.scala:234)
    at kafka.utils.TestUtils.createBrokerConfig(TestUtils.scala)
    at org.springframework.kafka.test.EmbeddedKafkaBroker.createBrokerProperties(EmbeddedKafkaBroker.java:239)
    at org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:214)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1837)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1774)
    ... 58 common frames omitted

您的类路径中的 kafka jar 版本似乎不匹配。

如果您使用与 Boot 设置的默认版本不同的 kafka-clients 版本,就会发生这种情况。

有关如何覆盖所有 kafka jar 版本的示例,请参阅 this appendix

When you use spring-kafka-test (version 2.2.x) with the 2.1.x kafka-clients jar, you need to override certain transitive dependencies, as follows:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${spring.kafka.version}</version>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>${spring.kafka.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
        </exclusion>
    </exclusions>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.1</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.1</version>
    <classifier>test</classifier>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.1</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.1</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>

Note that when switching to scala 2.12 (recommended for 2.1.x and higher), the 2.11 version must be excluded from spring-kafka-test.

在gradle中使用@Gary Russell 回答应用的解决方案:

gradle.properties:

spring_kafka_test_version=2.2.7.RELEASE
kafka_test_version=2.1.1
kafka_clients_test_version=2.1.1

build.gradle:

implementation group: 'org.springframework.kafka', name: 'spring-kafka'
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafka_clients_test_version
implementation group: 'org.apache.kafka', name: 'kafka_2.12', version: kafka_test_version

testImplementation (group: 'org.springframework.kafka', name: 'spring-kafka-test', version: spring_kafka_test_version) {
    exclude module: 'kafka_2.11'
}
testImplementation group: 'org.apache.kafka', name: 'kafka_2.12', version: kafka_test_version, classifier: 'test'
testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafka_clients_test_version