id}} offset 是一个单调递增的整数 id 为任意数 我只需要保留每个 id -6ren">
gpt4 book ai didi

elixir - 如何使用 Elixir Flow 按键分区并按顺序加入

转载 作者:行者123 更新时间:2023-12-05 04:07:38 24 4
gpt4 key购买 nike

我正在尝试围绕 Flow 构建具有以下特征的并行处理管道:

  1. 传入事件的格式为 {offset, %{"id"=> id}}
  2. offset 是一个单调递增的整数
  3. id 为任意数
  4. 我只需要保留每个 id 的顺序,因此可以并行计算不同的 id。因此分区。

这是一个示例流,用于生成无限量的这些元组:

stream = Stream.unfold(1, fn i ->
offset = i+1
element = {offset, %{"id" => Enum.random(1..10_000)}}
{element, offset}
end)

我想通过关键字 id 对流进行分区。我知道该怎么做,例如开始 8 个并行阶段:

Flow.from_enumerable(stream)
|> Flow.partition(
key: fn {_, m} -> Map.get(m, "id") end,
stages: 8
)

我在此流程中遵循的每个操作现在都是并行发生的,顺序仅由分区键 id 保持。

现在我不明白的是:如何将流加入到单个阶段,按 offset 排序?

需要明确的是,这是一个无限流,所以我们需要记住我们需要通过窗口加入(我尝试了多种方法。毕竟 10 秒的超时可以开始丢弃不存在的事件从处理中及时到达)。

这是我想象它应该如何工作的例子:

INCOMING
|
V
* PARTIONING in N-stages by `id`
|\
|-\
|--\
|||| PARALLEL PROCESSING in order by `id`
|--/
|-/
|/
| JOIN in order by `offset`
| timing out after 10 seconds, moving on with the smallest known offset
|
| SEQUENTIAL PROCESSING of each offset of the JOIN

最佳答案

连接部分可以通过再次调用 partition/2 但将阶段数设置为 1 来完成。

这是一个示例脚本,它通过发出带有偏移量和随机分区的元组来重现您的用例:

1..10000
|> Stream.map(fn i -> {i, Enum.random([:a, :b, :c, :d])} end)
|> Flow.from_enumerable()
|> Flow.partition(key: {:elem, 1})
|> Flow.reduce(fn -> [] end, fn x, acc -> [process_x(x) | acc] end)
|> Flow.emit(:state)
|> Flow.partition(stages: 1)
|> Flow.reduce(fn -> [] end, &Kernel.++/2)
|> Flow.map_state(&Enum.sort/1)
|> Flow.emit(:state)
|> Enum.to_list()
|> IO.inspect

棘手的部分是分区。 一旦分区,就必须积累状态

所以在第一个分区之后,我们调用 Flow.reduce/3,处理元素然后将它们放在列表的顶部。处理是通过调用您必须实现的 process_x 完成的。处理完所有条目后,我们会要求将整个分区状态(即事件列表)发送到下一步。

然后我们再次分区,但这次分成一个阶段,简单地连接之前分区的结果,然后在最后对它们进行排序。

还有一点在我上面的例子中没有考虑到,你的流程是无限的,所以你需要加一些窗口。您需要选择从每个分区发出项目的频率。对于第一个分区,您可以批量发出 1000 个元素。对于加入的分区,您提到您希望它每 10 秒发生一次。因此,让我们添加它们。

最后,请注意上面的代码并不是最有效的,因为在最后一个分区中运行的所有内容都是串行的(单阶段)。理想情况下,您希望在第一个分区中排序,并在您定义的 merge_sorted 函数的帮助下,简单地将排序后的结果合并到最后一个分区中。

最终结果如下:

partition_window = Flow.Window.global |> Flow.Window.trigger_every(1000, :reset)
join_window = Flow.Window.global |> Flow.Window.trigger_periodically(10, :second, :reset)

1..10000
|> Stream.map(fn i -> {i, Enum.random([:a, :b, :c, :d])} end)
|> Flow.from_enumerable()
|> Flow.partition(key: {:elem, 1}, window: partition_window)
|> Flow.reduce(fn -> [] end, fn x, acc -> [process_x(x) | acc] end)
|> Flow.map_state(&Enum.sort/1)
|> Flow.emit(:state)
|> Flow.partition(stages: 1, window: join_window)
|> Flow.reduce(fn -> [] end, &merge_sorted/2)
|> Flow.emit(:state)
|> Enum.to_list()
|> IO.inspect

关于elixir - 如何使用 Elixir Flow 按键分区并按顺序加入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48464425/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com