Spark Structured Streaming gets messages from only one partition of Kafka
I ran into a situation where Spark Structured Streaming only streams and gets messages from one partition of a two-partition Kafka topic.
My topic:
C:\bigdata\kafka_2.11-0.10.1.1\bin\windows>kafka-topics --create --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --topic test4
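To rule out the topic itself, the describe command from the same distribution should report two partitions for test4 (a verification step, not part of the original setup):

C:\bigdata\kafka_2.11-0.10.1.1\bin\windows>kafka-topics --describe --zookeeper localhost:2181 --topic test4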
Kafka producer:
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaFileProducer {

    // kafka producer
    Producer<String, String> producer;

    public KafkaFileProducer() {
        // configs
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        //props.put("group.id", "testgroup");
        props.put("batch.size", "16384");
        props.put("auto.commit.interval.ms", "1000");
        props.put("linger.ms", "0");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("block.on.buffer.full", "true");

        // instantiate a producer
        producer = new KafkaProducer<String, String>(props);
    }

    /**
     * @param filePath
     */
    public void sendFile(String filePath) {
        FileInputStream fis;
        BufferedReader br = null;
        try {
            fis = new FileInputStream(filePath);
            // construct BufferedReader from InputStreamReader
            br = new BufferedReader(new InputStreamReader(fis));

            int count = 0;
            String line = null;
            while ((line = br.readLine()) != null) {
                count++;
                // don't send the header
                if (count > 1) {
                    producer.send(new ProducerRecord<String, String>("test4", count + "", line));
                    Thread.sleep(10);
                }
            }
            System.out.println("Sent " + count + " lines of data");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                br.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            producer.close();
        }
    }
}
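Since each record is keyed by the line number, the default partitioner should spread records over both partitions by key hash. To confirm where records actually land, one option is to send with a callback that logs the target partition; a minimal sketch against the producer above (the callback is added for illustration, not part of the original code):

// sketch: log the partition and offset each record was written to
producer.send(new ProducerRecord<String, String>("test4", count + "", line),
        (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.println("sent to partition " + metadata.partition()
                        + ", offset " + metadata.offset());
            }
        });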
Spark Structured Streaming:
System.setProperty("hadoop.home.dir", "C:\\bigdata\\winutils");

final SparkSession sparkSession = SparkSession.builder()
        .appName("Spark Data Processing")
        .master("local[2]")
        .getOrCreate();

// create kafka stream to get the lines
Dataset<Tuple2<String, String>> stream = sparkSession
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "test4")
        .option("startingOffsets", "{\"test4\":{\"0\":-1,\"1\":-1}}")
        .option("failOnDataLoss", "false")
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .as(Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

Dataset<String> lines = stream.map(
        (MapFunction<Tuple2<String, String>, String>) (Tuple2<String, String> tuple) -> tuple._2,
        Encoders.STRING());

Dataset<Row> result = lines.groupBy().count();

// Start running the query that prints the running counts to the console
StreamingQuery query = result//.orderBy("callTimeBin")
        .writeStream()
        .outputMode("complete")
        .format("console")
        .start();

// wait for the query to finish
try {
    query.awaitTermination();
} catch (StreamingQueryException e) {
    e.printStackTrace();
}
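To see which partitions the query actually reads, the Kafka source also exposes partition and offset columns, and startingOffsets accepts the simpler value "earliest"; a hedged variant of the read above for debugging (same connection options, extra columns kept for inspection):

// sketch: start from the earliest offsets and keep partition/offset for inspection
Dataset<Row> raw = sparkSession
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "test4")
        .option("startingOffsets", "earliest")
        .option("failOnDataLoss", "false")
        .load()
        .selectExpr("partition", "offset", "CAST(value AS STRING) AS value");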
When I ran the producer to send 100 lines from a file, the query only returned 51 rows. I read Spark's debug logs and noticed the following:
17/02/15 10:52:49 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map(watermark -> 1970-01-01T00:00:00.000Z))
17/02/15 10:52:49 DEBUG StreamExecution: Starting Trigger Calculation
17/02/15 10:52:49 DEBUG KafkaConsumer: Pausing partition test4-1
17/02/15 10:52:49 DEBUG KafkaConsumer: Pausing partition test4-0
17/02/15 10:52:49 DEBUG KafkaSource: Partitions assigned to consumer: [test4-1, test4-0]. Seeking to the end.
17/02/15 10:52:49 DEBUG KafkaConsumer: Seeking to end of partition test4-1
17/02/15 10:52:49 DEBUG KafkaConsumer: Seeking to end of partition test4-0
17/02/15 10:52:49 DEBUG Fetcher: Resetting offset for partition test4-1 to latest offset.
17/02/15 10:52:49 DEBUG Fetcher: Fetched {timestamp=-1, offset=49} for partition test4-1
17/02/15 10:52:49 DEBUG Fetcher: Resetting offset for partition test4-1 to earliest offset.
17/02/15 10:52:49 DEBUG Fetcher: Fetched {timestamp=-1, offset=0} for partition test4-1
17/02/15 10:52:49 DEBUG Fetcher: Resetting offset for partition test4-0 to latest offset.
17/02/15 10:52:49 DEBUG Fetcher: Fetched {timestamp=-1, offset=51} for partition test4-0
17/02/15 10:52:49 DEBUG KafkaSource: Got latest offsets for partition : Map(test4-1 -> 0, test4-0 -> 51)
17/02/15 10:52:49 DEBUG KafkaSource: GetOffset: ArrayBuffer((test4-0,51), (test4-1,0))
17/02/15 10:52:49 DEBUG StreamExecution: getOffset took 0 ms
17/02/15 10:52:49 DEBUG StreamExecution: triggerExecution took 0 ms
I have no idea why test4-1 keeps getting reset to the earliest offset.
I would appreciate it if anyone knows how to get all the messages from all partitions.
Thanks,
This is a known issue in the Kafka 0.10.1.* client: https://issues.apache.org/jira/browse/KAFKA-4547
For now you can use the 0.10.0.1 client as a workaround; it can still talk to a Kafka 0.10.1.* cluster.
See https://issues.apache.org/jira/browse/SPARK-18779 for details.
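If the 0.10.1.* client ends up on the Spark classpath transitively, one way to apply the workaround (assuming a Maven build; a sketch, not from the original answer) is to pin the client version explicitly:

<!-- sketch: force the 0.10.0.1 client alongside the Spark Kafka source -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.1</version>
</dependency>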