PySpark - fill an empty string with '0' if the data type is BIGINT/DOUBLE/Integer
I am trying to fill empty strings with a '0' when the column's data type in a PySpark DataFrame is BIGINT/DOUBLE/Integer.
data = [("James","","Smith","36","M",3000,"1.2"),
("Michael","Rose"," ","40","M",4000,"2.0"),
("Robert","","Williams","42","M",4000,"5.0"),
("Maria","Anne"," ","39","F", ," "),
("Jen","Mary","Brown"," ","F",-1,"")
]
schema = StructType([
StructField("firstname",StringType(),True),
StructField("middlename",StringType(),True),
StructField("lastname",StringType(),True),
StructField("age", StringType(), True),
StructField("gender", StringType(), True),
StructField("salary", IntegerType(), True),
StructField("amount", DoubleType(), True)
])
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
This is what I am trying:
df.select(*[ F.when(F.dtype in ('integertype','doubletype') and F.col(column).ishaving(" "), '0').otherwise(F.col(column)).alias(column) for column in df.columns]).show()
Expected output:
+---------+----------+--------+---+------+------+------+
|firstname|middlename|lastname|age|gender|salary|amount|
+---------+----------+--------+---+------+------+------+
| James| | Smith| 36| M| 3000| 1.2|
| Michael| Rose| | 40| M| 4000| 2.0|
| Robert| |Williams| 42| M| 4000| 5.0|
| Maria| Anne| | 39| F| 0| 0|
| Jen| Mary| Brown| | F| -1| 0|
+---------+----------+--------+---+------+------+------+
You can utilize reduce to accomplish this; it makes the code cleaner and easier to understand.
Additionally, a to_fill list is created to match the columns on your condition, and it can be modified further to fit your scenario.
Data Preparation
data = [("James","","Smith","36","M",3000,1.2),
("Michael","Rose"," ","40","M",4000,2.0),
("Robert","","Williams","42","M",4000,5.0),
("Maria","Anne"," ","39","F",None,None),
("Jen","Mary","Brown"," ","F",-1,None)
]
schema = StructType([
StructField("firstname",StringType(),True),
StructField("middlename",StringType(),True),
StructField("lastname",StringType(),True),
StructField("age", StringType(), True),
StructField("gender", StringType(), True),
StructField("salary", IntegerType(), True),
StructField("amount", DoubleType(), True)
])
sparkDF = spark.createDataFrame(data=data,schema=schema)
sparkDF.show()
+---------+----------+--------+---+------+------+------+
|firstname|middlename|lastname|age|gender|salary|amount|
+---------+----------+--------+---+------+------+------+
| James| | Smith| 36| M| 3000| 1.2|
| Michael| Rose| | 40| M| 4000| 2.0|
| Robert| |Williams| 42| M| 4000| 5.0|
| Maria| Anne| | 39| F| null| null|
| Jen| Mary| Brown| | F| -1| null|
+---------+----------+--------+---+------+------+------+
Reduce
to_fill = [ c for c,d in sparkDF.dtypes if d in ['int','bigint','double']]
# to_fill --> ['salary','amount']
# Fold over to_fill, replacing null with 0 in each of those columns
sparkDF = reduce(
lambda df, x: df.withColumn(x, F.when(F.col(x).isNull(),0).otherwise(F.col(x))),
to_fill,
sparkDF,
)
sparkDF.show()
+---------+----------+--------+---+------+------+------+
|firstname|middlename|lastname|age|gender|salary|amount|
+---------+----------+--------+---+------+------+------+
| James| | Smith| 36| M| 3000| 1.2|
| Michael| Rose| | 40| M| 4000| 2.0|
| Robert| |Williams| 42| M| 4000| 5.0|
| Maria| Anne| | 39| F| 0| 0.0|
| Jen| Mary| Brown| | F| -1| 0.0|
+---------+----------+--------+---+------+------+------+
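If all you need is to replace nulls with a constant, DataFrame.fillna gives a more concise equivalent of the reduce fold above; passing subset limits it to the columns collected in to_fill. A minimal sketch, assuming the same sparkDF and to_fill as above:
# fillna(0) replaces nulls only in the columns named in `subset`;
# all other columns are left untouched.
sparkDF = sparkDF.fillna(0, subset=to_fill)
sparkDF.show()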
You can try this:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder.master("local").appName("test").getOrCreate()
data = [("James", "", "Smith", "36", "", 3000, 1.2),
("Michael", "Rose", "", "40", "M", 4000, 2.0),
("Robert", "", "Williams", "42", "M", 4000, 5.0),
("Maria", "Anne", " ", "39", "F", None, None),
("Jen", "Mary", "Brown", " ", "F", -1, None)
]
schema = StructType([
StructField("firstname", StringType(), True),
StructField("middlename", StringType(), True),
StructField("lastname", StringType(), True),
StructField("age", StringType(), True),
StructField("gender", StringType(), True),
StructField("salary", IntegerType(), True),
StructField("amount", DoubleType(), True)
])
dfa = spark.createDataFrame(data=data, schema=schema)
dfa.show()
def removenull(dfa):
    # Trim whitespace in every column (note: trim casts non-string columns to string)
    dfa = dfa.select([trim(col(c)).alias(c) for c in dfa.columns])
    # Turn empty strings into real nulls
    for i in dfa.columns:
        dfa = dfa.withColumn(i, when(col(i) == "", None).otherwise(col(i)))
    return dfa
removenull(dfa).show()
Output:
+---------+----------+--------+----+------+------+------+
|firstname|middlename|lastname| age|gender|salary|amount|
+---------+----------+--------+----+------+------+------+
| James| null| Smith| 36| null| 3000| 1.2|
| Michael| Rose| null| 40| M| 4000| 2.0|
| Robert| null|Williams| 42| M| 4000| 5.0|
| Maria| Anne| null| 39| F| null| null|
| Jen| Mary| Brown|null| F| -1| null|
+---------+----------+--------+----+------+------+------+
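Putting the two answers together reproduces the expected output from the question: removenull turns blank strings into nulls, and a fillna pass then writes 0 into the columns that were numeric in the original DataFrame. A sketch under the assumption that dfa is the DataFrame built above (cleaned, result, and to_fill are illustrative names); note that trim() inside removenull implicitly casts every column to string, so the numeric columns must be identified before calling it, and the fill value has to be the string "0":
# Record the numeric columns *before* removenull runs, because trim()
# casts every column to string.
to_fill = [c for c, d in dfa.dtypes if d in ('int', 'bigint', 'double')]

cleaned = removenull(dfa)                     # blanks -> null (columns are now strings)
result = cleaned.fillna("0", subset=to_fill)  # fill the formerly numeric columns
result.show()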