根据火花结构化流中的多个条件从另一列更新列值
Update column value from another columns based on multiple conditions in spark structured streaming
我想根据多个条件使用另外两列更新一列中的值。例如 - 流是这样的:
+---+---+----+---+
| A | B | C | D |
+---+---+----+---+
| a | T | 10 | 0 |
| a | T | 100| 0 |
| a | L | 0 | 0 |
| a | L | 1 | 0 |
+---+---+----+---+
我有多个条件,比如 -
(B = "T" && C > 20 ) OR (B = "L" && C = 0)
值 "T"
、20
、"L"
和 0
是动态的。 AND/OR
运算符也在 运行 时提供。我想在条件成立时使 D = 1
,否则它应该保持 D = 0
。条件数也是动态的。
我尝试将它与 spark-sql
中的 UPDATE
命令一起使用,即 UPDATE df SET D = '1' WHERE CONDITIONS
。但是它说尚不支持更新。结果数据框应该是 -
+---+---+----+---+
| A | B | C | D |
+---+---+----+---+
| a | T | 10 | 0 |
| a | T | 100| 1 |
| a | L | 0 | 1 |
| a | L | 1 | 0 |
+---+---+----+---+
有什么办法可以做到这一点吗?
希望您正在使用 Python。 post Scala 也一样!使用 udf
PYTHON
>>> df.show()
+---+---+---+---+
| A| B| C| D|
+---+---+---+---+
| a| T| 10| 0|
| a| T|100| 0|
| a| L| 0| 0|
| a| L| 1| 0|
+---+---+---+---+
>>> def get_column(B, C):
... return int((B == "T" and C > 20) or (B == "L" and C == 0))
...
>>> fun = udf(get_column)
>>> res = df.withColumn("D", fun(df['B'], df['C']))>>> res.show()
+---+---+---+---+
| A| B| C| D|
+---+---+---+---+
| a| T| 10| 0|
| a| T|100| 1|
| a| L| 0| 1|
| a| L| 1| 0|
+---+---+---+---+
SCALA
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> df.show()
+---+---+---+---+
| A| B| C| D|
+---+---+---+---+
| a| T| 10| 0|
| a| T|100| 0|
| a| L| 0| 0|
| a| L| 1| 0|
+---+---+---+---+
scala> def get_column(B : String, C : Int) : Int = {
| if((B == "T" && C > 20) || (B == "L" && C == 0))
| 1
| else
| 0
| }
get_column: (B: String, C: Int)Int
scala> val fun = udf(get_column _)
fun: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,IntegerType,Some(List(StringType, IntegerType)
))
scala> val res = df.withColumn("D", fun(df("B"), df("C")))
res: org.apache.spark.sql.DataFrame = [A: string, B: string ... 2 more fields]
scala> res.show()
+---+---+---+---+
| A| B| C| D|
+---+---+---+---+
| a| T| 10| 0|
| a| T|100| 1|
| a| L| 0| 1|
| a| L| 1| 0|
+---+---+---+---+
您也可以像这样使用 case
when
和 otherwise
:
PYTHON
>>> df.show()
+---+---+---+---+
| A| B| C| D|
+---+---+---+---+
| a| T| 10| 0|
| a| T|100| 0|
| a| L| 0| 0|
| a| L| 1| 0|
+---+---+---+---+
>>> new_column = when(
(col("B") == "T") & (col("C") > 20), 1
).when((col("B") == "L") & (col("C") == 0), 1).otherwise(0)
>>> res = df.withColumn("D", new_column)
>>> res.show()
+---+---+---+---+
| A| B| C| D|
+---+---+---+---+
| a| T| 10| 0|
| a| T|100| 1|
| a| L| 0| 1|
| a| L| 1| 0|
+---+---+---+---+
SCALA
scala> df.show()
+---+---+---+---+
| A| B| C| D|
+---+---+---+---+
| a| T| 10| 0|
| a| T|100| 0|
| a| L| 0| 0|
| a| L| 1| 0|
+---+---+---+---+
scala> val new_column = when(
| col("B") === "T" && col("C") > 20, 1
| ).when(col("B") === "L" && col("C") === 0, 1 ).otherwise(0)
new_column: org.apache.spark.sql.Column = CASE WHEN ((B = T) AND (C > 20)) THEN 1 WHEN ((B = L) AND (C = 0)) THEN 1 ELSE 0 END
scala> df.withColumn("D", new_column).show()
+---+---+---+---+
| A| B| C| D|
+---+---+---+---+
| a| T| 10| 0|
| a| T|100| 1|
| a| L| 0| 1|
| a| L| 1| 0|
+---+---+---+---+
我想根据多个条件使用另外两列更新一列中的值。例如 - 流是这样的:
+---+---+----+---+
| A | B | C | D |
+---+---+----+---+
| a | T | 10 | 0 |
| a | T | 100| 0 |
| a | L | 0 | 0 |
| a | L | 1 | 0 |
+---+---+----+---+
我有多个条件,比如 -
(B = "T" && C > 20 ) OR (B = "L" && C = 0)
值 "T"
、20
、"L"
和 0
是动态的。 AND/OR
运算符也在 运行 时提供。我想在条件成立时使 D = 1
,否则它应该保持 D = 0
。条件数也是动态的。
我尝试将它与 spark-sql
中的 UPDATE
命令一起使用,即 UPDATE df SET D = '1' WHERE CONDITIONS
。但是它说尚不支持更新。结果数据框应该是 -
+---+---+----+---+
| A | B | C | D |
+---+---+----+---+
| a | T | 10 | 0 |
| a | T | 100| 1 |
| a | L | 0 | 1 |
| a | L | 1 | 0 |
+---+---+----+---+
有什么办法可以做到这一点吗?
希望您正在使用 Python。 post Scala 也一样!使用 udf
PYTHON
>>> df.show()
+---+---+---+---+
| A| B| C| D|
+---+---+---+---+
| a| T| 10| 0|
| a| T|100| 0|
| a| L| 0| 0|
| a| L| 1| 0|
+---+---+---+---+
>>> def get_column(B, C):
... return int((B == "T" and C > 20) or (B == "L" and C == 0))
...
>>> fun = udf(get_column)
>>> res = df.withColumn("D", fun(df['B'], df['C']))>>> res.show()
+---+---+---+---+
| A| B| C| D|
+---+---+---+---+
| a| T| 10| 0|
| a| T|100| 1|
| a| L| 0| 1|
| a| L| 1| 0|
+---+---+---+---+
SCALA
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> df.show()
+---+---+---+---+
| A| B| C| D|
+---+---+---+---+
| a| T| 10| 0|
| a| T|100| 0|
| a| L| 0| 0|
| a| L| 1| 0|
+---+---+---+---+
scala> def get_column(B : String, C : Int) : Int = {
| if((B == "T" && C > 20) || (B == "L" && C == 0))
| 1
| else
| 0
| }
get_column: (B: String, C: Int)Int
scala> val fun = udf(get_column _)
fun: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,IntegerType,Some(List(StringType, IntegerType)
))
scala> val res = df.withColumn("D", fun(df("B"), df("C")))
res: org.apache.spark.sql.DataFrame = [A: string, B: string ... 2 more fields]
scala> res.show()
+---+---+---+---+
| A| B| C| D|
+---+---+---+---+
| a| T| 10| 0|
| a| T|100| 1|
| a| L| 0| 1|
| a| L| 1| 0|
+---+---+---+---+
您也可以像这样使用 case
when
和 otherwise
:
PYTHON
>>> df.show()
+---+---+---+---+
| A| B| C| D|
+---+---+---+---+
| a| T| 10| 0|
| a| T|100| 0|
| a| L| 0| 0|
| a| L| 1| 0|
+---+---+---+---+
>>> new_column = when(
(col("B") == "T") & (col("C") > 20), 1
).when((col("B") == "L") & (col("C") == 0), 1).otherwise(0)
>>> res = df.withColumn("D", new_column)
>>> res.show()
+---+---+---+---+
| A| B| C| D|
+---+---+---+---+
| a| T| 10| 0|
| a| T|100| 1|
| a| L| 0| 1|
| a| L| 1| 0|
+---+---+---+---+
SCALA
scala> df.show()
+---+---+---+---+
| A| B| C| D|
+---+---+---+---+
| a| T| 10| 0|
| a| T|100| 0|
| a| L| 0| 0|
| a| L| 1| 0|
+---+---+---+---+
scala> val new_column = when(
| col("B") === "T" && col("C") > 20, 1
| ).when(col("B") === "L" && col("C") === 0, 1 ).otherwise(0)
new_column: org.apache.spark.sql.Column = CASE WHEN ((B = T) AND (C > 20)) THEN 1 WHEN ((B = L) AND (C = 0)) THEN 1 ELSE 0 END
scala> df.withColumn("D", new_column).show()
+---+---+---+---+
| A| B| C| D|
+---+---+---+---+
| a| T| 10| 0|
| a| T|100| 1|
| a| L| 0| 1|
| a| L| 1| 0|
+---+---+---+---+