Apache Ignite Spark Integration not working with Schema Name

I am using the Apache Ignite Spark connector (ignite-spark-2.7.5) to save my DataFrame to an Ignite table with the following code.

val ignite = Ignition.start(CONFIG)

catalog_opportunities_agg.write
  .format(FORMAT_IGNITE)
  .option(OPTION_CONFIG_FILE, CONFIG)
  .option(OPTION_TABLE, "s1.club")
  .option("user", "ignite")
  .option("password", "ignite")
  .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "club_id")
  .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
  .mode(SaveMode.Overwrite)
  .save()

Ignition.stop(false)

The code works fine against the public schema (i.e. when no schema name is mentioned), but it starts failing as soon as I add the schema name (s1).

Error stack:

19/09/04 10:24:06 ERROR Executor: Exception in task 7.0 in stage 2.0 (TID 208) 
java.util.NoSuchElementException: None.get 
        at scala.None$.get(Option.scala:347) 
        at scala.None$.get(Option.scala:345) 
        at org.apache.ignite.spark.impl.QueryHelper$.org$apache$ignite$spark$impl$QueryHelper$$savePartition(QueryHelper.scala:155) 
        at org.apache.ignite.spark.impl.QueryHelper$$anonfun$saveTable.apply(QueryHelper.scala:117) 
        at org.apache.ignite.spark.impl.QueryHelper$$anonfun$saveTable.apply(QueryHelper.scala:116) 
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$$anonfun$apply.apply(RDD.scala:935) 
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$$anonfun$apply.apply(RDD.scala:935) 
        at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2101) 
        at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2101) 
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) 
        at org.apache.spark.scheduler.Task.run(Task.scala:121) 
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408) 
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) 
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) 
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
        at java.lang.Thread.run(Thread.java:748) 
19/09/04 10:24:06 ERROR Executor: Exception in task 1.0 in stage 2.0 (TID 202) 
java.util.NoSuchElementException: None.get 
        ... (same stack trace as above)
19/09/04 10:24:06 ERROR Executor: Exception in task 5.0 in stage 2.0 (TID 206)

Please point out what I am doing wrong.

I don't think it understands the schema syntax. Instead of:

.option(OPTION_TABLE, "s1.club") 

try:

.option(OPTION_SCHEMA, "s1") 
.option(OPTION_TABLE, "club") 

Note that you don't need to specify the schema at all as long as the table name is unique:

If this is not specified, all schemata will be scanned for a table name which matches the given table name and the first matching table will be used. This option can be used when there are multiple tables in different schemata with the same table name to disambiguate the tables.
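Putting the suggestion together, the full write call from the question would look like the sketch below. This assumes the OPTION_SCHEMA constant is available in your version of IgniteDataFrameSettings; everything else is unchanged from the original code:

```scala
import org.apache.ignite.Ignition
import org.apache.ignite.spark.IgniteDataFrameSettings._
import org.apache.spark.sql.SaveMode

val ignite = Ignition.start(CONFIG)

catalog_opportunities_agg.write
  .format(FORMAT_IGNITE)
  .option(OPTION_CONFIG_FILE, CONFIG)
  .option(OPTION_SCHEMA, "s1")    // schema passed as its own option...
  .option(OPTION_TABLE, "club")   // ...instead of embedded as "s1.club"
  .option("user", "ignite")
  .option("password", "ignite")
  .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "club_id")
  .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
  .mode(SaveMode.Overwrite)
  .save()

Ignition.stop(false)
```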