What is the best approach to join data in a Spark Streaming application?
Question: essentially, does it mean that instead of running a join against the C* table for each streaming record, the join is run once for each micro-batch of records in Spark Streaming?
We have almost finalized on spark-sql 2.4.x with the datastax-spark-cassandra-connector for Cassandra 3.x.
But there is a fundamental question about efficiency in the following scenario.
For the streaming data records (i.e. streamingDataSet), I need to look up existing records (i.e. cassandraDataset) from a Cassandra (C*) table,
i.e.
Dataset<Row> streamingDataSet = ...  // dataset read from Kafka
Dataset<Row> cassandraDataset = ...  // loaded from the C* table; these records were persisted earlier from the stream above
To look up the data, I need to join the above datasets,
i.e.
Dataset<Row> joinDataSet = streamingDataSet.join(cassandraDataset).where(// some logic)
The joinDataSet is then processed further to implement the business logic...
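For context, here is a minimal sketch of how the two datasets above might be created; the broker address, topic, keyspace, and table names are placeholders, not part of the original setup:

    // Hypothetical sketch: streaming source from Kafka
    Dataset<Row> streamingDataSet = spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")  // placeholder broker
        .option("subscribe", "events")                     // placeholder topic
        .load();

    // Hypothetical sketch: batch read from the C* table via the SCC Spark SQL source
    Dataset<Row> cassandraDataset = spark.read()
        .format("org.apache.spark.sql.cassandra")
        .option("keyspace", "test")   // placeholder keyspace
        .option("table", "jtest")     // placeholder table
        .load();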
In the above scenario, my understanding is that for each record received from the Kafka stream it would query the C* table, i.e. make a database call.
Wouldn't that take a huge amount of time and network bandwidth if the C* table contains billions of records? What approach/procedure should be followed to improve the lookup against the C* table?
What is the best solution in this scenario? I CANNOT load from the C* table once and look up against that, since data keeps being added to the C* table... i.e. new lookups might need newly persisted data.
What should be done in this situation? Any suggestions, please?
If you're using Apache Cassandra, you have only one way to join data in Cassandra efficiently: the RDD API's joinWithCassandraTable. The open source version of the Spark Cassandra Connector (SCC) supports only that, while the DSE version contains code that performs an efficient join against Cassandra for Spark SQL as well, the so-called DSE Direct Join. If you use a join in Spark SQL against a Cassandra table, Spark will need to read all the data from Cassandra and then perform the join, which is very slow.
I don't have an example of an OSS SCC join for Spark Structured Streaming, but I do have some examples of "normal" joins, such as this one:
// trdd is assumed to be the RDD of keys wrapped via CassandraJavaUtil.javaFunctions(...);
// only the C* rows matching the keys in trdd are read, not the whole table.
CassandraJavaPairRDD<Tuple1<Integer>, Tuple2<Integer, String>> joinedRDD =
    trdd.joinWithCassandraTable("test", "jtest",
        someColumns("id", "v"), someColumns("id"),
        mapRowToTuple(Integer.class, String.class), mapTupleToRow(Integer.class));
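For Structured Streaming, one way to adapt the example above is to run the per-key join once per micro-batch via foreachBatch (available from Spark 2.4). This is a hypothetical sketch, not from the answer; the keyspace, table, and column names follow the example above:

    import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;

    streamingDataSet.writeStream()
        .foreachBatch((Dataset<Row> batch, Long batchId) -> {
            // Extract the join keys from this micro-batch only ("id" column assumed)
            JavaRDD<Tuple1<Integer>> keys = batch.select("id")
                .javaRDD()
                .map(row -> new Tuple1<>(row.getInt(0)));

            // Fetch only the rows of test.jtest whose id matches a key in this batch,
            // instead of scanning the whole table
            CassandraJavaPairRDD<Tuple1<Integer>, Tuple2<Integer, String>> joined =
                javaFunctions(keys).joinWithCassandraTable("test", "jtest",
                    someColumns("id", "v"), someColumns("id"),
                    mapRowToTuple(Integer.class, String.class),
                    mapTupleToRow(Integer.class));

            // ... apply the business logic to `joined` and write the results out ...
        })
        .start();

Because the join runs inside foreachBatch, each new micro-batch queries Cassandra again, so newly persisted rows are visible to later lookups, which addresses the concern about not being able to load the table once up front.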