使用 sparklyr::spark_read_json 时添加文件名
Adding name of file when using sparklyr::spark_read_json
我有数百万个 json 文件,其中每个文件包含相同数量的列,比方说 x
和 y
。请注意,x
和 y
的长度对于单个文件是相等的,但在比较两个不同的文件时可能不同。
问题是唯一分隔数据的是文件名。因此,在合并文件时,我希望将文件名作为第三列包含在内。这可能使用 sparklyr::spark_read_json
,即使用通配符时吗?
MWE:
library(sparklyr)
## Spark connection
sc <- spark_connect(master = "local", version = "2.1.0")
## Create data
data_dir <- tempdir()
tbl_json1 <- data.frame(x = 1:3, y = 1:3)
tbl_json2 <- data.frame(x = 1:10, y = 1:10)
## Write data to disk
write(jsonlite::toJSON(tbl_json1), sprintf("%s/tab1.json", data_dir))
write(jsonlite::toJSON(tbl_json2), sprintf("%s/tab2.json", data_dir))
## Read both files using wildcard
combined_table <- spark_read_json(
sc,
name = "combined_table",
path = sprintf("%s/*.json", data_dir)
)
## Tranfer results to R
library(dplyr)
dt <- combined_table %>% collect
# # A tibble: 13 x 2
# x y
# <dbl> <dbl>
# 1 1. 1.
# 2 2. 2.
# 3 3. 3.
# 4 4. 4.
# 5 5. 5.
# 6 6. 6.
# 7 7. 7.
# 8 8. 8.
# 9 9. 9.
# 10 10. 10.
# 11 1. 1.
# 12 2. 2.
# 13 3. 3.
想要输出
# # A tibble: 13 x 2
# x y id
# <dbl> <dbl> <chr>
# 1 1. 1. tab2
# 2 2. 2. tab2
# 3 3. 3. tab2
# 4 4. 4. tab2
# 5 5. 5. tab2
# 6 6. 6. tab2
# 7 7. 7. tab2
# 8 8. 8. tab2
# 9 9. 9. tab2
# 10 10. 10. tab2
# 11 1. 1. tab1
# 12 2. 2. tab1
# 13 3. 3. tab1
您可以禁用预缓存(无论如何您真的应该):
combined_table <- spark_read_json(
sc,
name = "combined_table",
path = sprintf("%s/*.json", data_dir),
memory=FALSE
)
combined_table %>% mutate(id = input_file_name())
# Source: lazy query [?? x 3]
# Database: spark_connection
x y id
<dbl> <dbl> <chr>
1 1 1 file:///tmp/RtmpnIAUek/tab2.json
2 2 2 file:///tmp/RtmpnIAUek/tab2.json
3 3 3 file:///tmp/RtmpnIAUek/tab2.json
4 4 4 file:///tmp/RtmpnIAUek/tab2.json
5 5 5 file:///tmp/RtmpnIAUek/tab2.json
6 6 6 file:///tmp/RtmpnIAUek/tab2.json
7 7 7 file:///tmp/RtmpnIAUek/tab2.json
8 8 8 file:///tmp/RtmpnIAUek/tab2.json
9 9 9 file:///tmp/RtmpnIAUek/tab2.json
10 10 10 file:///tmp/RtmpnIAUek/tab2.json
# ... with more rows
如有需要,可与Hive's parse_url
UDF结合使用:
combined_table %>% mutate(id = parse_url(input_file_name(), "FILE"))
# Source: lazy query [?? x 3]
# Database: spark_connection
x y id
<dbl> <dbl> <chr>
1 1 1 /tmp/RtmpnIAUek/tab2.json
2 2 2 /tmp/RtmpnIAUek/tab2.json
3 3 3 /tmp/RtmpnIAUek/tab2.json
4 4 4 /tmp/RtmpnIAUek/tab2.json
5 5 5 /tmp/RtmpnIAUek/tab2.json
6 6 6 /tmp/RtmpnIAUek/tab2.json
7 7 7 /tmp/RtmpnIAUek/tab2.json
8 8 8 /tmp/RtmpnIAUek/tab2.json
9 9 9 /tmp/RtmpnIAUek/tab2.json
10 10 10 /tmp/RtmpnIAUek/tab2.json
# ... with more rows
并且您可以使用其他字符串处理函数来提取单个信息位。
我有数百万个 json 文件,其中每个文件包含相同数量的列,比方说 x
和 y
。请注意,x
和 y
的长度对于单个文件是相等的,但在比较两个不同的文件时可能不同。
问题是唯一分隔数据的是文件名。因此,在合并文件时,我希望将文件名作为第三列包含在内。这可能使用 sparklyr::spark_read_json
,即使用通配符时吗?
MWE:
library(sparklyr)
## Spark connection
sc <- spark_connect(master = "local", version = "2.1.0")
## Create data
data_dir <- tempdir()
tbl_json1 <- data.frame(x = 1:3, y = 1:3)
tbl_json2 <- data.frame(x = 1:10, y = 1:10)
## Write data to disk
write(jsonlite::toJSON(tbl_json1), sprintf("%s/tab1.json", data_dir))
write(jsonlite::toJSON(tbl_json2), sprintf("%s/tab2.json", data_dir))
## Read both files using wildcard
combined_table <- spark_read_json(
sc,
name = "combined_table",
path = sprintf("%s/*.json", data_dir)
)
## Tranfer results to R
library(dplyr)
dt <- combined_table %>% collect
# # A tibble: 13 x 2
# x y
# <dbl> <dbl>
# 1 1. 1.
# 2 2. 2.
# 3 3. 3.
# 4 4. 4.
# 5 5. 5.
# 6 6. 6.
# 7 7. 7.
# 8 8. 8.
# 9 9. 9.
# 10 10. 10.
# 11 1. 1.
# 12 2. 2.
# 13 3. 3.
想要输出
# # A tibble: 13 x 2
# x y id
# <dbl> <dbl> <chr>
# 1 1. 1. tab2
# 2 2. 2. tab2
# 3 3. 3. tab2
# 4 4. 4. tab2
# 5 5. 5. tab2
# 6 6. 6. tab2
# 7 7. 7. tab2
# 8 8. 8. tab2
# 9 9. 9. tab2
# 10 10. 10. tab2
# 11 1. 1. tab1
# 12 2. 2. tab1
# 13 3. 3. tab1
您可以禁用预缓存(无论如何您真的应该):
combined_table <- spark_read_json(
sc,
name = "combined_table",
path = sprintf("%s/*.json", data_dir),
memory=FALSE
)
combined_table %>% mutate(id = input_file_name())
# Source: lazy query [?? x 3]
# Database: spark_connection
x y id
<dbl> <dbl> <chr>
1 1 1 file:///tmp/RtmpnIAUek/tab2.json
2 2 2 file:///tmp/RtmpnIAUek/tab2.json
3 3 3 file:///tmp/RtmpnIAUek/tab2.json
4 4 4 file:///tmp/RtmpnIAUek/tab2.json
5 5 5 file:///tmp/RtmpnIAUek/tab2.json
6 6 6 file:///tmp/RtmpnIAUek/tab2.json
7 7 7 file:///tmp/RtmpnIAUek/tab2.json
8 8 8 file:///tmp/RtmpnIAUek/tab2.json
9 9 9 file:///tmp/RtmpnIAUek/tab2.json
10 10 10 file:///tmp/RtmpnIAUek/tab2.json
# ... with more rows
如有需要,可与Hive's parse_url
UDF结合使用:
combined_table %>% mutate(id = parse_url(input_file_name(), "FILE"))
# Source: lazy query [?? x 3]
# Database: spark_connection
x y id
<dbl> <dbl> <chr>
1 1 1 /tmp/RtmpnIAUek/tab2.json
2 2 2 /tmp/RtmpnIAUek/tab2.json
3 3 3 /tmp/RtmpnIAUek/tab2.json
4 4 4 /tmp/RtmpnIAUek/tab2.json
5 5 5 /tmp/RtmpnIAUek/tab2.json
6 6 6 /tmp/RtmpnIAUek/tab2.json
7 7 7 /tmp/RtmpnIAUek/tab2.json
8 8 8 /tmp/RtmpnIAUek/tab2.json
9 9 9 /tmp/RtmpnIAUek/tab2.json
10 10 10 /tmp/RtmpnIAUek/tab2.json
# ... with more rows
并且您可以使用其他字符串处理函数来提取单个信息位。