在 Akka Streams Sink 之后获取原始元素的参考?
Get reference of original element after Akka Streams Sink?
我正在尝试使用 Amqp alpakka 连接器作为源和接收器。
Source<CommittableReadResult, NotUsed> AMQP_SOURCE
-> processAndGetResponse
-> Sink<ByteString, CompletionStage<Done>> AMQP_SINK
我想确认从Amqp队列中获取的消息,在Sink操作成功后。像这样:
amqp_source(committableReadResult)
-> processAndGetResponse
-> amqp_sink
-> IfSinkOperationSuccess.Then(committableReadResult.ack())
我怎样才能做到这一点?我基本上只想在接收操作成功后才将消息标记为确认。
一般来说,如果你有一个 Sink
,它具体化为 Future[Done]
(Scala API,我相信 Java 等价于 CompletionStage[Done]
) ,您可以在 mapAsync
阶段内 运行 一个带有 Sink
的子流。数据只有成功才会通过
通过快速阅读 Alpakka AMQP API,Sink
都以这种方式实现,因此该方法会奏效。
假设您的 doSomethingWithMessage
函数同时产生 CommittableReadResult
和 WriteMessage
,Scala 中的类似以下内容将起作用:
def doSomethingWithMessage(in: CommittableReadResult): (CommittableReadResult, WriteMessage) = ???
amqpSource
.map(doSomethingWithMessage)
.mapAsync(parallelism) { tup =>
val (crr, wm) = tup
val fut = Source.single(crr, wm).runWith(amqpSink)
fut.flatMap(_ => crr.ack().map(_ => crr.message))
}.runWith(Sink.ignore)
我正在尝试使用 Amqp alpakka 连接器作为源和接收器。
Source<CommittableReadResult, NotUsed> AMQP_SOURCE
-> processAndGetResponse
-> Sink<ByteString, CompletionStage<Done>> AMQP_SINK
我想确认从Amqp队列中获取的消息,在Sink操作成功后。像这样:
amqp_source(committableReadResult)
-> processAndGetResponse
-> amqp_sink
-> IfSinkOperationSuccess.Then(committableReadResult.ack())
我怎样才能做到这一点?我基本上只想在接收操作成功后才将消息标记为确认。
一般来说,如果你有一个 Sink
,它具体化为 Future[Done]
(Scala API,我相信 Java 等价于 CompletionStage[Done]
) ,您可以在 mapAsync
阶段内 运行 一个带有 Sink
的子流。数据只有成功才会通过
通过快速阅读 Alpakka AMQP API,Sink
都以这种方式实现,因此该方法会奏效。
假设您的 doSomethingWithMessage
函数同时产生 CommittableReadResult
和 WriteMessage
,Scala 中的类似以下内容将起作用:
def doSomethingWithMessage(in: CommittableReadResult): (CommittableReadResult, WriteMessage) = ???
amqpSource
.map(doSomethingWithMessage)
.mapAsync(parallelism) { tup =>
val (crr, wm) = tup
val fut = Source.single(crr, wm).runWith(amqpSink)
fut.flatMap(_ => crr.ack().map(_ => crr.message))
}.runWith(Sink.ignore)