Deserializing structured stream from kafka with Spark

I am trying to read a Kafka topic from Spark 3.0.2 using the Scala code below.

Here are my imports:

import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.commons.cli.CommandLine
import org.apache.spark.sql._
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.streaming.OutputMode

The functions used to retrieve the schema from the Schema Registry service and convert it to a Spark schema:

var schemaRegistryClient: SchemaRegistryClient = _
var kafkaAvroDeserializer: AvroDeserializer = _

def lookupTopicSchema(topic: String): String = {
  schemaRegistryClient.getLatestSchemaMetadata(topic).getSchema
}

def avroSchemaToSparkSchema(avroSchema: String): SchemaConverters.SchemaType = {
  SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))
}
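
For illustration, here is a minimal sketch of what that conversion returns (the Avro record below is made up and is not the real topic schema):

// Hypothetical Avro schema, for illustration only
val sampleAvroSchema =
  """{"type": "record", "name": "Sample", "fields": [
    |  {"name": "col_A", "type": "string"},
    |  {"name": "col_B", "type": ["null", "long"], "default": null}
    |]}""".stripMargin

// dataType is an org.apache.spark.sql.types.StructType, roughly:
// StructType(StructField(col_A,StringType,false), StructField(col_B,LongType,true))
val sparkType = avroSchemaToSparkSchema(sampleAvroSchema).dataType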

The class definitions used to deserialize the data coming from the Kafka topic:

object DeserializerWrapper {
  val deserializer: AvroDeserializer = kafkaAvroDeserializer
}

class AvroDeserializer extends AbstractKafkaAvroDeserializer {
  def this(client: SchemaRegistryClient) {
    this()
    this.schemaRegistry = client
  }

  override def deserialize(bytes: Array[Byte]): String = {
    val value = super.deserialize(bytes)
    Option(value) match {
      case Some(Array()) | None => null
      case Some(a) =>  
        val genericRecord = a.asInstanceOf[GenericRecord]
        genericRecord.toString
    }
  }
}
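
As a quick sanity check outside of Spark, the deserializer can be called directly; this is only a hypothetical smoke test, where someConfluentAvroBytes stands for a Confluent-framed Avro payload read from the topic:

// Hypothetical smoke test, not part of the streaming job
val client = new CachedSchemaRegistryClient("http://myHost:8001", 128)
val deser = new AvroDeserializer(client)
// someConfluentAvroBytes: Array[Byte] starting with the Confluent magic byte + schema id
// val json = deser.deserialize(someConfluentAvroBytes) // returns the record as a string, or null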

The main function that executes the job:

def main(): Unit = {

  val spark = SparkSession.builder.getOrCreate

  val bootstrapServers = "kafka1:9192"
  val topic = "sample_topic"
  val shemaRegistryName = "avro_schema_A"
  val schemaRegistryUrl = "http://myHost:8001"

  schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
  val kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)

  spark.udf.register("deserialize", (bytes: Array[Byte]) => DeserializerWrapper.deserializer.deserialize(bytes))

  val kafkaDataFrame = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option("subscribe", topic)
    .option("kafka.group.id", "group_id")
    .option("startingOffsets", "latest")
    .load()
  )

  val valueDataFrame = kafkaDataFrame.selectExpr("""deserialize(value) AS message""")

  import org.apache.spark.sql.functions._

  val dfValueSchema = {
    val rawSchema = lookupTopicSchema(shemaRegistryName)
    avroSchemaToSparkSchema(rawSchema)
  }

  val formattedDataFrame = (valueDataFrame
    .select(from_json(col("message"), dfValueSchema.dataType).alias("parsed_value"))
    .select("parsed_value.*")
  )

  (formattedDataFrame
    .writeStream
    .format("console")
    .outputMode(OutputMode.Append())
    .option("truncate", false)
    .start()
    .awaitTermination()
  )
}

When I run the main function, the job crashes with the following error:

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+-----+-----+-----+-----+-----+
|col-A|col-B|col-C|col-D|col-E|col-F|col-G|
+-----+-----+-----+-----+-----+-----+-----+
+-----+-----+-----+-----+-----+-----+-----+

21/07/23 15:41:31 ERROR TaskSetManager: Task 0 in stage 3.0 failed 4 times; aborting job
21/07/23 15:41:31 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@227770fb is aborting.
21/07/23 15:41:31 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@227770fb aborted.
21/07/23 15:41:31 ERROR MicroBatchExecution: Query [id = 5c76f495-b2c8-46e7-90b0-dd715c8466f3, runId = c38ccde6-3c98-488c-bfd9-a71a17296503] terminated with error
org.apache.spark.SparkException: Writing job aborted.
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:413)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:361)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:322)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:329)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
        at org.apache.spark.sql.Dataset.$anonfun$collect(Dataset.scala:2940)
        at org.apache.spark.sql.Dataset.$anonfun$withAction(Dataset.scala:3618)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
        at org.apache.spark.sql.Dataset.collect(Dataset.scala:2940)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch(MicroBatchExecution.scala:586)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch(MicroBatchExecution.scala:581)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:581)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream(MicroBatchExecution.scala:223)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream(MicroBatchExecution.scala:191)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:245)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 13, 172.20.0.4, executor 1): org.apache.spark.SparkException: Failed to execute user defined function($read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda19/1813496428: (binary) => string)
        at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1130)
        at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:457)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.subExpr_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at scala.collection.Iterator$$anon.next(Iterator.scala:459)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run(WriteToDataSourceV2Exec.scala:441)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:477)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2(WriteToDataSourceV2Exec.scala:385)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:462)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at $line55.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$res3(<console>:41)
        at org.apache.spark.sql.catalyst.expressions.ScalaUDF.$anonfun$f(ScalaUDF.scala:157)
        at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1127)
        ... 17 more

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage(DAGScheduler.scala:2008)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$adapted(DAGScheduler.scala:2007)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed(DAGScheduler.scala:973)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$adapted(DAGScheduler.scala:973)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
        at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:382)
        ... 37 more
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda19/1813496428: (binary) => string)
        at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1130)
        at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:457)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.subExpr_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at scala.collection.Iterator$$anon.next(Iterator.scala:459)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run(WriteToDataSourceV2Exec.scala:441)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:477)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2(WriteToDataSourceV2Exec.scala:385)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:462)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at $line55.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$res3(<console>:41)
        at org.apache.spark.sql.catalyst.expressions.ScalaUDF.$anonfun$f(ScalaUDF.scala:157)
        at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1127)
        ... 17 more

The UDF seems to throw a NullPointerException, even though I explicitly implemented pattern matching to avoid it.

Can anyone help? Thanks a lot.

I finally got the deserialization working with the ABRiS library. It makes it easy to connect to the Schema Registry and to serialize/deserialize the Kafka stream.

@OneCricketeer, thank you very much for your help with this.

For reference, the connection to the Schema Registry can be set up as simply as:

import za.co.absa.abris.config.AbrisConfig
val abrisConfig = AbrisConfig
    .fromConfluentAvro
    .downloadReaderSchemaById(123)
    .usingSchemaRegistry("http://schema-registry.com:8081")
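
If I read the ABRiS documentation correctly, the reader schema can also be resolved from the topic name (latest version) instead of a hard-coded schema id; the topic below is the one from the question:

val abrisConfigByTopic = AbrisConfig
    .fromConfluentAvro
    .downloadReaderSchemaByLatestVersion
    .andTopicNameStrategy("sample_topic")
    .usingSchemaRegistry("http://schema-registry.com:8081")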

The conversion itself is then done in a single line of code:

import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro

val outputDf = inputStreamDf.select(from_avro(col("value"), abrisConfig) as "data")
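
For completeness, here is a sketch of how the ABRiS call plugs into the streaming read from the question. This is only a sketch under my own assumptions: the schema id 123 is a placeholder, and the broker, topic and registry URL are the values used above.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

val spark = SparkSession.builder.getOrCreate()

// Reader schema fetched from the Schema Registry (123 is a placeholder id)
val abrisConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaById(123)
  .usingSchemaRegistry("http://myHost:8001")

// Same Kafka source options as in the question
val inputStreamDf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka1:9192")
  .option("subscribe", "sample_topic")
  .option("startingOffsets", "latest")
  .load()

// from_avro decodes the Confluent-framed binary value into a struct column
val outputDf = inputStreamDf.select(from_avro(col("value"), abrisConfig) as "data")

outputDf
  .select("data.*")
  .writeStream
  .format("console")
  .outputMode("append")
  .option("truncate", "false")
  .start()
  .awaitTermination()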