即使测试通过,在 Kafka 测试期间也会出现异常
Getting Exceptions during KafkaTest eventhough the test passes
我是 运行 使用 EmbeddedKafka 的 Junit 测试,我能够生产和消费到 embeddedKafka 代理,并成功断言我发送的数据。
但是我可以在断言完成后出现的堆栈跟踪中看到很多异常。
1)
java.io.IOException: 在读取响应之前断开与 0 的连接
在 org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97) ~[kafka-clients-2.0.1.jar:na]
在 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:240) ~[kafka_2.11-2.0.1.jar:na]
在 kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) [kafka_2.11-2.0.1.jar:na]
2)
2019-10-04 15:49:27.123 WARN 1812 --- [r-0-send-thread] kafka.controller.RequestSendThread : [RequestSendThread controllerId=0] 控制器 0 与代理的连接 localhost:54745 (id: 0 rack: null) 不成功
java.net.SocketTimeoutException: 1000 毫秒内连接失败
在 kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:280) [kafka_2.11-2.0.1.jar:na]
在 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:233) [kafka_2.11-2.0.1.jar:na]
在 kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) [kafka_2.11-2.0.1.jar:na]
3)
java.io.IOException:连接到 localhost:54745 (id: 0 rack: null) 失败。
在 org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70) ~[kafka-clients-2.0.1.jar:na]
在 kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:279) [kafka_2.11-2.0.1.jar:na]
在 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:233) [kafka_2.11-2.0.1.jar:na]
在 kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) [kafka_2.11-2.0.1.jar:na]
4)
java.lang.InterruptedException: 空
在 java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(未知来源)~[na:1.8.0_181]
在 java.util.concurrent.CountDownLatch.await(未知来源)~[na:1.8.0_181]
在 kafka.utils.ShutdownableThread.pause(ShutdownableThread.scala:69) [kafka_2.11-2.0.1.jar:na]
在 kafka.controller.RequestSendThread.backoff$1(ControllerChannelManager.scala:221) ~[kafka_2.11-2.0.1.jar:na]
在 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:235) ~[kafka_2.11-2.0.1.jar:na]
在 kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) [kafka_2.11-2.0.1.jar:na]
我的测试:
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, classes = TestApplication.class)
@ActiveProfiles("test")
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
//@EmbeddedKafka(controlledShutdown = true)
public class KafkaUtilTest {
private static String TOPIC = "PE";
@Autowired
KafkaUtil kafkaUtil;
@Autowired
ConfigurationProperties configProperties;
SchedulingCallRequestDTO request;
ScheduleOrderResponseDTOv2 response;
Consumer<String, Object> consumer;
HashMap<String, String> expectedHeaderValueMap;
@ClassRule
public static EmbeddedKafkaRule embeddedKafkarule = new EmbeddedKafkaRule(1, true, TOPIC);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
System.setProperty("spring.kafka.producer.bootstrap-servers",
embeddedKafkarule.getEmbeddedKafka().getBrokersAsString());
}
@AfterClass
public static void tearDown() {
embeddedKafkarule.getEmbeddedKafka().getKafkaServers().forEach(b -> b.shutdown());
embeddedKafkarule.getEmbeddedKafka().getKafkaServers().forEach(b -> b.awaitShutdown());
}
@Before
public void init() {
readFile("0211");
expectedHeaderValueMap = getExpectedHeaderValueMap();
Map<String, Object> consumerConfigs = new HashMap<>(
KafkaTestUtils.consumerProps("consumer", "true", embeddedKafkarule.getEmbeddedKafka()));
consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerConfigs.put(JsonDeserializer.TRUSTED_PACKAGES, "com.adessa.promiseengine.dto.kafka");
consumer = new DefaultKafkaConsumerFactory<String, Object>(consumerConfigs).createConsumer();
List<String> topics = new ArrayList<>();
topics.add(TOPIC);
TopicPartition topicPartition1 = new TopicPartition(TOPIC, 0);
TopicPartition topicPartition2 = new TopicPartition(TOPIC, 1);
List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
topicPartitions.add(topicPartition1);
topicPartitions.add(topicPartition2);
consumer.assign(topicPartitions);
consumer.seekToBeginning(topicPartitions);
}
@Test
public void testPublish() throws Exception {
kafkaUtil.publishToKafka(request, response);
kafkaUtil.kafkaTemplate.flush();
ConsumerRecord<String, Object> consumedRecord = KafkaTestUtils.getSingleRecord(consumer, TOPIC);
assertRecord(consumedRecord);
}
private void readFile(String testSequenceNo) {}
}
为什么会有这些异常?它们发生在什么过程中?请帮忙
您正在使用 class 规则,当测试完成时代理被拆除,但您让消费者保持打开状态;您需要在测试结束时关闭消费者。
我是 运行 使用 EmbeddedKafka 的 Junit 测试,我能够生产和消费到 embeddedKafka 代理,并成功断言我发送的数据。
但是我可以在断言完成后出现的堆栈跟踪中看到很多异常。
1)
java.io.IOException: 在读取响应之前断开与 0 的连接 在 org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97) ~[kafka-clients-2.0.1.jar:na] 在 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:240) ~[kafka_2.11-2.0.1.jar:na] 在 kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) [kafka_2.11-2.0.1.jar:na]
2)
2019-10-04 15:49:27.123 WARN 1812 --- [r-0-send-thread] kafka.controller.RequestSendThread : [RequestSendThread controllerId=0] 控制器 0 与代理的连接 localhost:54745 (id: 0 rack: null) 不成功
java.net.SocketTimeoutException: 1000 毫秒内连接失败 在 kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:280) [kafka_2.11-2.0.1.jar:na] 在 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:233) [kafka_2.11-2.0.1.jar:na] 在 kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) [kafka_2.11-2.0.1.jar:na]
3)
java.io.IOException:连接到 localhost:54745 (id: 0 rack: null) 失败。 在 org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70) ~[kafka-clients-2.0.1.jar:na] 在 kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:279) [kafka_2.11-2.0.1.jar:na] 在 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:233) [kafka_2.11-2.0.1.jar:na] 在 kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) [kafka_2.11-2.0.1.jar:na]
4)
java.lang.InterruptedException: 空 在 java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(未知来源)~[na:1.8.0_181] 在 java.util.concurrent.CountDownLatch.await(未知来源)~[na:1.8.0_181] 在 kafka.utils.ShutdownableThread.pause(ShutdownableThread.scala:69) [kafka_2.11-2.0.1.jar:na] 在 kafka.controller.RequestSendThread.backoff$1(ControllerChannelManager.scala:221) ~[kafka_2.11-2.0.1.jar:na] 在 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:235) ~[kafka_2.11-2.0.1.jar:na] 在 kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) [kafka_2.11-2.0.1.jar:na]
我的测试:
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, classes = TestApplication.class)
@ActiveProfiles("test")
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
//@EmbeddedKafka(controlledShutdown = true)
public class KafkaUtilTest {
private static String TOPIC = "PE";
@Autowired
KafkaUtil kafkaUtil;
@Autowired
ConfigurationProperties configProperties;
SchedulingCallRequestDTO request;
ScheduleOrderResponseDTOv2 response;
Consumer<String, Object> consumer;
HashMap<String, String> expectedHeaderValueMap;
@ClassRule
public static EmbeddedKafkaRule embeddedKafkarule = new EmbeddedKafkaRule(1, true, TOPIC);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
System.setProperty("spring.kafka.producer.bootstrap-servers",
embeddedKafkarule.getEmbeddedKafka().getBrokersAsString());
}
@AfterClass
public static void tearDown() {
embeddedKafkarule.getEmbeddedKafka().getKafkaServers().forEach(b -> b.shutdown());
embeddedKafkarule.getEmbeddedKafka().getKafkaServers().forEach(b -> b.awaitShutdown());
}
@Before
public void init() {
readFile("0211");
expectedHeaderValueMap = getExpectedHeaderValueMap();
Map<String, Object> consumerConfigs = new HashMap<>(
KafkaTestUtils.consumerProps("consumer", "true", embeddedKafkarule.getEmbeddedKafka()));
consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerConfigs.put(JsonDeserializer.TRUSTED_PACKAGES, "com.adessa.promiseengine.dto.kafka");
consumer = new DefaultKafkaConsumerFactory<String, Object>(consumerConfigs).createConsumer();
List<String> topics = new ArrayList<>();
topics.add(TOPIC);
TopicPartition topicPartition1 = new TopicPartition(TOPIC, 0);
TopicPartition topicPartition2 = new TopicPartition(TOPIC, 1);
List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
topicPartitions.add(topicPartition1);
topicPartitions.add(topicPartition2);
consumer.assign(topicPartitions);
consumer.seekToBeginning(topicPartitions);
}
@Test
public void testPublish() throws Exception {
kafkaUtil.publishToKafka(request, response);
kafkaUtil.kafkaTemplate.flush();
ConsumerRecord<String, Object> consumedRecord = KafkaTestUtils.getSingleRecord(consumer, TOPIC);
assertRecord(consumedRecord);
}
private void readFile(String testSequenceNo) {}
}
为什么会有这些异常?它们发生在什么过程中?请帮忙
您正在使用 class 规则,当测试完成时代理被拆除,但您让消费者保持打开状态;您需要在测试结束时关闭消费者。