替换嵌套结构火花数据框中的列值
Replacing column values in nested structure spark dataframe
我在 Databricks 中获得了 VCF 数据格式。我想根据字典重命名主题。
我得到了字典,在那里我得到了关键的新名字。然后我得到了获取新值的函数,到目前为止 return 值有效:
import pyspark.sql.functions as F
keys= {'old_name': 'new_name'}
mapping_func = lambda x: keys.get(x)
df.withColumn('foo', udf(mapping_func, F.StringType())('geno.sampleId'))
正在制作新专栏 foo
。我需要在嵌套结构中分配值:(最后一行)
StructField(contigName,StringType,true)
StructField(start,LongType,true)
StructField(end,LongType,true)
StructField(names,ArrayType(StringType,true),true)
StructField(referenceAllele,StringType,true)
StructField(alternateAlleles,ArrayType(StringType,true),true)
StructField(qual,DoubleType,true)
StructField(filters,ArrayType(StringType,true),true)
StructField(splitFromMultiAllelic,BooleanType,true)
StructField(geno,StructType(List(StructField(sampleId,StringType,true),StructField(CN,IntegerType,true),StructField(phased,BooleanType,true),StructField(calls,ArrayType(IntegerType,true),true))),true)
像这样:
df = df.withColumn(F.col('geno').sampleId, udf(mapping_func, F.StringType())('geno.sampleId'))
但这就是说
Column is not iterable
我如何将值分配到适当的位置?
Scala 2.12 和 spark 3.01
据我了解,这里不需要使用UDF。您可以简单地使用映射列表达式来代替:
from itertools import chain
import pyspark.sql.functions as F
keys_map = F.create_map(*[F.lit(x)for x in chain(*keys.items())])
现在,要更新结构中的嵌套字段,您需要重新创建整个结构列(对于 Spark 3.1+,您将使用 ):
df = df.withColumn(
"geno",
F.struct(
keys_map[F.col("geno.sampleId")].alias("sampleId"), # replaces sampleId value according to your keys mapping
F.col("geno.CN").alias("CN"),
F.col("geno.phased").alias("phased"),
F.col("geno.calls").alias("calls")
)
)
我在 Databricks 中获得了 VCF 数据格式。我想根据字典重命名主题。
我得到了字典,在那里我得到了关键的新名字。然后我得到了获取新值的函数,到目前为止 return 值有效:
import pyspark.sql.functions as F
keys= {'old_name': 'new_name'}
mapping_func = lambda x: keys.get(x)
df.withColumn('foo', udf(mapping_func, F.StringType())('geno.sampleId'))
正在制作新专栏 foo
。我需要在嵌套结构中分配值:(最后一行)
StructField(contigName,StringType,true)
StructField(start,LongType,true)
StructField(end,LongType,true)
StructField(names,ArrayType(StringType,true),true)
StructField(referenceAllele,StringType,true)
StructField(alternateAlleles,ArrayType(StringType,true),true)
StructField(qual,DoubleType,true)
StructField(filters,ArrayType(StringType,true),true)
StructField(splitFromMultiAllelic,BooleanType,true)
StructField(geno,StructType(List(StructField(sampleId,StringType,true),StructField(CN,IntegerType,true),StructField(phased,BooleanType,true),StructField(calls,ArrayType(IntegerType,true),true))),true)
像这样:
df = df.withColumn(F.col('geno').sampleId, udf(mapping_func, F.StringType())('geno.sampleId'))
但这就是说
Column is not iterable
我如何将值分配到适当的位置?
Scala 2.12 和 spark 3.01
据我了解,这里不需要使用UDF。您可以简单地使用映射列表达式来代替:
from itertools import chain
import pyspark.sql.functions as F
keys_map = F.create_map(*[F.lit(x)for x in chain(*keys.items())])
现在,要更新结构中的嵌套字段,您需要重新创建整个结构列(对于 Spark 3.1+,您将使用
df = df.withColumn(
"geno",
F.struct(
keys_map[F.col("geno.sampleId")].alias("sampleId"), # replaces sampleId value according to your keys mapping
F.col("geno.CN").alias("CN"),
F.col("geno.phased").alias("phased"),
F.col("geno.calls").alias("calls")
)
)