c# - BroadcastCopyBlock for TPL Dataflow with guaranteed delivery

Reposted. Author: 行者123. Updated: 2023-11-30 20:33:07

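I would be glad for some input on the following implementation of a BroadcastCopyBlock in TPL Dataflow. It copies each received message to all consumers registered with the BroadcastCopyBlock and guarantees delivery to every consumer that is linked to the block at the time it receives the message. (This is unlike BroadcastBlock, which does not guarantee delivery of a message if the next one comes in before the previous message has been delivered to all consumers.)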

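My main concern is the reserving of messages and the releasing of reservations. What happens if a receiving block decides not to handle a message? My understanding is that this would create a memory leak, since the message would be kept indefinitely. I think I should somehow mark the message as unused, but I'm not sure how. I was considering some artificial message sink (an ActionBlock with no action), or can I just mark a message as discarded?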

Further input on the implementation is also welcome.

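This is probably almost a duplicate of the following question, but I would prefer to use my own class instead of a method that creates the block. Or would that be considered bad style?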
BroadcastBlock with Guaranteed Delivery in TPL Dataflow

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

/// <summary>
/// Broadcasts the same message to multiple consumers. This does NOT clone the message; all consumers receive an identical message.
/// </summary>
/// <typeparam name="T">The type of the broadcast messages.</typeparam>
public class BroadcastCopyBlock<T> : IPropagatorBlock<T, T>
{
    private ITargetBlock<T> In { get; }

    /// <summary>
    /// Holds a TransformBlock for each target that subscribed to this block.
    /// </summary>
    private readonly IDictionary<ITargetBlock<T>, TransformBlock<T, T>> _OutBlocks = new Dictionary<ITargetBlock<T>, TransformBlock<T, T>>();

    public BroadcastCopyBlock()
    {
        In = new ActionBlock<T>(message => Process(message));

        // Forward the input block's completion (or fault) to all output blocks.
        In.Completion.ContinueWith(task =>
        {
            if (task.Exception == null)
                Complete();
            else
                Fault(task.Exception);
        });
    }

    /// <summary>
    /// Creates a transform source block for the passed target.
    /// </summary>
    /// <param name="target">The target that will be linked to the new block.</param>
    private void CreateOutBlock(ITargetBlock<T> target)
    {
        if (_OutBlocks.ContainsKey(target))
            return;

        var outBlock = new TransformBlock<T, T>(e => e);
        _OutBlocks[target] = outBlock;
    }

    private void Process(T message)
    {
        foreach (var outBlock in _OutBlocks.Values)
        {
            outBlock.Post(message);
        }
    }

    /// <inheritdoc />
    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
    {
        return In.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }

    /// <inheritdoc />
    public void Complete()
    {
        foreach (var outBlock in _OutBlocks.Values)
        {
            ((ISourceBlock<T>)outBlock).Complete();
        }
    }

    /// <inheritdoc />
    public void Fault(Exception exception)
    {
        foreach (var outBlock in _OutBlocks.Values)
        {
            ((ISourceBlock<T>)outBlock).Fault(exception);
        }
    }

    /// <inheritdoc />
    public Task Completion => Task.WhenAll(_OutBlocks.Select(b => b.Value.Completion));

    /// <inheritdoc />
    public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
    {
        CreateOutBlock(target);
        return _OutBlocks[target].LinkTo(target, linkOptions);
    }

    /// <inheritdoc />
    public T ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed)
    {
        return ((ISourceBlock<T>)_OutBlocks[target]).ConsumeMessage(messageHeader, target, out messageConsumed);
    }

    /// <inheritdoc />
    public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
    {
        return ((ISourceBlock<T>)_OutBlocks[target]).ReserveMessage(messageHeader, target);
    }

    /// <inheritdoc />
    public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
    {
        ((ISourceBlock<T>)_OutBlocks[target]).ReleaseReservation(messageHeader, target);
    }
}

Best answer

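TL;DR
Your implementation uses the Post method inside the ActionBlock, which will still lose data if a target rejects the message; switch to SendAsync instead. Also, you probably don't need to implement all of these methods: you only need an implementation of the ITargetBlock<in TInput> interface.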


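Before coming back to your main question, I want to clarify something. I think you are confused by some of the options in the TPL Dataflow library, and I want to explain them a bit here. The behavior you describe, "The first consumer, which receives the message, deletes it from the queue", is not about BroadcastBlock; it is about multiple consumers linked to an ISourceBlock such as BufferBlock: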

var buffer = new BufferBlock<int>();
var consumer1 = new ActionBlock<int>(i => {});
var consumer2 = new ActionBlock<int>(i => { Console.WriteLine(i); });
buffer.LinkTo(consumer1);
buffer.LinkTo(consumer2);
// this message goes to only one consumer (the first linked one, consumer1), so there is no console output
buffer.Post(1);

BroadcastBlock does exactly what you describe. Consider the following code:

private static void UnboundedCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Unbounded Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
    {
        Thread.Sleep(2000);
        Console.WriteLine($"SLOW Unbounded Block: {i}");
    });
    broadcast.LinkTo(slowAction, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastAction, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowAction.Completion.Wait();
}

The output will be:

FAST Unbounded Block: 0
FAST Unbounded Block: 1
FAST Unbounded Block: 2
SLOW Unbounded Block: 0
SLOW Unbounded Block: 1
SLOW Unbounded Block: 2

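However, this only works while data arrives more slowly than it is processed; otherwise, as you stated in your question, the buffers keep growing and you quickly run out of memory. Let's see what happens if we use ExecutionDataflowBlockOptions to limit the incoming data buffer of the slow block: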

private static void BoundedCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Bounded Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
    {
        Thread.Sleep(2000);
        Console.WriteLine($"SLOW Bounded Block: {i}");
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
    broadcast.LinkTo(slowAction, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastAction, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowAction.Completion.Wait();
}

The output will be:

FAST Bounded Block: 0
FAST Bounded Block: 1
FAST Bounded Block: 2
SLOW Bounded Block: 0
SLOW Bounded Block: 1

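As you can see, our slow block lost the last message, which is not what we are looking for. The reason is that, by default, BroadcastBlock uses the Post method to deliver messages. According to the official Intro Document: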

  • Post
    • An extension method that asynchronously posts to the target block. It returns immediately whether the data could be accepted or not, and it does not allow for the target to consume the message at a later time.
  • SendAsync
    • An extension method that asynchronously sends to target blocks while supporting buffering. A Post operation on a target is asynchronous, but if a target wants to postpone the offered data, there is nowhere for the data to be buffered and the target must instead be forced to decline. SendAsync enables asynchronous posting of the data with buffering, such that if a target postpones, it will later be able to retrieve the postponed data from the temporary buffer used for this one asynchronously posted message.
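The difference described in the quoted documentation can be observed with a single bounded ActionBlock. This is a minimal sketch, assuming the System.Threading.Tasks.Dataflow package is referenced; `PostVersusSendAsyncDemo` and the `gate` task are hypothetical names used only to keep the first message busy so that the block stays full:

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class: shows Post declining vs. SendAsync postponing on a full target.
public static class PostVersusSendAsyncDemo
{
    public static async Task<(bool FirstPost, bool SecondPost, bool SendPendingWhileFull, bool SendResult)> RunAsync()
    {
        // Keeps the first message "in processing" until it is released below.
        var gate = new TaskCompletionSource<bool>();
        var target = new ActionBlock<int>(async i => { await gate.Task; },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        bool firstPost = target.Post(1);   // capacity is free: accepted
        bool secondPost = target.Post(2);  // capacity is used by message 1: declined, message 2 is gone

        Task<bool> send = target.SendAsync(3); // target is full: postponed, not dropped
        bool pendingWhileFull = !send.IsCompleted;

        gate.SetResult(true);              // let message 1 finish, freeing capacity
        bool sendResult = await send;      // the target now consumes the postponed message 3

        target.Complete();
        await target.Completion;
        return (firstPost, secondPost, pendingWhileFull, sendResult);
    }
}
```

`RunAsync()` should produce `(true, false, true, true)`: the second Post is declined and its message is simply gone, while SendAsync stays pending until the block has room and then completes with true.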

So this method can help us here. Let's introduce a wrapper ActionBlock that does exactly what we want: it uses SendAsync to pass the data on to our real processors:

private static void BoundedWrapperInfiniteCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Wrapper Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
    {
        Thread.Sleep(2000);
        Console.WriteLine($"SLOW Wrapper Block: {i}");
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
    var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
    var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));

    broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowAction.Completion.Wait();
}

The output will be:

FAST Wrapper Block: 0
FAST Wrapper Block: 1
FAST Wrapper Block: 2
SLOW Wrapper Block: 0
SLOW Wrapper Block: 1
SLOW Wrapper Block: 2

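But this wait will never end: our basic wrapper doesn't propagate completion to the linked blocks, and an ActionBlock can't be linked to anything. We can try to wait for the wrapper to complete instead: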

private static void BoundedWrapperFiniteCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST finite Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
    {
        Thread.Sleep(2000);
        Console.WriteLine($"SLOW finite Block: {i}");
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
    var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
    var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));
    broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowActionWrapper.Completion.Wait();
}

The output will be:

FAST finite Block: 0
FAST finite Block: 1
FAST finite Block: 2
SLOW finite Block: 0

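This is definitely not what we want: the wrapper ActionBlock finished all of its work, and the posting of the last message is never awaited. Moreover, we don't even see the second message, because we exit the method before the Sleep call finishes! So you definitely need your own implementation.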
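What such an implementation has to do can already be seen on the wrapper example: forward the wrapper's completion to the real consumer by hand, then wait on the real consumer rather than on the wrapper. This is a minimal sketch, assuming the System.Threading.Tasks.Dataflow package is referenced; `WrapperCompletionDemo` is a hypothetical name, and `Task.Delay` replaces `Thread.Sleep` so that the sketch runs quickly:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class: the wrapper example with completion forwarded manually.
public static class WrapperCompletionDemo
{
    // Returns the messages that the slow consumer actually processed.
    public static async Task<List<int>> RunAsync()
    {
        var processed = new List<int>();
        var broadcast = new BroadcastBlock<int>(i => i);
        var slowAction = new ActionBlock<int>(async i =>
        {
            await Task.Delay(100); // stands in for Thread.Sleep(2000)
            processed.Add(i);
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });

        // The wrapper awaits SendAsync, so a full slowAction makes it wait instead of losing the message.
        var slowActionWrapper = new ActionBlock<int>(async i => { await slowAction.SendAsync(i); });
        broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });

        // An ActionBlock cannot be linked onward, so forward its completion (or fault) by hand.
        _ = slowActionWrapper.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted) ((IDataflowBlock)slowAction).Fault(t.Exception);
            else slowAction.Complete();
        }, TaskScheduler.Default);

        for (var i = 0; i < 3; ++i)
        {
            await broadcast.SendAsync(i);
        }
        broadcast.Complete();

        // Wait for the real consumer, not for the wrapper.
        await slowAction.Completion;
        return processed;
    }
}
```

With these two changes, `RunAsync()` should return 0, 1 and 2, and it returns only after the slow consumer has processed all of them.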

Now, finally, some thoughts about your code:

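  1. You don't need to implement such a large number of methods: your wrapper will be used as an ITargetBlock<in TInput>, so implement only that interface.
  2. Your implementation uses the Post method inside the ActionBlock, which, as we saw, can lead to data loss if something goes wrong on the consumer side. Consider using the SendAsync method instead.
  3. After the previous change, you should measure the performance of your dataflow: if you have many asynchronous waits for data delivery, you may see performance and/or memory issues. These should be addressed with some of the advanced settings discussed in the linked documentation.
  4. Your implementation of the Completion task actually inverts the order of your dataflow: you are waiting for the targets to complete, which I don't think is good practice. You should probably create an ending block for your dataflow (this could even be a NullTarget block, which simply drops incoming messages synchronously) and wait for it to complete.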
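Points 1, 2 and 4 can be combined into a small class. This is a minimal sketch, assuming the System.Threading.Tasks.Dataflow package is referenced; `GuaranteedBroadcastTarget<T>` and `GuaranteedBroadcastDemo` are hypothetical names, and, unlike the question's LinkTo-based design, the targets are fixed when the block is constructed to keep the sketch short. It implements only ITargetBlock<T>, delivers with SendAsync, forwards completion to the targets, and leaves the waiting to the end of the pipeline:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical sketch: a target-only broadcaster that delivers every message to a fixed set of targets.
public sealed class GuaranteedBroadcastTarget<T> : ITargetBlock<T>
{
    private readonly ActionBlock<T> _in;
    private readonly IReadOnlyList<ITargetBlock<T>> _targets;

    public GuaranteedBroadcastTarget(IEnumerable<ITargetBlock<T>> targets)
    {
        _targets = targets.ToList();

        // BoundedCapacity = 1 makes upstream senders wait while a message is still being delivered.
        _in = new ActionBlock<T>(async message =>
        {
            foreach (var target in _targets)
            {
                // SendAsync waits while a bounded target is full instead of dropping the message.
                // It completes with false if the target declines the message (for example, because
                // the target has already been completed); the message is then skipped for that target.
                await target.SendAsync(message);
            }
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        // An ActionBlock cannot be linked onward, so forward completion to every target by hand.
        _in.Completion.ContinueWith(t =>
        {
            foreach (var target in _targets)
            {
                if (t.IsFaulted) target.Fault(t.Exception);
                else target.Complete();
            }
        }, TaskScheduler.Default);
    }

    // Completion reflects only this block's own input side, not the targets.
    public Task Completion => _in.Completion;

    public void Complete() => _in.Complete();

    public void Fault(Exception exception) => ((IDataflowBlock)_in).Fault(exception);

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue,
        ISourceBlock<T> source, bool consumeToAccept) =>
        ((ITargetBlock<T>)_in).OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}

// Hypothetical usage: one fast and one bounded slow consumer.
public static class GuaranteedBroadcastDemo
{
    public static async Task<(List<int> Fast, List<int> Slow)> RunAsync()
    {
        var fast = new List<int>();
        var slow = new List<int>();
        var fastAction = new ActionBlock<int>(i => fast.Add(i));
        var slowAction = new ActionBlock<int>(async i =>
        {
            await Task.Delay(50);
            slow.Add(i);
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });

        var broadcast = new GuaranteedBroadcastTarget<int>(new ITargetBlock<int>[] { fastAction, slowAction });
        for (var i = 0; i < 3; ++i)
        {
            await broadcast.SendAsync(i);
        }
        broadcast.Complete();

        // Wait on the end of the pipeline, not on the broadcaster.
        await Task.WhenAll(fastAction.Completion, slowAction.Completion);
        return (fast, slow);
    }
}
```

Because each SendAsync is awaited rather than fire-and-forget, a message that a target declines is skipped instead of being held, and a bounded target simply slows the broadcaster down. `GuaranteedBroadcastDemo.RunAsync()` should report 0, 1 and 2 for both consumers.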

Regarding "c# - BroadcastCopyBlock for TPL Dataflow with guaranteed delivery", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/40605301/
