How to flatten data of different data types using the sparklyr package?
Introduction
The R code below is written with the sparklyr package to create a database schema. [Reproducible code and data are given.]
Existing result
root
|-- contributors : string
|-- created_at : string
|-- entities (struct)
| |-- hashtags (array) : [string]
| |-- media (array)
| | |-- additional_media_info (struct)
| | | |-- description : string
| | | |-- embeddable : boolean
| | | |-- monetizable : boolean
| | |-- display_url : string
| | |-- id : long
| | |-- id_str : string
| |-- urls (array)
|-- extended_entities (struct)
|-- retweeted_status (struct)
|-- user (struct)
I want to flatten this structure as follows:
Expected result
root
|-- contributors : string
|-- created_at : string
|-- entities (struct)
|-- entities.hashtags (array) : [string]
|-- entities.media (array)
|-- entities.media.additional_media_info (struct)
|-- entities.media.additional_media_info.description : string
|-- entities.media.additional_media_info.embeddable : boolean
|-- entities.media.additional_media_info.monetizable : boolean
|-- entities.media.display_url : string
|-- entities.media.id : long
|-- entities.media.id_str : string
|-- entities.urls (array)
|-- extended_entities (struct)
|-- retweeted_status (struct)
|-- user (struct)
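Database
Navigate to: Data-178 KB. Then copy the numbered items into a text file named "example". Save it to a directory named "../example.json/" created in your working directory.
The R code below reproduces the example:
Existing code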
library(sparklyr)
library(dplyr)
library(devtools)
devtools::install_github("mitre/sparklyr.nested")
# If Spark is not installed, then also need:
# spark_install(version = "2.2.0")
library(sparklyr.nested)
library(testthat)
library(jsonlite)
Sys.setenv(SPARK_HOME="/usr/lib/spark")
conf <- spark_config()
conf$'sparklyr.shell.executor-memory' <- "20g"
conf$'sparklyr.shell.driver-memory' <- "20g"
conf$spark.executor.cores <- 16
conf$spark.executor.memory <- "20G"
conf$spark.yarn.am.cores <- 16
conf$spark.yarn.am.memory <- "20G"
conf$spark.executor.instances <- 8
conf$spark.dynamicAllocation.enabled <- "false"
conf$maximizeResourceAllocation <- "true"
conf$spark.default.parallelism <- 32
sc <- spark_connect(master = "local", config = conf, version = '2.2.0') # Connection
sample_tbl <- spark_read_json(sc,name="example",path="example.json", header = TRUE, memory = FALSE, overwrite = TRUE)
sdf_schema_viewer(sample_tbl) # to create db schema
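Effort made
I tried jsonlite, but it could not read the large file, not even in chunks; it ran seemingly without end. So I turned to sparklyr, which works wonders and read 1 billion records within a few seconds. I then looked into flattening the records down to the deepest nesting level (in jsonlite, flattening is done with the flatten() function). However, sparklyr has no such function; only first-level flattening is possible there.
I want to flatten data of different data types and write the output to a CSV file.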
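Alright, so this is one possible way to unnest it all.
You can use the schema information to build all of the nested names, for example entities.media.additional_media_info, and then simply select them with SQL.
This is a bit labour intensive and may not generalise, but it works.
I would expect it to be quick too, as it is only a SELECT statement.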
columns_to_flatten <- sdf_schema_json(sample_tbl, simplify = TRUE) %>%
  # using rlist package for ease of use
  rlist::list.flatten(use.names = TRUE) %>%
  # get names
  names() %>%
  # remove contents of brackets and whitespace
  # (backslashes must be doubled inside R string literals)
  gsub("\\(.*?\\)|\\s", "", .) %>%
  # add alias for column names, dot replaced with double underscore
  # this avoids duplicate names that would otherwise occur with singular names
  {paste(., "AS", gsub("\\.", "__", .))} %>%
  # required, otherwise doesn't seem to work
  sub("variants", "variants[0]", .)

# construct query
sql_statement <- paste("SELECT",
                       paste(columns_to_flatten, collapse = ", "),
                       "FROM example")

# execute on spark cluster, save as table in cluster
spark_session(sc) %>%
  sparklyr::invoke("sql", sql_statement) %>%
  sparklyr::invoke("createOrReplaceTempView", "flattened_example")

tbl(sc, "flattened_example") %>%
  sdf_schema_viewer()
The resulting SQL looks like this; it is rather simple, just long:
SELECT contributors AS contributors, coordinates AS coordinates, created_at AS created_at, display_text_range AS display_text_range, entities.hashtags.indices AS entities__hashtags__indices, entities.hashtags.text AS entities__hashtags__text, entities.media.additional_media_info.description AS entities__media__additional_media_info__description, entities.media.additional_media_info.embeddable AS entities__media__additional_media_info__embeddable, entities.media.additional_media_info.monetizable AS entities__media__additional_media_info__monetizable, entities.media.additional_media_info.title AS entities__media__additional_media_info__title, entities.media.display_url AS entities__media__display_url, entities.media.expanded_url AS entities__media__expanded_url, entities.media.id AS entities__media__id, entities.media.id_str AS entities__media__id_str, entities.media.indices AS entities__media__indices, entities.media.media_url AS entities__media__media_url, entities.media.media_url_https AS entities__media__media_url_https, entities.media.sizes.large.h AS entities__media__sizes__large__h, entities.media.sizes.large.resize AS entities__media__sizes__large__resize, entities.media.sizes.large.w AS entities__media__sizes__large__w FROM example
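The name-building part of the pipeline can be tried without a Spark connection. The sketch below is only an illustration under assumptions: mock_schema is a hypothetical, hand-written list that imitates the shape I assume sdf_schema_json(simplify = TRUE) returns (field names carrying a type suffix such as " (struct)"), and base R's unlist() stands in for rlist::list.flatten() to produce the dotted names.

```r
# Hypothetical stand-in for the simplified schema; not real output of sdf_schema_json()
mock_schema <- list(
  contributors = "string",
  "entities (struct)" = list(
    "hashtags (array)" = list(text = "string"),
    "media (array)" = list(id = "long", display_url = "string")
  )
)

# unlist() joins the names of nested list levels with "."
raw_names <- names(unlist(mock_schema))

# Drop the "(type)" suffixes and all whitespace to get plain dotted paths
paths <- gsub("\\(.*?\\)|\\s", "", raw_names, perl = TRUE)

# Alias each path, replacing dots with double underscores so names stay unique
aliases <- gsub(".", "__", paths, fixed = TRUE)

# Assemble the SELECT statement against the table registered as "example"
sql_statement <- paste("SELECT",
                       paste(paths, "AS", aliases, collapse = ", "),
                       "FROM example")

print(paths)
cat(sql_statement, "\n")
```

If the real schema list is shaped differently, only the construction of raw_names should need to change; the two gsub() calls and the final paste() are the same steps as in the pipeline above.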