Can't write data into a table with Apache Iceberg

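I'm trying to write some simple data into a table with Apache Iceberg 0.9.1, but I get an error. I want to CRUD data directly through Hadoop. I created a table with HadoopTables and tried to read from it, and then I tried to write data into it. I prepared a JSON file containing a single row. My code reads the JSON object and puts the columns in order, but the final write step always fails. I have changed the versions of some dependencies, but then a different error message appears instead. Is something wrong with my package versions? Please help.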

Here is my source code:


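import static org.apache.iceberg.types.Types.NestedField.optional;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class IcebergTest {

    public static void main(String[] args) {
        testWithoutCatalog();
        readDataWithoutCatalog();
        writeDataWithoutCatalog();
    }

    // Creates a catalog-less Hadoop table, partitioned by title.
    // Note: HadoopTables.create fails if a table already exists at this location,
    // so this step only succeeds on the first run.
    public static void testWithoutCatalog() {
        Schema bookSchema = new Schema(
                optional(1, "title", Types.StringType.get()),
                optional(2, "price", Types.LongType.get()),
                optional(3, "author", Types.StringType.get()),
                optional(4, "genre", Types.StringType.get()));
        PartitionSpec bookspec = PartitionSpec.builderFor(bookSchema).identity("title").build();

        Configuration conf = new Configuration();
        String warehousePath = "hdfs://hadoop01:9000/warehouse_path/xgfying/books3";

        HadoopTables tables = new HadoopTables(conf);
        Table table = tables.create(bookSchema, bookspec, warehousePath);
    }

    public static void readDataWithoutCatalog() {
        // ....... (body omitted in the original question)
    }

    // Reads a one-row JSON file and appends it to the Iceberg table at the same HDFS path.
    public static void writeDataWithoutCatalog() {
        SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();
        Dataset<Row> df = spark.read().json("src/test/data/books3.json");
        // Select the columns in the same order as the table schema
        Dataset<Row> books = df.select("title", "price", "author", "genre");
        System.out.println(" this is the writing data : " + books.first().toString());
        books.write().format("iceberg").mode("append")
             .save("hdfs://hadoop01:9000/warehouse_path/xgfying/books3");
        // System.out.println(df.write().format("iceberg").mode("append").toString());
    }
}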

Here is the error message:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/11/18 15:51:36 INFO SparkContext: Running Spark version 2.4.5
.......
file:///C:/tmp/icebergtest1/src/test/data/books3.json, range: 0-75, partition values: [empty row]
20/11/18 15:51:52 ERROR Utils: Aborting task
java.lang.ExceptionInInitializerError
        at org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:232)
        at org.apache.iceberg.spark.source.SparkAppenderFactory.newAppender(SparkAppenderFactory.java:61)
        at org.apache.iceberg.spark.source.BaseWriter.openCurrent(BaseWriter.java:105)
        at org.apache.iceberg.spark.source.PartitionedWriter.write(PartitionedWriter.java:63)
        at org.apache.iceberg.spark.source.Writer$Partitioned24Writer.write(Writer.java:271)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run.apply(WriteToDataSourceV2Exec.scala:118)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run.apply(WriteToDataSourceV2Exec.scala:116)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute.apply(WriteToDataSourceV2Exec.scala:67)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute.apply(WriteToDataSourceV2Exec.scala:66)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Cannot find constructor for interface org.apache.parquet.column.page.PageWriteStore       
        Missing org.apache.parquet.hadoop.ColumnChunkPageWriteStore(org.apache.parquet.hadoop.CodecFactory$BytesCompressor,org.apache.parquet.schema.MessageType,org.apache.parquet.bytes.ByteBufferAllocator,int) [java.lang.NoSuchMethodException: org.apache.parquet.hadoop.ColumnChunkPageWriteStore.<init>(org.apache.parquet.hadoop.CodecFactory$BytesCompressor, org.apache.parquet.schema.MessageType, org.apache.parquet.bytes.ByteBufferAllocator, int)]
        at org.apache.iceberg.common.DynConstructors$Builder.build(DynConstructors.java:235)
        at org.apache.iceberg.parquet.ParquetWriter.<clinit>(ParquetWriter.java:55)
        ... 19 more
20/11/18 15:51:52 ERROR DataWritingSparkTask: Aborting commit for partition 0 (task 2, attempt 0, stage 2.0)
20/11/18 15:51:52 ERROR DataWritingSparkTask: Aborted commit for partition 0 (task 2, attempt 0, stage 2.0)

Here is my pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>icebergtest</groupId>
  <artifactId>icebergtest1</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>icebergtest1</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <iceberg.version>0.9.1</iceberg.version>
        <hadoop.version>2.7.0</hadoop.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    
          <!-- org.apache.hadoop BEGIN-->
      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>${hadoop.version}</version>
      </dependency>

      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-hdfs</artifactId>
          <version>${hadoop.version}</version>
      </dependency>

      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>${hadoop.version}</version>
          
    <!-- Exclude the netty package -->
      <exclusions>
        <exclusion>
          <groupId>io.netty</groupId>
          <artifactId>netty</artifactId>
        </exclusion>
      </exclusions>
          
      </dependency>
      
      <!-- Works around the io.netty.buffer.PooledByteBufAllocator.defaultNumHeapArena()I exception -->
      <dependency>
          <groupId>io.netty</groupId>
          <artifactId>netty-all</artifactId>
          <version>4.1.18.Final</version>
      </dependency>

      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-mapreduce-client-core</artifactId>
          <version>${hadoop.version}</version>
      </dependency>
      
          <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-auth</artifactId>
          <version>${hadoop.version}</version>
      </dependency>
      
          <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
          <version>${hadoop.version}</version>
      </dependency>
      <!-- org.apache.hadoop END-->

      <!-- org.apache.iceberg BEGIN-->
      <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-core</artifactId>
        <version>${iceberg.version}</version>
      </dependency>


      <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-api</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-parquet</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
     

      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-common</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-orc</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-data</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-hive</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-arrow</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-spark</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-bundled-guava</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-spark-runtime</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-spark2</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-flink</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
        <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-pig</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      
      <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-mr</artifactId>
          <version>${iceberg.version}</version>
      </dependency>
      <!-- org.apache.iceberg END-->
      
      
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>2.4.5</version>
            <exclusions>
                <exclusion>
                    <groupId>org.codehaus.janino</groupId>
                    <artifactId>commons-compiler</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
        
        
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <!--<version>2.7.9</version>-->
            <version>2.6.6</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <!--<version>2.7.9.4</version>-->
            <version>2.6.5</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <!--<version>2.7.9</version>-->
            <version>2.6.5</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>
        
        <dependency>
           <groupId>org.apache.parquet</groupId>
           <artifactId>parquet-avro</artifactId>
           <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
           <groupId>org.apache.parquet</groupId>
           <artifactId>parquet-column</artifactId>
           <version>1.11.1</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.4.0</version>
            <scope>provided</scope>
        </dependency>



    
  </dependencies>
</project>

Missing org.apache.parquet.hadoop.ColumnChunkPageWriteStore(org.apache.parquet.hadoop.CodecFactory$BytesCompressor,org.apache.parquet.schema.MessageType,org.apache.parquet.bytes.ByteBufferAllocator,int) [java.lang.NoSuchMethodException: org.apache.parquet.hadoop.ColumnChunkPageWriteStore.<init>(org.apache.parquet.hadoop.CodecFactory$BytesCompressor, org.apache.parquet.schema.MessageType, org.apache.parquet.bytes.ByteBufferAllocator, int)]

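means that a ColumnChunkPageWriteStore constructor taking four parameters of types (org.apache.parquet.hadoop.CodecFactory$BytesCompressor, org.apache.parquet.schema.MessageType, org.apache.parquet.bytes.ByteBufferAllocator, int) is being looked up. It is Iceberg that does this: as the stack trace shows, ParquetWriter resolves the constructor reflectively through DynConstructors when the class is initialized.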

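That constructor does not exist in the parquet-hadoop jar on your classpath, which is why you get a NoSuchMethodException (wrapped in a RuntimeException, which in turn surfaces as an ExceptionInInitializerError).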
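Because the constructor is resolved reflectively during class initialization (ParquetWriter.<clinit> in the trace), a signature mismatch between jars shows up only at runtime, not at compile time. The self-contained sketch below reproduces that behaviour with two hypothetical stand-in classes (OldPageStore and NewPageStore are illustrations, not real Parquet types) whose constructors differ only by a trailing int. It uses plain Class.getDeclaredConstructor rather than Iceberg's DynConstructors, so it shows the mechanism, not Iceberg's actual code.

```java
import java.lang.reflect.Constructor;

public class CtorLookupDemo {

    // Hypothetical stand-in for an older page store: three-parameter constructor only.
    public static class OldPageStore {
        OldPageStore(Object compressor, Object schema, Object allocator) {}
    }

    // Hypothetical stand-in for a newer page store that adds a trailing int parameter.
    public static class NewPageStore {
        NewPageStore(Object compressor, Object schema, Object allocator, int truncateLength) {}
    }

    // Looks up the four-parameter constructor the caller expects; returns null if it is missing.
    public static Constructor<?> findFourArgCtor(Class<?> cls) {
        try {
            return cls.getDeclaredConstructor(Object.class, Object.class, Object.class, int.class);
        } catch (NoSuchMethodException e) {
            // Same exception type as the one reported in the stack trace
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("OldPageStore has 4-arg ctor: " + (findFourArgCtor(OldPageStore.class) != null));
        System.out.println("NewPageStore has 4-arg ctor: " + (findFourArgCtor(NewPageStore.class) != null));
    }
}
```

Running main prints false for OldPageStore and true for NewPageStore: the lookup succeeds or fails depending purely on which class version is loaded.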

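According to the parquet-hadoop source (for example the 1.8.1 version at https://jar-download.com/artifacts/org.apache.parquet/parquet-hadoop/1.8.1/source-code/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java), older releases only provide a three-parameter constructor (BytesCompressor, MessageType, ByteBufferAllocator). The four-parameter form, whose trailing int is the column-index truncate length, was added in parquet-hadoop 1.11. Spark 2.4.5 depends on parquet-hadoop 1.10.1, so that older jar is most likely the one on your classpath.

Change your Maven dependencies so that parquet-hadoop is on the same 1.11.x line as the other Parquet artifacts in your pom (parquet-avro and parquet-column are already 1.11.1).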
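To check which parquet-hadoop build actually wins on the classpath, and which constructors it offers, a small reflection check like the following can be run with the same classpath as the failing job. ParquetClasspathCheck is a hypothetical helper, not part of any library: it loads the class without initializing it, prints the jar it came from, and lists its declared constructors. If the four-parameter constructor ending in int is not in the list, that jar is not the version Iceberg is looking for.

```java
import java.lang.reflect.Constructor;
import java.net.URL;
import java.security.CodeSource;

public class ParquetClasspathCheck {

    // Describes where a class was loaded from and which constructors it declares.
    public static String describe(String className) {
        StringBuilder sb = new StringBuilder();
        try {
            // initialize = false: do not run the class's static initializer
            Class<?> cls = Class.forName(className, false, ParquetClasspathCheck.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            // JDK bootstrap classes have no CodeSource, so guard against null
            URL location = (src == null) ? null : src.getLocation();
            sb.append(className).append(" loaded from: ")
              .append(location == null ? "<bootstrap/unknown>" : location.toString())
              .append('\n');
            for (Constructor<?> c : cls.getDeclaredConstructors()) {
                sb.append("  ").append(c.toGenericString()).append('\n');
            }
        } catch (ClassNotFoundException e) {
            sb.append(className).append(" is not on the classpath\n");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Defaults to the class named in the error; pass another fully qualified name to inspect it instead.
        String name = args.length > 0 ? args[0] : "org.apache.parquet.hadoop.ColumnChunkPageWriteStore";
        System.out.print(describe(name));
    }
}
```

On the Maven side, mvn dependency:tree -Dincludes=org.apache.parquet shows which Parquet versions were resolved and which dependency pulled each one in.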