如何在 Java Spark 中使用 foreachBatch() 写入 Cassandra?

How to write to Cassandra using foreachBatch() in Java Spark?

我有以下代码,我想使用 spark 2.4 结构化流写入 cassandra foreachBatch

        Dataset<Row> df = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "topic1")
                .load();

        Dataset<Row> values=df.selectExpr(
                "split(value,',')[0] as field1",
                "split(value,',')[1] as field2",
                "split(value,',')[2] as field3",
                "split(value,',')[3] as field4",
                "split(value,',')[4] as field5");

//TODO write into cassandra 

values.writeStream().foreachBatch(
                    new VoidFunction2<Dataset<String>, Long> {
                public void call(Dataset<String> dataset, Long batchId) {

                    // Transform and write batchDF
       
            }
            ).start();

尝试将其添加到您的 pom.xml:

<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.11</artifactId>
    <version>2.4.2</version>
</dependency>

在那之后 import cassandra 隐含:

import org.apache.spark.sql.cassandra._

你可以在 df 上使用 cassandraFormat 方法:

dataset
      .write
      .cassandraFormat("table","keyspace")
      .save()

当您使用 .forEachBatch 时,您的代码就像处理普通数据集一样工作...在 Java 中,代码可能如下所示(完整源代码为 here):

.foreachBatch((VoidFunction2<Dataset<Row>, Long>) (df, batchId) ->
         df.write()
         .format("org.apache.spark.sql.cassandra")
         .options(ImmutableMap.of("table", "sttest", "keyspace", "test"))
         .mode(SaveMode.Append)
         .save()
)

2020 年 9 月更新:Spark Cassandra Connector 2.5.0

中添加了对 spark 结构化流的支持