数据源io.pivotal.greenplum.spark.GreenplumRelationProvider不支持流式写入
Data source io.pivotal.greenplum.spark.GreenplumRelationProvider does not support streamed writing
我正在尝试从 kafka 读取数据并使用 spark 将其上传到 greenplum 数据库。我正在使用 greenplum-spark 连接器,但我得到数据源 io.pivotal.greenplum.spark.GreenplumRelationProvider 不支持流式写入。
是greenplum源不支持流数据吗?我可以在网站上看到 "Continuous ETL pipeline (streaming)".
我尝试将数据源作为 "greenplum" 和 "io.pivotal.greenplum.spark.GreenplumRelationProvider" 放入 .format("datasource")
val EventStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", args(0))
.option("subscribe", args(1))
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
.load
val gscWriteOptionMap = Map(
"url" -> "link for greenplum",
"user" -> "****",
"password" -> "****",
"dbschema" -> "dbname"
)
val stateEventDS = EventStream
.selectExpr("CAST(key AS String)", "*****(value)")
.as[(String, ******)]
.map(_._2)
val EventOutputStream = stateEventDS.writeStream
.format("io.pivotal.greenplum.spark.GreenplumRelationProvider")
.options(gscWriteOptionMap)
.start()
assetEventOutputStream.awaitTermination()
您使用的是什么版本的 GPDB / Spark?
您可以绕过 spark,转而使用 Greenplum-Kafka 连接器。
https://gpdb.docs.pivotal.io/5170/greenplum-kafka/overview.html
在早期版本中,Greenplum-Spark 连接器公开了一个名为 io.pivotal.greenplum.spark.GreenplumRelationProvider 的 Spark 数据源,用于将 Greenplum 数据库中的数据读取到 Spark DataFrame 中。
在以后的版本中,连接器公开了一个名为greenplum 的Spark 数据源,用于在Spark 和Greenplum 数据库之间传输数据。
应该是这样的--
val EventOutputStream = stateEventDS.write.format("greenplum")
.options(gscWriteOptionMap)
.保存()
参见:https://greenplum-spark.docs.pivotal.io/160/write_to_gpdb.html
Greenplum Spark 结构化流
演示如何使用 JDBC
将 writeStream API 与 GPDB 一起使用
以下代码块使用速率流源读取并使用基于 JDBC 的接收器分批流式传输到 GPDB
基于批处理的流
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import scala.concurrent.duration._
val sq = spark.
readStream.
format("rate").
load.
writeStream.
format("myjdbc").
option("checkpointLocation", "/tmp/jdbc-checkpoint").
trigger(Trigger.ProcessingTime(10.seconds)).
start
基于记录的流媒体
这使用 ForeachWriter
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import scala.concurrent.duration._
val url="jdbc:postgresql://gsc-dev:5432/gpadmin"
val user ="gpadmin"
val pwd = "changeme"
val jdbcWriter = new JDBCSink(url,user, pwd)
val sq = spark.
readStream.
format("rate").
load.
writeStream.
format(jdbcWriter).
option("checkpointLocation", "/tmp/jdbc-checkpoint").
trigger(Trigger.ProcessingTime(10.seconds)).
start
我正在尝试从 kafka 读取数据并使用 spark 将其上传到 greenplum 数据库。我正在使用 greenplum-spark 连接器,但我得到数据源 io.pivotal.greenplum.spark.GreenplumRelationProvider 不支持流式写入。 是greenplum源不支持流数据吗?我可以在网站上看到 "Continuous ETL pipeline (streaming)".
我尝试将数据源作为 "greenplum" 和 "io.pivotal.greenplum.spark.GreenplumRelationProvider" 放入 .format("datasource")
val EventStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", args(0))
.option("subscribe", args(1))
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
.load
val gscWriteOptionMap = Map(
"url" -> "link for greenplum",
"user" -> "****",
"password" -> "****",
"dbschema" -> "dbname"
)
val stateEventDS = EventStream
.selectExpr("CAST(key AS String)", "*****(value)")
.as[(String, ******)]
.map(_._2)
val EventOutputStream = stateEventDS.writeStream
.format("io.pivotal.greenplum.spark.GreenplumRelationProvider")
.options(gscWriteOptionMap)
.start()
assetEventOutputStream.awaitTermination()
您使用的是什么版本的 GPDB / Spark? 您可以绕过 spark,转而使用 Greenplum-Kafka 连接器。
https://gpdb.docs.pivotal.io/5170/greenplum-kafka/overview.html
在早期版本中,Greenplum-Spark 连接器公开了一个名为 io.pivotal.greenplum.spark.GreenplumRelationProvider 的 Spark 数据源,用于将 Greenplum 数据库中的数据读取到 Spark DataFrame 中。
在以后的版本中,连接器公开了一个名为greenplum 的Spark 数据源,用于在Spark 和Greenplum 数据库之间传输数据。
应该是这样的--
val EventOutputStream = stateEventDS.write.format("greenplum") .options(gscWriteOptionMap) .保存()
参见:https://greenplum-spark.docs.pivotal.io/160/write_to_gpdb.html
Greenplum Spark 结构化流
演示如何使用 JDBC
将 writeStream API 与 GPDB 一起使用以下代码块使用速率流源读取并使用基于 JDBC 的接收器分批流式传输到 GPDB
基于批处理的流
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import scala.concurrent.duration._
val sq = spark.
readStream.
format("rate").
load.
writeStream.
format("myjdbc").
option("checkpointLocation", "/tmp/jdbc-checkpoint").
trigger(Trigger.ProcessingTime(10.seconds)).
start
基于记录的流媒体
这使用 ForeachWriter
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import scala.concurrent.duration._
val url="jdbc:postgresql://gsc-dev:5432/gpadmin"
val user ="gpadmin"
val pwd = "changeme"
val jdbcWriter = new JDBCSink(url,user, pwd)
val sq = spark.
readStream.
format("rate").
load.
writeStream.
format(jdbcWriter).
option("checkpointLocation", "/tmp/jdbc-checkpoint").
trigger(Trigger.ProcessingTime(10.seconds)).
start