使用数据帧在 Java 中对 Spark 中的 n 列求和
Summing n columns in Spark in Java using dataframes
String[] col = {"a","b","c"}
数据:
id a b c d e
101 1 1 1 1 1
102 2 2 2 2 2
103 3 3 3 3 3
预期输出:- id 以及列字符串中指定的列总和
id (a+b+c)
101 3
102 6
103 9
如何使用数据帧来做到这一点?
您可以使用表达式创建字符串,然后使用 expr
创建列。换句话说,在这种情况下,您想要创建随后可以使用的字符串 "a+b+c"。这适用于任意数量的列。
在 Scala 中它可以如下所示(翻译成 Java 应该相当简单):
import org.apache.spark.sql.functions.expr
val df = Seq((101,1,1,1,1,1),(102,2,2,2,2,2),(103,3,3,3,3,3)).toDF("id", "a", "b", "c", "d", "e")
val cols = Seq("a", "b", "c")
val expression = cols.mkString("+")
val colName = "(" + expression + ")"
df.select($"id", expr(expression).as(colName))
这会给你:
+---+-------+
| id|(a+b+c)|
+---+-------+
|101| 3|
|102| 6|
|103| 9|
+---+-------+
有很多不同的方法可以做到这一点。你可以使用 map
,像这样:
val df = Seq((101,1,1,1,1,1),(102,2,2,2,2,2),(103,3,3,3,3,3)).toDF("id", "a", "b", "c", "d", "e")
df.map(row => (row.getString(0), row.getInt(1)+row.getInt(2)+row.getInt(3)))
.toDF("id", "a+b+c")
或者您可以使用 udf
,像这样:
import org.apache.spark.sql.functions._
import spark.implicits._
val addCols = udf((a: Int, b:Int, c: Int) => a+b+c)
df.select('id, addCols('a, 'b, 'c) as "a+b+c")
或者采纳 Shaido 的建议 :)
如果您正在使用 java
,您可以执行以下操作
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
static SparkConf conf = new SparkConf().setMaster("local").setAppName("simple");
static SparkContext sc = new SparkContext(conf);
static SQLContext sqlContext = new SQLContext(sc);
public static void main(String[] args) {
Dataset<Row> df = sqlContext.read()
.format("com.databricks.spark.csv")
.option("delimiter", " ")
.option("header", true)
.option("inferSchema", true)
.load("path to the input text file");
sqlContext.udf().register("sums", (Integer a, Integer b, Integer c) -> a+b+c, DataTypes.IntegerType);
df.registerTempTable("temp");
sqlContext.sql("SELECT id, sums(a, b, c) AS `(a+b+c)` FROM temp").show(false);
}
你的输出应该是
+---+-------+
|id |(a+b+c)|
+---+-------+
|101|3 |
|102|6 |
|103|9 |
+---+-------+
如果您更喜欢不使用 sql 查询并使用 api,那么您可以按以下方式进行操作
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.udf;
UserDefinedFunction mode = udf((Integer a, Integer b, Integer c) -> a+b+c, DataTypes.IntegerType);
df.select(col("id"), mode.apply(col("a"), col("b"), col("c")).as("(a+b+c)")).show(false);
这在 Java 对我有用:
final var allDataFamilyDf = allDataDf.withColumn("FamilySize",
functions.col("SibSp").plus(functions.col("Parch")));
更简洁的Java方法(如@shaido-reinstate-monica所述):
String[] columnNames = {"a","b","c"}; // columnNames is the list of column names to be added together
Buffer<Column> sums = JavaConversions.asScalaBuffer(ImmutableList.of(columnNames).stream().map(name -> col(name)).collect(Collectors.toList()));
String expression = sums.mkString("+");
df.selectExpr("id", expression); // where df is the dataset with columns "id", "a", "b", and "c"
String[] col = {"a","b","c"}
数据:
id a b c d e
101 1 1 1 1 1
102 2 2 2 2 2
103 3 3 3 3 3
预期输出:- id 以及列字符串中指定的列总和
id (a+b+c)
101 3
102 6
103 9
如何使用数据帧来做到这一点?
您可以使用表达式创建字符串,然后使用 expr
创建列。换句话说,在这种情况下,您想要创建随后可以使用的字符串 "a+b+c"。这适用于任意数量的列。
在 Scala 中它可以如下所示(翻译成 Java 应该相当简单):
import org.apache.spark.sql.functions.expr
val df = Seq((101,1,1,1,1,1),(102,2,2,2,2,2),(103,3,3,3,3,3)).toDF("id", "a", "b", "c", "d", "e")
val cols = Seq("a", "b", "c")
val expression = cols.mkString("+")
val colName = "(" + expression + ")"
df.select($"id", expr(expression).as(colName))
这会给你:
+---+-------+
| id|(a+b+c)|
+---+-------+
|101| 3|
|102| 6|
|103| 9|
+---+-------+
有很多不同的方法可以做到这一点。你可以使用 map
,像这样:
val df = Seq((101,1,1,1,1,1),(102,2,2,2,2,2),(103,3,3,3,3,3)).toDF("id", "a", "b", "c", "d", "e")
df.map(row => (row.getString(0), row.getInt(1)+row.getInt(2)+row.getInt(3)))
.toDF("id", "a+b+c")
或者您可以使用 udf
,像这样:
import org.apache.spark.sql.functions._
import spark.implicits._
val addCols = udf((a: Int, b:Int, c: Int) => a+b+c)
df.select('id, addCols('a, 'b, 'c) as "a+b+c")
或者采纳 Shaido 的建议 :)
如果您正在使用 java
,您可以执行以下操作
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
static SparkConf conf = new SparkConf().setMaster("local").setAppName("simple");
static SparkContext sc = new SparkContext(conf);
static SQLContext sqlContext = new SQLContext(sc);
public static void main(String[] args) {
Dataset<Row> df = sqlContext.read()
.format("com.databricks.spark.csv")
.option("delimiter", " ")
.option("header", true)
.option("inferSchema", true)
.load("path to the input text file");
sqlContext.udf().register("sums", (Integer a, Integer b, Integer c) -> a+b+c, DataTypes.IntegerType);
df.registerTempTable("temp");
sqlContext.sql("SELECT id, sums(a, b, c) AS `(a+b+c)` FROM temp").show(false);
}
你的输出应该是
+---+-------+
|id |(a+b+c)|
+---+-------+
|101|3 |
|102|6 |
|103|9 |
+---+-------+
如果您更喜欢不使用 sql 查询并使用 api,那么您可以按以下方式进行操作
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.udf;
UserDefinedFunction mode = udf((Integer a, Integer b, Integer c) -> a+b+c, DataTypes.IntegerType);
df.select(col("id"), mode.apply(col("a"), col("b"), col("c")).as("(a+b+c)")).show(false);
这在 Java 对我有用:
final var allDataFamilyDf = allDataDf.withColumn("FamilySize",
functions.col("SibSp").plus(functions.col("Parch")));
更简洁的Java方法(如@shaido-reinstate-monica所述):
String[] columnNames = {"a","b","c"}; // columnNames is the list of column names to be added together
Buffer<Column> sums = JavaConversions.asScalaBuffer(ImmutableList.of(columnNames).stream().map(name -> col(name)).collect(Collectors.toList()));
String expression = sums.mkString("+");
df.selectExpr("id", expression); // where df is the dataset with columns "id", "a", "b", and "c"