gpt4 book ai didi

elixir - 具有前瞻性的可枚举/流

转载 作者:行者123 更新时间:2023-12-04 10:24:41 27 4
gpt4 key购买 nike

我开始学习 Elixir 并遇到了一个我无法轻松解决的挑战。

我正在尝试创建一个函数,该函数接受一个 Enumerable.t 并返回另一个 Enumerable.t ,其中包含下 n 个项目。它与 Enum.chunk(e, n, 1, []) 的行为略有不同,因为数字迭代计数将始终等于原始可枚举计数。我还需要支持 Streams

@spec lookahead(Enumerable.t, non_neg_integer) :: Enumerable.t

最好用 doctest 语法来说明这一点:
iex> lookahead(1..6, 1) |> Enum.to_list
[[1,2],[2,3],[3,4],[4,5],[5,6],[6]]

iex> lookahead(1..4, 2) |> Enum.to_list
[[1,2,3],[2,3,4],[3,4],[4]]

iex> Stream.cycle(1..4) |> lookahead(2) |> Enum.take(5)
[[1,2,3],[2,3,4],[3,4,1],[4,1,2],[1,2,3]]

iex> {:ok,io} = StringIO.open("abcd")
iex> IO.stream(io,1) |> lookahead(2) |> Enum.to_list
[["a","b","c"],["b","c","d"],["c","d"],["d"]]

我已经研究过实现 Enumerable.t 协议(protocol),但还不太了解 Enumerable.reduce 接口(interface)。

有没有任何简洁/优雅的方式来做到这一点?

我的用例是二进制流上的一个小的固定 n 值(1 或 2),因此优化版本的额外积分。但是,出于学习 Elixir 的目的,我对跨多个用例的解决方案感兴趣。性能很重要。我将对解决方案的各种 n 值运行一些基准测试并发布。

基准更新 - 2015 年 4 月 8 日

已发布 6 个可行的解决方案。 https://gist.github.com/spitsw/fce5304ec6941578e454 上提供了基准测试的详细信息.基准测试是在一个包含 500 个项目的列表上运行的,用于不同的 n 值。

对于 n=1,结果如下:
PatrickSuspend.lookahead    104.90 µs/op
Warren.lookahead 174.00 µs/op
PatrickChunk.lookahead 310.60 µs/op
PatrickTransform.lookahead 357.00 µs/op
Jose.lookahead 647.60 µs/op
PatrickUnfold.lookahead 1484000.00 µs/op

对于 n=50,结果如下:
PatrickSuspend.lookahead    220.80 µs/op
Warren.lookahead 320.60 µs/op
PatrickTransform.lookahead 518.60 µs/op
Jose.lookahead 1390.00 µs/op
PatrickChunk.lookahead 3058.00 µs/op
PatrickUnfold.lookahead 1345000.00 µs/op (faster than n=1)

最佳答案

正如评论中所讨论的,我的第一次尝试存在一些性能问题,并且不适用于具有副作用的流,例如 IO 流。我花时间深入挖掘流库,最终想出了这个解决方案:

defmodule MyStream
def lookahead(enum, n) do
step = fn val, _acc -> {:suspend, val} end
next = &Enumerable.reduce(enum, &1, step)
&do_lookahead(n, :buffer, [], next, &1, &2)
end

# stream suspended
defp do_lookahead(n, state, buf, next, {:suspend, acc}, fun) do
{:suspended, acc, &do_lookahead(n, state, buf, next, &1, fun)}
end

# stream halted
defp do_lookahead(_n, _state, _buf, _next, {:halt, acc}, _fun) do
{:halted, acc}
end

# initial buffering
defp do_lookahead(n, :buffer, buf, next, {:cont, acc}, fun) do
case next.({:cont, []}) do
{:suspended, val, next} ->
new_state = if length(buf) < n, do: :buffer, else: :emit
do_lookahead(n, new_state, buf ++ [val], next, {:cont, acc}, fun)
{_, _} ->
do_lookahead(n, :emit, buf, next, {:cont, acc}, fun)
end
end

# emitting
defp do_lookahead(n, :emit, [_|rest] = buf, next, {:cont, acc}, fun) do
case next.({:cont, []}) do
{:suspended, val, next} ->
do_lookahead(n, :emit, rest ++ [val], next, fun.(buf, acc), fun)
{_, _} ->
do_lookahead(n, :emit, rest, next, fun.(buf, acc), fun)
end
end

# buffer empty, halting
defp do_lookahead(_n, :emit, [], _next, {:cont, acc}, _fun) do
{:halted, acc}
end
end

起初这可能看起来令人生畏,但实际上并没有那么难。我会尝试为您分解它,但是对于像这样一个成熟的例子来说这很难。

让我们从一个更简单的例子开始:一个不断重复给它的值的流。为了发出一个流,我们可以返回一个以累加器和一个函数作为参数的函数。要发出一个值,我们用两个参数调用函数:要发出的值和累加器。 acc累加器是一个由命令( :cont:suspend:halt )组成的元组,它告诉我们消费者想要我们做什么;我们需要返回的结果取决于操作。如果流应该被暂停,我们返回一个原子的三元素元组 :suspended ,累加器和枚举继续时将调用的函数(有时称为“继续”)。对于 :halt命令,我们只需返回 {:halted, acc}对于 :cont我们通过执行上述递归步骤来发出一个值。整个事情看起来像这样:
defmodule MyStream do
def repeat(val) do
&do_repeat(val, &1, &2)
end

defp do_repeat(val, {:suspend, acc}, fun) do
{:suspended, acc, &do_repeat(val, &1, fun)}
end

defp do_repeat(_val, {:halt, acc}, _fun) do
{:halted, acc}
end

defp do_repeat(val, {:cont, acc}, fun) do
do_repeat(val, fun.(val, acc), fun)
end
end

现在这只是难题的一部分。我们可以发出一个流,但我们还没有处理传入的流。同样,为了解释它是如何工作的,构建一个更简单的示例是有意义的。在这里,我将构建一个函数,该函数接受一个可枚举的值,并为每个值暂停和重新发出。
defmodule MyStream do
def passthrough(enum) do
step = fn val, _acc -> {:suspend, val} end
next = &Enumerable.reduce(enum, &1, step)
&do_passthrough(next, &1, &2)
end

defp do_passthrough(next, {:suspend, acc}, fun) do
{:suspended, acc, &do_passthrough(next, &1, fun)}
end

defp do_passthrough(_next, {:halt, acc}, _fun) do
{:halted, acc}
end

defp do_passthrough(next, {:cont, acc}, fun) do
case next.({:cont, []}) do
{:suspended, val, next} ->
do_passthrough(next, fun.(val, acc), fun)
{_, _} ->
{:halted, acc}
end
end
end

第一个子句设置 next传递给 do_passthrough 的函数功能。它的目的是从传入流中获取下一个值。内部使用的 step 函数定义了我们为流中的每个项目挂起。除了最后一个子句外,其余部分非常相似。在这里,我们用 {:cont, []} 调用下一个函数获取新值并通过 case 语句处理结果。如果有值,我们返回 {:suspended, val, next} ,如果没有,则流停止,我们将其传递给消费者。

我希望澄清一些关于如何在 Elixir 中手动构建流的事情。不幸的是,使用流需要大量的样板。如果您返回 lookahead现在实现,您将看到只有微小的差异,这才是真正有趣的部分。还有两个附加参数: state ,用于区分 :buffer:emit步骤和 buffer预填充 n+1初始缓冲步骤中的项目。在发射阶段,当前缓冲区被发射,然后在每次迭代时向左移动。当输入流停止或我们的流直接停止时,我们就完成了。

我在这里留下我的原始答案以供引用:

这是一个使用 Stream.unfold/2 的解决方案发出真正的值流
根据您的规范。这意味着您需要添加 Enum.to_list
在您的前两个示例结束时获取实际值。
defmodule MyStream do
def lookahead(stream, n) do
Stream.unfold split(stream, n+1), fn
{[], stream} ->
nil
{[_ | buf] = current, stream} ->
{value, stream} = split(stream, 1)
{current, {buf ++ value, stream}}
end
end

defp split(stream, n) do
{Enum.take(stream, n), Stream.drop(stream, n)}
end
end

一般的想法是我们保留以前迭代的缓冲区。在每次迭代中,我们发出当前 buf,从流中获取一个值并将其附加到 buf 的末尾。如此重复,直到 buf 为空。

例子:
iex> MyStream.lookahead(1..6, 1) |> Enum.to_list
[[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6]]

iex> MyStream.lookahead(1..4, 2) |> Enum.to_list
[[1, 2, 3], [2, 3, 4], [3, 4], [4]]

iex> Stream.cycle(1..3) |> MyStream.lookahead(2) |> Enum.take(5)
[[1, 2, 3], [2, 3, 1], [3, 1, 2], [1, 2, 3], [2, 3, 1]]

关于elixir - 具有前瞻性的可枚举/流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29136874/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com