Spark 1.3.1 中如何使用 Java 读取 AVRO 数据?

How do I use Java to read AVRO data in Spark 1.3.1?

我正在尝试开发一个 Java 读取 AVRO 记录 (https://avro.apache.org/) from HDFS put there by a technology called Gobblin (https://github.com/linkedin/gobblin/wiki) 的 Spark 应用程序。

HDFS AVRO 数据文件示例:

/gobblin/work/job-output/KAFKA/kafka-gobblin-hdfs-test/20150910213846_append/part.task_kafka-gobblin-hdfs-test_1441921123461_0.avro

不幸的是,我发现 Java 中编写的示例有限。

我发现的最好的东西是用 Scala 编写的(使用 Hadoop 版本 1 库)。

如有任何帮助,我们将不胜感激。

目前我正在考虑使用以下代码,但我不确定如何从我的 AVRO 数据中提取值的 HashMap:

JavaPairRDD avroRDD = sc.newAPIHadoopFile( 
    path, 
    AvroKeyInputFormat.class, 
    AvroKey.class, 
    NullWritable.class, 
    new Configuration() );

// JavaPairRDD avroRDD = sc.newAPIHadoopFile( 
//    path, 
//    AvroKeyValueInputFormat.class, 
//    AvroKey.class, 
//    AvroValue.class, 
//    new Configuration() );

我当前的 Maven 依赖项:

<dependencies>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.3.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.7.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-mapred</artifactId>
        <version>1.7.6</version>
        <classifier>hadoop2</classifier>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.4.3</version>
    </dependency>


    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <scope>test</scope>
    </dependency>

</dependencies>

我写了一个小原型,它能够读取我的示例 Gobblin Avro 记录作为输入,并使用 Spark 输出相关结果 (spark-hdfs-avro-test)。值得一提的是,我需要解决几个问题。 如有任何意见或反馈,我们将不胜感激。

问题 1: 当前 Avro 版本 (1.7.7) 和 Java 序列化存在问题:

引用:

Spark relies on Java's Serializable interface to serialize objects. Avro objects don't implement Serializable. So, to work with Avro objects in Spark, you need to subclass your Avro generated classes and implement Serializable, e.g. https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com/zenfractal/SerializableAminoAcid.java.

为了解决这个问题,我编写了自己的可序列化包装器 classes:

问题 2: 我的 Avro 消息不包含 "Key" 值。

不幸的是,我无法使用任何开箱即用的输入格式,不得不自己编写:AvroValueInputFormat

public class AvroValueInputFormat<T> extends FileInputFormat<NullWritable, AvroValue<T>> {

我无法使用以下内容:

# org.apache.avro.mapreduce.AvroKeyInputFormat
public class AvroKeyInputFormat<T> extends FileInputFormat<AvroKey<T>, NullWritable> {

# org.apache.avro.mapreduce.AvroKeyValueInputFormat
public class AvroKeyValueInputFormat<K, V> extends FileInputFormat<AvroKey<K>, AvroValue<V>> {

问题 3: 我无法使用 AvroJob class setter 来设置架构值,我不得不手动执行此操作。

    hadoopConf.set( "avro.schema.input.key", Schema.create( org.apache.avro.Schema.Type.NULL ).toString() ); //$NON-NLS-1$
    hadoopConf.set( "avro.schema.input.value", Event.SCHEMA$.toString() ); //$NON-NLS-1$
    hadoopConf.set( "avro.schema.output.key", Schema.create( org.apache.avro.Schema.Type.NULL ).toString() ); //$NON-NLS-1$
    hadoopConf.set( "avro.schema.output.value", SeverityEventCount.SCHEMA$.toString() ); //$NON-NLS-1$