gpt4 book ai didi

c# - AsyncLocal 值与 TPL 数据流不正确

转载 作者:行者123 更新时间:2023-11-30 12:19:25 30 4
gpt4 key购买 nike

考虑这个例子:

class Program

{
private static readonly ITargetBlock<string> Mesh = CreateMesh();
private static readonly AsyncLocal<string> AsyncLocalContext
= new AsyncLocal<string>();

static async Task Main(string[] args)
{
var tasks = Enumerable.Range(1, 4)
.Select(ProcessMessage);
await Task.WhenAll(tasks);

Mesh.Complete();
await Mesh.Completion;

Console.WriteLine();
Console.WriteLine("Done");
}

private static async Task ProcessMessage(int number)
{
var param = number.ToString();
using (SetScopedAsyncLocal(param))
{
Console.WriteLine($"Before send {param}");
await Mesh.SendAsync(param);
Console.WriteLine($"After send {param}");
}
}

private static IDisposable SetScopedAsyncLocal(string value)
{
AsyncLocalContext.Value = value;

return new Disposer(() => AsyncLocalContext.Value = null);
}

private static ITargetBlock<string> CreateMesh()
{
var blockOptions = new ExecutionDataflowBlockOptions
{
BoundedCapacity = DataflowBlockOptions.Unbounded,
EnsureOrdered = false,
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
};

var block1 = new TransformBlock<string, string>(async input =>
{
await Task.Yield();
Console.WriteLine(
$" Block1 [thread {Thread.CurrentThread.ManagedThreadId}]" +
$" Input: {input} - Context: {AsyncLocalContext.Value}.");

return input;
}, blockOptions);

var block2 = new TransformBlock<string, string>(async input =>
{
await Task.Yield();
Console.WriteLine(
$" Block2 [thread {Thread.CurrentThread.ManagedThreadId}]" +
$" Input: {input} - Context: {AsyncLocalContext.Value}.");

return input;
}, blockOptions);

var block3 = new ActionBlock<string>(async input =>
{
await Task.Yield();
Console.WriteLine(
$" Block3 [thread {Thread.CurrentThread.ManagedThreadId}]" +
$" Input: {input} - Context: {AsyncLocalContext.Value}.");
}, blockOptions);

var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};

block1.LinkTo(block2, linkOptions);
block2.LinkTo(block3, linkOptions);

return new EncapsulatedActionBlock<string>(block1, block3.Completion);
}
}

internal class EncapsulatedActionBlock<T> : ITargetBlock<T>
{
private readonly ITargetBlock<T> _wrapped;

public EncapsulatedActionBlock(ITargetBlock<T> wrapped, Task completion)
{
_wrapped = wrapped;
Completion = completion;
}

public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
T messageValue, ISourceBlock<T> source, bool consumeToAccept) =>
_wrapped.OfferMessage(messageHeader, messageValue, source, consumeToAccept);

public void Complete() => _wrapped.Complete();

public void Fault(Exception exception) => _wrapped.Fault(exception);

public Task Completion { get; }
}

internal class Disposer : IDisposable
{
private readonly Action _disposeAction;

public Disposer(Action disposeAction)
{
_disposeAction = disposeAction
?? throw new ArgumentNullException(nameof(disposeAction));
}

public void Dispose()
{
_disposeAction();
}
}

执行结果如下:

Before send 1After send 1Before send 2After send 2Before send 3After send 3Before send 4After send 4   Block1 [thread 9] Input: 3 - Context: 3.   Block1 [thread 10] Input: 2 - Context: 1.   Block1 [thread 8] Input: 4 - Context: 4.   Block1 [thread 11] Input: 1 - Context: 2.   Block2 [thread 9] Input: 2 - Context: 3.   Block2 [thread 7] Input: 1 - Context: 2.   Block2 [thread 10] Input: 3 - Context: 3.   Block2 [thread 8] Input: 4 - Context: 4.   Block3 [thread 11] Input: 4 - Context: 4.   Block3 [thread 7] Input: 1 - Context: 2.   Block3 [thread 9] Input: 3 - Context: 3.   Block3 [thread 4] Input: 2 - Context: 3.Done

如您所见,在移动到第二个 TDF block 后,传递的上下文值和存储的上下文值并不总是相同。此行为搞砸了多个日志记录框架的 LogContext 功能用法。

  1. 这是预期的行为吗(请解释原因)?
  2. TPL 数据流是否以某种方式扰乱了执行上下文?

最佳答案

要了解发生了什么,您必须了解数据流 block 的工作原理。它们内部没有阻塞的线程,等待消息到达。处理由工作任务完成。让我们考虑 MaxDegreeOfParallelism = 1 的简单(默认)情况。最初有零个工作任务。使用 SendAsync 异步发布消息时,发布消息的同一任务成为工作任务并开始处理消息。如果在处理第一条消息时发布另一条消息,则会发生其他情况。第二条消息将在 block 的输入队列中排队,发布它的任务将完成。第二条消息将由处理第一条消息的工作任务处理。只要队列中有消息入队,初始工作任务就会挑选它们并一条一条处理。如果在某个时刻没有更多的缓冲消息,工作任务将完成, block 将返回到它的初始状态(零工作任务)。下一个 SendAsync 将成为新的辅助任务,依此类推。使用 MaxDegreeOfParallelism = 1,在任何给定时刻只能存在一个工作任务。

让我们用一个例子来证明这一点。下面是一个ActionBlock以延迟 X 馈送,并以延迟 Y 处理每条消息。

private static void ActionBlockTest(int sendDelay, int processDelay)
{
Console.WriteLine($"SendDelay: {sendDelay}, ProcessDelay: {processDelay}");
var asyncLocal = new AsyncLocal<int>();
var actionBlock = new ActionBlock<int>(async i =>
{
await Task.Delay(processDelay);
Console.WriteLine($"Processed {i}, Context: {asyncLocal.Value}");
});
Task.Run(async () =>
{
foreach (var i in Enumerable.Range(1, 5))
{
asyncLocal.Value = i;
await actionBlock.SendAsync(i);
await Task.Delay(sendDelay);
}
}).Wait();
actionBlock.Complete();
actionBlock.Completion.Wait();
}

让我们看看如果我们快速发送消息并缓慢处理它们会发生什么:

ActionBlockTest(100, 200); // .NET Core 3.0

SendDelay: 100, ProcessDelay: 200
Processed 1, Context: 1
Processed 2, Context: 1
Processed 3, Context: 1
Processed 4, Context: 1
Processed 5, Context: 1

AsyncLocal上下文保持不变,因为同一个工作任务处理了所有消息。

现在让我们慢慢发送消息并快速处理它们:

ActionBlockTest(200, 100); // .NET Core 3.0

SendDelay: 200, ProcessDelay: 100
Processed 1, Context: 1
Processed 2, Context: 2
Processed 3, Context: 3
Processed 4, Context: 4
Processed 5, Context: 5

AsyncLocal 上下文对于每条消息都是不同的,因为每条消息都由不同的工作任务处理。

这个故事的道德教训是,每个 SendAsync 都不能保证创建一个异步工作流来跟踪消息,直到它的旅程结束,到达管道的末端。因此 AsyncLocal 类不能用于保存每条消息的环境数据。

关于c# - AsyncLocal 值与 TPL 数据流不正确,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58179359/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com