Pyspark - getting values from an array that has a range of min and max values
I'm trying to write a query in PySpark that will get the correct value from an array.
For example, I have a dataframe called df with three columns, 'companyId', 'companySize' and 'weightingRange'. The 'companySize' column is just the number of employees. The 'weightingRange' column is an array containing the following:
[ {"minimum":0, "maximum":100, "weight":123},
{"minimum":101, "maximum":200, "weight":456},
{"minimum":201, "maximum":500, "weight":789}
]
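For reference, in Spark this column would typically be modelled as an array of structs. A minimal sketch of such a schema, using the field names from the sample above (the integer types are an assumption):

from pyspark.sql.types import ArrayType, IntegerType, StructField, StructType

# Hypothetical schema for the weightingRange column; integer fields assumed
weighting_range_schema = ArrayType(StructType([
    StructField("minimum", IntegerType()),
    StructField("maximum", IntegerType()),
    StructField("weight", IntegerType()),
]))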
So the dataframe looks like this (weightingRange is as above; it is truncated in the example below for cleaner formatting):
+-----------+-------------+------------------------+
| companyId | companySize | weightingRange         |
+-----------+-------------+------------------------+
| ABC1      | 150         | [{"maximum":100, etc}] |
| ABC2      | 50          | [{"maximum":100, etc}] |
+-----------+-------------+------------------------+
So for the entry with companySize = 150, I need to return the weight 456 into a column called 'companyWeighting'.
It should then display the following:
+-----------+-------------+------------------------+------------------+
| companyId | companySize | weightingRange         | companyWeighting |
+-----------+-------------+------------------------+------------------+
| ABC1      | 150         | [{"maximum":100, etc}] | 456              |
| ABC2      | 50          | [{"maximum":100, etc}] | 123              |
+-----------+-------------+------------------------+------------------+
I've looked at
df.withColumn("tmp",explode(col("weightingRange"))).select("tmp.*")
and then joining, but attempting to apply that produces a cartesian product of the data.
Thanks for any suggestions!
You can approach it like this.
First, create a sample dataframe:
import pyspark.sql.functions as F

df = spark.createDataFrame([
    ('ABC1', 150, [{"minimum": 0,   "maximum": 100, "weight": 123},
                   {"minimum": 101, "maximum": 200, "weight": 456},
                   {"minimum": 201, "maximum": 500, "weight": 789}]),
    ('ABC2', 50,  [{"minimum": 0,   "maximum": 100, "weight": 123},
                   {"minimum": 101, "maximum": 200, "weight": 456},
                   {"minimum": 201, "maximum": 500, "weight": 789}])],
    ['companyId', 'companySize', 'weightingRange'])
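Note that because the rows above are built from plain Python dicts, Spark infers weightingRange as an array of maps rather than an array of structs, which is why df.show() below renders each entry as Map(...). You can confirm with df.printSchema(), which should report the column as something like array<map<string,bigint>>.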
Then, define a udf and apply it to each row to get the new column:
from pyspark.sql.types import IntegerType

def get_weight(wt, wt_rnge):
    # Return the weight of the first range that contains the company size
    for _d in wt_rnge:
        if _d['minimum'] <= wt <= _d['maximum']:
            return _d['weight']

# Declare the return type explicitly; F.udf defaults to StringType otherwise
get_weight_udf = F.udf(get_weight, IntegerType())

df = df.withColumn('companyWeighting',
                   get_weight_udf(F.col('companySize'), F.col('weightingRange')))
df.show()
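One caveat: if a companySize falls outside every range (e.g. above 500 in this sample), get_weight returns None, so companyWeighting will be null for that row.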
You get the output as:
+---------+-----------+--------------------+----------------+
|companyId|companySize|      weightingRange|companyWeighting|
+---------+-----------+--------------------+----------------+
|     ABC1|        150|[Map(weight -> 12...|             456|
|     ABC2|         50|[Map(weight -> 12...|             123|
+---------+-----------+--------------------+----------------+
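For completeness, the same lookup can also be done without a UDF, which sidesteps the cartesian concern from the question: explode the array, keep only the range that contains companySize, and join the single matching weight back on companyId. A sketch, assuming companyId uniquely identifies each row and starting from the sample dataframe before the UDF column is added:

exploded = df.select('companyId', 'companySize',
                     F.explode('weightingRange').alias('rng'))

# Keep only the range containing the company size, then extract its weight
matched = exploded.where(
    (F.col('rng')['minimum'] <= F.col('companySize')) &
    (F.col('companySize') <= F.col('rng')['maximum'])
).select('companyId', F.col('rng')['weight'].alias('companyWeighting'))

# Each company matches at most one range, so this is a 1:1 join, not a cartesian
df.join(matched, on='companyId', how='left').show()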