运行 多个 cassandra 查询并在 Spark 2.3 中的一个 RDD 中获取结果
Running multiple cassandra queries and getting result in one RDD in Spark 2.3
我有一个字符串序列,我想在我的 Cassandra 查询的 where 子句中使用它。所以对于序列中的每个字符串都会有一个查询。
idSeq.foreach(id => {
val rdd1 = sc.cassandraTable("keyspace", "columnfamily").
where("id = ?", id).
limit(100)
})
所以我在我的序列上放置了一个循环,我是 运行 序列中每个 id 的查询。我想将所有结果合并到一个 RDD 中,并对合并后的 RDD 执行映射和保存操作。我已经尝试创建一个空的 RDD 并执行联合,但即使在循环之后 RDD 仍然是空的并且没有任何内容被保存。正确的做法是什么?
sc.union(idSeq.map(id => {
sc.cassandraTable("keyspace", "columnfamily").where("id = ?", id).limit(100)
}))
更快、更有效的解决方案是创建一个包含您要获取的 ID 的 RDD,然后使用 joinWithCassandraTable
从 Cassandra 查询数据。例如(来自文档):
val joinWithRDD = sc.parallelize(0 to 5)
.map(CustomerID(_))
.joinWithCassandraTable("test","customer_info")
有关详细信息,请参阅 documentation,包括关于输入数据重新分配以更有效地获取数据的说明。
我有一个字符串序列,我想在我的 Cassandra 查询的 where 子句中使用它。所以对于序列中的每个字符串都会有一个查询。
idSeq.foreach(id => {
val rdd1 = sc.cassandraTable("keyspace", "columnfamily").
where("id = ?", id).
limit(100)
})
所以我在我的序列上放置了一个循环,我是 运行 序列中每个 id 的查询。我想将所有结果合并到一个 RDD 中,并对合并后的 RDD 执行映射和保存操作。我已经尝试创建一个空的 RDD 并执行联合,但即使在循环之后 RDD 仍然是空的并且没有任何内容被保存。正确的做法是什么?
sc.union(idSeq.map(id => {
sc.cassandraTable("keyspace", "columnfamily").where("id = ?", id).limit(100)
}))
更快、更有效的解决方案是创建一个包含您要获取的 ID 的 RDD,然后使用 joinWithCassandraTable
从 Cassandra 查询数据。例如(来自文档):
val joinWithRDD = sc.parallelize(0 to 5)
.map(CustomerID(_))
.joinWithCassandraTable("test","customer_info")
有关详细信息,请参阅 documentation,包括关于输入数据重新分配以更有效地获取数据的说明。