Spark Streaming 到达数据框列并添加查找 Redis 的新列
Spark Streaming reach dataframe columns and add new column looking up to Redis
在我之前的问题中( ) , i succeeded to reach redis with mapparttions thanks to https://whosebug.com/users/689676/fe2s
我尝试使用 mappartitions 但我无法解决一个问题,我如何在迭代时到达下面代码部分中的每一行列。
因为我想根据保存在 Redis 中的查找字段来丰富我的每行。
我发现了类似的东西,但我如何才能到达数据框列并添加查找 Redis 的新列。
对于任何帮助,我非常感谢,谢谢。
import org.apache.spark.sql.types._
def transformRow(row: Row): Row = {
Row.fromSeq(row.toSeq ++ Array[Any]("val1", "val2"))
}
def transformRows(iter: Iterator[Row]): Iterator[Row] =
{
val redisConn =new RedisClient("xxx.xxx.xx.xxx",6379,1,Option("Secret123"))
println(redisConn.get("ModelValidityPeriodName").getOrElse(""))
//want to reach DataFrame column here
redisConn.close()
iter.map(transformRow)
}
val newSchema = StructType(raw_customer_df.schema.fields ++
Array(
StructField("ModelValidityPeriod", StringType, false),
StructField("ModelValidityPeriod2", StringType, false)
)
)
spark.sqlContext.createDataFrame(raw_customer_df.rdd.mapPartitions(transformRows), newSchema).show
迭代器iter
表示数据帧行上的迭代器。因此,如果我正确回答了您的问题,您可以通过迭代 iter
并调用
来访问列值
row.getAs[Column_Type](column_name)
像这样
def transformRows(iter: Iterator[Row]): Iterator[Row] = {
val redisConn = new RedisClient("xxx.xxx.xx.xxx",6379,1,Option("Secret123"))
println(redisConn.get("ModelValidityPeriodName").getOrElse(""))
//want to reach DataFrame column here
val res = iter.map { row =>
val columnValue = row.getAs[String]("column_name")
// lookup in redis
val valueFromRedis = redisConn.get(...)
Row.fromSeq(row.toSeq ++ Array[Any](valueFromRedis))
}.toList
redisConn.close()
res.iterator
}
在我之前的问题中(
我尝试使用 mappartitions 但我无法解决一个问题,我如何在迭代时到达下面代码部分中的每一行列。 因为我想根据保存在 Redis 中的查找字段来丰富我的每行。 我发现了类似的东西,但我如何才能到达数据框列并添加查找 Redis 的新列。 对于任何帮助,我非常感谢,谢谢。
import org.apache.spark.sql.types._
def transformRow(row: Row): Row = {
Row.fromSeq(row.toSeq ++ Array[Any]("val1", "val2"))
}
def transformRows(iter: Iterator[Row]): Iterator[Row] =
{
val redisConn =new RedisClient("xxx.xxx.xx.xxx",6379,1,Option("Secret123"))
println(redisConn.get("ModelValidityPeriodName").getOrElse(""))
//want to reach DataFrame column here
redisConn.close()
iter.map(transformRow)
}
val newSchema = StructType(raw_customer_df.schema.fields ++
Array(
StructField("ModelValidityPeriod", StringType, false),
StructField("ModelValidityPeriod2", StringType, false)
)
)
spark.sqlContext.createDataFrame(raw_customer_df.rdd.mapPartitions(transformRows), newSchema).show
迭代器iter
表示数据帧行上的迭代器。因此,如果我正确回答了您的问题,您可以通过迭代 iter
并调用
row.getAs[Column_Type](column_name)
像这样
def transformRows(iter: Iterator[Row]): Iterator[Row] = {
val redisConn = new RedisClient("xxx.xxx.xx.xxx",6379,1,Option("Secret123"))
println(redisConn.get("ModelValidityPeriodName").getOrElse(""))
//want to reach DataFrame column here
val res = iter.map { row =>
val columnValue = row.getAs[String]("column_name")
// lookup in redis
val valueFromRedis = redisConn.get(...)
Row.fromSeq(row.toSeq ++ Array[Any](valueFromRedis))
}.toList
redisConn.close()
res.iterator
}