Kafka topic partition and Spark executor mapping
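I am using Spark Streaming with a Kafka topic. The topic was created with 5 partitions. All of my messages are published to the Kafka topic with the table name as the key.
Given this, I assume that all messages for a given table should go to the same partition.
However, I noticed in the Spark logs that messages for the same table are sometimes processed by the executor on node-1 and sometimes by the executor on node-2.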
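For reference, the keyed-partitioning assumption above can be sketched as follows. This is a minimal illustration, not Kafka's actual partitioner: the class name `KeyPartitionSketch` and the hash used here are assumptions chosen only to show that a deterministic hash of the key, taken modulo the partition count, always yields the same partition for the same table name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Simplified stand-in for a key-based partitioner; not Kafka's real hash function. */
class KeyPartitionSketch {

    /** Maps a message key to a partition index in [0, numPartitions). */
    static int partitionFor(String key, int numPartitions) {
        // Assumption: any deterministic hash of the key bytes is enough to show the idea.
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        // floorMod keeps the result non-negative even when the hash is negative.
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 5; // the topic in the question has 5 partitions
        for (String table : new String[] {"orders", "customers", "orders"}) {
            System.out.println(table + " -> partition " + partitionFor(table, numPartitions));
        }
    }
}
```

The same key always maps to the same partition, but this says nothing about which Spark executor later reads that partition.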
I am running the code in yarn-cluster mode with the following command:
spark-submit --name DataProcessor --master yarn-cluster --files /opt/ETL_JAR/executor-log4j-spark.xml,/opt/ETL_JAR/driver-log4j-spark.xml,/opt/ETL_JAR/application.properties --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=driver-log4j-spark.xml" --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=executor-log4j-spark.xml" --class com.test.DataProcessor /opt/ETL_JAR/etl-all-1.0.jar
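This submission created 1 driver on node-1 and 2 executors, one on node-1 and one on node-2.
I do not want the executors on node-1 and node-2 to read the same partition, but that is what is happening.
I also tried the following configuration to specify a consumer group, but it made no difference.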
kafkaParams.put("group.id", "app1");
This is how we create the stream, using the createDirectStream method (*not via ZooKeeper):
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", brokers);
kafkaParams.put("auto.offset.reset", "largest");
kafkaParams.put("group.id", "app1");
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
Full code:
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.serializer.StringDecoder;
import scala.Tuple2;
public class DataProcessor2 implements Serializable {
    private static final long serialVersionUID = 3071125481526170241L;
    private static Logger log = LoggerFactory.getLogger("DataProcessor");

    public static void main(String[] args) {
        final String sparkCheckPointDir = ApplicationProperties.getProperty(Consts.SPARK_CHECKPOINTING_DIR);
        DataProcessorContextFactory3 factory = new DataProcessorContextFactory3();
        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(sparkCheckPointDir, factory);
        // Start the process
        jssc.start();
        jssc.awaitTermination();
    }
}

class DataProcessorContextFactory3 implements JavaStreamingContextFactory, Serializable {
    private static final long serialVersionUID = 6070911284191531450L;
    private static Logger logger = LoggerFactory.getLogger(DataProcessorContextFactory3.class);

    DataProcessorContextFactory3() {
    }

    @Override
    public JavaStreamingContext create() {
        logger.debug("creating new context..!");
        final String brokers = ApplicationProperties.getProperty(Consts.KAFKA_BROKERS_NAME);
        final String topic = ApplicationProperties.getProperty(Consts.KAFKA_TOPIC_NAME);
        final String app = "app1";
        final String offset = ApplicationProperties.getProperty(Consts.KAFKA_CONSUMER_OFFSET, "largest");
        logger.debug("Data processing configuration. brokers={}, topic={}, app={}, offset={}", brokers, topic, app,
                offset);
        if (StringUtils.isBlank(brokers) || StringUtils.isBlank(topic) || StringUtils.isBlank(app)) {
            System.err.println("Usage: DataProcessor <brokers> <topic>\n" + Consts.KAFKA_BROKERS_NAME
                    + " is a list of one or more Kafka brokers separated by comma\n" + Consts.KAFKA_TOPIC_NAME
                    + " is a kafka topic to consume from \n\n\n");
            System.exit(1);
        }
        final String majorVersion = "1.0";
        final String minorVersion = "3";
        final String version = majorVersion + "." + minorVersion;
        final String applicationName = "DataProcessor-" + topic + "-" + version;
        // for dev environment
        SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName(applicationName);
        // for cluster environment
        //SparkConf sparkConf = new SparkConf().setAppName(applicationName);
        final long sparkBatchDuration = Long
                .valueOf(ApplicationProperties.getProperty(Consts.SPARK_BATCH_DURATION, "10"));
        final String sparkCheckPointDir = ApplicationProperties.getProperty(Consts.SPARK_CHECKPOINTING_DIR);
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(sparkBatchDuration));
        logger.debug("setting checkpoint directory={}", sparkCheckPointDir);
        jssc.checkpoint(sparkCheckPointDir);
        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topic.split(",")));
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", brokers);
        kafkaParams.put("auto.offset.reset", offset);
        kafkaParams.put("group.id", "app1");
        // @formatter:off
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
        );
        // @formatter:on
        processRDD(messages, app);
        return jssc;
    }

    private void processRDD(JavaPairInputDStream<String, String> messages, final String app) {
        JavaDStream<MsgStruct> rdd = messages.map(new MessageProcessFunction());
        rdd.foreachRDD(new Function<JavaRDD<MsgStruct>, Void>() {
            private static final long serialVersionUID = 250647626267731218L;

            @Override
            public Void call(JavaRDD<MsgStruct> currentRdd) throws Exception {
                if (!currentRdd.isEmpty()) {
                    logger.debug("Receive RDD. Create JobDispatcherFunction at HOST={}", FunctionUtil.getHostName());
                    currentRdd.foreachPartition(new VoidFunction<Iterator<MsgStruct>>() {
                        @Override
                        public void call(Iterator<MsgStruct> arg0) throws Exception {
                            while (arg0.hasNext()) {
                                System.out.println(arg0.next().toString());
                            }
                        }
                    });
                } else {
                    logger.debug("Current RDD is empty.");
                }
                return null;
            }
        });
    }

    public static class MessageProcessFunction implements Function<Tuple2<String, String>, MsgStruct> {
        @Override
        public MsgStruct call(Tuple2<String, String> data) throws Exception {
            String message = data._2();
            System.out.println("message:" + message);
            return MsgStruct.parse(message);
        }
    }

    public static class MsgStruct implements Serializable {
        private String message;

        public static MsgStruct parse(String msg) {
            MsgStruct m = new MsgStruct();
            m.message = msg;
            return m;
        }

        public String toString() {
            return "content inside=" + message;
        }
    }
}
With the DirectStream approach, it is correct to assume that messages sent to a given Kafka partition will land in the same Spark partition.
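The one-to-one relationship between Kafka partitions and Spark partitions can be pictured with a small sketch. It does not use the Spark API: `OffsetRangeSketch`, `Range`, and `planBatch` are hypothetical names, and the offsets are made up. It only shows that each batch gets exactly one offset range per Kafka partition, and that the range's position in the list plays the role of the Spark partition index.

```java
import java.util.ArrayList;
import java.util.List;

class OffsetRangeSketch {

    /** Hypothetical stand-in for an offset range: one slice of one Kafka partition. */
    static final class Range {
        final String topic;
        final int partition;
        final long fromOffset;  // inclusive
        final long untilOffset; // exclusive

        Range(String topic, int partition, long fromOffset, long untilOffset) {
            this.topic = topic;
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }

        /** Number of messages covered by this range. */
        long count() {
            return untilOffset - fromOffset;
        }
    }

    /** Builds one range per Kafka partition; list index i corresponds to Kafka partition i. */
    static List<Range> planBatch(String topic, long[] lastConsumed, long[] latestAvailable) {
        List<Range> ranges = new ArrayList<>();
        for (int p = 0; p < lastConsumed.length; p++) {
            ranges.add(new Range(topic, p, lastConsumed[p], latestAvailable[p]));
        }
        return ranges;
    }

    public static void main(String[] args) {
        long[] consumed = {100, 40, 0, 75, 10}; // illustrative offsets for 5 partitions
        long[] latest = {120, 40, 8, 90, 10};
        for (Range r : planBatch("topic-name", consumed, latest)) {
            System.out.println(r.topic + "-" + r.partition + ": [" + r.fromOffset + ", "
                    + r.untilOffset + ") " + r.count() + " messages");
        }
    }
}
```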
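What we cannot assume is that each Spark partition will be processed by the same Spark worker every time. At each batch interval, a Spark task is created for each OffsetRange of each partition and sent to the cluster for processing, where it lands on whichever worker is available.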
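A toy simulation makes the effect visible. This is a sketch under a strong simplifying assumption: "whichever worker is available" is modelled as a random pick, which is not how Spark's scheduler actually decides. The class and method names are hypothetical. The point is only that nothing ties partition N to the same executor from one batch to the next.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

class TaskPlacementSketch {

    /** Returns, for each partition index, the executor chosen to run its task in one batch. */
    static String[] placeBatch(int numPartitions, List<String> executors, Random rng) {
        String[] placement = new String[numPartitions];
        for (int p = 0; p < numPartitions; p++) {
            // Assumption: availability is modelled as a uniformly random choice.
            placement[p] = executors.get(rng.nextInt(executors.size()));
        }
        return placement;
    }

    public static void main(String[] args) {
        List<String> executors = Arrays.asList("executor@node-1", "executor@node-2");
        Random rng = new Random();
        for (int batch = 1; batch <= 3; batch++) {
            // The same partition index may show a different executor in each batch.
            System.out.println("batch " + batch + ": " + Arrays.toString(placeBatch(5, executors, rng)));
        }
    }
}
```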
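What you are looking for is partition locality. The only partition locality that the direct Kafka consumer supports is the Kafka host holding the offset range being processed, and that applies only when your Spark and Kafka deployments are colocated; that is a deployment topology I do not see very often.
If your requirements dictate host locality, you should look into Apache Samza or Kafka Streams.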
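According to the Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher), you can specify an explicit mapping of partitions to hosts.
Suppose you have two hosts (h1 and h2) and the Kafka topic topic-name has three partitions. The following key code shows how to map specific partitions to hosts in Java.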
Map<TopicPartition, String> partitionMapToHost = new HashMap<>();
// partition 0 -> h1, partition 1 and 2 -> h2
partitionMapToHost.put(new TopicPartition("topic-name", 0), "h1");
partitionMapToHost.put(new TopicPartition("topic-name", 1), "h2");
partitionMapToHost.put(new TopicPartition("topic-name", 2), "h2");
List<String> topicCollection = Arrays.asList("topic-name");
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "10.0.0.2:9092,10.0.0.3:9092");
kafkaParams.put("group.id", "group-id-name");
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
JavaInputDStream<ConsumerRecord<String, String>> records = KafkaUtils.createDirectStream(jssc,
LocationStrategies.PreferFixed(partitionMapToHost), // PreferFixed is the key
ConsumerStrategies.Subscribe(topicCollection, kafkaParams));
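You can also use LocationStrategies.PreferConsistent(), which distributes partitions evenly across the available executors and tries to schedule a given partition on the same executor in every batch. Note that this is a scheduling preference rather than a hard guarantee.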
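To show how such a location preference can stay stable across batches, here is a minimal sketch. It is loosely modelled on my understanding of the 0.10 integration and is not Spark's actual code: `LocationPreferenceSketch`, `preferredExecutor`, and the "host:executorId" string format are assumptions made for illustration. An empty map imitates PreferConsistent; a populated map imitates PreferFixed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

class LocationPreferenceSketch {

    /**
     * Picks a preferred executor ("host:executorId") for a topic partition.
     * fixedHosts maps "topic-partition" to a host; pass an empty map for hash-only placement.
     * Returns null when no executors are known.
     */
    static String preferredExecutor(String topic, int partition,
                                    Map<String, String> fixedHosts, List<String> executors) {
        if (executors.isEmpty()) {
            return null;
        }
        List<String> sorted = new ArrayList<>(executors);
        Collections.sort(sorted); // a stable order, so the same index means the same executor

        // Narrow the candidates to the pinned host, if one was configured and is present.
        List<String> candidates = new ArrayList<>();
        String host = fixedHosts.get(topic + "-" + partition);
        if (host != null) {
            for (String e : sorted) {
                if (e.startsWith(host + ":")) {
                    candidates.add(e);
                }
            }
        }
        if (candidates.isEmpty()) {
            candidates = sorted; // no pin, or pinned host has no executor: fall back to all
        }

        // The hash depends only on topic and partition, so the choice repeats every batch.
        int index = Math.floorMod(Objects.hash(topic, partition), candidates.size());
        return candidates.get(index);
    }

    public static void main(String[] args) {
        List<String> executors = Arrays.asList("h1:1", "h2:2");
        Map<String, String> fixed = new HashMap<>();
        fixed.put("topic-name-0", "h1");
        for (int p = 0; p < 3; p++) {
            System.out.println("partition " + p + " -> "
                    + preferredExecutor("topic-name", p, fixed, executors));
        }
    }
}
```

Even with a stable preference like this, the scheduler may still run a task elsewhere if the preferred executor is busy or gone, so downstream logic should not depend on a strict partition-to-executor binding.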