Mock SenderResult in ReactiveKafkaProducerTemplate send method
I am trying to mock the send method of reactiveKafkaProducerTemplate.
@Mock
private ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate;
@Mock
private ReactiveKafkaProducerTemplate<String, List<Object>> reactiveKafkaProducerTemplate;
Mockito.when(reactiveKafkaConsumerTemplate.receiveAutoAck())
.thenReturn(createConsumerRecords(2));
Mockito.when(reactiveKafkaProducerTemplate
.send(Mockito.anyString(),Mockito.anyString(),Mockito.anyList()))
.thenReturn(???);
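I am trying to mock the send method of reactiveKafkaProducerTemplate so that it returns a SenderResult. Is that possible? If so, could someone point me to documentation or a sample that shows how to do it? I have spent a lot of time looking for a solution but could not find one.
Update: Following Gary's suggestion, I tried the following: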
ProducerRecord<String, List<Object>> record
= new ProducerRecord<String, List<Object>>(topic,"key", objectSetup.setup());
RecordMetadata meta
= new RecordMetadata(new TopicPartition("topic",0),0,0,0,(long)1,2,1);
Mockito.when(reactiveKafkaProducerTemplate.send(topic,"key",objectSetup.setup())
.thenReturn(Mono.just(new SendResult<>(record, meta))));
I get the following exception on the .thenReturn(Mono.just(new SendResult<>(record, meta))) line. The exception does not say what is null, and I do not see any null values myself.
java.lang.NullPointerException
at com.ServiceTests.cTestMethod(ServiceTests.java:69)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod(TestMethodTestDescriptor.java:210)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
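One possible reading of the trace above: as posted, the closing parenthesis of Mockito.when(...) comes after .thenReturn(...), so thenReturn is invoked on whatever the mock's send returns rather than on the stubbing object. An unstubbed Mockito mock returns null for a Mono return type, and chaining any call onto that null throws a NullPointerException on exactly that line. The sketch below reproduces only the mechanism, using plain JDK types; Template, UNSTUBBED and STUBBED are hypothetical stand-ins, and CompletableFuture plays the role of Mono.

```java
import java.util.concurrent.CompletableFuture;

public class ChainedCallOnNull {

    // Hypothetical stand-in for the producer template (not the Spring class).
    public interface Template {
        CompletableFuture<String> send(String topic, String key, String value);
    }

    // Behaves like an unstubbed mock: send(...) returns null.
    public static final Template UNSTUBBED = (topic, key, value) -> null;

    // Behaves like a stubbed mock: send(...) returns a completed result.
    public static final Template STUBBED =
            (topic, key, value) -> CompletableFuture.completedFuture("sent");

    // Returns true if chaining a call onto send(...) throws a NullPointerException.
    public static boolean chainingThrowsNpe(Template template) {
        try {
            template.send("topic", "key", "value").thenApply(String::length);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("unstubbed throws NPE: " + chainingThrowsNpe(UNSTUBBED));
        System.out.println("stubbed throws NPE: " + chainingThrowsNpe(STUBBED));
    }
}
```

If that is what happened here, closing when(...) directly after the send(...) call, so that .thenReturn(...) is applied to the result of when(...), should avoid the chained call on null.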
Update 2: I was able to create the mock using Gary's code snippet. This is the code I want to test:
public void sendToKafka(ConsumerRecord<String, String> consumerRecord){
log.info("sending to topic={}, {}={},", destinationTopic, Metric.class.getSimpleName(), consumerRecord);
List<Object> metrics = transformRecord(consumerRecord);
kafkaProducerTemplate.send(destinationTopic, consumerRecord.key(), metrics)
.doOnSuccess(senderResult -> log.info("sent {} offset : {}", metrics, senderResult.recordMetadata().offset()))
.doOnError(throwable -> log.error("Error while sending message to destination topic : {}", throwable.getMessage()))
.subscribe();
}
When I call this method from the test, I can see that the template is the mock; however, I get a java.lang.NullPointerException on the line
.doOnSuccess(senderResult -> log.info("sent {} offset : {}", metrics, senderResult.recordMetadata().offset()))
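The exception gives no detail about what is null. I confirmed that consumerRecord and metrics are not null.
It turned out the problem was in my setup. The actual code calls send with 3 arguments, but in the setup I had only stubbed the 2-argument send method.
I updated the code to: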
when(reactiveKafkaProducerTemplate.send(Mockito.anyString(),Mockito.anyString(), Mockito.anyList())).thenReturn(Mono.just(result));
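The reason the arity matters is that a stub applies only to the exact method signature it was set up on: stubbing the 2-argument send leaves the 3-argument overload unstubbed, so it returns null and the chained .doOnSuccess(...) fails. The following is a minimal, dependency-free sketch of that behaviour, not Mockito itself; the Template interface and the stubOnly helper are hypothetical, CompletableFuture stands in for Mono, and a JDK dynamic proxy imitates a mock that returns null for anything not stubbed.

```java
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;

public class OverloadStubDemo {

    // Hypothetical stand-in with two overloads, mirroring
    // send(topic, value) and send(topic, key, value).
    public interface Template {
        CompletableFuture<String> send(String topic, String value);
        CompletableFuture<String> send(String topic, String key, String value);
    }

    // Builds a mock-like proxy: only the send overload with the given
    // number of parameters returns a result; every other call returns null.
    public static Template stubOnly(int stubbedArity) {
        return (Template) Proxy.newProxyInstance(
                Template.class.getClassLoader(),
                new Class<?>[] {Template.class},
                (proxy, method, args) ->
                        method.getName().equals("send")
                                && method.getParameterCount() == stubbedArity
                                ? CompletableFuture.completedFuture("sent")
                                : null);
    }

    public static void main(String[] args) {
        Template twoArgStub = stubOnly(2);
        // The stubbed overload returns a value.
        System.out.println(twoArgStub.send("topic", "value") != null);
        // The other overload was never stubbed, so it returns null.
        System.out.println(twoArgStub.send("topic", "key", "value") == null);
    }
}
```

With the stub moved to the 3-argument overload, as in the updated when(...) line above, the call made by the code under test gets a non-null result to chain on.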
@Test
void test() {
ReactiveKafkaProducerTemplate<String, String> template = mock(ReactiveKafkaProducerTemplate.class);
RecordMetadata meta = new RecordMetadata(new TopicPartition("foo", 0), 0L, 0L, 0L, 0L, 0, 2);
SenderResult result = mock(SenderResult.class);
when(result.recordMetadata()).thenReturn(meta);
when(template.send("foo", "bar")).thenReturn(Mono.just(result));
template.send("foo", "bar")
.doOnNext(sr -> {
assertThat(sr.recordMetadata().toString()).isEqualTo("foo-0@0");
})
.subscribe();
}