如何将 pyspark 数据帧分成两行
How to slice a pyspark dataframe in two row-wise
我在 Databricks 工作。
我有一个包含 500 行的数据框,我想创建两个包含 100 行的数据框,另一个包含剩余的 400 行。
+--------------------+----------+
| userid| eventdate|
+--------------------+----------+
|00518b128fc9459d9...|2017-10-09|
|00976c0b7f2c4c2ca...|2017-12-16|
|00a60fb81aa74f35a...|2017-12-04|
|00f9f7234e2c4bf78...|2017-05-09|
|0146fe6ad7a243c3b...|2017-11-21|
|016567f169c145ddb...|2017-10-16|
|01ccd278777946cb8...|2017-07-05|
我已尝试以下操作,但收到错误消息
df1 = df[:99]
df2 = df[100:499]
TypeError: unexpected item type: <type 'slice'>
如果我不介意在两个数据框中有相同的行,那么我可以使用 sample
。例如我有一个包含 354 行的数据框。
>>> df.count()
354
>>> df.sample(False,0.5,0).count() //approx. 50%
179
>>> df.sample(False,0.1,0).count() //approx. 10%
34
或者,如果我想严格拆分而不出现重复项,我可以这样做
df1 = df.limit(100) //100 rows
df2 = df.subtract(df1) //Remaining rows
Spark 数据帧不能像你写的那样被索引。您可以使用 head 方法来创建以获取前 n 行。这将 return 一个 Row() 对象列表而不是数据框。因此,您可以将它们转换回数据帧并使用从原始数据帧中减去其余行。
#Take the 100 top rows convert them to dataframe
#Also you need to provide the schema also to avoid errors
df1 = sqlContext.createDataFrame(df.head(100), df.schema)
#Take the rest of the rows
df2 = df.subtract(df1)
如果您使用 spark 2.0+,您也可以使用 SparkSession 而不是 spark sqlContext。此外,如果您对获取前 100 行不感兴趣并且想要随机拆分,您可以像这样使用 randomSplit:
df1,df2 = df.randomSplit([0.20, 0.80],seed=1234)
起初我误会了,以为你想对列进行切片。如果要 select 行子集,一种方法是使用 monotonically_increasing_id()
创建索引列。来自文档:
The generated ID is guaranteed to be monotonically increasing and
unique, but not consecutive.
您可以使用此 ID 对数据框进行排序并使用 limit()
对其进行子集化,以确保您准确获得所需的行。
例如:
import pyspark.sql.functions as f
import string
# create a dummy df with 500 rows and 2 columns
N = 500
numbers = [i%26 for i in range(N)]
letters = [string.ascii_uppercase[n] for n in numbers]
df = sqlCtx.createDataFrame(
zip(numbers, letters),
('numbers', 'letters')
)
# add an index column
df = df.withColumn('index', f.monotonically_increasing_id())
# sort ascending and take first 100 rows for df1
df1 = df.sort('index').limit(100)
# sort descending and take 400 rows for df2
df2 = df.sort('index', ascending=False).limit(400)
只是为了验证这是否符合您的要求:
df1.count()
#100
df2.count()
#400
我们还可以验证索引列不重叠:
df1.select(f.min('index').alias('min'), f.max('index').alias('max')).show()
#+---+---+
#|min|max|
#+---+---+
#| 0| 99|
#+---+---+
df2.select(f.min('index').alias('min'), f.max('index').alias('max')).show()
#+---+----------+
#|min| max|
#+---+----------+
#|100|8589934841|
#+---+----------+
在这里提供一个更简单的解决方案,更类似于所要求的:
(适用于 Spark 2.4 +)
# Starting
print('Starting row count:',df.count())
print('Starting column count:',len(df.columns))
# Slice rows
df2 = df.limit(3)
print('Sliced row count:',df2.count())
# Slice columns
cols_list = df.columns[0:1]
df3 = df.select(cols_list)
print('Sliced column count:',len(df3.columns))
这样试试:
df1_list = df.collect()[:99] #this will return list
df1 = spark.createDataFrame(df1) #convert it to spark dataframe
同样适用于此:
df2_list = df.collect()[100:499]
df2 = spark.createDataFrame(df2)
在这两种解决方案中,我认为我们需要将 df1
更改为 df1_list
,并在第二句中将 df2
更改为 df2_list
。
我在 Databricks 工作。
我有一个包含 500 行的数据框,我想创建两个包含 100 行的数据框,另一个包含剩余的 400 行。
+--------------------+----------+
| userid| eventdate|
+--------------------+----------+
|00518b128fc9459d9...|2017-10-09|
|00976c0b7f2c4c2ca...|2017-12-16|
|00a60fb81aa74f35a...|2017-12-04|
|00f9f7234e2c4bf78...|2017-05-09|
|0146fe6ad7a243c3b...|2017-11-21|
|016567f169c145ddb...|2017-10-16|
|01ccd278777946cb8...|2017-07-05|
我已尝试以下操作,但收到错误消息
df1 = df[:99]
df2 = df[100:499]
TypeError: unexpected item type: <type 'slice'>
如果我不介意在两个数据框中有相同的行,那么我可以使用 sample
。例如我有一个包含 354 行的数据框。
>>> df.count()
354
>>> df.sample(False,0.5,0).count() //approx. 50%
179
>>> df.sample(False,0.1,0).count() //approx. 10%
34
或者,如果我想严格拆分而不出现重复项,我可以这样做
df1 = df.limit(100) //100 rows
df2 = df.subtract(df1) //Remaining rows
Spark 数据帧不能像你写的那样被索引。您可以使用 head 方法来创建以获取前 n 行。这将 return 一个 Row() 对象列表而不是数据框。因此,您可以将它们转换回数据帧并使用从原始数据帧中减去其余行。
#Take the 100 top rows convert them to dataframe
#Also you need to provide the schema also to avoid errors
df1 = sqlContext.createDataFrame(df.head(100), df.schema)
#Take the rest of the rows
df2 = df.subtract(df1)
如果您使用 spark 2.0+,您也可以使用 SparkSession 而不是 spark sqlContext。此外,如果您对获取前 100 行不感兴趣并且想要随机拆分,您可以像这样使用 randomSplit:
df1,df2 = df.randomSplit([0.20, 0.80],seed=1234)
起初我误会了,以为你想对列进行切片。如果要 select 行子集,一种方法是使用 monotonically_increasing_id()
创建索引列。来自文档:
The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.
您可以使用此 ID 对数据框进行排序并使用 limit()
对其进行子集化,以确保您准确获得所需的行。
例如:
import pyspark.sql.functions as f
import string
# create a dummy df with 500 rows and 2 columns
N = 500
numbers = [i%26 for i in range(N)]
letters = [string.ascii_uppercase[n] for n in numbers]
df = sqlCtx.createDataFrame(
zip(numbers, letters),
('numbers', 'letters')
)
# add an index column
df = df.withColumn('index', f.monotonically_increasing_id())
# sort ascending and take first 100 rows for df1
df1 = df.sort('index').limit(100)
# sort descending and take 400 rows for df2
df2 = df.sort('index', ascending=False).limit(400)
只是为了验证这是否符合您的要求:
df1.count()
#100
df2.count()
#400
我们还可以验证索引列不重叠:
df1.select(f.min('index').alias('min'), f.max('index').alias('max')).show()
#+---+---+
#|min|max|
#+---+---+
#| 0| 99|
#+---+---+
df2.select(f.min('index').alias('min'), f.max('index').alias('max')).show()
#+---+----------+
#|min| max|
#+---+----------+
#|100|8589934841|
#+---+----------+
在这里提供一个更简单的解决方案,更类似于所要求的:
(适用于 Spark 2.4 +)
# Starting
print('Starting row count:',df.count())
print('Starting column count:',len(df.columns))
# Slice rows
df2 = df.limit(3)
print('Sliced row count:',df2.count())
# Slice columns
cols_list = df.columns[0:1]
df3 = df.select(cols_list)
print('Sliced column count:',len(df3.columns))
这样试试:
df1_list = df.collect()[:99] #this will return list
df1 = spark.createDataFrame(df1) #convert it to spark dataframe
同样适用于此:
df2_list = df.collect()[100:499]
df2 = spark.createDataFrame(df2)
在这两种解决方案中,我认为我们需要将 df1
更改为 df1_list
,并在第二句中将 df2
更改为 df2_list
。