PySpark DataFrames - 在不转换为 Pandas 的情况下进行枚举的方法?
PySpark DataFrames - way to enumerate without converting to Pandas?
我有一个很大的pyspark.sql.dataframe.DataFrame,名字叫df。
我需要一些枚举记录的方法——因此,能够访问具有特定索引的记录。 (或 select 组索引范围的记录)
在 pandas 中,我只能做出
indexes=[2,3,6,7]
df[indexes]
这里我想要类似的东西,(并且不将数据帧转换为 pandas)
我能得到的最接近的是:
通过以下方式枚举原始数据框中的所有对象:
indexes=np.arange(df.count())
df_indexed=df.withColumn('index', indexes)
- 正在使用 where() 函数搜索我需要的值。
问题:
- 为什么它不起作用以及如何让它起作用?如何向数据框添加一行?
以后做这样的东西行吗:
indexes=[2,3,6,7]
df1.where("index in indexes").collect()
有什么更快更简单的处理方法吗?
它不起作用,因为:
withColumn
的第二个参数应该是 Column
而不是集合。 np.array
在这里不起作用
- 当您将
"index in indexes"
作为 SQL 表达式传递给 where
时 indexes
超出范围并且未解析为有效标识符
PySpark >= 1.4.0
您可以使用相应的 window 函数添加行号,并使用 Column.isin
方法或格式正确的查询字符串进行查询:
from pyspark.sql.functions import col, rowNumber
from pyspark.sql.window import Window
w = Window.orderBy()
indexed = df.withColumn("index", rowNumber().over(w))
# Using DSL
indexed.where(col("index").isin(set(indexes)))
# Using SQL expression
indexed.where("index in ({0})".format(",".join(str(x) for x in indexes)))
看起来 window 函数在没有 PARTITION BY
子句的情况下调用会将所有数据移动到单个分区,所以以上可能不是最好的解决方案。
Any faster and simpler way to deal with it?
不是真的。 Spark DataFrame 不支持随机行访问。
PairedRDD
可以使用 lookup
方法访问,如果使用 HashPartitioner
对数据进行分区,该方法相对较快。还有 indexed-rdd 项目支持高效查找。
编辑:
独立于 PySpark 版本你可以尝试这样的事情:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType
row = Row("char")
row_with_index = Row("char", "index")
df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF()
df.show(5)
## +----+
## |char|
## +----+
## | a|
## | b|
## | c|
## | d|
## | e|
## +----+
## only showing top 5 rows
# This part is not tested but should work and save some work later
schema = StructType(
df.schema.fields[:] + [StructField("index", LongType(), False)])
indexed = (df.rdd # Extract rdd
.zipWithIndex() # Add index
.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])) # Map to rows
.toDF(schema)) # It will work without schema but will be more expensive
# inSet in Spark < 1.3
indexed.where(col("index").isin(indexes))
如果您想要一个保证不会冲突但不需要 .over(partitionBy())
的数字范围,那么您可以使用 monotonicallyIncreasingId()
。
from pyspark.sql.functions import monotonicallyIncreasingId
df.select(monotonicallyIncreasingId().alias("rowId"),"*")
请注意,这些值并不是特别 "neat"。每个分区都有一个取值范围,输出不会连续。例如。 0, 1, 2, 8589934592, 8589934593, 8589934594
。
这是 2015 年 4 月 28 日添加到 Spark 的:https://github.com/apache/spark/commit/d94cd1a733d5715792e6c4eac87f0d5c81aebbe2
您当然可以添加一个数组用于索引,确实是您选择的数组:
在 Scala 中,首先我们需要创建一个索引数组:
val index_array=(1 to df.count.toInt).toArray
index_array: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
您现在可以将此列附加到您的 DF。首先,为此,您需要打开我们的 DF 并将其作为数组获取,然后用 index_array 压缩它,然后我们将新数组转换回 RDD。最后一步是获取它作为 DF:
final_df = sc.parallelize((df.collect.map(
x=>(x(0),x(1))) zip index_array).map(
x=>(x._1._1.toString,x._1._2.toString,x._2))).
toDF("column_name")
之后索引会更清晰
monotonicallyIncreasingId()
- 这将按递增顺序而不是按顺序分配行号。
2 列示例输出:
|---------------------|------------------|
| RowNo | Heading 2 |
|---------------------|------------------|
| 1 | xy |
|---------------------|------------------|
| 12 | xz |
|---------------------|------------------|
如果要分配行号,请使用以下技巧。
已在 spark-2.0.1 及更高版本中测试。
df.createOrReplaceTempView("df")
dfRowId = spark.sql("select *, row_number() over (partition by 0) as rowNo from df")
2 列示例输出:
|---------------------|------------------|
| RowNo | Heading 2 |
|---------------------|------------------|
| 1 | xy |
|---------------------|------------------|
| 2 | xz |
|---------------------|------------------|
希望对您有所帮助。
选择 Pyspark DataFrame 的单行 n,尝试:
df.where(df.id == n).show()
给定一个 Pyspark DataFrame:
df = spark.createDataFrame([(1, 143.5, 5.6, 28, 'M', 100000),\
(2, 167.2, 5.4, 45, 'M', None),\
(3, None , 5.2, None, None, None),\
], ['id', 'weight', 'height', 'age', 'gender', 'income'])
选择第3行,试试:
df.where('id == 3').show()
或:
df.where(df.id == 3).show()
选择具有行 ID 的多行(在本例中为第 2 行和第 3 行),尝试:
id = {"2", "3"}
df.where(df.id.isin(id)).show()
from pyspark.sql.functions import monotonically_increasing_id
df.withColumn("Atr4", monotonically_increasing_id())
If you only need incremental values (like an ID) and if there is no
constraint that the numbers need to be consecutive, you could use
monotonically_increasing_id(). The only guarantee when using this
function is that the values will be increasing for each row, however,
the values themself can differ each execution.
我有一个很大的pyspark.sql.dataframe.DataFrame,名字叫df。 我需要一些枚举记录的方法——因此,能够访问具有特定索引的记录。 (或 select 组索引范围的记录)
在 pandas 中,我只能做出
indexes=[2,3,6,7]
df[indexes]
这里我想要类似的东西,(并且不将数据帧转换为 pandas)
我能得到的最接近的是:
通过以下方式枚举原始数据框中的所有对象:
indexes=np.arange(df.count()) df_indexed=df.withColumn('index', indexes)
- 正在使用 where() 函数搜索我需要的值。
问题:
- 为什么它不起作用以及如何让它起作用?如何向数据框添加一行?
以后做这样的东西行吗:
indexes=[2,3,6,7] df1.where("index in indexes").collect()
有什么更快更简单的处理方法吗?
它不起作用,因为:
withColumn
的第二个参数应该是Column
而不是集合。np.array
在这里不起作用- 当您将
"index in indexes"
作为 SQL 表达式传递给where
时indexes
超出范围并且未解析为有效标识符
PySpark >= 1.4.0
您可以使用相应的 window 函数添加行号,并使用 Column.isin
方法或格式正确的查询字符串进行查询:
from pyspark.sql.functions import col, rowNumber
from pyspark.sql.window import Window
w = Window.orderBy()
indexed = df.withColumn("index", rowNumber().over(w))
# Using DSL
indexed.where(col("index").isin(set(indexes)))
# Using SQL expression
indexed.where("index in ({0})".format(",".join(str(x) for x in indexes)))
看起来 window 函数在没有 PARTITION BY
子句的情况下调用会将所有数据移动到单个分区,所以以上可能不是最好的解决方案。
Any faster and simpler way to deal with it?
不是真的。 Spark DataFrame 不支持随机行访问。
PairedRDD
可以使用 lookup
方法访问,如果使用 HashPartitioner
对数据进行分区,该方法相对较快。还有 indexed-rdd 项目支持高效查找。
编辑:
独立于 PySpark 版本你可以尝试这样的事情:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType
row = Row("char")
row_with_index = Row("char", "index")
df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF()
df.show(5)
## +----+
## |char|
## +----+
## | a|
## | b|
## | c|
## | d|
## | e|
## +----+
## only showing top 5 rows
# This part is not tested but should work and save some work later
schema = StructType(
df.schema.fields[:] + [StructField("index", LongType(), False)])
indexed = (df.rdd # Extract rdd
.zipWithIndex() # Add index
.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])) # Map to rows
.toDF(schema)) # It will work without schema but will be more expensive
# inSet in Spark < 1.3
indexed.where(col("index").isin(indexes))
如果您想要一个保证不会冲突但不需要 .over(partitionBy())
的数字范围,那么您可以使用 monotonicallyIncreasingId()
。
from pyspark.sql.functions import monotonicallyIncreasingId
df.select(monotonicallyIncreasingId().alias("rowId"),"*")
请注意,这些值并不是特别 "neat"。每个分区都有一个取值范围,输出不会连续。例如。 0, 1, 2, 8589934592, 8589934593, 8589934594
。
这是 2015 年 4 月 28 日添加到 Spark 的:https://github.com/apache/spark/commit/d94cd1a733d5715792e6c4eac87f0d5c81aebbe2
您当然可以添加一个数组用于索引,确实是您选择的数组: 在 Scala 中,首先我们需要创建一个索引数组:
val index_array=(1 to df.count.toInt).toArray
index_array: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
您现在可以将此列附加到您的 DF。首先,为此,您需要打开我们的 DF 并将其作为数组获取,然后用 index_array 压缩它,然后我们将新数组转换回 RDD。最后一步是获取它作为 DF:
final_df = sc.parallelize((df.collect.map(
x=>(x(0),x(1))) zip index_array).map(
x=>(x._1._1.toString,x._1._2.toString,x._2))).
toDF("column_name")
之后索引会更清晰
monotonicallyIncreasingId()
- 这将按递增顺序而不是按顺序分配行号。
2 列示例输出:
|---------------------|------------------|
| RowNo | Heading 2 |
|---------------------|------------------|
| 1 | xy |
|---------------------|------------------|
| 12 | xz |
|---------------------|------------------|
如果要分配行号,请使用以下技巧。
已在 spark-2.0.1 及更高版本中测试。
df.createOrReplaceTempView("df")
dfRowId = spark.sql("select *, row_number() over (partition by 0) as rowNo from df")
2 列示例输出:
|---------------------|------------------|
| RowNo | Heading 2 |
|---------------------|------------------|
| 1 | xy |
|---------------------|------------------|
| 2 | xz |
|---------------------|------------------|
希望对您有所帮助。
选择 Pyspark DataFrame 的单行 n,尝试:
df.where(df.id == n).show()
给定一个 Pyspark DataFrame:
df = spark.createDataFrame([(1, 143.5, 5.6, 28, 'M', 100000),\
(2, 167.2, 5.4, 45, 'M', None),\
(3, None , 5.2, None, None, None),\
], ['id', 'weight', 'height', 'age', 'gender', 'income'])
选择第3行,试试:
df.where('id == 3').show()
或:
df.where(df.id == 3).show()
选择具有行 ID 的多行(在本例中为第 2 行和第 3 行),尝试:
id = {"2", "3"}
df.where(df.id.isin(id)).show()
from pyspark.sql.functions import monotonically_increasing_id
df.withColumn("Atr4", monotonically_increasing_id())
If you only need incremental values (like an ID) and if there is no constraint that the numbers need to be consecutive, you could use monotonically_increasing_id(). The only guarantee when using this function is that the values will be increasing for each row, however, the values themself can differ each execution.