Reactive-Kafka:如何在异常时暂停消费者并按需重试
Reactive-Kafka : How to pause the consumer on exception and retry on demand
我已经在 Google 组上问了 this 问题,但还没有收到任何回复。所以在这里为不同的观众张贴这个。
我们正在为我们的应用程序使用 Reactive-Kafka。我们有如下场景,如果在处理消息时发生任何异常,我们希望停止向消费者发送消息。应在规定时间后或根据消费者方的明确请求重试消息。使用我们目前的方法,假设,如果消费者的数据库关闭了一段时间,它仍然会尝试从 kafka 读取并处理消息,但是由于数据库问题导致处理失败。这将使应用程序不必要地忙碌。取而代之的是,我们希望暂停消费者在规定的时间内接收消息(比如,等待 30 分钟重试)。
我们不确定如何处理这种情况。
是否可以这样做?我错过了什么吗?
这是从反应式 kafka 中获取的示例代码:
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.mapAsync(1) { msg =>
Future {
/**
* Unreliable consumer, for e.g. saving to DB might not be successful due to DB is down
*/
}.map(_ => msg.committableOffset).recover {
case ex => {
/**
* HOW TO DO ????????
* On exception, I would like to tell stream to stop sending messages and pause the consumer and try again within stipulated time
* or on demand from the last committed offset
*/
throw ex
}
}
}
.batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
batch.updated(elem)
}
.mapAsync(3)(_.commitScaladsl())
.runWith(Sink.ignore)
有一个 recoverWithRetries
组合器用于此目的。有关参考,请参阅 and the docs。
您可以提取您的来源
val src = Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.mapAsync(1) { msg =>
Future {
/**
* Unreliable consumer, for e.g. saving to DB might not be successful due to DB is down
*/
}.map(_ => msg.committableOffset)
然后
src
.recoverWithRetries(attempts = -1, {case e: MyDatabaseException =>
logger.error(e)
src.delay(30.minutes, DelayOverflowStrategy.backpressure)})
...
(重试attempts=-1表示无限重试)
请注意,您可能需要将 src
的物化值从 akka.kafka.scaladsl.Consumer.Control
映射到 akka.NotUsed
以便在 recoverWithRetries
中引用它:
val src = Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.mapAsync(1) { msg =>
Future {
/**
* Unreliable consumer, for e.g. saving to DB might not be successful due to DB is down
*/
}.map(_ => msg.committableOffset)
.mapMaterializedValue(_ => akka.NotUsed)
我已经在 Google 组上问了 this 问题,但还没有收到任何回复。所以在这里为不同的观众张贴这个。
我们正在为我们的应用程序使用 Reactive-Kafka。我们有如下场景,如果在处理消息时发生任何异常,我们希望停止向消费者发送消息。应在规定时间后或根据消费者方的明确请求重试消息。使用我们目前的方法,假设,如果消费者的数据库关闭了一段时间,它仍然会尝试从 kafka 读取并处理消息,但是由于数据库问题导致处理失败。这将使应用程序不必要地忙碌。取而代之的是,我们希望暂停消费者在规定的时间内接收消息(比如,等待 30 分钟重试)。 我们不确定如何处理这种情况。
是否可以这样做?我错过了什么吗?
这是从反应式 kafka 中获取的示例代码:
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.mapAsync(1) { msg =>
Future {
/**
* Unreliable consumer, for e.g. saving to DB might not be successful due to DB is down
*/
}.map(_ => msg.committableOffset).recover {
case ex => {
/**
* HOW TO DO ????????
* On exception, I would like to tell stream to stop sending messages and pause the consumer and try again within stipulated time
* or on demand from the last committed offset
*/
throw ex
}
}
}
.batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
batch.updated(elem)
}
.mapAsync(3)(_.commitScaladsl())
.runWith(Sink.ignore)
有一个 recoverWithRetries
组合器用于此目的。有关参考,请参阅
您可以提取您的来源
val src = Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.mapAsync(1) { msg =>
Future {
/**
* Unreliable consumer, for e.g. saving to DB might not be successful due to DB is down
*/
}.map(_ => msg.committableOffset)
然后
src
.recoverWithRetries(attempts = -1, {case e: MyDatabaseException =>
logger.error(e)
src.delay(30.minutes, DelayOverflowStrategy.backpressure)})
...
(重试attempts=-1表示无限重试)
请注意,您可能需要将 src
的物化值从 akka.kafka.scaladsl.Consumer.Control
映射到 akka.NotUsed
以便在 recoverWithRetries
中引用它:
val src = Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.mapAsync(1) { msg =>
Future {
/**
* Unreliable consumer, for e.g. saving to DB might not be successful due to DB is down
*/
}.map(_ => msg.committableOffset)
.mapMaterializedValue(_ => akka.NotUsed)