从 pyspark 保存数据帧时在 Ignite table 中创建索引
Create index in Ignite table when save dataframe from pyspark
我使用以下代码将 Spark 数据帧保存到 Apache Ignite table:
df.write\
.format("ignite")\
.option("table","REPORT")\
.option("primaryKeyFields", ', '.join(map(str, df.schema.names[:-1])))\
.option("config",configFile)\
.option("compression", "gzip")\
.mode("overwrite")\
.save()
但是,我找不到如何使用此 owerwrite-saving 在字段上创建索引。
我需要这个,但是在 .save() 操作中:
CREATE INDEX REPORT_FIELD_IDX ON PUBLIC.REPORT (FIELD)
不,在使用 Spark 保存 DataFrame 时不能这样做。创建 table 和创建索引是两个不同的操作。
Here都是DataFrame保存到Ignite的选项,可以看到,没有创建索引的选项。
使用如下语法非常简单:
CREATE INDEX IF NOT EXISTS AGE_IDX ON "PUBLIC".Person (AGE)
如果没有创建新的 table,则 IF NOT EXISTS 将起作用,但什么也不会发生。否则,将创建索引。
它可以是 运行 使用任何可以与 Ignite 一起使用的 SQL 工具(webconsole、visor、sqlline、jdbc、odbc 等),但我猜你是打算从 Spark 工作中做到这一点。所以你可以尝试使用 IgniteSparkSession 或 IgniteRDD 来 运行 SQL 而不是 Ignite:
IgniteSparkSession igniteSession = IgniteSparkSession.builder()
.appName("Spark Ignite example")
.igniteConfig(configPath)
.getOrCreate();
igniteSession.sqlContext().sql("CREATE INDEX IF NOT EXISTS AGE_IDX ON \"PUBLIC\".Person (AGE)");
或
val cacheRdd = igniteContext.fromCache("partitioned")
val result = cacheRdd.sql(
"CREATE INDEX IF NOT EXISTS AGE_IDX ON \"PUBLIC\".Person (AGE)")
我使用以下代码将 Spark 数据帧保存到 Apache Ignite table:
df.write\
.format("ignite")\
.option("table","REPORT")\
.option("primaryKeyFields", ', '.join(map(str, df.schema.names[:-1])))\
.option("config",configFile)\
.option("compression", "gzip")\
.mode("overwrite")\
.save()
但是,我找不到如何使用此 owerwrite-saving 在字段上创建索引。
我需要这个,但是在 .save() 操作中:
CREATE INDEX REPORT_FIELD_IDX ON PUBLIC.REPORT (FIELD)
不,在使用 Spark 保存 DataFrame 时不能这样做。创建 table 和创建索引是两个不同的操作。
Here都是DataFrame保存到Ignite的选项,可以看到,没有创建索引的选项。
使用如下语法非常简单:
CREATE INDEX IF NOT EXISTS AGE_IDX ON "PUBLIC".Person (AGE)
如果没有创建新的 table,则 IF NOT EXISTS 将起作用,但什么也不会发生。否则,将创建索引。
它可以是 运行 使用任何可以与 Ignite 一起使用的 SQL 工具(webconsole、visor、sqlline、jdbc、odbc 等),但我猜你是打算从 Spark 工作中做到这一点。所以你可以尝试使用 IgniteSparkSession 或 IgniteRDD 来 运行 SQL 而不是 Ignite:
IgniteSparkSession igniteSession = IgniteSparkSession.builder()
.appName("Spark Ignite example")
.igniteConfig(configPath)
.getOrCreate();
igniteSession.sqlContext().sql("CREATE INDEX IF NOT EXISTS AGE_IDX ON \"PUBLIC\".Person (AGE)");
或
val cacheRdd = igniteContext.fromCache("partitioned")
val result = cacheRdd.sql(
"CREATE INDEX IF NOT EXISTS AGE_IDX ON \"PUBLIC\".Person (AGE)")