如何将消息发布回 RabbitMQ 以便其他消费者可以使用?
How to release a message back to RabbitMQ so it's available for another consumer?
我正在使用 Alpakka AMQP 库 (https://developer.lightbend.com/docs/alpakka/current/amqp.html) 处理反应流中的 RabbitMQ 消息并将它们转储到 Kafka。
我们使用的是 Avro 和 Schema Registry,因此格式错误的消息无法通过验证,需要根据具体情况进行处理。我们想要的行为是应用程序死机,并在 30 秒 - 1 分钟后重新启动,在此期间我们可以使用 UI 将消息从队列中拉出以查看它有什么问题。
由于某些原因,我的nack()
似乎没有释放回消息,它一直处于Unacked
状态,直到释放后才能查看。如何做到这一点?我将包括我的代码片段:
Flow[CommittableIncomingMessage]
.map { msg =>
rabbitReceivedCounter.increment()
reconciliationGauge.increment()
val json = msg.message.bytes.utf8String
val record = Try(converter.convert(json)) match {
case Success(result) => result
case Failure(e) => e match {
case e: JsonToAvroConversionException ⇒
val errorMsg = s"Failed to serialize to avro: ${e.getMessage} - Field: ${e.getFieldName} - Value: ${e.getValue}"
failurePublisher.publishError(errorMsg, StatusCodes.BadRequest.intValue)
kafkaFailureCounter.increment()
Await.result(msg.nack(), Duration.Undefined)
throw e
nack()
是一个 java.util.Future
,所以为了测试我在它周围放了一个 Await
以确保问题不是我在信号可以到达 Rabbit。
当流中抛出异常时,导致错误的元素通常会丢失。一个想法是将 Avro 转换卸载到 returns 转换结果和原始消息的 actor:这种方法将允许您 ack
或 nack
一条消息,具体取决于是否它可以转换为 Avro。
例如,演员可能看起来像下面这样:
case class AvroResult(avro: Option[Avro], msg: CommittableIncomingMessage)
// ^ change this to whatever type you're using
class ConverterActor extends Actor {
val converter = ???
def receive = {
case msg: CommittableIncomingMessage =>
try {
val json = msg.message.bytes.utf8String
val avro = converter.convert(json)
sender() ! AvroResult(Some(avro), msg)
} catch {
case _ =>
sender() ! AvroResult(None, msg)
}
}
}
那你就可以在你的直播中询问这位演员了:
val converterActor = system.actorOf(Props[ConverterActor])
val source: Source[Avro, _] = amqpSource
.ask[AvroResult](converterActor)
.mapAsync(1) {
case AvroResult(Some(avro), msg) => // ack
msg.ack().map(_ => Some(avro))
case AvroResult(None, msg) => // nack
msg.nack().map(_ => None)
}
.collect {
case Some(avro) => avro
}
上面的 Source
向下游发出 Avro 转换的消息,这些消息用 ack
确认。不可转换的消息被 nack
拒绝并且不会传递到下游。
我正在使用 Alpakka AMQP 库 (https://developer.lightbend.com/docs/alpakka/current/amqp.html) 处理反应流中的 RabbitMQ 消息并将它们转储到 Kafka。
我们使用的是 Avro 和 Schema Registry,因此格式错误的消息无法通过验证,需要根据具体情况进行处理。我们想要的行为是应用程序死机,并在 30 秒 - 1 分钟后重新启动,在此期间我们可以使用 UI 将消息从队列中拉出以查看它有什么问题。
由于某些原因,我的nack()
似乎没有释放回消息,它一直处于Unacked
状态,直到释放后才能查看。如何做到这一点?我将包括我的代码片段:
Flow[CommittableIncomingMessage]
.map { msg =>
rabbitReceivedCounter.increment()
reconciliationGauge.increment()
val json = msg.message.bytes.utf8String
val record = Try(converter.convert(json)) match {
case Success(result) => result
case Failure(e) => e match {
case e: JsonToAvroConversionException ⇒
val errorMsg = s"Failed to serialize to avro: ${e.getMessage} - Field: ${e.getFieldName} - Value: ${e.getValue}"
failurePublisher.publishError(errorMsg, StatusCodes.BadRequest.intValue)
kafkaFailureCounter.increment()
Await.result(msg.nack(), Duration.Undefined)
throw e
nack()
是一个 java.util.Future
,所以为了测试我在它周围放了一个 Await
以确保问题不是我在信号可以到达 Rabbit。
当流中抛出异常时,导致错误的元素通常会丢失。一个想法是将 Avro 转换卸载到 returns 转换结果和原始消息的 actor:这种方法将允许您 ack
或 nack
一条消息,具体取决于是否它可以转换为 Avro。
例如,演员可能看起来像下面这样:
case class AvroResult(avro: Option[Avro], msg: CommittableIncomingMessage)
// ^ change this to whatever type you're using
class ConverterActor extends Actor {
val converter = ???
def receive = {
case msg: CommittableIncomingMessage =>
try {
val json = msg.message.bytes.utf8String
val avro = converter.convert(json)
sender() ! AvroResult(Some(avro), msg)
} catch {
case _ =>
sender() ! AvroResult(None, msg)
}
}
}
那你就可以在你的直播中询问这位演员了:
val converterActor = system.actorOf(Props[ConverterActor])
val source: Source[Avro, _] = amqpSource
.ask[AvroResult](converterActor)
.mapAsync(1) {
case AvroResult(Some(avro), msg) => // ack
msg.ack().map(_ => Some(avro))
case AvroResult(None, msg) => // nack
msg.nack().map(_ => None)
}
.collect {
case Some(avro) => avro
}
上面的 Source
向下游发出 Avro 转换的消息,这些消息用 ack
确认。不可转换的消息被 nack
拒绝并且不会传递到下游。