Spark 2.1 中kudu API 写入和更新的方法
How to write and update by kudu API in Spark 2.1
我想用Kudu来写和更新API。
这是 Maven 依赖项:
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-spark2_2.11</artifactId>
<version>1.1.0</version>
</dependency>
在下面的代码中,我不知道 KuduContext
参数。
我在 spark2 中的代码-shell:
val kuduContext = new KuduContext("master:7051")
在 Spark 2.1 流中也出现同样的错误:
import org.apache.kudu.spark.kudu._
import org.apache.kudu.client._
val sparkConf = new SparkConf().setAppName("DirectKafka").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val messages = KafkaUtils.createDirectStream("")
messages.foreachRDD(rdd => {
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
val bb = spark.read.options(Map("kudu.master" -> "master:7051","kudu.table" -> "table")).kudu //good
val kuduContext = new KuduContext("master:7051") //error
})
然后报错:
org.apache.spark.SparkException: Only one SparkContext may be running
in this JVM (see SPARK-2243). To ignore this error, set
spark.driver.allowMultipleContexts = true. The currently running
SparkContext was created at:
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)
将您的 Kudu 版本更新到最新版本(当前为 1.5.0)。 KuduContext
在更高版本中将 SparkContext
作为输入参数,应该可以防止出现此问题。
此外,在 foreachRDD
之外进行初始 Spark 初始化。在您提供的代码中,将 spark
和 kuduContext
移出 foreach。此外,您不需要创建单独的 sparkConf
,您可以只使用较新的 SparkSession
。
val spark = SparkSession.builder.appName("DirectKafka").master("local[*]").getOrCreate()
import spark.implicits._
val kuduContext = new KuduContext("master:7051", spark.sparkContext)
val bb = spark.read.options(Map("kudu.master" -> "master:7051", "kudu.table" -> "table")).kudu
val messages = KafkaUtils.createDirectStream("")
messages.foreachRDD(rdd => {
// do something with the bb table and messages
})
我想用Kudu来写和更新API。 这是 Maven 依赖项:
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-spark2_2.11</artifactId>
<version>1.1.0</version>
</dependency>
在下面的代码中,我不知道 KuduContext
参数。
我在 spark2 中的代码-shell:
val kuduContext = new KuduContext("master:7051")
在 Spark 2.1 流中也出现同样的错误:
import org.apache.kudu.spark.kudu._
import org.apache.kudu.client._
val sparkConf = new SparkConf().setAppName("DirectKafka").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val messages = KafkaUtils.createDirectStream("")
messages.foreachRDD(rdd => {
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
val bb = spark.read.options(Map("kudu.master" -> "master:7051","kudu.table" -> "table")).kudu //good
val kuduContext = new KuduContext("master:7051") //error
})
然后报错:
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at: org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)
将您的 Kudu 版本更新到最新版本(当前为 1.5.0)。 KuduContext
在更高版本中将 SparkContext
作为输入参数,应该可以防止出现此问题。
此外,在 foreachRDD
之外进行初始 Spark 初始化。在您提供的代码中,将 spark
和 kuduContext
移出 foreach。此外,您不需要创建单独的 sparkConf
,您可以只使用较新的 SparkSession
。
val spark = SparkSession.builder.appName("DirectKafka").master("local[*]").getOrCreate()
import spark.implicits._
val kuduContext = new KuduContext("master:7051", spark.sparkContext)
val bb = spark.read.options(Map("kudu.master" -> "master:7051", "kudu.table" -> "table")).kudu
val messages = KafkaUtils.createDirectStream("")
messages.foreachRDD(rdd => {
// do something with the bb table and messages
})