python Spark avro

python Spark avro

尝试编写 avro 时,出现以下错误:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 35.0 failed 1 times, most recent failure: Lost task 7.0 in stage 35.0 (TID 110, localhost): java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.avro.mapred.AvroWrapper

我读入了一个包含 3 条记录的 avro 文件:

avro_rdd = sc.newAPIHadoopFile(
    "threerecords.avro",
    "org.apache.avro.mapreduce.AvroKeyInputFormat",
    "org.apache.avro.mapred.AvroKey",
    "org.apache.hadoop.io.NullWritable",
    keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
    conf=None)

output = avro_rdd.map(lambda x: x[0]).collect()

然后我尝试写出一条记录(输出保存在 avro 中):

conf = {"avro.schema.input.key": reduce(lambda x, y: x + y, sc.textFile("myschema.avsc", 1).collect())}

sc.parallelize([output[0]]).map(lambda x: (x, None)).saveAsNewAPIHadoopFile(
    "output.avro",
    "org.apache.avro.mapreduce.AvroKeyOutputFormat",
    "org.apache.avro.mapred.AvroKey",
    "org.apache.hadoop.io.NullWritable",
    keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
    conf=conf)

我如何绕过 error/write 成功输出单个 avro 记录?我知道我的架构是正确的,因为它来自 avro 本身。

目前似乎不支持此功能。您现在正在尝试将 java 地图用作 Avro 记录并再次将其转换为 Java 地图。这就是为什么您会收到有关 java 哈希图的错误的原因。

有来自 staslos 的拉取请求以添加 Avro 输出格式,有关拉取请求和示例,请参阅 link

AvroConverters.scala 中缺少一个转换器,用于将 java 映射转换回 avro 格式。

上周我遇到了完全相同的问题。基于 rfkortekass 的回答,我使用了来自 staslos 的 pull request 并尝试设置一个在 avro 文件中读取/写入的简单示例。

我设法让一个简单的演示工作。你可以在这里找到文件 https://github.com/totor31/spark-avro-python-converters

我对 scala、java 和 maven 一无所知,所以这是非常实验性的:我的主要目标是从 spark 示例目录中提取数量非常有限的文件,以获得一个编译框架允许生成可用的 jar 文件。

如果更有知识的人想与我的存储库进行交互,请随时询问。

非常感谢 rfkortekass 指出拉取请求,它为我节省了很多时间。

几个月后回来:

在新版本中使用 spark DataFrames 是编写 avro 文件的好方法:参见 https://github.com/databricks/spark-avro