在 map 方法中发布到 kafka 主题
publish to kafka topic within a map method
从映射函数 (SCALA) 写入 kafka 主题?
- 读取 FLINK 应用程序中的 kafka 主题
- 在地图函数中处理数据
- 问题陈述 - 在 map 函数中,我正在遍历列表。对于列表中的每个元素,我想发布到一个 kafka 主题。
- 当我从 map 中获取输出并接收它时,它可以工作,但是如果我尝试从 map 方法中推送到主题,它不会
是否可以从地图方法中发布到主题
// Main Function
def main(args: Array[String]) {
...
// some list
val list_ = ("a", "b", "c", "d")
// Setup Properties
val props = new Properties()
props.setProperty("zookeeper.connect", zookeeper_url + ":" + zookeeper_port)
props.setProperty("bootstrap.servers", broker_url + ":" + broker_port)
props.setProperty("auto.offset.reset", "earliest")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
...
// Connect to Source
val input_stream = env.addSource(new FlinkKafkaConsumer09[String](topic_in, new SimpleStringSchema(), properties))
// Process each Record
val stream = input_stream.map(x=> {
// loop through list "list_" -> variable in in Main
// and publish to topic_out
// -- THIS IS MY CURRENT ISSUE !!!)
// -- Does not work (No compile issue)
//
var producer2 = new KafkaProducer[String, String](props)
var record = new ProducerRecord(topic_out, "KEY", list(i))
producer2.send(record)
producer2.flush()
// ... Other process and return processed string
})
// publish to different topic of proccessed input string (Works)
stream.addSink(new FlinkKafkaProducer09[String](broker_url + ":" + broker_port, other_topic, new SimpleStringSchema()))
不要在 map 函数内创建 kafka 生产者,也不要尝试在 map 内写入 kafka 主题。老实说,我不能引用任何说这是个坏主意的东西……但这是个坏主意。
相反。将您的 map 函数更改为 flatMap(请参阅此处的第一个示例:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html)。
所以在你的循环中,你只需做 collector.collect(recordToPublishToKafka)
.
而不是在每个循环中都制作一个 kafka 生产者
并且您的接收器将在收集时发布每一个。
从映射函数 (SCALA) 写入 kafka 主题?
- 读取 FLINK 应用程序中的 kafka 主题
- 在地图函数中处理数据
- 问题陈述 - 在 map 函数中,我正在遍历列表。对于列表中的每个元素,我想发布到一个 kafka 主题。
- 当我从 map 中获取输出并接收它时,它可以工作,但是如果我尝试从 map 方法中推送到主题,它不会
是否可以从地图方法中发布到主题
// Main Function def main(args: Array[String]) { ... // some list val list_ = ("a", "b", "c", "d") // Setup Properties val props = new Properties() props.setProperty("zookeeper.connect", zookeeper_url + ":" + zookeeper_port) props.setProperty("bootstrap.servers", broker_url + ":" + broker_port) props.setProperty("auto.offset.reset", "earliest") props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") ... // Connect to Source val input_stream = env.addSource(new FlinkKafkaConsumer09[String](topic_in, new SimpleStringSchema(), properties)) // Process each Record val stream = input_stream.map(x=> { // loop through list "list_" -> variable in in Main // and publish to topic_out // -- THIS IS MY CURRENT ISSUE !!!) // -- Does not work (No compile issue) // var producer2 = new KafkaProducer[String, String](props) var record = new ProducerRecord(topic_out, "KEY", list(i)) producer2.send(record) producer2.flush() // ... Other process and return processed string }) // publish to different topic of proccessed input string (Works) stream.addSink(new FlinkKafkaProducer09[String](broker_url + ":" + broker_port, other_topic, new SimpleStringSchema()))
不要在 map 函数内创建 kafka 生产者,也不要尝试在 map 内写入 kafka 主题。老实说,我不能引用任何说这是个坏主意的东西……但这是个坏主意。
相反。将您的 map 函数更改为 flatMap(请参阅此处的第一个示例:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html)。
所以在你的循环中,你只需做 collector.collect(recordToPublishToKafka)
.
并且您的接收器将在收集时发布每一个。