saveToCassandra 基于来自 RDD 的内容
saveToCassandra based on content from RDD
我在 scala 中使用 spark 构建一个通用应用程序来并行化 http 调用,我担心是否可以根据 RDD 的内容执行 saveToCassandra 操作,因为响应应该进入不同的 tables.
为了更加清晰,
val queries: List[Query] = List(Query("google", "fish"), Query("yahoo", "chicken"))
val inputRDD = sc.parallelize(queries)
其中
case class Query(dataSource: String, query: String)
然后每个查询都映射到要保存到 cassandra 中的元组列表,但根据查询中的数据源,google 的数据应该进入 cassandra table 以供 google 以及雅虎自己的 table。
TIA
我会简单地过滤并保存单个子集:
val keywords = Map("google" -> "googletab", "yahoo" -> "yahootab")
val keyspace: String = ???
val subsets = keywords.keys.map(k =>
(k -> inputRDD.filter{case Query(x, _) => x == k}))
subsets.foreach{ case (k, rdd) =>
rdd.saveToCassandra(keyspace, keywords(k), SomeColumns(???))
}
我在 scala 中使用 spark 构建一个通用应用程序来并行化 http 调用,我担心是否可以根据 RDD 的内容执行 saveToCassandra 操作,因为响应应该进入不同的 tables.
为了更加清晰,
val queries: List[Query] = List(Query("google", "fish"), Query("yahoo", "chicken"))
val inputRDD = sc.parallelize(queries)
其中
case class Query(dataSource: String, query: String)
然后每个查询都映射到要保存到 cassandra 中的元组列表,但根据查询中的数据源,google 的数据应该进入 cassandra table 以供 google 以及雅虎自己的 table。
TIA
我会简单地过滤并保存单个子集:
val keywords = Map("google" -> "googletab", "yahoo" -> "yahootab")
val keyspace: String = ???
val subsets = keywords.keys.map(k =>
(k -> inputRDD.filter{case Query(x, _) => x == k}))
subsets.foreach{ case (k, rdd) =>
rdd.saveToCassandra(keyspace, keywords(k), SomeColumns(???))
}