Spark Direct Streaming - 在多个消费者中使用相同的消息
Spark Direct Streaming - consume same message in multiple consumers
如何使用 Direct Stream approach?
在多个消费者中消费 Kakfa topic messages
可能吗?由于 Direct Stream 方法没有 Consumer Group
概念。
如果我将 group.id
作为 kafkaparams 传递给 DirectStream 方法,会发生什么情况?下面的代码工作 with group.id
作为 Kafka 参数也 without group.id
.
示例代码:
val kafkaParams = Map(
"group.id" -> "group1",
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> sasl,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
"metadata.broker.list" -> brokerList,
"zookeeper.connect" -> zookeeperURL
)
val dStream =
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicSet
).map(_._2)
没有任何反应,Sparks 直接流式处理方法根本不考虑组 ID 参数,因为它使用较低级别 SimpleConsumer
。您不能使用不同的 Spark 直接方法流使用相同的主题。您可以遵循使用组的旧的基于接收器的方法。
如何使用 Direct Stream approach?
Kakfa topic messages
可能吗?由于 Direct Stream 方法没有 Consumer Group
概念。
如果我将 group.id
作为 kafkaparams 传递给 DirectStream 方法,会发生什么情况?下面的代码工作 with group.id
作为 Kafka 参数也 without group.id
.
示例代码:
val kafkaParams = Map(
"group.id" -> "group1",
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> sasl,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
"metadata.broker.list" -> brokerList,
"zookeeper.connect" -> zookeeperURL
)
val dStream =
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicSet
).map(_._2)
没有任何反应,Sparks 直接流式处理方法根本不考虑组 ID 参数,因为它使用较低级别 SimpleConsumer
。您不能使用不同的 Spark 直接方法流使用相同的主题。您可以遵循使用组的旧的基于接收器的方法。