java.io.IOException: Failed to write statements to batch_layer.test. The latest exception was Key may not be empty

I am trying to count the words in a text and save the result to a Cassandra database. The producer reads the data from a file and sends it to Kafka. The consumer reads and processes the data with Spark Streaming and then writes the word counts to a table.

My producer looks like this:

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}

import scala.io.Source

object ProducerPlayground extends App {

  val topicName = "test"
  private def createProducer: Properties = {
    val producerProperties = new Properties()
    producerProperties.setProperty(
      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
      "localhost:9092"
    )
    producerProperties.setProperty(
      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      classOf[IntegerSerializer].getName
    )
    producerProperties.setProperty(
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      classOf[StringSerializer].getName
    )
    producerProperties
  }

  val producer = new KafkaProducer[Int, String](createProducer)

  // the backslash in the Windows path must be escaped
  val source = Source.fromFile("G:\\text.txt", "UTF-8")

  val lines = source.getLines()

  var key = 0
  for (line <- lines) {
    producer.send(new ProducerRecord[Int, String](topicName, key, line))
    key += 1
  }
  source.close()
  producer.flush()
  producer.close()

}

The consumer looks like this:

import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.streaming._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BatchLayer {
  def main(args: Array[String]) {

    val brokers = "localhost:9092"
    val topics = "test"
    val groupId = "groupId-1"

    val sparkConf = new SparkConf()
      .setAppName("BatchLayer")
      .setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    val sc = ssc.sparkContext
    sc.setLogLevel("OFF")

    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false"
    )
    val stream =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
      )

   
    val cass = CassandraConnector(sparkConf)

    cass.withSessionDo { session =>
      session.execute(
        s"CREATE KEYSPACE IF NOT EXISTS batch_layer WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }"
      )
      session.execute(s"CREATE TABLE IF NOT EXISTS batch_layer.test (key VARCHAR PRIMARY KEY, value INT)")
      session.execute(s"TRUNCATE batch_layer.test")
    }

    stream
      .map(v => v.value())
      .flatMap(x => x.split(" "))
      .filter(x => !x.contains(Array('\n', '\t')))
      .map(x => (x, 1))
      .reduceByKey(_ + _)
      .saveToCassandra("batch_layer", "test", SomeColumns("key", "value"))

    ssc.start()
    ssc.awaitTermination()
  }

}

After I start the producer, the program stops with this error. What am I doing wrong?

There is not much sense in using the legacy DStream-based streaming in 2021 - it is quite cumbersome to use, and you also need to track the Kafka offsets yourself, etc. It is better to use Structured Streaming instead - it tracks offsets for you via checkpoints, you work with the high-level Dataset APIs, etc.

In your case the code could look like this (not tested, but adapted from this working example):

import org.apache.spark.sql.streaming.OutputMode
import spark.implicits._ // enables the $"column" syntax used below

val streamingInputDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .load()

val wordsCountsDF = streamingInputDF.selectExpr("CAST(value AS STRING) as value")
  .selectExpr("split(value, '\w+', -1) as words")
  .selectExpr("explode(words) as word")
  .filter("word != ''")
  .groupBy($"word")
  .count()
  .select($"word", $"count")

// create table ...

val query = wordsCountsDF.writeStream
   .outputMode(OutputMode.Update)
   .format("org.apache.spark.sql.cassandra")
   .option("checkpointLocation", "path_to_checkpoint)
   .option("keyspace", "test")
   .option("table", "<table_name>")
   .start()

query.awaitTermination()
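
The "// create table ..." placeholder stands for the DDL of the target table. A minimal sketch of that step, assuming a hypothetical table test.word_counts that matches the word/count columns produced by the aggregation above (the same names would then go into the keyspace/table options of the writer):

import com.datastax.spark.connector.cql.CassandraConnector

// Hypothetical keyspace/table names, for illustration only; they have to match
// whatever you pass to .option("keyspace", ...) and .option("table", ...) above.
CassandraConnector(spark.sparkContext.getConf).withSessionDo { session =>
  session.execute(
    "CREATE KEYSPACE IF NOT EXISTS test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}"
  )
  session.execute(
    "CREATE TABLE IF NOT EXISTS test.word_counts (word TEXT PRIMARY KEY, count BIGINT)"
  )
}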

P.S. In your example, the most likely error is that you are trying to use .saveToCassandra directly on the DStream - it will not work that way.
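
If you do stay on the DStream API, the usual pattern is to write each micro-batch explicitly from foreachRDD instead of saving the DStream directly. A minimal, untested sketch against your existing batch_layer.test table (assuming the spark-cassandra-connector RDD functions are on the classpath); note the filter on empty tokens, since an empty string used as the partition key is rejected by Cassandra with exactly the "Key may not be empty" error:

import com.datastax.spark.connector._ // adds saveToCassandra to plain RDDs

stream
  .map(record => record.value())
  .flatMap(line => line.split("\\s+"))
  .filter(word => word.nonEmpty) // empty keys trigger "Key may not be empty"
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .foreachRDD { rdd =>
    // write each micro-batch as an RDD of (key, value) rows
    rdd.saveToCassandra("batch_layer", "test", SomeColumns("key", "value"))
  }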