高地在被完全消耗之前添加到流中?
Highland add to stream before being completely consumed?
有没有一种方法可以将完整的需求描述添加到高地流中?
假设我们有以下情况:
const stream = high([1,4,6,7])
然后对于这个流,我想计算每个正在处理的值并说
sink.drain(stream.pipe(4))
数组的元素个数为4。考虑到流中可能有数以千计的对象,我需要从流中消费才能计数。
我不能说 array.length 因为它是一个可以带有任何信息的来源,并且该信息正在与流一起处理...我如何向流中添加带有描述的消息结束消耗了什么?
听起来你想围绕你的价值观建立一些状态,但又不妨碍你价值观的消耗。我建议查看一些涉及 h.through
.
的解决方案
您可以使用 fork
或 observe
拆分流并从值中减少一些状态:
h([1, 2, 3, 4])
.through(stream => {
const length = stream.observe()
.reduce(0, m => m + 1)
.map(length => ({ length }))
return h([stream, length])
.sequence()
})
.errors(err => console.error(err))
.each(x => console.log(x))
您可以根据源流的事件创建一些状态和return一个新流:
const h = require('highland')
h([1, 2, 3, 4])
.through(stream => {
let length = 0
return h(push => stream
.tap(() => ++length)
.errors(err => push(err))
.each(x => push(null, x))
.done(() => {
push(null, `length: ${length}`)
push(null, h.nil)
}))
})
.errors(err => console.error(err))
.each(x => console.log(x))
有没有一种方法可以将完整的需求描述添加到高地流中?
假设我们有以下情况:
const stream = high([1,4,6,7])
然后对于这个流,我想计算每个正在处理的值并说
sink.drain(stream.pipe(4))
数组的元素个数为4。考虑到流中可能有数以千计的对象,我需要从流中消费才能计数。
我不能说 array.length 因为它是一个可以带有任何信息的来源,并且该信息正在与流一起处理...我如何向流中添加带有描述的消息结束消耗了什么?
听起来你想围绕你的价值观建立一些状态,但又不妨碍你价值观的消耗。我建议查看一些涉及 h.through
.
您可以使用 fork
或 observe
拆分流并从值中减少一些状态:
h([1, 2, 3, 4])
.through(stream => {
const length = stream.observe()
.reduce(0, m => m + 1)
.map(length => ({ length }))
return h([stream, length])
.sequence()
})
.errors(err => console.error(err))
.each(x => console.log(x))
您可以根据源流的事件创建一些状态和return一个新流:
const h = require('highland')
h([1, 2, 3, 4])
.through(stream => {
let length = 0
return h(push => stream
.tap(() => ++length)
.errors(err => push(err))
.each(x => push(null, x))
.done(() => {
push(null, `length: ${length}`)
push(null, h.nil)
}))
})
.errors(err => console.error(err))
.each(x => console.log(x))