如何将数据序列化为 Spark 中的 AVRO 模式(Java)?

How to serialize the data to AVRO schema in Spark (with Java)?

我已经定义了一个 AVRO 模式,并使用 avro-tools 为这些模式生成了一些 classes。现在,我想将数据序列化到磁盘。我为此找到了一些关于 scala 的答案,但没有找到 Java。 class Article 是用 avro-tools 生成的,由我定义的模式制成。

这是我尝试执行此操作的代码的简化版本:

JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);
JavaRDD<Article> processingFiles = filesRDD.map(fileNameContent -> {
    // The name of the file
    String fileName = fileNameContent._1();
    // The content of the file
    String fileContent = fileNameContent._2();

    // An object from my avro schema
    Article a = new Article(fileContent);

    Processing processing = new Processing();
    // .... some processing of the content here ... //

    processing.serializeArticleToDisk(avroFileName);

    return a;
});

其中serializeArticleToDisk(avroFileName)定义如下:

public void serializeArticleToDisk(String filename) throws IOException{
    // Serialize article to disk
    DatumWriter<Article> articleDatumWriter = new SpecificDatumWriter<Article>(Article.class);
    DataFileWriter<Article> dataFileWriter = new DataFileWriter<Article>(articleDatumWriter);
    dataFileWriter.create(this.article.getSchema(), new File(filename));
    dataFileWriter.append(this.article);
    dataFileWriter.close();
}

其中 Article 是我的 avro 架构。

现在,映射器向我抛出错误:

java.io.FileNotFoundException: hdfs:/...path.../avroFileName.avro (No such file or directory)   
at java.io.FileOutputStream.open0(Native Method)    
at java.io.FileOutputStream.open(FileOutputStream.java:270)     
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)   
at java.io.FileOutputStream.<init>(FileOutputStream.java:162)   
at org.apache.avro.file.SyncableFileOutputStream.<init>(SyncableFileOutputStream.java:60)   
at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:129)
at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:129)
at sentences.ProcessXML.serializeArticleToDisk(ProcessXML.java:207)     
. . . rest of the stacktrace ... 

虽然文件路径是正确的。

之后我使用了 collect() 方法,因此 map 函数中的其他所有内容都可以正常工作(序列化部分除外)。

我对 Spark 还很陌生,所以我不确定这是否真的是微不足道的事情。我怀疑我需要使用一些写入函数,而不是在映射器中进行写入(不过不确定这是否属实)。有什么解决办法吗?

编辑:

我显示的错误堆栈跟踪的最后一行实际上是在这部分:

dataFileWriter.create(this.article.getSchema(), new File(filename));

这是引发实际错误的部分。我假设 dataFileWriter 需要用其他东西替换。有什么想法吗?

看来你使用Spark的方式不对

Map是一个变换函数。仅调用 map 不会调用 RDD 的计算。您必须像 forEach()collect().

这样调用 action

另请注意,提供给 map 的 lambda 将在驱动程序中序列化并传输到集群中的某些 Node

已添加

尝试使用 Spark SQL 和 Spark-Avro 将 Spark DataFrame 保存为 Avro 格式:

// Load a text file and convert each line to a JavaBean.
JavaRDD<Person> people = sc.textFile("/examples/people.txt")
    .map(Person::parse);

// Apply a schema to an RDD
DataFrame peopleDF = sqlContext.createDataFrame(people, Person.class);
peopleDF.write()
    .format("com.databricks.spark.avro")
    .save("/output");

这个解决方案没有使用数据帧,也没有抛出任何错误:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.NullWritable;
import org.apache.avro.mapred.AvroKey;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

   .  .  .  .  .

// Serializing to AVRO
JavaPairRDD<AvroKey<Article>, NullWritable> javaPairRDD = processingFiles.mapToPair(r -> {    
    return new Tuple2<AvroKey<Article>, NullWritable>(new AvroKey<Article>(r), NullWritable.get());
});
Job job = AvroUtils.getJobOutputKeyAvroSchema(Article.getClassSchema());
javaPairRDD.saveAsNewAPIHadoopFile(outputDataPath, AvroKey.class, NullWritable.class, AvroKeyOutputFormat.class, 
        job.getConfiguration());

其中 AvroUtils.getJobOutputKeyAvroSchema 是:

public static Job getJobOutputKeyAvroSchema(Schema avroSchema) {
    Job job;

    try {
        job = new Job();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }

    AvroJob.setOutputKeySchema(job, avroSchema);
    return job;
}

可在此处找到 Spark + Avro 的类似内容 -> https://github.com/CeON/spark-utils