akka.streams.Source 你可以发出值(类似于 monix.BehaviorSubject)
akka.streams.Source that you can emit values (similar to monix.BehaviorSubject)
我正在寻找 akka.stream.scaladsl.Source
构造方法,它可以让我简单地从代码的不同位置发出下一个值(例如,监视系统事件)。
- 我需要类似于
Promise
的东西。 Promise 向 Future
发出单个值。我需要向 Source
. 发出多个值
- 喜欢
monix.reactive.subjects.BehaviorSubject.onNext(_)
- 我不太关心背压。
目前我已经使用 monix 和 akka-streams(下面的代码)实现了这个,但我希望只有 akka-streams 解决方案:
import akka.stream.scaladsl.{Flow, Sink, Source}
import monix.reactive.subjects.BehaviorSubject
import monix.execution.Scheduler.Implicits.global
val bs = BehaviorSubject("") //monix subject is sink and source at the same time
//this is how it is currently implemented
def createSource() = {
val s1 = Source.fromPublisher(bs.toReactivePublisher) //here we create source from subject
Flow.fromSinkAndSourceCoupled[String, String](Sink.ignore, s1)
}
//somewhere else in code... some event happened
//this is how it works in monix.
val res:Future[Ack] = bs.onNext("Event #1471 just happened!") //here we emit value
Source
抽象,顾名思义,提供 API 来处理数据源。相反,您需要查看使用数据的抽象 - Sink
。而Sink.foreach
操作就是你要找的,最有可能的是:https://doc.akka.io/docs/akka/current/stream/operators/Sink/foreach.html
在你的例子中,代码看起来像这样:
import akka.stream.scaladsl.{Sink, Source}
val s1 = Source.// your WS akka stream source
s1.runWith(Sink.foreach(write))
希望对您有所帮助!
我认为您正在寻找的是 sink.foreach
。它为每个元素调用给定的过程 received.I 认为代码如下所示:
s1.runWith(Sink.foreach(write))
本质上,正在做的是对于源流,接收器尝试写入该流的每个元素。
编辑
我认为您正在寻找 maybe
。它会创建一个源,一旦物化的 Promise 完成并在 documentation
之外发出 value.Check
编辑
futureSource 也可以 work.It 流式传输给定未来源的元素,一旦它成功完成。
如果有帮助请告诉我!!
也许您正在寻找Actor Source
文档中的示例:
import akka.actor.typed.ActorRef
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSource
trait Protocol
case class Message(msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Exception) extends Protocol
val source: Source[Protocol, ActorRef[Protocol]] = ActorSource.actorRef[Protocol](completionMatcher = {
case Complete =>
}, failureMatcher = {
case Fail(ex) => ex
}, bufferSize = 8, overflowStrategy = OverflowStrategy.fail)
val ref = source
.collect {
case Message(msg) => msg
}
.to(Sink.foreach(println))
.run()
ref ! Message("msg1")
这样您就可以通过 actor 系统向 actor 发送消息,这些消息将从 ActorSource
下游发出。
https://doc.akka.io/docs/akka/current/stream/operators/Source/fromIterator.html or https://doc.akka.io/docs/akka/current/stream/operators/Source/fromPublisher.html 是您所需要的,具体取决于您的 Source 从何处使用数据。
我正在寻找 akka.stream.scaladsl.Source
构造方法,它可以让我简单地从代码的不同位置发出下一个值(例如,监视系统事件)。
- 我需要类似于
Promise
的东西。 Promise 向Future
发出单个值。我需要向Source
. 发出多个值
- 喜欢
monix.reactive.subjects.BehaviorSubject.onNext(_)
- 我不太关心背压。
目前我已经使用 monix 和 akka-streams(下面的代码)实现了这个,但我希望只有 akka-streams 解决方案:
import akka.stream.scaladsl.{Flow, Sink, Source}
import monix.reactive.subjects.BehaviorSubject
import monix.execution.Scheduler.Implicits.global
val bs = BehaviorSubject("") //monix subject is sink and source at the same time
//this is how it is currently implemented
def createSource() = {
val s1 = Source.fromPublisher(bs.toReactivePublisher) //here we create source from subject
Flow.fromSinkAndSourceCoupled[String, String](Sink.ignore, s1)
}
//somewhere else in code... some event happened
//this is how it works in monix.
val res:Future[Ack] = bs.onNext("Event #1471 just happened!") //here we emit value
Source
抽象,顾名思义,提供 API 来处理数据源。相反,您需要查看使用数据的抽象 - Sink
。而Sink.foreach
操作就是你要找的,最有可能的是:https://doc.akka.io/docs/akka/current/stream/operators/Sink/foreach.html
在你的例子中,代码看起来像这样:
import akka.stream.scaladsl.{Sink, Source}
val s1 = Source.// your WS akka stream source
s1.runWith(Sink.foreach(write))
希望对您有所帮助!
我认为您正在寻找的是 sink.foreach
。它为每个元素调用给定的过程 received.I 认为代码如下所示:
s1.runWith(Sink.foreach(write))
本质上,正在做的是对于源流,接收器尝试写入该流的每个元素。
编辑
我认为您正在寻找 maybe
。它会创建一个源,一旦物化的 Promise 完成并在 documentation
编辑
futureSource 也可以 work.It 流式传输给定未来源的元素,一旦它成功完成。
如果有帮助请告诉我!!
也许您正在寻找Actor Source
文档中的示例:
import akka.actor.typed.ActorRef
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSource
trait Protocol
case class Message(msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Exception) extends Protocol
val source: Source[Protocol, ActorRef[Protocol]] = ActorSource.actorRef[Protocol](completionMatcher = {
case Complete =>
}, failureMatcher = {
case Fail(ex) => ex
}, bufferSize = 8, overflowStrategy = OverflowStrategy.fail)
val ref = source
.collect {
case Message(msg) => msg
}
.to(Sink.foreach(println))
.run()
ref ! Message("msg1")
这样您就可以通过 actor 系统向 actor 发送消息,这些消息将从 ActorSource
下游发出。
https://doc.akka.io/docs/akka/current/stream/operators/Source/fromIterator.html or https://doc.akka.io/docs/akka/current/stream/operators/Source/fromPublisher.html 是您所需要的,具体取决于您的 Source 从何处使用数据。