Pyspark - Fill empty strings with '0' if the data type is BIGINT/DOUBLE/Integer

I am trying to fill empty strings with '0' in a PySpark DataFrame, but only in columns whose data type is BIGINT/DOUBLE/Integer.

data = [("James","","Smith","36","M",3000,"1.2"),
    ("Michael","Rose"," ","40","M",4000,"2.0"),
    ("Robert","","Williams","42","M",4000,"5.0"),
    ("Maria","Anne"," ","39","F", ," "),
    ("Jen","Mary","Brown"," ","F",-1,"")
  ]

schema = StructType([ 
    StructField("firstname",StringType(),True), 
    StructField("middlename",StringType(),True), 
    StructField("lastname",StringType(),True), 
    StructField("age", StringType(), True), 
    StructField("gender", StringType(), True), 
    StructField("salary", IntegerType(), True),
    StructField("amount", DoubleType(), True) 
  ])
 
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()

This is what I am trying:

df.select( *[ F.when(F.dtype in ('integertype','doubletype') and F.col(column).ishaving(" "),'0').otherwise(F.col(column)).alias(column) for column in df.columns]).show()

Expected output:

+---------+----------+--------+---+------+------+------+
|firstname|middlename|lastname|age|gender|salary|amount|
+---------+----------+--------+---+------+------+------+
|    James|          |   Smith| 36|     M|  3000|   1.2|
|  Michael|      Rose|        | 40|     M|  4000|   2.0|
|   Robert|          |Williams| 42|     M|  4000|   5.0|
|    Maria|      Anne|        | 39|     F|     0|     0|
|      Jen|      Mary|   Brown|   |     F|    -1|     0|
+---------+----------+--------+---+------+------+------+

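You can use reduce to do this; it keeps the code cleaner and easier to follow.

In addition, build a to_fill list of the columns that match your condition. Note that an integer or double column cannot hold an empty string, so the missing values there are nulls. You can adapt the list further to suit your scenario.

Data Preparation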

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

data = [("James","","Smith","36","M",3000,1.2),
    ("Michael","Rose"," ","40","M",4000,2.0),
    ("Robert","","Williams","42","M",4000,5.0),
    ("Maria","Anne"," ","39","F",None,None),
    ("Jen","Mary","Brown"," ","F",-1,None)
  ]

schema = StructType([ 
    StructField("firstname",StringType(),True), 
    StructField("middlename",StringType(),True), 
    StructField("lastname",StringType(),True), 
    StructField("age", StringType(), True), 
    StructField("gender", StringType(), True), 
    StructField("salary", IntegerType(), True),
    StructField("amount", DoubleType(), True) 
  ])
 
sparkDF = spark.createDataFrame(data=data,schema=schema)
sparkDF.show()

+---------+----------+--------+---+------+------+------+
|firstname|middlename|lastname|age|gender|salary|amount|
+---------+----------+--------+---+------+------+------+
|    James|          |   Smith| 36|     M|  3000|   1.2|
|  Michael|      Rose|        | 40|     M|  4000|   2.0|
|   Robert|          |Williams| 42|     M|  4000|   5.0|
|    Maria|      Anne|        | 39|     F|  null|  null|
|      Jen|      Mary|   Brown|   |     F|    -1|  null|
+---------+----------+--------+---+------+------+------+

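Reduce

from functools import reduce
from pyspark.sql import functions as F

# Pick the numeric columns by their Spark SQL type names
to_fill = [c for c, d in sparkDF.dtypes if d in ['int', 'bigint', 'double']]

# to_fill --> ['salary','amount']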

sparkDF = reduce(
    lambda df, x: df.withColumn(x, F.when(F.col(x).isNull(),0).otherwise(F.col(x))),
    to_fill,
    sparkDF,
)

sparkDF.show()

+---------+----------+--------+---+------+------+------+
|firstname|middlename|lastname|age|gender|salary|amount|
+---------+----------+--------+---+------+------+------+
|    James|          |   Smith| 36|     M|  3000|   1.2|
|  Michael|      Rose|        | 40|     M|  4000|   2.0|
|   Robert|          |Williams| 42|     M|  4000|   5.0|
|    Maria|      Anne|        | 39|     F|     0|   0.0|
|      Jen|      Mary|   Brown|   |     F|    -1|   0.0|
+---------+----------+--------+---+------+------+------+

You can try this:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

spark = SparkSession.builder.master("local").appName("test").getOrCreate()
data = [("James", "", "Smith", "36", "", 3000, 1.2),
        ("Michael", "Rose", "", "40", "M", 4000, 2.0),
        ("Robert", "", "Williams", "42", "M", 4000, 5.0),
        ("Maria", "Anne", " ", "39", "F", None, None),
        ("Jen", "Mary", "Brown", " ", "F", -1, None)
        ]

schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("age", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("amount", DoubleType(), True)
])

dfa = spark.createDataFrame(data=data, schema=schema)
dfa.show()


def removenull(dfa):
    # Trim every column (the result of trim() is a string column,
    # so the numeric columns come back as strings)
    dfa = dfa.select([trim(col(c)).alias(c) for c in dfa.columns])
    # Replace values that are empty after trimming with null
    for i in dfa.columns:
        dfa = dfa.withColumn(i, when(col(i) == "", None).otherwise(col(i)))
    return dfa

removenull(dfa).show()

Output:

+---------+----------+--------+----+------+------+------+
|firstname|middlename|lastname| age|gender|salary|amount|
+---------+----------+--------+----+------+------+------+
|    James|      null|   Smith|  36|  null|  3000|   1.2|
|  Michael|      Rose|    null|  40|     M|  4000|   2.0|
|   Robert|      null|Williams|  42|     M|  4000|   5.0|
|    Maria|      Anne|    null|  39|     F|  null|  null|
|      Jen|      Mary|   Brown|null|     F|    -1|  null|
+---------+----------+--------+----+------+------+------+