如何使用 akka 中的 Source 读取二进制 avro 文件数据?

How to read binary avro fileData, with Source in akka?

我正在尝试使用来自 akka Streams 的 Source 读取一个 avro 文件。

akka 流中的源读取数据,如 FileIO.FromPath(文件),它将根据 (\n) 字符读取和分隔行,其中 avro 是如何工作的?

流量:

    object AvroFlow  {
    def apply(jobDate: String): Flow[GenericRecord, GenericRecord, NotUsed] = {
            Flow[GenericRecord].map { rec => rec.put("date", "20190812") rec}       
    }
  }

图表:

object AvroRunner {
    def build (src: Source[GenericRecord, NotUsed],
                                     flw: Flow[GenericRecord, GenericRecord, NotUsed],
                                     snk:Flow[GenericRecord, Future[Done])
    : AvroRunner = {
      new AvroRunner(srtc,flw,snk)
    }
  }
class AvroRunner private(src: Source[GenericRecord, NotUsed],
                                     flw: Flow[GenericRecord, GenericRecord, NotUsed],
                                     snk:Flow[GenericRecord, Future[Done]){
  import scala.concurrent.ExecutionContext.Implicits.global
  val GraphRunner = RunnableGraph.fromGraph(GraphDSL.create() {implicit builder =>
    import GraphDSL.Implicits._
    src ~> flw ~> snk
    ClosedShape
  })
}

创建 avro 数据对象的 akka Source 的最简单方法不是来自原始二进制文件本身。相反,从 avro 库提供的 DataFileReader 创建源。

the documentation 我们首先从 java.io.File 生成器创建文件 reader:

def createFileReader[T : ClassTag](fileGenerator : () => File) : DataFileReader[T] = 
  new DataFileReader[T](file(), new SpecificDatumReader[T](classTag[T].runtimeClass))

这可以用来创建一个 scala Iterator:

def dataFileReaderToIterator[T](dataFileReader : DataFileReader[T]) : Iterator[T] = 
  new Iterator[T] {
    override def hasNext : Boolean = dataFileReader.hasNext

    override def next() : T = dataFileReader.next
  }

我们现在可以从文件生成器构建流源:

def fileToAvroSource[T](fileGenerator : () => File) : Source[T, _] = 
  Source.fromIterator[T](() => dataFileReaderToIterator[T](createFileReader(fileGenerator)))

背压?

看来 avro 正在使用标准 BufferedReader/OutputStream 技术来读取 File。因此,上述实现应该一直向文件源提供背压。但是,我还没有确认是这种情况...