How to fix Error in readBin() using arrow package with SparkR?

I'm working with SparkR and want to speed up processing using the arrow package on Databricks. However, when I execute collect(df) after SparkR::dapply or gapply, I get the following error:
Error in readBin(con, raw(), as.integer(dataLen), endian = "big") : 
  invalid 'n' argument

I'm using the 5-million-row SalesRecords dataset, just as an example. The code is as follows:

library(SparkR)

SparkR::sparkR.session(sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))

library(arrow)

arrow::arrow_available()

dfSchema <- structType(structField("Region", "string")
                       ,structField("Country", "string")
                       ,structField("ItemType", "string")
                       ,structField("SalesChannel", "string")
                       ,structField("OrderPriority", "string")
                       ,structField("OrderDate", "string")
                       ,structField("OrderID", "int")
                       ,structField("ShipDate", "string")
                       ,structField("UnitsSold", "int")
                       ,structField("UnitPrice", "int")
                       ,structField("UnitCost", "int")
                       ,structField("TotalRevenue", "int")
                       ,structField("TotalCost", "int")
                       ,structField("TotalProfit", "int")
                      )
spark_df <- SparkR::read.df(path="/FileStore/tables/SalesRecords_5m.csv", source="csv", schema=dfSchema)

# Apply an R native function to each partition.
returnSchema <-  structType(structField("UnitsSold", "int"))

df <- SparkR::dapply(spark_df
                    , function(rdf) { data.frame(rdf$UnitsSold + 1) }
                    , returnSchema
                   )

collect(df)

When I turn arrow off by setting spark.sql.execution.arrow.sparkr.enabled to false, the whole code runs without any error. So the problem is tied to arrow; how can I fix this error?
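
For reference, this is the call I use to turn arrow off (a minimal sketch mirroring the session setup above; the session may need to be restarted for the setting to take effect):

# Disable arrow-based optimization for SparkR; with this the code runs fine
SparkR::sparkR.session(sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "false"))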

Note: I'm using the following versions: Spark 3.1.1, arrow 4.0.1, R version 4.0.4.

The output of sessionInfo() is:

R version 4.0.4 (2021-02-15)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 18.04.5 LTS

Matrix products: default
BLAS:   /usr/lib/x86_64-linux-gnu/blas/libblas.so.3.7.1
LAPACK: /usr/lib/x86_64-linux-gnu/lapack/liblapack.so.3.7.1

locale:
 [1] LC_CTYPE=C.UTF-8       LC_NUMERIC=C           LC_TIME=C.UTF-8       
 [4] LC_COLLATE=C.UTF-8     LC_MONETARY=C.UTF-8    LC_MESSAGES=C.UTF-8   
 [7] LC_PAPER=C.UTF-8       LC_NAME=C              LC_ADDRESS=C          
[10] LC_TELEPHONE=C         LC_MEASUREMENT=C.UTF-8 LC_IDENTIFICATION=C   

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] SparkR_3.1.1

loaded via a namespace (and not attached):
 [1] Rcpp_1.0.5         magrittr_2.0.1     tidyselect_1.1.0   bit_4.0.4         
 [5] xtable_1.8-4       R6_2.5.0           rlang_0.4.9        fastmap_1.0.1     
 [9] hwriter_1.3.2      tools_4.0.4        arrow_4.0.1        htmltools_0.5.0   
[13] bit64_4.0.5        digest_0.6.27      assertthat_0.2.1   Rserve_1.8-7      
[17] shiny_1.5.0        purrr_0.3.4        later_1.1.0.1      hwriterPlus_1.0-3 
[21] vctrs_0.3.5        promises_1.1.1     glue_1.4.2         mime_0.9          
[25] compiler_4.0.4     TeachingDemos_2.10 httpuv_1.5.4 

Although I'm not sure of the exact cause, I can offer a partial solution.

There is a type mismatch here: your return schema declares the field as "int", but the code actually produces a "double" column. In R the literal 1 is a double, so rdf$UnitsSold + 1 is promoted to double even though the column itself is integer.
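
You can see the promotion in plain R, independent of Spark:

typeof(1)        # "double": numeric literals are doubles by default
typeof(1L)       # "integer": the L suffix makes an integer literal
typeof(5L + 1)   # "double": integer + double is promoted to double
typeof(5L + 1L)  # "integer": integer + integer stays integer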

Does it help if you add an 'L' suffix to the value you are adding?

# 1L is an integer literal, so the result stays integer and matches the "int" schema
df <- SparkR::dapply(spark_df
                    , function(rdf) { data.frame(rdf$UnitsSold + 1L) }
                    , returnSchema
                   )
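
If you would rather keep the arithmetic in doubles, the reverse fix should also work: declare the returned field as "double" so the schema matches the data (a sketch under that assumption, not tested on your cluster):

# Hypothetical alternative: align the schema with the double result instead
returnSchema <- structType(structField("UnitsSold", "double"))

df <- SparkR::dapply(spark_df
                    , function(rdf) { data.frame(rdf$UnitsSold + 1) }
                    , returnSchema
                   )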