将值从 spark 保存到 Cassandra

Saving values from spark to Cassandra

我需要存储来自 kafka->spark streaming->cassandra 的值。

现在,我正在从 kafka->spark 接收值,并且我有一个 spark 作业将值保存到 cassandra 数据库中。但是,我遇到了数据类型 dstream 的问题。

在下面的代码片段中,您可以看到我如何尝试将 DStream 转换为 python 友好列表对象,以便我可以使用它,但它给出了一个错误。

kafka 生产者的输入:

Byrne 24 San Diego robbyrne@email.com Rob

火花作业:

map1={'spark-kafka':1}
kafkaStream = KafkaUtils.createStream(stream, 'localhost:2181', "name", map1)
lines = kafkaStream.map(lambda x: x[1])
words = lines.flatMap(lambda line: line.split(" "))

words.pprint() # outputs-> Byrne 24 SanDiego robbyrne@email.com Rob

list=[lambda word for word in words]
#gives an error -> TypeError: 'TransformedDStream' object is not iterable

这就是我从 spark->cassandra

中保存值的方式
rdd2=sc.parallelize([{
... "lastname":'Byrne',
... "age":24,
... "city":"SanDiego",
... "email":"robbyrne@email.com",
... "firstname":"Rob"}])
rdd2.saveToCassandra("keyspace2","users")

将 DStream 对象转换为字典的最佳方式是什么,或者执行我在这里尝试执行的操作的最佳方式是什么?

我只需要将从 kafka 接收到的值(以 DStream 的形式)保存在 Cassandra 中。

谢谢,如有帮助将不胜感激!

版本:

Cassandra v2.1.12
Spark v1.4.1
Scala 2.10

和所有东西一样 'sparky',我认为应该做一个简短的解释,因为即使您熟悉 RDD,DStream 也是一个更高的概念:
Discretized Stream(DStream),是同类型RDD的连续序列,代表一个连续的数据流。在您的情况下,DStream 是根据实时 Kafka 数据创建的。
虽然 Spark Streaming 程序是 运行,但每个 DStream 会定期从实时 Kafka 数据生成一个 RDD

现在,要遍历接收到的 RDD,您需要使用 DStream#foreachRDD(顾名思义,它的用途与 foreach 相似,但是这时间,遍历 RDDs).
一旦你有了一个 RDD,你可以为 RDD 调用 rdd.collect()rdd.take() 或任何其他标准 API。

现在,作为结束语,为了让事情变得更加有趣,Spark 引入了一种新的 receiver-less“直接”方法来确保更强的 end-to-end 保证。
KafkaUtils.createDirectStream 需要 Spark 1.3+)
这种方法不是使用接收器接收数据,而是定期向 Kafka 查询每个主题+分区中的最新偏移量,并相应地定义要在每个批次中处理的偏移量范围。当启动处理数据的作业时,Kafka 的简单消费者 API 用于从 Kafka 读取定义的偏移量范围。
(这是一个很好的说法,你必须自己 "mess" 使用偏移量)

有关详细信息,请参阅 Direct Streams Approach
有关 Scala 代码示例,请参阅 here

根据 spark-cassandra 连接器的官方文档:https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md

import com.datastax.spark.connector.streaming._

val ssc = new StreamingContext(conf, Seconds(n))

val stream = ...

val wc = stream
        .map(...)
        .filter(...)   
        .saveToCassandra("streaming_test", "words", SomeColumns("word", "count")) 

ssc.start()

其实我在这篇教程中找到了答案http://katychuang.me/blog/2015-09-30-kafka_spark.html