来自 Kafka 的 Spark Streaming 中的空值
Null value in spark streaming from Kafka
我有一个简单的程序,因为我正在尝试使用 kafka
接收数据。当我启动一个 kafka 生产者并发送数据时,例如:"Hello",我在打印消息时得到这个:(null, Hello)
。而且我不知道为什么会出现这个空值。有什么办法可以避免这个 null 吗?我认为这是由于第一个参数Tuple2<String, String>
,但我只想打印第二个参数。还有一件事,当我使用 System.out.println("inside map "+ message);
打印它时,它没有出现任何消息,有人知道为什么吗?谢谢。
public static void main(String[] args){
SparkConf sparkConf = new SparkConf().setAppName("org.kakfa.spark.ConsumerData").setMaster("local[4]");
// Substitute 127.0.0.1 with the actual address of your Spark Master (or use "local" to run in local mode
sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");
// Create the context with 2 seconds batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
Map<String, Integer> topicMap = new HashMap<>();
String[] topics = KafkaProperties.TOPIC.split(",");
for (String topic: topics) {
topicMap.put(topic, KafkaProperties.NUM_THREADS);
}
/* connection to cassandra */
CassandraConnector connector = CassandraConnector.apply(sparkConf);
System.out.println("+++++++++++ cassandra connector created ++++++++++++++++++++++++++++");
/* Receive kafka inputs */
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, KafkaProperties.ZOOKEEPER, KafkaProperties.GROUP_CONSUMER, topicMap);
System.out.println("+++++++++++++ streaming-kafka connection done +++++++++++++++++++++++++++");
JavaDStream<String> lines = messages.map(
new Function<Tuple2<String, String>, String>() {
public String call(Tuple2<String, String> message) {
System.out.println("inside map "+ message);
return message._2();
}
}
);
messages.print();
jssc.start();
jssc.awaitTermination();
}
Q1) 空值:
Kafka 中的消息是有键的,这意味着它们都有一个 (Key, Value) 结构。
当你看到 (null, Hello)
是因为生产者在主题中发布了 (null,"Hello")
值。
如果你想在你的过程中省略key,映射原来的Dtream
去掉key:kafkaDStream.map( new Function<String,String>() {...})
Q2) System.out.println("inside map "+ message);
不打印。几个经典原因:
转换在执行器中应用,因此当 运行 在集群中时,该输出将出现在执行器中而不是主控器中。
操作是惰性的,需要具体化 DStreams 才能应用操作。
在这种特定情况下,JavaDStream<String> lines
永远不会具体化,即不用于输出操作。因此 map
永远不会被执行。
我有一个简单的程序,因为我正在尝试使用 kafka
接收数据。当我启动一个 kafka 生产者并发送数据时,例如:"Hello",我在打印消息时得到这个:(null, Hello)
。而且我不知道为什么会出现这个空值。有什么办法可以避免这个 null 吗?我认为这是由于第一个参数Tuple2<String, String>
,但我只想打印第二个参数。还有一件事,当我使用 System.out.println("inside map "+ message);
打印它时,它没有出现任何消息,有人知道为什么吗?谢谢。
public static void main(String[] args){
SparkConf sparkConf = new SparkConf().setAppName("org.kakfa.spark.ConsumerData").setMaster("local[4]");
// Substitute 127.0.0.1 with the actual address of your Spark Master (or use "local" to run in local mode
sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");
// Create the context with 2 seconds batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
Map<String, Integer> topicMap = new HashMap<>();
String[] topics = KafkaProperties.TOPIC.split(",");
for (String topic: topics) {
topicMap.put(topic, KafkaProperties.NUM_THREADS);
}
/* connection to cassandra */
CassandraConnector connector = CassandraConnector.apply(sparkConf);
System.out.println("+++++++++++ cassandra connector created ++++++++++++++++++++++++++++");
/* Receive kafka inputs */
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, KafkaProperties.ZOOKEEPER, KafkaProperties.GROUP_CONSUMER, topicMap);
System.out.println("+++++++++++++ streaming-kafka connection done +++++++++++++++++++++++++++");
JavaDStream<String> lines = messages.map(
new Function<Tuple2<String, String>, String>() {
public String call(Tuple2<String, String> message) {
System.out.println("inside map "+ message);
return message._2();
}
}
);
messages.print();
jssc.start();
jssc.awaitTermination();
}
Q1) 空值:
Kafka 中的消息是有键的,这意味着它们都有一个 (Key, Value) 结构。
当你看到 (null, Hello)
是因为生产者在主题中发布了 (null,"Hello")
值。
如果你想在你的过程中省略key,映射原来的Dtream
去掉key:kafkaDStream.map( new Function<String,String>() {...})
Q2) System.out.println("inside map "+ message);
不打印。几个经典原因:
转换在执行器中应用,因此当 运行 在集群中时,该输出将出现在执行器中而不是主控器中。
操作是惰性的,需要具体化 DStreams 才能应用操作。
在这种特定情况下,JavaDStream<String> lines
永远不会具体化,即不用于输出操作。因此 map
永远不会被执行。