在 alsoTo 中定义的 Sink 什么也没有
Nothing comes to Sink defined in alsoTo
alsoTo 似乎对我不起作用。项目不会进入其中定义的接收器。这是我的。
val merged: Source[ArticleWithKeywords, _] = ...
val (ks, fut) = merged
.alsoTo(Flow[ArticleWithKeywords].map { a => a.id -> a.ids.toList }.to(queueManager.getIdsForAnsSink))
.map(_.id)
.groupedWithin(100, 5 seconds)
.mapAsync(4) { ids => runReferenceFetching(ids) }
.viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.ignore)(Keep.both)
.run()
但我看到项目到达 runReferenceFetching。我错过了什么?
原来问题与 alsoTo
无关。问题出在使用 Source.fromPublisher
创建的接收器上。我错误地认为我可以使用相同的 Publisher[T]
创建多个接收器。由于已经有另一个水槽,第二个没有用。
alsoTo 似乎对我不起作用。项目不会进入其中定义的接收器。这是我的。
val merged: Source[ArticleWithKeywords, _] = ...
val (ks, fut) = merged
.alsoTo(Flow[ArticleWithKeywords].map { a => a.id -> a.ids.toList }.to(queueManager.getIdsForAnsSink))
.map(_.id)
.groupedWithin(100, 5 seconds)
.mapAsync(4) { ids => runReferenceFetching(ids) }
.viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.ignore)(Keep.both)
.run()
但我看到项目到达 runReferenceFetching。我错过了什么?
原来问题与 alsoTo
无关。问题出在使用 Source.fromPublisher
创建的接收器上。我错误地认为我可以使用相同的 Publisher[T]
创建多个接收器。由于已经有另一个水槽,第二个没有用。