SparkR - 为 R 函数提取数据帧的数组 <int>
SparkR - extracting dataframe's array<int> for an R function
我有 1000 个传感器,我需要对数据进行分区(即每天每个传感器),然后将每个数据点列表提交给 R 算法)。使用 Spark,简化示例如下所示:
//Spark
val rddData = List(
("1:3", List(1,1,456,1,1,2,480,0,1,3,425,0)),
("1:4", List(1,4,437,1,1,5,490,0)),
("1:6", List(1,6,500,0,1,7,515,1,1,8,517,0,1,9,522,0,1,10,525,0)),
("1:11", List(1,11,610,1))
)
case class DataPoint(
key: String,
value: List[Int]) // 4 value pattern, sensorID:seq#, seq#, value, state
我转换成parquet文件,保存。
在 SparkR 中加载镶木地板,没问题,架构说:
#SparkR
df <- read.df(sqlContext, filespec, "parquet")
schema(df)
StructType
|-name = "key", type = "StringType", nullable = TRUE
|-name = "value", type = "ArrayType(IntegerType,true)", nullable = TRUE
所以在 SparkR 中,我有一个数据框,其中每条记录都包含我想要的所有数据 (df$value)。我想将该数组提取到 R 可以使用的内容中,然后用一个包含结果数组的新列来改变我的原始数据帧(df)。逻辑上类似于 results = function(df$value)。然后我需要将结果(所有行)返回到 SparkR 数据帧中以供输出。
如何从 SparkR 数据帧中提取数组然后用结果进行变异?
设spark数据帧为df
,R数据帧为df_r
要将 sparkR df 转换为 R df,请使用代码
df_r <- collect(df)
使用 R 数据框 df_r
,您可以在 R 中完成所有您想做的计算。
假设您在 df_r$result
列中有结果
Then for converting back to SparkR data frame use code,
#this is a new SparkR data frame, df_1
df_1 <- createDataFrame(sqlContext, df_r)
For adding the result back to SparkR data frame `df` use code
#this adds the df_1$result to a new column df$result
#note that number of rows should be same in df and `df_1`, if not use `join` operation
df$result <- df_1$result
希望这能解决您的问题
我也有这个问题。我解决这个问题的方法是在 spark DataFrame 中添加行索引,然后在 select 语句中使用 explode
。确保 select 索引,然后是 select 语句中所需的行。这将为您提供 "long" 数据框。如果 DataFrame 列中的每个嵌套列表都包含相同数量的信息(例如,如果您正在分解 x,y 坐标的列表列),您会期望长 DataFrame 中的每个行索引出现两次。
完成上述操作后,我通常在展开的 DataFrame 上执行 groupBy(index)
,过滤每个索引的 n()
而不是 等于列表中预期的项目数,并继续对 Spark DataFrame 进行额外的 groupBy、合并、连接、过滤等操作。
Urban Institute 的 GitHub 页面上有一些优秀的指南。祝你好运。 -nate
我有 1000 个传感器,我需要对数据进行分区(即每天每个传感器),然后将每个数据点列表提交给 R 算法)。使用 Spark,简化示例如下所示:
//Spark
val rddData = List(
("1:3", List(1,1,456,1,1,2,480,0,1,3,425,0)),
("1:4", List(1,4,437,1,1,5,490,0)),
("1:6", List(1,6,500,0,1,7,515,1,1,8,517,0,1,9,522,0,1,10,525,0)),
("1:11", List(1,11,610,1))
)
case class DataPoint(
key: String,
value: List[Int]) // 4 value pattern, sensorID:seq#, seq#, value, state
我转换成parquet文件,保存。 在 SparkR 中加载镶木地板,没问题,架构说:
#SparkR
df <- read.df(sqlContext, filespec, "parquet")
schema(df)
StructType
|-name = "key", type = "StringType", nullable = TRUE
|-name = "value", type = "ArrayType(IntegerType,true)", nullable = TRUE
所以在 SparkR 中,我有一个数据框,其中每条记录都包含我想要的所有数据 (df$value)。我想将该数组提取到 R 可以使用的内容中,然后用一个包含结果数组的新列来改变我的原始数据帧(df)。逻辑上类似于 results = function(df$value)。然后我需要将结果(所有行)返回到 SparkR 数据帧中以供输出。
如何从 SparkR 数据帧中提取数组然后用结果进行变异?
设spark数据帧为df
,R数据帧为df_r
要将 sparkR df 转换为 R df,请使用代码
df_r <- collect(df)
使用 R 数据框 df_r
,您可以在 R 中完成所有您想做的计算。
假设您在 df_r$result
Then for converting back to SparkR data frame use code,
#this is a new SparkR data frame, df_1
df_1 <- createDataFrame(sqlContext, df_r)
For adding the result back to SparkR data frame `df` use code
#this adds the df_1$result to a new column df$result
#note that number of rows should be same in df and `df_1`, if not use `join` operation
df$result <- df_1$result
希望这能解决您的问题
我也有这个问题。我解决这个问题的方法是在 spark DataFrame 中添加行索引,然后在 select 语句中使用 explode
。确保 select 索引,然后是 select 语句中所需的行。这将为您提供 "long" 数据框。如果 DataFrame 列中的每个嵌套列表都包含相同数量的信息(例如,如果您正在分解 x,y 坐标的列表列),您会期望长 DataFrame 中的每个行索引出现两次。
完成上述操作后,我通常在展开的 DataFrame 上执行 groupBy(index)
,过滤每个索引的 n()
而不是 等于列表中预期的项目数,并继续对 Spark DataFrame 进行额外的 groupBy、合并、连接、过滤等操作。
Urban Institute 的 GitHub 页面上有一些优秀的指南。祝你好运。 -nate