带有 ScalaPB 的 SparkSQL:从 DataFrame 转换为原型数据集时跳过原型字段时出错
SparkSQL with ScalaPB: Error while skipping proto fields while converting from DataFrame to proto dataset
我有以下原型消息,需要使用 ScalaPB 通过 Spark 编写:
message EnforcementData
{
required int32 id = 1;
required int32 source = 2;
required int32 flagsEnforceOption = 4;
required int32 categoryEnforceOption = 5;
optional TypeA a= 100;
optional TypeB b= 101;
}
TypeA
和 TypeB
是接收方 EnforcementData
的子 classes,它使用 protobuf-net 对其进行反序列化。
现在,我的 Spark 数据框可以有 a 列或 b 列。假设 df 是我的数据框,我调用如下:
df.withColumn(b, null).as[EnforcementData].map(_.toByteArray)
类型 A 消息
df.withColumn(a, null).as[EnforcementData].map(_.toByteArray)
类型 B 消息
但是使用 protobuf-net 反序列化消息的接收方抛出 Whosebug 异常。我也尝试传递一个虚拟案例 class 而不是 null 但它似乎仍然不起作用。
请告诉我如何处理这个问题?
我能够通过重建案例 class 并明确跳过可选的子 class 字段来解决这个问题。即
//for TypeA messages,
df.withColumn(b, null)
.as[EnforcementData]
.map{case EnforcementData(id, source, flag, cat, a, _) => EnforcementData(id, source, flag, cat, a = a)
}
//for TypeB messages,
df.withColumn(s, null)
.as[EnforcementData]
.map{case EnforcementData(id, source, flag, cat, _, b) => EnforcementData(id, source, flag, cat, b = b)
}
我有以下原型消息,需要使用 ScalaPB 通过 Spark 编写:
message EnforcementData
{
required int32 id = 1;
required int32 source = 2;
required int32 flagsEnforceOption = 4;
required int32 categoryEnforceOption = 5;
optional TypeA a= 100;
optional TypeB b= 101;
}
TypeA
和 TypeB
是接收方 EnforcementData
的子 classes,它使用 protobuf-net 对其进行反序列化。
现在,我的 Spark 数据框可以有 a 列或 b 列。假设 df 是我的数据框,我调用如下:
df.withColumn(b, null).as[EnforcementData].map(_.toByteArray)
类型 A 消息df.withColumn(a, null).as[EnforcementData].map(_.toByteArray)
类型 B 消息
但是使用 protobuf-net 反序列化消息的接收方抛出 Whosebug 异常。我也尝试传递一个虚拟案例 class 而不是 null 但它似乎仍然不起作用。
请告诉我如何处理这个问题?
我能够通过重建案例 class 并明确跳过可选的子 class 字段来解决这个问题。即
//for TypeA messages,
df.withColumn(b, null)
.as[EnforcementData]
.map{case EnforcementData(id, source, flag, cat, a, _) => EnforcementData(id, source, flag, cat, a = a)
}
//for TypeB messages,
df.withColumn(s, null)
.as[EnforcementData]
.map{case EnforcementData(id, source, flag, cat, _, b) => EnforcementData(id, source, flag, cat, b = b)
}