将 spark 数据框中每个分区的记录写入 xml 文件

Write records per partition in spark data frame to a xml file

我必须对 spark 数据帧中每个分区的文件进行记录计数,然后我必须将输出写入 XML 文件。

这是我的数据框。

dfMainOutputFinalWithoutNull.coalesce(1).write.partitionBy("DataPartition","StatementTypeCode")
  .format("csv")
  .option("nullValue", "")
  .option("header", "true")
  .option("codec", "gzip")
  .save("s3://trfsdisu/SPARK/FinancialLineItem/output")

现在我必须计算每个分区中每个文件中的记录数,然后将输出写入一个 XML 文件。

这就是我正在尝试的方式。

val count =dfMainOutputFinalWithoutNull.groupBy("DataPartition","StatementTypeCode").count

  count.write.format("com.databricks.spark.xml")
  .option("rootTag", "items")
  .option("rowTag", "item")
  .save("s3://trfsdisu/SPARK/FinancialLineItem/Descr")

我能够打印每个分区的总记录数并打印出来,但是当我尝试创建 xml 文件时,出现以下错误。

java.lang.ClassNotFoundException: Failed to find data source: com.databricks.spark.xml. Please find packages at http://spark.apache.org/third-party-projects.html

我正在使用 Spark 2.2.0, Zeppelin 0.7.2

所以我必须导入 com.databricks.spark.xml 这个,但是为什么因为如果我不导入 csv 文件 com.databricks.spark.csv.

另外,我可以使用缓存 dfMainOutputFinalWithoutNull 因为我会使用它两次来写入它的数据然后计算它的分区记录然后写入 xml 文件吗?

并且我添加了这个依赖项

  <!-- https://mvnrepository.com/artifact/com.databricks/spark-xml_2.10 -->
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-xml_2.10</artifactId>
    <version>0.2.0</version>
</dependency>

并重新启动解释器。然后我收到以下错误。

java.lang.NullPointerException
    at org.apache.zeppelin.spark.Utils.invokeMethod(Utils.java:38)
    at org.apache.zeppelin.spark.Utils.invokeMethod(Utils.java:33)
    at org.apache.zeppelin.spark.SparkInterpreter.createSparkContext_2(SparkInterpreter.java:391)
    at org.apache.zeppelin.spark.SparkInterpreter.createSparkContext(SparkInterpreter.java:380)
    at org.apache.zeppelin.spark.SparkInterpreter.getSparkContext(SparkInterpreter.java:146)

我会回答我的问题

所以我在 zeppelin 中添加了以下依赖项

Scala 2.11

groupId: com.databricks
artifactId: spark-xml_2.11
version: 0.4.1

在飞艇下方添加

com.databricks:spark-xml_2.11:0.4.1

然后我就可以创建文件了。