如何使用 Pyspark Dataframe 分配一行?
how can I assign a row with Pyspark Dataframe?
能否将下面的表达式从 Pandas 转换为 Pyspark Dataframe,我尝试查看 Pyspark 中 loc 的等价物?
import pandas as pd
df3 = pd.DataFrame(columns=["Devices","months"])
new_entry = {'Devices': 'device1', 'months': 'month1'}
df3.loc[len(df3)] = new_entry
在 pyspark 中,您需要 union
向现有数据框添加新行。但是 Spark 数据帧是无序的,并且没有 pandas 中的索引,所以没有这样的等价物。对于你给出的例子,你在 Pyspark 中是这样写的:
from pyspark.sql.types import *
schema = StructType([
StructField('Devices', StringType(), True),
StructField('months', TimestampType(), True)
])
df = spark.createDataFrame(sc.emptyRDD(), schema)
new_row_df = spark.createDataFrame([("device1", "month1")], ["Devices", "months"])
# or using spark.sql()
# new_row_df = spark.sql("select 'device1' as Devices, 'month1' as months")
df = df.union(new_row_df)
df.show()
#+-------+------+
#|Devices|months|
#+-------+------+
#|device1|month1|
#+-------+------+
如果你想在“特定位置”添加一行,你可以通过定义一个排序使用例如row_number
函数创建列索引,然后过滤在执行合并之前要将新行分配到的行号:
from pyspark.sql import functions as F
from pyspark.sql import Window
df = df.withColumn("rn", F.row_number().over(Window.orderBy("Devices")))
# df.loc[1] = ...
df = df.filter("rn <> 1").drop("rn").union(new_row_df)
能否将下面的表达式从 Pandas 转换为 Pyspark Dataframe,我尝试查看 Pyspark 中 loc 的等价物?
import pandas as pd
df3 = pd.DataFrame(columns=["Devices","months"])
new_entry = {'Devices': 'device1', 'months': 'month1'}
df3.loc[len(df3)] = new_entry
在 pyspark 中,您需要 union
向现有数据框添加新行。但是 Spark 数据帧是无序的,并且没有 pandas 中的索引,所以没有这样的等价物。对于你给出的例子,你在 Pyspark 中是这样写的:
from pyspark.sql.types import *
schema = StructType([
StructField('Devices', StringType(), True),
StructField('months', TimestampType(), True)
])
df = spark.createDataFrame(sc.emptyRDD(), schema)
new_row_df = spark.createDataFrame([("device1", "month1")], ["Devices", "months"])
# or using spark.sql()
# new_row_df = spark.sql("select 'device1' as Devices, 'month1' as months")
df = df.union(new_row_df)
df.show()
#+-------+------+
#|Devices|months|
#+-------+------+
#|device1|month1|
#+-------+------+
如果你想在“特定位置”添加一行,你可以通过定义一个排序使用例如row_number
函数创建列索引,然后过滤在执行合并之前要将新行分配到的行号:
from pyspark.sql import functions as F
from pyspark.sql import Window
df = df.withColumn("rn", F.row_number().over(Window.orderBy("Devices")))
# df.loc[1] = ...
df = df.filter("rn <> 1").drop("rn").union(new_row_df)