使用 sparklyr::spark_read_json 时添加文件名

Adding name of file when using sparklyr::spark_read_json

我有数百万个 json 文件,其中每个文件包含相同数量的列,比方说 xy。请注意,xy 的长度对于单个文件是相等的,但在比较两个不同的文件时可能不同。

问题是唯一分隔数据的是文件名。因此,在合并文件时,我希望将文件名作为第三列包含在内。这可能使用 sparklyr::spark_read_json,即使用通配符时吗?

MWE:

library(sparklyr)

## Spark connection
sc <- spark_connect(master = "local", version = "2.1.0")

## Create data
data_dir <- tempdir()
tbl_json1 <- data.frame(x = 1:3, y = 1:3)
tbl_json2 <- data.frame(x = 1:10, y = 1:10)

## Write data to disk
write(jsonlite::toJSON(tbl_json1), sprintf("%s/tab1.json", data_dir))
write(jsonlite::toJSON(tbl_json2), sprintf("%s/tab2.json", data_dir))

## Read both files using wildcard 
combined_table <- spark_read_json(
    sc, 
    name = "combined_table", 
    path = sprintf("%s/*.json", data_dir)
)

## Tranfer results to R
library(dplyr)
dt <- combined_table %>% collect

# # A tibble: 13 x 2
#       x     y
#     <dbl> <dbl>
#  1    1.    1.
#  2    2.    2.
#  3    3.    3.
#  4    4.    4.
#  5    5.    5.
#  6    6.    6.
#  7    7.    7.
#  8    8.    8.
#  9    9.    9.
# 10   10.   10.
# 11    1.    1.
# 12    2.    2.
# 13    3.    3.

想要输出

# # A tibble: 13 x 2
#       x     y     id
#     <dbl> <dbl> <chr>
#  1    1.    1.    tab2
#  2    2.    2.    tab2
#  3    3.    3.    tab2
#  4    4.    4.    tab2
#  5    5.    5.    tab2
#  6    6.    6.    tab2
#  7    7.    7.    tab2
#  8    8.    8.    tab2
#  9    9.    9.    tab2
# 10   10.   10.    tab2
# 11    1.    1.    tab1
# 12    2.    2.    tab1
# 13    3.    3.    tab1

您可以禁用预缓存(无论如何您真的应该):

combined_table <- spark_read_json(
  sc, 
  name = "combined_table", 
  path = sprintf("%s/*.json", data_dir),
  memory=FALSE
)

并使用 input_file_name function:

combined_table %>% mutate(id = input_file_name())
# Source:   lazy query [?? x 3]
# Database: spark_connection
       x     y id                              
   <dbl> <dbl> <chr>                           
 1     1     1 file:///tmp/RtmpnIAUek/tab2.json
 2     2     2 file:///tmp/RtmpnIAUek/tab2.json
 3     3     3 file:///tmp/RtmpnIAUek/tab2.json
 4     4     4 file:///tmp/RtmpnIAUek/tab2.json
 5     5     5 file:///tmp/RtmpnIAUek/tab2.json
 6     6     6 file:///tmp/RtmpnIAUek/tab2.json
 7     7     7 file:///tmp/RtmpnIAUek/tab2.json
 8     8     8 file:///tmp/RtmpnIAUek/tab2.json
 9     9     9 file:///tmp/RtmpnIAUek/tab2.json
10    10    10 file:///tmp/RtmpnIAUek/tab2.json
# ... with more rows

如有需要,可与Hive's parse_url UDF结合使用:

combined_table %>% mutate(id = parse_url(input_file_name(), "FILE"))
# Source:   lazy query [?? x 3]
# Database: spark_connection
       x     y id                       
   <dbl> <dbl> <chr>                    
 1     1     1 /tmp/RtmpnIAUek/tab2.json
 2     2     2 /tmp/RtmpnIAUek/tab2.json
 3     3     3 /tmp/RtmpnIAUek/tab2.json
 4     4     4 /tmp/RtmpnIAUek/tab2.json
 5     5     5 /tmp/RtmpnIAUek/tab2.json
 6     6     6 /tmp/RtmpnIAUek/tab2.json
 7     7     7 /tmp/RtmpnIAUek/tab2.json
 8     8     8 /tmp/RtmpnIAUek/tab2.json
 9     9     9 /tmp/RtmpnIAUek/tab2.json
10    10    10 /tmp/RtmpnIAUek/tab2.json
# ... with more rows

并且您可以使用其他字符串处理函数来提取单个信息位。