如何在运行时使用 ScalaPB 在 Spark 上一般解压 google.protobuf.Any?
How to generically unpack google.protobuf.Any on Spark with ScalaPB at runtime?
我有以下 protobuf 设置:
import "google/protobuf/any.proto";
message EntityEnvelope {
string id = 1;
string entity_type = 2;
google.protobuf.Any entity = 3;
}
message EntityABC {
string name = 1;
}
message EntityXYZ {
string desc = 1;
}
其中 EntityEnvelope.entity
可以是打包为 google.protobuf.Any
的任何类型。每条 protobuf 消息都以 Base64 编码存储在磁盘中。
阅读这些消息时,如果我在解包时在编译类型中使用特定的实体类型,它会完美地工作:
import scalapb.spark.Implicits._
spark.read.format("text")
.load("/tmp/entity").as[String]
.map { s => Base64.getDecoder.decode(s) }
.map { bytes => EntityEnvelope.parseFrom(bytes) }
.map { envelope => envelope.getEntity.unpack[EntityXYZ]}
.show()
但我想使用相同的代码在运行时读取任何类型的实体,而不必指定其类型。我得到的“更接近”是(但甚至没有编译):
val entityClass = Class.forName(qualifiedEntityClassNameFromRuntime)
spark.read.format("text")
.load("/tmp/entity").as[String]
.map { s => Base64.getDecoder.decode(s) }
.map { bytes => EntityEnvelope.parseFrom(bytes) }
.map { envelope => toJavaProto(envelope.getEntity).unpack(entityClass)} // ERROR: No implicits found for Encoder[Any]
.show()
由于 Any
包含 typeUrl
,是否可以找到正确的描述符并在运行时自动解压,而无需在编译时指定类型?
要在编译时解压您不知道其类型的 Anys,您需要构建一个从 typeUrl
到您可能期望的所有类型的伴随对象的映射,并使用它来调用unpack
。可以这样做:
val typeUrlToCompanion = Map[String, scalapb.GeneratedMessageCompanion[_ <: scalapb.GeneratedMessage]](
"type.googleapis.com/myexample.Person2" -> Person2
// more types
)
然后用
解压
envelope.getEntity.unpack(typeUrlToCompanion(envelope.getEntity.typeUrl)
这会给你一个 GeneratedMessage
- 所有 ScalaPB 消息的父特征。
但是,这样做之后,您将遇到另一个问题。 Spark 数据帧是结构化的,它们有一个由命名列和类型组成的模式,就像关系数据库中的 table 一样。在构建数据框时,需要在运行时知道模式。但是,您似乎想要创建一个数据框,其中每一行都有不同的类型(无论 Any
恰好是什么),并且类型的集合只有在解包时才知道 - 所以这与数据框的设计不兼容。
取决于您想做什么,有几个选项可供考虑:
- 不使用
Any
,而是使用 protobufs 的 oneof
特性,这样所有可能的类型都是已知的,并且可以自动创建模式(并且您不必处理解包任何)
- 或者,按
typeUrl
对数据框进行分区,这样您最终会得到不同的数据框,其中每个数据框的所有项目都属于同一类型。然后用已知类型解压每一个。可以使用上面的 typeUrlToCompanion
方法来完成。
我有以下 protobuf 设置:
import "google/protobuf/any.proto";
message EntityEnvelope {
string id = 1;
string entity_type = 2;
google.protobuf.Any entity = 3;
}
message EntityABC {
string name = 1;
}
message EntityXYZ {
string desc = 1;
}
其中 EntityEnvelope.entity
可以是打包为 google.protobuf.Any
的任何类型。每条 protobuf 消息都以 Base64 编码存储在磁盘中。
阅读这些消息时,如果我在解包时在编译类型中使用特定的实体类型,它会完美地工作:
import scalapb.spark.Implicits._
spark.read.format("text")
.load("/tmp/entity").as[String]
.map { s => Base64.getDecoder.decode(s) }
.map { bytes => EntityEnvelope.parseFrom(bytes) }
.map { envelope => envelope.getEntity.unpack[EntityXYZ]}
.show()
但我想使用相同的代码在运行时读取任何类型的实体,而不必指定其类型。我得到的“更接近”是(但甚至没有编译):
val entityClass = Class.forName(qualifiedEntityClassNameFromRuntime)
spark.read.format("text")
.load("/tmp/entity").as[String]
.map { s => Base64.getDecoder.decode(s) }
.map { bytes => EntityEnvelope.parseFrom(bytes) }
.map { envelope => toJavaProto(envelope.getEntity).unpack(entityClass)} // ERROR: No implicits found for Encoder[Any]
.show()
由于 Any
包含 typeUrl
,是否可以找到正确的描述符并在运行时自动解压,而无需在编译时指定类型?
要在编译时解压您不知道其类型的 Anys,您需要构建一个从 typeUrl
到您可能期望的所有类型的伴随对象的映射,并使用它来调用unpack
。可以这样做:
val typeUrlToCompanion = Map[String, scalapb.GeneratedMessageCompanion[_ <: scalapb.GeneratedMessage]](
"type.googleapis.com/myexample.Person2" -> Person2
// more types
)
然后用
解压envelope.getEntity.unpack(typeUrlToCompanion(envelope.getEntity.typeUrl)
这会给你一个 GeneratedMessage
- 所有 ScalaPB 消息的父特征。
但是,这样做之后,您将遇到另一个问题。 Spark 数据帧是结构化的,它们有一个由命名列和类型组成的模式,就像关系数据库中的 table 一样。在构建数据框时,需要在运行时知道模式。但是,您似乎想要创建一个数据框,其中每一行都有不同的类型(无论 Any
恰好是什么),并且类型的集合只有在解包时才知道 - 所以这与数据框的设计不兼容。
取决于您想做什么,有几个选项可供考虑:
- 不使用
Any
,而是使用 protobufs 的oneof
特性,这样所有可能的类型都是已知的,并且可以自动创建模式(并且您不必处理解包任何) - 或者,按
typeUrl
对数据框进行分区,这样您最终会得到不同的数据框,其中每个数据框的所有项目都属于同一类型。然后用已知类型解压每一个。可以使用上面的typeUrlToCompanion
方法来完成。