Kafka-spark 同步流处理作业
Kafka-spark Streaming processing jobs synchronically
我正在尝试使用 Kafka-connect 和 spark 进行简单测试
我写了一个自定义的 kafka-connect 来创建这个源记录
SourceRecord sr = new SourceRecord(null,
null,
destTopic,
Schema.STRING_SCHEMA,
cleanPath);
在火花中我收到这样的消息
val kafkaConsumerParams = Map[String, String](
"metadata.broker.list" -> prop.getProperty("kafka_host"),
"zookeeper.connect" -> prop.getProperty("zookeeper_host"),
"group.id" -> prop.getProperty("kafka_group_id"),
"schema.registry.url" -> prop.getProperty("schema_registry_url"),
"auto.offset.reset" -> prop.getProperty("auto_offset_reset")
)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaConsumerParams, topicsSet)
val ds = messages.foreachRDD(rdd => {
val toPrint = rdd.map(t => {
val file_path = t._2
val startTime = DateTime.now()
Thread.sleep(1000 * 60)
1
}).sum()
LogUtils.getLogger(classOf[DeviceManager]).info(" toPrint = " + toPrint +" (number of flows calculated)")
})
}
当我使用连接器向所需主题发送多条消息时(在我的测试中它有 6 个分区)
睡眠线程获取所有消息,但同步而不是异步地执行它们。
当我创建一个简单的测试生产者时,睡眠是异步完成的。
我还创建了 2 个简单的消费者,并尝试了连接器和生产者,两个任务都是异步消费的
这意味着我的问题在于 spark 接收从连接器发送的消息的方式。
我不明白为什么任务的行为方式与我从制作人发送的任务不同。
我什至打印了 spark 收到的记录,它们完全一样
生产者发送记录
1: {partition=2, offset=11, value=something, key=null}
2: {partition=5, offset=9, value=something2, key=null}
连接发送记录
1: {partition=3, offset=9, value=something, key=null}
我项目中使用的版本是
<scala.version>2.11.7</scala.version>
<confluent.version>4.0.0</confluent.version>
<kafka.version>1.0.0</kafka.version>
<java.version>1.8</java.version>
<spark.version>2.0.0</spark.version>
依赖关系
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.11</artifactId>
<version>2.0.0-RC1</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
<scope>${global.scope}</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<version>${confluent.version}</version>
<scope>${global.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>${kafka.version}</version>
</dependency>
我们不能 运行 Spark-Kafka 异步流作业。但是我们可以 运行 它们并行,就像 Kafka 消费者所做的那样。为此,我们需要在 SparkConf()
中设置以下配置:
sparkConf.set("spark.streaming.concurrentJobs","4")
默认情况下,其值为"1"
。但我们可以将其覆盖为更高的值。
希望对您有所帮助!
我正在尝试使用 Kafka-connect 和 spark 进行简单测试
我写了一个自定义的 kafka-connect 来创建这个源记录
SourceRecord sr = new SourceRecord(null,
null,
destTopic,
Schema.STRING_SCHEMA,
cleanPath);
在火花中我收到这样的消息
val kafkaConsumerParams = Map[String, String](
"metadata.broker.list" -> prop.getProperty("kafka_host"),
"zookeeper.connect" -> prop.getProperty("zookeeper_host"),
"group.id" -> prop.getProperty("kafka_group_id"),
"schema.registry.url" -> prop.getProperty("schema_registry_url"),
"auto.offset.reset" -> prop.getProperty("auto_offset_reset")
)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaConsumerParams, topicsSet)
val ds = messages.foreachRDD(rdd => {
val toPrint = rdd.map(t => {
val file_path = t._2
val startTime = DateTime.now()
Thread.sleep(1000 * 60)
1
}).sum()
LogUtils.getLogger(classOf[DeviceManager]).info(" toPrint = " + toPrint +" (number of flows calculated)")
})
}
当我使用连接器向所需主题发送多条消息时(在我的测试中它有 6 个分区) 睡眠线程获取所有消息,但同步而不是异步地执行它们。
当我创建一个简单的测试生产者时,睡眠是异步完成的。
我还创建了 2 个简单的消费者,并尝试了连接器和生产者,两个任务都是异步消费的 这意味着我的问题在于 spark 接收从连接器发送的消息的方式。 我不明白为什么任务的行为方式与我从制作人发送的任务不同。
我什至打印了 spark 收到的记录,它们完全一样
生产者发送记录
1: {partition=2, offset=11, value=something, key=null}
2: {partition=5, offset=9, value=something2, key=null}
连接发送记录
1: {partition=3, offset=9, value=something, key=null}
我项目中使用的版本是
<scala.version>2.11.7</scala.version>
<confluent.version>4.0.0</confluent.version>
<kafka.version>1.0.0</kafka.version>
<java.version>1.8</java.version>
<spark.version>2.0.0</spark.version>
依赖关系
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.11</artifactId>
<version>2.0.0-RC1</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
<scope>${global.scope}</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<version>${confluent.version}</version>
<scope>${global.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>${kafka.version}</version>
</dependency>
我们不能 运行 Spark-Kafka 异步流作业。但是我们可以 运行 它们并行,就像 Kafka 消费者所做的那样。为此,我们需要在 SparkConf()
中设置以下配置:
sparkConf.set("spark.streaming.concurrentJobs","4")
默认情况下,其值为"1"
。但我们可以将其覆盖为更高的值。
希望对您有所帮助!