如何将记录从 Kafka 传递给方法?
How to pass records from Kafka to method?
我有一个 Kafka 队列,我从那里读取数据如下:
private static void startKafkaConsumerStream() {
try {
System.out.println("Print method: startKafkaConsumerStream");
Dataset<String> lines = (Dataset<String>) _spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", getProperty("kafka.bootstrap.servers"))
.option("subscribe", HTTP_FED_VO_TOPIC)
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(value AS STRING)")
.as(Encoders.STRING());
StreamingQuery query = lines.writeStream()
.outputMode("append")
.format("console")
.start();
query.awaitTermination();
} catch (Exception e) {
e.printStackTrace();
}
}
要求:使用上面的代码,我可以将记录打印到控制台但是,我很危险,因为我如何将这些传递给将处理它们的方法.
为此,我尝试查看文档但找不到任何相关内容。由于我是这方面的新手,这听起来可能有点傻。但是我被卡住了,非常感谢任何提示。
应用程序的目标应用程序的目标是接受请求并将其发送到 Kafka,然后在一个单独的线程中实现 Kafka reader 这是负责读取和处理请求并将输出生成到另一个 Kafka 队列。我只是在实现这个,架构不是我的想法。
您可以在 kafka 流应用程序的接收器部分使用 ForeachWriter[T]
来处理查询的每一行,如下所示:
datasetOfString.write.foreach(new ForeachWriter[String] {
def open(partitionId: Long, version: Long): Boolean = {
// open connection
}
def process(record: String) = {
// write string to connection
}
def close(errorOrNull: Throwable): Unit = {
// close the connection
}
})
lines
是 Dataset<String>
,Kafka 的值作为行。
how do I pass these to a method which will process them.
根据您具体想要做什么,您当然可以使用 foreach
运算符或使用任何其他可用于批处理数据集的运算符或函数。
您可以使用 withColumn(...)
或 select
或 map
运算符。
换句话说,将 Spark Structured Streaming 视为具有流数据集的 Spark SQL。
我有一个 Kafka 队列,我从那里读取数据如下:
private static void startKafkaConsumerStream() {
try {
System.out.println("Print method: startKafkaConsumerStream");
Dataset<String> lines = (Dataset<String>) _spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", getProperty("kafka.bootstrap.servers"))
.option("subscribe", HTTP_FED_VO_TOPIC)
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(value AS STRING)")
.as(Encoders.STRING());
StreamingQuery query = lines.writeStream()
.outputMode("append")
.format("console")
.start();
query.awaitTermination();
} catch (Exception e) {
e.printStackTrace();
}
}
要求:使用上面的代码,我可以将记录打印到控制台但是,我很危险,因为我如何将这些传递给将处理它们的方法.
为此,我尝试查看文档但找不到任何相关内容。由于我是这方面的新手,这听起来可能有点傻。但是我被卡住了,非常感谢任何提示。
应用程序的目标应用程序的目标是接受请求并将其发送到 Kafka,然后在一个单独的线程中实现 Kafka reader 这是负责读取和处理请求并将输出生成到另一个 Kafka 队列。我只是在实现这个,架构不是我的想法。
您可以在 kafka 流应用程序的接收器部分使用 ForeachWriter[T]
来处理查询的每一行,如下所示:
datasetOfString.write.foreach(new ForeachWriter[String] {
def open(partitionId: Long, version: Long): Boolean = {
// open connection
}
def process(record: String) = {
// write string to connection
}
def close(errorOrNull: Throwable): Unit = {
// close the connection
}
})
lines
是 Dataset<String>
,Kafka 的值作为行。
how do I pass these to a method which will process them.
根据您具体想要做什么,您当然可以使用 foreach
运算符或使用任何其他可用于批处理数据集的运算符或函数。
您可以使用 withColumn(...)
或 select
或 map
运算符。
换句话说,将 Spark Structured Streaming 视为具有流数据集的 Spark SQL。