如何使用 akka 中的 Source 读取二进制 avro 文件数据?
How to read binary avro fileData, with Source in akka?
我正在尝试使用来自 akka Streams 的 Source 读取一个 avro 文件。
akka 流中的源读取数据,如 FileIO.FromPath(文件),它将根据 (\n) 字符读取和分隔行,其中 avro 是如何工作的?
流量:
object AvroFlow {
def apply(jobDate: String): Flow[GenericRecord, GenericRecord, NotUsed] = {
Flow[GenericRecord].map { rec => rec.put("date", "20190812") rec}
}
}
图表:
object AvroRunner {
def build (src: Source[GenericRecord, NotUsed],
flw: Flow[GenericRecord, GenericRecord, NotUsed],
snk:Flow[GenericRecord, Future[Done])
: AvroRunner = {
new AvroRunner(srtc,flw,snk)
}
}
class AvroRunner private(src: Source[GenericRecord, NotUsed],
flw: Flow[GenericRecord, GenericRecord, NotUsed],
snk:Flow[GenericRecord, Future[Done]){
import scala.concurrent.ExecutionContext.Implicits.global
val GraphRunner = RunnableGraph.fromGraph(GraphDSL.create() {implicit builder =>
import GraphDSL.Implicits._
src ~> flw ~> snk
ClosedShape
})
}
创建 avro 数据对象的 akka Source
的最简单方法不是来自原始二进制文件本身。相反,从 avro 库提供的 DataFileReader
创建源。
从 the documentation 我们首先从 java.io.File
生成器创建文件 reader:
def createFileReader[T : ClassTag](fileGenerator : () => File) : DataFileReader[T] =
new DataFileReader[T](file(), new SpecificDatumReader[T](classTag[T].runtimeClass))
这可以用来创建一个 scala Iterator
:
def dataFileReaderToIterator[T](dataFileReader : DataFileReader[T]) : Iterator[T] =
new Iterator[T] {
override def hasNext : Boolean = dataFileReader.hasNext
override def next() : T = dataFileReader.next
}
我们现在可以从文件生成器构建流源:
def fileToAvroSource[T](fileGenerator : () => File) : Source[T, _] =
Source.fromIterator[T](() => dataFileReaderToIterator[T](createFileReader(fileGenerator)))
背压?
看来 avro 正在使用标准 BufferedReader/OutputStream 技术来读取 File
。因此,上述实现应该一直向文件源提供背压。但是,我还没有确认是这种情况...
我正在尝试使用来自 akka Streams 的 Source 读取一个 avro 文件。
akka 流中的源读取数据,如 FileIO.FromPath(文件),它将根据 (\n) 字符读取和分隔行,其中 avro 是如何工作的?
流量:
object AvroFlow {
def apply(jobDate: String): Flow[GenericRecord, GenericRecord, NotUsed] = {
Flow[GenericRecord].map { rec => rec.put("date", "20190812") rec}
}
}
图表:
object AvroRunner {
def build (src: Source[GenericRecord, NotUsed],
flw: Flow[GenericRecord, GenericRecord, NotUsed],
snk:Flow[GenericRecord, Future[Done])
: AvroRunner = {
new AvroRunner(srtc,flw,snk)
}
}
class AvroRunner private(src: Source[GenericRecord, NotUsed],
flw: Flow[GenericRecord, GenericRecord, NotUsed],
snk:Flow[GenericRecord, Future[Done]){
import scala.concurrent.ExecutionContext.Implicits.global
val GraphRunner = RunnableGraph.fromGraph(GraphDSL.create() {implicit builder =>
import GraphDSL.Implicits._
src ~> flw ~> snk
ClosedShape
})
}
创建 avro 数据对象的 akka Source
的最简单方法不是来自原始二进制文件本身。相反,从 avro 库提供的 DataFileReader
创建源。
从 the documentation 我们首先从 java.io.File
生成器创建文件 reader:
def createFileReader[T : ClassTag](fileGenerator : () => File) : DataFileReader[T] =
new DataFileReader[T](file(), new SpecificDatumReader[T](classTag[T].runtimeClass))
这可以用来创建一个 scala Iterator
:
def dataFileReaderToIterator[T](dataFileReader : DataFileReader[T]) : Iterator[T] =
new Iterator[T] {
override def hasNext : Boolean = dataFileReader.hasNext
override def next() : T = dataFileReader.next
}
我们现在可以从文件生成器构建流源:
def fileToAvroSource[T](fileGenerator : () => File) : Source[T, _] =
Source.fromIterator[T](() => dataFileReaderToIterator[T](createFileReader(fileGenerator)))
背压?
看来 avro 正在使用标准 BufferedReader/OutputStream 技术来读取 File
。因此,上述实现应该一直向文件源提供背压。但是,我还没有确认是这种情况...