High-level consumer API lifecycle for Kafka 0.8
I can't find a description of the high-level consumer's lifecycle. I'm on 0.8.2.2, so I can't use the "modern" consumer from kafka-clients. Here is my code:
import java.util.Properties

import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.serializer.DefaultDecoder

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

def consume(numberOfEvents: Int, await: Duration = 100.millis): List[MessageEnvelope] = {
  val consumerProperties = new Properties()
  consumerProperties.put("zookeeper.connect", kafkaConfig.zooKeeperConnectString)
  consumerProperties.put("group.id", consumerGroup)
  consumerProperties.put("auto.offset.reset", "smallest")

  val consumer = Consumer.create(new ConsumerConfig(consumerProperties))
  try {
    val messageStreams = consumer.createMessageStreams(
      Predef.Map(kafkaConfig.topic -> 1),
      new DefaultDecoder,
      new MessageEnvelopeDecoder)

    val receiveMessageFuture = Future[List[MessageEnvelope]] {
      messageStreams(kafkaConfig.topic)
        .flatMap(stream => stream.take(numberOfEvents).map(_.message()))
    }
    Await.result(receiveMessageFuture, await)
  } finally {
    consumer.shutdown()
  }
}
It's not clear to me: should I shut down the consumer after every message retrieval, or can I keep the instance and reuse it for fetching messages? I assume reusing the instance is the right way, but I couldn't find any articles or best practices on it. The two lifecycles I'm choosing between are sketched below.
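To make the question concrete, here is a rough sketch of the two options (readMessages is just a placeholder for the stream-reading logic above):

// Option 1: per-call lifecycle, i.e. create and shut down on every call (the code above)
def consumeThrowaway(): List[MessageEnvelope] = {
  val consumer = Consumer.create(new ConsumerConfig(consumerProperties))
  try readMessages(consumer)
  finally consumer.shutdown()
}

// Option 2: one long-lived instance, reused across calls
lazy val sharedConsumer = Consumer.create(new ConsumerConfig(consumerProperties))
def consumeReusing(): List[MessageEnvelope] = readMessages(sharedConsumer)
// sharedConsumer.shutdown() would be called only once, when the application stops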
I tried to reuse the consumer and/or the message streams. Neither works for me, and I can't find the reason.

If I try to reuse messageStreams, I get this exception:
2017-04-17_19:57:57.088 ERROR MessageEnvelopeConsumer - Error while awaiting for messages java.lang.IllegalStateException: Iterator is in failed state
java.lang.IllegalStateException: Iterator is in failed state
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
at scala.collection.IterableLike$class.take(IterableLike.scala:134)
at kafka.consumer.KafkaStream.take(KafkaStream.scala:25)
It happens here:
def consume(numberOfEvents: Int, await: Duration = 100.millis): List[MessageEnvelope] = {
  try {
    val receiveMessageFuture = Future[List[MessageEnvelope]] {
      messageStreams(kafkaConfig.topic)
        .flatMap(stream => stream.take(numberOfEvents).map(_.message()))
    }
    Try(Await.result(receiveMessageFuture, await)) match {
      case Success(result) => result
      case Failure(_: TimeoutException) => List.empty
      case Failure(e) =>
        // ===> never got any message from topic
        logger.error(s"Error while awaiting for messages ${e.getClass.getName}: ${e.getMessage}", e)
        List.empty
    }
  } catch {
    case e: Exception =>
      logger.warn(s"Error while consuming messages", e)
      List.empty
  }
}
I also tried creating the message streams on every call:

No luck...
2017-04-17_20:02:44.236 WARN MessageEnvelopeConsumer - Error while consuming messages
kafka.common.MessageStreamsExistException: ZookeeperConsumerConnector can create message streams at most once
at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:151)
at MessageEnvelopeConsumer.consume(MessageEnvelopeConsumer.scala:47)
It happens here:
def consume(numberOfEvents: Int, await: Duration = 100.millis): List[MessageEnvelope] = {
  try {
    val messageStreams = consumer.createMessageStreams(
      Predef.Map(kafkaConfig.topic -> 1),
      new DefaultDecoder,
      new MessageEnvelopeDecoder)

    val receiveMessageFuture = Future[List[MessageEnvelope]] {
      messageStreams(kafkaConfig.topic)
        .flatMap(stream => stream.take(numberOfEvents).map(_.message()))
    }
    Try(Await.result(receiveMessageFuture, await)) match {
      case Success(result) => result
      case Failure(_: TimeoutException) => List.empty
      case Failure(e) =>
        logger.error(s"Error while awaiting for messages ${e.getClass.getName}: ${e.getMessage}", e)
        List.empty
    }
  } catch {
    case e: Exception =>
      // ===> now exception raised here
      logger.warn(s"Error while consuming messages", e)
      List.empty
  }
}
UPD
I went with an iterator-based approach instead. It looks like this:
// consumerProperties.put("consumer.timeout.ms", "100")

private lazy val consumer: ConsumerConnector =
  Consumer.create(new ConsumerConfig(consumerProperties))

private lazy val messageStreams: Seq[KafkaStream[Array[Byte], MessageEnvelope]] =
  consumer.createMessageStreamsByFilter(Whitelist(kafkaConfig.topic), 1, new DefaultDecoder, new MessageEnvelopeDecoder)

private lazy val iterator: ConsumerIterator[Array[Byte], MessageEnvelope] = {
  val stream = messageStreams.head
  stream.iterator()
}

def consume(): List[MessageEnvelope] = {
  try {
    if (iterator.hasNext) {
      val fromKafka: MessageAndMetadata[Array[Byte], MessageEnvelope] = iterator.next
      List(fromKafka.message())
    } else {
      List.empty
    }
  } catch {
    case _: ConsumerTimeoutException =>
      List.empty
    case e: Exception =>
      logger.warn(s"Error while consuming messages", e)
      List.empty
  }
}
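One note on the commented-out consumer.timeout.ms line: with the 0.8 high-level consumer, iterator.hasNext blocks indefinitely while no message is available. Setting that property makes hasNext throw ConsumerTimeoutException after the given interval instead, which is what the first catch case above handles. As far as I can tell from the 0.8 source, ConsumerIterator.makeNext resets the iterator's state before throwing ConsumerTimeoutException, so the iterator stays usable afterwards, unlike most other failures, which leave it in the "Iterator is in failed state" condition seen earlier. A minimal sketch, assuming the same consumerProperties as above:

// With this set, iterator.hasNext throws ConsumerTimeoutException after
// 100 ms without data instead of blocking forever; the iterator can then
// be polled again on the next consume() call.
consumerProperties.put("consumer.timeout.ms", "100")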
Now I'm trying to figure out whether it automatically commits offsets to ZK...
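For what it's worth, the 0.8 consumer configuration documents auto.commit.enable (default true) and auto.commit.interval.ms (default 60000) for the ZooKeeper-based consumer, and ConsumerConnector also exposes an explicit commit. A sketch of both options, with defaults worth double-checking against your exact version:

// Option 1: rely on auto-commit to ZooKeeper (the default behaviour)
consumerProperties.put("auto.commit.enable", "true")       // default: true
consumerProperties.put("auto.commit.interval.ms", "60000") // default: 60000 ms

// Option 2: disable auto-commit and commit explicitly after processing
consumerProperties.put("auto.commit.enable", "false")
// ... consume and process a batch, then:
consumer.commitOffsets()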
Constant shutdowns cause unnecessary consumer group rebalances, which badly hurts performance. See this article for best practices: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
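The pattern in that wiki example boils down to one long-lived ConsumerConnector per application. A condensed sketch of it in Scala (process is a placeholder for your handling logic):

// Create the connector and the streams exactly once, at startup.
val consumer: ConsumerConnector = Consumer.create(new ConsumerConfig(consumerProperties))
val streams = consumer.createMessageStreams(
  Map(kafkaConfig.topic -> 1), new DefaultDecoder, new MessageEnvelopeDecoder)

// Consume on a dedicated thread; KafkaStream is an Iterable that blocks
// while waiting for new messages.
val worker = new Thread(new Runnable {
  def run(): Unit =
    for (msg <- streams(kafkaConfig.topic).head)
      process(msg.message()) // placeholder
})
worker.start()

// Shut down exactly once, on application exit; this is the step that
// triggers a group rebalance, so keep it rare.
sys.addShutdownHook(consumer.shutdown())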
My answer is the latest update (UPD) to the question: the iterator-based approach works for me as expected.