使用 Spark StreamingContext 从 Kafka 主题消费
Using Spark StreamingContext to Consume from Kafka topic
我是 Spark 和 Kafka 的新手,我正在尝试获取一些 Scala 代码(运行ning 作为 Spark 作业)作为一个长期 运行ning 进程(不仅仅是一个short-lived/scheduled 任务)并持续轮询 Kafka 代理以获取消息。当它收到消息时,我只想将它们打印到 console/STDOUT。同样,这需要一个漫长的 运行 宁过程,并且基本上(尝试)永生。
经过一番挖掘,我似乎想使用 StreamingContext
。这是我的最佳尝试:
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.storage._
import org.apache.spark.streaming.{StreamingContext, Seconds, Minutes, Time}
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder
def createKafkaStream(ssc: StreamingContext, kafkaTopics: String, brokers: String): DStream[(String, String)] = {
val topicsSet = kafkaTopics.split(",").toSet
val props = Map(
"bootstrap.servers" -> "my-kafka.example.com:9092",
"metadata.broker.list" -> "my-kafka.example.com:9092",
"serializer.class" -> "kafka.serializer.StringEncoder",
"value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
)
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, props, topicsSet)
}
def processEngine(): StreamingContext = {
val ssc = new StreamingContext(sc, Seconds(1))
val topicStream = createKafkaStream(ssc, "mytopic", "my-kafka.example.com:9092").print()
ssc
}
StreamingContext.getActive.foreach {
_.stop(stopSparkContext = false)
}
val ssc1 = StreamingContext.getActiveOrCreate(processEngine)
ssc1.start()
ssc1.awaitTermination()
当我 运行 这个时,我没有得到 exceptions/errors,但似乎什么也没有发生。我可以确认有关于该主题的消息。关于我哪里出错有什么想法吗?
当您 foreachRDD
时,输出打印在 Worker 节点,而不是 Master。我假设您正在查看 Master 的控制台输出。您可以使用 DStream.print
代替:
val ssc = new StreamingContext(sc, Seconds(1))
val topicStream = createKafkaStream(ssc, "mytopic", "my-kafka.example.com:9092").print()
此外,不要忘记在 ssc.start()
之后调用 ssc.awaitTermination()
:
ssc.start()
ssc.awaitTermination()
作为旁注,我假设您复制粘贴了此示例,但如果您实际上不打算对 DStream
执行任何操作,则无需在 DStream
上使用 transform
=18=].
这是你的完整代码吗?你在哪里创建 sc?您必须在流上下文之前创建火花上下文。你可以这样创建 sc :
SparkConf sc = new SparkConf().setAppName("SparkConsumer");
此外,如果没有 awaitTermination
,很难捕获和打印后台数据处理过程中发生的异常。你能不能在最后加上ssc1.awaitTermination();
,看看有没有报错。
我是 Spark 和 Kafka 的新手,我正在尝试获取一些 Scala 代码(运行ning 作为 Spark 作业)作为一个长期 运行ning 进程(不仅仅是一个short-lived/scheduled 任务)并持续轮询 Kafka 代理以获取消息。当它收到消息时,我只想将它们打印到 console/STDOUT。同样,这需要一个漫长的 运行 宁过程,并且基本上(尝试)永生。
经过一番挖掘,我似乎想使用 StreamingContext
。这是我的最佳尝试:
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.storage._
import org.apache.spark.streaming.{StreamingContext, Seconds, Minutes, Time}
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder
def createKafkaStream(ssc: StreamingContext, kafkaTopics: String, brokers: String): DStream[(String, String)] = {
val topicsSet = kafkaTopics.split(",").toSet
val props = Map(
"bootstrap.servers" -> "my-kafka.example.com:9092",
"metadata.broker.list" -> "my-kafka.example.com:9092",
"serializer.class" -> "kafka.serializer.StringEncoder",
"value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
)
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, props, topicsSet)
}
def processEngine(): StreamingContext = {
val ssc = new StreamingContext(sc, Seconds(1))
val topicStream = createKafkaStream(ssc, "mytopic", "my-kafka.example.com:9092").print()
ssc
}
StreamingContext.getActive.foreach {
_.stop(stopSparkContext = false)
}
val ssc1 = StreamingContext.getActiveOrCreate(processEngine)
ssc1.start()
ssc1.awaitTermination()
当我 运行 这个时,我没有得到 exceptions/errors,但似乎什么也没有发生。我可以确认有关于该主题的消息。关于我哪里出错有什么想法吗?
当您 foreachRDD
时,输出打印在 Worker 节点,而不是 Master。我假设您正在查看 Master 的控制台输出。您可以使用 DStream.print
代替:
val ssc = new StreamingContext(sc, Seconds(1))
val topicStream = createKafkaStream(ssc, "mytopic", "my-kafka.example.com:9092").print()
此外,不要忘记在 ssc.start()
之后调用 ssc.awaitTermination()
:
ssc.start()
ssc.awaitTermination()
作为旁注,我假设您复制粘贴了此示例,但如果您实际上不打算对 DStream
执行任何操作,则无需在 DStream
上使用 transform
=18=].
这是你的完整代码吗?你在哪里创建 sc?您必须在流上下文之前创建火花上下文。你可以这样创建 sc :
SparkConf sc = new SparkConf().setAppName("SparkConsumer");
此外,如果没有 awaitTermination
,很难捕获和打印后台数据处理过程中发生的异常。你能不能在最后加上ssc1.awaitTermination();
,看看有没有报错。