How to LEFT ANTI JOIN in Spark 1.6 via R?
I can set up the following Spark v1.6.0 connection and load the two tables below.
library(sparklyr)
library(dplyr)

# Connect to a local Spark 1.6.0 instance
sc <- spark_connect(master = "local",
                    version = "1.6.0",
                    hadoop_version = "2.4")

# Add a row id to each local data frame, then copy both to Spark
iris_r <- iris %>% mutate(id = row_number())
mtcars_r <- mtcars %>% mutate(id = row_number())
iris_db <- copy_to(sc, iris_r, name = "iris_db", overwrite = TRUE)
mtcars_db <- copy_to(sc, mtcars_r, name = "mtcars_db", overwrite = TRUE)

# anti_join translates to a NOT EXISTS subquery in the generated SQL
df <- iris_db %>% anti_join(mtcars_db, by = "id")
df
However, when I try to view or collect df, I get the following error:
Error: org.apache.spark.sql.AnalysisException:
Unsupported language features in query: SELECT * FROM `iris_db` AS `TBL_LEFT`
WHERE NOT EXISTS (
SELECT 1 FROM `mtcars_db` AS `TBL_RIGHT`
WHERE (`TBL_LEFT`.`id` = `TBL_RIGHT`.`id`)
)
TOK_QUERY 1, 0,51, 14
TOK_FROM 1, 4,10, 14
TOK_TABREF 1, 6,10, 14
TOK_TABNAME 1, 6,6, 14
iris_db 1, 6,6, 14
TBL_LEFT 1, 10,10, 27
TOK_INSERT 0, -1,51, 0
TOK_DESTINATION 0, -1,-1, 0
TOK_DIR 0, -1,-1, 0
TOK_TMP_FILE 0, -1,-1, 0
TOK_SELECT 0, 0,2, 0
TOK_SELEXPR 0, 2,2, 0
TOK_ALLCOLREF 0, 2,2, 0
TOK_WHERE 3, 13,51, 6
NOT 3, 15,51, 6
TOK_SUBQUERY_EXPR 3, 17,51, 10
TOK_SUBQUERY_OP 3, 17,17, 10
EXISTS 3, 17,17, 10
TOK_QUERY 4, 19,51, 16
TOK_FROM 4, 27,33, 16
TOK_TABREF 4, 29,33, 16
TOK_TABNAME 4, 29,29, 16
mtcars_db 4, 29,29, 16
TBL_RIGHT 4, 33,33, 31
TOK_INSERT 0, -1,49, 0
TOK_DESTINATION 0, -1,-1, 0
TOK_DIR 0, -1,-1, 0
TOK_TMP_FILE 0, -1,-1, 0
TOK_SELECT 4, 23,25, 9
TOK_SELEXPR 4, 25,25, 9
1 4, 25,25, 9
TOK_WHERE 5, 37,49, 25
= 5, 39,49, 25
. 5, 40,42, 19
TOK_TABLE_OR_COL 5, 40,40, 9
TBL_LEFT 5, 40,40, 9
id 5, 42,42, 20
. 5, 46,48, 38
TOK_TABLE_OR_COL 5, 46,46, 27
TBL_RIGHT 5, 46,46, 27
id 5, 48,48, 39
scala.NotImplementedError: No parse rules for ASTNode type: 864, text: TOK_SUBQUERY_EXPR :
TOK_SUBQUERY_EXPR 3, 17,51, 10
TOK_SUBQUERY_OP 3, 17,17, 10
EXISTS 3, 17,17, 10
TOK_QUERY 4, 19,51, 16
TOK_FROM 4, 27,33, 16
TOK_TABREF 4, 29,33, 16
TOK_TABNAME 4, 29,29, 16
mtcars_db 4, 29,29, 16
TBL_RIGHT 4, 33,33, 31
TOK_INSERT 0, -1,49, 0
TOK_DESTINATION 0, -1,-1, 0
TOK_DIR 0, -1,-1, 0
TOK_TMP_FILE 0, -1,-1, 0
TOK_SELECT 4, 23,25, 9
TOK_SELEXPR 4, 25,25, 9
1 4, 25,25, 9
TOK_WHERE 5, 37,49, 25
= 5, 39,49, 25
. 5, 40,42, 19
TOK_TABLE_OR_COL 5, 40,40, 9
TBL_LEFT 5, 40,40, 9
id 5, 42,42, 20
. 5, 46,48, 38
TOK_TABLE_OR_COL 5, 46,46, 27
TBL_RIGHT 5, 46,46, 27
id 5, 48,48, 39
" +
org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1721)
;
at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:326)
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl.apply(ExtendedHiveQlParser.scala:41)
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl.apply(ExtendedHiveQlParser.scala:40)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$$anonfun$apply.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$$anonfun$apply.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$$anon$$anonfun$apply.apply(Parsers.scala:891)
at scala.util.parsing.combinator.Parsers$$anon$$anonfun$apply.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinator.Parsers$$anon.apply(Parsers.scala:890)
at scala.util.parsing.combinator.PackratParsers$$anon.apply(PackratParsers.scala:110)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:295)
at org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse.apply(HiveContext.scala:66)
at org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse.apply(HiveContext.scala:66)
at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState.apply(ClientWrapper.scala:279)
at org.apache.spark.sql.hive.client.ClientWrapper.liftedTree1(ClientWrapper.scala:226)
at org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:225)
at org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:268)
at org.apache.spark.sql.hive.HiveQLDialect.parse(HiveContext.scala:65)
at org.apache.spark.sql.SQLContext$$anonfun.apply(SQLContext.scala:211)
at org.apache.spark.sql.SQLContext$$anonfun.apply(SQLContext.scala:211)
at org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others.apply(SparkSQLParser.scala:114)
at org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others.apply(SparkSQLParser.scala:113)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$$anonfun$apply.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$$anonfun$apply.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$$anon$$anonfun$apply.apply(Parsers.scala:891)
at scala.util.parsing.combinator.Parsers$$anon$$anonfun$apply.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinator.Parsers$$anon.apply(Parsers.scala:890)
at scala.util.parsing.combinator.PackratParsers$$anon.apply(PackratParsers.scala:110)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
at org.apache.spark.sql.SQLContext$$anonfun.apply(SQLContext.scala:208)
at org.apache.spark.sql.SQLContext$$anonfun.apply(SQLContext.scala:208)
at org.apache.spark.sql.execution.datasources.DDLParser.parse(DDLParser.scala:43)
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:231)
at org.apache.spark.sql.hive.HiveContext.parseSql(HiveContext.scala:331)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817)
at sun.reflect.GeneratedMethodAccessor36.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at sparklyr.Invoke$.invoke(invoke.scala:102)
at sparklyr.StreamHandler$.handleMethodCall(stream.scala:97)
at sparklyr.StreamHandler$.read(stream.scala:62)
at sparklyr.BackendHandler.channelRead0(handler.scala:52)
at sparklyr.BackendHandler.channelRead0(handler.scala:14)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChann
If I switch to Spark 2.0.1, the error goes away. For the purposes of this question, assume I am forced to stay on 1.6. Is there a supported way to perform this join?
A LEFT ANTI JOIN can be replaced with a FULL OUTER JOIN followed by a filter and a select:
df1 <- copy_to(sc,
               data.frame(id = c(1, 2, 3), x = c("a", "b", "c")),
               name = "df1", overwrite = TRUE)
df2 <- copy_to(sc,
               data.frame(id = c(1, 3), y = c(2, -2)),
               name = "df2", overwrite = TRUE)

df1 %>%
  # duplicate the join key so it can be tested for NULL after the join
  full_join(df2 %>% mutate(id_ = id), by = "id") %>%
  # keep only the rows that had no match in df2
  filter(is.null(id_)) %>%
  # drop the columns brought in from df2
  select(one_of(colnames(df1)))
If there are duplicated column names, you will have to deal with those as well.
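An alternative is to bypass dplyr's SQL generation entirely: Spark 1.6's HiveQL parser rejects the NOT EXISTS subquery that anti_join generates, but it does accept a plain LEFT OUTER JOIN with an IS NULL filter, so you can write the anti join by hand. A minimal sketch, assuming df1 and df2 are registered as temporary tables (copy_to does this) and that your sparklyr version supports tbl() on a sql() query:

# Hand-written anti join; the result stays as a lazy Spark table
anti_sql <- "
  SELECT TBL_LEFT.*
  FROM df1 TBL_LEFT
  LEFT OUTER JOIN df2 TBL_RIGHT
    ON TBL_LEFT.id = TBL_RIGHT.id
  WHERE TBL_RIGHT.id IS NULL"

df_anti <- tbl(sc, sql(anti_sql))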
Note that you shouldn't:

- use row_number() to generate global IDs: it doesn't scale and doesn't provide the correctness guarantees you'd need (see the sketch after this list for a distributed-friendly alternative);
- use copy_to on Spark data frames: it collects the data to the local node, so it won't work with large datasets.
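If you do need a Spark-side row ID, one distributed-friendly option is Spark's monotonically increasing ID, which sparklyr exposes as sdf_with_unique_id() (assuming your sparklyr release includes it). The IDs are unique but not consecutive:

# IDs assigned by Spark itself: unique, but not consecutive
# (sdf_with_unique_id() is assumed available in your sparklyr version)
iris_db <- copy_to(sc, iris, name = "iris_db", overwrite = TRUE) %>%
  sdf_with_unique_id(id = "id")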