org.apache.avro.UnresolvedUnionException: Not in union [{"type":"bytes","logicalType":"decimal","precision":18,"scale":4},"null"]: 0.0000
I am trying to read data from a Hive table stored in S3, convert it to Avro format, and then use the Avro records to build a final object and push it to a Kafka topic. The object I am trying to publish contains a nested object (CarCostDetails) with fields of type string and decimal. When this nested object is null, I can push the record to Kafka, but if it is populated with any value (0, positive, or negative) I get this exception: org.apache.avro.UnresolvedUnionException: Not in union [{"type":"bytes","logicalType":"decimal","precision":18,"scale":4},"null"]: 40000.0000
The exception is thrown when I call producer.send().
I do not define the schema in my project; I consume predefined schemas as an external dependency.
Example:
CarDataLoad.scala
import org.apache.spark.sql.{DataFrame, SparkSession}

class CarDataLoad extends ApplicationRunner with Serializable {

  override def run(args: ApplicationArguments): Unit = {
    val spark = SparkSession.builder()
      .appName("s3-to-kafka")
      .enableHiveSupport()
      .getOrCreate()
    getData(spark)
  }

  def getData(sparkSession: SparkSession): Unit = {
    val avroPath = copyToAvro(sparkSession)
    // requires the spark-avro package on the classpath
    val car = sparkSession.read.format("avro").load(avroPath)
    import sparkSession.implicits._
    val avroData = car.select(
      $"car_specs",
      $"car_cost_details",
      $"car_key"
    )
    ingestDataframeToKafka(sparkSession, avroData)
  }

  def copyToAvro(sparkSession: SparkSession): String = {
    val sourceDf = sparkSession.read.table("sample_table")
    val targetPath = s"s3://some/target/path"
    // write sourceDf to targetPath in avro format (internal libraries do that)
    targetPath
  }

  def ingestDataframeToKafka(sparkSession: SparkSession, dataframe: DataFrame): Unit = {
    // kafkaBootstapServers, kafkaSchemaRegistryUrl, kafkaClientIdConfig and topic
    // are configuration values defined elsewhere in this class
    val batchProducer = new CarProducerClass(kafkaBootstapServers, kafkaSchemaRegistryUrl,
      kafkaClientIdConfig, topic)
    dataframe.collect.foreach { row =>
      val result = batchProducer.publishRecord(row)
    }
    batchProducer.closeProducer()
  }
}
The producer class:
CarProducerClass.java
import org.apache.kafka.clients.producer.*;
import org.apache.spark.sql.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.*;

public class CarProducerClass {

    private static final Logger log = LoggerFactory.getLogger(CarProducerClass.class);

    // kafkaBootstapServers, kafkaSchemaRegistryUrl, kafkaClientIdConfig and topic
    // are set through the constructor (omitted here)
    private Producer<CarKey, Car> producer;

    private void initializeProducer() {
        log.info("Initializing producer");
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstapServers);
        props.put("schema.registry.url", kafkaSchemaRegistryUrl);
        props.put("acks", "1");
        props.put("batch.size", 16384);
        props.put("buffer.memory", 33554432);
        props.put("retries", 3);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaClientIdConfig);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("key.subject.name.strategy", "io.confluent.kafka.serializers.subject.TopicNameStrategy");
        props.put("value.subject.name.strategy", "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");
        producer = new KafkaProducer<>(props);
        log.info("Created producer");
    }

    public Boolean publishRecord(Row row) {
        Boolean publishRecordFlag = false;
        if (producer == null) {
            initializeProducer();
        }
        Car.Builder car = Car.newBuilder();
        car.setCarSpecs(buildCarSpecs(row.getAs("car_specs")));
        car.setCarCostDetails(buildCarCostDetails(row.getAs("car_cost_details")));
        CarKey.Builder carKey = CarKey.newBuilder();
        Row car_key = row.getAs("car_key");
        carKey.setId(car_key.getAs("car_id"));
        try {
            ProducerRecord<CarKey, Car> producerRecord
                = new ProducerRecord<>(topic, null, System.currentTimeMillis(), carKey.build(), car.build());
            // Exception occurs here
            RecordMetadata metadata = producer.send(producerRecord).get();
            publishRecordFlag = true;
        } catch (Exception e) {
            log.info("Exception caught");
            e.printStackTrace();
        }
        return publishRecordFlag;
    }

    public CarSpecs buildCarSpecs(Row car_specs) {
        CarSpecs.Builder kafkaCarSpecs = CarSpecs.newBuilder();
        kafkaCarSpecs.setName("CX5");
        kafkaCarSpecs.setBrand("Mazda");
        return kafkaCarSpecs.build();
    }

    public CarCostDetails buildCarCostDetails(Row car_cost_details) {
        CarCostDetails.Builder costDetails = CarCostDetails.newBuilder();
        costDetails.setPurchaseCity(car_cost_details.getAs("purchase_city"));
        costDetails.setPurchaseState(car_cost_details.getAs("purchase_state"));
        costDetails.setBasePrice((BigDecimal) car_cost_details.getAs("base_price"));
        costDetails.setTax((BigDecimal) car_cost_details.getAs("tax"));
        costDetails.setTotalCost((BigDecimal) car_cost_details.getAs("total_cost"));
        costDetails.setOtherCosts((BigDecimal) car_cost_details.getAs("other_costs"));
        return costDetails.build();
    }

    public void closeProducer() {
        producer.close();
    }
}
Avro schemas (predefined in another, already-productionized project):
CarSpecs.avdl
protocol CarSpecsProtocol {
  record CarSpecs {
    string name;
    string brand;
  }
}
CarCostDetails.avdl
protocol CarCostDetailsProtocol {
  record CarCostDetails {
    string purchase_city;
    string purchase_state;
    decimal(18, 4) base_price;
    union { decimal(18, 4), null } tax;
    union { decimal(18, 4), null } total_cost;
    union { decimal(18, 4), null } other_costs;
  }
}
Car.avdl
protocol CarProtocol {
  import idl "CarCostDetails.avdl";
  import idl "CarSpecs.avdl";
  record Car {
    union { null, CarSpecs } car_specs = null;
    union { null, CarCostDetails } car_cost_details = null;
  }
}
CarKey.avdl
protocol CarKeyProtocol {
  record CarKey {
    string id;
  }
}
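Note that decimal(18, 4) in the IDL compiles to a bytes schema carrying a decimal logical type, so the compiled union for tax is exactly the union quoted in the exception. A quick way to confirm this is to print the field schema from the generated class (a minimal sketch, assuming the standard Avro-generated getClassSchema() accessor):
import org.apache.avro.Schema;

Schema taxSchema = CarCostDetails.getClassSchema().getField("tax").schema();
// expected output: [{"type":"bytes","logicalType":"decimal","precision":18,"scale":4},"null"]
System.out.println(taxSchema);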
Avro-generated Java objects:
@AvroGenerated
public class CarSpecs extends SpecificRecordBase implements SpecificRecord {
  // basic generated fields like Schema SCHEMA$, SpecificData MODEL$, etc.
  private String name;
  private String brand;
}

import java.math.BigDecimal;

@AvroGenerated
public class CarCostDetails extends SpecificRecordBase implements SpecificRecord {
  // basic generated fields like Schema SCHEMA$, SpecificData MODEL$, etc.
  private String purchaseCity;
  private String purchaseState;
  private BigDecimal basePrice;
  private BigDecimal tax;
  private BigDecimal totalCost;
  private BigDecimal otherCosts;
}

@AvroGenerated
public class Car extends SpecificRecordBase implements SpecificRecord {
  // basic generated fields like Schema SCHEMA$, SpecificData MODEL$, etc.
  private CarSpecs carSpecs;
  private CarCostDetails carCostDetails;
}

@AvroGenerated
public class CarKey extends SpecificRecordBase implements SpecificRecord {
  // basic generated fields like Schema SCHEMA$, SpecificData MODEL$, etc.
  private String id;
}
What I have already tried:
- Passing the spark-avro package in the spark command, like this: --packages org.apache.spark:spark-avro_2.11:2.4.3
- Ordering the fields to match their order in the actual schema
- Setting a default value of 0 for all decimal/BigDecimal fields
- Verifying that the source data type of these fields is java.math.BigDecimal. It is.
- Explicitly casting the values to BigDecimal (as in the example above)
All of the above still result in org.apache.avro.UnresolvedUnionException.
The fix: add the decimal conversion to the global configuration. Execute this once at runtime, before any message is sent to Kafka, e.g. in initializeProducer:
import org.apache.avro.Conversions;
import org.apache.avro.specific.SpecificData;

SpecificData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
You may see similar lines in the static initializer that Avro generates from the schema and applies to MODEL$, so remember to register every conversion used by your messages.
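For reference, in a class generated with decimal logical-type support enabled, that static initializer typically looks like the sketch below (illustrative, not the exact generated source); note that it registers the conversion only on the class's own MODEL$, not on the global instance:
@AvroGenerated
public class CarCostDetails extends SpecificRecordBase implements SpecificRecord {
    private static final SpecificData MODEL$ = new SpecificData();
    static {
        // applies only to this MODEL$ instance, not to SpecificData.get()
        MODEL$.addLogicalTypeConversion(new org.apache.avro.Conversions.DecimalConversion());
    }
    // ... rest of the generated class
}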
The following observations are based on the Avro 1.10.1 library sources and runtime behavior.
The MODEL$ configuration should normally be picked up (see SpecificData.getForClass), but not if SpecificData and your message class are loaded by different class loaders (which was the case in my application: two separate OSGi bundles). In that case getForClass falls back to the global instance.
GenericData.resolveUnion then throws UnresolvedUnionException, because conversionsByClass contains no entry for the BigDecimal key:
getSchemaName in SpecificData resolves the BigDecimal class (and a few others; see SpecificData.stringableClasses) to Schema.Type.STRING.
That STRING is then matched against the branches defined in the union schema (getIndexNamed), and no match is found, because the union contains only "bytes" and "null".
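To see which SpecificData instance actually serves your generated class, and whether it carries the decimal conversion, a diagnostic like this can help (a sketch, assuming Avro 1.10.x APIs):
import java.math.BigDecimal;
import org.apache.avro.specific.SpecificData;

SpecificData model = SpecificData.getForClass(CarCostDetails.class);
// true means getForClass fell back to the global instance (e.g. across class loaders)
System.out.println("fell back to global: " + (model == SpecificData.get()));
// null means resolveUnion cannot map BigDecimal to the bytes/decimal branch
System.out.println("decimal conversion: " + model.getConversionByClass(BigDecimal.class));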