gpt4 book ai didi

c# - 如何在 Windows 服务中实现连续的生产者-消费者模式

转载 作者:行者123 更新时间:2023-11-30 15:58:39 25 4
gpt4 key购买 nike

这是我正在尝试做的:

  • 在内存中保留需要处理的项目的队列(即 IsProcessed = 0)
  • 每 5 秒,从数据库中获取未处理的项目,如果它们不在队列中,则添加它们
  • 不断从队列中拉取项目,处理它们,每次处理一个项目时,在数据库中更新它(IsProcessed = 1)
  • “尽可能并行”地完成这一切

我的服务有一个构造函数,例如

public MyService()
{
Ticker.Elapsed += FillQueue;
}

我在服务启动时启动该计时器,如

protected override void OnStart(string[] args)
{
Ticker.Enabled = true;
Task.Run(() => { ConsumeWork(); });
}

我的FillQueue就像

private static async void FillQueue(object source, ElapsedEventArgs e)   
{
var items = GetUnprocessedItemsFromDb();
foreach(var item in items)
{
if(!Work.Contains(item))
{
Work.Enqueue(item);
}
}
}

我的ConsumeWork就像

private static void ConsumeWork()
{
while(true)
{
if(Work.Count > 0)
{
var item = Work.Peek();
Process(item);
Work.Dequeue();
}
else
{
Thread.Sleep(500);
}
}
}

然而,这可能是一个幼稚的实现,我想知道 .NET 是否有任何类型的类正是我在这种情况下所需要的。

最佳答案

尽管@JSteward 的回答是一个好的开始,但您可以使用 mixing up 对其进行改进TPL-Dataflow Rx.NET extensions ,因为数据流 block 很容易成为您数据的观察者,并且 Rx Timer这对你来说会更轻松 ( Rx.Timer explanation )。

我们可以调整MSDN article满足您的需求,像这样:

private const int EventIntervalInSeconds = 5;
private const int DueIntervalInSeconds = 60;

var source =
// sequence of Int64 numbers, starting from 0
// https://msdn.microsoft.com/en-us/library/hh229435.aspx
Observable.Timer(
// fire first event after 1 minute waiting
TimeSpan.FromSeconds(DueIntervalInSeconds),
// fire all next events each 5 seconds
TimeSpan.FromSeconds(EventIntervalInSeconds))
// each number will have a timestamp
.Timestamp()
// each time we select some items to process
.SelectMany(GetItemsFromDB)
// filter already added
.Where(i => !_processedItemIds.Contains(i.Id));

var action = new ActionBlock<Item>(ProcessItem, new ExecutionDataflowBlockOptions
{
// we can start as many item processing as processor count
MaxDegreeOfParallelism = Environment.ProcessorCount,
});

IDisposable subscription = source.Subscribe(action.AsObserver());

此外,您对已处理项目的检查不太准确,因为有可能项目在您完成处理时从数据库中被选为未处理,但没有在数据库中更新它.在这种情况下,项目将从 Queue<T> 中删除,然后由制作人再次添加到那里,这就是我添加 ConcurrentBag<T> 的原因对于此解决方案(HashSet<T> 不是线程安全的):

private static async Task ProcessItem(Item item)
{
if (_processedItemIds.Contains(item.Id))
{
return;
}

_processedItemIds.Add(item.Id);
// actual work here

// save item as processed in database

// we need to wait to ensure item not to appear in queue again
await Task.Delay(TimeSpan.FromSeconds(EventIntervalInSeconds * 2));

// clear the processed cache to reduce memory usage
_processedItemIds.Remove(item.Id);
}

public class Item
{
public Guid Id { get; set; }
}

// temporary cache for items in process
private static ConcurrentBag<Guid> _processedItemIds = new ConcurrentBag<Guid>();

private static IEnumerable<Item> GetItemsFromDB(Timestamped<long> time)
{
// log event timing
Console.WriteLine($"Event # {time.Value} at {time.Timestamp}");

// return items from DB
return new[] { new Item { Id = Guid.NewGuid() } };
}

您可以通过其他方式实现缓存清理,例如,启动“GC”计时器,它会定期从缓存中删除已处理的项目。

要停止事件和处理项目,您应该 Dispose订阅,也许,Complete ActionBlock :

subscription.Dispose();
action.Complete();

您可以找到有关 Rx.Net 的更多信息在他们的 guidelines on github .

关于c# - 如何在 Windows 服务中实现连续的生产者-消费者模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42918401/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com