如何在转换不是 1:1 而是 1:many 时创建 Spark 数据集
How to create a Spark DataSet when the transformation is not 1:1, but 1:many
我正在编写一个结构化的流式 Spark 应用程序,我正在从 Kafka 队列读取数据并处理收到的消息。我想要的最终结果是 DataSet[MyMessage]
(其中 MyMessage
是自定义对象),我想将其排队到另一个 Kafka 主题。问题是,来自消费者 Kafka 队列的每个输入消息都可以产生多个 MyMessage
个对象,因此转换不是 1:1、1:Many.
所以我在做
val messagesDataSet: DataSet[List[MyMessage]] = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server1")
.option("subscribe", "topic1")
.option("failOnDataLoss", false)
.option("startingOffsets", "offset1")
.load()
.select($"value")
.mapPartitions{r => createMessages(r)}
val createMessages(row: Iterator[Row]): List[MyMessage] = {
// ...
}
很明显,messagesDataSet
是一个DataSet[List[MyMessage]]
。有没有办法让我只得到一个 DataSet[MyMessage]
?
或者有没有办法获取一个 DataSet[List[MyMessage]]
然后将每个 MyMessage
对象写入另一个 Kafka 主题? (毕竟这是我的最终目标)
您可以使用 mapPartitions 创建多个值(因此它的工作方式类似于 flatMap),但您必须 return 迭代器:
def createMessages(row: Iterator[Row]): Iterator[MyMessage] = {
row.map(/*...*/) //you need too return iterator here
}
尝试
messagesDataSet.flatMap(identity)
我正在编写一个结构化的流式 Spark 应用程序,我正在从 Kafka 队列读取数据并处理收到的消息。我想要的最终结果是 DataSet[MyMessage]
(其中 MyMessage
是自定义对象),我想将其排队到另一个 Kafka 主题。问题是,来自消费者 Kafka 队列的每个输入消息都可以产生多个 MyMessage
个对象,因此转换不是 1:1、1:Many.
所以我在做
val messagesDataSet: DataSet[List[MyMessage]] = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server1")
.option("subscribe", "topic1")
.option("failOnDataLoss", false)
.option("startingOffsets", "offset1")
.load()
.select($"value")
.mapPartitions{r => createMessages(r)}
val createMessages(row: Iterator[Row]): List[MyMessage] = {
// ...
}
很明显,messagesDataSet
是一个DataSet[List[MyMessage]]
。有没有办法让我只得到一个 DataSet[MyMessage]
?
或者有没有办法获取一个 DataSet[List[MyMessage]]
然后将每个 MyMessage
对象写入另一个 Kafka 主题? (毕竟这是我的最终目标)
您可以使用 mapPartitions 创建多个值(因此它的工作方式类似于 flatMap),但您必须 return 迭代器:
def createMessages(row: Iterator[Row]): Iterator[MyMessage] = {
row.map(/*...*/) //you need too return iterator here
}
尝试
messagesDataSet.flatMap(identity)