如何使用 Akka Persistence 保存流式数据
How to save streaming data using Akka Persistence
我使用 StreamRefs 在集群中的参与者之间建立流连接。目前,在写入节点中,我手动将传入消息保存到日志文件中,但我想知道是否可以将其替换为用于写入的持久性 Sink
和用于在 actor 启动时读取的持久性 Source
来自 Akka Persistence 杂志。我一直在考虑用 Persistent actor 的 persist { evt => ... }
替换日志文件接收器,但由于它是异步执行的,所以我将失去背压。那么是否可以将带有背压的流式数据写入Akka Persistence journal,并在actor recover上以流式方式读取这些数据?
当前实施:
object Writer {
case class WriteSinkRequest(userId: String)
case class WriteSinkReady(userId: String, sinkRef: SinkRef[ByteString])
case class ReadSourceRequest(userId: String)
case class ReadSourceReady(userId: String, sourceRef: SourceRef[ByteString])
}
class Writer extends Actor {
// code omitted
val logsDir = "logs"
val path = Files.createDirectories(FileSystems.getDefault.getPath(logsDir))
def logFile(id: String) = {
path.resolve(id)
}
def logFileSink(logId: String): Sink[ByteString, Future[IOResult]] = FileIO.toPath(logFile(logId), Set(CREATE, WRITE, APPEND))
def logFileSource(logId: String): Source[ByteString, Future[IOResult]] = FileIO.fromPath(logFile(logId))
override def receive: Receive = {
case WriteSinkRequest(userId) =>
// obtain the source you want to offer:
val sink = logFileSink(userId)
// materialize the SinkRef (the remote is like a source of data for us):
val ref: Future[SinkRef[ByteString]] = StreamRefs.sinkRef[ByteString]().to(sink).run()
// wrap the SinkRef in some domain message, such that the sender knows what source it is
val reply: Future[WriteSinkReady] = ref.map(WriteSinkReady(userId, _))
// reply to sender
reply.pipeTo(sender())
case ReadSourceRequest(userId) =>
val source = logFileSource(userId)
val ref: Future[SourceRef[ByteString]] = source.runWith(StreamRefs.sourceRef())
val reply: Future[ReadSourceReady] = ref.map(ReadSourceReady(userId, _))
reply pipeTo sender()
}
}
P.S。是否可以创建不是 "save-to-journal" 接收器,而是流:
incoming data to write ~> save to persistence journal ~> data that was written
?
以背压方式将数据流式传输到持久性 actor 的一个想法是使用 Sink.actorRefWithAck
:让 actor 在持久化消息时发送确认消息。这看起来像下面这样:
// ...
case class WriteSinkReady(userId: String, sinkRef: SinkRef[MyMsg])
// ...
def receive = {
case WriteSinkRequest(userId) =>
val persistentActor: ActorRef = ??? // a persistent actor that handles MyMsg messages
// as well as the messages used in persistentSink
val persistentSink: Sink[MyMsg, NotUsed] = Sink.actorRefWithAck[MyMsg](
persistentActor,
/* additional parameters: see the docs */
)
val ref: Future[SinkRef[MyMsg]] = StreamRefs.sinkRef[MyMsg]().to(persistentSink).run()
val reply: Future[WriteSinkReady] = ref.map(WriteSinkReady(userId, _))
reply.pipeTo(sender())
case ReadSourceRequest(userId) =>
// ...
}
以上示例使用自定义案例 class 命名为 MyMsg
而不是 ByteString
.
在发件人中,假设它是一个演员:
def receive = {
case WriteSinkReady(userId, sinkRef) =>
source.runWith(sinkRef) // source is a Source[MyMsg, _]
// ...
}
发送方中的物化流将消息发送给持久化 actor。
我使用 StreamRefs 在集群中的参与者之间建立流连接。目前,在写入节点中,我手动将传入消息保存到日志文件中,但我想知道是否可以将其替换为用于写入的持久性 Sink
和用于在 actor 启动时读取的持久性 Source
来自 Akka Persistence 杂志。我一直在考虑用 Persistent actor 的 persist { evt => ... }
替换日志文件接收器,但由于它是异步执行的,所以我将失去背压。那么是否可以将带有背压的流式数据写入Akka Persistence journal,并在actor recover上以流式方式读取这些数据?
当前实施:
object Writer {
case class WriteSinkRequest(userId: String)
case class WriteSinkReady(userId: String, sinkRef: SinkRef[ByteString])
case class ReadSourceRequest(userId: String)
case class ReadSourceReady(userId: String, sourceRef: SourceRef[ByteString])
}
class Writer extends Actor {
// code omitted
val logsDir = "logs"
val path = Files.createDirectories(FileSystems.getDefault.getPath(logsDir))
def logFile(id: String) = {
path.resolve(id)
}
def logFileSink(logId: String): Sink[ByteString, Future[IOResult]] = FileIO.toPath(logFile(logId), Set(CREATE, WRITE, APPEND))
def logFileSource(logId: String): Source[ByteString, Future[IOResult]] = FileIO.fromPath(logFile(logId))
override def receive: Receive = {
case WriteSinkRequest(userId) =>
// obtain the source you want to offer:
val sink = logFileSink(userId)
// materialize the SinkRef (the remote is like a source of data for us):
val ref: Future[SinkRef[ByteString]] = StreamRefs.sinkRef[ByteString]().to(sink).run()
// wrap the SinkRef in some domain message, such that the sender knows what source it is
val reply: Future[WriteSinkReady] = ref.map(WriteSinkReady(userId, _))
// reply to sender
reply.pipeTo(sender())
case ReadSourceRequest(userId) =>
val source = logFileSource(userId)
val ref: Future[SourceRef[ByteString]] = source.runWith(StreamRefs.sourceRef())
val reply: Future[ReadSourceReady] = ref.map(ReadSourceReady(userId, _))
reply pipeTo sender()
}
}
P.S。是否可以创建不是 "save-to-journal" 接收器,而是流:
incoming data to write ~> save to persistence journal ~> data that was written
?
以背压方式将数据流式传输到持久性 actor 的一个想法是使用 Sink.actorRefWithAck
:让 actor 在持久化消息时发送确认消息。这看起来像下面这样:
// ...
case class WriteSinkReady(userId: String, sinkRef: SinkRef[MyMsg])
// ...
def receive = {
case WriteSinkRequest(userId) =>
val persistentActor: ActorRef = ??? // a persistent actor that handles MyMsg messages
// as well as the messages used in persistentSink
val persistentSink: Sink[MyMsg, NotUsed] = Sink.actorRefWithAck[MyMsg](
persistentActor,
/* additional parameters: see the docs */
)
val ref: Future[SinkRef[MyMsg]] = StreamRefs.sinkRef[MyMsg]().to(persistentSink).run()
val reply: Future[WriteSinkReady] = ref.map(WriteSinkReady(userId, _))
reply.pipeTo(sender())
case ReadSourceRequest(userId) =>
// ...
}
以上示例使用自定义案例 class 命名为 MyMsg
而不是 ByteString
.
在发件人中,假设它是一个演员:
def receive = {
case WriteSinkReady(userId, sinkRef) =>
source.runWith(sinkRef) // source is a Source[MyMsg, _]
// ...
}
发送方中的物化流将消息发送给持久化 actor。