替换嵌套结构火花数据框中的列值

Replacing column values in nested structure spark dataframe

我在 Databricks 中获得了 VCF 数据格式。我想根据字典重命名主题。

我得到了字典,在那里我得到了关键的新名字。然后我得到了获取新值的函数,到目前为止 return 值有效:

import pyspark.sql.functions as F

keys= {'old_name': 'new_name'}
mapping_func = lambda x: keys.get(x) 
df.withColumn('foo', udf(mapping_func, F.StringType())('geno.sampleId'))

正在制作新专栏 foo。我需要在嵌套结构中分配值:(最后一行)

StructField(contigName,StringType,true)
StructField(start,LongType,true)
StructField(end,LongType,true)
StructField(names,ArrayType(StringType,true),true)
StructField(referenceAllele,StringType,true)
StructField(alternateAlleles,ArrayType(StringType,true),true)
StructField(qual,DoubleType,true)
StructField(filters,ArrayType(StringType,true),true)
StructField(splitFromMultiAllelic,BooleanType,true)
StructField(geno,StructType(List(StructField(sampleId,StringType,true),StructField(CN,IntegerType,true),StructField(phased,BooleanType,true),StructField(calls,ArrayType(IntegerType,true),true))),true)

像这样:

 df =  df.withColumn(F.col('geno').sampleId, udf(mapping_func, F.StringType())('geno.sampleId'))

但这就是说

Column is not iterable

我如何将值分配到适当的位置?

Scala 2.12 和 spark 3.01

据我了解,这里不需要使用UDF。您可以简单地使用映射列表达式来代替:

from itertools import chain
import pyspark.sql.functions as F

keys_map = F.create_map(*[F.lit(x)for x in chain(*keys.items())])

现在,要更新结构中的嵌套字段,您需要重新创建整个结构列(对于 Spark 3.1+,您将使用 ):

df = df.withColumn(
    "geno",
    F.struct(
        keys_map[F.col("geno.sampleId")].alias("sampleId"), # replaces sampleId value according to your keys mapping
        F.col("geno.CN").alias("CN"),
        F.col("geno.phased").alias("phased"),
        F.col("geno.calls").alias("calls")
    )
)