如何在 spark 执行器中收集所有记录并将其作为批处理
How to collect all records at spark executor and process it as batch
在我的 spark kinesis 流应用程序中,我使用 foreachBatch 获取流数据并需要将其发送到 drools 规则引擎进行进一步处理。
我的要求是,我需要将所有json数据累积在一个list/ruleSession中,然后将其发送给规则引擎,以便在执行程序端作为批处理进行处理。
//Scala Code Example:
val dataFrame = sparkSession.readStream
.format("kinesis")
.option("streamName", streamName)
.option("region", region)
.option("endpointUrl",endpointUrl)
.option("initialPosition", "TRIM_HORIZON")
.load()
val query = dataFrame
.selectExpr("CAST(data as STRING) as krecord")
.writeStream
.foreachBatch(function)
.start()
query.awaitTermination()
val function = (batchDF: DataFrame, batchId: Long) => {
val ruleSession = kBase.newKieSession() //Drools Rule Session, this is getting created at driver side
batchDF.foreach(row => { // This piece of code is being run in executor.
val jsonData: JSONData = jsonHandler.convertStringToJSONType(row.mkString)
ruleSession.insert(jsonData) // Getting a null pointer exception here as the ruleSession is not available in executor.
}
)
ruleHandler.processRule(ruleSession) // Again this is in the driver scope.
}
在上面的代码中,我面临的问题是:foreachBatch 中使用的函数在驱动程序端执行,而 batchDF.foreach 中的代码在 worker/executor 端执行,因此未能获得 ruleSession。
有什么方法可以 运行 每个执行者端的整个函数吗?
或
有没有更好的方法将转换后的批处理DataFrame中的所有数据累积起来,然后从executor/worker中发送到下一个进程?
我认为这可能有效...您可以使用 foreachBatch 或 foreachPartition(如果需要 return 信息,则可以使用 mapPartition 之类的地图版本)而不是 运行 foreach。在此部分中,打开与 drools 系统的连接。从那时起,迭代每个分区(或批次)中的数据集,将每个分区发送到 drools 系统(或者您可以将整个块发送到 drools)。在 foreachPartition / foreachBatch 部分的最后,关闭连接(如果适用)。
@codeaperature,这就是我实现批处理的方式,受到您的回答的启发,将其作为答案发布,因为这超出了评论中的字数限制。
- 在数据帧上使用 foreach 并传入 ForeachWriter。
- 正在ForeachWriter的open方法中初始化规则会话。
- 将每个输入 JSON 添加到处理方法中的规则会话。
- 使用加载了批数据的规则会话在 close 方法中执行规则。
//Scala代码:
val dataFrame = sparkSession.readStream
.format("kinesis")
.option("streamName", streamName)
.option("region", region)
.option("endpointUrl",endpointUrl)
.option("initialPosition", "TRIM_HORIZON")
.load()
val query = dataFrame
.selectExpr("CAST(data as STRING) as krecord")
.writeStream
.foreach(dataConsumer)
.start()
val dataConsumer = new ForeachWriter[Row] {
var ruleSession: KieSession = null;
def open(partitionId: Long, version: Long): Boolean = { // first open is called once for every batch
ruleSession = kBase.newKieSession()
true
}
def process(row: Row) = { // the process method will be called for a batch of records
val jsonData: JSONData = jsonHandler.convertStringToJSONType(row.mkString)
ruleSession.insert(jsonData) // Add all input json to rule session.
}
def close(errorOrNull: Throwable): Unit = { // after calling process for all records in bathc close is called
val factCount = ruleSession.getFactCount
if (factCount > 0) {
ruleHandler.processRule(ruleSession) //batch processing of rule
}
}
}
在我的 spark kinesis 流应用程序中,我使用 foreachBatch 获取流数据并需要将其发送到 drools 规则引擎进行进一步处理。
我的要求是,我需要将所有json数据累积在一个list/ruleSession中,然后将其发送给规则引擎,以便在执行程序端作为批处理进行处理。
//Scala Code Example:
val dataFrame = sparkSession.readStream
.format("kinesis")
.option("streamName", streamName)
.option("region", region)
.option("endpointUrl",endpointUrl)
.option("initialPosition", "TRIM_HORIZON")
.load()
val query = dataFrame
.selectExpr("CAST(data as STRING) as krecord")
.writeStream
.foreachBatch(function)
.start()
query.awaitTermination()
val function = (batchDF: DataFrame, batchId: Long) => {
val ruleSession = kBase.newKieSession() //Drools Rule Session, this is getting created at driver side
batchDF.foreach(row => { // This piece of code is being run in executor.
val jsonData: JSONData = jsonHandler.convertStringToJSONType(row.mkString)
ruleSession.insert(jsonData) // Getting a null pointer exception here as the ruleSession is not available in executor.
}
)
ruleHandler.processRule(ruleSession) // Again this is in the driver scope.
}
在上面的代码中,我面临的问题是:foreachBatch 中使用的函数在驱动程序端执行,而 batchDF.foreach 中的代码在 worker/executor 端执行,因此未能获得 ruleSession。
有什么方法可以 运行 每个执行者端的整个函数吗?
或
有没有更好的方法将转换后的批处理DataFrame中的所有数据累积起来,然后从executor/worker中发送到下一个进程?
我认为这可能有效...您可以使用 foreachBatch 或 foreachPartition(如果需要 return 信息,则可以使用 mapPartition 之类的地图版本)而不是 运行 foreach。在此部分中,打开与 drools 系统的连接。从那时起,迭代每个分区(或批次)中的数据集,将每个分区发送到 drools 系统(或者您可以将整个块发送到 drools)。在 foreachPartition / foreachBatch 部分的最后,关闭连接(如果适用)。
@codeaperature,这就是我实现批处理的方式,受到您的回答的启发,将其作为答案发布,因为这超出了评论中的字数限制。
- 在数据帧上使用 foreach 并传入 ForeachWriter。
- 正在ForeachWriter的open方法中初始化规则会话。
- 将每个输入 JSON 添加到处理方法中的规则会话。
- 使用加载了批数据的规则会话在 close 方法中执行规则。
//Scala代码:
val dataFrame = sparkSession.readStream
.format("kinesis")
.option("streamName", streamName)
.option("region", region)
.option("endpointUrl",endpointUrl)
.option("initialPosition", "TRIM_HORIZON")
.load()
val query = dataFrame
.selectExpr("CAST(data as STRING) as krecord")
.writeStream
.foreach(dataConsumer)
.start()
val dataConsumer = new ForeachWriter[Row] {
var ruleSession: KieSession = null;
def open(partitionId: Long, version: Long): Boolean = { // first open is called once for every batch
ruleSession = kBase.newKieSession()
true
}
def process(row: Row) = { // the process method will be called for a batch of records
val jsonData: JSONData = jsonHandler.convertStringToJSONType(row.mkString)
ruleSession.insert(jsonData) // Add all input json to rule session.
}
def close(errorOrNull: Throwable): Unit = { // after calling process for all records in bathc close is called
val factCount = ruleSession.getFactCount
if (factCount > 0) {
ruleHandler.processRule(ruleSession) //batch processing of rule
}
}
}