Apache Flink DataStream API 没有 mapPartition 转换
Apache Flink DataStream API doesn't have a mapPartition transformation
Spark DStream 有 mapPartition
API,而 Flink DataStream
API 没有。有没有大神帮忙解释下原因。我想做的是在Flink上实现一个类似于SparkreduceByKey
的API
Flink 的流处理模型与以 mini batch 为中心的 Spark Streaming 有很大的不同。在 Spark Streaming 中,每个 mini batch 都像常规批处理程序一样在有限的数据集上执行,而 Flink DataStream 程序连续处理记录。
在Flink的DataSetAPI中,一个MapPartitionFunction
有两个参数。输入的迭代器和函数结果的收集器。 Flink DataStream 程序中的 MapPartitionFunction
永远不会从第一个函数调用开始 return ,因为迭代器会迭代无穷无尽的记录流。但是,Flink 的内部流处理模型需要用户函数 return 才能检查点函数状态。因此,DataStream API 不提供 mapPartition
转换。
为了实现类似于 Spark Streaming 的 reduceByKey
的功能,您需要在流上定义键控 window。 Windows 离散化流,这有点类似于小批量,但 windows 提供了更多的灵活性。由于 window 的大小有限,您可以将 reduce
称为 window。
这可能看起来像:
yourStream.keyBy("myKey") // organize stream by key "myKey"
.timeWindow(Time.seconds(5)) // build 5 sec tumbling windows
.reduce(new YourReduceFunction); // apply a reduce function on each window
DataStream documentation 展示了如何定义各种 window 类型并解释了所有可用函数。
注意: DataStream API 最近已重新设计。该示例假定最新版本 (0.10-SNAPSHOT) 将在未来几天发布为 0.10.0。
假设您的输入流是单分区数据(比如字符串)
val new_number_of_partitions = 4
//below line partitions your data, you can broadcast data to all partitions
val step1stream = yourStream.rescale.setParallelism(new_number_of_partitions)
//flexibility for mapping
val step2stream = step1stream.map(new RichMapFunction[String, (String, Int)]{
// var local_val_to_different_part : Type = null
var myTaskId : Int = null
//below function is executed once for each mapper function (one mapper per partition)
override def open(config: Configuration): Unit = {
myTaskId = getRuntimeContext.getIndexOfThisSubtask
//do whatever initialization you want to do. read from data sources..
}
def map(value: String): (String, Int) = {
(value, myTasKId)
}
})
val step3stream = step2stream.keyBy(0).countWindow(new_number_of_partitions).sum(1).print
//Instead of sum(1), you can use .reduce((x,y)=>(x._1,x._2+y._2))
//.countWindow will first wait for a certain number of records for perticular key
// and then apply the function
Flink streaming 是纯流式处理(不是批处理的)。看看 Iterate API.
Spark DStream 有 mapPartition
API,而 Flink DataStream
API 没有。有没有大神帮忙解释下原因。我想做的是在Flink上实现一个类似于SparkreduceByKey
的API
Flink 的流处理模型与以 mini batch 为中心的 Spark Streaming 有很大的不同。在 Spark Streaming 中,每个 mini batch 都像常规批处理程序一样在有限的数据集上执行,而 Flink DataStream 程序连续处理记录。
在Flink的DataSetAPI中,一个MapPartitionFunction
有两个参数。输入的迭代器和函数结果的收集器。 Flink DataStream 程序中的 MapPartitionFunction
永远不会从第一个函数调用开始 return ,因为迭代器会迭代无穷无尽的记录流。但是,Flink 的内部流处理模型需要用户函数 return 才能检查点函数状态。因此,DataStream API 不提供 mapPartition
转换。
为了实现类似于 Spark Streaming 的 reduceByKey
的功能,您需要在流上定义键控 window。 Windows 离散化流,这有点类似于小批量,但 windows 提供了更多的灵活性。由于 window 的大小有限,您可以将 reduce
称为 window。
这可能看起来像:
yourStream.keyBy("myKey") // organize stream by key "myKey"
.timeWindow(Time.seconds(5)) // build 5 sec tumbling windows
.reduce(new YourReduceFunction); // apply a reduce function on each window
DataStream documentation 展示了如何定义各种 window 类型并解释了所有可用函数。
注意: DataStream API 最近已重新设计。该示例假定最新版本 (0.10-SNAPSHOT) 将在未来几天发布为 0.10.0。
假设您的输入流是单分区数据(比如字符串)
val new_number_of_partitions = 4
//below line partitions your data, you can broadcast data to all partitions
val step1stream = yourStream.rescale.setParallelism(new_number_of_partitions)
//flexibility for mapping
val step2stream = step1stream.map(new RichMapFunction[String, (String, Int)]{
// var local_val_to_different_part : Type = null
var myTaskId : Int = null
//below function is executed once for each mapper function (one mapper per partition)
override def open(config: Configuration): Unit = {
myTaskId = getRuntimeContext.getIndexOfThisSubtask
//do whatever initialization you want to do. read from data sources..
}
def map(value: String): (String, Int) = {
(value, myTasKId)
}
})
val step3stream = step2stream.keyBy(0).countWindow(new_number_of_partitions).sum(1).print
//Instead of sum(1), you can use .reduce((x,y)=>(x._1,x._2+y._2))
//.countWindow will first wait for a certain number of records for perticular key
// and then apply the function
Flink streaming 是纯流式处理(不是批处理的)。看看 Iterate API.