Spark Streaming - 是否可以使用 Kafka 主题的特定分区?
Spark Streaming - Is it possible to consume a specific partition of Kafka Topic?
我正在尝试使用 Spark Streaming 使用 Kafka 主题的特定分区。
我在 KafkaUtils class 中没有看到此用例的任何方法。
有一种叫做createRDD
的方法,它基本上是期待offsets
,它只对非流应用程序有用。有没有其他方法可以使用 Spark Streaming 使用 Kafka 主题的特定分区?
没有办法使用单个分区,我们可以使用的最细粒度的是一个主题。但是,有一种方法可以指定说给定消息源自特定分区。您可以在使用 createDirectStream
的重载时执行此操作,该重载需要 Function1[MessageAndMetadata, R]
.
例如,假设我们有一个 String
类型的键和消息,并且我们目前只使用一个主题。我们可以做到:
val topicAndPartition: Map[TopicAndPartition, Long] = ???
val kafkaProperties: Map[String, String] = ???
KafkaUtils.createDirectStream[String,
String,
StringDecoder,
StringDecoder,
(String, String)](
streamingContext,
kafkaConfig.properties,
topicAndPartition,
(mam: MessageAndMetadata[String, String]) =>
(mam.partition, mam.message())
这样,我将输出分区 (1) 和底层消息 (2) 的元组。然后,我可以过滤此 DStream[(String, String)]
以仅包含来自特定分区的消息:
val filteredStream = kafkaDStream.filter { case (partition, _) => partition == 4 }
如果我们从多个主题中消费,我们需要输出主题和分区的元组,以便使用正确的主题过滤分区。幸运的是,我们可以使用名为 TopicAndPartition
的便利案例 class。我们有:
(mam: MessageAndMetadata[String, String]) =>
(TopicAndPartition(mam.topic(), mam.partition()), mam.message())
然后:
val filteredStream = kafkaDStream.filter {
case (tap, _) => tap.topic == "mytopic" && tap.partition == 4
}
我正在尝试使用 Spark Streaming 使用 Kafka 主题的特定分区。
我在 KafkaUtils class 中没有看到此用例的任何方法。
有一种叫做createRDD
的方法,它基本上是期待offsets
,它只对非流应用程序有用。有没有其他方法可以使用 Spark Streaming 使用 Kafka 主题的特定分区?
没有办法使用单个分区,我们可以使用的最细粒度的是一个主题。但是,有一种方法可以指定说给定消息源自特定分区。您可以在使用 createDirectStream
的重载时执行此操作,该重载需要 Function1[MessageAndMetadata, R]
.
例如,假设我们有一个 String
类型的键和消息,并且我们目前只使用一个主题。我们可以做到:
val topicAndPartition: Map[TopicAndPartition, Long] = ???
val kafkaProperties: Map[String, String] = ???
KafkaUtils.createDirectStream[String,
String,
StringDecoder,
StringDecoder,
(String, String)](
streamingContext,
kafkaConfig.properties,
topicAndPartition,
(mam: MessageAndMetadata[String, String]) =>
(mam.partition, mam.message())
这样,我将输出分区 (1) 和底层消息 (2) 的元组。然后,我可以过滤此 DStream[(String, String)]
以仅包含来自特定分区的消息:
val filteredStream = kafkaDStream.filter { case (partition, _) => partition == 4 }
如果我们从多个主题中消费,我们需要输出主题和分区的元组,以便使用正确的主题过滤分区。幸运的是,我们可以使用名为 TopicAndPartition
的便利案例 class。我们有:
(mam: MessageAndMetadata[String, String]) =>
(TopicAndPartition(mam.topic(), mam.partition()), mam.message())
然后:
val filteredStream = kafkaDStream.filter {
case (tap, _) => tap.topic == "mytopic" && tap.partition == 4
}