使用 Spark 和 java 编写 CSV 文件 - 处理空值和引号
Writing CSV file using Spark and java - handling empty values and quotes
初始数据在 Dataset 中,我正在尝试写入管道分隔文件,我希望每个非空单元格和非空值都放在引号中。空值或空值不应包含引号
result.coalesce(1).write()
.option("delimiter", "|")
.option("header", "true")
.option("nullValue", "")
.option("quoteAll", "false")
.csv(Location);
预期输出:
"London"||"UK"
"Delhi"|"India"
"Moscow"|"Russia"
当前输出:
London||UK
Delhi|India
Moscow|Russia
如果我将 "quoteAll" 更改为 "true",我得到的输出是:
"London"|""|"UK"
"Delhi"|"India"
"Moscow"|"Russia"
Spark 版本是 2.3,java 版本是 java8
编辑和警告: 没有看到 java 标签。这是 Scala 解决方案,它使用 foldLeft
作为遍历所有列的循环。如果将其替换为 Java 友好循环,则一切都应按原样运行。稍后我会试着回顾一下。
程序化解决方案可以是
val columns = result.columns
val randomColumnName = "RND"
val result2 = columns.foldLeft(result) { (data, column) =>
data
.withColumnRenamed(column, randomColumnName)
.withColumn(column,
when(col(randomColumnName).isNull, "")
.otherwise(concat(lit("\""), col(randomColumnName), lit("\"")))
)
.drop(randomColumnName)
}
这将生成周围带有 "
的字符串,并在 null 中写入空字符串。如果您需要保留空值,请保留它们。
那就写下来吧:
result2.coalesce(1).write()
.option("delimiter", "|")
.option("header", "true")
.option("quoteAll", "false")
.csv(Location);
Java回答。 CSV 转义不仅仅是在周围添加 " 符号。您应该在字符串内部处理 "。因此,让我们使用 StringEscapeUtils 并定义将调用它的 UDF。然后将 UDF 应用于每一列。
import org.apache.commons.text.StringEscapeUtils;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import java.util.Arrays;
public class Test {
void test(Dataset<Row> result, String Location) {
// define UDF
UserDefinedFunction escape = udf(
(String str) -> str.isEmpty()?"":StringEscapeUtils.escapeCsv(str), DataTypes.StringType
);
// call udf for each column
Column columns[] = Arrays.stream(result.schema().fieldNames())
.map(f -> escape.apply(col(f)).as(f))
.toArray(Column[]::new);
// save the result
result.select(columns)
.coalesce(1).write()
.option("delimiter", "|")
.option("header", "true")
.option("nullValue", "")
.option("quoteAll", "false")
.csv(Location);
}
}
旁注:coalesce(1) 是一个糟糕的决定。它收集一个执行者的所有数据。对于巨大的数据集,您可以在生产中获得执行器 OOM。
这当然不是一个有效的答案,我正在根据 Artem Aliev 给出的答案对其进行修改,但认为它对少数人有用,所以发布这个答案
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.*;<br/>
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;<br/>
public class Quotes {<br/>
private static final String DELIMITER = "|";
private static final String Location = "Give location here";
public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder()
.master("local")
.appName("Spark Session")
.enableHiveSupport()
.getOrCreate();
Dataset<Row> result = sparkSession.read()
.option("header", "true")
.option("delimiter",DELIMITER)
.csv("Sample file to read"); //Give the details of file to read here
UserDefinedFunction udfQuotesNonNull = udf(
(String abc) -> (abc!=null? "\""+abc+"\"":abc),DataTypes.StringType
);
result = result.withColumn("ind_val", monotonically_increasing_id()); //inducing a new column to be used for join as there is no identity column in source dataset
Dataset<Row> dataset1 = result.select((udfQuotesNonNull.apply(col("ind_val").cast("string")).alias("ind_val"))); //Dataset used for storing temporary results
Dataset<Row> dataset = result.select((udfQuotesNonNull.apply(col("ind_val").cast("string")).alias("ind_val"))); //Dataset used for storing output
String[] str = result.schema().fieldNames();
dataset1.show();
for(int j=0; j<str.length-1;j++)
{
dataset1 = result.select((udfQuotesNonNull.apply(col("ind_val").cast("string")).alias("ind_val")),(udfQuotesNonNull.apply(col(str[j]).cast("string")).alias("\""+str[j]+"\"")));
dataset=dataset.join(dataset1,"ind_val"); //Joining based on induced column
}
result = dataset.drop("ind_val");
result.coalesce(1).write()
.option("delimiter", DELIMITER)
.option("header", "true")
.option("quoteAll", "false")
.option("nullValue", null)
.option("quote", "\u0000")
.option("spark.sql.sources.writeJobUUID", false)
.csv(Location);
}
}
初始数据在 Dataset
result.coalesce(1).write()
.option("delimiter", "|")
.option("header", "true")
.option("nullValue", "")
.option("quoteAll", "false")
.csv(Location);
预期输出:
"London"||"UK"
"Delhi"|"India"
"Moscow"|"Russia"
当前输出:
London||UK
Delhi|India
Moscow|Russia
如果我将 "quoteAll" 更改为 "true",我得到的输出是:
"London"|""|"UK"
"Delhi"|"India"
"Moscow"|"Russia"
Spark 版本是 2.3,java 版本是 java8
编辑和警告: 没有看到 java 标签。这是 Scala 解决方案,它使用 foldLeft
作为遍历所有列的循环。如果将其替换为 Java 友好循环,则一切都应按原样运行。稍后我会试着回顾一下。
程序化解决方案可以是
val columns = result.columns
val randomColumnName = "RND"
val result2 = columns.foldLeft(result) { (data, column) =>
data
.withColumnRenamed(column, randomColumnName)
.withColumn(column,
when(col(randomColumnName).isNull, "")
.otherwise(concat(lit("\""), col(randomColumnName), lit("\"")))
)
.drop(randomColumnName)
}
这将生成周围带有 "
的字符串,并在 null 中写入空字符串。如果您需要保留空值,请保留它们。
那就写下来吧:
result2.coalesce(1).write()
.option("delimiter", "|")
.option("header", "true")
.option("quoteAll", "false")
.csv(Location);
Java回答。 CSV 转义不仅仅是在周围添加 " 符号。您应该在字符串内部处理 "。因此,让我们使用 StringEscapeUtils 并定义将调用它的 UDF。然后将 UDF 应用于每一列。
import org.apache.commons.text.StringEscapeUtils;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import java.util.Arrays;
public class Test {
void test(Dataset<Row> result, String Location) {
// define UDF
UserDefinedFunction escape = udf(
(String str) -> str.isEmpty()?"":StringEscapeUtils.escapeCsv(str), DataTypes.StringType
);
// call udf for each column
Column columns[] = Arrays.stream(result.schema().fieldNames())
.map(f -> escape.apply(col(f)).as(f))
.toArray(Column[]::new);
// save the result
result.select(columns)
.coalesce(1).write()
.option("delimiter", "|")
.option("header", "true")
.option("nullValue", "")
.option("quoteAll", "false")
.csv(Location);
}
}
旁注:coalesce(1) 是一个糟糕的决定。它收集一个执行者的所有数据。对于巨大的数据集,您可以在生产中获得执行器 OOM。
这当然不是一个有效的答案,我正在根据 Artem Aliev 给出的答案对其进行修改,但认为它对少数人有用,所以发布这个答案
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.*;<br/>
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;<br/>
public class Quotes {<br/>
private static final String DELIMITER = "|";
private static final String Location = "Give location here";
public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder()
.master("local")
.appName("Spark Session")
.enableHiveSupport()
.getOrCreate();
Dataset<Row> result = sparkSession.read()
.option("header", "true")
.option("delimiter",DELIMITER)
.csv("Sample file to read"); //Give the details of file to read here
UserDefinedFunction udfQuotesNonNull = udf(
(String abc) -> (abc!=null? "\""+abc+"\"":abc),DataTypes.StringType
);
result = result.withColumn("ind_val", monotonically_increasing_id()); //inducing a new column to be used for join as there is no identity column in source dataset
Dataset<Row> dataset1 = result.select((udfQuotesNonNull.apply(col("ind_val").cast("string")).alias("ind_val"))); //Dataset used for storing temporary results
Dataset<Row> dataset = result.select((udfQuotesNonNull.apply(col("ind_val").cast("string")).alias("ind_val"))); //Dataset used for storing output
String[] str = result.schema().fieldNames();
dataset1.show();
for(int j=0; j<str.length-1;j++)
{
dataset1 = result.select((udfQuotesNonNull.apply(col("ind_val").cast("string")).alias("ind_val")),(udfQuotesNonNull.apply(col(str[j]).cast("string")).alias("\""+str[j]+"\"")));
dataset=dataset.join(dataset1,"ind_val"); //Joining based on induced column
}
result = dataset.drop("ind_val");
result.coalesce(1).write()
.option("delimiter", DELIMITER)
.option("header", "true")
.option("quoteAll", "false")
.option("nullValue", null)
.option("quote", "\u0000")
.option("spark.sql.sources.writeJobUUID", false)
.csv(Location);
}
}