C# - Partitioning with an IEnumerable source

Reposted. Author: 太空宇宙. Updated: 2023-11-03 12:23:17

I have a ConcurrentQueue typed as IProducerConsumerCollection, i.e.:

IProducerConsumerCollection<Job> _queue = new ConcurrentQueue<Job>();

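The producer method adds jobs to _queue, and the consumer method processes jobs from _queue. In the consumer method I would like to process the jobs concurrently. Below is a sample class with the producer and consumer methods: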

public class TestQueue
{
    IProducerConsumerCollection<Job> _queue = new ConcurrentQueue<Job>();
    private static HttpClient _client = new HttpClient();

    public TestQueue()
    {
        WorkProducerThread();
        WorkConsumerThread();
    }

    public void WorkConsumerThread()
    {
        if (_queue.Count > 0)
        {
            //At this point, 4 partitions are created but all records are in 1st partition only; 2,3,4 partition are empty
            var partitioner = Partitioner.Create(_queue).GetPartitions(4);

            Task t = Task.WhenAll(
                from partition in partitioner
                select Task.Run(async () =>
                {
                    using (partition)
                    {
                        while (partition.MoveNext())
                            await CreateJobs(partition.Current);
                    }
                }));

            t.Wait();

            //At this point, queue count is still 20, how to remove item from _queue collection when processed?
        }
    }

    private async Task CreateJobs(Job job)
    {
        HttpContent bodyContent = null;
        await _client.PostAsync("job", bodyContent);
    }

    public void WorkProducerThread()
    {
        if (_queue.Count == 0)
        {
            try
            {
                for (int i = 0; i < 20; i++)
                {
                    Job job = new Job { Id = i, JobName = "j" + i.ToString(), JobCreated = DateTime.Now };
                    _queue.TryAdd(job);
                }
            }
            catch (Exception ex)
            {
                //_Log.Error("Exception while adding jobs to collection", ex);
            }
        }
    }
}

public class Job
{
    public int Id { get; set; }
    public string JobName { get; set; }
    public DateTime JobCreated { get; set; }
}

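I have two questions:

  1. Partitioner.Create(_queue).GetPartitions(4); Partitioner.GetPartitions creates 4 partitions, but all records are in the first partition only; partitions 2, 3 and 4 are empty. I cannot work out why this happens. Ideally, each of the 4 partitions should hold 5 records (since there are 20 records in the queue). I read this MSDN article on partitioning but found no clue. I also checked the partitioning example in this article.

  2. Also, I want to remove each item from _queue after the consumer method has processed it, and the only way to remove an item is the _queue.TryTake method. I don't know how to remove items while also partitioning.

I am open to any alternative approach that achieves the same result.

Thanks in advance.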

Best answer

Partitioner.Create(_queue).GetPartitions(4); Partitioner.GetPartitions creates 4 partitions but all records are in 1st partition only; 2,3,4 partition are empty.

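That is not correct; your queue entries are partitioned properly. To verify, change your processing logic slightly so that it logs which partition is doing the work: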

Task t = Task.WhenAll(
    from partition in partitioner.Select((jobs, i) => new { jobs, i })
    select Task.Run(async () =>
    {
        using (partition.jobs)
        {
            while (partition.jobs.MoveNext())
            {
                Console.WriteLine(partition.i);
                await CreateJobs(partition.jobs.Current);
            }
        }
    }));

You will see Console.WriteLine print values from 0 to 3, which shows that the work is spread across all four partitions.
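To make the same check easier to read than interleaved console output, the per-partition work can be counted instead. The following is a minimal sketch, not the answerer's code: PartitionDemo and CountPerPartition are hypothetical names, the queue holds plain integers instead of Job objects, and Task.Delay stands in for the HTTP call. It relies on the fact that Partitioner.Create over an IEnumerable source hands out items on demand as each partition calls MoveNext, so the split is only visible once the partitions are actually enumerated.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class PartitionDemo
{
    // Returns how many items each partition ended up processing.
    // Hypothetical helper for illustration only.
    public static int[] CountPerPartition(int itemCount, int partitionCount)
    {
        var queue = new ConcurrentQueue<int>(Enumerable.Range(0, itemCount));
        var partitions = Partitioner.Create(queue).GetPartitions(partitionCount);
        var counts = new int[partitionCount];

        Task.WhenAll(partitions.Select((partition, index) => Task.Run(async () =>
        {
            using (partition)
            {
                while (partition.MoveNext())
                {
                    counts[index]++;       // each task writes only its own slot
                    await Task.Delay(10);  // stand-in for the real async work
                }
            }
        }))).Wait();

        return counts;
    }
}
```

Calling PartitionDemo.CountPerPartition(20, 4) returns four counts that add up to 20. The exact split can differ from run to run because items are claimed dynamically rather than pre-assigned in blocks of five.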

Also, I want to remove the item from _queue after processing in consumer method and there is only one way _queue.TryTake method to remove item. I don't know how to remove item along with partitioning?

You can achieve this with a slight rewrite. The main changes are switching to BlockingCollection and adding this NuGet package, which provides GetConsumingPartitioner.

Give it a try:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;

namespace Test
{
    public class TestQueue
    {
        BlockingCollection<Job> _queue = new BlockingCollection<Job>();
        private static HttpClient _client = new HttpClient();

        public TestQueue()
        {
            WorkProducerThread();
            WorkConsumerThread();
        }

        public void WorkConsumerThread()
        {
            if (!_queue.IsCompleted)
            {
                // The consuming partitioner removes each item from _queue as a partition takes it.
                var partitioner = _queue.GetConsumingPartitioner().GetPartitions(4);

                Task t = Task.WhenAll(
                    from partition in partitioner
                    select Task.Run(async () =>
                    {
                        using (partition)
                        {
                            while (partition.MoveNext())
                                await CreateJobs(partition.Current);
                        }
                    }));

                t.Wait();

                // Every job has been taken out of the collection, so this should print 0.
                Console.WriteLine(_queue.Count);
            }
        }

        private async Task CreateJobs(Job job)
        {
            // The HTTP call is replaced by a delay so the sample runs without a server.
            //HttpContent bodyContent = null;
            //await _client.PostAsync("job", bodyContent);
            await Task.Delay(100);
        }

        public void WorkProducerThread()
        {
            if (_queue.Count == 0)
            {
                try
                {
                    for (int i = 0; i < 20; i++)
                    {
                        Job job = new Job { Id = i, JobName = "j" + i.ToString(), JobCreated = DateTime.Now };
                        _queue.TryAdd(job);
                    }

                    // Signal that no more jobs will arrive so the consumers can finish.
                    _queue.CompleteAdding();
                }
                catch (Exception ex)
                {
                    //_Log.Error("Exception while adding jobs to collection", ex);
                }
            }
        }
    }

    public class Job
    {
        public int Id { get; set; }
        public string JobName { get; set; }
        public DateTime JobCreated { get; set; }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var g = new TestQueue();

            Console.ReadLine();
        }
    }
}
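If adding a package is not an option, one possible alternative (not part of the answer above) is to skip the partitioner and let a fixed number of worker tasks pull items with TryTake, which is the removal method the question mentions. The sketch below assumes all jobs are already queued before the workers start; DrainDemo, DrainAsync and the handler parameter are hypothetical names, and integers stand in for Job objects.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class DrainDemo
{
    // Starts workerCount tasks that each remove items with TryTake
    // until the collection is empty, and returns the total processed.
    // Hypothetical helper for illustration only.
    public static async Task<int> DrainAsync(
        IProducerConsumerCollection<int> queue,
        int workerCount,
        Func<int, Task> handler)
    {
        var workers = Enumerable.Range(0, workerCount).Select(_ => Task.Run(async () =>
        {
            int processed = 0;
            while (queue.TryTake(out int item))  // removes the item from the queue
            {
                await handler(item);             // stand-in for the real async work
                processed++;
            }
            return processed;
        }));

        int[] results = await Task.WhenAll(workers);
        return results.Sum();
    }
}
```

A call such as DrainDemo.DrainAsync(queue, 4, item => Task.Delay(100)) processes at most four items at a time and leaves the queue empty. A worker exits as soon as TryTake fails, so this shape only suits a queue that is fully populated up front; with a producer that is still running, a BlockingCollection with CompleteAdding, as in the answer, is the safer fit.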

Regarding C# - partitioning with an IEnumerable source, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/46345081/
