持久化 Spark DataFrame 以点燃
Persisting Spark DataFrame to Ignite
我想将 Spark Dataframe 持久化到 Ignite。当我探索时,我遇到了 ignite-spark,这有助于做到这一点。但目前 ignite-spark 仅适用于 Spark 2.3,不适用于 Spark 2.4。
所以我回退到
的传统方法
df.write.format("jdbc")
现在,我的代码如下所示。
df.write
.format("jdbc")
.option("url", "jdbc:ignite:thin://127.0.0.1:10800")
.option("dbtable", "sample_table")
.option("user", "ignite")
.option("password", "ignite")
.mode(SaveMode.Overwrite)
.save()
我现在面临的问题是由于我的 DataFrame 中缺少 Ignite 所必需的主键,请提出如何解决此问题的建议。
下面的错误堆栈跟踪:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Exception in thread "main" java.sql.SQLException: No PRIMARY KEY defined for CREATE TABLE
at org.apache.ignite.internal.jdbc.thin.JdbcThinConnection.sendRequest(JdbcThinConnection.java:750)
at org.apache.ignite.internal.jdbc.thin.JdbcThinStatement.execute0(JdbcThinStatement.java:212)
at org.apache.ignite.internal.jdbc.thin.JdbcThinStatement.executeUpdate(JdbcThinStatement.java:340)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:859)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:81)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at com.ev.spark.job.Ignite$.delayedEndpoint$com$ev$spark$job$Ignite(Ignite.scala:52)
at com.ev.spark.job.Ignite$delayedInit$body.apply(Ignite.scala:9)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main.apply(App.scala:76)
at scala.App$$anonfun$main.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.ev.spark.job.Ignite$.main(Ignite.scala:9)
at com.ev.spark.job.Ignite.main(Ignite.scala)
编辑:
我正在寻找一种解决方案,以便在保留 DF 之前即时创建 table。在我的例子中,我的 DF 中已经有一个或多个字段,我必须以某种方式与 Spark 通信以用作 table 创建的主键。
如果它只需要一个具有唯一值的列(作为主键),您可以自己创建它,保存数据框,然后从 Ignite 中删除该列。
请参考这个link(可以直接去Directly with dataframe API
):
希望对您有所帮助!
尝试预先创建底层 Ignite table Ignite DDL. Define some primary key, such as id
. Then use Spark API to connect to Ignite and use this dynamically created Ignite table. Manually increment id
and pass into DataFrames API. For instance, this Ignite API 可用于生成唯一 ID。
至于不支持的Spark 2.4版本,我已经开了ticket for Ignite community. Hopefully, the ticket will be taken into 2.7.6 release8月
Spark 包含多个保存模式,如果您要使用的 table 存在,将应用这些模式:
* Overwrite - with this option you will try to re-create existed table or create new and load data there using IgniteDataStreamer implementation
* Append - with this option you will not try to re-create existed table or create new table and just load the data to existed table
* ErrorIfExists - with this option you will get the exception if the table that you are going to use exists
* Ignore - with this option nothing will be done in case if the table that you are going to use exists. If the table already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data.
在您的示例中,您尝试通过重新创建缓存来存储数据,但您没有提供 Ignite table 详细信息。请尝试在使用 "Overwrite" 保存模式时添加下一个选项:
.option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "id")
.option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "template=replicated")
https://apacheignite-fs.readme.io/docs/ignite-data-frame#section-saving-dataframes
此外,考虑使用追加模式而不是每次都重新创建table。
BR,
安德烈
我想将 Spark Dataframe 持久化到 Ignite。当我探索时,我遇到了 ignite-spark,这有助于做到这一点。但目前 ignite-spark 仅适用于 Spark 2.3,不适用于 Spark 2.4。
所以我回退到
的传统方法df.write.format("jdbc")
现在,我的代码如下所示。
df.write
.format("jdbc")
.option("url", "jdbc:ignite:thin://127.0.0.1:10800")
.option("dbtable", "sample_table")
.option("user", "ignite")
.option("password", "ignite")
.mode(SaveMode.Overwrite)
.save()
我现在面临的问题是由于我的 DataFrame 中缺少 Ignite 所必需的主键,请提出如何解决此问题的建议。
下面的错误堆栈跟踪:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Exception in thread "main" java.sql.SQLException: No PRIMARY KEY defined for CREATE TABLE
at org.apache.ignite.internal.jdbc.thin.JdbcThinConnection.sendRequest(JdbcThinConnection.java:750)
at org.apache.ignite.internal.jdbc.thin.JdbcThinStatement.execute0(JdbcThinStatement.java:212)
at org.apache.ignite.internal.jdbc.thin.JdbcThinStatement.executeUpdate(JdbcThinStatement.java:340)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:859)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:81)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at com.ev.spark.job.Ignite$.delayedEndpoint$com$ev$spark$job$Ignite(Ignite.scala:52)
at com.ev.spark.job.Ignite$delayedInit$body.apply(Ignite.scala:9)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main.apply(App.scala:76)
at scala.App$$anonfun$main.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.ev.spark.job.Ignite$.main(Ignite.scala:9)
at com.ev.spark.job.Ignite.main(Ignite.scala)
编辑:
我正在寻找一种解决方案,以便在保留 DF 之前即时创建 table。在我的例子中,我的 DF 中已经有一个或多个字段,我必须以某种方式与 Spark 通信以用作 table 创建的主键。
如果它只需要一个具有唯一值的列(作为主键),您可以自己创建它,保存数据框,然后从 Ignite 中删除该列。
请参考这个link(可以直接去Directly with dataframe API
):
希望对您有所帮助!
尝试预先创建底层 Ignite table Ignite DDL. Define some primary key, such as id
. Then use Spark API to connect to Ignite and use this dynamically created Ignite table. Manually increment id
and pass into DataFrames API. For instance, this Ignite API 可用于生成唯一 ID。
至于不支持的Spark 2.4版本,我已经开了ticket for Ignite community. Hopefully, the ticket will be taken into 2.7.6 release8月
Spark 包含多个保存模式,如果您要使用的 table 存在,将应用这些模式:
* Overwrite - with this option you will try to re-create existed table or create new and load data there using IgniteDataStreamer implementation
* Append - with this option you will not try to re-create existed table or create new table and just load the data to existed table
* ErrorIfExists - with this option you will get the exception if the table that you are going to use exists
* Ignore - with this option nothing will be done in case if the table that you are going to use exists. If the table already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data.
在您的示例中,您尝试通过重新创建缓存来存储数据,但您没有提供 Ignite table 详细信息。请尝试在使用 "Overwrite" 保存模式时添加下一个选项:
.option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "id")
.option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "template=replicated")
https://apacheignite-fs.readme.io/docs/ignite-data-frame#section-saving-dataframes
此外,考虑使用追加模式而不是每次都重新创建table。
BR, 安德烈