gpt4 book ai didi

C#对许多相对较大的对象进行垃圾回收

转载 作者:行者123 更新时间:2023-12-03 23:42:38 27 4
gpt4 key购买 nike

我有几个进程轮询不同的数据源以获取某种特定类型的信息。他们经常轮询它并在后台进行,所以当我需要这些信息时,它随时可用,不需要会浪费时间的往返。
示例代码如下所示:

public class JournalBackgroundPoller
{
private readonly int _clusterSize;

private readonly IConfiguration _configuration;

Dictionary<int, string> _journalAddresses;
private readonly Random _localRandom;
private readonly Task _runHolder;

internal readonly ConcurrentDictionary<int, List<JournalEntryResponseItem>> ResultsBuffer = new ConcurrentDictionary<int, List<JournalEntryResponseItem>>();

public JournalBackgroundPoller(IConfiguration configuration)
{
_localRandom = new Random();

_configuration = configuration;
_clusterSize = 20;//for the sake of demo

_journalAddresses = //{{1, "SOME ADDR1"}, {2, "SOME ADDR 2"}};

_runHolder = BuildAndRun();
}

private Task BuildAndRun()
{
var pollingTasks = new List<Task>();
var buffer = new BroadcastBlock<JournalResponsesWrapper>(item => item);

PopulateShardsRegistry();

foreach (var js in _journalAddresses)
{
var dataProcessor = new TransformBlock<JournalResponsesWrapper, JournalResponsesWrapper>(NormalizeValues,
new ExecutionDataflowBlockOptions
{ MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 1 });

var dataStorer = new ActionBlock<JournalResponsesWrapper>(StoreValuesInBuffer,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 2 });

buffer.LinkTo(dataProcessor, wrapper => wrapper.JournalDataSource.Key == js.Key);

dataProcessor.LinkTo(dataStorer);
dataProcessor.LinkTo(DataflowBlock.NullTarget<JournalResponsesWrapper>());

pollingTasks.Add(PollInfinitely(js, buffer));
}

var r = Task.WhenAll(pollingTasks);
return r;
}

private void PopulateShardsRegistry()
{
try
{
for (int i = 0; i < _clusterSize; i++)
{
var _ = ResultsBuffer.GetOrAdd(i, ix => new List<JournalEntryResponseItem>());
}
}
catch (Exception e)
{
Console.WriteLine("Could `t initialize shards registry");
}
}

private async Task PollInfinitely(KeyValuePair<int, string> dataSourceInfo, BroadcastBlock<JournalResponsesWrapper> buffer)
{
while (true)
{
try
{
//here we create a client and get a big list of journal entries, ~200k from one source. below is dummy code
var journalEntries = new List<JournalEntryResponseItem>(200000);

buffer.Post(
new JournalResponsesWrapper { JournalDataSource = dataSourceInfo, JournalEntryResponseItems = journalEntries });
}
catch (Exception ex)
{
Console.WriteLine($"Polling {dataSourceInfo.Value} threw an exception, overwriting with empty data");
buffer.Post(
new JournalResponsesWrapper { JournalDataSource = dataSourceInfo, JournalEntryResponseItems = new List<JournalEntryResponseItem>() });
}

await Task.Delay(_localRandom.Next(400, 601));
}
}

private JournalResponsesWrapper NormalizeValues(JournalResponsesWrapper input)
{
try
{
if (input.JournalEntryResponseItems == null || !input.JournalEntryResponseItems.Any())
{
return input;
}

foreach (var journalEntry in input.JournalEntryResponseItems)
{
//do some transformations here
}

return input;
}
catch (Exception ex)
{
Console.WriteLine($"Normalization failed for cluster {input.JournalDataSource.Value}, please review!");
return null;
}
}

private void StoreValuesInBuffer(JournalResponsesWrapper input)
{
try
{
ResultsBuffer[input.JournalDataSource.Key] = input.JournalEntryResponseItems;
}
catch (Exception ex)
{
Console.WriteLine($"Could not write content to dictionary");
}
}
}
为简单起见,期刊相关实体将如下所示:
class JournalEntryResponseItem
{
public string SomeProperty1 { get; set; }

public string SomeProperty2 { get; set; }
}

class JournalResponsesWrapper
{
public KeyValuePair<int, string> JournalDataSource { get; set; }

public List<JournalEntryResponseItem> JournalEntryResponseItems { get; set; }
}
提供的代码的全局问题显然是我正在创建相对大量的对象,这些对象可能会在短时间内以 LOH 结束。数据源总是提供最新的条目,所以我不需要保留旧的条目(我也不能这样做,因为它们没有区别)。我的问题是是否可以优化内存使用、对象创建和替换往返,以便减少垃圾收集的频率?现在,垃圾收集每约 5-10 秒发生一次。
UPD 1:我通过 ResultsBuffer 访问数据并且可以在刷新之前多次读取相同的集合。不能保证一个特定的数据集只会被读取一次(或根本不会被读取)。我的大对象是 List<JournalEntryResponseItem>实例,最初来自数据源,然后保存到 ResultsBuffer .
UPD 2:数据源只有一个端点,一次返回这个“分片”中的所有实体,我无法在请求期间应用过滤。响应实体没有唯一的键/标识符。
UPD 3:一些答案建议先衡量/分析应用程序。虽然在这种特殊情况下这是完全有效的建议,但由于以下观察结果,它显然与内存/GC 相关:
  • 视觉节流恰好发生在应用程序 RAM 消耗在稳定增长一段时间后急剧下降的那一刻。
  • 如果我再添加 X 个日志源,应用程序的内存将增长,直到它占用服务器上的所有可用内存,然后卡住时间更长(1-3 秒),之后内存急剧下降,应用程序继续工作,直到达到内存限制再次。
  • 最佳答案

    身后List<T>总有一个 T[]连续项目,将其标注为 200000 肯定会直接将其放入 LOH。为了避免这种情况,我建议使用简单的逻辑分区而不是物理维度和 Post分批列出。这样在每次轮询期间,巨大的列表将转到 LOH,但会在下一个 GC 第 2 代集合中收集(请确保没有更多引用)。 LOH 几乎为空,但是由于托管堆中发生的添加复制操作,GC Generation 2 收集将比以前更多。这是一个小的变化,我提供了新的 JournalBackgroundPoller类(class):

    public class JournalBackgroundPoller
    {
    private readonly int _clusterSize;

    private readonly IConfiguration _configuration;

    Dictionary<int, string> _journalAddresses;
    private readonly Random _localRandom;
    private readonly Task _runHolder;

    internal readonly ConcurrentDictionary<int, List<JournalEntryResponseItem>> ResultsBuffer = new ConcurrentDictionary<int, List<JournalEntryResponseItem>>();

    public JournalBackgroundPoller(IConfiguration configuration)
    {
    _localRandom = new Random();

    _configuration = configuration;
    _clusterSize = 20;//for the sake of demo

    // _journalAddresses = //{{1, "SOME ADDR1"}, {2, "SOME ADDR 2"}};
    _journalAddresses = new Dictionary<int, string>
    {
    { 1, "SOME ADDR1" },
    { 2, "SOME ADDR 2" }
    };

    _runHolder = BuildAndRun();
    }

    private Task BuildAndRun()
    {
    var pollingTasks = new List<Task>();
    var buffer = new BroadcastBlock<JournalResponsesWrapper>(item => item);

    PopulateShardsRegistry();

    foreach (var js in _journalAddresses)
    {
    var dataProcessor = new TransformBlock<JournalResponsesWrapper, JournalResponsesWrapper>(NormalizeValues,
    new ExecutionDataflowBlockOptions
    { MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 1 });

    var dataStorer = new ActionBlock<JournalResponsesWrapper>(StoreValuesInBuffer,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 2 });

    buffer.LinkTo(dataProcessor, wrapper => wrapper.JournalDataSource.Key == js.Key);

    dataProcessor.LinkTo(dataStorer);
    dataProcessor.LinkTo(DataflowBlock.NullTarget<JournalResponsesWrapper>());

    pollingTasks.Add(PollInfinitely(js, buffer));
    }

    var r = Task.WhenAll(pollingTasks);
    return r;
    }

    private void PopulateShardsRegistry()
    {
    try
    {
    for (int i = 0; i < _clusterSize; i++)
    {
    var _ = ResultsBuffer.GetOrAdd(i, ix => new List<JournalEntryResponseItem>());
    }
    }
    catch (Exception e)
    {
    Console.WriteLine("Could `t initialize shards registry");
    }
    }

    private async Task PollInfinitely(KeyValuePair<int, string> dataSourceInfo, BroadcastBlock<JournalResponsesWrapper> buffer)
    {
    while (true)
    {
    try
    {
    //here we create a client and get a big list of journal entries, ~200k from one source. below is dummy code
    var journalEntries = new List<JournalEntryResponseItem>(200000);

    // NOTE:
    // We need to avoid references to the huge list so GC collects it ASAP in the next
    // generation 2 collection: after that, nothing else goes to the LOH.
    const int PartitionSize = 1000;
    for (var index = 0; index < journalEntries.Count; index += PartitionSize)
    {
    var journalEntryResponseItems = journalEntries.GetRange(index, PartitionSize);
    buffer.Post(
    new JournalResponsesWrapper
    {
    JournalDataSource = dataSourceInfo,
    JournalEntryResponseItems = journalEntryResponseItems
    });
    }
    }
    catch (Exception ex)
    {
    Console.WriteLine($"Polling {dataSourceInfo.Value} threw an exception, overwriting with empty data");
    buffer.Post(
    new JournalResponsesWrapper { JournalDataSource = dataSourceInfo, JournalEntryResponseItems = new List<JournalEntryResponseItem>() });
    }

    await Task.Delay(_localRandom.Next(400, 601));
    }
    }

    private JournalResponsesWrapper NormalizeValues(JournalResponsesWrapper input)
    {
    try
    {
    if (input.JournalEntryResponseItems == null || !input.JournalEntryResponseItems.Any())
    {
    return input;
    }

    foreach (var journalEntry in input.JournalEntryResponseItems)
    {
    //do some transformations here
    }

    return input;
    }
    catch (Exception ex)
    {
    Console.WriteLine($"Normalization failed for cluster {input.JournalDataSource.Value}, please review!");
    return null;
    }
    }

    private void StoreValuesInBuffer(JournalResponsesWrapper input)
    {
    try
    {
    ResultsBuffer[input.JournalDataSource.Key] = input.JournalEntryResponseItems;
    }
    catch (Exception ex)
    {
    Console.WriteLine($"Could not write content to dictionary");
    }
    }
    }

    请看一下 30 秒后原始内存使用情况的快照
    Before the optimization
    这是30秒后优化内存使用的快照
    After the optimization
    注意区别
  • 稀疏数组 :JournalEntryResponseItem[]从浪费的 1,600,000 和长度 200,000 到没有。
  • LOH 用法 : 从 3.05 MB 到没有。
  • 关于C#对许多相对较大的对象进行垃圾回收,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64878714/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com