Why does sparklyr::spark_apply fail when specifying a numeric schema?
Given a Spark connection sc:
iris_spk <- copy_to(sc, iris)
Next, here is a trivial spark_apply example:
iris_spk %>%
  spark_apply(
    function(x) {
      data.frame(A = c("a", "b", "c"), B = c(1, 2, 3))
    },
    group_by = "Species",
    columns = c("A", "B"),
    packages = FALSE
  )
# # Source: table<sparklyr_tmp_3e96258604cd> [?? x 3]
# # Database: spark_connection
# Species A B
# <chr> <chr> <dbl>
# 1 versicolor a 1.00
# 2 versicolor b 2.00
# 3 versicolor c 3.00
# 4 virginica a 1.00
# 5 virginica b 2.00
# 6 virginica c 3.00
# 7 setosa a 1.00
# 8 setosa b 2.00
# 9 setosa c 3.00
So far, so good. But the documentation suggests that performance can be improved by specifying the output schema rather than just the column names, so I tried:
iris_spk %>%
  spark_apply(
    function(x) {
      data.frame(A = c("a", "b", "c"), B = c(1, 2, 3))
    },
    group_by = "Species",
    columns = list(A = "character",
                   B = "numeric"),
    packages = FALSE
  )
But this goes wrong:
Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 26.0 failed 4 times, most recent failure: Lost task 1.3 in stage 26.0 (TID 133, ml-dn38.mitre.org, executor 3): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of double
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, A), StringType), true) AS A#256
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, B), DoubleType) AS B#257
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)
at org.apache.spark.sql.SparkSession$$anonfun.apply(SparkSession.scala:581)
at org.apache.spark.sql.SparkSession$$anonfun.apply(SparkSession.scala:581)
at scala.collection.Iterator$$anon.next(Iterator.scala:409) at scala.collection.Iterator$$anon.next(Iterator.scala:409)
... and so on
Am I specifying the schema incorrectly?
Ah! I had assumed the group_by column would inherit its schema from the input data frame, but it needs to be declared along with the other columns. I just tried:
iris_spk %>%
  spark_apply(
    function(x) {
      data.frame(A = c("a", "b", "c"), B = c(1, 2, 3))
    },
    group_by = "Species",
    columns = list(Species = "character",
                   A = "character",
                   B = "numeric"),
    packages = FALSE
  )
which works (same result as the first attempt above).
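To see why the first schema failed, it can help to inspect the schema Spark actually produced. The sketch below (assuming the same sc and iris_spk as above, and requiring a live Spark connection) uses sparklyr's sdf_schema() to confirm that the grouping column is part of the output and must therefore appear in columns:

```r
library(sparklyr)
library(dplyr)

# Same working call as above: the group_by column ("Species") must be
# declared first in `columns`, since spark_apply prepends it to the output.
result <- iris_spk %>%
  spark_apply(
    function(x) {
      data.frame(A = c("a", "b", "c"), B = c(1, 2, 3))
    },
    group_by = "Species",
    columns = list(Species = "character",
                   A = "character",
                   B = "numeric"),
    packages = FALSE
  )

# Inspect the resulting Spark schema: Species and A should be StringType,
# B should be DoubleType. Omitting Species from `columns` shifts every
# declared type one position left, which is what produced the
# "java.lang.String is not a valid external type for schema of double" error.
sdf_schema(result)
```

In other words, with only A and B declared, Spark tried to encode the Species strings under A's slot and the A strings under B's DoubleType slot, hence the encoding failure.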