如何在 scalaz-stream 中实现 receiveAvailable 转换器
How to implement receiveAvailable transducer in scalaz-stream
简短版本:
我想实现一个函数,该函数 returns 一个等待值块成为 "emitted" 的传感器。
我想到的函数具有以下签名:
/**
* The `Process1` which awaits the next "effect" to occur and passes all values emitted by
* this effect to `rcv` to determine the next state.
*/
def receiveBlock[I, O](rcv: Vector[I] => Process1[I,O]): Process1[I,O] = ???
详情:
我的理解是,我可以使用这个函数来实现以下我认为非常有用的功能:
/**
* Groups inputs into chunks of dynamic size based on the various effects
* that back emitted values.
*
* @example {{{
* val numberTask = Task.delay(1)
* val listOfNumbersTask = Task.delay(List(5,6,7))
* val sample = Process.eval(numberTask) ++ Process(2,3,4) ++ Process.await(listOfNumbersTask)(xs => Process.emitAll(xs))
* sample.chunkByEffect.runLog.run should be List(Vector(1), Vector(2,3,4), Vector(5,6,7))
* }}}
*/
def chunkByEffect[I]: Process1[I, Vector[I]] = {
receiveBlock(vec => emit(vec) ++ chunkByEffect)
}
[更新]更多详情
我的最终objective(稍微简化)是实现如下功能:
/**
* Transforms a stream of audio into a stream of text.
*/
voiceRecognition(audio: Process[Task, Byte]): Process[Task, String]
函数对外调用语音识别服务。因此,为流中的每个 Byte
进行网络调用是不合理的。在进行网络调用之前,我需要将字节分块在一起。我可以将 audio
设为 Process[Task, ByteVector]
,但这需要测试代码才能知道函数支持的最大块大小,我宁愿由函数本身管理。此外,当在服务内部使用此服务时,该服务本身将接收具有给定音频大小的网络调用,我希望 chunkXXX
函数能够智能地分块,这样它就不会保留到已经可用的数据上。
基本上,来自网络的音频流将具有 Process[Task, ByteVector]
的形式,并会被 flatMap(Process.emitAll(_))
转换为 Process[Task, Byte]
。但是,测试代码将直接生成 Process[Task, Byte]
并将其输入 voiceRecognition
。从理论上讲,我相信如果有适当的组合器,应该有可能提供 voiceRecognition
的实现,它对这两个流都做正确的事情,我认为上面描述的 chunkByEffect
函数是关键。我现在意识到,我需要 chunkByEffect 函数具有 min
和 max
参数来指定分块的最小和最大大小,而不管底层 Task
产生字节。
您需要以某种方式分隔字节。我建议在字节流上使用一些更高级别的抽象,即 ByteVector。
然后您可能需要手动执行 process1,它的实现方式与 process1.chunkBy
类似,只是它在 ByteVector 上运行。即
def chunkBy(separator:ByteVector): Process1[ByteVector, ByteVector] = {
def go(acc: ByteVector): Process1[ByteVector, ByteVector] =
receive1Or[ByteVector,ByteVector](emit(acc)) { i =>
// implement searching of separator in accumulated + new bytes
???
}
go(ByteVector.empty)
}
然后这将把所有东西连接在一起[=13=]
val speech: Process[Task,ByteVector] = ???
def chunkByWhatever: Process1[ByteVector,ByteVector] = ???
val recognizer: Channel[Task,ByteVector,String] = ???
//this shall do the trick
speech.pipe(chunkByWhatever).through(recognizer)
我想此时的答案是,这在 scalaz-stream
中确实很难或不可能完成。这个库的新版本称为 fs2
,它首先 class 支持 "chunking",这基本上就是我在这里寻找的。
简短版本:
我想实现一个函数,该函数 returns 一个等待值块成为 "emitted" 的传感器。
我想到的函数具有以下签名:
/**
* The `Process1` which awaits the next "effect" to occur and passes all values emitted by
* this effect to `rcv` to determine the next state.
*/
def receiveBlock[I, O](rcv: Vector[I] => Process1[I,O]): Process1[I,O] = ???
详情:
我的理解是,我可以使用这个函数来实现以下我认为非常有用的功能:
/**
* Groups inputs into chunks of dynamic size based on the various effects
* that back emitted values.
*
* @example {{{
* val numberTask = Task.delay(1)
* val listOfNumbersTask = Task.delay(List(5,6,7))
* val sample = Process.eval(numberTask) ++ Process(2,3,4) ++ Process.await(listOfNumbersTask)(xs => Process.emitAll(xs))
* sample.chunkByEffect.runLog.run should be List(Vector(1), Vector(2,3,4), Vector(5,6,7))
* }}}
*/
def chunkByEffect[I]: Process1[I, Vector[I]] = {
receiveBlock(vec => emit(vec) ++ chunkByEffect)
}
[更新]更多详情
我的最终objective(稍微简化)是实现如下功能:
/**
* Transforms a stream of audio into a stream of text.
*/
voiceRecognition(audio: Process[Task, Byte]): Process[Task, String]
函数对外调用语音识别服务。因此,为流中的每个 Byte
进行网络调用是不合理的。在进行网络调用之前,我需要将字节分块在一起。我可以将 audio
设为 Process[Task, ByteVector]
,但这需要测试代码才能知道函数支持的最大块大小,我宁愿由函数本身管理。此外,当在服务内部使用此服务时,该服务本身将接收具有给定音频大小的网络调用,我希望 chunkXXX
函数能够智能地分块,这样它就不会保留到已经可用的数据上。
基本上,来自网络的音频流将具有 Process[Task, ByteVector]
的形式,并会被 flatMap(Process.emitAll(_))
转换为 Process[Task, Byte]
。但是,测试代码将直接生成 Process[Task, Byte]
并将其输入 voiceRecognition
。从理论上讲,我相信如果有适当的组合器,应该有可能提供 voiceRecognition
的实现,它对这两个流都做正确的事情,我认为上面描述的 chunkByEffect
函数是关键。我现在意识到,我需要 chunkByEffect 函数具有 min
和 max
参数来指定分块的最小和最大大小,而不管底层 Task
产生字节。
您需要以某种方式分隔字节。我建议在字节流上使用一些更高级别的抽象,即 ByteVector。
然后您可能需要手动执行 process1,它的实现方式与 process1.chunkBy
类似,只是它在 ByteVector 上运行。即
def chunkBy(separator:ByteVector): Process1[ByteVector, ByteVector] = {
def go(acc: ByteVector): Process1[ByteVector, ByteVector] =
receive1Or[ByteVector,ByteVector](emit(acc)) { i =>
// implement searching of separator in accumulated + new bytes
???
}
go(ByteVector.empty)
}
然后这将把所有东西连接在一起[=13=]
val speech: Process[Task,ByteVector] = ???
def chunkByWhatever: Process1[ByteVector,ByteVector] = ???
val recognizer: Channel[Task,ByteVector,String] = ???
//this shall do the trick
speech.pipe(chunkByWhatever).through(recognizer)
我想此时的答案是,这在 scalaz-stream
中确实很难或不可能完成。这个库的新版本称为 fs2
,它首先 class 支持 "chunking",这基本上就是我在这里寻找的。