Deleting duplicated rows in sparklyr
I need to use sparklyr.

Delete rows that are duplicated in one column based on duplicates in another column.
The iris dataset has many observations for which 4 of the features are identical: the values of Sepal.Width, Petal.Length, Petal.Width and Species are the same, and only the Sepal.Length column differs between those rows.
Let's create a copy of iris in Spark:
library(sparklyr)
sc <- spark_connect(master = "local", version = "2.3")
iris_spark <- copy_to(sc, iris)
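It is worth noting up front (the AnalysisException further down confirms it) that copy_to() sanitizes the column names: the dots in the iris names become underscores, so the Spark-side columns are Sepal_Length, Sepal_Width, Petal_Length, Petal_Width and Species. A quick way to check, as a small sketch:
library(dplyr)
tbl_vars(iris_spark)
# expected: Sepal_Length, Sepal_Width, Petal_Length, Petal_Width, Species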
Base R approach
Here is the base R approach, which deletes the duplicated rows and keeps only the row with the largest value of Sepal.Length:
iris_order = iris[order(iris[,'Sepal.Length'],-iris[,'Sepal.Length']),] ### sort first
iris_subset = iris_order[!duplicated(iris_order$Sepal.Length),] ### Keep highest
dim(iris_subset) # 35 5
But this does not work on a tbl_spark object:
iris_spark_order = iris_spark[order(iris_spark[,'Sepal.Length'],-iris_spark[,'Sepal.Length']),]
Error in iris_spark[, "Sepal.Length"] : incorrect number of dimensions
Tidyverse
I can think of two possible dplyr solutions that work on a data.frame but not on a tbl_spark:
1)
library(dplyr)
iris %>% distinct(Sepal.Length)
iris_spark %>% distinct(Sepal.Length)
Error: org.apache.spark.sql.AnalysisException: cannot resolve '`Sepal.Length`' given input columns: [iris.Sepal_Length, iris.Sepal_Width, iris.Petal_Width, iris.Petal_Length, iris.Species]; line 1 pos 16;
'Distinct
+- 'Project ['Sepal.Length]
+- SubqueryAlias iris
+- LogicalRDD [Sepal_Length#13, Sepal_Width#14, Petal_Length#15, Petal_Width#16, Species#17], false
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$$anonfun$apply.applyOrElse(CheckAnalysis.scala:92)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$$anonfun$apply.applyOrElse(CheckAnalysis.scala:89)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp.apply(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp.apply(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun.apply(QueryPlan.scala:107)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun.apply(QueryPlan.scala:107)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression(QueryPlan.scala:106)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform(QueryPlan.scala:118)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform.apply(QueryPlan.scala:122)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform(QueryPlan.scala:122)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun.apply(QueryPlan.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:89)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:84)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:84)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at sparklyr.Invoke.invoke(invoke.scala:147)
at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
at sparklyr.StreamHandler.read(stream.scala:66)
at sparklyr.BackendHandler.channelRead0(handler.scala:51)
at sparklyr.BackendHandler.channelRead0(handler.scala:4)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Unknown Source)
2)
iris_order <- arrange(iris, Sepal.Length)
iris_subset <- iris_order[!duplicated(iris_order$Sepal.Length),]
But it does not work on a tbl_spark object:
library(dplyr)
iris_order <- arrange(iris_spark, Sepal.Length)
iris_subset <- iris_order[!duplicated(iris_order$Sepal.Length),]
Error in iris_order[!duplicated(iris_order$Sepal.Length), ] :
incorrect number of dimensions
data.table
The data.table solution for a data.frame:
library(data.table)
df <- iris # iris resides in package that is locked so copy to new object
unique(setDT(df)[order(Sepal.Length, -Species)], by = "Sepal.Length")
But it does not work on a tbl_spark object:
unique(setDT(iris_spark)[order(Sepal.Length)], by = "Sepal.Length")
Error in setDT(iris_spark) :
All elements in argument 'x' to 'setDT' must be of same length, but the profile of input lengths (length:frequency) is: [1:1, 2:1]
The first entry with fewer than 2 entries is 1
So how can this actually be done in Spark with sparklyr?
filter can be used with sparklyr:
library(dplyr)
library(sparklyr)
iris_spark %>%
  group_by(Sepal_Length) %>%  # the Spark column is Sepal_Length, not Sepal.Length
  filter(n() == 1)
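A related dplyr-only pattern (a sketch, not part of the original answer) keeps, for each combination of the other four columns, the row(s) with the largest Sepal_Length; dbplyr translates the grouped max() into a window function on the Spark side, and ties keep more than one row:
library(dplyr)
library(sparklyr)

iris_spark %>%
  group_by(Sepal_Width, Petal_Length, Petal_Width, Species) %>%
  # becomes MAX(Sepal_Length) OVER (PARTITION BY ...) in the generated SQL
  filter(Sepal_Length == max(Sepal_Length, na.rm = TRUE)) %>%
  ungroup()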
If the problem is really as simple as stated in the question, i.e. you want to take the maximum value of one column given the remaining n - 1 grouping columns, a simple aggregation is enough:
iris_spark %>%
  group_by(Sepal_Width, Petal_Length, Petal_Width, Species) %>%
  summarise(Sepal_Length = max(Sepal_Length))
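To inspect the result of the aggregation locally, you can collect() it back into R (a small usage sketch; the deduplicated iris result easily fits in memory):
deduped_local <- iris_spark %>%
  group_by(Sepal_Width, Petal_Length, Petal_Width, Species) %>%
  summarise(Sepal_Length = max(Sepal_Length)) %>%
  collect()  # runs the aggregation in Spark and returns a local tibble

dim(deduped_local)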
If you don't care which value you get*, and the number of columns may vary, you can drop duplicates (internally this uses first, which cannot be used in dplyr without a window function):
iris_spark %>%
  spark_dataframe() %>%
  invoke(
    "dropDuplicates",
    list("Sepal_Width", "Petal_Length", "Petal_Width", "Species")) %>%
  sdf_register()
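Recent sparklyr versions also expose this Spark method as sdf_drop_duplicates(); assuming your sparklyr version ships it, the invoke() chain above could be written more compactly (a sketch):
iris_spark %>%
  sdf_drop_duplicates(cols = c("Sepal_Width", "Petal_Length",
                               "Petal_Width", "Species"))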
If you do care about the order, such an approach, while technically correct, does not scale well. Instead, you can combine the remaining columns into a struct and take its max (structs use lexicographic ordering):
iris_spark %>%
  group_by(Sepal_Width, Petal_Length, Petal_Width, Species) %>%
  # You can put additional values in the struct
  summarise(values = max(struct(Sepal_Length))) %>%
  mutate(Sepal_Length = values.Sepal_Length)
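The same struct-based trick can also be written directly in Spark SQL; assuming DBI is available and that copy_to() registered the table under the name "iris" (as in the setup above), a sketch:
library(DBI)

dbGetQuery(sc, "
  SELECT
    Sepal_Width, Petal_Length, Petal_Width, Species,
    max(struct(Sepal_Length)).Sepal_Length AS Sepal_Length
  FROM iris
  GROUP BY Sepal_Width, Petal_Length, Petal_Width, Species
")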
* It is important to stress that any preceding ordering is ignored, even if the toy example might suggest otherwise.