在 dplyr 和 sparklyr 中动态类型转换为数字

Dynamically type casting to numeric in dplyr and sparklyr

这个问题的要点是我有一些 R 代码在本地数据帧上运行良好,但在 Spark 数据帧上失败,即使这两个表在其他方面是相同的。

在 R 中,给定一个包含所有字符列的数据框,可以将所有列动态类型转换为数字,可以使用以下代码安全地转换为数字:

require(dplyr)
require(varhandle)
require(sparklyr)

checkNumeric <- function(column)
{
  column %>% as.data.frame %>% .[,1] %>% varhandle::check.numeric(.) %>% all
}

typeCast <- function(df)
{ 
  columns <- colnames(df)
  numericIdx <- df %>% mutate(across(columns, checkNumeric)) %>% .[1,]
  doThese <- columns[which(numericIdx==T)]
  
  df <- df %>% mutate_at(all_of(vars(doThese)), as.numeric)
  return(df)
}

举个简单的例子,可以 运行:

df <- iris
df$Sepal.Length <- as.character(df$Sepal.Length)
newDF <- df %>% typeCast
class(df$Sepal.Length)
class(newDF$Sepal.Length)

现在,此代码不适用于像 starwars 这样具有复合列的数据集。但是对于其他数据帧,我希望这段代码在 Spark 数据帧上工作得很好。它没有。即:

sc <- spark_connect('yarn', config=config) # define your Spark configuration somewhere, that's outside the scope of this question
df <- copy_to(sc, iris, "iris")
newDF <- df %>% typeCast

将失败并出现以下错误。

Error in .[1, ] : incorrect number of dimensions

调试时,如果我们尝试运行这段代码:

columns <- colnames(df)
df %>% mutate(across(columns, checkNumeric))

返回此错误: UseMethod("escape") 错误: 没有适用于 'escape' 的方法应用于 class“函数”

的对象

什么给了?为什么代码在本地数据框架上可以正常工作,但在 Spark 数据框架上却不行?

我本身没有找到确切的解决方案,但我确实找到了解决方法。

typeCheckPartition <- function(df)
{
  require(dplyr)
  require(varhandle)
  checkNumeric <- function(column)
  {
    column %>% as.data.frame %>% .[,1] %>% varhandle::check.numeric(.) %>% all
  }
  
  # this works on non-spark data frames
  columns <- colnames(df)
  numericIdx <- df %>% mutate(across(all_of(columns), checkNumeric)) %>% .[1,]
  
  return(numericIdx)
}

typeCastSpark <- function(df, max_partitions = 1000, undo_coalesce = T)
{
  # numericIdxDf will have these dimensions: num_partition rows x num_columns
  # so long as num_columns is not absurd, this coalesce should make collect a safe operation
  num_partitions <- sdf_num_partitions(df)
  if (num_partitions > max_partitions)
  {
    undo_coalesce <- T && undo_coalesce
    df <- df %>% sdf_coalesce(max_partitions)
  } else
  {
    undo_coalesce <- F
  }
  
  columns <- colnames(df)
  numericIdxDf <- df %>% spark_apply(typeCheckPartition, packages=T) %>% collect
  numericIdx <- numericIdxDf %>% as.data.frame %>% apply(2, all)
  
  doThese <- columns[which(numericIdx==T)]
  df <- df %>% mutate_at(all_of(vars(doThese)), as.numeric)
  
  if (undo_coalesce)
    df <- df %>% sdf_repartition(num_partitions)
  
  return(df)
}

只需 运行 针对您的数据框的 typeCastSpark 函数,它会将所有列类型转换为数字(可以是)。