gpt4 book ai didi

c# - EventProcessorClient 事件之间的延迟

转载 作者:行者123 更新时间:2023-12-03 05:23:32 25 4
gpt4 key购买 nike

我正在尝试创建一种方法来处理事件中心中的事件峰值。我当前的 poc 解决方案只是在消耗事件时触发并忘记任务,而不是等待它们,然后使用信号量限制并行任务量以避免资源匮乏。

限制事物的实用程序:

    public class ThrottledParallelTaskFactory
{
...

public Task StartNew(Func<Task> func)
{
_logger.LogDebug("Available semaphore count {AvailableDataConsumerCount} out of total {DataConsumerCountLimit}", _semaphore.CurrentCount, _limit);
_semaphoreSlim.Wait(_timeout);

_ = Task.Run(func)
.ContinueWith(t =>
{
if (t.Status is TaskStatus.Faulted or TaskStatus.Canceled or TaskStatus.RanToCompletion)
{
_semaphoreSlim.Release();
_logger.LogDebug("Available semaphore count {AvailableDataConsumerCount} out of total {DataConsumerCountLimit}", _semaphore.CurrentCount, _limit);
}
if (t.Status is TaskStatus.Canceled or TaskStatus.Faulted)
{
_logger?.LogError(t.Exception, "Parallel task failed");
}
});
return Task.CompletedTask;
}
}

我的EventProcessorClient.ProcessEventAsync委托(delegate):

 private Task ProcessEvent(ProcessEventArgs arg)
{
var sw = Stopwatch.StartNew();
try
{
_throttledParallelTaskFactory.StartNew(async () => await Task.Delay(1000));
}
catch (Exception e)
{
_logger.LogError(e, "Failed to process event");
}
_logger.LogDebug($"Took {sw.ElapsedMilliseconds} ms");
return Task.CompletedTask;
}

运行此设置一段时间后,我注意到当我配置的限制为 15 时,我的节流器的信号量在并行运行 2-3 个任务时达到最大值。这表明我的处理程序需要 333-500 毫秒才能完成,但处理程序内的秒表显示整个处理程序需要 0 毫秒执行。后来我添加了处理程序开始/结束时的时间戳记录以确认它,它确实需要 0-1 毫秒,但它们之间有一个神秘的 300-600 毫秒差距。 注意:对于当前的测试,该客户端正在处理数百万个事件的积压,而不是处理实时数据,这可能会导致事件之间出现类似的延迟。

在每个事件之后,EventProcessorClient 是否会在内部设置检查点? 300-500 毫秒在我看来似乎很长。我既使用了默认的缓存事件/预取计数,又增加了计数,没有太大区别。

编辑:

最终这不是与实现相关的网络问题

最佳答案

您没有衡量正确的事情,基本上您使用的是错误的 async/await 和 Task。

        private Task ProcessEvent(ProcessEventArgs arg)
{
var sw = Stopwatch.StartNew();
try
{
_throttledParallelTaskFactory.StartNew(async () => await Task.Delay(1000));
}
catch (Exception e)
{
_logger.LogError(e, "Failed to process event");
}
_logger.LogDebug($"Took {sw.ElapsedMilliseconds} ms");
return Task.CompletedTask;
}

在上面的代码中,不等待对_throttledParallelTask​​Factory.StartNew的调用。所以秒表没有什么可测量的。此外,由于不等待调用,因此不会捕获任何异常。

您应该将异常处理和时间测量移至 StartNew 方法,如下所示:

        private Task ProcessEvent(ProcessEventArgs arg)
{
_throttledParallelTaskFactory.StartNew(() => Task.Delay(1000));

return Task.CompletedTask;
}
public class ThrottledParallelTaskFactory
{
public async Task StartNew(Func<Task> func)
{
var sw = Stopwatch.StartNew();

_logger.LogDebug("Available semaphore count {AvailableDataConsumerCount} out of total {DataConsumerCountLimit}", _semaphore.CurrentCount, _limit);
_semaphoreSlim.Wait(_timeout);

try
{
await func.Invoke();
}
catch
{
_logger.LogError(e, "Failed to process event");
_logger?.LogError(t.Exception, "Parallel task failed");
}
finally
{
_semaphoreSlim.Release();
_logger.LogDebug("Available semaphore count {AvailableDataConsumerCount} out of total {DataConsumerCountLimit}", _semaphore.CurrentCount, _limit);
_logger.LogDebug($"Took {sw.ElapsedMilliseconds} ms");
}
}
}

看看我们如何摆脱对 ContinueWith 的调用?此外,由于 func 已经代表一个 Task,因此无需将代码包装在对 Task.Run 的调用中。

Does by any chance EventProcessorClient checkpoint internally after every single event?

不,事实并非如此。您必须手动执行检查点。

关于c# - EventProcessorClient 事件之间的延迟,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70555889/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com