apache flink:如何解释 DataStream.print 输出?
apache flink: how to interpret DataStream.print output?
我是Flink新手,想了解怎么用才最有效
我正在尝试使用 Window API,从 CSV 文件中读取。读取的行被转换成一个案例class,因此
case class IncomingDataUnit (
sensorUUID: String, radiationLevel: Int,photoSensor: Float,
humidity: Float,timeStamp: Long, ambientTemperature: Float)
extends Serializable {
}
而且,这就是我阅读行的方式:
env.readTextFile(inputPath).map(datum => {
val fields = datum.split(",")
IncomingDataUnit(
fields(0), // sensorUUID
fields(1).toInt, // radiationLevel
fields(2).toFloat, // photoSensor
fields(3).toFloat, // humidity
fields(4).toLong, // timeStamp
fields(5).toFloat // ambientTemperature
)
})
稍后,使用简单的 window,我尝试打印最大 环境温度,因此:
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val readings =
readIncomingReadings(env,"./sampleIOTTiny.csv")
.map(e => (e.sensorUUID,e.ambientTemperature))
.timeWindowAll(Time.of(5,TimeUnit.MILLISECONDS))
.trigger(CountTrigger.of(5))
.evictor(CountEvictor.of(4))
.max(1)
readings.print
输出包含这些(来自一堆 DEBUG 日志语句):
1> (probe-987f2cb6,29.43)
1> (probe-987f2cb6,29.43)
3> (probe-dccefede,30.02)
3> (probe-42a9ddca,22.07)
2> (probe-df2d4cad,22.87)
2> (probe-20c609fb,27.62)
4> (probe-dccefede,30.02)
我想了解的是如何解释这一点? repeated 1>s 代表什么?
同样让我感到困惑的是 probe-987f2cb6 与我的数据集中的 ambient temperature 29.43 不对应。它对应于不同的值(准确地说是 14.72)。
仅供参考,这里是数据集:
probe-f076c2b0,201,842.53,75.5372,1448028160,29.37
probe-dccefede,199,749.25,78.6057,1448028160,27.46
probe-f29f9662,199,821.81,81.7831,1448028160,22.35
probe-5dac1d9f,195,870.71,83.1028,1448028160,15.98
probe-6c75cfbe,198,830.06,82.5607,1448028160,30.02
probe-4d78b545,204,778.42,78.412,1448028160,25.92
probe-400c5cdf,204,711.65,73.585,1448028160,22.18
probe-df2d4cad,199,820.8,72.936,1448028161,16.18
probe-f4ef109e,199,785.68,77.5647,1448028161,16.36
probe-3fac3350,200,720.12,78.2073,1448028161,19.19
probe-42a9ddca,193,819.12,74.3712,1448028161,22.07
probe-252a5bbd,197,710.32,80.6072,1448028161,14.64
probe-987f2cb6,200,750.4,76.0533,1448028161,14.72
probe-24444323,197,816.06,84.0816,1448028161,4.405
probe-6dd6fdc4,201,717.64,78.4031,1448028161,29.43
probe-20c609fb,204,804.37,84.5243,1448028161,22.87
probe-c027fdc9,195,858.61,81.7682,1448028161,24.47
probe-2c6cd3de,198,826.96,85.26,1448028162,18.99
probe-960906ca,197,797.63,77.4359,1448028162,27.62
我可能遗漏了很多东西。请戳我。
忘了说:我用的是Flink 0.10.0.
“>X”表示打印结果元组的并行任务的任务 ID。我只是想知道为什么输出显示值 1 到 4——因为您使用的是 非并行 window(数据流未通过 .keyBy()
分区) 并且我希望印刷品是链式的并且是非平行的。但也许不是,4 个并行打印任务是 运行。
关于您的结果:如果 window 触发,则对所有元组和 head 元组的 window 值计算字段 1 的最大值用于字段 0。如果要 return 包含最大值的完整元组,可以使用 maxBy()
而不是 max()
。
我是Flink新手,想了解怎么用才最有效
我正在尝试使用 Window API,从 CSV 文件中读取。读取的行被转换成一个案例class,因此
case class IncomingDataUnit (
sensorUUID: String, radiationLevel: Int,photoSensor: Float,
humidity: Float,timeStamp: Long, ambientTemperature: Float)
extends Serializable {
}
而且,这就是我阅读行的方式:
env.readTextFile(inputPath).map(datum => {
val fields = datum.split(",")
IncomingDataUnit(
fields(0), // sensorUUID
fields(1).toInt, // radiationLevel
fields(2).toFloat, // photoSensor
fields(3).toFloat, // humidity
fields(4).toLong, // timeStamp
fields(5).toFloat // ambientTemperature
)
})
稍后,使用简单的 window,我尝试打印最大 环境温度,因此:
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val readings =
readIncomingReadings(env,"./sampleIOTTiny.csv")
.map(e => (e.sensorUUID,e.ambientTemperature))
.timeWindowAll(Time.of(5,TimeUnit.MILLISECONDS))
.trigger(CountTrigger.of(5))
.evictor(CountEvictor.of(4))
.max(1)
readings.print
输出包含这些(来自一堆 DEBUG 日志语句):
1> (probe-987f2cb6,29.43)
1> (probe-987f2cb6,29.43)
3> (probe-dccefede,30.02)
3> (probe-42a9ddca,22.07)
2> (probe-df2d4cad,22.87)
2> (probe-20c609fb,27.62)
4> (probe-dccefede,30.02)
我想了解的是如何解释这一点? repeated 1>s 代表什么?
同样让我感到困惑的是 probe-987f2cb6 与我的数据集中的 ambient temperature 29.43 不对应。它对应于不同的值(准确地说是 14.72)。
仅供参考,这里是数据集:
probe-f076c2b0,201,842.53,75.5372,1448028160,29.37
probe-dccefede,199,749.25,78.6057,1448028160,27.46
probe-f29f9662,199,821.81,81.7831,1448028160,22.35
probe-5dac1d9f,195,870.71,83.1028,1448028160,15.98
probe-6c75cfbe,198,830.06,82.5607,1448028160,30.02
probe-4d78b545,204,778.42,78.412,1448028160,25.92
probe-400c5cdf,204,711.65,73.585,1448028160,22.18
probe-df2d4cad,199,820.8,72.936,1448028161,16.18
probe-f4ef109e,199,785.68,77.5647,1448028161,16.36
probe-3fac3350,200,720.12,78.2073,1448028161,19.19
probe-42a9ddca,193,819.12,74.3712,1448028161,22.07
probe-252a5bbd,197,710.32,80.6072,1448028161,14.64
probe-987f2cb6,200,750.4,76.0533,1448028161,14.72
probe-24444323,197,816.06,84.0816,1448028161,4.405
probe-6dd6fdc4,201,717.64,78.4031,1448028161,29.43
probe-20c609fb,204,804.37,84.5243,1448028161,22.87
probe-c027fdc9,195,858.61,81.7682,1448028161,24.47
probe-2c6cd3de,198,826.96,85.26,1448028162,18.99
probe-960906ca,197,797.63,77.4359,1448028162,27.62
我可能遗漏了很多东西。请戳我。
忘了说:我用的是Flink 0.10.0.
“>X”表示打印结果元组的并行任务的任务 ID。我只是想知道为什么输出显示值 1 到 4——因为您使用的是 非并行 window(数据流未通过 .keyBy()
分区) 并且我希望印刷品是链式的并且是非平行的。但也许不是,4 个并行打印任务是 运行。
关于您的结果:如果 window 触发,则对所有元组和 head 元组的 window 值计算字段 1 的最大值用于字段 0。如果要 return 包含最大值的完整元组,可以使用 maxBy()
而不是 max()
。