Kafka scala Consumer代码——打印消费记录
Kafka scala Consumer code - to print consumed records
因为我正在使用 url 创建简单的 kafka 消费者,如下所示:https://gist.github.com/akhil/6dfda8a04e33eff91a20 .
在那link中,为了打印消费记录,用了一个词"asScala",未被识别。请告诉我,如何迭代 return 类型:ConsumerRecord[String,String],它是 poll() 方法的 return 类型。
import java.util
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
object KafkaConsumerEx extends App {
val topic_name = "newtopic55"
val consumer_group = "KafkaConsumerBatch"
val prot = new Properties()
prot.put("bootstrap.servers","localhost:9092")
prot.put("group.id",consumer_group)
prot.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
prot.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
val kfk_consumer = new KafkaConsumer[String,String](prot)
kfk_consumer.subscribe(util.Collections.singleton(topic_name))
println("here")
while(true){
val consumer_record : ConsumerRecords[String, String] = kfk_consumer.poll(100)
println("records count : " + consumer_record.count())
println("records partitions: " + consumer_record.partitions())
consumer_record.iterator().
}
}
致谢
你可以轻松做到这一点
for (record <- consumer_record.iterator()) {
println(s"Here's your $record")
}
记得添加这个导入:
import scala.collection.JavaConversions._
添加另一个答案,因为 scala.collection.JavaConversions
已被弃用,如 here 中所述。
关于这个问题,代码可以这样
import scala.collection.JavaConverters._
for (record <- asScalaIterator(consumer_record.iterator)) {
println(s"Here's your $record")
}
while(true){
val consumer_records = kfk_consumer.poll(100)
val record_iter=consumer_record.iterator()
while(record_iter.hasNext())
{
record=record_iter.next()
println("records partitions: " + record.partition()
"records_data:" + record.value())
}
}
因为我正在使用 url 创建简单的 kafka 消费者,如下所示:https://gist.github.com/akhil/6dfda8a04e33eff91a20 .
在那link中,为了打印消费记录,用了一个词"asScala",未被识别。请告诉我,如何迭代 return 类型:ConsumerRecord[String,String],它是 poll() 方法的 return 类型。
import java.util
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
object KafkaConsumerEx extends App {
val topic_name = "newtopic55"
val consumer_group = "KafkaConsumerBatch"
val prot = new Properties()
prot.put("bootstrap.servers","localhost:9092")
prot.put("group.id",consumer_group)
prot.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
prot.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
val kfk_consumer = new KafkaConsumer[String,String](prot)
kfk_consumer.subscribe(util.Collections.singleton(topic_name))
println("here")
while(true){
val consumer_record : ConsumerRecords[String, String] = kfk_consumer.poll(100)
println("records count : " + consumer_record.count())
println("records partitions: " + consumer_record.partitions())
consumer_record.iterator().
}
}
致谢
你可以轻松做到这一点
for (record <- consumer_record.iterator()) {
println(s"Here's your $record")
}
记得添加这个导入:
import scala.collection.JavaConversions._
添加另一个答案,因为 scala.collection.JavaConversions
已被弃用,如 here 中所述。
关于这个问题,代码可以这样
import scala.collection.JavaConverters._
for (record <- asScalaIterator(consumer_record.iterator)) {
println(s"Here's your $record")
}
while(true){
val consumer_records = kfk_consumer.poll(100)
val record_iter=consumer_record.iterator()
while(record_iter.hasNext())
{
record=record_iter.next()
println("records partitions: " + record.partition()
"records_data:" + record.value())
}
}