Enumerable/Stream 向前看
Enumerable/Stream with look ahead
我开始学习 Elixir,遇到了一个我无法轻松解决的挑战。
我正在尝试创建接受 Enumerable.t 和 returns 另一个包含下一个 n 项的 Enumerable.t 的函数。它与 Enum.chunk(e, n, 1, []) 的行为略有不同,因为数字迭代计数始终等于原始可枚举计数。我还需要支持 Streams
@spec lookahead(Enumerable.t, non_neg_integer) :: Enumerable.t
这最好用 doctest 语法来说明:
iex> lookahead(1..6, 1) |> Enum.to_list
[[1,2],[2,3],[3,4],[4,5],[5,6],[6]]
iex> lookahead(1..4, 2) |> Enum.to_list
[[1,2,3],[2,3,4],[3,4],[4]]
iex> Stream.cycle(1..4) |> lookahead(2) |> Enum.take(5)
[[1,2,3],[2,3,4],[3,4,1],[4,1,2],[1,2,3]]
iex> {:ok,io} = StringIO.open("abcd")
iex> IO.stream(io,1) |> lookahead(2) |> Enum.to_list
[["a","b","c"],["b","c","d"],["c","d"],["d"]]
我研究了 Enumerable.t 协议的实现,但还不太了解 Enumerable.reduce 接口。
有什么succinct/elegant方法可以做到这一点吗?
我的用例是二进制流上的一个小的固定 n 值(1 或 2),因此优化版本加分。但是,出于学习 Elixir 的目的,我对跨多个用例的解决方案感兴趣。性能很重要。我将 运行 针对解决方案的各种 n 值进行一些基准测试并发布。
基准更新 - 2015 年 4 月 8 日
6 个可行的解决方案已发布。 https://gist.github.com/spitsw/fce5304ec6941578e454 提供了基准测试的详细信息。基准测试 运行 包含 500 个项目的列表,用于各种 n 值。
对于 n=1,结果如下:
PatrickSuspend.lookahead 104.90 µs/op
Warren.lookahead 174.00 µs/op
PatrickChunk.lookahead 310.60 µs/op
PatrickTransform.lookahead 357.00 µs/op
Jose.lookahead 647.60 µs/op
PatrickUnfold.lookahead 1484000.00 µs/op
对于 n=50,结果如下:
PatrickSuspend.lookahead 220.80 µs/op
Warren.lookahead 320.60 µs/op
PatrickTransform.lookahead 518.60 µs/op
Jose.lookahead 1390.00 µs/op
PatrickChunk.lookahead 3058.00 µs/op
PatrickUnfold.lookahead 1345000.00 µs/op (faster than n=1)
您应该可以使用 Stream。chunk/4
看起来像这样:
defmodule MyMod do
def lookahead(enum, amount) do
Stream.chunk(enum, amount + 1, 1, [])
end
end
根据您的意见:
iex(2)> MyMod.lookahead(1..6, 1) |> Enum.to_list
[[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6]]
iex(3)> MyMod.lookahead(1..4, 2) |> Enum.to_list
[[1, 2, 3], [2, 3, 4], [3, 4]]
iex(4)> Stream.cycle(1..3) |> MyMod.lookahead(1) |> Enum.take(5)
[[1, 2], [2, 3], [3, 1], [1, 2], [2, 3]]
下面是此类函数的低效实现:
defmodule Lookahead do
def lookahead(enumerable, n) when n > 0 do
enumerable
|> Stream.chunk(n + 1, 1, [])
|> Stream.flat_map(fn list ->
length = length(list)
if length < n + 1 do
[list|Enum.scan(1..n-1, list, fn _, acc -> Enum.drop(acc, 1) end)]
else
[list]
end
end)
end
end
它建立在@hahuang65 实现之上,除了我们使用 Stream.flat_map/2
检查每个发出的项目的长度,一旦我们检测到发出的项目变短就添加缺失的。
从头开始手写实现会更快,因为我们不需要在每次迭代时都调用 length(list)
。如果 n
很小,上面的实现可能没问题。如果 n 是固定的,您甚至可以显式地在生成的列表上进行模式匹配。
正如评论中所讨论的那样,我的第一次尝试遇到了一些性能问题并且不适用于具有副作用的流,例如 IO 流。我花时间深入研究流库并最终想出了这个解决方案:
defmodule MyStream
def lookahead(enum, n) do
step = fn val, _acc -> {:suspend, val} end
next = &Enumerable.reduce(enum, &1, step)
&do_lookahead(n, :buffer, [], next, &1, &2)
end
# stream suspended
defp do_lookahead(n, state, buf, next, {:suspend, acc}, fun) do
{:suspended, acc, &do_lookahead(n, state, buf, next, &1, fun)}
end
# stream halted
defp do_lookahead(_n, _state, _buf, _next, {:halt, acc}, _fun) do
{:halted, acc}
end
# initial buffering
defp do_lookahead(n, :buffer, buf, next, {:cont, acc}, fun) do
case next.({:cont, []}) do
{:suspended, val, next} ->
new_state = if length(buf) < n, do: :buffer, else: :emit
do_lookahead(n, new_state, buf ++ [val], next, {:cont, acc}, fun)
{_, _} ->
do_lookahead(n, :emit, buf, next, {:cont, acc}, fun)
end
end
# emitting
defp do_lookahead(n, :emit, [_|rest] = buf, next, {:cont, acc}, fun) do
case next.({:cont, []}) do
{:suspended, val, next} ->
do_lookahead(n, :emit, rest ++ [val], next, fun.(buf, acc), fun)
{_, _} ->
do_lookahead(n, :emit, rest, next, fun.(buf, acc), fun)
end
end
# buffer empty, halting
defp do_lookahead(_n, :emit, [], _next, {:cont, acc}, _fun) do
{:halted, acc}
end
end
乍一看这可能让人望而生畏,但实际上并没有那么难。我会尝试为您分解它,但是像这样的完整示例很难做到这一点。
让我们从一个更简单的示例开始:不断重复赋予它的值的流。为了发出流,我们可以 return 一个以累加器和函数作为参数的函数。为了发出一个值,我们用两个参数调用函数:要发出的值和累加器。 acc
累加器是一个元组,由命令(:cont
、:suspend
或 :halt
)组成,并告诉我们消费者希望我们做什么;我们需要 return 的结果取决于操作。如果应该暂停流,我们 return 原子的三元素元组 :suspended
、累加器和枚举继续时将调用的函数(有时称为 "continuation")。对于 :halt
命令,我们简单地 return {:halted, acc}
而对于 :cont
我们通过执行上述递归步骤发出一个值。整个事情看起来像这样:
defmodule MyStream do
def repeat(val) do
&do_repeat(val, &1, &2)
end
defp do_repeat(val, {:suspend, acc}, fun) do
{:suspended, acc, &do_repeat(val, &1, fun)}
end
defp do_repeat(_val, {:halt, acc}, _fun) do
{:halted, acc}
end
defp do_repeat(val, {:cont, acc}, fun) do
do_repeat(val, fun.(val, acc), fun)
end
end
现在这只是拼图的一部分。我们可以发出一个流,但我们还没有处理传入的流。同样,为了解释它是如何工作的,构建一个更简单的例子是有意义的。在这里,我将构建一个函数,它接受一个可枚举的对象,并为每个值暂停和重新发出。
defmodule MyStream do
def passthrough(enum) do
step = fn val, _acc -> {:suspend, val} end
next = &Enumerable.reduce(enum, &1, step)
&do_passthrough(next, &1, &2)
end
defp do_passthrough(next, {:suspend, acc}, fun) do
{:suspended, acc, &do_passthrough(next, &1, fun)}
end
defp do_passthrough(_next, {:halt, acc}, _fun) do
{:halted, acc}
end
defp do_passthrough(next, {:cont, acc}, fun) do
case next.({:cont, []}) do
{:suspended, val, next} ->
do_passthrough(next, fun.(val, acc), fun)
{_, _} ->
{:halted, acc}
end
end
end
第一个子句设置了 next
函数,该函数向下传递给 do_passthrough
函数。它用于从传入流中获取下一个值的目的。内部使用的 step 函数定义了我们为流中的每个项目暂停。除了最后一个子句外,其余部分非常相似。在这里,我们用 {:cont, []}
调用 next 函数以获取新值并通过 case 语句处理结果。如果有值,我们返回 {:suspended, val, next}
,如果没有,流将停止,我们将其传递给消费者。
我希望澄清一些关于如何在 Elixir 中手动构建流的事情。不幸的是,使用流需要大量的样板文件。如果你现在回到 lookahead
实现,你会发现只有微小的差异,这是真正有趣的部分。还有两个附加参数:state
,用于区分 :buffer
和 :emit
步骤,以及 buffer
,它预先填充了 n+1
项初始缓冲步骤。在发射阶段,当前缓冲区被发射,然后在每次迭代中向左移动。当输入流停止或我们的流直接停止时,我们就完成了。
原答案留在这里供参考:
这是一个使用 Stream.unfold/2
发出真实值流的解决方案
根据您的规格。这意味着您需要将 Enum.to_list
添加到
结束你的前两个例子获取实际值。
defmodule MyStream do
def lookahead(stream, n) do
Stream.unfold split(stream, n+1), fn
{[], stream} ->
nil
{[_ | buf] = current, stream} ->
{value, stream} = split(stream, 1)
{current, {buf ++ value, stream}}
end
end
defp split(stream, n) do
{Enum.take(stream, n), Stream.drop(stream, n)}
end
end
一般的想法是我们保留一个以前迭代的 buf。在每次迭代中,我们发出当前 buf,从流中取出一个值并将其附加到 buf 的末尾。重复此过程直到 buf 为空。
示例:
iex> MyStream.lookahead(1..6, 1) |> Enum.to_list
[[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6]]
iex> MyStream.lookahead(1..4, 2) |> Enum.to_list
[[1, 2, 3], [2, 3, 4], [3, 4], [4]]
iex> Stream.cycle(1..3) |> MyStream.lookahead(2) |> Enum.take(5)
[[1, 2, 3], [2, 3, 1], [3, 1, 2], [1, 2, 3], [2, 3, 1]]
以下解决方案使用 Stream.resource 和 Enumerable.reduce 的挂起能力。所有的例子都通过了。
简而言之,它使用Enumerable.reduce构建列表。然后,它在每次迭代时暂停 reducer,移除列表的头部,并将最新的项目添加到列表的尾部。最后,当 reducer 为 :done 或 :halted 时,它会生成流的尾部。所有这些都使用 Stream.resource.
进行协调
如果每次迭代都使用 FIFO 队列而不是列表,这会更有效。
请提供任何简化、效率或错误的反馈
def Module
def lookahead(enum, n) when n >= 0 do
reducer = fn -> Enumerable.reduce(enum, {:cont, {0, []}}, fn
item, {c, list} when c < n -> {:cont, {c+1, list ++ [item]}} # Build up the first list
item, {c, list} when c == n -> {:suspend, {c+1, list ++ [item]}} # Suspend on first full list
item, {c, [_|list]} -> {:suspend, {c, list ++ [item]}} # Remove the first item and emit
end)
end
Stream.resource(reducer,
fn
{:suspended, {_, list} = acc , fun} -> {[list], fun.({:cont, acc})}
{:halted, _} = result -> lookahead_trail(n, result) # Emit the trailing items
{:done, _} = result -> lookahead_trail(n, result) # Emit the trailing items
end,
fn
{:suspended, acc, fun} -> fun.({:halt, acc}) # Ensure the reducer is halted after suspend
_ ->
end)
end
defp lookahead_trail(n, acc) do
case acc do
{action, {c, [_|rest]}} when c > n -> {[], {action, {c-1, rest}}} # List already emitted here
{action, {c, [_|rest] = list}} -> {[list], {action, {c-1, rest}}} # Emit the next tail item
acc -> {:halt, acc } # Finish of the stream
end
end
end
I had started a discussion about my proposed Stream.mutate
method on the elixir core mailing list, where Peter Hamilton suggested another way of solving this problem. By using make_ref
to create a globally unique reference,我们可以创建一个填充流并将其与原始可枚举连接起来,以便在原始流停止后继续发射。这可以与 Stream.chunk
结合使用,这意味着我们需要在最后一步中删除不需要的引用:
def lookahead(enum, n) do
stop = make_ref
enum
|> Stream.concat(List.duplicate(stop, n))
|> Stream.chunk(n+1, 1)
|> Stream.map(&Enum.reject(&1, fn x -> x == stop end))
end
我认为从句法的角度来看,这是迄今为止最漂亮的解决方案。或者,我们可以使用 Stream.transform
手动构建缓冲区,这与我之前提出的手动解决方案非常相似:
def lookahead(enum, n) do
stop = make_ref
enum
|> Stream.concat(List.duplicate(stop, n+1))
|> Stream.transform([], fn val, acc ->
case {val, acc} do
{^stop, []} -> {[] , [] }
{^stop, [_|rest] = buf} -> {[buf], rest }
{val , buf} when length(buf) < n+1 -> {[] , buf ++ [val] }
{val , [_|rest] = buf} -> {[buf], rest ++ [val]}
end
end)
end
我没有对这些解决方案进行基准测试,但我想第二个虽然稍微笨拙,但应该表现得更好一些,因为它不必遍历每个块。
顺便说一下,第二种方案可以不写case语句once Elixir allows to use the pin operator in function heads (probably in v1.1.0):
def lookahead(enum, n) do
stop = make_ref
enum
|> Stream.concat(List.duplicate(stop, n+1))
|> Stream.transform([], fn
^stop, [] -> {[] , [] }
^stop, [_|rest] = buf -> {[buf], rest }
val , buf when length(buf) < n+1 -> {[] , buf ++ [val] }
val , [_|rest] = buf -> {[buf], rest ++ [val]}
end)
end
我从沃伦那里得到灵感,做了这个。基本用法:
ex> {peek, enum} = StreamSplit.peek 1..10, 3
{[1, 2, 3], #Function<57.77324385/2 in Stream.transform/3>}
iex> Enum.take(enum, 5)
[1, 2, 3, 4, 5]
我可能会迟到,但可以用 Stream.chunk_while/4,
defmodule Denis do
def lookahead(enumerable) do
chunk_fun = fn
element, nil -> {:cont, element}
element, acc -> {:cont, [acc, element], element}
end
after_fun = fn
nil -> {:cont, []}
[] -> {:cont, []}
acc -> {:cont, [acc], []}
end
enumerable
|> Stream.chunk_while(nil, chunk_fun, after_fun)
end
end
我开始学习 Elixir,遇到了一个我无法轻松解决的挑战。
我正在尝试创建接受 Enumerable.t 和 returns 另一个包含下一个 n 项的 Enumerable.t 的函数。它与 Enum.chunk(e, n, 1, []) 的行为略有不同,因为数字迭代计数始终等于原始可枚举计数。我还需要支持 Streams
@spec lookahead(Enumerable.t, non_neg_integer) :: Enumerable.t
这最好用 doctest 语法来说明:
iex> lookahead(1..6, 1) |> Enum.to_list
[[1,2],[2,3],[3,4],[4,5],[5,6],[6]]
iex> lookahead(1..4, 2) |> Enum.to_list
[[1,2,3],[2,3,4],[3,4],[4]]
iex> Stream.cycle(1..4) |> lookahead(2) |> Enum.take(5)
[[1,2,3],[2,3,4],[3,4,1],[4,1,2],[1,2,3]]
iex> {:ok,io} = StringIO.open("abcd")
iex> IO.stream(io,1) |> lookahead(2) |> Enum.to_list
[["a","b","c"],["b","c","d"],["c","d"],["d"]]
我研究了 Enumerable.t 协议的实现,但还不太了解 Enumerable.reduce 接口。
有什么succinct/elegant方法可以做到这一点吗?
我的用例是二进制流上的一个小的固定 n 值(1 或 2),因此优化版本加分。但是,出于学习 Elixir 的目的,我对跨多个用例的解决方案感兴趣。性能很重要。我将 运行 针对解决方案的各种 n 值进行一些基准测试并发布。
基准更新 - 2015 年 4 月 8 日
6 个可行的解决方案已发布。 https://gist.github.com/spitsw/fce5304ec6941578e454 提供了基准测试的详细信息。基准测试 运行 包含 500 个项目的列表,用于各种 n 值。
对于 n=1,结果如下:
PatrickSuspend.lookahead 104.90 µs/op
Warren.lookahead 174.00 µs/op
PatrickChunk.lookahead 310.60 µs/op
PatrickTransform.lookahead 357.00 µs/op
Jose.lookahead 647.60 µs/op
PatrickUnfold.lookahead 1484000.00 µs/op
对于 n=50,结果如下:
PatrickSuspend.lookahead 220.80 µs/op
Warren.lookahead 320.60 µs/op
PatrickTransform.lookahead 518.60 µs/op
Jose.lookahead 1390.00 µs/op
PatrickChunk.lookahead 3058.00 µs/op
PatrickUnfold.lookahead 1345000.00 µs/op (faster than n=1)
您应该可以使用 Stream。chunk/4
看起来像这样:
defmodule MyMod do
def lookahead(enum, amount) do
Stream.chunk(enum, amount + 1, 1, [])
end
end
根据您的意见:
iex(2)> MyMod.lookahead(1..6, 1) |> Enum.to_list
[[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6]]
iex(3)> MyMod.lookahead(1..4, 2) |> Enum.to_list
[[1, 2, 3], [2, 3, 4], [3, 4]]
iex(4)> Stream.cycle(1..3) |> MyMod.lookahead(1) |> Enum.take(5)
[[1, 2], [2, 3], [3, 1], [1, 2], [2, 3]]
下面是此类函数的低效实现:
defmodule Lookahead do
def lookahead(enumerable, n) when n > 0 do
enumerable
|> Stream.chunk(n + 1, 1, [])
|> Stream.flat_map(fn list ->
length = length(list)
if length < n + 1 do
[list|Enum.scan(1..n-1, list, fn _, acc -> Enum.drop(acc, 1) end)]
else
[list]
end
end)
end
end
它建立在@hahuang65 实现之上,除了我们使用 Stream.flat_map/2
检查每个发出的项目的长度,一旦我们检测到发出的项目变短就添加缺失的。
从头开始手写实现会更快,因为我们不需要在每次迭代时都调用 length(list)
。如果 n
很小,上面的实现可能没问题。如果 n 是固定的,您甚至可以显式地在生成的列表上进行模式匹配。
正如评论中所讨论的那样,我的第一次尝试遇到了一些性能问题并且不适用于具有副作用的流,例如 IO 流。我花时间深入研究流库并最终想出了这个解决方案:
defmodule MyStream
def lookahead(enum, n) do
step = fn val, _acc -> {:suspend, val} end
next = &Enumerable.reduce(enum, &1, step)
&do_lookahead(n, :buffer, [], next, &1, &2)
end
# stream suspended
defp do_lookahead(n, state, buf, next, {:suspend, acc}, fun) do
{:suspended, acc, &do_lookahead(n, state, buf, next, &1, fun)}
end
# stream halted
defp do_lookahead(_n, _state, _buf, _next, {:halt, acc}, _fun) do
{:halted, acc}
end
# initial buffering
defp do_lookahead(n, :buffer, buf, next, {:cont, acc}, fun) do
case next.({:cont, []}) do
{:suspended, val, next} ->
new_state = if length(buf) < n, do: :buffer, else: :emit
do_lookahead(n, new_state, buf ++ [val], next, {:cont, acc}, fun)
{_, _} ->
do_lookahead(n, :emit, buf, next, {:cont, acc}, fun)
end
end
# emitting
defp do_lookahead(n, :emit, [_|rest] = buf, next, {:cont, acc}, fun) do
case next.({:cont, []}) do
{:suspended, val, next} ->
do_lookahead(n, :emit, rest ++ [val], next, fun.(buf, acc), fun)
{_, _} ->
do_lookahead(n, :emit, rest, next, fun.(buf, acc), fun)
end
end
# buffer empty, halting
defp do_lookahead(_n, :emit, [], _next, {:cont, acc}, _fun) do
{:halted, acc}
end
end
乍一看这可能让人望而生畏,但实际上并没有那么难。我会尝试为您分解它,但是像这样的完整示例很难做到这一点。
让我们从一个更简单的示例开始:不断重复赋予它的值的流。为了发出流,我们可以 return 一个以累加器和函数作为参数的函数。为了发出一个值,我们用两个参数调用函数:要发出的值和累加器。 acc
累加器是一个元组,由命令(:cont
、:suspend
或 :halt
)组成,并告诉我们消费者希望我们做什么;我们需要 return 的结果取决于操作。如果应该暂停流,我们 return 原子的三元素元组 :suspended
、累加器和枚举继续时将调用的函数(有时称为 "continuation")。对于 :halt
命令,我们简单地 return {:halted, acc}
而对于 :cont
我们通过执行上述递归步骤发出一个值。整个事情看起来像这样:
defmodule MyStream do
def repeat(val) do
&do_repeat(val, &1, &2)
end
defp do_repeat(val, {:suspend, acc}, fun) do
{:suspended, acc, &do_repeat(val, &1, fun)}
end
defp do_repeat(_val, {:halt, acc}, _fun) do
{:halted, acc}
end
defp do_repeat(val, {:cont, acc}, fun) do
do_repeat(val, fun.(val, acc), fun)
end
end
现在这只是拼图的一部分。我们可以发出一个流,但我们还没有处理传入的流。同样,为了解释它是如何工作的,构建一个更简单的例子是有意义的。在这里,我将构建一个函数,它接受一个可枚举的对象,并为每个值暂停和重新发出。
defmodule MyStream do
def passthrough(enum) do
step = fn val, _acc -> {:suspend, val} end
next = &Enumerable.reduce(enum, &1, step)
&do_passthrough(next, &1, &2)
end
defp do_passthrough(next, {:suspend, acc}, fun) do
{:suspended, acc, &do_passthrough(next, &1, fun)}
end
defp do_passthrough(_next, {:halt, acc}, _fun) do
{:halted, acc}
end
defp do_passthrough(next, {:cont, acc}, fun) do
case next.({:cont, []}) do
{:suspended, val, next} ->
do_passthrough(next, fun.(val, acc), fun)
{_, _} ->
{:halted, acc}
end
end
end
第一个子句设置了 next
函数,该函数向下传递给 do_passthrough
函数。它用于从传入流中获取下一个值的目的。内部使用的 step 函数定义了我们为流中的每个项目暂停。除了最后一个子句外,其余部分非常相似。在这里,我们用 {:cont, []}
调用 next 函数以获取新值并通过 case 语句处理结果。如果有值,我们返回 {:suspended, val, next}
,如果没有,流将停止,我们将其传递给消费者。
我希望澄清一些关于如何在 Elixir 中手动构建流的事情。不幸的是,使用流需要大量的样板文件。如果你现在回到 lookahead
实现,你会发现只有微小的差异,这是真正有趣的部分。还有两个附加参数:state
,用于区分 :buffer
和 :emit
步骤,以及 buffer
,它预先填充了 n+1
项初始缓冲步骤。在发射阶段,当前缓冲区被发射,然后在每次迭代中向左移动。当输入流停止或我们的流直接停止时,我们就完成了。
原答案留在这里供参考:
这是一个使用 Stream.unfold/2
发出真实值流的解决方案
根据您的规格。这意味着您需要将 Enum.to_list
添加到
结束你的前两个例子获取实际值。
defmodule MyStream do
def lookahead(stream, n) do
Stream.unfold split(stream, n+1), fn
{[], stream} ->
nil
{[_ | buf] = current, stream} ->
{value, stream} = split(stream, 1)
{current, {buf ++ value, stream}}
end
end
defp split(stream, n) do
{Enum.take(stream, n), Stream.drop(stream, n)}
end
end
一般的想法是我们保留一个以前迭代的 buf。在每次迭代中,我们发出当前 buf,从流中取出一个值并将其附加到 buf 的末尾。重复此过程直到 buf 为空。
示例:
iex> MyStream.lookahead(1..6, 1) |> Enum.to_list
[[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6]]
iex> MyStream.lookahead(1..4, 2) |> Enum.to_list
[[1, 2, 3], [2, 3, 4], [3, 4], [4]]
iex> Stream.cycle(1..3) |> MyStream.lookahead(2) |> Enum.take(5)
[[1, 2, 3], [2, 3, 1], [3, 1, 2], [1, 2, 3], [2, 3, 1]]
以下解决方案使用 Stream.resource 和 Enumerable.reduce 的挂起能力。所有的例子都通过了。
简而言之,它使用Enumerable.reduce构建列表。然后,它在每次迭代时暂停 reducer,移除列表的头部,并将最新的项目添加到列表的尾部。最后,当 reducer 为 :done 或 :halted 时,它会生成流的尾部。所有这些都使用 Stream.resource.
进行协调如果每次迭代都使用 FIFO 队列而不是列表,这会更有效。
请提供任何简化、效率或错误的反馈
def Module
def lookahead(enum, n) when n >= 0 do
reducer = fn -> Enumerable.reduce(enum, {:cont, {0, []}}, fn
item, {c, list} when c < n -> {:cont, {c+1, list ++ [item]}} # Build up the first list
item, {c, list} when c == n -> {:suspend, {c+1, list ++ [item]}} # Suspend on first full list
item, {c, [_|list]} -> {:suspend, {c, list ++ [item]}} # Remove the first item and emit
end)
end
Stream.resource(reducer,
fn
{:suspended, {_, list} = acc , fun} -> {[list], fun.({:cont, acc})}
{:halted, _} = result -> lookahead_trail(n, result) # Emit the trailing items
{:done, _} = result -> lookahead_trail(n, result) # Emit the trailing items
end,
fn
{:suspended, acc, fun} -> fun.({:halt, acc}) # Ensure the reducer is halted after suspend
_ ->
end)
end
defp lookahead_trail(n, acc) do
case acc do
{action, {c, [_|rest]}} when c > n -> {[], {action, {c-1, rest}}} # List already emitted here
{action, {c, [_|rest] = list}} -> {[list], {action, {c-1, rest}}} # Emit the next tail item
acc -> {:halt, acc } # Finish of the stream
end
end
end
I had started a discussion about my proposed Stream.mutate
method on the elixir core mailing list, where Peter Hamilton suggested another way of solving this problem. By using make_ref
to create a globally unique reference,我们可以创建一个填充流并将其与原始可枚举连接起来,以便在原始流停止后继续发射。这可以与 Stream.chunk
结合使用,这意味着我们需要在最后一步中删除不需要的引用:
def lookahead(enum, n) do
stop = make_ref
enum
|> Stream.concat(List.duplicate(stop, n))
|> Stream.chunk(n+1, 1)
|> Stream.map(&Enum.reject(&1, fn x -> x == stop end))
end
我认为从句法的角度来看,这是迄今为止最漂亮的解决方案。或者,我们可以使用 Stream.transform
手动构建缓冲区,这与我之前提出的手动解决方案非常相似:
def lookahead(enum, n) do
stop = make_ref
enum
|> Stream.concat(List.duplicate(stop, n+1))
|> Stream.transform([], fn val, acc ->
case {val, acc} do
{^stop, []} -> {[] , [] }
{^stop, [_|rest] = buf} -> {[buf], rest }
{val , buf} when length(buf) < n+1 -> {[] , buf ++ [val] }
{val , [_|rest] = buf} -> {[buf], rest ++ [val]}
end
end)
end
我没有对这些解决方案进行基准测试,但我想第二个虽然稍微笨拙,但应该表现得更好一些,因为它不必遍历每个块。
顺便说一下,第二种方案可以不写case语句once Elixir allows to use the pin operator in function heads (probably in v1.1.0):
def lookahead(enum, n) do
stop = make_ref
enum
|> Stream.concat(List.duplicate(stop, n+1))
|> Stream.transform([], fn
^stop, [] -> {[] , [] }
^stop, [_|rest] = buf -> {[buf], rest }
val , buf when length(buf) < n+1 -> {[] , buf ++ [val] }
val , [_|rest] = buf -> {[buf], rest ++ [val]}
end)
end
我从沃伦那里得到灵感,做了这个。基本用法:
ex> {peek, enum} = StreamSplit.peek 1..10, 3
{[1, 2, 3], #Function<57.77324385/2 in Stream.transform/3>}
iex> Enum.take(enum, 5)
[1, 2, 3, 4, 5]
我可能会迟到,但可以用 Stream.chunk_while/4,
defmodule Denis do
def lookahead(enumerable) do
chunk_fun = fn
element, nil -> {:cont, element}
element, acc -> {:cont, [acc, element], element}
end
after_fun = fn
nil -> {:cont, []}
[] -> {:cont, []}
acc -> {:cont, [acc], []}
end
enumerable
|> Stream.chunk_while(nil, chunk_fun, after_fun)
end
end