在 Akka Streams Sink 之后获取原始元素的参考?

Get reference of original element after Akka Streams Sink?

我正在尝试使用 Amqp alpakka 连接器作为源和接收器。

Source<CommittableReadResult, NotUsed> AMQP_SOURCE  
      -> processAndGetResponse 
      -> Sink<ByteString, CompletionStage<Done>> AMQP_SINK 

我想确认从Amqp队列中获取的消息,在Sink操作成功后。像这样:

amqp_source(committableReadResult) 
    -> processAndGetResponse 
    -> amqp_sink 
    -> IfSinkOperationSuccess.Then(committableReadResult.ack())

我怎样才能做到这一点?我基本上只想在接收操作成功后才将消息标记为确认。

一般来说,如果你有一个 Sink,它具体化为 Future[Done](Scala API,我相信 Java 等价于 CompletionStage[Done]) ,您可以在 mapAsync 阶段内 运行 一个带有 Sink 的子流。数据只有成功才会通过

通过快速阅读 Alpakka AMQP API,Sink 都以这种方式实现,因此该方法会奏效。

假设您的 doSomethingWithMessage 函数同时产生 CommittableReadResultWriteMessage,Scala 中的类似以下内容将起作用:

def doSomethingWithMessage(in: CommittableReadResult): (CommittableReadResult, WriteMessage) = ???

amqpSource
  .map(doSomethingWithMessage)
  .mapAsync(parallelism) { tup =>
    val (crr, wm) = tup
    val fut = Source.single(crr, wm).runWith(amqpSink)
    fut.flatMap(_ => crr.ack().map(_ => crr.message))
  }.runWith(Sink.ignore)