处理消息后提交 Kafka 消费者偏移量的好模式是什么?
What is a good pattern for committing Kafka consumer offset after processing message?
我正在使用 Akka Streams Kafka 将 Kafka 消息以管道方式传输到一个远程服务。我希望保证该服务对每条消息恰好接收一次(即同时满足至少一次和至多一次投递)。
这是我想出的代码:
/**
 * Streams Kafka messages from every topic matching `topicPattern` to `subscriber`, committing each
 * message's offset only after the subscriber has replied to it.
 *
 * Each record is paired with its converted form so the original `CommittableMessage` (and thus its
 * offset) is still available when the `ask` to the subscriber completes. Because the commit happens
 * after the reply, a crash between the reply and the commit re-delivers that record (at-least-once).
 *
 * @param system       actor system used for the consumer settings, the materializer and logging
 * @param config       must contain `group-id`, used as the Kafka consumer group id
 * @param subscriber   actor that receives each converted message and must reply within 5 seconds
 * @param topicPattern regex of the topics to subscribe to
 * @param mapCommittableMessageToSinkMessage converts a Kafka record into the message sent to `subscriber`
 * @tparam T type of the message sent to `subscriber`
 */
private def startFlow[T](implicit system: ActorSystem, config: Config, subscriber: ActorRef,
                         topicPattern: String,
                         mapCommittableMessageToSinkMessage: Function[CommittableMessage[String, String], T]): Unit = {
  val groupId = config.getString("group-id")
  implicit val materializer = ActorMaterializer()
  val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withGroupId(groupId)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  implicit val timeout = Timeout(5.seconds) // timeout for reply message on ask call below
  import system.dispatcher // ExecutionContext for the ask call and the completion callback below
  val done = Consumer.committableSource(consumerSettings, Subscriptions.topicPattern(topicPattern))
    .map(message => (message, mapCommittableMessageToSinkMessage(message)))
    // keep the original message so its offset can be committed once the subscriber replies
    .mapAsync(1) { case (message, sinkMessage) => ask(subscriber, sinkMessage).map(_ => message) }
    .mapAsync(1)(message => message.committableOffset.commitScaladsl())
    .runWith(Sink.ignore)
  done.onComplete { result =>
    // without this the stream would stop silently on an ask timeout or a failed commit
    result.failed.foreach(e => system.log.error(e, s"Kafka stream for topic pattern $topicPattern failed"))
    // every call creates its own materializer; release it once its only stream has terminated
    materializer.shutdown()
  }
}
如代码所示,它把每条消息映射成一个元组,其中包含原始消息以及传递给订阅者(负责将消息发送到远程服务的 actor)的转换后消息。使用元组的目的是保留原始消息,以便在订阅者完成处理后提交其偏移量。
这似乎是一种反模式,但我不确定是否有更好的方法。有什么更好的建议吗?
谢谢!
一种让代码更简洁、也更易于修改的方法是使用 GraphDSL。它允许你创建一个图分支来携带消息的 Committable 部分(即可提交的偏移量),而另一个分支则执行所需的全部业务逻辑。
示例图如下(为清晰起见,省略了所有样板代码):
// The `~>` lines must run inside a GraphDSL builder (implicit `builder` and `import GraphDSL.Implicits._`
// in scope), with `broadcast = builder.add(Broadcast[CommittableMessage[String, String]](2))` and
// `zip = builder.add(Zip[Any, CommittableMessage[String, String]])`.
val src = Consumer.committableSource(consumerSettings, Subscriptions.topicPattern(topicPattern))
// Sends the converted message to the subscriber; emits the subscriber's reply.
val businessLogic = Flow[CommittableMessage[String, String]]
  .mapAsync(1)(message => ask(subscriber, mapCommittableMessageToSinkMessage(message)))
// Commits the offset of each message that reaches it. Use `.to`, not `.runWith`: this is only one branch
// of the graph and must not be materialized on its own (Flow.runWith needs both a Source and a Sink).
val snk = Flow[CommittableMessage[String, String]]
  .mapAsync(1)(message => message.committableOffset.commitScaladsl())
  .to(Sink.ignore) // look into Sink.foldAsync for a more compact re-write of this part
src ~> broadcast
broadcast ~> businessLogic ~> zip.in0 // the subscriber's reply
broadcast ~> zip.in1                  // the original message, carrying its committable offset
// Zip emits a pair only once both inputs have an element, so a commit follows the matching reply.
zip.out.map(_._2) ~> snk
下面是采用上面 @stefano-bonetti 回答中的方法、可以正常运行的完整代码:
/**
 * Streams Kafka messages from the topics matching `^<customer-id>\.<topicSuffix>$` to `subscriber`,
 * committing each message's offset only after the subscriber has replied to it.
 *
 * The source is broadcast into two branches: one asks the subscriber, the other carries the original
 * `CommittableMessage`. `Zip` pairs each reply with its message, and only then is the offset committed.
 * Failures routed to the supervision decider are logged and stop the stream.
 *
 * @param system      actor system used for the consumer settings, the materializer and logging
 * @param config      must contain `group-id` and `customer-id`
 * @param subscriber  actor that receives each converted message and must reply within 5 seconds; its name
 *                    is appended to the consumer group id, giving each subscriber name its own group
 * @param topicSuffix inserted as-is into the topic regex after `<customer-id>.`
 * @param convertCommittableMessageToSubscriberMessage converts a Kafka record into the message sent to `subscriber`
 * @tparam T type of the message sent to `subscriber`
 */
private def startStream[T](implicit system: ActorSystem, config: Config, subscriber: ActorRef,
                           topicSuffix: String,
                           convertCommittableMessageToSubscriberMessage: Function[CommittableMessage[String, String], T]): Unit = {
  val groupId = config.getString("group-id")
  val subscriberName = subscriber.path.name
  val customerId = config.getString("customer-id")
  // raw"" keeps `\.` as the regex escape for a literal dot (in s"" it is an invalid string escape);
  // the customer id is quoted so any regex metacharacters in it are matched literally.
  val topicPattern = raw"^${java.util.regex.Pattern.quote(customerId)}\.$topicSuffix$$"
  implicit val materializer = ActorMaterializer()
  val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withGroupId(s"$groupId.$subscriberName")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  implicit val timeout = Timeout(5.seconds) // timeout for reply message on ask call below
  import system.dispatcher // the ExecutionContext that will be used in ask call below
  val src = Consumer.committableSource(consumerSettings, Subscriptions.topicPattern(topicPattern))
  val businessLogic = Flow[CommittableMessage[String, String]]
    .mapAsync(1)(message => subscriber.ask(convertCommittableMessageToSubscriberMessage(message)))
  val snk = Flow[CommittableMessage[String, String]]
    .mapAsync(1)(message => message.committableOffset.commitScaladsl())
    .to(Sink.ignore)
  val decider: Supervision.Decider = { e =>
    // LoggingAdapter.error(cause, message): with the message first, it is treated as a format
    // template and the exception (with its stack trace) is dropped.
    system.log.error(e, "error in stream")
    Supervision.Stop
  }
  RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._
    val broadcast = builder.add(Broadcast[CommittableMessage[String, String]](2))
    // in0: the subscriber's reply (typed Any, as returned by ask); in1: the original message
    val zip = builder.add(Zip[Any, CommittableMessage[String, String]])
    src ~> broadcast
    broadcast ~> businessLogic ~> zip.in0
    broadcast ~> zip.in1
    zip.out.map(_._2) ~> snk
    ClosedShape
  })
    .withAttributes(ActorAttributes.supervisionStrategy(decider))
    .run(materializer)
}
我正在使用 Akka Streams Kafka 将 Kafka 消息以管道方式传输到一个远程服务。我希望保证该服务对每条消息恰好接收一次(即同时满足至少一次和至多一次投递)。
这是我想出的代码:
/**
 * Streams Kafka messages from every topic matching `topicPattern` to `subscriber`, committing each
 * message's offset only after the subscriber has replied to it.
 *
 * Each record is paired with its converted form so the original `CommittableMessage` (and thus its
 * offset) is still available when the `ask` to the subscriber completes. Because the commit happens
 * after the reply, a crash between the reply and the commit re-delivers that record (at-least-once).
 *
 * @param system       actor system used for the consumer settings, the materializer and logging
 * @param config       must contain `group-id`, used as the Kafka consumer group id
 * @param subscriber   actor that receives each converted message and must reply within 5 seconds
 * @param topicPattern regex of the topics to subscribe to
 * @param mapCommittableMessageToSinkMessage converts a Kafka record into the message sent to `subscriber`
 * @tparam T type of the message sent to `subscriber`
 */
private def startFlow[T](implicit system: ActorSystem, config: Config, subscriber: ActorRef,
                         topicPattern: String,
                         mapCommittableMessageToSinkMessage: Function[CommittableMessage[String, String], T]): Unit = {
  val groupId = config.getString("group-id")
  implicit val materializer = ActorMaterializer()
  val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withGroupId(groupId)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  implicit val timeout = Timeout(5.seconds) // timeout for reply message on ask call below
  import system.dispatcher // ExecutionContext for the ask call and the completion callback below
  val done = Consumer.committableSource(consumerSettings, Subscriptions.topicPattern(topicPattern))
    .map(message => (message, mapCommittableMessageToSinkMessage(message)))
    // keep the original message so its offset can be committed once the subscriber replies
    .mapAsync(1) { case (message, sinkMessage) => ask(subscriber, sinkMessage).map(_ => message) }
    .mapAsync(1)(message => message.committableOffset.commitScaladsl())
    .runWith(Sink.ignore)
  done.onComplete { result =>
    // without this the stream would stop silently on an ask timeout or a failed commit
    result.failed.foreach(e => system.log.error(e, s"Kafka stream for topic pattern $topicPattern failed"))
    // every call creates its own materializer; release it once its only stream has terminated
    materializer.shutdown()
  }
}
如代码所示,它把每条消息映射成一个元组,其中包含原始消息以及传递给订阅者(负责将消息发送到远程服务的 actor)的转换后消息。使用元组的目的是保留原始消息,以便在订阅者完成处理后提交其偏移量。
这似乎是一种反模式,但我不确定是否有更好的方法。有什么更好的建议吗?
谢谢!
一种让代码更简洁、也更易于修改的方法是使用 GraphDSL。它允许你创建一个图分支来携带消息的 Committable 部分(即可提交的偏移量),而另一个分支则执行所需的全部业务逻辑。
示例图如下(为清晰起见,省略了所有样板代码):
// The `~>` lines must run inside a GraphDSL builder (implicit `builder` and `import GraphDSL.Implicits._`
// in scope), with `broadcast = builder.add(Broadcast[CommittableMessage[String, String]](2))` and
// `zip = builder.add(Zip[Any, CommittableMessage[String, String]])`.
val src = Consumer.committableSource(consumerSettings, Subscriptions.topicPattern(topicPattern))
// Sends the converted message to the subscriber; emits the subscriber's reply.
val businessLogic = Flow[CommittableMessage[String, String]]
  .mapAsync(1)(message => ask(subscriber, mapCommittableMessageToSinkMessage(message)))
// Commits the offset of each message that reaches it. Use `.to`, not `.runWith`: this is only one branch
// of the graph and must not be materialized on its own (Flow.runWith needs both a Source and a Sink).
val snk = Flow[CommittableMessage[String, String]]
  .mapAsync(1)(message => message.committableOffset.commitScaladsl())
  .to(Sink.ignore) // look into Sink.foldAsync for a more compact re-write of this part
src ~> broadcast
broadcast ~> businessLogic ~> zip.in0 // the subscriber's reply
broadcast ~> zip.in1                  // the original message, carrying its committable offset
// Zip emits a pair only once both inputs have an element, so a commit follows the matching reply.
zip.out.map(_._2) ~> snk
下面是采用上面 @stefano-bonetti 回答中的方法、可以正常运行的完整代码:
/**
 * Streams Kafka messages from the topics matching `^<customer-id>\.<topicSuffix>$` to `subscriber`,
 * committing each message's offset only after the subscriber has replied to it.
 *
 * The source is broadcast into two branches: one asks the subscriber, the other carries the original
 * `CommittableMessage`. `Zip` pairs each reply with its message, and only then is the offset committed.
 * Failures routed to the supervision decider are logged and stop the stream.
 *
 * @param system      actor system used for the consumer settings, the materializer and logging
 * @param config      must contain `group-id` and `customer-id`
 * @param subscriber  actor that receives each converted message and must reply within 5 seconds; its name
 *                    is appended to the consumer group id, giving each subscriber name its own group
 * @param topicSuffix inserted as-is into the topic regex after `<customer-id>.`
 * @param convertCommittableMessageToSubscriberMessage converts a Kafka record into the message sent to `subscriber`
 * @tparam T type of the message sent to `subscriber`
 */
private def startStream[T](implicit system: ActorSystem, config: Config, subscriber: ActorRef,
                           topicSuffix: String,
                           convertCommittableMessageToSubscriberMessage: Function[CommittableMessage[String, String], T]): Unit = {
  val groupId = config.getString("group-id")
  val subscriberName = subscriber.path.name
  val customerId = config.getString("customer-id")
  // raw"" keeps `\.` as the regex escape for a literal dot (in s"" it is an invalid string escape);
  // the customer id is quoted so any regex metacharacters in it are matched literally.
  val topicPattern = raw"^${java.util.regex.Pattern.quote(customerId)}\.$topicSuffix$$"
  implicit val materializer = ActorMaterializer()
  val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withGroupId(s"$groupId.$subscriberName")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  implicit val timeout = Timeout(5.seconds) // timeout for reply message on ask call below
  import system.dispatcher // the ExecutionContext that will be used in ask call below
  val src = Consumer.committableSource(consumerSettings, Subscriptions.topicPattern(topicPattern))
  val businessLogic = Flow[CommittableMessage[String, String]]
    .mapAsync(1)(message => subscriber.ask(convertCommittableMessageToSubscriberMessage(message)))
  val snk = Flow[CommittableMessage[String, String]]
    .mapAsync(1)(message => message.committableOffset.commitScaladsl())
    .to(Sink.ignore)
  val decider: Supervision.Decider = { e =>
    // LoggingAdapter.error(cause, message): with the message first, it is treated as a format
    // template and the exception (with its stack trace) is dropped.
    system.log.error(e, "error in stream")
    Supervision.Stop
  }
  RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._
    val broadcast = builder.add(Broadcast[CommittableMessage[String, String]](2))
    // in0: the subscriber's reply (typed Any, as returned by ask); in1: the original message
    val zip = builder.add(Zip[Any, CommittableMessage[String, String]])
    src ~> broadcast
    broadcast ~> businessLogic ~> zip.in0
    broadcast ~> zip.in1
    zip.out.map(_._2) ~> snk
    ClosedShape
  })
    .withAttributes(ActorAttributes.supervisionStrategy(decider))
    .run(materializer)
}