Value split is not a member of (String, String)

I am trying to read data from Kafka and store it into a Cassandra table via a Spark RDD.

When I compile the code, I get this error:

/root/cassandra-count/src/main/scala/KafkaSparkCassandra.scala:69: value split is not a member of (String, String)

[error]     val lines = messages.flatMap(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
[error]                                               ^
[error] one error found

[error] (compile:compileIncremental) Compilation failed

The code is below. It works fine when I type it in manually through the interactive spark-shell, but the error appears when I compile the code for spark-submit.

// Create direct kafka stream with brokers and topics
val topicsSet = Set[String] (kafka_topic)
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafka_broker)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicsSet)

// Create the processing logic
// Get the lines, split
val lines = messages.map(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
lines.saveToCassandra("stream_poc", "US_city", SomeColumns("city_name", "jan_temp", "lat", "long")) 

KafkaUtils.createDirectStream returns tuples of key and value (since messages in Kafka are optionally keyed). In your case it is of type (String, String). If you want to split the value, you have to pull it out of the tuple first:

val lines =
  messages
    .map(line => line._2.split(','))
    .map(s => (s(0), s(1).toDouble, s(2).toDouble, s(3).toDouble))
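Since map behaves the same way on an ordinary Scala collection of (key, value) tuples as on the DStream, the fix can be checked locally without a Kafka cluster. A minimal sketch (the sample key and CSV row below are made up):

```scala
object LocalSplitCheck {
  def main(args: Array[String]): Unit = {
    // Simulated Kafka batch: (key, value) pairs, shaped like what
    // createDirectStream delivers. The row itself is a made-up example.
    val messages = List(("some-key", "Boston,36.5,42.36,-71.06"))

    // Same transformation as above: extract the value, split, convert fields.
    val lines = messages
      .map(line => line._2.split(','))
      .map(s => (s(0), s(1).toDouble, s(2).toDouble, s(3).toDouble))

    assert(lines.head == ("Boston", 36.5, 42.36, -71.06))
    println(lines.head)
  }
}
```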

Or, using partial function syntax:

val lines =
  messages
    .map { case (_, value) => value.split(',') }
    .map(s => (s(0), s(1).toDouble, s(2).toDouble, s(3).toDouble))

All messages in Kafka are keyed. The raw Kafka stream, messages in this case, is a stream of tuples (key, value).

As the compile error points out, there is no split method on a tuple.
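The type error can be illustrated on a single tuple, independent of Spark (the sample values below are made up):

```scala
object TupleSplitDemo {
  def main(args: Array[String]): Unit = {
    // A (key, value) pair, as each Kafka record appears in the stream.
    val pair: (String, String) = ("some-key", "Boston,36.5,42.36,-71.06")

    // pair.split(',')   // does not compile: split is not a member of (String, String)

    // split is defined on String, so extract the value first:
    val parts = pair._2.split(',')
    println(parts.mkString(" | "))
  }
}
```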

What we want to do here is:

messages.map { case (key, value) => value.split(',') } ...