将 spark dStream 与变量合并到 saveToCassandra()
merge spark dStream with variable to saveToCassandra()
我有一个 DStream[String, Int
] 有成对的字数统计,例如("hello" -> 10)
。我想用步骤索引将这些计数写入 cassandra。索引初始化为 var step = 1
并随着每个微批次的处理而递增。
cassandra table 创建为:
CREATE TABLE wordcounts (
step int,
word text,
count int,
primary key (step, word)
);
尝试将流写入 table...
stream.saveToCassandra("keyspace", "wordcounts", SomeColumns("word", "count"))
...我得到 java.lang.IllegalArgumentException: Some primary key columns are missing in RDD or have not been selected: step
.
如何将 step
索引添加到流中以便将三列写在一起?
我正在使用 spark 2.0.0、scala 2.11.8、cassandra 3.4.0 和 spark-cassandra-connector 2.0.0-M3。
由于您试图将 RDD 保存到现有的 Cassandra table,因此您需要在 RDD 中包含所有主键列值。
你可以做的是,你可以使用下面的方法将RDD保存到新的table。
saveAsCassandraTable or saveAsCassandraTableEx
有关详细信息,请查看 this。
如前所述,虽然 Cassandra table 需要 (Int, String, Int)
形式的内容,但 wordCount DStream 的类型为 DStream[(String, Int)]
,因此对于 saveToCassandra(...)
的调用工作,我们需要 DStream[(Int, String, Int)]
.
类型的 DStream
这个问题中棘手的部分是如何将本地计数器(根据定义仅在驱动程序中已知)提升到 DStream 级别。
为此,我们需要做两件事:"lift" 分布式级别的计数器(在 Spark 中,我们指的是 "RDD" 或 "DataFrame")并将该值与现有 DStream
数据。
与经典的 Streaming 字数统计示例不同:
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
我们添加一个本地变量来保存微批次的计数:
@transient var batchCount = 0
它被声明为瞬态的,因此当我们声明使用它的转换时,Spark 不会尝试关闭它的值。
现在是棘手的一点:在 DStream transform
ation 的上下文中,我们从单个 var
iable 中创建一个 RDD,并使用笛卡尔积将其与 DStream 的底层 RDD 连接起来:
val batchWordCounts = wordCounts.transform{ rdd =>
batchCount = batchCount + 1
val localCount = sparkContext.parallelize(Seq(batchCount))
rdd.cartesian(localCount).map{case ((word, count), batch) => (batch, word, count)}
}
(请注意,一个简单的 map
函数将不起作用,因为只有 var
iable 的初始值会被捕获和序列化。因此,看起来计数器从未增加查看 DStream 数据。
最后,既然数据的形状正确,将其保存到 Cassandra:
batchWordCounts.saveToCassandra("keyspace", "wordcounts")
updateStateByKey
函数由 spark 提供,用于全局状态处理。
对于这种情况,它可能类似于以下内容
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount: Int = runningCount.getOrElse(0) + 1
Some(newCount)
}
val step = stream.updateStateByKey(updateFunction _)
stream.join(step).map{case (key,(count, step)) => (step,key,count)})
.saveToCassandra("keyspace", "wordcounts")
我有一个 DStream[String, Int
] 有成对的字数统计,例如("hello" -> 10)
。我想用步骤索引将这些计数写入 cassandra。索引初始化为 var step = 1
并随着每个微批次的处理而递增。
cassandra table 创建为:
CREATE TABLE wordcounts (
step int,
word text,
count int,
primary key (step, word)
);
尝试将流写入 table...
stream.saveToCassandra("keyspace", "wordcounts", SomeColumns("word", "count"))
...我得到 java.lang.IllegalArgumentException: Some primary key columns are missing in RDD or have not been selected: step
.
如何将 step
索引添加到流中以便将三列写在一起?
我正在使用 spark 2.0.0、scala 2.11.8、cassandra 3.4.0 和 spark-cassandra-connector 2.0.0-M3。
由于您试图将 RDD 保存到现有的 Cassandra table,因此您需要在 RDD 中包含所有主键列值。
你可以做的是,你可以使用下面的方法将RDD保存到新的table。
saveAsCassandraTable or saveAsCassandraTableEx
有关详细信息,请查看 this。
如前所述,虽然 Cassandra table 需要 (Int, String, Int)
形式的内容,但 wordCount DStream 的类型为 DStream[(String, Int)]
,因此对于 saveToCassandra(...)
的调用工作,我们需要 DStream[(Int, String, Int)]
.
DStream
这个问题中棘手的部分是如何将本地计数器(根据定义仅在驱动程序中已知)提升到 DStream 级别。
为此,我们需要做两件事:"lift" 分布式级别的计数器(在 Spark 中,我们指的是 "RDD" 或 "DataFrame")并将该值与现有 DStream
数据。
与经典的 Streaming 字数统计示例不同:
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
我们添加一个本地变量来保存微批次的计数:
@transient var batchCount = 0
它被声明为瞬态的,因此当我们声明使用它的转换时,Spark 不会尝试关闭它的值。
现在是棘手的一点:在 DStream transform
ation 的上下文中,我们从单个 var
iable 中创建一个 RDD,并使用笛卡尔积将其与 DStream 的底层 RDD 连接起来:
val batchWordCounts = wordCounts.transform{ rdd =>
batchCount = batchCount + 1
val localCount = sparkContext.parallelize(Seq(batchCount))
rdd.cartesian(localCount).map{case ((word, count), batch) => (batch, word, count)}
}
(请注意,一个简单的 map
函数将不起作用,因为只有 var
iable 的初始值会被捕获和序列化。因此,看起来计数器从未增加查看 DStream 数据。
最后,既然数据的形状正确,将其保存到 Cassandra:
batchWordCounts.saveToCassandra("keyspace", "wordcounts")
updateStateByKey
函数由 spark 提供,用于全局状态处理。
对于这种情况,它可能类似于以下内容
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount: Int = runningCount.getOrElse(0) + 1
Some(newCount)
}
val step = stream.updateStateByKey(updateFunction _)
stream.join(step).map{case (key,(count, step)) => (step,key,count)})
.saveToCassandra("keyspace", "wordcounts")