如何在 Java Spark 中使用 foreachBatch() 写入 Cassandra?
How to write to Cassandra using foreachBatch() in Java Spark?
我有以下代码,我想使用 spark 2.4 结构化流写入 cassandra foreachBatch
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.load();
Dataset<Row> values=df.selectExpr(
"split(value,',')[0] as field1",
"split(value,',')[1] as field2",
"split(value,',')[2] as field3",
"split(value,',')[3] as field4",
"split(value,',')[4] as field5");
//TODO write into cassandra
values.writeStream().foreachBatch(
new VoidFunction2<Dataset<String>, Long> {
public void call(Dataset<String> dataset, Long batchId) {
// Transform and write batchDF
}
).start();
尝试将其添加到您的 pom.xml:
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.11</artifactId>
<version>2.4.2</version>
</dependency>
在那之后 import cassandra 隐含:
import org.apache.spark.sql.cassandra._
你可以在 df 上使用 cassandraFormat 方法:
dataset
.write
.cassandraFormat("table","keyspace")
.save()
当您使用 .forEachBatch
时,您的代码就像处理普通数据集一样工作...在 Java 中,代码可能如下所示(完整源代码为 here):
.foreachBatch((VoidFunction2<Dataset<Row>, Long>) (df, batchId) ->
df.write()
.format("org.apache.spark.sql.cassandra")
.options(ImmutableMap.of("table", "sttest", "keyspace", "test"))
.mode(SaveMode.Append)
.save()
)
2020 年 9 月更新:Spark Cassandra Connector 2.5.0
中添加了对 spark 结构化流的支持
我有以下代码,我想使用 spark 2.4 结构化流写入 cassandra foreachBatch
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.load();
Dataset<Row> values=df.selectExpr(
"split(value,',')[0] as field1",
"split(value,',')[1] as field2",
"split(value,',')[2] as field3",
"split(value,',')[3] as field4",
"split(value,',')[4] as field5");
//TODO write into cassandra
values.writeStream().foreachBatch(
new VoidFunction2<Dataset<String>, Long> {
public void call(Dataset<String> dataset, Long batchId) {
// Transform and write batchDF
}
).start();
尝试将其添加到您的 pom.xml:
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.11</artifactId>
<version>2.4.2</version>
</dependency>
在那之后 import cassandra 隐含:
import org.apache.spark.sql.cassandra._
你可以在 df 上使用 cassandraFormat 方法:
dataset
.write
.cassandraFormat("table","keyspace")
.save()
当您使用 .forEachBatch
时,您的代码就像处理普通数据集一样工作...在 Java 中,代码可能如下所示(完整源代码为 here):
.foreachBatch((VoidFunction2<Dataset<Row>, Long>) (df, batchId) ->
df.write()
.format("org.apache.spark.sql.cassandra")
.options(ImmutableMap.of("table", "sttest", "keyspace", "test"))
.mode(SaveMode.Append)
.save()
)
2020 年 9 月更新:Spark Cassandra Connector 2.5.0
中添加了对 spark 结构化流的支持