How can I parse a stream of Avro ByteStrings in Akka Streams?
I have Avro files in an S3 bucket and am trying to stream and parse them into a case class. I have the schema to parse with, but I don't know how to proceed. I use s3.download
to download and stream the file from the S3 bucket, and then convert it to a UTF-8 string.
Please advise how, given the input stream I'm getting, I can parse it using our schema.
I'll answer this based on your requirement to (de)serialize messages with Avro using a schema, and I'll assume that you have already downloaded the messages from the S3 bucket. I'll then use my example of persisting messages to PostgreSQL, just to have a working example; you can substitute your own S3 bucket connection.
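Since the question is specifically about parsing a stream of Avro ByteStrings coming from S3, here is a minimal sketch of that piece. It assumes Alpakka S3's S3.download, the CompanyRegistry case class defined further down, and that the S3 objects are Avro data files with an embedded schema (if they are raw binary records, use AvroInputStream.binary instead). Note that the raw bytes should be decoded directly; converting them to a UTF-8 string first will corrupt the payload, because Avro is a binary format:

```scala
import akka.actor.ActorSystem
import akka.stream.alpakka.s3.scaladsl.S3
import akka.util.ByteString
import com.sksamuel.avro4s.{AvroInputStream, AvroSchema}

import scala.concurrent.Future

object S3AvroParser {
  implicit val system: ActorSystem = ActorSystem("s3-avro")
  import system.dispatcher

  // bucket and key are hypothetical names for illustration
  def parse(bucket: String, key: String): Future[List[CompanyRegistry]] =
    S3.download(bucket, key)
      .collect { case Some((data, _)) => data } // drop the metadata, keep the ByteString source
      .flatMapConcat(identity)
      .runFold(ByteString.empty)(_ ++ _)        // accumulate all chunks before decoding
      .map { bytes =>
        val in = AvroInputStream.data[CompanyRegistry]
          .from(bytes.toArray)
          .build(AvroSchema[CompanyRegistry])
        try in.iterator.toList
        finally in.close()
      }
}
```

Folding the whole object into memory is fine for small files; for large ones you would want to feed the stream into an incremental Avro decoder instead.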
I'm using the com.sksamuel.avro4s library to create my Avro serializer. These are the required dependencies to put in build.sbt:
val akkaVersion = "2.6.10"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-persistence" % akkaVersion,
  "com.sksamuel.avro4s" %% "avro4s-core" % "4.0.4",
  "org.xerial.snappy" % "snappy-java" % "1.1.8.2",
  "org.postgresql" % "postgresql" % "42.2.2",
  "com.github.dnvriend" %% "akka-persistence-jdbc" % "3.4.0"
)
Then create the serializer, in my case MyFirstAvroSerializer, which extends akka.serialization.Serializer. It holds a schema, which in my case is for the case class CompanyRegistry. Basically, you have to implement identifier, which must return a unique ID; toBinary and fromBinary, which convert the messages; and includeManifest, which is false here because I don't need a manifest.
import akka.serialization.Serializer
import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream, AvroSchema}

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

case class BankAccount(iban: String, bankCode: String, amount: Double, currency: String)
case class CompanyRegistry(name: String, accounts: Seq[BankAccount], activityCode: String, marketCap: Double)

class MyFirstAvroSerializer extends Serializer {

  val schema = AvroSchema[CompanyRegistry]

  override def identifier: Int = 454874

  override def toBinary(o: AnyRef): Array[Byte] = o match {
    case c: CompanyRegistry =>
      val baos = new ByteArrayOutputStream()
      // binary format: the schema is not embedded in the payload
      val avroOutputStream = AvroOutputStream.binary[CompanyRegistry].to(baos).build()
      avroOutputStream.write(c)
      avroOutputStream.flush()
      avroOutputStream.close()
      baos.toByteArray
    case _ => throw new IllegalArgumentException("we only support CompanyRegistry for Avro")
  }

  override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
    val avroInputStream = AvroInputStream.binary[CompanyRegistry].from(new ByteArrayInputStream(bytes)).build(schema)
    val companyRegistryIterator: Iterator[CompanyRegistry] = avroInputStream.iterator
    val companyRegistry = companyRegistryIterator.next()
    avroInputStream.close()
    companyRegistry
  }

  override def includeManifest: Boolean = false
}
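You can check the serializer with a quick round-trip outside of Akka (a sketch; the final comparison works because case classes compare structurally):

```scala
object SerializerRoundTrip extends App {
  val serializer = new MyFirstAvroSerializer
  val original = CompanyRegistry(
    "Acme", Seq(BankAccount("DE-99", "acme-bank", 1.5, "EUR")), "42", 100.0)

  val bytes = serializer.toBinary(original)         // Avro binary payload
  val restored = serializer.fromBinary(bytes, None) // decode with the same schema

  assert(restored == original)
}
```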
Then you have to configure your project to invoke this serializer when Akka messages are exchanged between actors. Configure it in application.conf by adding a specific configuration, in my case avroSerializable. You register MyFirstAvroSerializer under the serializers scope and bind the case class under serialization-bindings. I also configured Akka remote, but you can ignore that part.
avroSerializable {
  akka {
    actor {
      provider = remote
      #allow-java-serialization = off
      serializers {
        java = "akka.serialization.JavaSerializer"
        avro = "org.github.felipegutierrez.explore.akka.classic.remote.serialization.MyFirstAvroSerializer"
      }
      serialization-bindings {
        "org.github.felipegutierrez.explore.akka.classic.remote.serialization.CompanyRegistry" = avro
        "java.io.Serializable" = java
      }
    }
    remote {
      artery {
        enabled = on
        transport = aeron-udp
        canonical.hostname = "localhost"
      }
    }
  }
}
As I said at the beginning, I'm using PostgreSQL, but in your case this would be your S3 bucket storage configuration. I'll leave it here for completeness, since I refer to this configuration when creating the actor system.
postgresStore {
  akka.persistence.journal.plugin = "jdbc-journal"
  akka.persistence.snapshot-store.plugin = "jdbc-snapshot-store"
  akka.actor.allow-java-serialization = on

  # JDBC configuration for Akka Persistence
  akka-persistence-jdbc {
    shared-databases {
      slick {
        profile = "slick.jdbc.PostgresProfile$"
        db {
          numThreads = 10
          driver = "org.postgresql.Driver"
          url = "jdbc:postgresql://localhost:5432/rtjvm"
          user = "docker"
          password = "docker"
        }
      }
    }
  }

  # binding the JDBC plugins to the configuration created above
  jdbc-journal {
    use-shared-db = "slick"
  }
  jdbc-snapshot-store {
    use-shared-db = "slick"
  }
}
Now it's time to create the actor system and a simple actor, SimplePersistentActor, and send a message over the wire. SimplePersistentActor is just a very simple actor that accepts the messages I send; nothing special.
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object AvroSerialization_Persistence {
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load().getConfig("postgresStore")
      .withFallback(ConfigFactory.load("avroSerializable"))
    val system = ActorSystem("postgresStoreSystem", config)
    val simplePersistentActor = system.actorOf(SimplePersistentActor.props("avro-actor"), "personAvroActor")
    val companyRegistryMsg = CompanyRegistry(
      "Google",
      Seq(
        BankAccount("US-1234", "google-bank", 4.3, "gazillion dollars"),
        BankAccount("GB-4321", "google-bank", 0.5, "trillion pounds")
      ),
      "ads",
      523895
    )
    simplePersistentActor ! companyRegistryMsg
  }
}
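The SimplePersistentActor referenced above is not shown in the original; a hypothetical minimal version, assuming Akka classic persistence, could look like this:

```scala
import akka.actor.{ActorLogging, Props}
import akka.persistence.PersistentActor

class SimplePersistentActor(id: String) extends PersistentActor with ActorLogging {
  override def persistenceId: String = id

  // persist every incoming message as-is and log it
  override def receiveCommand: Receive = {
    case message =>
      persist(message) { event =>
        log.info(s"persisted $event")
      }
  }

  // replay events from the journal on restart
  override def receiveRecover: Receive = {
    case event => log.info(s"recovered $event")
  }
}

object SimplePersistentActor {
  def props(id: String): Props = Props(new SimplePersistentActor(id))
}
```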