从火花更新卡桑德拉

update cassandra from spark

我是 cassandra tfm.foehis 中的 table,有数据。

我第一次从spark到cassandra充数据的时候,用了这组命令:

import org.apache.spark.sql.functions._
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._

val wkdir="/home/adminbigdata/tablas/"
val fileIn= "originales/22_FOEHIS2.csv"
val fileOut= "22_FOEHIS_PRE2"
val fileCQL= "22_FOEHISCQL"

val data = sc.textFile(wkdir + fileIn).filter(!_.contains("----")).map(_.trim.replaceAll(" +", "")).map(_.dropRight(1)).map(_.drop(1)).map(_.replaceAll(",", "")).filter(array => array(6) != "MOBIDI").filter(array => array(17) != "").saveAsTextFile(wkdir + fileOut)
val firstDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true").option("mode", "DROPMALFORMED").option("delimiter", "|").load(wkdir + fileOut)
val columns: Array[String] = firstDF.columns
val reorderedColumnNames: Array[String] = Array("hoclic","hodtac","hohrac","hotpac","honrac","hocdan","hocdrs","hocdsl","hocol","hocpny","hodesf","hodtcl","hodtcm","hodtea","hodtra","hodtrc","hodtto","hodtua","hohrcl","hohrcm","hohrea","hohrra","hohrrc","hohrua","holinh","holinr","honumr","hoobs","hooe","hotdsc","hotour","housca","houscl","houscm","housea","houser","housra","housrc")
val secondDF= firstDF.select(reorderedColumnNames.head, reorderedColumnNames.tail: _*)
secondDF.write.cassandraFormat("foehis", "tfm").save()

但是当我使用相同的脚本加载新数据时,出现错误。我不知道怎么了? 这是消息:

java.lang.UnsupportedOperationException: 'SaveMode is set to ErrorIfExists and Table
tfm.foehis already exists and contains data.
Perhaps you meant to set the DataFrame write mode to Append?
Example: df.write.format.options.mode(SaveMode.Append).save()" '

错误消息清楚地告诉您需要使用追加模式并显示了您可以使用它做什么。在您的情况下,这是因为目标 table 已经存在,并且写入模式设置为 "error if exists"。如果你还想写数据,代码应该如下:

import org.apache.spark.sql.SaveMode
secondDF.write.cassandraFormat("foehis", "tfm").mode(SaveMode.Append).save()