Why does Spark Streaming not read from Kafka topic?
- Spark Streaming 1.6.0
- Apache Kafka 0.10.0.1
I am using Spark Streaming to read from the sample topic. The code runs without any errors or exceptions, but I get no data on the console from the print() method.
I checked whether there are any messages in the topic:
./bin/kafka-console-consumer.sh \
--zookeeper ip-172-xx-xx-xxx:2181 \
--topic sample \
--from-beginning
and I do get the messages:
message no. 1
message no. 2
message no. 3
message no. 4
message no. 5
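(For reference, test messages like these can be sent with the console producer that ships with Kafka; the broker address below is copied from the Spark code and is an assumption about your setup:)
./bin/kafka-console-producer.sh \
  --broker-list ip-172-xx-xx-xxx:9092 \
  --topic sample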
The command to run the streaming job:
./bin/spark-submit \
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MaxDirectMemorySize=512m" \
--jars /home/ubuntu/zifferlabs/target/ZifferLabs-1-jar-with-dependencies.jar \
--class "com.zifferlabs.stream.SampleStream" \
/home/ubuntu/zifferlabs/src/main/java/com/zifferlabs/stream/SampleStream.java
The full code is as follows:
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class SampleStream {
  private static void processStream() {
    SparkConf conf = new SparkConf().setAppName("sampleStream")
      .setMaster("local[3]")
      .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
      .set("spark.driver.memory", "2g")
      .set("spark.streaming.blockInterval", "1000")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.scheduler.mode", "FAIR");
    // 2-second batch interval
    JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(2000L));

    String[] topics = "sample".split(",");
    Set<String> topicSet = new HashSet<String>(Arrays.asList(topics));

    Map<String, String> props = new HashMap<String, String>();
    props.put("metadata.broker.list", "ip-172-xx-xx-xxx:9092");
    props.put("kafka.consumer.id", "sample_con");
    props.put("group.id", "sample_group");
    props.put("zookeeper.connect", "ip-172-xx-xx-xxx:2181");
    props.put("zookeeper.connection.timeout.ms", "16000");

    // direct (receiver-less) stream: offsets are tracked by Spark, not ZooKeeper
    JavaPairInputDStream<String, byte[]> kafkaStream =
      KafkaUtils.createDirectStream(jsc, String.class, byte[].class, StringDecoder.class,
                                    DefaultDecoder.class, props, topicSet);

    JavaDStream<String> data = kafkaStream.map(new Function<Tuple2<String, byte[]>, String>() {
      public String call(Tuple2<String, byte[]> arg0) throws Exception {
        // decode the payload: calling toString() on a byte[] would only print the array reference
        String value = new String(arg0._2(), StandardCharsets.UTF_8);
        System.out.println("$$$$$ value is: " + value);
        return value;
      }
    });
    data.print();

    System.out.println("Spark Streaming started....");
    jsc.checkpoint("/home/spark/sparkChkPoint");
    jsc.start();
    jsc.awaitTermination();
    System.out.println("Stopped Spark Streaming");
  }

  public static void main(String[] args) {
    processStream();
  }
}
I think your code is correct, but the command line you use to execute it is not.
You spark-submit your application as follows (formatting mine + spark.executor.extraJavaOptions removed for simplicity):
./bin/spark-submit \
--jars /home/ubuntu/zifferlabs/target/ZifferLabs-1-jar-with-dependencies.jar \
--class "com.zifferlabs.stream.SampleStream" \
/home/ubuntu/zifferlabs/src/main/java/com/zifferlabs/stream/SampleStream.java
I don't think it will work, because spark-submit is given your Java source code rather than executable (compiled) code.
Please spark-submit your application as follows:
./bin/spark-submit \
--class "com.zifferlabs.stream.SampleStream" \
/home/ubuntu/zifferlabs/target/ZifferLabs-1-jar-with-dependencies.jar
It is --class that defines the "entry point" to your Spark application, and the jar with your code and its dependencies is the sole input argument to spark-submit.
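For completeness, a sketch of the full build-and-submit flow. The -jar-with-dependencies suffix suggests the jar is assembled by Maven's assembly plugin, so the build step below is an assumption about your project setup:
# build the fat jar (assumes the maven-assembly-plugin is configured in pom.xml)
cd /home/ubuntu/zifferlabs
mvn clean package

# submit the compiled jar, not the .java source file
./bin/spark-submit \
  --class "com.zifferlabs.stream.SampleStream" \
  /home/ubuntu/zifferlabs/target/ZifferLabs-1-jar-with-dependencies.jar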
Give it a try and report back!