Issue for Integrating Hudi with Kafka using Avro Schema
I am trying to integrate Hudi with a Kafka topic.
Steps followed:
- Created a Kafka topic in Confluent, with the schema defined in the Schema Registry.
- Producing data with kafka-avro-console-producer (a sketch of this command follows the list).
- Running the Hudi DeltaStreamer in continuous mode to consume the data.
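For reference, here is a minimal sketch of the producer command in step 2. The broker address, topic name, registry URL, and inline schema are hypothetical placeholders, not my exact values:

```sh
# Produce a few Avro records against the Schema Registry
# (topic name "hudi_test" and the schema are placeholders).
kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic hudi_test \
  --property schema.registry.url=http://localhost:8081 \
  --property value.schema='{"type":"record","name":"TestRecord","fields":[{"name":"id","type":"string"},{"name":"ts","type":"long"}]}'
```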
Infrastructure:
- AWS EMR
- Spark 2.4.4
- Hudi utilities bundle (tried 0.6.0 and 0.7.0)
- Avro (tried avro-1.8.2, avro-1.9.2, and avro-1.10.0)
I am getting the error stack trace below. Could someone help me resolve this issue?
21/02/24 13:02:08 ERROR TaskResultGetter: Exception while getting task result
org.apache.spark.SparkException: Error reading attempting to read avro data -- encountered an unknown fingerprint: 103427103938146401, not sure what schema to use. This could happen if you registered additional schemas after starting your spark context.
at org.apache.spark.serializer.GenericAvroSerializer$$anonfun.apply(GenericAvroSerializer.scala:141)
at org.apache.spark.serializer.GenericAvroSerializer$$anonfun.apply(GenericAvroSerializer.scala:138)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:79)
at org.apache.spark.serializer.GenericAvroSerializer.deserializeDatum(GenericAvroSerializer.scala:137)
at org.apache.spark.serializer.GenericAvroSerializer.read(GenericAvroSerializer.scala:162)
at org.apache.spark.serializer.GenericAvroSerializer.read(GenericAvroSerializer.scala:47)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:391)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:371)
at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:88)
at org.apache.spark.scheduler.TaskResultGetter$$anon$$anonfun$run.apply$mcV$sp(TaskResultGetter.scala:72)
at org.apache.spark.scheduler.TaskResultGetter$$anon$$anonfun$run.apply(TaskResultGetter.scala:63)
at org.apache.spark.scheduler.TaskResultGetter$$anon$$anonfun$run.apply(TaskResultGetter.scala:63)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.scheduler.TaskResultGetter$$anon.run(TaskResultGetter.scala:62)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
21/02/24 13:02:08 INFO YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
21/02/24 13:02:08 INFO YarnScheduler: Cancelling stage 1
21/02/24 13:02:08 INFO YarnScheduler: Killing all running tasks in stage 1: Stage cancelled
21/02/24 13:02:08 INFO DAGScheduler: ResultStage 1 (isEmpty at DeltaSync.java:380) failed in 1.415 s due to Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error reading attempting to read avro data -- encountered an unknown fingerprint: 103427103938146401, not sure what schema to use. This could happen if you registered additional schemas after starting your spark context.
21/02/24 13:02:08 INFO DAGScheduler: Job 5 failed: isEmpty at DeltaSync.java:380, took 1.422265 s
21/02/24 13:02:08 ERROR HoodieDeltaStreamer: Shutting down delta-sync due to exception
org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error reading attempting to read avro data -- encountered an unknown fingerprint: 103427103938146401, not sure what schema to use. This could happen if you registered additional schemas after starting your spark context.
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:2029)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:2028)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:966)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:966)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.rdd.RDD$$anonfun$take.apply(RDD.scala:1364)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.take(RDD.scala:1337)
at org.apache.spark.rdd.RDD$$anonfun$isEmpty.apply$mcZ$sp(RDD.scala:1472)
at org.apache.spark.rdd.RDD$$anonfun$isEmpty.apply(RDD.scala:1472)
at org.apache.spark.rdd.RDD$$anonfun$isEmpty.apply(RDD.scala:1472)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1471)
at org.apache.spark.api.java.JavaRDDLike$class.isEmpty(JavaRDDLike.scala:544)
at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:380)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:255)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:587)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Please open a GitHub issue (https://github.com/apache/hudi/issues) for a timely response.
I was able to resolve the issue by using the correct versions of the jars in the spark command:
--packages org.apache.spark:spark-avro_2.12:3.0.0,org.apache.hudi:hudi-utilities-bundle_2.12:0.7.0,org.apache.avro:avro:1.10.1
Once I added the above to the spark command, I no longer saw the error.
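For completeness, here is a sketch of how the full spark-submit could look with those packages added. The bundle jar path, S3 target path, table name, and properties file are hypothetical placeholders, not my exact values:

```sh
spark-submit \
  --packages org.apache.spark:spark-avro_2.12:3.0.0,org.apache.hudi:hudi-utilities-bundle_2.12:0.7.0,org.apache.avro:avro:1.10.1 \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  hudi-utilities-bundle_2.12-0.7.0.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --source-ordering-field ts \
  --target-base-path s3://my-bucket/hudi/test_table \
  --target-table test_table \
  --props kafka-source.properties \
  --continuous
```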