来自 Kafka 的 Spark Streaming 中的空值

Null value in spark streaming from Kafka

我有一个简单的程序,因为我正在尝试使用 kafka 接收数据。当我启动一个 kafka 生产者并发送数据时,例如:"Hello",我在打印消息时得到这个:(null, Hello)。而且我不知道为什么会出现这个空值。有什么办法可以避免这个 null 吗?我认为这是由于第一个参数Tuple2<String, String>,但我只想打印第二个参数。还有一件事,当我使用 System.out.println("inside map "+ message); 打印它时,它没有出现任何消​​息,有人知道为什么吗?谢谢。

public static void main(String[] args){

    SparkConf sparkConf = new SparkConf().setAppName("org.kakfa.spark.ConsumerData").setMaster("local[4]");
    // Substitute 127.0.0.1 with the actual address of your Spark Master (or use "local" to run in local mode
    sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");
    // Create the context with 2 seconds batch size
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

    Map<String, Integer> topicMap = new HashMap<>();
    String[] topics = KafkaProperties.TOPIC.split(",");
    for (String topic: topics) {
        topicMap.put(topic, KafkaProperties.NUM_THREADS);
    }
    /* connection to cassandra */
    CassandraConnector connector = CassandraConnector.apply(sparkConf);
    System.out.println("+++++++++++ cassandra connector created ++++++++++++++++++++++++++++");

    /* Receive kafka inputs */
    JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, KafkaProperties.ZOOKEEPER, KafkaProperties.GROUP_CONSUMER, topicMap);
    System.out.println("+++++++++++++ streaming-kafka connection done +++++++++++++++++++++++++++");

    JavaDStream<String> lines = messages.map(
            new Function<Tuple2<String, String>, String>() {
                public String call(Tuple2<String, String> message) {
                    System.out.println("inside map "+ message);
                    return message._2();
                }
            }
    );

    messages.print();
    jssc.start();
    jssc.awaitTermination();
}

Q1) 空值: Kafka 中的消息是有键的,这意味着它们都有一个 (Key, Value) 结构。 当你看到 (null, Hello) 是因为生产者在主题中发布了 (null,"Hello") 值。 如果你想在你的过程中省略key,映射原来的Dtream去掉key:kafkaDStream.map( new Function<String,String>() {...})

Q2) System.out.println("inside map "+ message); 不打印。几个经典原因:

  1. 转换在执行器中应用,因此当 运行 在集群中时,该输出将出现在执行器中而不是主控器中。

  2. 操作是惰性的,需要具体化 DStreams 才能应用操作。

在这种特定情况下,JavaDStream<String> lines 永远不会具体化,即不用于输出操作。因此 map 永远不会被执行。