Real-time analytics using Apache Spark
I am using Apache Spark to analyze data from Cassandra, and I will insert the results back into Cassandra in new tables designed around our queries. I want to know whether Spark can perform real-time analysis, and if so, how. I have read many tutorials on this topic but have found nothing.
I want to run the analysis and insert into Cassandra the moment data arrives in my table.
This can be achieved with Spark Streaming; you should look at the demos and documentation that ship with the Spark Cassandra Connector.
https://github.com/datastax/spark-cassandra-connector
This includes support for streaming, as well as support for creating new tables on the fly.
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md
Spark Streaming extends the core API to allow high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources such as Akka, Kafka, Flume, Twitter, ZeroMQ, TCP sockets, etc. Results can be stored in Cassandra.
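To make that concrete, here is a minimal sketch of the streaming path described above: words arrive on a TCP socket, each micro-batch is counted, and the result is written into an existing Cassandra table. The host address, port, keyspace, and table name are assumptions for illustration; the `saveToCassandra` call on a DStream comes from the connector's streaming package.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.datastax.spark.connector._           // SomeColumns
import com.datastax.spark.connector.streaming._ // saveToCassandra on DStreams

object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("streaming-wordcount")
      .set("spark.cassandra.connection.host", "127.0.0.1") // assumed Cassandra host

    // Process incoming data in 5-second micro-batches
    val ssc = new StreamingContext(conf, Seconds(5))

    // Assumed source: lines of text arriving on a local TCP socket
    val lines = ssc.socketTextStream("localhost", 9999)

    // Count words within each micro-batch
    val counts = lines
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)

    // Write each batch into an existing table test.words (word text, count bigint)
    counts.saveToCassandra("test", "words", SomeColumns("word", "count"))

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Note that `saveToCassandra` requires the target table to already exist; table creation on the fly is handled by `saveAsCassandraTable`, shown below.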
Use the saveAsCassandraTable method to automatically create a new table with the given name and save the RDD into it. The keyspace you're saving to must exist. The following code will create a new table words_new in keyspace test with columns word and count, where word becomes a primary key:
case class WordCount(word: String, count: Long)

val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
collection.saveAsCassandraTable("test", "words_new", SomeColumns("word", "count"))
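For completeness, once the table has been created and populated, the same connector can read it back into Spark as an RDD for further analysis. This is a sketch assuming the same SparkContext `sc` and the table created above:

```scala
import com.datastax.spark.connector._

// Read test.words_new back as an RDD of CassandraRow
val rows = sc.cassandraTable("test", "words_new")

// Access columns by name; getLong/getString are the connector's typed getters
rows.collect().foreach { row =>
  println(s"${row.getString("word")} -> ${row.getLong("count")}")
}
```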