订阅相同 kafka 主题的 Spark 流应用程序
Spark streaming applications subscribing to same kafka topic
我是 spark 和 kafka 的新手,我对 kafka 的 spark streaming 使用模式略有不同。
我正在使用
spark-core_2.10 - 2.1.1
spark-streaming_2.10 - 2.1.1
spark-streaming-kafka-0-10_2.10 - 2.0.0
kafka_2.10 - 0.10.1.1
连续事件数据正在流式传输到我需要从多个 spark 流应用程序处理的 kafka 主题。但是当我 运行 火花流应用程序时,只有其中一个接收数据。
Map<String, Object> kafkaParams = new HashMap<String, Object>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("group.id", "test-consumer-group");
kafkaParams.put("enable.auto.commit", "true");
kafkaParams.put("auto.commit.interval.ms", "1000");
kafkaParams.put("session.timeout.ms", "30000");
Collection<String> topics = Arrays.asList("4908100105999_000005");;
JavaInputDStream<ConsumerRecord<String, String>> stream = org.apache.spark.streaming.kafka010.KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams) );
... //spark processing
我有两个 spark streaming 应用程序,通常我提交的第一个应用程序使用 kafka 消息。第二个应用程序只是等待消息,从不继续。
正如我所读,kafka 主题可以从多个消费者那里订阅,spark streaming 不是这样吗?或者我在 kafka 主题及其配置中缺少什么?
提前致谢。
您可以使用相同的组 ID 创建不同的流。以下是 0.8 集成在线文档的更多详细信息,有两种方法:
方法 1:基于接收者的方法
Multiple Kafka input DStreams can be created with different groups and
topics for parallel receiving of data using multiple receivers.
方法 2:直接方法(无接收器)
No need to create multiple input Kafka streams and union them. With
directStream, Spark Streaming will create as many RDD partitions as
there are Kafka partitions to consume, which will all read data from
Kafka in parallel. So there is a one-to-one mapping between Kafka and
RDD partitions, which is easier to understand and tune.
您可以在 Spark Streaming + Kafka Integration Guide 0.8
阅读更多内容
从你的代码看来你使用的是 0.10,参考 Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0
即使认为它使用的是 spark streaming api,一切都由 kafka 属性控制,因此取决于您在属性文件中指定的组 ID,您可以启动具有不同组 ID 的多个流。
干杯!
消费者数量[一个消费者组下],不能超过topic的partition数量。如果要并行消费消息,则需要引入适当数量的分区并创建接收者来处理每个分区。
我是 spark 和 kafka 的新手,我对 kafka 的 spark streaming 使用模式略有不同。 我正在使用
spark-core_2.10 - 2.1.1
spark-streaming_2.10 - 2.1.1
spark-streaming-kafka-0-10_2.10 - 2.0.0
kafka_2.10 - 0.10.1.1
连续事件数据正在流式传输到我需要从多个 spark 流应用程序处理的 kafka 主题。但是当我 运行 火花流应用程序时,只有其中一个接收数据。
Map<String, Object> kafkaParams = new HashMap<String, Object>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("group.id", "test-consumer-group");
kafkaParams.put("enable.auto.commit", "true");
kafkaParams.put("auto.commit.interval.ms", "1000");
kafkaParams.put("session.timeout.ms", "30000");
Collection<String> topics = Arrays.asList("4908100105999_000005");;
JavaInputDStream<ConsumerRecord<String, String>> stream = org.apache.spark.streaming.kafka010.KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams) );
... //spark processing
我有两个 spark streaming 应用程序,通常我提交的第一个应用程序使用 kafka 消息。第二个应用程序只是等待消息,从不继续。 正如我所读,kafka 主题可以从多个消费者那里订阅,spark streaming 不是这样吗?或者我在 kafka 主题及其配置中缺少什么?
提前致谢。
您可以使用相同的组 ID 创建不同的流。以下是 0.8 集成在线文档的更多详细信息,有两种方法:
方法 1:基于接收者的方法
Multiple Kafka input DStreams can be created with different groups and topics for parallel receiving of data using multiple receivers.
方法 2:直接方法(无接收器)
No need to create multiple input Kafka streams and union them. With directStream, Spark Streaming will create as many RDD partitions as there are Kafka partitions to consume, which will all read data from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune.
您可以在 Spark Streaming + Kafka Integration Guide 0.8
阅读更多内容从你的代码看来你使用的是 0.10,参考 Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0
即使认为它使用的是 spark streaming api,一切都由 kafka 属性控制,因此取决于您在属性文件中指定的组 ID,您可以启动具有不同组 ID 的多个流。
干杯!
消费者数量[一个消费者组下],不能超过topic的partition数量。如果要并行消费消息,则需要引入适当数量的分区并创建接收者来处理每个分区。