gpt4 book ai didi

elixir - 使用 Elixir 去抖动事件

转载 作者:行者123 更新时间:2023-12-01 08:52:51 25 4
gpt4 key购买 nike

我正在从 MQ 获取事件流到我的 Elixir 消费者。

在消费者中我需要:

  1. 按 ID 和事件聚合事件
  2. 如果 3 分钟内没有该 ID 的新数据,则向下游发送该 ID 的汇总数据。

就我而言,数据集并不大。每天可能有几百个 ID 和几千个更新。

有什么方法可以使用 GenServer 魔法解决这个问题?

谢谢!

最佳答案

我会这样做:

每当有新事件发生时:

  • 如果它是第一个具有该 id 的事件,则使用 Process.send_after/3 创建一个计时器 ref,超时时间为 3 分钟,并将事件和计时器存储在状态中。

  • 如果不是第一个具有该 id 的事件,则使用 Process.cancel_timer/1 取消存储的计时器引用,如上一步所述创建一个新计时器,并存储新的计时器计时器以及与旧事件连接的新事件。

并且在计时器触发的 handle_info 中,将该 ID 的事件推送到下游并从状态中删除该条目。

这是上面的一个简单实现:

defmodule DebouncedEcho do
@timeout 1000

use GenServer

def start_link do
GenServer.start_link __MODULE__, []
end

def init(_) do
{:ok, %{}}
end

def handle_cast({:store, id, event}, state) do
case state[id] do
nil ->
timer = Process.send_after(self, {:timer, id}, @timeout)
state = Map.put(state, id, %{events: [event], timer: timer})
{:noreply, state}
%{events: events, timer: timer} ->
Process.cancel_timer(timer)
timer = Process.send_after(self, {:timer, id}, @timeout)
state = Map.put(state, id, %{events: [event | events], timer: timer})
{:noreply, state}
end
end

def handle_info({:timer, id}, state) do
%{events: events} = state[id]
IO.inspect {:flush, id, events}
state = Map.delete(state, id)
{:noreply, state}
end
end

测试:

{:ok, server} = DebouncedEcho.start_link
GenServer.cast server, {:store, 1, :foo}
GenServer.cast server, {:store, 1, :bar}
GenServer.cast server, {:store, 2, :foo}
:timer.sleep(500)
GenServer.cast server, {:store, 2, :bar}
:timer.sleep(500)
GenServer.cast server, {:store, 2, :baz}
:timer.sleep(500)
GenServer.cast server, {:store, 1, :baz}
:timer.sleep(2000)

输出:

{:flush, 1, [:bar, :foo]}
{:flush, 2, [:baz, :bar, :foo]}
{:flush, 1, [:baz]}

关于elixir - 使用 Elixir 去抖动事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37152916/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com