如何将记录从 Kafka 传递给方法?

How to pass records from Kafka to method?

我有一个 Kafka 队列,我从那里读取数据如下:

private static void startKafkaConsumerStream() {

        try {

            System.out.println("Print method: startKafkaConsumerStream");

            Dataset<String> lines = (Dataset<String>) _spark
                    .readStream()
                    .format("kafka")
                    .option("kafka.bootstrap.servers", getProperty("kafka.bootstrap.servers"))
                    .option("subscribe", HTTP_FED_VO_TOPIC)
                    .option("startingOffsets", "latest")
                    .load()
                    .selectExpr("CAST(value AS STRING)")
                    .as(Encoders.STRING());

            StreamingQuery query = lines.writeStream()
                    .outputMode("append")
                    .format("console")
                    .start();

            query.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

要求:使用上面的代码,我可以将记录打印到控制台但是,我很危险,因为我如何将这些传递给将处理它们的方法.

为此,我尝试查看文档但找不到任何相关内容。由于我是这方面的新手,这听起来可能有点傻。但是我被卡住了,非常感谢任何提示。

应用程序的目标应用程序的目标是接受请求并将其发送到 Kafka,然后在一个单独的线程中实现 Kafka reader 这是负责读取和处理请求并将输出生成到另一个 Kafka 队列。我只是在实现这个,架构不是我的想法。

您可以在 kafka 流应用程序的接收器部分使用 ForeachWriter[T] 来处理查询的每一行,如下所示:

   datasetOfString.write.foreach(new ForeachWriter[String] {

     def open(partitionId: Long, version: Long): Boolean = {
       // open connection
     }

     def process(record: String) = {
       // write string to connection
     }

     def close(errorOrNull: Throwable): Unit = {
       // close the connection
     }
   })

linesDataset<String>,Kafka 的值作为行。

how do I pass these to a method which will process them.

根据您具体想要做什么,您当然可以使用 foreach 运算符或使用任何其他可用于批处理数据集的运算符或函数。

您可以使用 withColumn(...)selectmap 运算符。

换句话说,将 Spark Structured Streaming 视为具有流数据集的 Spark SQL。