DataProc Avro 版本导致图像 v1.0.0 错误

DataProc Avro Version Causing Error on Image v1.0.0

我们正在 运行使用 dataproc image 1.0 and spark-redshift 执行一些数据处理作业。

我们有两个集群,这里有一些细节:


自上周五 (2016-08-05 AEST) 的某个时候起,我们的代码在集群 B 上停止工作并出现以下错误,而集群 A 运行ning 没有问题。

以下代码可以在集群 B(或任何具有映像 v1.0.0 的新集群)上重现该问题,而在集群 A 上 运行s fine

示例 PySpark 代码:

from pyspark import SparkContext, SQLContext
sc = SparkContext()
sql_context = SQLContext(sc)

rdd = sc.parallelize([{'user_id': 'test'}])
df = rdd.toDF()

sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "FOO")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "BAR")

df\
    .write\
    .format("com.databricks.spark.redshift") \
    .option("url", "jdbc:redshift://foo.ap-southeast-2.redshift.amazonaws.com/bar") \
    .option("dbtable", 'foo') \
    .option("tempdir", "s3n://bar") \
    .option("extracopyoptions", "TRUNCATECOLUMNS") \
    .mode("append") \
    .save()

以上代码在Cluster B上出现以下两种情况均失败,而在A上运行ning fine。请注意,RedshiftJDBC41-1.1.10.1010.jar 是通过集群初始化脚本创建的。


它产生的错误(Trace):

2016-08-08 06:12:23 WARN  TaskSetManager:70 - Lost task 6.0 in stage 45.0 (TID 121275, foo.bar.internal): 
    java.lang.NoSuchMethodError: org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;
    at org.apache.avro.mapreduce.AvroKeyRecordWriter.<init>(AvroKeyRecordWriter.java:55)
    at org.apache.avro.mapreduce.AvroKeyOutputFormat$RecordWriterFactory.create(AvroKeyOutputFormat.java:79)
    at org.apache.avro.mapreduce.AvroKeyOutputFormat.getRecordWriter(AvroKeyOutputFormat.java:105)
    at com.databricks.spark.avro.AvroOutputWriter.<init>(AvroOutputWriter.scala:82)
    at com.databricks.spark.avro.AvroOutputWriterFactory.newInstance(AvroOutputWriterFactory.scala:31)
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:129)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:255)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$$anonfun$apply$mcV$sp.apply(InsertIntoHadoopFsRelation.scala:148)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$$anonfun$apply$mcV$sp.apply(InsertIntoHadoopFsRelation.scala:148)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

2016-08-08 06:12:24 ERROR YarnScheduler:74 - Lost executor 63 on kinesis-ma-sw-o7he.c.bupa-ma.internal: Container marked as failed: container_1470632577663_0003_01_000065 on host: kinesis-ma-sw-o7he.c.bupa-ma.internal. Exit status: 50. Diagnostics: Exception from container-launch.
Container id: container_1470632577663_0003_01_000065
Exit code: 50
Stack trace: ExitCodeException exitCode=50:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:545)
    at org.apache.hadoop.util.Shell.run(Shell.java:456)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:722)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

SparkRedshift:1.0.0 需要 com.databricks.spark-avro:2.0.1, which requires org.apache.avro:1.7.6.

检查集群 A 上 org.apache.avro.generic.GenericData 的版本时:

root@foo-bar-m:/home/foo# spark-shell \
>     --verbose \
>     --master "local[*]" \
>     --deploy-mode client \
>     --packages com.databricks:spark-redshift_2.10:1.0.0 \
>     --jars "/usr/lib/hadoop/lib/RedshiftJDBC41-1.1.10.1010.jar"

它产生 (Trace):

scala> import org.apache.avro.generic._
import org.apache.avro.generic._

scala> val c = GenericData.get()
c: org.apache.avro.generic.GenericData = org.apache.avro.generic.GenericData@496a514f

scala> c.getClass.getProtectionDomain().getCodeSource()
res0: java.security.CodeSource = (file:/usr/lib/hadoop/lib/bigquery-connector-0.7.5-hadoop2.jar <no signer certificates>)

同时 运行在群集 B 上执行相同的命令:

scala> import org.apache.avro.generic._
import org.apache.avro.generic._

scala> val c = GenericData.get()
c: org.apache.avro.generic.GenericData = org.apache.avro.generic.GenericData@72bec302

scala> c.getClass.getProtectionDomain().getCodeSource()
res0: java.security.CodeSource = (file:/usr/lib/hadoop/lib/bigquery-connector-0.7.7-hadoop2.jar <no signer certificates>)

Screenshot of Env 在集群 B 上。(对所有删节表示歉意)。 我们已经尝试了 here and here 中描述的方法,但没有成功。

这真的很令人沮丧,因为 DataProc 更新图像内容 而没有 与不可变版本完全相反的发布版本。现在我们的代码坏了,我们无法回滚到以前的版本。

抱歉给您带来麻烦!它当然不是为了在图像版本中发生破坏性更改。请注意,已推出子次要版本 "under the hood",用于非破坏性错误修复和 Dataproc 特定补丁。

您可以恢复使用上周之前的 1.0.* 版本,只需在从命令行部署集群时指定 --image-version 1.0.8

gcloud dataproc clusters create --image-version 1.0.8

编辑:为了进一步说明,我们调查了有问题的 Avro 版本,并验证了 Avro 版本号在任何最近的次要 Dataproc 版本中确实 没有 更改。核心问题是Hadoop本身有一个潜在的bug,Hadoop本身在/usr/lib/hadoop/lib/下带avro-1.7.4而Spark使用avro-1.7.7。巧合的是 Google 的 bigquery connectory 也使用了 avro-1.7.7 但事实证明这与已知的 Spark/Hadoop problem with 1.7.4 vs 1.7.7 是正交的。最近的图像更新被认为是非破坏性的,因为版本实际上没有改变,但是类加载顺序以不确定的方式改变了,Hadoop 的坏 avro 版本过去是纯粹靠运气从 Spark 作业中隐藏的,并且不再意外地隐藏在最新的图像中.

Dataproc 的 preview 图像目前包括对 Hadoop 层中 avro 版本的修复,当它出现时应该会进入任何未来的 Dataproc 1.1 版本;您可能想考虑尝试 preview 版本,看看 Spark 2.0 是否是无缝过渡。