从 DataFrame 创建嵌套案例 class 实例

Create nested case class instance from a DataFrame

我有这两种情况类:

case class Inline_response_200(
  nodeid: Option[String],
  data: Option[List[ReadingsByEpoch_data]]
)

case class ReadingsByEpoch_data(
  timestamp: Option[Int],
  value: Option[String]
)

我有一个 Cassandra table,它有像 nodeid|timestamp|value 这样的数据。基本上,每个 nodeid 有多个 timestamp-value 对。

我想做的就是用 ReadingsByEpoch_data 的正确列表创建 Inline_response_200 的实例,这样 Jackson 就可以将它们正确地序列化为 Json。

我试过了

val res = sc.cassandraTable[Inline_response_200]("test", "taghistory").limit(100).collect()

但是我得到这个错误

java.lang.IllegalArgumentException: Failed to map constructor parameter data in com.wordnik.client.model.Inline_response_200 to a column of test.taghistory

完全合理,因为在我的 Cassandra table 中没有列 data。但是我怎样才能正确地创建实例呢?

Cassandra table 看起来像这样:

CREATE TABLE test.taghistory (
nodeid text,
timestamp text,
value text,
PRIMARY KEY (nodeid, timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC)

编辑
根据 Alex Ott 的建议:

val grouped = data.groupByKey.map {
  case (k, v) =>
    Inline_response_200(k.getString(0), v.map(x => ReadingsByEpoch_data(x.getInt(1), x.getString(2))).toList)
}
grouped.collect().toList

我快到了,但还没到。这给了我期望的格式,但是它为每条记录创建了一个 Inline_response_200 实例:

[{"nodeid":"Tag3","data":[{"timestamp":1519411780,"value":"80.0"}]},{"nodeid":"Tag3","data":[{"timestamp":1519411776,"value":"76.0"}]}]  

在这个例子中,我需要有一个 nodeid 键和一个包含两个时间戳值对的数组,如下所示:

[{"nodeid":"Tag3","data":[{"timestamp":1519411780,"value":"80.0"},{"timestamp":1519411776,"value":"76.0"}]}]`  

也许我分组的方式有误?

如果您的数据库中有 nodeid|timestamp|value 之类的数据(是的,根据架构),您无法将其直接映射到您创建的结构中。从 table 中读取数据作为对 RDD:

val data = sc.cassandraTable[(String,String,Option[String])]("test", "taghistory")
     .select("nodeid","timestamp","value").keyBy[String]("nodeid")

然后通过在该对 RDD 上使用 groupByKey 将其转换为您需要的结构并转换为您需要的 Inline_response_200 class,如下所示:

val grouped = data.groupByKey.map{case (k,v) => Inline_response_200(k,
       v.map(x => ReadingsByEpoch_data(x._2, x._3)).toList)}
grouped.collect