PySpark isin variable assignment
I have a PySpark dataframe with 50k records (dfa) and another dataframe with 40k records (dfb). In dfa, I want to create a new column that marks the records that also appear in dfb as 'present' and everything else as 'not_present'.
I know the pandas syntax for this, but I can't find the PySpark equivalent.
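For reference, the pandas pattern I have in mind is roughly the following (toy frames invented only to illustrate; column names mirror the tables below):

import numpy as np
import pandas as pd

# toy stand-ins for dfa/dfb, invented for illustration
dfa = pd.DataFrame({'col1': ['xyz', 'abc', 'def'], 'col2': ['row', 'row', 'row']})
dfb = pd.DataFrame({'col1': ['xyz', 'abc'], 'col2': ['row', 'row']})

# mark rows of dfa whose col1 appears anywhere in dfb's col1
dfa['col3'] = np.where(dfa['col1'].isin(dfb['col1']), 'present', 'not_present')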
Input:
dfa
col1 | col2 |
---|---|
xyz | row |
abc | row |
def | row |
df2
col1 | col2 |
---|---|
xyz | row |
abc | row |
Expected output:
df3
col1 | col2 | col3 |
---|---|---|
xyz | row | present |
abc | row | present |
def | row | not_present |
You can do this with a left join against df2's distinct keys, then filling the nulls for rows that found no match:
df3 = df1.join(df2.select('col1', F.lit('present').alias('col3')).distinct(), 'col1', 'left')
df3 = df3.fillna('not_present', 'col3')
Full example:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame(
[('xyz', 'row'),
('abc', 'row'),
('def', 'row')],
['col1', 'col2']
)
df2 = spark.createDataFrame(
[('xyz', 'row'),
('abc', 'row')],
['col1', 'col2']
)
df3 = df1.join(df2.select('col1', F.lit('present').alias('col3')).distinct(), 'col1', 'left')
df3 = df3.fillna('not_present', 'col3')
df3.show()
# +----+----+-----------+
# |col1|col2|       col3|
# +----+----+-----------+
# | xyz| row|    present|
# | abc| row|    present|
# | def| row|not_present|
# +----+----+-----------+
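Since the question title mentions isin: if df2's keys are small enough to collect to the driver (a single column of 40k values usually is), you could also take the isin route instead of a join. A sketch, assuming the same df1/df2 as above:

# pull the distinct keys of df2 to the driver -- only sensible while df2 stays small
keys = [row['col1'] for row in df2.select('col1').distinct().collect()]
# tag each df1 row by membership in that key list
df3 = df1.withColumn(
    'col3',
    F.when(F.col('col1').isin(keys), 'present').otherwise('not_present')
)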
If you want to check against the combination of both columns, you can also try this:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when

spark = SparkSession.builder.getOrCreate()

simpleData = [("xyz", "row"),
              ("abc", "row"),
              ("def", "row")]
columns = ["col1", "col2"]
df1 = spark.createDataFrame(data=simpleData, schema=columns)

simpleData2 = [("xyz", "row"),
               ("abc", "row")]
columns2 = ["col1", "col2"]
df2 = spark.createDataFrame(data=simpleData2, schema=columns2)

# left join on both columns: a row only matches when col1 AND col2 agree
joined = (df1.alias("df1")
          .join(df2.alias("df2"),
                (col("df1.col1") == col("df2.col1")) & (col("df1.col2") == col("df2.col2")),
                "left"))

# unmatched rows carry nulls on the df2 side; turn that into a 0/1 flag,
# map the flag to present/not_present, then drop the helper column
df = (joined
      .select(col("df1.*"), col("df2.col1").isNotNull().cast("integer").alias("flag"))
      .withColumn("col3", when(col("flag") == 1, lit("present")).otherwise("not_present"))
      .drop("flag"))
df.show()
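A side note on performance: since df2 is the much smaller frame, you could hint Spark to broadcast it so the left join avoids shuffling the big side. A sketch reusing the aliases from the snippet above:

from pyspark.sql.functions import broadcast, col

# broadcast the small frame; 40k rows comfortably fits in executor memory
joined = (df1.alias("df1")
          .join(broadcast(df2.alias("df2")),
                (col("df1.col1") == col("df2.col1")) & (col("df1.col2") == col("df2.col2")),
                "left"))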