将回调方法实现转换为 akka 流源
Converting a callback-method implementation into an akka stream Source
我正在与我无法控制的 java 图书馆的数据发布者合作。发布者库使用典型的回调设置;库代码中的某处(库是 java 但为了简洁起见,我将在 scala 中进行描述):
type DataType = ???
trait DataConsumer {
def onData(data : DataType) : Unit
}
库的用户需要编写实现 onData
方法的 class 并将其传递给 DataProducer
,库代码如下所示:
class DataProducer(consumer : DataConsumer) {...}
DataProducer
有自己无法控制的内部线程,以及随附的数据缓冲区,即每当有另一个 DataType
对象要使用时调用 onData
。
所以,我的问题是:如何编写一个层,将 convert/translate 原始库模式转换为 akka 流 Source 对象?
提前谢谢你。
有多种方法可以解决这个问题。一种是使用 ActorPublisher: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M5/scala/stream-integrations.html#Integrating_with_Actors ,您可以在其中更改回调,以便它向演员发送消息。根据回调的工作方式,您也可以使用 mapAsync(将回调转换为 Future)。仅当一个请求恰好产生一个回调调用时才有效。
回调 --> 来源
详细说明 Endre Varga 的回答,下面是创建 DataConsumer
回调函数的代码,它将消息发送到 akka 流 Source
。
注意:创建功能性 ActorPublish 比我在下面指出的要多得多。特别是,需要进行缓冲以处理 DataProducer
调用 onData
比 Sink
发出信号需求更快的情况(参见 example)。下面的代码只是设置了 "wiring".
import akka.actor.ActorRef
import akka.actor.Actor.noSender
import akka.stream.Actor.ActorPublisher
/**Incomplete ActorPublisher, see the example link.*/
class SourceActor extends ActorPublisher[DataType] {
def receive : Receive = {
case message : DataType => deliverBuf() //defined in example link
}
}
class ActorConsumer(sourceActor : ActorRef) extends DataConsumer {
override def onData(data : DataType) = sourceActor.tell(data, noSender)
}
//setup the actor that will feed the stream Source
val sourceActorRef = actorFactory actorOf Props[SourceActor]
//setup the Consumer object that will feed the Actor
val actorConsumer = ActorConsumer(sourceActorRef)
//setup the akka stream Source
val source = Source(ActorPublisher[DataType](sourceActorRef))
//setup the incoming data feed from 3rd party library
val dataProducer = DataProducer(actorConsumer)
回调 --> 全流
最初的问题专门要求对 Source 进行回调,但如果整个流(而不仅仅是 Source)已经可用,则处理回调更容易处理。这是因为可以使用 Source#actorRef 函数将流具体化为 ActorRef
。例如:
val overflowStrategy = akka.stream.OverflowStrategy.dropHead
val bufferSize = 100
val streamRef =
Source
.actorRef[DataType](bufferSize, overflowStrategy)
.via(someFlow)
.to(someSink)
.run()
val streamConsumer = new DataConsumer {
override def onData(data : DataType) : Unit = streamRef ! data
}
val dataProducer = DataProducer(streamConsumer)
我正在与我无法控制的 java 图书馆的数据发布者合作。发布者库使用典型的回调设置;库代码中的某处(库是 java 但为了简洁起见,我将在 scala 中进行描述):
type DataType = ???
trait DataConsumer {
def onData(data : DataType) : Unit
}
库的用户需要编写实现 onData
方法的 class 并将其传递给 DataProducer
,库代码如下所示:
class DataProducer(consumer : DataConsumer) {...}
DataProducer
有自己无法控制的内部线程,以及随附的数据缓冲区,即每当有另一个 DataType
对象要使用时调用 onData
。
所以,我的问题是:如何编写一个层,将 convert/translate 原始库模式转换为 akka 流 Source 对象?
提前谢谢你。
有多种方法可以解决这个问题。一种是使用 ActorPublisher: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M5/scala/stream-integrations.html#Integrating_with_Actors ,您可以在其中更改回调,以便它向演员发送消息。根据回调的工作方式,您也可以使用 mapAsync(将回调转换为 Future)。仅当一个请求恰好产生一个回调调用时才有效。
回调 --> 来源
详细说明 Endre Varga 的回答,下面是创建 DataConsumer
回调函数的代码,它将消息发送到 akka 流 Source
。
注意:创建功能性 ActorPublish 比我在下面指出的要多得多。特别是,需要进行缓冲以处理 DataProducer
调用 onData
比 Sink
发出信号需求更快的情况(参见 example)。下面的代码只是设置了 "wiring".
import akka.actor.ActorRef
import akka.actor.Actor.noSender
import akka.stream.Actor.ActorPublisher
/**Incomplete ActorPublisher, see the example link.*/
class SourceActor extends ActorPublisher[DataType] {
def receive : Receive = {
case message : DataType => deliverBuf() //defined in example link
}
}
class ActorConsumer(sourceActor : ActorRef) extends DataConsumer {
override def onData(data : DataType) = sourceActor.tell(data, noSender)
}
//setup the actor that will feed the stream Source
val sourceActorRef = actorFactory actorOf Props[SourceActor]
//setup the Consumer object that will feed the Actor
val actorConsumer = ActorConsumer(sourceActorRef)
//setup the akka stream Source
val source = Source(ActorPublisher[DataType](sourceActorRef))
//setup the incoming data feed from 3rd party library
val dataProducer = DataProducer(actorConsumer)
回调 --> 全流
最初的问题专门要求对 Source 进行回调,但如果整个流(而不仅仅是 Source)已经可用,则处理回调更容易处理。这是因为可以使用 Source#actorRef 函数将流具体化为 ActorRef
。例如:
val overflowStrategy = akka.stream.OverflowStrategy.dropHead
val bufferSize = 100
val streamRef =
Source
.actorRef[DataType](bufferSize, overflowStrategy)
.via(someFlow)
.to(someSink)
.run()
val streamConsumer = new DataConsumer {
override def onData(data : DataType) : Unit = streamRef ! data
}
val dataProducer = DataProducer(streamConsumer)