Kafka producer.send() 被 producer.close() 停止

Kafka producer.send() is Stopped by producer.close()

我正在尝试发送关于名为 "test" 的 kafka 主题的字数统计问题(在 spark-scala 中)的输出。请参阅下面的代码:

val Dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

val lines = Dstream.map(f => f._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

wordCounts.foreachRDD(
      rdd => rdd.foreach(
        f =>
          {
            val sendProps = new Properties()
            sendProps.put("metadata.broker.list", brokers)
            sendProps.put("serializer.class", "kafka.serializer.StringEncoder")
            sendProps.put("producer.type", "async")

            val config = new ProducerConfig(sendProps)
            val producer = new Producer[String, String](config)
            producer.send(new KeyedMessage[String, String]"test", f._1 + " " +f._2))
            producer.close();

          })) 

问题是输出中随机丢失了一些单词。我还注意到,如果我删除语句

producer.close()

没有数据丢失。

这是否意味着 producer.close() 在实际将数据放入缓冲区之前中断 producer.send()由于那个特定的元组没有被发送给消费者?如果是,我该如何关闭生产者而不冒数据丢失的风险?

以上是我最初的问题,Vale 的回答解决了。

现在,当我再次更改 producer.type 属性 - 数据随机丢失。

sendProps.put("producer.type", "sync")

澄清 producer.send 是 运行 我需要放在输出主题中的所有单词。但是,有些词会丢失,并且不会显示在输出 Kafka 主题中。

这很奇怪。 close() 方法应该等待发送完成,这就是引入 close(time) 方法的原因:as you can see here.
所以,我用Java 7.rdd.foreach是对里面的每个分区进行操作吗?或者它是在每个元组上运行(正如我认为的那样)?
如果是后者,您可以尝试 rdd.foreachPartition (refer to this) 吗?因为你正在为你拍摄的每一行创建一个制作人,我担心这可能会导致问题(尽管理论上它不应该)。