Integrating Spark Structured Streaming with the Confluent Schema Registry

I'm using a Kafka source in Spark Structured Streaming to receive Confluent-encoded Avro records. I intend to use the Confluent Schema Registry, but integrating it with Spark Structured Streaming seems to be impossible.

I have seen this question, but I couldn't get it to work with the Confluent Schema Registry: Reading Avro messages from Kafka with Spark 2.0.2 (structured streaming)

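I spent months reading the source code and testing. In short, Spark can only handle String and Binary serialization, so you have to deserialize the data manually. In Spark, create a Confluent REST service object to fetch the schema. Use the Avro parser to turn the schema string in the response object into an Avro schema. Next, read the Kafka topic as usual. Then map over the binary "value" column with Confluent's KafkaAvroDeserializer. I strongly recommend reading the source code of these classes, because a lot is going on there; for brevity I'll leave out many details.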
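As a rough illustration of the schema lookup step outside Spark, the sketch below assumes the registry's `GET /subjects/{subject}/versions/latest` response is a JSON object with `subject`, `version`, `id` and `schema` fields, where `schema` is itself a JSON-encoded Avro schema string. That double encoding is why the Scala code hands `getSchema` to `Schema.Parser`. The `<topic>-value` subject follows the default topic-name strategy; the helper names and the sample response are hypothetical.

```python
import json
from urllib.parse import quote


def subject_for(topic, is_key=False):
    """Default (topic-name) strategy: '<topic>-key' or '<topic>-value'."""
    return f"{topic}-{'key' if is_key else 'value'}"


def latest_version_url(registry_url, subject):
    """URL of the latest registered version of a subject."""
    return f"{registry_url.rstrip('/')}/subjects/{quote(subject, safe='')}/versions/latest"


def parse_latest_version(body):
    """Return (schema_id, parsed Avro schema) from a /versions/latest response body."""
    meta = json.loads(body)
    # 'schema' holds a string containing JSON, so it is decoded a second time
    return meta["id"], json.loads(meta["schema"])


# Hypothetical response body shaped like the registry's JSON
sample = json.dumps({
    "subject": "Schema-Registry-Example-topic1-value",
    "version": 1,
    "id": 7,
    "schema": json.dumps({"type": "record", "name": "Example",
                          "fields": [{"name": "name", "type": "string"}]}),
})

schema_id, avro_schema = parse_latest_version(sample)
print(latest_version_url("http://127.0.0.1:8081/", subject_for("Schema-Registry-Example-topic1")))
print(schema_id, [f["name"] for f in avro_schema["fields"]])
```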

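//Used Confluent version 3.2.2 to write this.
//Assumes a SparkSession named `spark` is in scope (as in spark-shell).
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema
import scala.collection.JavaConverters._ //needed for props.asJava
import spark.implicits._ //Encoder for the case class produced in .map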

case class DeserializedFromKafkaRecord(key: String, value: String)

val schemaRegistryURL = "http://127.0.0.1:8081"

val topicName = "Schema-Registry-Example-topic1"
val subjectValueName = topicName + "-value"

//create RestService object
val restService = new RestService(schemaRegistryURL)

//.getLatestVersion returns io.confluent.kafka.schemaregistry.client.rest.entities.Schema object.
val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)

//Use Avro parsing classes to get Avro Schema
val parser = new Schema.Parser
val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)

//key schema is typically just string but you can do the same process for the key as the value
val keySchemaString = "\"string\""
val keySchema = parser.parse(keySchemaString)

//Create a map with the Schema registry url.
//This is the only Required configuration for Confluent's KafkaAvroDeserializer.
val props = Map("schema.registry.url" -> schemaRegistryURL)

//Declare SerDe vars before using Spark structured streaming map. Avoids non serializable class exception.
var keyDeserializer: KafkaAvroDeserializer = null
var valueDeserializer: KafkaAvroDeserializer = null

//Create structured streaming DF to read from the topic.
val rawTopicMessageDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("subscribe", topicName)
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 20)  //remove for prod
  .load()

//instantiate the SerDe classes if not already, then deserialize!
val deserializedTopicMessageDS = rawTopicMessageDF.map{
  row =>
    if (keyDeserializer == null) {
      keyDeserializer = new KafkaAvroDeserializer
      keyDeserializer.configure(props.asJava, true)  //isKey = true
    }
    if (valueDeserializer == null) {
      valueDeserializer = new KafkaAvroDeserializer
      valueDeserializer.configure(props.asJava, false) //isKey = false
    }

    //Kafka delivers key and value as binary columns; pass the raw bytes plus the Avro schema.
    //The topic argument is reportedly unused by this overload in Confluent 3.2.2; it is only required by the signature.
    val deserializedKeyString = keyDeserializer.deserialize(topicName, row.getAs[Array[Byte]]("key"), keySchema).toString
    val deserializedValueString = valueDeserializer.deserialize(topicName, row.getAs[Array[Byte]]("value"), topicValueAvroSchema).toString

    DeserializedFromKafkaRecord(deserializedKeyString, deserializedValueString)
}

val deserializedDSOutputStream = deserializedTopicMessageDS.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", false)
    .start()
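The `var ... = null` declarations above exist so that the non-serializable deserializers are created on the executor on first use instead of being shipped from the driver. Below is a minimal Python sketch of the same lazy, create-once-per-process idea; `make_deserializer` is a hypothetical stand-in for constructing and configuring `KafkaAvroDeserializer`, and the `created` list only exists to show that construction happens once per key/value role.

```python
_deserializers = {}
created = []  # records each construction, for illustration only


def make_deserializer(is_key):
    """Hypothetical stand-in for `new KafkaAvroDeserializer` + `configure(props, isKey)`."""
    created.append(is_key)
    return {"is_key": is_key}


def get_deserializer(is_key):
    """Create the deserializer for this role on first use, then reuse it."""
    if is_key not in _deserializers:
        _deserializers[is_key] = make_deserializer(is_key)
    return _deserializers[is_key]


for _ in range(3):  # e.g. three rows handled by the same worker process
    key_des = get_deserializer(True)
    value_des = get_deserializer(False)

print(len(created))
```

In Scala, an `object` holding a `lazy val` (or a `@transient lazy val`) is a common way to get the same effect without mutable vars.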

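This library will do the job for you. It connects to the Confluent Schema Registry through Spark Structured Streaming.

For Confluent, it handles the schema ID that is sent along with the payload.

In the README you will find code snippets showing how to use it.

Disclosure: I work for ABSA and I developed this library.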
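The schema ID mentioned above travels in Confluent's wire format: one magic byte (0), then the schema ID as a 4-byte big-endian integer, then the Avro-encoded body. A minimal Python sketch of splitting that header off follows; the function name is hypothetical and not part of ABRiS.

```python
import struct

MAGIC_BYTE = 0


def split_confluent_frame(message):
    """Return (schema_id, avro_body) for a Confluent-framed message."""
    if len(message) < 5:
        raise ValueError("message shorter than the 5-byte header")
    magic, schema_id = struct.unpack(">BI", message[:5])  # 1 byte + 4-byte big-endian int
    if magic != MAGIC_BYTE:
        raise ValueError(f"unknown magic byte {magic}")
    return schema_id, message[5:]


frame = bytes([0, 0, 0, 0, 42]) + b"\x06abc"
schema_id, body = split_confluent_frame(frame)
print(schema_id, body)
```

Because SQL `substring` positions start at 1, the `substring(value, 6)` used in other answers returns the same bytes as `message[5:]` here.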

Databricks now provides this functionality, but you have to pay for it :-(

dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
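  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .option("checkpointLocation", checkpointDir) // hypothetical path variable; the Kafka sink needs a checkpoint location
  .start()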

See https://docs.databricks.com/spark/latest/structured-streaming/avro-dataframe.html for more information.
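On the write side, a registry-aware `to_avro` has to produce the same framing that the readers strip off. The sketch below only wraps an already Avro-encoded body in the 5-byte header; registering the schema and obtaining its ID are assumed to happen elsewhere, and the function name is hypothetical.

```python
import struct


def frame_confluent(schema_id, avro_body):
    """Prefix an Avro-encoded body with magic byte 0 and a big-endian 4-byte schema ID.

    struct.pack raises struct.error if schema_id does not fit in 4 unsigned bytes.
    """
    return struct.pack(">BI", 0, schema_id) + avro_body


print(frame_confluent(42, b"\x06abc").hex())  # → 000000002a06616263
```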

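ABRiS is a good free alternative; see https://github.com/AbsaOSS/ABRiS. The only downside we can see is that you need to provide the Avro schema file at runtime, so that the framework can enforce that schema on your DataFrame before publishing it to the Kafka topic.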

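For anyone who wants to do this in PySpark: the library felipe referenced had worked well for me on the JVM, so I wrote a small wrapper function to integrate it into Python. It looks hacky, because many types that are implicit in Scala have to be specified explicitly through py4j. So far it has worked well, even in Spark 2.4.1.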

from pyspark.sql import DataFrame

def expand_avro(spark_context, sql_context, data_frame, schema_registry_url, topic):
    j = spark_context._gateway.jvm
    dataframe_deserializer = j.za.co.absa.abris.avro.AvroSerDe.DataframeDeserializer(data_frame._jdf)
    naming_strategy = getattr(
        getattr(j.za.co.absa.abris.avro.read.confluent.SchemaManager,
                "SchemaStorageNamingStrategies$"), "MODULE$").TOPIC_NAME()
    conf = getattr(getattr(j.scala.collection.immutable.Map, "EmptyMap$"), "MODULE$")
    conf = getattr(conf, "$plus")(j.scala.Tuple2("schema.registry.url", schema_registry_url))
    conf = getattr(conf, "$plus")(j.scala.Tuple2("schema.registry.topic", topic))
    conf = getattr(conf, "$plus")(j.scala.Tuple2("value.schema.id", "latest"))
    conf = getattr(conf, "$plus")(j.scala.Tuple2("value.schema.naming.strategy", naming_strategy))
    schema_path = j.scala.Option.apply(None)
    conf = j.scala.Option.apply(conf)
    policy = getattr(j.za.co.absa.abris.avro.schemas.policy.SchemaRetentionPolicies, "RETAIN_SELECTED_COLUMN_ONLY$")()
    data_frame = dataframe_deserializer.fromConfluentAvro("value", schema_path, conf, policy)
    data_frame = DataFrame(data_frame, sql_context)
    return data_frame

For this to work, you have to add the libraries to the Spark packages, for example:

import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages ' \
    'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1,' \
    'org.apache.spark:spark-avro_2.11:2.4.1,' \
    'za.co.absa:abris_2.11:2.2.2 ' \
    '--repositories https://packages.confluent.io/maven/ ' \
    'pyspark-shell'
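The `--packages` value is one comma-separated list of `group:artifact:version` coordinates, followed by optional `--repositories` and the `pyspark-shell` resource, which is conventionally placed last. A small helper can keep that string readable when versions change; the helper is hypothetical, and the coordinates are the ones from the snippet above.

```python
import os


def submit_args(packages, repositories=()):
    """Build a PYSPARK_SUBMIT_ARGS string from Maven coordinates and repository URLs."""
    parts = ["--packages", ",".join(packages)]
    if repositories:
        parts += ["--repositories", ",".join(repositories)]
    parts.append("pyspark-shell")  # the shell resource goes after the options
    return " ".join(parts)


os.environ["PYSPARK_SUBMIT_ARGS"] = submit_args(
    [
        "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1",
        "org.apache.spark:spark-avro_2.11:2.4.1",
        "za.co.absa:abris_2.11:2.2.2",
    ],
    repositories=["https://packages.confluent.io/maven/"],
)
print(os.environ["PYSPARK_SUBMIT_ARGS"])
```

The variable only takes effect if it is set before the first SparkSession (and therefore the JVM) is created.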

Disclaimer

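This code has only been tested on localhost and reportedly runs into serializer problems in a cluster environment. There is an alternative solution (steps 7-9, with Scala code in step 10) that extracts the schema IDs into a column, looks up each unique ID, and then uses broadcast variables for the schemas, which works better at scale.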
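The core of that alternative, reading the schema ID from each message and fetching each distinct ID only once, can be sketched in plain Python. `fetch_schema` is a hypothetical stand-in for a registry call such as `GET /schemas/ids/{id}`; in Spark, the distinct IDs would come from a batch and the resulting dict would be broadcast to the executors.

```python
import struct


def schema_id_of(message):
    """Read the 4-byte big-endian schema ID that follows the magic byte."""
    return struct.unpack(">I", message[1:5])[0]


def schemas_for_batch(messages, fetch_schema):
    """Fetch each distinct schema ID once; return {schema_id: schema}."""
    unique_ids = {schema_id_of(m) for m in messages}
    return {sid: fetch_schema(sid) for sid in sorted(unique_ids)}


calls = []


def fake_fetch(schema_id):  # hypothetical registry lookup
    calls.append(schema_id)
    return f'{{"type": "record", "name": "V{schema_id}", "fields": []}}'


batch = [b"\x00\x00\x00\x00\x07...", b"\x00\x00\x00\x00\x07...", b"\x00\x00\x00\x00\x08..."]
lookup = schemas_for_batch(batch, fake_fetch)
print(calls)  # → [7, 8]
```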

There is also an external library, AbsaOSS/ABRiS, that addresses using the Registry with Spark.


Since the other answer, which was the most useful one, was deleted, I wanted to re-add it with some refactoring and comments.

Here are the required dependencies. The code was tested with Confluent 5.x and Spark 2.4:
     <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
            <exclusions> 
                <!-- Conflicts with Spark's version -->
                <exclusion> 
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
     </dependency>
 
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-avro_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>

Here is the Scala implementation (only tested locally with master=local[*])

First part: define the imports, some fields, and a few helper methods to fetch the schemas

import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.commons.cli.CommandLine
import org.apache.spark.sql._
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.streaming.OutputMode

object App {

  private var schemaRegistryClient: SchemaRegistryClient = _

  private var kafkaAvroDeserializer: AvroDeserializer = _

  def lookupTopicSchema(topic: String, isKey: Boolean = false) = {
    schemaRegistryClient.getLatestSchemaMetadata(topic + (if (isKey) "-key" else "-value")).getSchema
  }

  def avroSchemaToSparkSchema(avroSchema: String) = {
    SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))
  }

 // ... continues below

Then define a simple main method that parses the command-line arguments to get the Kafka details

  def main(args: Array[String]): Unit = {
    val cmd: CommandLine = parseArg(args)

    val master = cmd.getOptionValue("master", "local[*]")
    val spark = SparkSession.builder()
      .appName(App.getClass.getName)
      .master(master)
      .getOrCreate()

    val bootstrapServers = cmd.getOptionValue("bootstrap-server")
    val topic = cmd.getOptionValue("topic")
    val schemaRegistryUrl = cmd.getOptionValue("schema-registry")

    consumeAvro(spark, bootstrapServers, topic, schemaRegistryUrl)

    spark.stop()
  }


  // ... still continues

Then comes the important method, which consumes the Kafka topic and deserializes it

  private def consumeAvro(spark: SparkSession, bootstrapServers: String, topic: String, schemaRegistryUrl: String): Unit = {
    import spark.implicits._

    // Setup the Avro deserialization UDF
    schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
    kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient) 
    spark.udf.register("deserialize", (bytes: Array[Byte]) =>
      kafkaAvroDeserializer.deserialize(bytes)
    )

    // Load the raw Kafka topic (byte stream)
    val rawDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load()

    // Deserialize byte stream into strings (Avro fields become JSON)
    import org.apache.spark.sql.functions._
    val jsonDf = rawDf.select(
      // 'key.cast(DataTypes.StringType),  // string keys are simplest to use
      callUDF("deserialize", 'key).as("key"), // but sometimes they are avro
      callUDF("deserialize", 'value).as("value")
      // excluding topic, partition, offset, timestamp, etc
    )

    // Get the Avro schema for the topic from the Schema Registry and convert it into a Spark schema type
    val dfValueSchema = {
      val rawSchema = lookupTopicSchema(topic)
      avroSchemaToSparkSchema(rawSchema)
    }

    // Apply structured schema to JSON stream
    val parsedDf = jsonDf.select(
      'key, // keys are usually plain strings
      // values are JSONified Avro records
      from_json('value, dfValueSchema.dataType).alias("value")
    ).select(
      'key,
      $"value.*" // flatten out the value
    )

    // parsedDf.printSchema()

    // Sample schema output
    // root
    // |-- key: string (nullable = true)
    // |-- header: struct (nullable = true)
    // |    |-- time: long (nullable = true)
    // |    ...

    // TODO: Do something interesting with this stream
    parsedDf.writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .option("truncate", false)
      .start()
      .awaitTermination()
  }

 // still continues

The command-line parser allows passing in the bootstrap servers, the schema registry, the topic name, and the Spark master.

  private def parseArg(args: Array[String]): CommandLine = {
    import org.apache.commons.cli._

    val options = new Options

    val masterOption = new Option("m", "master", true, "Spark master")
    masterOption.setRequired(false)
    options.addOption(masterOption)

    val bootstrapOption = new Option("b", "bootstrap-server", true, "Bootstrap servers")
    bootstrapOption.setRequired(true)
    options.addOption(bootstrapOption)

    val topicOption = new Option("t", "topic", true, "Kafka topic")
    topicOption.setRequired(true)
    options.addOption(topicOption)

    val schemaRegOption = new Option("s", "schema-registry", true, "Schema Registry URL")
    schemaRegOption.setRequired(true)
    options.addOption(schemaRegOption)

    val parser = new BasicParser
    parser.parse(options, args)
  }

  // still continues

For the UDF above to work, there needs to be a deserializer that turns the DataFrame of bytes into one containing the deserialized Avro data

  // Simple wrapper around Confluent deserializer
  class AvroDeserializer extends AbstractKafkaAvroDeserializer {
    def this(client: SchemaRegistryClient) {
      this()
      // TODO: configure the deserializer for authentication 
      this.schemaRegistry = client
    }

    override def deserialize(bytes: Array[Byte]): String = {
      val value = super.deserialize(bytes)
      value match {
        case str: String =>
          str
        case _ =>
          val genericRecord = value.asInstanceOf[GenericRecord]
          genericRecord.toString
      }
    }
  }

} // end 'object App'

Put each of these blocks together, and it works in IntelliJ after adding -b localhost:9092 -s http://localhost:8081 -t myTopic to Run Configurations > Program Arguments.

Here is my code example integrating Spark Structured Streaming with Kafka and the Schema Registry (code in Scala):

import org.apache.spark.sql.SparkSession
import io.confluent.kafka.schemaregistry.client.rest.RestService // <artifactId>kafka-schema-registry</artifactId>
import org.apache.spark.sql.avro.from_avro // <artifactId>spark-avro_${scala.compat.version}</artifactId>
import org.apache.spark.sql.functions.col

object KafkaConsumerAvro {

  def main(args: Array[String]): Unit = {

    val KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
    val SCHEMA_REGISTRY_URL = "http://localhost:8081"
    val TOPIC = "transactions"

    val spark: SparkSession = SparkSession.builder().appName("KafkaConsumerAvro").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
      .option("subscribe", TOPIC)
      .option("startingOffsets", "earliest") // from starting
      .load()

//     Prints the Kafka schema with its columns (topic, offset, partition, etc.)
    df.printSchema()

//    Create REST service to access schema registry and retrieve topic schema (latest)
    val restService = new RestService(SCHEMA_REGISTRY_URL)
    val valueRestResponseSchema = restService.getLatestVersion(TOPIC + "-value")
    val jsonSchema = valueRestResponseSchema.getSchema

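    val transactionDF = df.select(
      col("key").cast("string"), // cast to string from binary value
      // Confluent-serialized values start with a magic byte and a 4-byte schema ID;
      // skip those 5 bytes (SQL substring is 1-indexed) before decoding the Avro body
      from_avro(org.apache.spark.sql.functions.expr("substring(value, 6)"), jsonSchema).as("transaction"),
      col("topic"),
      col("offset"),
      col("timestamp"),
      col("timestampType"))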
    transactionDF.printSchema()

//    Stream data to console for testing
    transactionDF.writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination()
  }

}

When reading from a Kafka topic, we get this schema:

key: binary | value: binary | topic: string | partition: int | offset: long | timestamp: timestamp | timestampType: int |

As we can see, key and value are binary, so we need to cast the key to a string; in this case the value is in Avro format, so we can decode it by calling the from_avro function.
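For intuition about what `from_avro` does with the bytes once any Confluent header is gone, here is a hand-rolled decoder for a hypothetical record schema with a `string` field followed by a `long` field. Avro's binary encoding writes `int`/`long` values as zig-zag varints and strings as a length (itself a zig-zag varint) followed by UTF-8 bytes, with record fields concatenated in schema order. This is only a teaching sketch; real code should rely on `from_avro` or an Avro library.

```python
def read_long(buf, pos):
    """Decode one zig-zag varint starting at pos; return (value, next_pos)."""
    shift = result = 0
    while True:
        b = buf[pos]
        pos += 1
        result |= (b & 0x7F) << shift  # low 7 bits carry data
        if not b & 0x80:               # high bit clear: last byte
            break
        shift += 7
    return (result >> 1) ^ -(result & 1), pos  # undo zig-zag


def read_string(buf, pos):
    """Decode a length-prefixed UTF-8 string; return (text, next_pos)."""
    length, pos = read_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


def decode_record(buf):
    """Decode a record with the hypothetical schema {name: string, amount: long}."""
    name, pos = read_string(buf, 0)
    amount, pos = read_long(buf, pos)
    return {"name": name, "amount": amount}


# Casting a binary key to string in Spark interprets its bytes as UTF-8, like this:
key = b"txn-1".decode("utf-8")
print(key, decode_record(b"\x06abc\x54"))
```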

Besides the Spark and Kafka dependencies, we also need these dependencies:

<!-- READ AND WRITE AVRO DATA -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-avro_${scala.compat.version}</artifactId>
  <version>${spark.version}</version>
</dependency>
<!-- INTEGRATION WITH SCHEMA REGISTRY -->
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry</artifactId>
  <version>${confluent.version}</version>
</dependency>

Based on @cricket_007's answer, I created the following solution, which can run in our cluster environment and includes these new features:

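  • You need broadcast variables to ship some values into the map operation in a cluster environment. Neither Schema.Parser nor KafkaAvroDeserializer is serializable in Spark, which is why you need to initialize them inside the map operation.
  • My structured stream uses the foreachBatch output sink.
  • I apply org.apache.spark.sql.avro.SchemaConverters to convert the Avro schema format into a Spark StructType, so that you can use it in the from_json column function to parse the data from the Kafka topic fields (key and value) into a DataFrame.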
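Roughly what `SchemaConverters.toSqlType` does for flat records can be illustrated with a tiny mapping from Avro primitive types to Spark SQL type names, treating a `["null", T]` union as a nullable `T`. This hypothetical helper covers primitives only; the real converter also handles nested records, arrays, maps, enums and logical types.

```python
import json

# Avro primitive type -> Spark SQL type name (primitives only)
AVRO_TO_SPARK = {
    "boolean": "boolean", "int": "int", "long": "bigint", "float": "float",
    "double": "double", "bytes": "binary", "string": "string",
}


def field_type(avro_type):
    """Return (spark_type_name, nullable) for a primitive or a ["null", T] union."""
    if isinstance(avro_type, list):
        non_null = [t for t in avro_type if t != "null"]
        if len(non_null) != 1:
            raise ValueError(f"unsupported union {avro_type}")
        return AVRO_TO_SPARK[non_null[0]], "null" in avro_type
    return AVRO_TO_SPARK[avro_type], False


def to_struct_ddl(avro_schema_json):
    """Render a flat Avro record schema as a Spark struct<...> type string."""
    schema = json.loads(avro_schema_json)
    cols = [f"{f['name']}:{field_type(f['type'])[0]}" for f in schema["fields"]]
    return "struct<" + ",".join(cols) + ">"


schema = ('{"type":"record","name":"T","fields":['
          '{"name":"name","type":"string"},{"name":"amount","type":["null","long"]}]}')
print(to_struct_ddl(schema))  # → struct<name:string,amount:bigint>
```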

First, you need to load some packages:

SCALA_VERSION="2.11"
SPARK_VERSION="2.4.4"
CONFLUENT_VERSION="5.2.2"

jars=(
  "org.apache.spark:spark-sql-kafka-0-10_${SCALA_VERSION}:${SPARK_VERSION}"    ## format("kafka")
  "org.apache.spark:spark-avro_${SCALA_VERSION}:${SPARK_VERSION}"    ## SchemaConverters
  "io.confluent:kafka-schema-registry:${CONFLUENT_VERSION}"   ## import io.confluent.kafka.schemaregistry.client.rest.RestService
  "io.confluent:kafka-avro-serializer:${CONFLUENT_VERSION}"   ## import io.confluent.kafka.serializers.KafkaAvroDeserializer
)

./bin/spark-shell --packages "$(IFS=,; echo "${jars[*]}")"

Here is the full code I tested in spark-shell:

import org.apache.avro.Schema
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.schemaregistry.client.rest.RestService

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.avro.SchemaConverters

import scala.collection.JavaConverters._
import java.time.LocalDateTime

spark.sparkContext.setLogLevel("Error")

val brokerServers = "xxx.yyy.zzz:9092"
val topicName = "mytopic" 
val schemaRegistryURL = "http://xxx.yyy.zzz:8081"

val restService = new RestService(schemaRegistryURL)

val exParser = new Schema.Parser
//-- For both key and value
val schemaNames = Seq("key", "value")
val schemaStrings = schemaNames.map(i => (i -> restService.getLatestVersion(s"$topicName-$i").getSchema)).toMap
val tempStructMap = schemaStrings.transform((k,v) => SchemaConverters.toSqlType(exParser.parse(v)).dataType)
val schemaStruct = new StructType().add("key", tempStructMap("key")).add("value", tempStructMap("value"))
//-- For key only 
// val schemaStrings = restService.getLatestVersion(s"$topicName-key").getSchema
// val schemaStruct = SchemaConverters.toSqlType(exParser.parse(schemaStrings)).dataType
//-- For value only 
// val schemaStrings = restService.getLatestVersion(s"$topicName-value").getSchema
// val schemaStruct = SchemaConverters.toSqlType(exParser.parse(schemaStrings)).dataType


val query = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerServers)
  .option("subscribe", topicName)
  .load()
  .writeStream
  .outputMode("append")
  //.option("checkpointLocation", s"cos://$bucket.service/checkpoints/$tableName")
  .foreachBatch((batchDF: DataFrame, batchId: Long) => {

    val bcTopicName = sc.broadcast(topicName)
    val bcSchemaRegistryURL = sc.broadcast(schemaRegistryURL)
    val bcSchemaStrings = sc.broadcast(schemaStrings)
    
    val rstDF = batchDF.map {
      row =>
      
        val props = Map("schema.registry.url" -> bcSchemaRegistryURL.value)
        //-- For both key and value
        val isKeys =  Map("key" -> true, "value" -> false)
        val deserializers = isKeys.transform{ (k,v) => 
            val des = new KafkaAvroDeserializer
            des.configure(props.asJava, v)
            des 
        }
        //-- For key only 
        // val deserializer = new KafkaAvroDeserializer
        // deserializer.configure(props.asJava, true)
        //-- For value only 
        // val deserializer = new KafkaAvroDeserializer
        // deserializer.configure(props.asJava, false)
        

        val inParser = new Schema.Parser
        //-- For both key and value
        val values = bcSchemaStrings.value.transform( (k,v) => 
            deserializers(k).deserialize(bcTopicName.value, row.getAs[Array[Byte]](k), inParser.parse(v)).toString)
        s"""{"key": ${values("key")}, "value": ${values("value")} }"""
        //-- For key only 
        // deserializer.deserialize(bcTopicName.value, row.getAs[Array[Byte]]("key"), inParser.parse(bcSchemaStrings.value)).toString
        //-- For value only 
        // deserializer.deserialize(bcTopicName.value, row.getAs[Array[Byte]]("value"), inParser.parse(bcSchemaStrings.value)).toString  
      }
      .select(from_json(col("value"), schemaStruct).as("root"))
      .select("root.*")

    println(s"${LocalDateTime.now} --- Batch $batchId: ${rstDF.count} rows")
    rstDF.printSchema
    rstDF.show(false)    

  })
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .start()

query.awaitTermination()
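The code above builds the combined document with string interpolation (`s"""{"key": ..., "value": ...}"""`). That yields valid JSON only when both `toString` results are JSON themselves, as they are for record schemas; if the key schema is a plain Avro `string`, its `toString` is unquoted text and `from_json` cannot parse the result. A hedged Python sketch of a more defensive combination follows; the fallback rule (text that is not valid JSON is treated as a string) is an assumption, and it would turn a string key that looks like a number into a number.

```python
import json


def as_json_value(text):
    """Parse JSON text (records, numbers); fall back to treating it as a plain string."""
    try:
        return json.loads(text)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return text


def combine(key_text, value_text):
    """Build a {"key": ..., "value": ...} document with proper JSON quoting."""
    return json.dumps({"key": as_json_value(key_text), "value": as_json_value(value_text)})


# A primitive string key (unquoted) plus a record value rendered as JSON text
print(combine("order-17", '{"amount": 42}'))  # → {"key": "order-17", "value": {"amount": 42}}
```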

Another very simple alternative for PySpark (without full Schema Registry support such as schema registration, compatibility checks, etc.) could be:

import requests

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.avro.functions import *

# variables
topic = "my-topic"
schemaregistry = "http://localhost:8081"
kafka_brokers = "kafka1:9092,kafka2:9092"

# retrieve the latest schema
response = requests.get('{}/subjects/{}-value/versions/latest/schema'.format(schemaregistry, topic))

# error check
response.raise_for_status()

# extract the schema from the response
schema = response.text

# run the query
query = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_brokers)
    .option("subscribe", topic)
    .load()
    # The magic goes here:
    # skip the first 5 bytes (magic byte + 4-byte schema ID of the Schema Registry wire format);
    # SQL substring is 1-indexed, so position 6 is the first Avro byte
    .selectExpr("substring(value, 6) as avro_value")
    .select(from_avro(col("avro_value"), schema).alias("data"))
    .select(col("data.my_field"))
    .writeStream
    .format("console")
    # "complete" mode requires a streaming aggregation; a plain projection needs "append"
    .outputMode("append")
    .start()
)
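`substring(value, 6)` drops the first five bytes unconditionally, so a message that was not produced with the Confluent serializer is silently misread. A minimal Python sketch of the check that could precede the slicing: keep the body only when the message is long enough and starts with magic byte 0, and map anything else to `None`, in the spirit of a permissive mode. The function name is hypothetical.

```python
def strip_confluent_header(message):
    """Return the Avro body of a Confluent-framed message, or None if it is not framed."""
    if message is None or len(message) < 5 or message[0] != 0:
        return None
    return message[5:]  # same bytes as SQL substring(value, 6); SQL positions start at 1


messages = [b"\x00\x00\x00\x00\x07\x06abc", b"plain text", None]
print([strip_confluent_header(m) for m in messages])  # → [b'\x06abc', None, None]
```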

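Summarizing some of the answers above, plus some of my own experience, these were the options at the time of writing:

  1. The third-party ABRiS library. This is what we used initially, but it doesn't seem to support a permissive mode in which you can drop malformed messages. It crashes the stream when it encounters a malformed message. That's fine if you can guarantee that messages are valid, but it was a problem for us, because after a stream restart it kept trying to parse the malformed message.
  2. A custom UDF that parses the Avro data, as described in OneCricketeer's answer. It offers the most flexibility, but also requires the most custom code.
  3. Using Databricks' from_avro variant, which lets you simply pass the registry URL; it finds the right schema and parses the data for you. It works very well, but it's only available in their environment, which makes it hard to test in a codebase.
  4. Using Spark's built-in from_avro function. This function lets you pass a JSON schema and parse the data with it. The only fix you have to apply is that, in Confluent's wire format, there is a magic byte and a 4-byte schema ID before the actual Avro binary data starts, as pointed out in dudssource's answer. You can parse it in Scala like this: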
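import io.confluent.kafka.schemaregistry.client.rest.RestService
import org.apache.spark.sql.avro.functions.from_avro // Spark 3.0+; this overload accepts an options map
import org.apache.spark.sql.functions.substring
import scala.collection.JavaConverters._

val restService             = new RestService(espConfig.schemaRegistryUrl)
val valueRestResponseSchema = restService.getVersion(espConfig.fullTopicName + "-value", schemaVersion)
val jsonSchema              = valueRestResponseSchema.getSchema

streamDf
  // skip the magic byte and the 4-byte schema ID (substring is 1-indexed)
  .withColumn("binary_data", substring($"value", 6, Int.MaxValue))
  // PERMISSIVE turns undecodable records into nulls instead of failing the query
  .withColumn("parsed_data", from_avro($"binary_data", jsonSchema, Map("mode" -> "PERMISSIVE").asJava))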
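One caveat of option 4 is that `from_avro` decodes every message with the single JSON schema you pass and ignores the schema ID embedded in each message, so messages written with a different schema version may be misread. A hedged Python sketch that separates messages whose embedded ID matches the expected one from the rest, so mismatches can be routed elsewhere instead of being decoded with the wrong schema; `expected_id` and the routing policy are assumptions.

```python
import struct


def route_by_schema_id(messages, expected_id):
    """Split Confluent-framed messages into (matching Avro bodies, other messages)."""
    matching, others = [], []
    for m in messages:
        if len(m) >= 5 and m[0] == 0 and struct.unpack(">I", m[1:5])[0] == expected_id:
            matching.append(m[5:])  # header stripped, ready for the fixed schema
        else:
            others.append(m)        # different schema ID or not Confluent-framed
    return matching, others


msgs = [b"\x00\x00\x00\x00\x03\x02", b"\x00\x00\x00\x00\x04\x02", b"junk"]
ok, rest = route_by_schema_id(msgs, expected_id=3)
print(len(ok), len(rest))  # → 1 2
```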