
How can I parse a stream of Avro ByteStrings in an Akka stream?

I have Avro files in an S3 bucket and am trying to stream and parse them into a case class. I have the schema to parse with, but don't know how to proceed.

I use s3.download to download and stream the file from the S3 bucket and then convert it to a UTF-8 string.
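
A minimal sketch of that pipeline, assuming the Alpakka S3 connector (the bucket and key names are placeholders):

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.Source
import akka.util.ByteString

object S3AvroDownloadSketch extends App {
  implicit val system: ActorSystem = ActorSystem("s3-avro")

  // S3.download emits an optional (data, metadata) pair; the data is itself a Source[ByteString, _]
  val avroBytes: Source[ByteString, NotUsed] =
    S3.download("my-bucket", "path/to/file.avro")
      .collect { case Some((data, _)) => data }
      .flatMapConcat(identity)

  // this is the step I have today: the bytes converted to a UTF-8 string,
  // but I need to parse them against my Avro schema instead
  avroBytes.map(_.utf8String).runForeach(println)
}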

Please advise how I can parse the incoming stream using our schema.

I will answer this question based on your request to (de)serialize messages with Avro using a schema:

"I have the schema to parse but don't know how to proceed with it."

and assuming that you have already downloaded the messages from the S3 bucket. I will use an example in which I persist the messages to PostgreSQL, just to have a working example, but you can plug in your S3 bucket connection instead.

I am using the com.sksamuel.avro4s library to create my Avro serializer. These are the necessary libraries to put in build.sbt:

val akkaVersion = "2.6.10"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-persistence" % akkaVersion,
  "com.sksamuel.avro4s" %% "avro4s-core" % "4.0.4",
  "org.xerial.snappy" % "snappy-java" % "1.1.8.2",
  "org.postgresql" % "postgresql" % "42.2.2",
  "com.github.dnvriend" %% "akka-persistence-jdbc" % "3.4.0"
)

Then create the serializer, in my case MyFirstAvroSerializer, which extends akka.serialization.Serializer. It has a schema, in my case for the case class CompanyRegistry. Basically, you have to implement the method identifier, which must return a unique ID, toBinary and fromBinary to convert the messages, and includeManifest, which is false because I don't need a manifest.

import akka.serialization.Serializer
import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream, AvroSchema}
import com.typesafe.config.ConfigFactory

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

case class BankAccount(iban: String, bankCode: String, amount: Double, currency: String)
case class CompanyRegistry(name: String, accounts: Seq[BankAccount], activityCode: String, marketCap: Double)

class MyFirstAvroSerializer extends Serializer {
  val schema = AvroSchema[CompanyRegistry]
  override def identifier: Int = 454874
  override def toBinary(o: AnyRef): Array[Byte] = o match {
    case c: CompanyRegistry =>
      val baos = new ByteArrayOutputStream()
      val avroOutputStream = AvroOutputStream.binary[CompanyRegistry].to(baos).build() // binary encoding, schema derived from the case class
      avroOutputStream.write(c)
      avroOutputStream.flush()
      avroOutputStream.close()
      baos.toByteArray
    case _ => throw new IllegalArgumentException(s"we only support CompanyRegistry for Avro")
  }
  override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
    val avroInputStream = AvroInputStream.binary[CompanyRegistry].from(new ByteArrayInputStream(bytes)).build(schema)
    val companyRegistryIterator: Iterator[CompanyRegistry] = avroInputStream.iterator
    val companyRegistry = companyRegistryIterator.next()
    avroInputStream.close()
    companyRegistry
  }
  override def includeManifest: Boolean = false
}
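
Coming back to the original question of parsing a stream of Avro ByteStrings: with this serializer in place, one possible approach (a sketch, assuming each ByteString element carries exactly one Avro-encoded CompanyRegistry record) is to wrap fromBinary in a Flow:

import akka.NotUsed
import akka.stream.scaladsl.Flow
import akka.util.ByteString

object CompanyRegistryParsing {
  private val serializer = new MyFirstAvroSerializer

  // maps each incoming ByteString (for example the elements coming out of S3.download)
  // to a CompanyRegistry using the Avro schema derived from the case class
  val parseFlow: Flow[ByteString, CompanyRegistry, NotUsed] =
    Flow[ByteString].map { bytes =>
      serializer.fromBinary(bytes.toArray, None).asInstanceOf[CompanyRegistry]
    }
}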

Then you have to configure your project to call this serializer when Akka messages are exchanged between actors. Configure it in application.conf by adding a specific configuration, in my case avroSerializable. You set MyFirstAvroSerializer under the serializers scope and the case class under the serialization-bindings scope. I also configured Akka remote, but you can ignore that.

avroSerializable {
  akka {
    actor {
      provider = remote
      #allow-java-serialization = off
      serializers {
        java = "akka.serialization.JavaSerializer"
        avro = "org.github.felipegutierrez.explore.akka.classic.remote.serialization.MyFirstAvroSerializer"
      }
      serialization-bindings {
        "org.github.felipegutierrez.explore.akka.classic.remote.serialization.CompanyRegistry" = avro
        "java.io.Serializable" = java
      }
    }
    remote {
      artery {
        enabled = on
        transport = aeron-udp
        canonical.hostname = "localhost"
      }
    }
  }
}

As I said at the beginning, I am using PostgreSQL, but in your case this would be the S3 bucket storage configuration. For completeness I will leave it here, since I use this configuration when creating the actor system.

postgresStore {
  akka.persistence.journal.plugin = "jdbc-journal"
  akka.persistence.snapshot-store.plugin = "jdbc-snapshot-store"
  akka.actor.allow-java-serialization = on
  # create JDBC configuration to Akka persistence
  akka-persistence-jdbc {
    shared-databases {
      slick {
        profile = "slick.jdbc.PostgresProfile$"
        db {
          numThreads = 10
          driver = "org.postgresql.Driver"
          url = "jdbc:postgresql://localhost:5432/rtjvm"
          user = "docker"
          password = "docker"
        }
      }
    }
  }
  # binding the JDBC plugins to the configuration created above
  jdbc-journal {
    use-shared-db = "slick"
  }
  jdbc-snapshot-store {
    use-shared-db = "slick"
  }
}

Now it is time to create the actor system and a simple actor, SimplePersistentActor, and send the message over the wire. SimplePersistentActor is just a very simple actor that receives the messages I send to it, nothing special (a sketch of it follows after the code below).

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object AvroSerialization_Persistence {
  def main(args: Array[String]): Unit = {
    // combine the persistence store config with the Avro serializer config from application.conf
    val config = ConfigFactory.load().getConfig("postgresStore")
      .withFallback(ConfigFactory.load().getConfig("avroSerializable"))
    val system = ActorSystem("postgresStoreSystem", config)
    val simplePersistentActor = system.actorOf(SimplePersistentActor.props("avro-actor"), "personAvroActor")
    val companyRegistryMsg = CompanyRegistry(
      "Google",
      Seq(
        BankAccount("US-1234", "google-bank", 4.3, "gazillion dollars"),
        BankAccount("GB-4321", "google-bank", 0.5, "trillion pounds")
      ),
      "ads",
      523895
    )
    simplePersistentActor ! companyRegistryMsg
  }
}
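
The SimplePersistentActor itself is not shown here; a minimal sketch of what it could look like (the persistenceId handling and the logging are my assumptions) would be:

import akka.actor.{ActorLogging, Props}
import akka.persistence.PersistentActor

object SimplePersistentActor {
  def props(persistenceId: String): Props = Props(new SimplePersistentActor(persistenceId))
}

class SimplePersistentActor(id: String) extends PersistentActor with ActorLogging {
  override def persistenceId: String = id

  // persist every incoming message as-is and log it
  override def receiveCommand: Receive = {
    case message =>
      persist(message) { event =>
        log.info(s"persisted $event")
      }
  }

  // log whatever is replayed from the journal on recovery
  override def receiveRecover: Receive = {
    case event => log.info(s"recovered $event")
  }
}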