我如何设计一个流,它是另一个流的每最新 5 个数据的平均值?
How can I design a Flow which is a average value of every latest 5 data of another Flow?
有一个Flow,每100ms发出一次数据,我希望对Flow的每最新5条数据取一个平均值,将平均值转换为Double值,转入另一个Flow。
如何设计流程?
代码A
fun soundDbFlow(period: Long = 100) = flow {
while (true) {
var data = getAmplitude()
emit(data)
delay(period)
}
}
.get_Average_Per5_LatestData {...} //How can I do? or is there other way?
.map { soundDb(it) }
private fun getAmplitude(): Int {
var result = 0
mRecorder?.let {
result = it.maxAmplitude
}
return result
}
private fun soundDb(input:Int, referenceAmp: Double = 1.0): Double {
return 20 * Math.log10(input / referenceAmp)
}
新增内容:
致 plplmax:谢谢!
我假设代码 B 将发出 1,2,3,4,5,6,7,8,9,10.....
你保证代码 C 会先计算 (1+2+3+4+5)/5
,然后再计算 (6+7+8+9+10)/5
,......?这是我的期望。
我担心代码 C 可能首先计算 (1+2+3+4+5)/5
,然后计算 (2+3+4+5+6)/5
,...
代码B
suspend fun soundDbFlow(period: Long) = flow {
while (true) {
val data = getAmplitude()
emit(data)
delay(period)
}
}
代码C
private fun reduceFlow(period: Long = 100) = flow {
while (true) {
val result = soundDbFlow(period)
.take(5)
.map { soundDb((it / 5.0).roundToInt()) }
.reduce { accumulator, value -> accumulator + value }
emit(result)
}
}
这是你想要的吗?
var result = 0
fun soundDbFlow(period: Long) = flow {
while (true) {
delay(period)
val data = getAmplitude()
emit(data)
}
}
fun reduceFlow(period: Long = 100) = flow {
while (true) {
val sum = soundDbFlow(period)
.take(5)
.reduce { accumulator, value -> accumulator + value }
val average = (sum / 5.0).roundToInt()
emit(soundDb(average))
}
}
fun getAmplitude(): Int {
return ++result
}
fun soundDb(input: Int, referenceAmp: Double = 1.0): Double {
return 20 * log10(input / referenceAmp)
}
fun main(): Unit = runBlocking {
reduceFlow().collect { println(it) }
}
你可以这样写一个分块运算符:
/**
* Returns a Flow that emits sequential [size]d chunks of data from the source flow,
* after transforming them with [transform].
*
* The list passed to [transform] is transient and must not be cached.
*/
fun <T, R> Flow<T>.chunked(size: Int, transform: suspend (List<T>)-> R): Flow<R> = flow {
val cache = ArrayList<T>(size)
collect {
cache.add(it)
if (cache.size == size) {
emit(transform(cache))
cache.clear()
}
}
}
然后像这样使用它:
suspend fun soundDbFlow(period: Long) = flow {
while (true) {
val data = getAmplitude()
emit(data)
delay(period)
}
}
.chunked(5) { (it.sum() / 5.0).roundToInt() }
.map { soundDb(it) }
有一个Flow,每100ms发出一次数据,我希望对Flow的每最新5条数据取一个平均值,将平均值转换为Double值,转入另一个Flow。
如何设计流程?
代码A
fun soundDbFlow(period: Long = 100) = flow {
while (true) {
var data = getAmplitude()
emit(data)
delay(period)
}
}
.get_Average_Per5_LatestData {...} //How can I do? or is there other way?
.map { soundDb(it) }
private fun getAmplitude(): Int {
var result = 0
mRecorder?.let {
result = it.maxAmplitude
}
return result
}
private fun soundDb(input:Int, referenceAmp: Double = 1.0): Double {
return 20 * Math.log10(input / referenceAmp)
}
新增内容:
致 plplmax:谢谢!
我假设代码 B 将发出 1,2,3,4,5,6,7,8,9,10.....
你保证代码 C 会先计算 (1+2+3+4+5)/5
,然后再计算 (6+7+8+9+10)/5
,......?这是我的期望。
我担心代码 C 可能首先计算 (1+2+3+4+5)/5
,然后计算 (2+3+4+5+6)/5
,...
代码B
suspend fun soundDbFlow(period: Long) = flow {
while (true) {
val data = getAmplitude()
emit(data)
delay(period)
}
}
代码C
private fun reduceFlow(period: Long = 100) = flow {
while (true) {
val result = soundDbFlow(period)
.take(5)
.map { soundDb((it / 5.0).roundToInt()) }
.reduce { accumulator, value -> accumulator + value }
emit(result)
}
}
这是你想要的吗?
var result = 0
fun soundDbFlow(period: Long) = flow {
while (true) {
delay(period)
val data = getAmplitude()
emit(data)
}
}
fun reduceFlow(period: Long = 100) = flow {
while (true) {
val sum = soundDbFlow(period)
.take(5)
.reduce { accumulator, value -> accumulator + value }
val average = (sum / 5.0).roundToInt()
emit(soundDb(average))
}
}
fun getAmplitude(): Int {
return ++result
}
fun soundDb(input: Int, referenceAmp: Double = 1.0): Double {
return 20 * log10(input / referenceAmp)
}
fun main(): Unit = runBlocking {
reduceFlow().collect { println(it) }
}
你可以这样写一个分块运算符:
/**
* Returns a Flow that emits sequential [size]d chunks of data from the source flow,
* after transforming them with [transform].
*
* The list passed to [transform] is transient and must not be cached.
*/
fun <T, R> Flow<T>.chunked(size: Int, transform: suspend (List<T>)-> R): Flow<R> = flow {
val cache = ArrayList<T>(size)
collect {
cache.add(it)
if (cache.size == size) {
emit(transform(cache))
cache.clear()
}
}
}
然后像这样使用它:
suspend fun soundDbFlow(period: Long) = flow {
while (true) {
val data = getAmplitude()
emit(data)
delay(period)
}
}
.chunked(5) { (it.sum() / 5.0).roundToInt() }
.map { soundDb(it) }