Implementing a custom Akka Streams Source based on ActorPublisher
I want to implement a custom Source[ByteString] in Akka Streams. This source should read data from a provided file, but only within a provided byte range, and propagate it downstream.
At first I thought this could be done with an Actor that mixes in ActorPublisher. The implementation is similar to akka.stream.impl.io.FilePublisher, which reads the whole file from the provided path rather than only the data within a given byte range:
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Path, StandardOpenOption}

import akka.actor.{ActorLogging, DeadLetterSuppression, Props}
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import akka.util.ByteString

import scala.annotation.tailrec
import scala.util.control.NonFatal

class FilePublisher(pathToFile: Path, startByte: Long, endByte: Long) extends ActorPublisher[ByteString]
  with ActorLogging {
  import FilePublisher._

  private val chunksToBuffer = 10
  private var bytesLeftToRead = endByte - startByte + 1
  private var fileChannel: FileChannel = _
  private val buffer = ByteBuffer.allocate(8096)

  private var bufferedChunks: Vector[ByteString] = _

  override def preStart(): Unit = {
    try {
      log.info("Starting")
      fileChannel = FileChannel.open(pathToFile, StandardOpenOption.READ)
      bufferedChunks = readAhead(Vector.empty, Some(startByte))
      log.info("Chunks {}", bufferedChunks)
    } catch {
      case NonFatal(ex) => onErrorThenStop(ex)
    }
  }

  override def postStop(): Unit = {
    log.info("Stopping")
    if (fileChannel ne null)
      try fileChannel.close() catch {
        case NonFatal(ex) => log.error(ex, "Error during file channel close")
      }
  }

  override def receive: Receive = {
    case Request =>
      readAndSignalNext()
      log.info("Got request")
    case Continue =>
      log.info("Continuing reading")
      readAndSignalNext()
    case Cancel =>
      log.info("Cancel message got")
      context.stop(self)
  }

  private def readAndSignalNext() = {
    log.info("Reading and signaling")
    if (isActive) {
      bufferedChunks = readAhead(signalOnNext(bufferedChunks), None)
      if (isActive && totalDemand > 0) self ! Continue
    }
  }

  @tailrec
  private def signalOnNext(chunks: Vector[ByteString]): Vector[ByteString] = {
    if (chunks.nonEmpty && totalDemand > 0) {
      log.info("Signaling")
      onNext(chunks.head)
      signalOnNext(chunks.tail)
    } else {
      if (chunks.isEmpty && bytesLeftToRead > 0) {
        onCompleteThenStop()
      }
      chunks
    }
  }

  @tailrec
  private def readAhead(currentlyBufferedChunks: Vector[ByteString], startPosition: Option[Long]): Vector[ByteString] = {
    if (currentlyBufferedChunks.size < chunksToBuffer) {
      val bytesRead = readDataFromChannel(startPosition)
      log.info("Bytes read {}", bytesRead)
      bytesRead match {
        case Int.MinValue => Vector.empty
        case -1 =>
          log.info("EOF reached")
          currentlyBufferedChunks // EOF reached
        case _ =>
          buffer.flip()
          val chunk = ByteString(buffer)
          buffer.clear()

          bytesLeftToRead -= bytesRead
          val trimmedChunk = if (bytesLeftToRead >= 0) chunk else chunk.dropRight(bytesLeftToRead.toInt)
          readAhead(currentlyBufferedChunks :+ trimmedChunk, None)
      }
    } else {
      currentlyBufferedChunks
    }
  }

  private def readDataFromChannel(startPosition: Option[Long]): Int = {
    try {
      startPosition match {
        case Some(position) => fileChannel.read(buffer, position)
        case None => fileChannel.read(buffer)
      }
    } catch {
      case NonFatal(ex) =>
        log.error(ex, "Got error reading data from file channel")
        Int.MinValue
    }
  }
}

object FilePublisher {
  private case object Continue extends DeadLetterSuppression

  def props(path: Path, startByte: Long, endByte: Long): Props = Props(classOf[FilePublisher], path, startByte, endByte)
}
But it turned out that when I built a Source backed by this FilePublisher:
val fileSource = Source.actorPublisher(FilePublisher.props(pathToFile, 0, fileLength))
val future = fileSource.runWith(Sink.seq)
nothing happened and the source did not propagate any data downstream.
Is there another, correct way to implement a Source based on my FilePublisher, or should I not use this API at all and instead implement a custom processing stage as described here?
The problem with the custom-stage approach is that a naive implementation would perform the IO directly inside the stage. I suppose I could move the IO out of the stage into a custom thread pool or an actor, but that would require some form of synchronization between the stage and the actor.
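For reference, here is a minimal sketch of what such a custom stage might look like, assuming the GraphStage API from akka.stream.stage. The class name FileRangeSource and the chunk handling are illustrative only, and the read in onPull still runs on the stage's own thread, which is exactly the concern above:

import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Path, StandardOpenOption}

import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import akka.util.ByteString

class FileRangeSource(path: Path, startByte: Long, endByte: Long, chunkSize: Int = 8192)
  extends GraphStage[SourceShape[ByteString]] {

  val out: Outlet[ByteString] = Outlet("FileRangeSource.out")
  override val shape: SourceShape[ByteString] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private var channel: FileChannel = _
      private var position = startByte
      private val buffer = ByteBuffer.allocate(chunkSize)

      override def preStart(): Unit =
        channel = FileChannel.open(path, StandardOpenOption.READ)

      override def postStop(): Unit =
        if (channel ne null) channel.close()

      setHandler(out, new OutHandler {
        // Note: this read is blocking IO on the stage's thread -- the very problem
        // described above; a real implementation would move it elsewhere.
        override def onPull(): Unit = {
          if (position > endByte) completeStage()
          else {
            buffer.clear()
            val read = channel.read(buffer, position)
            if (read == -1) completeStage()
            else {
              buffer.flip()
              // Trim the chunk so it never reaches past endByte.
              val bytesInRange = math.min(read.toLong, endByte - position + 1).toInt
              push(out, ByteString(buffer).take(bytesInRange))
              position += bytesInRange
            }
          }
        }
      })
    }
}

Such a stage would then be materialized with Source.fromGraph(new FileRangeSource(pathToFile, 0, fileLength)).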
Thanks.
I noticed that you are not currently using a separate dispatcher for your IO operations. Here's the documentation section that explains why not doing so can lead to serious blocking in your application.
Akka Streams wraps its FilePublisher in a FileSource using a dedicated, thread-pool-based dispatcher. You can check their code for inspiration here.
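For illustration, one way to run the publisher's blocking reads on a dedicated dispatcher could look like the sketch below. The dispatcher name custom-blocking-io-dispatcher and its settings are assumptions made for the example, not something Akka defines:

// Sketch only: run FilePublisher on a dedicated thread-pool dispatcher.
// Assumes a dispatcher roughly like this is defined in application.conf:
//
//   custom-blocking-io-dispatcher {
//     type = Dispatcher
//     executor = "thread-pool-executor"
//     thread-pool-executor.fixed-pool-size = 16
//     throughput = 1
//   }
import akka.stream.scaladsl.{Sink, Source}

val fileSource =
  Source.actorPublisher[ByteString](
    FilePublisher.props(pathToFile, 0, fileLength)
      .withDispatcher("custom-blocking-io-dispatcher")) // keeps the blocking file reads off the default dispatcher

val future = fileSource.runWith(Sink.seq)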
The problem was caused by incorrect pattern matching in the receive method: the line case Request => should be case Request(_) =>, because Request is actually a case class with a single parameter (final case class Request(n: Long)), not a case object as I had assumed.