gpt4 book ai didi

c# - 防止 BroadcastBlock 在 LinkTo 上发送缓冲消息

转载 作者:行者123 更新时间:2023-12-04 13:34:16 25 4
gpt4 key购买 nike

给定一个 BroadcastBlock如果缓冲区中有一条消息,是否可以防止将该消息发送到新链接的目标?例如:

static void Main(string[] args)
{
var myBroadcastBlock = new BroadcastBlock<string>(msg => msg);
var myActionBlock = new ActionBlock<string>(msg => Console.WriteLine(msg));

myBroadcastBlock.Post("Hello World!"); // No linked targets here.

myBroadcastBlock.LinkTo(myActionBlock); // Link a target.

// etc.
}
此代码将打印“Hello World”。基本上, BroadcastBlock仍然会将缓冲的消息发送到 ActionBlock.LinkTo ,尽管该消息是在建立链接之前发布的。
是否有内置方法可以防止这种行为?我只希望将消息发送到当前链接,而不是将来的链接。
我正在使用 System.Threading.Tasks.Dataflow 4.11.1

最佳答案

使用内置 BroadcastBlock 无法实现此行为类(class)。它的行为是不可配置的。如果您迫切需要这种行为,您可以尝试下面的实现。它使用内部 BroadcastBlock<(T, long)>具有随每条新消息递增的索引,以便在链接期间可以过滤掉当前事件的消息。BroadcastBlockNewOnly里面有很多间接的class,因为需要翻译自T(T, long)并返回 T .这使得该类难以维护,而且效率也不高。每收到一条消息都会分配一个新对象,为垃圾收集器创建更多工作,因此请谨慎使用此类。

public class BroadcastBlockNewOnly<T> : ITargetBlock<T>, ISourceBlock<T>
{
private readonly IPropagatorBlock<(T, long), (T, long)> _broadcastBlock;
private long _index;

public BroadcastBlockNewOnly(Func<T, T> cloningFunction,
DataflowBlockOptions dataflowBlockOptions = null)
{
if (cloningFunction == null)
throw new ArgumentNullException(nameof(cloningFunction));
_broadcastBlock = new BroadcastBlock<(T, long)>(entry =>
{
var (value, index) = entry;
return (cloningFunction(value), index);
}, dataflowBlockOptions ?? new DataflowBlockOptions());
}

public Task Completion => _broadcastBlock.Completion;
public void Complete() => _broadcastBlock.Complete();
void IDataflowBlock.Fault(Exception ex) => _broadcastBlock.Fault(ex);

public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
{
if (target == null) throw new ArgumentNullException(nameof(target));
var currentIndex = Interlocked.CompareExchange(ref _index, 0, 0);
var linkedTargetProxy = new LinkedTargetProxy(target, this, currentIndex);
return _broadcastBlock.LinkTo(linkedTargetProxy, linkOptions);
}

private long GetNewIndex() => Interlocked.Increment(ref _index);

DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader header,
T value, ISourceBlock<T> source, bool consumeToAccept)
{
var sourceProxy = source != null ?
new SourceProxy(source, this, GetNewIndex) : null;
return _broadcastBlock.OfferMessage(header, (value, GetNewIndex()),
sourceProxy, consumeToAccept);
}

T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader header,
ITargetBlock<T> target, out bool messageConsumed)
{
var targetProxy = target != null ? new TargetProxy(target, this) : null;
var (value, index) = _broadcastBlock.ConsumeMessage(header, targetProxy,
out messageConsumed);
return value;
}

bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader header,
ITargetBlock<T> target)
{
var targetProxy = target != null ? new TargetProxy(target, this) : null;
return _broadcastBlock.ReserveMessage(header, targetProxy);
}

void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader header,
ITargetBlock<T> target)
{
var targetProxy = target != null ? new TargetProxy(target, this) : null;
_broadcastBlock.ReleaseReservation(header, targetProxy);
}

private class LinkedTargetProxy : ITargetBlock<(T, long)>
{
private readonly ITargetBlock<T> _realTarget;
private readonly ISourceBlock<T> _realSource;
private readonly long _indexLimit;

public LinkedTargetProxy(ITargetBlock<T> realTarget, ISourceBlock<T> realSource,
long indexLimit)
{
_realTarget = realTarget;
_realSource = realSource;
_indexLimit = indexLimit;
}

DataflowMessageStatus ITargetBlock<(T, long)>.OfferMessage(
DataflowMessageHeader header, (T, long) messageValue,
ISourceBlock<(T, long)> source, bool consumeToAccept)
{
var (value, index) = messageValue;
if (index <= _indexLimit) return DataflowMessageStatus.Declined;
return _realTarget.OfferMessage(header, value, _realSource, consumeToAccept);
}

Task IDataflowBlock.Completion => throw new NotSupportedException();
void IDataflowBlock.Complete() => _realTarget.Complete();
void IDataflowBlock.Fault(Exception ex) => _realTarget.Fault(ex);
}

private class SourceProxy : ISourceBlock<(T, long)>
{
private readonly ISourceBlock<T> _realSource;
private readonly ITargetBlock<T> _realTarget;
private readonly Func<long> _getNewIndex;

public SourceProxy(ISourceBlock<T> realSource, ITargetBlock<T> realTarget,
Func<long> getNewIndex)
{
_realSource = realSource;
_realTarget = realTarget;
_getNewIndex = getNewIndex;
}

(T, long) ISourceBlock<(T, long)>.ConsumeMessage(DataflowMessageHeader header,
ITargetBlock<(T, long)> target, out bool messageConsumed)
{
var value = _realSource.ConsumeMessage(header, _realTarget,
out messageConsumed);
var newIndex = _getNewIndex();
return (value, newIndex);
}

bool ISourceBlock<(T, long)>.ReserveMessage(DataflowMessageHeader header,
ITargetBlock<(T, long)> target)
{
return _realSource.ReserveMessage(header, _realTarget);
}

void ISourceBlock<(T, long)>.ReleaseReservation(DataflowMessageHeader header,
ITargetBlock<(T, long)> target)
{
_realSource.ReleaseReservation(header, _realTarget);
}

Task IDataflowBlock.Completion => throw new NotSupportedException();
void IDataflowBlock.Complete() => throw new NotSupportedException();
void IDataflowBlock.Fault(Exception ex) => throw new NotSupportedException();
IDisposable ISourceBlock<(T, long)>.LinkTo(ITargetBlock<(T, long)> target,
DataflowLinkOptions linkOptions) => throw new NotSupportedException();
}

private class TargetProxy : ITargetBlock<(T, long)>
{
private readonly ITargetBlock<T> _realTarget;
private readonly ISourceBlock<T> _realSource;

public TargetProxy(ITargetBlock<T> realTarget, ISourceBlock<T> realSource)
{
_realTarget = realTarget;
_realSource = realSource;
}

DataflowMessageStatus ITargetBlock<(T, long)>.OfferMessage(
DataflowMessageHeader header, (T, long) messageValue,
ISourceBlock<(T, long)> source, bool consumeToAccept)
{
var (value, index) = messageValue;
return _realTarget.OfferMessage(header, value, _realSource, consumeToAccept);
}

Task IDataflowBlock.Completion => throw new NotSupportedException();
void IDataflowBlock.Complete() => throw new NotSupportedException();
void IDataflowBlock.Fault(Exception ex) => throw new NotSupportedException();
}

}

关于c# - 防止 BroadcastBlock 在 LinkTo 上发送缓冲消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63160739/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com