gpt4 book ai didi

c# - BrokeredMessage 调用 OnMessage() 后自动释放

转载 作者:行者123 更新时间:2023-11-30 14:09:14 26 4
gpt4 key购买 nike

我正在尝试将来自 Azure 服务总线的项目排队,以便可以批量处理它们。我知道 Azure 服务总线有一个 ReceiveBatch() 但它似乎有问题,原因如下:

  • 我一次最多只能获取 256 条消息,甚至可以根据消息大小随机获取。
  • 即使我查看有多少消息正在等待,我也不知道要进行多少次 RequestBatch 调用,因为我不知道每个调用会给我返回多少消息。由于消息会不断传入,我不能继续发出请求,直到它为空,因为它永远不会为空。

我决定只使用消息监听器,它比浪费时间的查看更便宜,并且会给我更多的控制权。

Basically I am trying to let a set number of messages build up and then process them at once. I use a timer to force a delay but I need to be able to queue my items as they come in.

根据我的计时器要求,阻塞集合似乎不是一个好的选择,因此我尝试使用 ConcurrentBag。

var batchingQueue = new ConcurrentBag<BrokeredMessage>();
myQueueClient.OnMessage((m) =>
{
Console.WriteLine("Queueing message");
batchingQueue.Add(m);
});

while (true)
{
var sw = WaitableStopwatch.StartNew();
BrokeredMessage msg;
while (batchingQueue.TryTake(out msg)) // <== Object is already disposed
{
...do this until I have a thousand ready to be written to DB in batch
Console.WriteLine("Completing message");
msg.Complete(); // <== ERRORS HERE
}

sw.Wait(MINIMUM_DELAY);
}

However as soon as I access the message outside of the OnMessage pipeline it shows the BrokeredMessage as already being disposed.

我认为这一定是 OnMessage 的某种自动行为,除了立即处理该消息之外,我看不到任何其他方法可以对该消息执行任何操作,而我不想这样做。

最佳答案

使用 BlockingCollection 可以非常容易地做到这一点.

var batchingQueue = new BlockingCollection<BrokeredMessage>();

myQueueClient.OnMessage((m) =>
{
Console.WriteLine("Queueing message");
batchingQueue.Add(m);
});

以及您的消费者线程:

foreach (var msg in batchingQueue.GetConsumingEnumerable())
{
Console.WriteLine("Completing message");
msg.Complete();
}

GetConsumingEnumerable返回一个迭代器,该迭代器消耗队列中的项目,直到 IsCompleted属性已设置且队列为空。如果队列为空但 IsCompletedFalse ,它会进行非忙等待下一个项目。

要取消消费者线程(即关闭程序),您需要停止向队列添加内容并让主线程调用 batchingQueue.CompleteAdding 。消费者将清空队列,看到IsCompleted属性是 True ,然后退出。

使用BlockingCollection这里比 ConcurrentBag 更好或ConcurrentQueue ,因为BlockingCollection界面更容易使用。特别是使用GetConsumingEnumerable让您不必担心检查计数或进行繁忙等待(轮询循环)。它确实有效。

另请注意 ConcurrentBag有一些相当奇怪的删除行为。特别是,删除项目的顺序会有所不同,具体取决于删除项目的线程。创建包的线程以与其他线程不同的顺序删除项目。请参阅Using the ConcurrentBag Collection了解详情。

您还没有说明为什么要在输入时对项目进行批处理。除非有一个压倒性的性能原因这样做,否则用批处理逻辑使代码复杂化似乎不是一个特别好的主意。

<小时/>

如果您想对数据库进行批量写入,那么我建议使用简单的 List<T>缓冲项目。如果您必须在将项目写入数据库之前对其进行处理,请使用我上面展示的技术来处理它们。然后,将项目添加到列表中,而不是直接写入数据库。当列表达到 1,000 个项目或经过给定时间时,分配一个新列表并启动一个任务将旧列表写入数据库。像这样:

// at class scope

// Flush every 5 minutes.
private readonly TimeSpan FlushDelay = TimeSpan.FromMinutes(5);
private const int MaxBufferItems = 1000;

// Create a timer for the buffer flush.
System.Threading.Timer _flushTimer = new System.Threading.Timer(TimedFlush, FlushDelay.TotalMilliseconds, Timeout.Infinite);

// A lock for the list. Unless you're getting hundreds of thousands
// of items per second, this will not be a performance problem.
object _listLock = new Object();

List<BrokeredMessage> _recordBuffer = new List<BrokeredMessage>();

然后,在您的消费者中:

foreach (var msg in batchingQueue.GetConsumingEnumerable())
{
// process the message
Console.WriteLine("Completing message");
msg.Complete();
lock (_listLock)
{
_recordBuffer.Add(msg);
if (_recordBuffer.Count >= MaxBufferItems)
{
// Stop the timer
_flushTimer.Change(Timeout.Infinite, Timeout.Infinite);

// Save the old list and allocate a new one
var myList = _recordBuffer;
_recordBuffer = new List<BrokeredMessage>();

// Start a task to write to the database
Task.Factory.StartNew(() => FlushBuffer(myList));

// Restart the timer
_flushTimer.Change(FlushDelay.TotalMilliseconds, Timeout.Infinite);
}
}
}

private void TimedFlush()
{
bool lockTaken = false;
List<BrokeredMessage> myList = null;

try
{
if (Monitor.TryEnter(_listLock, 0, out lockTaken))
{
// Save the old list and allocate a new one
myList = _recordBuffer;
_recordBuffer = new List<BrokeredMessage>();
}
}
finally
{
if (lockTaken)
{
Monitor.Exit(_listLock);
}
}

if (myList != null)
{
FlushBuffer(myList);
}

// Restart the timer
_flushTimer.Change(FlushDelay.TotalMilliseconds, Timeout.Infinite);
}

这里的想法是,您将旧列表移开,分配一个新列表以便处理可以继续,然后将旧列表的项目写入数据库。锁的作用是防止计时器和记录计数器相互干扰。如果没有锁,事情可能会在一段时间内看起来工作正常,然后你会在不可预测的时间遇到​​奇怪的崩溃。

我喜欢这个设计,因为它消除了消费者的轮询。我唯一不喜欢的是消费者必须知道计时器(即它必须停止然后重新启动计时器)。经过多一点思考,我可以消除这个要求。但它按照编写的方式运行良好。

关于c# - BrokeredMessage 调用 OnMessage() 后自动释放,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30467896/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com