Sparklyr:如何在 Spark table 中将列表列分解为自己的列?
Sparklyr: how to explode a list column into their own columns in Spark table?
我的问题与 here 中的问题类似,但我在执行答案时遇到了问题,我无法在该主题中发表评论。
所以,我有一个包含嵌套数据的大 CSV 文件,其中包含由空格分隔的 2 列(比如第一列是 Y,第二列是 X)。 X 列本身也是逗号分隔值。
21.66 2.643227,1.2698358,2.6338573,1.8812188,3.8708665,...
35.15 3.422151,-0.59515584,2.4994135,-0.19701914,4.0771823,...
15.22 2.8302398,1.9080592,-0.68780196,3.1878228,4.6600842,...
...
我想使用 sparklyr
将此 CSV 读入 2 个不同的 Spark tables。
到目前为止,这就是我一直在做的事情:
使用spark_read_csv
将所有CSV内容导入Spark数据table
df = spark_read_csv(sc, path = "path", name = "simData", delimiter = " ", header = "false", infer_schema = "false")
结果是名为 simData
的 Spark table,包含 2 列:C0
和 C1
使用dplyr
到select第一列和第二列,然后将它们注册为新的table分别命名为Y和X
simY <- df %>% select(C0) %>% sdf_register("simY")
simX <- df %>% select(C1) %>% sdf_register("simX")
用ft_regex_tokenizer
函数拆分simX
中的值,关于here中写的答案。
ft_regex_tokenizer(input_DF, input.col = "COL", output.col = "ResultCols", pattern = '\###')
但是当我尝试 head
它使用 dplyr
:
Source: query [6 x 1]
Database: spark connection master=yarn-client app=sparklyr local=FALSE
Result
<list>
1 <list [789]>
2 <list [789]>
3 <list [789]>
4 <list [789]>
5 <list [789]>
6 <list [789]>
我想把它变成一个新的 Spark table 并将类型转换为 double。有什么办法吗?
我考虑过将 collect
数据放入 R(使用 dplyr
),转换为矩阵,然后对每一行执行 strsplit
,但我认为这不是解决方案,因为 CSV大小可以达到 40GB。
编辑:Spark 版本为 1.6.0
假设您的数据如下所示
library(dplyr)
library(sparklyr)
df <- data.frame(text = c("1.0,2.0,3.0", "4.0,5.0,6.0"))
sdf <- copy_to(sc, df, "df", overwrite = TRUE)
并且您已经创建了一个 spark_connection
您可以执行以下操作
n <- 3
# There is no function syntax for array access in Hive
# so we have to build [] expressions
# CAST(... AS double) could be handled in sparklyr / dplyr with as.numeric
exprs <- lapply(
0:(n - 1),
function(i) paste("CAST(bits[", i, "] AS double) AS x", i, sep=""))
sdf %>%
# Convert to Spark DataFrame
spark_dataframe() %>%
# Use expression with split and explode
invoke("selectExpr", list("split(text, ',') AS bits")) %>%
# Select individual columns
invoke("selectExpr", exprs) %>%
# Register table in the metastore ("registerTempTable" in Spark 1.x)
invoke("createOrReplaceTempView", "exploded_df")
并使用dplyr::tbl
取回sparklyr
对象:
tbl(sc, "exploded_df")
Source: query [2 x 3]
Database: spark connection master=local[8] app=sparklyr local=TRUE
x0 x1 x2
<dbl> <dbl> <dbl>
1 1 2 3
2 4 5 6
在最新版本中您还可以使用sdf_separate_column
:
sdf %>%
mutate(text=split(text, ",")) %>%
sdf_separate_column("text", paste0("x", 0:2))
# Source: table<sparklyr_tmp_87125f13b89> [?? x 4]
# Database: spark_connection
text x0 x1 x2
<list> <chr> <chr> <chr>
1 <list [3]> 1.0 2.0 3.0
2 <list [3]> 4.0 5.0 6.0
我的问题与 here 中的问题类似,但我在执行答案时遇到了问题,我无法在该主题中发表评论。
所以,我有一个包含嵌套数据的大 CSV 文件,其中包含由空格分隔的 2 列(比如第一列是 Y,第二列是 X)。 X 列本身也是逗号分隔值。
21.66 2.643227,1.2698358,2.6338573,1.8812188,3.8708665,...
35.15 3.422151,-0.59515584,2.4994135,-0.19701914,4.0771823,...
15.22 2.8302398,1.9080592,-0.68780196,3.1878228,4.6600842,...
...
我想使用 sparklyr
将此 CSV 读入 2 个不同的 Spark tables。
到目前为止,这就是我一直在做的事情:
使用
spark_read_csv
将所有CSV内容导入Spark数据tabledf = spark_read_csv(sc, path = "path", name = "simData", delimiter = " ", header = "false", infer_schema = "false")
结果是名为
simData
的 Spark table,包含 2 列:C0
和C1
使用
dplyr
到select第一列和第二列,然后将它们注册为新的table分别命名为Y和XsimY <- df %>% select(C0) %>% sdf_register("simY")
simX <- df %>% select(C1) %>% sdf_register("simX")
用
ft_regex_tokenizer
函数拆分simX
中的值,关于here中写的答案。ft_regex_tokenizer(input_DF, input.col = "COL", output.col = "ResultCols", pattern = '\###')
但是当我尝试 head
它使用 dplyr
:
Source: query [6 x 1]
Database: spark connection master=yarn-client app=sparklyr local=FALSE
Result
<list>
1 <list [789]>
2 <list [789]>
3 <list [789]>
4 <list [789]>
5 <list [789]>
6 <list [789]>
我想把它变成一个新的 Spark table 并将类型转换为 double。有什么办法吗?
我考虑过将 collect
数据放入 R(使用 dplyr
),转换为矩阵,然后对每一行执行 strsplit
,但我认为这不是解决方案,因为 CSV大小可以达到 40GB。
编辑:Spark 版本为 1.6.0
假设您的数据如下所示
library(dplyr)
library(sparklyr)
df <- data.frame(text = c("1.0,2.0,3.0", "4.0,5.0,6.0"))
sdf <- copy_to(sc, df, "df", overwrite = TRUE)
并且您已经创建了一个 spark_connection
您可以执行以下操作
n <- 3
# There is no function syntax for array access in Hive
# so we have to build [] expressions
# CAST(... AS double) could be handled in sparklyr / dplyr with as.numeric
exprs <- lapply(
0:(n - 1),
function(i) paste("CAST(bits[", i, "] AS double) AS x", i, sep=""))
sdf %>%
# Convert to Spark DataFrame
spark_dataframe() %>%
# Use expression with split and explode
invoke("selectExpr", list("split(text, ',') AS bits")) %>%
# Select individual columns
invoke("selectExpr", exprs) %>%
# Register table in the metastore ("registerTempTable" in Spark 1.x)
invoke("createOrReplaceTempView", "exploded_df")
并使用dplyr::tbl
取回sparklyr
对象:
tbl(sc, "exploded_df")
Source: query [2 x 3]
Database: spark connection master=local[8] app=sparklyr local=TRUE
x0 x1 x2
<dbl> <dbl> <dbl>
1 1 2 3
2 4 5 6
在最新版本中您还可以使用sdf_separate_column
:
sdf %>%
mutate(text=split(text, ",")) %>%
sdf_separate_column("text", paste0("x", 0:2))
# Source: table<sparklyr_tmp_87125f13b89> [?? x 4]
# Database: spark_connection
text x0 x1 x2
<list> <chr> <chr> <chr>
1 <list [3]> 1.0 2.0 3.0
2 <list [3]> 4.0 5.0 6.0