Producer error in Scala Embedded Kafka with Kafka Streams
I have a test that, from time to time, leaves an open producer thread behind that keeps logging errors:
[2018-06-01 15:52:48,526] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,526] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,526] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,628] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,628] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,628] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,730] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,730] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,730] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,982] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,982] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,982] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
The test works, but occasionally it fails as shown above.
test("My test") {
  val topology = Application.getTopology(...)
  val streams = new KafkaStreams(topology,properties)
  withRunningKafka {
    createCustomTopic(eventTopic)
    val streamId = UUIDs.newUuid().toString
    logger.info(s"Creating stream with Application ID: [$streamId]")
    val streams = new KafkaStreams(topology, streamConfig(streamId, PropertiesConfig.asScalaMap(props)))
    try {
      publishToKafka(eventTopic, key = keyMSite1UID1, message = event11a)
      // ... several more publishings
      Thread.sleep(publishingDelay) // Give time to initialize
      streams.start()
      Thread.sleep(deletionDelay)
      withConsumer[MyKey, MyEvent, Unit] { consumer =>
        val consumedMessages: Stream[(MyKey, MyEvent)] =
          consumer.consumeLazily[(MyKey, MyEvent)](eventTopic)
        val messages = consumedMessages.take(20).toList
        messages.foreach(tuple => logger.info("EVENT END: " + tuple))
        messages.size should be(6)
        // several assertions here
      }
    } finally {
      streams.close()
    }
  }(config)
}
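One peculiarity is that the stream application produces deletion events to the same topic it consumes from.
There are two similar tests in this suite. I run the test suite under sbt as follows: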
testOnly *MyTest
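Four out of five executions leave a dangling thread that keeps posting these errors indefinitely. They appear in groups of three, and I don't know why.
I have tried adding a delay after the call to close(), but it does not seem to help.
How can I avoid the dangling producer thread?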
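In your test, you create two KafkaStreams instances, but you only close() one of them. I assume the leaked Producer belongs to the instance you do not close. Note that even if you never call KafkaStreams#start(), you still need to call KafkaStreams#close().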
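To make the shadowing problem concrete, here is a minimal sketch that runs without Kafka. FakeStreams is a hypothetical stand-in, not the real KafkaStreams API; it only assumes, as described above, that resources are acquired when the instance is constructed and released only by close(). LeakDemo and the "outer"/"inner" labels are likewise made up for illustration.

```scala
import scala.collection.mutable

// Hypothetical stand-in for KafkaStreams (NOT the real API): it registers an
// open "producer" when constructed and releases it only when close() is called.
final class FakeStreams(id: String, open: mutable.Set[String]) {
  open += id
  def start(): Unit = ()
  def close(): Unit = open -= id
}

object LeakDemo {
  // Mirrors the shape of the test in the question: an outer instance is
  // created, then shadowed by an inner one, and only the inner one is closed.
  def leaky(): Set[String] = {
    val open = mutable.Set.empty[String]
    val streams = new FakeStreams("outer", open) // never started, never closed
    locally {
      val streams = new FakeStreams("inner", open) // shadows the outer val
      try streams.start()
      finally streams.close() // closes only the inner instance
    }
    open.toSet // instances that are still open
  }

  // Same flow with the unused outer instance removed.
  def fixed(): Set[String] = {
    val open = mutable.Set.empty[String]
    locally {
      val streams = new FakeStreams("inner", open)
      try streams.start()
      finally streams.close()
    }
    open.toSet
  }
}
```

Applied to the test in the question, this would mean either deleting the first `val streams = new KafkaStreams(topology,properties)` line or closing that instance as well in a finally block.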