Spark 结构化流到 kudu 上下文
Spark structured stream to kudu context
我想阅读 kafka 主题,然后通过 spark streaming 将其写入 kudu table。
我的第一个方法
// sessions and contexts
val conf = new SparkConf().setMaster("local[2]").setAppName("TestMain")
val sparkSession = SparkSession.builder().config(conf).getOrCreate()
val sparkContext = sparkSession.sparkContext
val kuduContext = new KuduContext("...", sparkContext);
// structure
val schema: StructType = StructType(
StructField("userNo", IntegerType, true) ::
StructField("bandNo", IntegerType, false) ::
StructField("ipv4", StringType, false) :: Nil);
// kudu - prepare table
kuduContext.deleteTable("test_table");
kuduContext.createTable("test_table", schema, Seq("userNo"), new CreateTableOptions()
.setNumReplicas(1)
.addHashPartitions(List("userNo").asJava, 3))
// get stream from kafka
val parsed = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "...")
.option("startingOffsets", "latest")
.option("subscribe", "feed_api_band_get_popular_post_list")
.load()
.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
// write it to kudu
kuduContext.insertRows(parsed.toDF(), "test_table");
现在它抱怨
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch.apply(UnsupportedOperationChecker.scala:36)
我的第二种方法
看来我更改了我的代码以使用传统 KafkaUtils.createDirectStream
KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
).foreachRDD(rdd => {
rdd.foreach(record => {
// write to kudu.............
println(record.value());
})
});
ssc.start();
ssc.awaitTermination();
那么,哪一个是正确的方法呢?或者有什么方法可以使第一种方法成为 运行 吗?
Spark 版本为 2.2.0。
这两种方法似乎都是正确的。第一个使用 Spark Structured 流式处理方式,其中数据以表格形式附加。第二种方法是通过传统的 DStream 做事方式
我相信目前没有 Kudu 支持将 KuduContext 与 Spark 结构化流一起使用。我遇到了类似的问题,不得不转而使用传统的 Kudu 客户端并实现 ForeachWriter[Row] class。我使用了示例 here 并且能够实现解决方案。
第一种方法是不正确的,正如您已经从错误中看到的那样,非常清楚:Queries with streaming sources must be executed with writeStream.start()
。这只适用于批处理。
第二个使用 DStream
,因此不是结构化流。
还有第三种和第四种方法。
从 Kudu 1.9.0 开始,支持结构化流式传输 issue 修复,并按预期使用:
parsed
.writeStream
.format("kudu")
.option("kudu.master", kuduMaster)
.option("kudu.table", tableName)
.option("kudu.operation", operation)
.start()
请注意,如果您使用的是Cloudera,此方法仅适用于cdh6.2.0及以上版本:
<!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 -->
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-spark2_2.11</artifactId>
<version>1.9.0-cdh6.2.0</version>
<scope>test</scope>
</dependency>
我的解决方案是查看 code from SparkContext 并查看 kuduContext.insertRows(df, table)
和其他方法的作用,然后创建 ForeachWriter[Row]
:
val kuduContext = new KuduContext(master, sparkContext)
parsed
.toDF()
.writeStream
.foreach(new ForeachWriter[Row] {
override def open(partitionId: Long, version: Long): Boolean =
kuduContext.tableExists(table)
override def process(value: Row): Unit = {
val kuduClient = kuduContext.syncClient
val kuduSession = kuduClient.newSession()
kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND)
kuduSession.setIgnoreAllDuplicateRows(ignoreDuplicates)
val kuduTable = kuduClient.openTable(kuduSinkConfiguration.table)
val operation = getOperationFunction(kuduTable) //get the kuduTable.newInsert(), newUpsert(), etc.
kuduSession.setIgnoreAllDuplicateRows(ignoreDuplicates)
val row = operation.getRow
row.add("userNo", value.getAs[Int]("userNo"))
row.add("bandNo", value.getAs[Int]("bandNo"))
row.add("ipv4", value.getAs[String]("ipv4"))
kuduSession.apply(operation)
kuduSession.flush()
kuduSession.close()
}
override def close(errorOrNull: Throwable): Unit = Unit
})
.start()
我们还可以使用 Spark 版本 2.2.0 和 cloudera 版本 CDH 5.14 将结构化流数据加载到 Kudu table。您只需要下载对应于 CDH6.2 的 spark-kudu-2.2.11 jar 并将其作为 jar 传递到您的 spark-submit 命令中。这将在下面的语句中识别 kudu 格式并轻松加载数据框。
已解析
.writeStream
格式("kudu")
.option("kudu.master", kuduMaster)
.option("kudu.table", table名称)
.option("kudu.operation", 操作)
.start()
JAR 可以从以下网址下载:https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2_2.11/1.10.0-cdh6.3.2
Spark 提交语句:
spark2-submit --master local[*] --deploy-mode client --jars spark-sql-kafka-0-10_2.11-2.2.0.jar,kafka-clients-0.10.0.0.jar,spark-streaming-kafka-0-10_2.11-2.2.0.jar,kudu-spark2_2.11-1.10.0-cdh6.3.2.jar,kudu-client-1.10.0- cdh6.3.2.jar /path_of_python_code/rdd-stream-read.py
注意- Kudu-client 是可选的。可能必须与集群部署模式一起使用。
使用的 writestream 语句:
查询=dfCols.writeStream.format("kudu").选项("kudu.master","host:7051,host:7051,host:7051").选项("kudu.table","impala::db.kudu_table_name")。选项("kudu.operation","upsert").选项("checkpointLocation","file:///path_of_dir/checkpoint/").start()
我想阅读 kafka 主题,然后通过 spark streaming 将其写入 kudu table。
我的第一个方法
// sessions and contexts
val conf = new SparkConf().setMaster("local[2]").setAppName("TestMain")
val sparkSession = SparkSession.builder().config(conf).getOrCreate()
val sparkContext = sparkSession.sparkContext
val kuduContext = new KuduContext("...", sparkContext);
// structure
val schema: StructType = StructType(
StructField("userNo", IntegerType, true) ::
StructField("bandNo", IntegerType, false) ::
StructField("ipv4", StringType, false) :: Nil);
// kudu - prepare table
kuduContext.deleteTable("test_table");
kuduContext.createTable("test_table", schema, Seq("userNo"), new CreateTableOptions()
.setNumReplicas(1)
.addHashPartitions(List("userNo").asJava, 3))
// get stream from kafka
val parsed = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "...")
.option("startingOffsets", "latest")
.option("subscribe", "feed_api_band_get_popular_post_list")
.load()
.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
// write it to kudu
kuduContext.insertRows(parsed.toDF(), "test_table");
现在它抱怨
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch.apply(UnsupportedOperationChecker.scala:36)
我的第二种方法
看来我更改了我的代码以使用传统 KafkaUtils.createDirectStream
KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
).foreachRDD(rdd => {
rdd.foreach(record => {
// write to kudu.............
println(record.value());
})
});
ssc.start();
ssc.awaitTermination();
那么,哪一个是正确的方法呢?或者有什么方法可以使第一种方法成为 运行 吗?
Spark 版本为 2.2.0。
这两种方法似乎都是正确的。第一个使用 Spark Structured 流式处理方式,其中数据以表格形式附加。第二种方法是通过传统的 DStream 做事方式
我相信目前没有 Kudu 支持将 KuduContext 与 Spark 结构化流一起使用。我遇到了类似的问题,不得不转而使用传统的 Kudu 客户端并实现 ForeachWriter[Row] class。我使用了示例 here 并且能够实现解决方案。
第一种方法是不正确的,正如您已经从错误中看到的那样,非常清楚:Queries with streaming sources must be executed with writeStream.start()
。这只适用于批处理。
第二个使用 DStream
,因此不是结构化流。
还有第三种和第四种方法。
从 Kudu 1.9.0 开始,支持结构化流式传输 issue 修复,并按预期使用:
parsed
.writeStream
.format("kudu")
.option("kudu.master", kuduMaster)
.option("kudu.table", tableName)
.option("kudu.operation", operation)
.start()
请注意,如果您使用的是Cloudera,此方法仅适用于cdh6.2.0及以上版本:
<!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 -->
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-spark2_2.11</artifactId>
<version>1.9.0-cdh6.2.0</version>
<scope>test</scope>
</dependency>
我的解决方案是查看 code from SparkContext 并查看 kuduContext.insertRows(df, table)
和其他方法的作用,然后创建 ForeachWriter[Row]
:
val kuduContext = new KuduContext(master, sparkContext)
parsed
.toDF()
.writeStream
.foreach(new ForeachWriter[Row] {
override def open(partitionId: Long, version: Long): Boolean =
kuduContext.tableExists(table)
override def process(value: Row): Unit = {
val kuduClient = kuduContext.syncClient
val kuduSession = kuduClient.newSession()
kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND)
kuduSession.setIgnoreAllDuplicateRows(ignoreDuplicates)
val kuduTable = kuduClient.openTable(kuduSinkConfiguration.table)
val operation = getOperationFunction(kuduTable) //get the kuduTable.newInsert(), newUpsert(), etc.
kuduSession.setIgnoreAllDuplicateRows(ignoreDuplicates)
val row = operation.getRow
row.add("userNo", value.getAs[Int]("userNo"))
row.add("bandNo", value.getAs[Int]("bandNo"))
row.add("ipv4", value.getAs[String]("ipv4"))
kuduSession.apply(operation)
kuduSession.flush()
kuduSession.close()
}
override def close(errorOrNull: Throwable): Unit = Unit
})
.start()
我们还可以使用 Spark 版本 2.2.0 和 cloudera 版本 CDH 5.14 将结构化流数据加载到 Kudu table。您只需要下载对应于 CDH6.2 的 spark-kudu-2.2.11 jar 并将其作为 jar 传递到您的 spark-submit 命令中。这将在下面的语句中识别 kudu 格式并轻松加载数据框。
已解析 .writeStream 格式("kudu") .option("kudu.master", kuduMaster) .option("kudu.table", table名称) .option("kudu.operation", 操作) .start()
JAR 可以从以下网址下载:https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2_2.11/1.10.0-cdh6.3.2
Spark 提交语句:
spark2-submit --master local[*] --deploy-mode client --jars spark-sql-kafka-0-10_2.11-2.2.0.jar,kafka-clients-0.10.0.0.jar,spark-streaming-kafka-0-10_2.11-2.2.0.jar,kudu-spark2_2.11-1.10.0-cdh6.3.2.jar,kudu-client-1.10.0- cdh6.3.2.jar /path_of_python_code/rdd-stream-read.py
注意- Kudu-client 是可选的。可能必须与集群部署模式一起使用。
使用的 writestream 语句:
查询=dfCols.writeStream.format("kudu").选项("kudu.master","host:7051,host:7051,host:7051").选项("kudu.table","impala::db.kudu_table_name")。选项("kudu.operation","upsert").选项("checkpointLocation","file:///path_of_dir/checkpoint/").start()