值拆分不是 (String, String) 的成员
Value split is not a member of (String, String)
我正在尝试从 Kafka 读取数据并通过 Spark RDD 存储到 Cassandra 表中。
编译代码时出现错误:
/root/cassandra-count/src/main/scala/KafkaSparkCassandra.scala:69: value split is not a member of (String, String)
[error] val lines = messages.flatMap(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
下面的代码:当我 运行 通过交互式 spark-shell
手动编写代码时,它工作正常,但是在为 spark-submit
编译代码时出现错误。
// Create direct kafka stream with brokers and topics
val topicsSet = Set[String] (kafka_topic)
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafka_broker)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicsSet)
// Create the processing logic
// Get the lines, split
val lines = messages.map(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
lines.saveToCassandra("stream_poc", "US_city", SomeColumns("city_name", "jan_temp", "lat", "long"))
KafkaUtils.createDirectStream
returns 键和值的 元组 (因为 Kafka 中的消息是可选键控的)。在您的情况下,它的类型为 (String, String)
。如果要拆分值,首先要取出来:
val lines =
messages
.map(line => line._2.split(','))
.map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
或使用部分函数语法:
val lines =
messages
.map { case (_, value) => value.split(',') }
.map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
kafka 中的所有消息都是有键的。原始的 Kafka 流,在本例中 messages
,是一个元组流 (key,value)
。
正如编译错误指出的那样,元组上没有 split
方法。
我们这里要做的是:
messages.map{ case (key, value) => value.split(','))} ...
我正在尝试从 Kafka 读取数据并通过 Spark RDD 存储到 Cassandra 表中。
编译代码时出现错误:
/root/cassandra-count/src/main/scala/KafkaSparkCassandra.scala:69: value split is not a member of (String, String)
[error] val lines = messages.flatMap(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
下面的代码:当我 运行 通过交互式 spark-shell
手动编写代码时,它工作正常,但是在为 spark-submit
编译代码时出现错误。
// Create direct kafka stream with brokers and topics
val topicsSet = Set[String] (kafka_topic)
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafka_broker)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicsSet)
// Create the processing logic
// Get the lines, split
val lines = messages.map(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
lines.saveToCassandra("stream_poc", "US_city", SomeColumns("city_name", "jan_temp", "lat", "long"))
KafkaUtils.createDirectStream
returns 键和值的 元组 (因为 Kafka 中的消息是可选键控的)。在您的情况下,它的类型为 (String, String)
。如果要拆分值,首先要取出来:
val lines =
messages
.map(line => line._2.split(','))
.map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
或使用部分函数语法:
val lines =
messages
.map { case (_, value) => value.split(',') }
.map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
kafka 中的所有消息都是有键的。原始的 Kafka 流,在本例中 messages
,是一个元组流 (key,value)
。
正如编译错误指出的那样,元组上没有 split
方法。
我们这里要做的是:
messages.map{ case (key, value) => value.split(','))} ...