gpt4 book ai didi

elixir - Task.async_stream 超时行为

转载 作者:行者123 更新时间:2023-12-02 05:14:51 31 4
gpt4 key购买 nike

Task.async_stream options:timeout参数说明:

The maximum amount of time (in milliseconds) each task is allowed to execute for. Defaults to 5000

在测试中我执行了以下操作:

iex(8)> Task.async_stream([10, 4, 5], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list
[ok: 10, ok: 4, ok: 5]

iex(10)> Task.async_stream([10], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list
** (exit) exited in: Task.Supervised.stream(5000)
** (EXIT) time out
(elixir) lib/task/supervised.ex:209: Task.Supervised.stream_reduce/10
(elixir) lib/enum.ex:1776: Enum.reverse/2
(elixir) lib/enum.ex:2528: Enum.to_list/1

为什么第一个示例没有超时(但执行时间约为 10 秒),而第二个示例却表现出预期的超时行为?

最佳答案

Task.async_stream 的实现从 1.4.5 更改为 1.5.1。

让我们看看会发生什么。

Elixir 1.4.5

在此版本中 timeout is part of a receives after block .

receive do
{{^monitor_ref, position}, value} ->
# ...

{:down, {^monitor_ref, position}, reason} ->
# ...

{:DOWN, ^monitor_ref, _, ^monitor_pid, reason} ->
# ...
after
timeout ->
# ...
end

此接收 block 的作用是等待监控进程发送的衍生任务的更新消息。为了简单起见,我截断了代码。

这在应用场景中意味着什么? Task.async_stream 仅当在 timeout 毫秒内没有收到来自生成任务的消息时才会超时。

示例

让我们使用 [10, 3, 4] 尝试您的示例:

iex> Task.async_stream([10, 3, 4], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list
** (exit) exited in: Task.Supervised.stream(5000)
** (EXIT) time out
(elixir) lib/task/supervised.ex:209: Task.Supervised.stream_reduce/10
(elixir) lib/enum.ex:1776: Enum.reverse/2
(elixir) lib/enum.ex:2528: Enum.to_list/1

正如我们所见,这会导致超时,正如预期的那样。

现在,如果我们尝试使用 [10, 5],这会起作用吗?

iex> Task.async_stream([10, 5], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list()
** (exit) exited in: Task.Supervised.stream(5000)
** (EXIT) time out
(elixir) lib/task/supervised.ex:209: Task.Supervised.stream_reduce/10
(elixir) lib/enum.ex:1776: Enum.reverse/2
(elixir) lib/enum.ex:2528: Enum.to_list/1

似乎初始任务花费的时间太长,超时时间为 5 秒。但只要我们添加一个中间步骤,它就会起作用。 1 怎么样?

iex> Task.async_stream([10, 5, 1], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list()
[ok: 10, ok: 5, ok: 1]

Elixir 1.5.1

在 Elixir 1.5.1 中,超时逻辑的工作方式有所不同。它使用 Process.send_after 向监控进程发送每个生成的任务的超时消息。

# Schedule a timeout message to ourselves, unless the timeout was set to :infinity
timer_ref = case timeout do
:infinity -> nil
timeout -> Process.send_after(self(), {:timeout, {monitor_ref, ref}}, timeout)
end

这条消息就是handled in the same receive which spawned the Task and sent the :timeout message .

Link to the full function.

示例

一旦单个进程花费的时间超过指定的超时时间,整个流就会陷入崩溃,这是应该的。

iex> Task.async_stream([10, 5, 1], fn i -> :timer.sleep(i * 1000); i end) |> Enum.to_list()
** (exit) exited in: Task.Supervised.stream(5000)
** (EXIT) time out
(elixir) lib/task/supervised.ex:237: Task.Supervised.stream_reduce/7
(elixir) lib/enum.ex:1847: Enum.reverse/1
(elixir) lib/enum.ex:2596: Enum.to_list/1

TL;DR

Elixir 1.4.5 在收到衍生进程的结果后重新跟踪超时。 Elixir 1.5.1 为每个生成的进程单独跟踪它。

关于elixir - Task.async_stream 超时行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45866245/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com