Kafka 间歇性测试 failing/succeding
Kafka test failing/succeding intermittently
我正在尝试为 kafka 0.8.2 中的 kafka scala 客户端的抽象编写一个简单的测试。它基本上只是向 kafka 写一条消息,然后我尝试读回它。但是,我遇到了间歇性失败的问题,因此我将测试代码归结为以下代码。该测试有时(很少)通过,有时失败。我在做什么?
package mykafkatest
import java.net.ServerSocket
import java.nio.file.Files
import java.util.{UUID, Properties}
import kafka.consumer.{Whitelist, ConsumerConfig, Consumer}
import kafka.producer.{ProducerConfig, Producer, KeyedMessage}
import kafka.serializer.StringDecoder
import kafka.server.KafkaConfig
import kafka.server.KafkaServerStartable
import org.apache.curator.test.TestingServer
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
class KafkaSenderTest extends org.scalatest.FunSpecLike with org.scalatest.ShouldMatchers with org.scalatest.BeforeAndAfterAll {
import scala.concurrent.ExecutionContext.Implicits.global
val zkServer = new TestingServer()
val socket = new ServerSocket(0)
val port = socket.getLocalPort.toString
socket.close()
val tmpDir = Files.createTempDirectory("kafka-test-logs")
val serverProps = new Properties
serverProps.put("broker.id", port)
serverProps.put("log.dirs", tmpDir.toAbsolutePath.toString)
serverProps.put("host.name", "localhost")
serverProps.put("zookeeper.connect", zkServer.getConnectString)
serverProps.put("port", port)
val config = new KafkaConfig(serverProps)
val kafkaServer = new KafkaServerStartable(config)
override def beforeAll ={
kafkaServer.startup()
}
override def afterAll = {
kafkaServer.shutdown()
}
it("should put messages on a kafka queue") {
println("zkServer: " + zkServer.getConnectString)
println("broker port: " + port)
val consumerProps = new Properties()
consumerProps.put("group.id", UUID.randomUUID().toString)
consumerProps.put("zookeeper.connect", zkServer.getConnectString)
val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps))
val topic = "some-topic"
val filterSpec = new Whitelist(topic)
val stream = consumerConnector.createMessageStreamsByFilter(filterSpec, 1, new StringDecoder, new StringDecoder).head
val producerProps = new Properties()
producerProps.put("metadata.broker.list","localhost:"+port)
val sender = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps))
val keyedMessage = new KeyedMessage[Array[Byte], Array[Byte]](topic, "awesome message".getBytes("UTF-8"))
sender.send(keyedMessage)
val msg = Await.result(Future { stream.take(1) }, 5 seconds)
msg.headOption should not be(empty)
}
}
编辑:
我用下面的 build.sbt 和上面的代码创建了一个新项目作为测试 class。
name := "mykafkatest"
version := "1.0"
scalaVersion := "2.11.5"
libraryDependencies ++= Seq(
"org.apache.kafka" %% "kafka" % "0.8.2.0",
"org.scalatest" %% "scalatest" % "2.2.2" % "test",
"org.apache.curator" % "curator-test" % "2.7.0" % "test"
)
并且测试似乎更频繁地通过,但它仍然断断续续地失败...
我认为这是某种消息缓冲问题。如果您发送 200 条消息(对我而言):
(1 to 200).foreach(i => sender.send(keyedMessage))
199 条消息失败。我尝试更改配置,但找不到使 1 条消息起作用的任何魔法,但我确信有一些配置可以使它起作用。
您可能遇到竞争条件,导致消费者在消息发送后实际完成初始化,然后忽略消息,因为默认情况下它从最大偏移量开始。
尝试添加
consumerProps.put("auto.offset.reset", "smallest")
到您的消费者财产
我正在尝试为 kafka 0.8.2 中的 kafka scala 客户端的抽象编写一个简单的测试。它基本上只是向 kafka 写一条消息,然后我尝试读回它。但是,我遇到了间歇性失败的问题,因此我将测试代码归结为以下代码。该测试有时(很少)通过,有时失败。我在做什么?
package mykafkatest
import java.net.ServerSocket
import java.nio.file.Files
import java.util.{UUID, Properties}
import kafka.consumer.{Whitelist, ConsumerConfig, Consumer}
import kafka.producer.{ProducerConfig, Producer, KeyedMessage}
import kafka.serializer.StringDecoder
import kafka.server.KafkaConfig
import kafka.server.KafkaServerStartable
import org.apache.curator.test.TestingServer
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
class KafkaSenderTest extends org.scalatest.FunSpecLike with org.scalatest.ShouldMatchers with org.scalatest.BeforeAndAfterAll {
import scala.concurrent.ExecutionContext.Implicits.global
val zkServer = new TestingServer()
val socket = new ServerSocket(0)
val port = socket.getLocalPort.toString
socket.close()
val tmpDir = Files.createTempDirectory("kafka-test-logs")
val serverProps = new Properties
serverProps.put("broker.id", port)
serverProps.put("log.dirs", tmpDir.toAbsolutePath.toString)
serverProps.put("host.name", "localhost")
serverProps.put("zookeeper.connect", zkServer.getConnectString)
serverProps.put("port", port)
val config = new KafkaConfig(serverProps)
val kafkaServer = new KafkaServerStartable(config)
override def beforeAll ={
kafkaServer.startup()
}
override def afterAll = {
kafkaServer.shutdown()
}
it("should put messages on a kafka queue") {
println("zkServer: " + zkServer.getConnectString)
println("broker port: " + port)
val consumerProps = new Properties()
consumerProps.put("group.id", UUID.randomUUID().toString)
consumerProps.put("zookeeper.connect", zkServer.getConnectString)
val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps))
val topic = "some-topic"
val filterSpec = new Whitelist(topic)
val stream = consumerConnector.createMessageStreamsByFilter(filterSpec, 1, new StringDecoder, new StringDecoder).head
val producerProps = new Properties()
producerProps.put("metadata.broker.list","localhost:"+port)
val sender = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps))
val keyedMessage = new KeyedMessage[Array[Byte], Array[Byte]](topic, "awesome message".getBytes("UTF-8"))
sender.send(keyedMessage)
val msg = Await.result(Future { stream.take(1) }, 5 seconds)
msg.headOption should not be(empty)
}
}
编辑: 我用下面的 build.sbt 和上面的代码创建了一个新项目作为测试 class。
name := "mykafkatest"
version := "1.0"
scalaVersion := "2.11.5"
libraryDependencies ++= Seq(
"org.apache.kafka" %% "kafka" % "0.8.2.0",
"org.scalatest" %% "scalatest" % "2.2.2" % "test",
"org.apache.curator" % "curator-test" % "2.7.0" % "test"
)
并且测试似乎更频繁地通过,但它仍然断断续续地失败...
我认为这是某种消息缓冲问题。如果您发送 200 条消息(对我而言):
(1 to 200).foreach(i => sender.send(keyedMessage))
199 条消息失败。我尝试更改配置,但找不到使 1 条消息起作用的任何魔法,但我确信有一些配置可以使它起作用。
您可能遇到竞争条件,导致消费者在消息发送后实际完成初始化,然后忽略消息,因为默认情况下它从最大偏移量开始。
尝试添加
consumerProps.put("auto.offset.reset", "smallest")
到您的消费者财产