Scala: Error reading Kafka Avro messages from spark structured streaming

I have been trying to read Avro-serialized Kafka messages from Spark Structured Streaming (2.4.4) with Scala 2.11. For that I use spark-avro (dependencies below). The Kafka messages are produced from Python with the confluent-kafka library. Spark Structured Streaming is able to consume the messages together with the schema, but it does not read the field values correctly. I prepared a simple example to show the problem; the code is available here: https://github.com/anigmo97/SimpleExamples/tree/master/Spark_streaming_kafka_avro_scala

I create the records in Python, and the schema of the records is:

{
    "type": "record",
    "namespace": "example",
    "name": "RawRecord",
    "fields": [
        {"name": "int_field","type": "int"},
        {"name": "string_field","type": "string"}
    ]
}

They are generated like this:

from time import sleep
from confluent_kafka.avro import AvroProducer, load, loads

def generate_records():
    avro_producer_settings = {
        'bootstrap.servers': "localhost:19092",
        'group.id': 'groupid',
        'schema.registry.url': "http://127.0.0.1:8081"
    }
    producer = AvroProducer(avro_producer_settings)
    # Key is a plain Avro string; the value uses the record schema shown above
    key_schema = loads('"string"')
    value_schema = load("schema.avsc")
    i = 1
    while True:
        # Produce one record per second with increasing values
        row = {"int_field": int(i), "string_field": str(i)}
        producer.produce(topic="avro_topic", key="key-{}".format(i),
                         value=row, key_schema=key_schema, value_schema=value_schema)
        print(row)
        sleep(1)
        i += 1

generate_records()

Consumption from Spark Structured Streaming (in Scala) is done like this:

import java.nio.file.{Files, Paths}
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.avro._
...
        try {

            log.info("----- reading schema")
            // Writer schema used by the Python producer, read from disk
            val jsonFormatSchema = new String(Files.readAllBytes(
                                                    Paths.get("./src/main/resources/schema.avsc")))

            // Kafka source: the Avro payload arrives in the binary "value" column
            val ds: Dataset[Row] = sparkSession
                .readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", kafkaServers)
                .option("subscribe", topic)
                .load()

            // Deserialize the value column with spark-avro and flatten the record
            val output: Dataset[Row] = ds
                .select(from_avro(ds.col("value"), jsonFormatSchema) as "record")
                .select("record.*")

            output.printSchema()

            val query: StreamingQuery = output.writeStream.format("console")
                .option("truncate", "false").outputMode(OutputMode.Append()).start()

            query.awaitTermination()

        } catch {
            case e: Exception => log.error("onApplicationEvent error: ", e)
        }
...

When the schema is printed in Spark, strangely the fields are nullable even though the Avro schema does not allow that. Spark shows:

root
 |-- int_field: integer (nullable = true)
 |-- string_field: string (nullable = true)

I have checked the messages with another consumer written in Python and they are fine, but regardless of the message content Spark shows this:

+---------+------------+
|int_field|string_field|
+---------+------------+
|0        |            |
+---------+------------+

The main dependencies used are:

<properties>
    <spark.version>2.4.4</spark.version>
    <scala.version>2.11</scala.version>
</properties>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>

Does anyone know why this happens?

Thanks in advance. The code to reproduce the error is here:

https://github.com/anigmo97/SimpleExamples/tree/master/Spark_streaming_kafka_avro_scala

Solution

The problem was that I was producing the messages with the confluent_kafka library in Python and reading them in Spark Structured Streaming with the spark-avro library.

The confluent_kafka library writes Confluent's Avro wire format, while spark-avro reads plain (standard) Avro.

The difference is that, to support the Schema Registry, Confluent Avro prefixes each message with a magic byte and a 4-byte schema id that indicate which schema should be used.

Source: https://www.confluent.io/blog/kafka-connect-tutorial-transfer-avro-schemas-across-schema-registry-clusters/
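
As a side note, that header could also be stripped manually before calling from_avro. The snippet below is only a minimal, untested sketch: it reuses ds and jsonFormatSchema from the first example, assumes the value column holds Confluent-framed Avro (1 magic byte + 4-byte schema id), and assumes every message was written with the same schema, because it simply ignores the embedded schema id.

import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.expr

// Confluent wire format: [magic byte 0x00][4-byte schema id][Avro payload].
// Dropping the first 5 bytes leaves plain Avro that spark-avro can decode,
// but only while all messages use the same writer schema.
val avroPayload = expr("substring(value, 6, length(value) - 5)")

val decoded = ds
    .select(from_avro(avroPayload, jsonFormatSchema) as "record")
    .select("record.*")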

To be able to use Confluent Avro and read it from Spark Structured Streaming, I replaced the spark-avro library with ABRiS (ABRiS makes it possible to work with both plain Avro and Confluent Avro from Spark). https://github.com/AbsaOSS/ABRiS

My dependencies changed like this:

<properties>
    <spark.version>2.4.4</spark.version>
    <scala.version>2.11</scala.version>
</properties>
<!-- SPARK-AVRO -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<!-- SPARK-AVRO AND CONFLUENT-AVRO -->
<dependency>
    <groupId>za.co.absa</groupId>
    <artifactId>abris_2.11</artifactId>
    <version>3.1.1</version>
</dependency>

Here you can see a simple example that reads the messages and deserializes their value both as plain Avro and as Confluent Avro:

// Imports for this snippet: java.nio for the schema file, spark-avro for
// from_avro, and ABRiS 3.x for from_confluent_avro and SchemaManager
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.{functions, Dataset, Row}
import org.apache.spark.sql.avro._
import za.co.absa.abris.avro.functions.from_confluent_avro
import za.co.absa.abris.avro.read.confluent.SchemaManager

val input: Dataset[Row] = sparkSession.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkaServers)
    .option("subscribe", topicConsumer)
    .option("failOnDataLoss", "false")
    // .option("startingOffsets", "latest")
    // .option("startingOffsets", "earliest")
    .load()


// READ WITH spark-avro library (standard avro)

val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./src/main/resources/schema.avsc")))

val inputAvroDeserialized: Dataset[Row] = input
    .select(from_avro(functions.col("value"), jsonFormatSchema) as "record")
    .select("record.*")

// READ WITH ABRiS library (Confluent Avro)

val schemaRegistryConfig = Map(
    SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "http://localhost:8081",
    SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> topicConsumer,
    SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> SchemaManager.SchemaStorageNamingStrategies.TOPIC_NAME, // choose a subject name strategy
    SchemaManager.PARAM_VALUE_SCHEMA_ID -> "latest" // use "latest" to pick the latest registered schema version
)

val inputConfluentAvroDeserialized: Dataset[Row] = input
    .select(from_confluent_avro(functions.col("value"), schemaRegistryConfig) as "record")
    .select("record.*")