gpt4 book ai didi

c# - 如何分块化 IEnumerable,而不会在失败时丢失/丢弃项目?

转载 作者:行者123 更新时间:2023-12-05 01:50:29 26 4
gpt4 key购买 nike

我有一个生产者-消费者场景,其中生产者是一个可枚举的项目序列 ( IEnumerable<Item> )。我想以每 10 个项目的 block /批处理这些项目。所以我决定使用新的 (.NET 6) Chunk LINQ 运算符,如本问题中所建议:Create batches in LINQ .

我的问题是,有时生产者会失败,在这种情况下,分 block 序列的消费者会收到错误,而不会先收到包含错误之前生成的最后项目的 block 。因此,例如,如果生产者生成 15 个项目然后失败,则消费者将获得一个包含项目 1-10 的 block ,然后将出现异常。第 11-15 项将丢失!这是一个演示这种不良行为的最小示例:

static IEnumerable<int> Produce()
{
int i = 0;
while (true)
{
i++;
Console.WriteLine($"Producing #{i}");
yield return i;
if (i == 15) throw new Exception("Oops!");
}
}

// Consume
foreach (int[] chunk in Produce().Chunk(10))
{
Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
}

输出:

Producing #1
Producing #2
Producing #3
Producing #4
Producing #5
Producing #6
Producing #7
Producing #8
Producing #9
Producing #10
Consumed: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Producing #11
Producing #12
Producing #13
Producing #14
Producing #15
Unhandled exception. System.Exception: Oops!
at Program.<Main>g__Produce|0_0()+MoveNext()
at System.Linq.Enumerable.ChunkIterator[TSource](IEnumerable`1 source, Int32 size)+MoveNext()
at Program.Main()

Online demo .

理想的行为是获取一个值为 [11, 12, 13, 14, 15] 的 block 在获得异常之前。

我的问题是:有什么方法可以配置 Chunk运算符,以便它优先发出数据而不是异常?如果没有,我如何实现自定义 LINQ 运算符,例如命名为 ChunkNonDestructive , 具有理想的行为?

public static IEnumerable<TSource[]> ChunkNonDestructive<TSource>(
this IEnumerable<TSource> source, int size);

注意: System.Linq.Chunk 除外运算符我还尝试了 Buffer来自 System.Interactive 的运算符(operator)包,以及 Batch 来自 MoreLinq 的运算符(operator)包裹。显然,它们的行为都相同(破坏性)。


更新:这是上述示例的理想输出:

Producing #1
Producing #2
Producing #3
Producing #4
Producing #5
Producing #6
Producing #7
Producing #8
Producing #9
Producing #10
Consumed: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Producing #11
Producing #12
Producing #13
Producing #14
Producing #15
Consumed: [11, 12, 13, 14, 15]
Unhandled exception. System.Exception: Oops!
at Program.<Main>g__Produce|0_0()+MoveNext()
at System.Linq.Enumerable.ChunkIterator[TSource](IEnumerable`1 source, Int32 size)+MoveNext()
at Program.Main()

区别在于 Consumed: [11, 12, 13, 14, 15] 行,实际输出中不存在。

最佳答案

如果您对源代码进行预处理以使其在遇到异常时停止,那么您可以按原样使用 Chunk()

public static class Extensions
{
public static IEnumerable<T> UntilFirstException<T>(this IEnumerable<T> source, Action<Exception> exceptionCallback = null)
{
var enumerator = source.GetEnumerator();
while(true)
{
try
{
if(!enumerator.MoveNext())
{
break;
}
} catch (Exception e) {
exceptionCallback?.Invoke(e);
break;
}
yield return enumerator.Current;
}
}
}
    Exception? e = null;
foreach (int[] chunk in Produce().UntilFirstException(thrown => e = thrown).Chunk(10))
{
Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
}

我觉得这样可以很好地区分责任。如果您想要一个抛出异常的助手而不是必须自己捕获它,您可以将其用作一个组件来简化该助手的编写:

    public static IEnumerable<T[]> ChunkUntilFirstException<T>(this IEnumerable<T> source, int size)
{
Exception? e = null;
var result = source.UntilFirstException(thrown => e = thrown).Chunk(size);
foreach (var element in result)
{
yield return element;
}
if (e != null)
{
throw new InvalidOperationException("source threw an exception", e);
}
}

请注意,这将抛出与生产者发出的异常不同的异常。这使您可以保留与原始异常相关联的堆栈跟踪,而 throw e 会覆盖该堆栈跟踪。

您可以根据自己的需要进行调整。如果您需要捕获您希望生产者发出的特定类型的异常,可以很容易地使用 when 上下文关键字和一些模式匹配。

    try
{
foreach (int[] chunk in Produce().ChunkUntilFirstException(10))
{
Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
}
}
catch (InvalidOperationException e) when (e.InnerException is {Message: "Oops!"})
{
Console.WriteLine(e.InnerException.ToString());
}

关于c# - 如何分块化 IEnumerable<T>,而不会在失败时丢失/丢弃项目?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73056639/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com