Creating new column based on an existing column value in pyspark
I have a dataframe with an existing column of airport names, and I want to create another column containing the abbreviations of those airport names.
For example, I have an existing column with the following values:
SEATTLE TACOMA AIRPORT, WA US
MIAMI INTERNATIONAL AIRPORT, FL US
SAN FRANCISCO INTERNATIONAL AIRPORT, CA US
MIAMI INTERNATIONAL AIRPORT, FL US
MIAMI INTERNATIONAL AIRPORT, FL US
SAN FRANCISCO INTERNATIONAL AIRPORT, CA US
SEATTLE TACOMA AIRPORT, WA US
I want to create a new column holding the corresponding abbreviations, e.g. SEA, MIA, and SFO. I was thinking I could do it with a for loop, but I'm not quite sure how to write the code exactly.
You can add a new column to the dataframe, which will create a new dataframe.
You can use dataframe.withColumn(newcolumnname, case statement to decode name to abbreviations).
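A minimal sketch of that idea, with the case statement written as a chain of when() branches (assuming, as in the answer below, a DataFrame df whose airport-name column is _c0):

from pyspark.sql.functions import col, when

# each when() branch decodes one airport name; names matching no branch get null
df = df.withColumn(
    "abbr",
    when(col("_c0") == "SEATTLE TACOMA AIRPORT", "SEA")
    .when(col("_c0") == "MIAMI INTERNATIONAL AIRPORT", "MIA")
    .when(col("_c0") == "SAN FRANCISCO INTERNATIONAL AIRPORT", "SFO"),
)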
Here are 2 example approaches:
- using a dictionary and a UDF
- using a join with a second DataFrame
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
s = """\
SEATTLE TACOMA AIRPORT, WA US
MIAMI INTERNATIONAL AIRPORT, FL US
SAN FRANCISCO INTERNATIONAL AIRPORT, CA US
MIAMI INTERNATIONAL AIRPORT, FL US
MIAMI INTERNATIONAL AIRPORT, FL US
SAN FRANCISCO INTERNATIONAL AIRPORT, CA US
SEATTLE TACOMA AIRPORT, WA US"""
abbr = {
"SEATTLE TACOMA AIRPORT": "SEA",
"MIAMI INTERNATIONAL AIRPORT": "MIA",
"SAN FRANCISCO INTERNATIONAL AIRPORT": "SFO",
}
# read the sample lines as CSV; the comma splits the name (_c0) from the state (_c1)
df = spark.read.csv(sc.parallelize(s.splitlines()))
print("=== df ===")
df.show()
# =================================
# 1. using a UDF
# =================================
print("=== using a UDF ===")
# map each airport name to its abbreviation; abbr[airport] raises a KeyError
# for names missing from the dict (abbr.get(airport) would yield null instead)
udf_airport_to_abbr = udf(lambda airport: abbr[airport], StringType())
df.withColumn("abbr", udf_airport_to_abbr("_c0")).show()
# =================================
# 2. using a join
# =================================
# you may want to create this df in some different way ;)
df_abbrs = spark.read.csv(sc.parallelize(["%s,%s" % x for x in abbr.items()]))
print("=== df_abbrs ===")
df_abbrs.show()
print("=== using a join ===")
df.join(df_abbrs, on="_c0").show()
Output:
=== df ===
+--------------------+------+
| _c0| _c1|
+--------------------+------+
|SEATTLE TACOMA AI...| WA US|
|MIAMI INTERNATION...| FL US|
|SAN FRANCISCO INT...| CA US|
|MIAMI INTERNATION...| FL US|
|MIAMI INTERNATION...| FL US|
|SAN FRANCISCO INT...| CA US|
|SEATTLE TACOMA AI...| WA US|
+--------------------+------+
=== using a UDF ===
+--------------------+------+----+
| _c0| _c1|abbr|
+--------------------+------+----+
|SEATTLE TACOMA AI...| WA US| SEA|
|MIAMI INTERNATION...| FL US| MIA|
|SAN FRANCISCO INT...| CA US| SFO|
|MIAMI INTERNATION...| FL US| MIA|
|MIAMI INTERNATION...| FL US| MIA|
|SAN FRANCISCO INT...| CA US| SFO|
|SEATTLE TACOMA AI...| WA US| SEA|
+--------------------+------+----+
=== df_abbrs ===
+--------------------+---+
| _c0|_c1|
+--------------------+---+
|SEATTLE TACOMA AI...|SEA|
|MIAMI INTERNATION...|MIA|
|SAN FRANCISCO INT...|SFO|
+--------------------+---+
=== using a join ===
+--------------------+------+---+
| _c0| _c1|_c1|
+--------------------+------+---+
|SEATTLE TACOMA AI...| WA US|SEA|
|SEATTLE TACOMA AI...| WA US|SEA|
|SAN FRANCISCO INT...| CA US|SFO|
|SAN FRANCISCO INT...| CA US|SFO|
|MIAMI INTERNATION...| FL US|MIA|
|MIAMI INTERNATION...| FL US|MIA|
|MIAMI INTERNATION...| FL US|MIA|
+--------------------+------+---+
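Note that the joined result above ends up with two columns both named _c1 (one from each DataFrame), which makes the abbreviation column ambiguous to select. One way around that, sketched here, is to rename it before joining:

# give the abbreviation column a distinct name so the join result is unambiguous
df.join(df_abbrs.withColumnRenamed("_c1", "abbr"), on="_c0").show()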