Structured Streaming - Consume each message
What is the "recommended" way to process each message as it comes out of a Structured Streaming pipeline (I'm on Spark 2.1.1, the source is Kafka 0.10.2.1)?
So far I'm looking at dataframe.mapPartitions
(since I need to connect to HBase, whose client connection classes are not serializable, hence mapPartitions
).
Ideas?
You should be able to use the foreach
output sink: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks and https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach
Even though the client is not serializable, you don't have to open it in the ForeachWriter
constructor. Just leave it None/null, and initialize it in the open
method, which is called after serialization, but only once per task.
In some sort of pseudo-code:
class HBaseForeachWriter extends ForeachWriter[MyType] {
  // Left empty here; the client is created in open(), which runs on the
  // executor after the writer has been serialized and shipped there.
  var client: Option[HBaseClient] = None

  def open(partitionId: Long, version: Long): Boolean = {
    client = Some(... open a client ...)
    true // signal that this partition should be processed
  }

  def process(record: MyType): Unit = {
    client match {
      case None     => throw new IllegalStateException("open() was not called")
      case Some(cl) =>
        ... use cl to write record ...
    }
  }

  def close(errorOrNull: Throwable): Unit = {
    client.foreach(cl => cl.close())
  }
}