如何在运行时使用 ScalaPB 在 Spark 上一般解压 google.protobuf.Any?

How to generically unpack google.protobuf.Any on Spark with ScalaPB at runtime?

我有以下 protobuf 设置:

import "google/protobuf/any.proto";

message EntityEnvelope {
  string id = 1;
  string entity_type = 2;
  google.protobuf.Any entity = 3;
}

message EntityABC {
  string name = 1;
}

message EntityXYZ {
  string desc = 1;
}

其中 EntityEnvelope.entity 可以是打包为 google.protobuf.Any 的任何类型。每条 protobuf 消息都以 Base64 编码存储在磁盘中。

阅读这些消息时,如果我在解包时在编译类型中使用特定的实体类型,它会完美地工作:

import scalapb.spark.Implicits._

spark.read.format("text")
    .load("/tmp/entity").as[String]
    .map { s => Base64.getDecoder.decode(s) }
    .map { bytes => EntityEnvelope.parseFrom(bytes) }
    .map { envelope => envelope.getEntity.unpack[EntityXYZ]}
    .show()

但我想使用相同的代码在运行时读取任何类型的实体,而不必指定其类型。我得到的“更接近”是(但甚至没有编译):

  val entityClass = Class.forName(qualifiedEntityClassNameFromRuntime)

  spark.read.format("text")
    .load("/tmp/entity").as[String]
    .map { s => Base64.getDecoder.decode(s) }
    .map { bytes => EntityEnvelope.parseFrom(bytes) }
    .map { envelope => toJavaProto(envelope.getEntity).unpack(entityClass)} // ERROR: No implicits found for Encoder[Any]
    .show()

由于 Any 包含 typeUrl,是否可以找到正确的描述符并在运行时自动解压,而无需在编译时指定类型?

要在编译时解压您不知道其类型的 Anys,您需要构建一个从 typeUrl 到您可能期望的所有类型的伴随对象的映射,并使用它来调用unpack。可以这样做:

val typeUrlToCompanion = Map[String, scalapb.GeneratedMessageCompanion[_ <: scalapb.GeneratedMessage]](
    "type.googleapis.com/myexample.Person2" -> Person2
    // more types
  )

然后用

解压
envelope.getEntity.unpack(typeUrlToCompanion(envelope.getEntity.typeUrl)

这会给你一个 GeneratedMessage - 所有 ScalaPB 消息的父特征。

但是,这样做之后,您将遇到另一个问题。 Spark 数据帧是结构化的,它们有一个由命名列和类型组成的模式,就像关系数据库中的 table 一样。在构建数据框时,需要在运行时知道模式。但是,您似乎想要创建一个数据框,其中每一行都有不同的类型(无论 Any 恰好是什么),并且类型的集合只有在解包时才知道 - 所以这与数据框的设计不兼容。

取决于您想做什么,有几个选项可供考虑:

  1. 不使用 Any,而是使用 protobufs 的 oneof 特性,这样所有可能的类型都是已知的,并且可以自动创建模式(并且您不必处理解包任何)
  2. 或者,按 typeUrl 对数据框进行分区,这样您最终会得到不同的数据框,其中每个数据框的所有项目都属于同一类型。然后用已知类型解压每一个。可以使用上面的 typeUrlToCompanion 方法来完成。