SparkR DataFrame 分区问题
SparkR DataFrame partitioning issue
在我的 R 脚本中,我有一个 SparkDataFrame
两列(时间、值),其中包含四个不同月份的数据。因为我需要将我的函数分别应用于每个月,所以我想我会 repartition
将它分成四个分区,每个分区将保存一个单独月份的数据。
我创建了一个名为 partition 的附加列,其整数值为 0 - 3,然后通过该特定列调用了 repartition
方法。
遗憾的是,正如本主题中所描述的那样:
,使用 repartition
方法,我们只能确保所有具有相同键的数据最终会在同一个分区中,但是具有不同键的数据也可以在同一个分区中结束。
在我的例子中,执行下面可见的代码会导致创建 4 个分区,但只用数据填充其中的 2 个。
我想我应该使用 partitionBy
方法,但是如果是 SparkR,我不知道该怎么做。
官方文档指出此方法适用于名为 WindowSpec
而不是 DataFrame
的东西。
我真的很感激能在这件事上提供一些帮助,因为我不知道如何将此方法合并到我的代码中。
sparkR.session(
master="local[*]", sparkConfig = list(spark.sql.shuffle.partitions="4"))
df <- as.DataFrame(inputDat) # this is a dataframe with added partition column
repartitionedDf <- repartition(df, col = df$partition)
schema <- structType(
structField("time", "timestamp"),
structField("value", "double"),
structField("partition", "string"))
processedDf <- dapply(repartitionedDf,
function(x) { data.frame(produceHourlyResults(x), stringsAsFactors = FALSE) },
schema)
你用错了方法。如果你
need to apply my function to an each month separately
你应该使用 gapply
那
Groups the SparkDataFrame using the specified columns and applies the R function to each group.
df %>% group_by("month") %>% gapply(fun, schema)
或
df %>% gapply("month", fun, schema)
In my case, executing code visible below results in creating 4 partitions but populating only 2 of them with data.
这表明哈希冲突。合理增加分区数量超过唯一键数量应该可以解决问题:
spark.sql.shuffle.partitions 17
I guess i should be using the partitionBy method, however
没有。 partitionBy
与 window functions () 一起使用。
地址 :
i decided to use dapply with separate partitions in order to be able to easily save each month into separate CSV file
哈希分区器不是那样工作的
你可以在writer里用partitionBy
试一下,但我不确定SparkR是否直接支持。它在结构化流中受支持,对于批处理,您可能必须调用 Java 方法或使用带有 Metastore 的表:
createDataFrame(iris) %>% createOrReplaceTempView("iris_view")
sql(
"CREATE TABLE iris
USING csv PARTITIONED BY(species)
LOCATION '/tmp/iris' AS SELECT * FROM iris_view"
)
在我的 R 脚本中,我有一个 SparkDataFrame
两列(时间、值),其中包含四个不同月份的数据。因为我需要将我的函数分别应用于每个月,所以我想我会 repartition
将它分成四个分区,每个分区将保存一个单独月份的数据。
我创建了一个名为 partition 的附加列,其整数值为 0 - 3,然后通过该特定列调用了 repartition
方法。
遗憾的是,正如本主题中所描述的那样:
repartition
方法,我们只能确保所有具有相同键的数据最终会在同一个分区中,但是具有不同键的数据也可以在同一个分区中结束。
在我的例子中,执行下面可见的代码会导致创建 4 个分区,但只用数据填充其中的 2 个。
我想我应该使用 partitionBy
方法,但是如果是 SparkR,我不知道该怎么做。
官方文档指出此方法适用于名为 WindowSpec
而不是 DataFrame
的东西。
我真的很感激能在这件事上提供一些帮助,因为我不知道如何将此方法合并到我的代码中。
sparkR.session(
master="local[*]", sparkConfig = list(spark.sql.shuffle.partitions="4"))
df <- as.DataFrame(inputDat) # this is a dataframe with added partition column
repartitionedDf <- repartition(df, col = df$partition)
schema <- structType(
structField("time", "timestamp"),
structField("value", "double"),
structField("partition", "string"))
processedDf <- dapply(repartitionedDf,
function(x) { data.frame(produceHourlyResults(x), stringsAsFactors = FALSE) },
schema)
你用错了方法。如果你
need to apply my function to an each month separately
你应该使用 gapply
那
Groups the SparkDataFrame using the specified columns and applies the R function to each group.
df %>% group_by("month") %>% gapply(fun, schema)
或
df %>% gapply("month", fun, schema)
In my case, executing code visible below results in creating 4 partitions but populating only 2 of them with data.
这表明哈希冲突。合理增加分区数量超过唯一键数量应该可以解决问题:
spark.sql.shuffle.partitions 17
I guess i should be using the partitionBy method, however
没有。 partitionBy
与 window functions (
地址
i decided to use dapply with separate partitions in order to be able to easily save each month into separate CSV file
哈希分区器不是那样工作的
你可以在writer里用partitionBy
试一下,但我不确定SparkR是否直接支持。它在结构化流中受支持,对于批处理,您可能必须调用 Java 方法或使用带有 Metastore 的表:
createDataFrame(iris) %>% createOrReplaceTempView("iris_view")
sql(
"CREATE TABLE iris
USING csv PARTITIONED BY(species)
LOCATION '/tmp/iris' AS SELECT * FROM iris_view"
)