Attach Kafka offset to each record in foreachRDD
I want to retrieve the Kafka offset of every record of my RDD inside the foreachRDD method. My topic has a single partition, so my RDD has a single partition as well. Basically, I am trying something like this:
dStream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    // first offset of the batch (single partition, so a single offset range)
    val firstOffset = rdd.asInstanceOf[HasOffsetRanges].offsetRanges(0).fromOffset
    // derive each record's offset from its index within the RDD
    val rddWithOffset = rdd.map(_.value)
      .zipWithIndex()
      .map { case (v, i) => (v, i + firstOffset) }
  }
}
For example, my producer sends the messages in a loop and stores the loop index in a column named position, like this:
+------+-----+--------+
| name| age|position|
+------+-----+--------+
|johnny| 26| 1|
| chloe| 42| 2|
| brian| 19| 3|
| eliot| 35| 4|
+------+-----+--------+
Unfortunately, when I add the offset column in my consumer, I notice that the order is not preserved:
+------+-----+--------+------+
| name| age|position|offset|
+------+-----+--------+------+
|johnny| 26| 1| 1|
| chloe| 42| 2| 3|
| brian| 19| 3| 4|
| eliot| 35| 4| 2|
+------+-----+--------+------+
It looks like I lose the ordering somewhere in the process.
Do you have any idea? Thanks.
By the way, my Java producer looks like this:
KafkaRestProducer<String, Object> producer = new KafkaRestProducer<>(props);
ArrayList<String> names = new ArrayList<String>();
names.add("johnny");
names.add("chloe");
names.add("brian");
names.add("eliot");
ArrayList<Integer> ages = new ArrayList<Integer>();
ages.add(26);
ages.add(42);
ages.add(19);
ages.add(35);
// Send one Person per name, using the loop index as the position.
for (int i = 0; i < names.size(); ++i) {
    String name = names.get(i);
    int age = ages.get(i);
    Person person = Person
        .newBuilder()
        .setName(name)
        .setAge(age)
        .setPosition(i)
        .build();
    ProducerRecord<String, Object> record = new ProducerRecord<>("/apps/PERSON/streams:myTopic", name, person);
    producer.send(record, null);
    System.out.println(i);
}
My English is not very good, but here is the code I use:
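import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// ssc is an existing StreamingContext
val Array(brokers, topic, groupId) = args
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "group.id" -> groupId)
// start reading partition 0 of the topic at offset 1
val topicPartition = Map[TopicAndPartition, Long](TopicAndPartition(topic, 0) -> 1L)
// keep the offset of each message next to its payload
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.offset, mmd.message)
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (Long, String)](
  ssc, kafkaParams, topicPartition, messageHandler)
kafkaStream.foreachRDD(rdd => rdd.foreach(println))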
Output:
(offset, lineOfMessage)
...
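The idea behind the answer, reading the offset from each message rather than computing it as firstOffset plus an index, can be illustrated without Kafka or Spark. The sketch below is only a plain Java model: `Rec`, `attachFromRecord` and `attachFromIndex` are hypothetical names introduced for illustration, and the reordering step merely simulates whatever causes records to be visited in a different order than their offsets. It does not claim to reproduce the exact cause of the behaviour in the question.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class OffsetDemo {
    // Hypothetical stand-in for a Kafka message: the offset travels with the value.
    static final class Rec {
        final long offset;
        final String value;
        Rec(long offset, String value) { this.offset = offset; this.value = value; }
    }

    // Pair each value with the offset stored on the record itself.
    static List<String> attachFromRecord(List<Rec> recs) {
        List<String> out = new ArrayList<>();
        for (Rec r : recs) {
            out.add(r.value + "@" + r.offset);
        }
        return out;
    }

    // Pair each value with firstOffset + index; correct only if the
    // iteration order is the same as the offset order.
    static List<String> attachFromIndex(List<Rec> recs, long firstOffset) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < recs.size(); i++) {
            out.add(recs.get(i).value + "@" + (firstOffset + i));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> batch = new ArrayList<>();
        batch.add(new Rec(1L, "johnny"));
        batch.add(new Rec(2L, "chloe"));
        batch.add(new Rec(3L, "brian"));
        batch.add(new Rec(4L, "eliot"));

        // Simulate a step that changes the order in which records are seen.
        List<Rec> reordered = new ArrayList<>(batch);
        Collections.swap(reordered, 1, 3);

        System.out.println(attachFromRecord(reordered)); // [johnny@1, eliot@4, brian@3, chloe@2]
        System.out.println(attachFromIndex(reordered, 1L)); // [johnny@1, eliot@2, brian@3, chloe@4]
    }
}
```

With the index-based variant, eliot ends up with offset 2 once the order changes, while the record-based variant keeps the true offset 4. In the newer Kafka integration for Spark Streaming, where the stream carries ConsumerRecord objects, the same idea would presumably be to map each record to its own offset() and value() instead of using zipWithIndex.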